Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions .github/workflows/unit_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ jobs:
with:
fetch-depth: 0

- name: Setup Virtual Environment
run: |
python3.10 -m venv .venv
source .venv/bin/activate
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.10'

- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install numpy==1.26.4

- name: Run Unit Test Cases
run: python test_report.py
- name: Run tests with coverage
run: |
coverage run --source=src -m unittest discover -s tests/unit_tests/
coverage xml

- name: Check coverage
run: |
coverage report --fail-under=85

#- name: Run Coverage Report
# run: coverage run --source=src -m unittest discover -s tests/unit_tests
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ Follow the steps to install the node packages required for both building and run
# Installing requirements
pip install -r requirements.txt
```

NOTE: If you have problems building on a Mac (e.g. with `uamqp`), see https://github.com/Azure/azure-uamqp-python/issues/386

### How to Run the Server/APIs

1. The http server by default starts with `8000` port
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fastapi~=0.111.1
pydantic==1.10.4
pydantic==1.10.16
html_testRunner==1.2.1
uvicorn==0.20.0
python-ms-core==0.0.22
tcat-gtfs-csv-validator~=0.0.38
gtfs-canonical-validator==0.0.5
75 changes: 75 additions & 0 deletions src/flex_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Validator error codes that are downgraded to warnings.
#
# To be more permissive of small errors, these codes are accepted because the
# underlying problem could conceivably be calculated, fixed, or interpreted by
# common consuming applications. The validation step in
# gtfs_flex_validation.py moves any reported error whose 'code' appears here
# out of the error list and into the info list.
CHANGE_ERROR_TO_WARNING = [
    'block_trips_with_overlapping_stop_times',
    'trip_distance_exceeds_shape_distance',
    'decreasing_or_equal_stop_time_distance',
    'decreasing_shape_distance',
    'empty_file',
    'equal_shape_distance_diff_coordinates',
    'fare_transfer_rule_duration_limit_type_without_duration_limit',
    'fare_transfer_rule_duration_limit_without_type',
    'fare_transfer_rule_invalid_transfer_count',
    'fare_transfer_rule_missing_transfer_count',
    'fare_transfer_rule_with_forbidden_transfer_count',
    'forbidden_shape_dist_traveled',
    'invalid_currency',
    'invalid_currency_amount',
    'invalid_url',
    'location_with_unexpected_stop_time',
    'missing_trip_edge',
    'new_line_in_value',
    'point_near_origin',
    'point_near_pole',
    'route_both_short_and_long_name_missing',
    'route_networks_specified_in_more_than_one_file',
    'start_and_end_range_equal',
    'start_and_end_range_out_of_order',
    'station_with_parent_station',
    'stop_time_timepoint_without_times',
    'stop_time_with_arrival_before_previous_departure_time',
    'stop_time_with_only_arrival_or_departure_time',
    'stop_without_location',
    'timeframe_only_start_or_end_time_specified',
    'timeframe_overlap',
    'timeframe_start_or_end_time_greater_than_twenty_four_hours',
    'u_r_i_syntax_error',
]

# Validator error codes that are always fatal for a flex dataset: the
# validation step in gtfs_flex_validation.py marks the file invalid as soon as
# a remaining (non-downgraded) error carries one of these codes.
FLEX_FATAL_ERROR_CODES = [
    'missing_required_element',
    'unsupported_feature_type',
    'unsupported_geo_json_type',
    'unsupported_geometry_type',
    'invalid_geometry',
    'forbidden_prior_day_booking_field_value',
    'forbidden_prior_notice_start_day',
    'forbidden_prior_notice_start_time',
    'forbidden_real_time_booking_field_value',
    'forbidden_same_day_booking_field_value',
    'invalid_prior_notice_duration_min',
    'missing_prior_day_booking_field_value',
    'missing_prior_notice_duration_min',
    'missing_prior_notice_start_time',
    'prior_notice_last_day_after_start_day',
]

# Flex-specific fields, keyed by the file they live in. The validation step in
# gtfs_flex_validation.py fails the dataset when an error notice names one of
# these fields (via its 'filename' and 'fieldName' entries).
FLEX_FIELDS = {
    'stop_times.txt': [
        'start_pickup_dropoff_window',
        'end_pickup_dropoff_window',
        'pickup_booking_rule_id',
        'drop_off_booking_rule_id',
        'mean_duration_factor',
        'mean_duration_offset',
        'safe_duration_factor',
        'safe_duration_offset',
    ],
}

# Files that belong to the flex spec. The validation step in
# gtfs_flex_validation.py fails the dataset when an error notice points at one
# of these files (via its 'filename' or 'childFilename' entry).
FLEX_FILES = [
    'locations.geojson',
    'booking_rules.txt',
    'location_groups.txt',
    'location_group_stops.txt',
]
79 changes: 58 additions & 21 deletions src/gtfs_flex_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from pathlib import Path
from typing import Union, Any
from .config import Settings

from tcat_gtfs_csv_validator import gcv_test_release
from tcat_gtfs_csv_validator import exceptions as gcvex
from gtfs_canonical_validator import CanonicalValidator
from .flex_config import CHANGE_ERROR_TO_WARNING, FLEX_FATAL_ERROR_CODES, FLEX_FIELDS, FLEX_FILES

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Path used for download file generation.
Expand All @@ -17,18 +16,19 @@
logger = logging.getLogger('FLEX_VALIDATION')
logger.setLevel(logging.INFO)

DATA_TYPE = 'gtfs_flex'
SCHEMA_VERSION = 'v2.0'


class GTFSFlexValidation:
def __init__(self, file_path=None, storage_client=None):
def __init__(self, file_path=None, storage_client=None, prefix=None):
self.settings = Settings()
self.container_name = self.settings.storage_container_name
self.storage_client = storage_client
self.file_path = file_path
self.file_relative_path = file_path.split('/')[-1]
self.client = self.storage_client.get_container(container_name=self.container_name)
if prefix:
self.prefix = prefix
else:
self.prefix = self.settings.get_unique_id()

# Facade function to validate the file
# Focuses on the file name with file name validation
Expand All @@ -46,31 +46,69 @@ def is_gtfs_flex_valid(self) -> tuple[Union[bool, Any], Union[str, Any]]:
if ext and ext.lower() == '.zip':
downloaded_file_path = self.download_single_file(self.file_path)
logger.info(f' Downloaded file path: {downloaded_file_path}')
try:
gcv_test_release.test_release(DATA_TYPE, SCHEMA_VERSION, downloaded_file_path)
is_valid = True
except Exception as err:
traceback.print_exc()
validation_message = str(err)
logger.error(f' Error While Validating File: {str(err)}')
flex_validator = CanonicalValidator(zip_file=downloaded_file_path)
result = flex_validator.validate()

is_valid = result.status
if isinstance(result.error, list) and result.error is not None:
for error in result.error[:]:
# change some smaller errors to warnings instead to relax the strict validation MD gives us
if error['code'] in CHANGE_ERROR_TO_WARNING:
if result.info is None: result.info = []
result.info.append(error)
result.error.remove(error)
continue

# these are error codes from MD that relate to flex and are always fatal
if error['code'] in FLEX_FATAL_ERROR_CODES:
is_valid = False
continue

# some of the notices relate to flex, but there's no way to tell except with this logic:
for notice in error['sampleNotices']:
# one of the fields in a given file is a flex-spec field--if it's flagged, fail
if "fieldName" in notice and "filename" in notice:
if notice['filename'] in FLEX_FIELDS and \
notice['fieldName'] in FLEX_FIELDS[notice['filename']]:
is_valid = False
continue

# one of the flex spec'd files has an error--if so, fail
if "filename" in notice:
if notice['filename'] in FLEX_FILES:
is_valid = False
continue

# similar to the above, but the field for the filename is parent/child
if "childFilename" in notice:
if notice['childFilename'] in FLEX_FILES:
is_valid = False
continue

# if all errors have been downgraded to warnings, mark us as a success
if len(result.error) == 0:
is_valid = True

if result.error is not None:
validation_message = str(result.error)
logger.error(f' Error While Validating File: {str(result.error)}')

GTFSFlexValidation.clean_up(downloaded_file_path)
else:
logger.error(f' Failed to validate because unknown file format')

return is_valid, validation_message

# Downloads the file to local folder of the server
# file_upload_path is the fullUrl of where the
# file_upload_path is the fullUrl of where the
# file is uploaded.
def download_single_file(self, file_upload_path=None) -> str:
is_exists = os.path.exists(DOWNLOAD_FILE_PATH)
if not is_exists:
os.makedirs(DOWNLOAD_FILE_PATH)

unique_folder = self.settings.get_unique_id()
unique_folder = self.prefix
dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_folder)

# Ensure the unique folder path is created
os.makedirs(dl_folder_path, exist_ok=True)

file = self.storage_client.get_file_from_url(self.container_name, file_upload_path)
Expand All @@ -95,6 +133,5 @@ def clean_up(path):
logger.info(f' Removing File: {path}')
os.remove(path)
else:
folder = os.path.join(DOWNLOAD_FILE_PATH, path)
logger.info(f' Removing Folder: {folder}')
shutil.rmtree(folder, ignore_errors=False)
logger.info(f' Removing Folder: {path}')
shutil.rmtree(path, ignore_errors=False)
57 changes: 33 additions & 24 deletions src/gtfx_flex_validator.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import os
import gc
import logging
import uuid
import threading
import urllib.parse
from python_ms_core import Core
from python_ms_core.core.queue.models.queue_message import QueueMessage
from pathlib import Path
from .config import Settings
from .gtfs_flex_validation import GTFSFlexValidation
from .serializer.gtfx_flex_serializer import GTFSFlexUpload
from python_ms_core import Core
from .models.file_upload_msg import FileUploadMsg
import threading
import time
from .gtfs_flex_validation import GTFSFlexValidation
from python_ms_core.core.queue.models.queue_message import QueueMessage

logging.basicConfig()
logger = logging.getLogger('FLEX_VALIDATOR')
logger.setLevel(logging.INFO)

DOWNLOAD_FILE_PATH = f'{Path.cwd()}/downloads'

class GTFSFlexValidator:
_settings = Settings()
Expand All @@ -22,8 +23,8 @@ def __init__(self):
self.core = Core()
self.settings = Settings()
self._subscription_name = self.settings.request_subscription
self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages)
# self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name)
self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,
max_concurrent_messages=self.settings.max_concurrent_messages)
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.listening_thread = threading.Thread(target=self.subscribe)
Expand All @@ -41,37 +42,45 @@ def process(message) -> None:
logger.info(' No Message')

self.request_topic.subscribe(subscription=self._subscription_name, callback=process)

def process_message(self, upload_msg: FileUploadMsg) -> None:

def process_message(self, upload_msg: FileUploadMsg) -> None:
try:
file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path)
logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}')
logger.info(f' Message ID: {upload_msg.messageId}')
logger.info(file_upload_path)
if file_upload_path:
# Do the validation in the other class
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client)
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client, prefix=upload_msg.messageId)
validation = validator.validate()
self.send_status(valid=validation[0], upload_message=upload_msg,
validation_message=validation[1])
self.send_status(
valid=validation[0],
upload_message=upload_msg,
validation_message=validation[1]
)
else:
logger.info(' No file Path found in message!')
except Exception as e:
logger.error(f' Error processing message: {e}')
self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e))
finally:
folder_to_delete = os.path.join(DOWNLOAD_FILE_PATH, upload_msg.messageId)
GTFSFlexValidation.clean_up(folder_to_delete)
gc.collect()


def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None:
response_message = {
"file_upload_path": upload_message.data.file_upload_path,
"user_id": upload_message.data.user_id ,
"tdei_project_group_id": upload_message.data.tdei_project_group_id,
"success": valid,
"message": validation_message
}
logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}')
'file_upload_path': upload_message.data.file_upload_path,
'user_id': upload_message.data.user_id,
'tdei_project_group_id': upload_message.data.tdei_project_group_id,
'success': valid,
'message': validation_message
}
logger.info(
f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}')
data = QueueMessage.data_from({
'messageId': upload_message.messageId,
'message': 'Validation complete',
'message': 'Validation complete',
'messageType': upload_message.messageType,
'data': response_message
})
Expand All @@ -85,4 +94,4 @@ def send_response(self, data: QueueMessage) -> None:
logger.error(f'Error sending response: {e}')

def stop_listening(self):
self.listening_thread.join(timeout=0)
self.listening_thread.join(timeout=0)
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
@app.on_event('shutdown')
async def shutdown_event():
if app.flex_validator:
app.flex_validator.shutdown()
app.flex_validator.stop_listening()


@app.get('/', status_code=status.HTTP_200_OK)
Expand Down
Loading
Loading