Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ pydantic==1.10.4
html_testRunner==1.2.1
uvicorn==0.20.0
python-ms-core==0.0.22
tcat-gtfs-csv-validator~=0.0.40
gtfs-canonical-validator==0.0.5
34 changes: 16 additions & 18 deletions src/gtfs_flex_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from pathlib import Path
from typing import Union, Any
from .config import Settings

from tcat_gtfs_csv_validator import gcv_test_release
from tcat_gtfs_csv_validator import exceptions as gcvex
from gtfs_canonical_validator import CanonicalValidator

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Path used for download file generation.
Expand All @@ -17,18 +15,19 @@
logger = logging.getLogger('FLEX_VALIDATION')
logger.setLevel(logging.INFO)

DATA_TYPE = 'gtfs_flex'
SCHEMA_VERSION = 'v2.0'


class GTFSFlexValidation:
def __init__(self, file_path=None, storage_client=None):
def __init__(self, file_path=None, storage_client=None, prefix=None):
self.settings = Settings()
self.container_name = self.settings.storage_container_name
self.storage_client = storage_client
self.file_path = file_path
self.file_relative_path = file_path.split('/')[-1]
self.client = self.storage_client.get_container(container_name=self.container_name)
if prefix:
self.prefix = prefix
else:
self.prefix = self.settings.get_unique_id()

# Facade function to validate the file
# Focuses on the file name with file name validation
Expand All @@ -46,13 +45,13 @@ def is_gtfs_flex_valid(self) -> tuple[Union[bool, Any], Union[str, Any]]:
if ext and ext.lower() == '.zip':
downloaded_file_path = self.download_single_file(self.file_path)
logger.info(f' Downloaded file path: {downloaded_file_path}')
try:
gcv_test_release.test_release(DATA_TYPE, SCHEMA_VERSION, downloaded_file_path)
is_valid = True
except Exception as err:
traceback.print_exc()
validation_message = str(err)
logger.error(f' Error While Validating File: {str(err)}')
flex_validator = CanonicalValidator(zip_file=downloaded_file_path)
result = flex_validator.validate()
is_valid = result.status

if result.error is not None:
validation_message = str(result.error)
logger.error(f' Error While Validating File: {str(result.error)}')
GTFSFlexValidation.clean_up(downloaded_file_path)
else:
logger.error(f' Failed to validate because unknown file format')
Expand All @@ -67,7 +66,7 @@ def download_single_file(self, file_upload_path=None) -> str:
if not is_exists:
os.makedirs(DOWNLOAD_FILE_PATH)

unique_folder = self.settings.get_unique_id()
unique_folder = self.prefix
dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_folder)

# Ensure the unique folder path is created
Expand Down Expand Up @@ -95,6 +94,5 @@ def clean_up(path):
logger.info(f' Removing File: {path}')
os.remove(path)
else:
folder = os.path.join(DOWNLOAD_FILE_PATH, path)
logger.info(f' Removing Folder: {folder}')
shutil.rmtree(folder, ignore_errors=False)
logger.info(f' Removing Folder: {path}')
shutil.rmtree(path, ignore_errors=False)
57 changes: 33 additions & 24 deletions src/gtfx_flex_validator.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import os
import gc
import logging
import uuid
import threading
import urllib.parse
from python_ms_core import Core
from python_ms_core.core.queue.models.queue_message import QueueMessage
from pathlib import Path
from .config import Settings
from .gtfs_flex_validation import GTFSFlexValidation
from .serializer.gtfx_flex_serializer import GTFSFlexUpload
from python_ms_core import Core
from .models.file_upload_msg import FileUploadMsg
import threading
import time
from .gtfs_flex_validation import GTFSFlexValidation
from python_ms_core.core.queue.models.queue_message import QueueMessage

logging.basicConfig()
logger = logging.getLogger('FLEX_VALIDATOR')
logger.setLevel(logging.INFO)

DOWNLOAD_FILE_PATH = f'{Path.cwd()}/downloads'

class GTFSFlexValidator:
_settings = Settings()
Expand All @@ -22,8 +23,8 @@ def __init__(self):
self.core = Core()
self.settings = Settings()
self._subscription_name = self.settings.request_subscription
self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages)
# self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name)
self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,
max_concurrent_messages=self.settings.max_concurrent_messages)
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.listening_thread = threading.Thread(target=self.subscribe)
Expand All @@ -41,37 +42,45 @@ def process(message) -> None:
logger.info(' No Message')

self.request_topic.subscribe(subscription=self._subscription_name, callback=process)

def process_message(self, upload_msg: FileUploadMsg) -> None:

def process_message(self, upload_msg: FileUploadMsg) -> None:
try:
file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path)
logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}')
logger.info(f' Message ID: {upload_msg.messageId}')
logger.info(file_upload_path)
if file_upload_path:
# Do the validation in the other class
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client)
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client, prefix=upload_msg.messageId)
validation = validator.validate()
self.send_status(valid=validation[0], upload_message=upload_msg,
validation_message=validation[1])
self.send_status(
valid=validation[0],
upload_message=upload_msg,
validation_message=validation[1]
)
else:
logger.info(' No file Path found in message!')
except Exception as e:
logger.error(f' Error processing message: {e}')
self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e))
finally:
folder_to_delete = os.path.join(DOWNLOAD_FILE_PATH, upload_msg.messageId)
GTFSFlexValidation.clean_up(folder_to_delete)
gc.collect()


def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None:
response_message = {
"file_upload_path": upload_message.data.file_upload_path,
"user_id": upload_message.data.user_id ,
"tdei_project_group_id": upload_message.data.tdei_project_group_id,
"success": valid,
"message": validation_message
}
logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}')
'file_upload_path': upload_message.data.file_upload_path,
'user_id': upload_message.data.user_id,
'tdei_project_group_id': upload_message.data.tdei_project_group_id,
'success': valid,
'message': validation_message
}
logger.info(
f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}')
data = QueueMessage.data_from({
'messageId': upload_message.messageId,
'message': 'Validation complete',
'message': 'Validation complete',
'messageType': upload_message.messageType,
'data': response_message
})
Expand All @@ -85,4 +94,4 @@ def send_response(self, data: QueueMessage) -> None:
logger.error(f'Error sending response: {e}')

def stop_listening(self):
self.listening_thread.join(timeout=0)
self.listening_thread.join(timeout=0)
6 changes: 3 additions & 3 deletions tests/unit_tests/test_file_upload_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ def setUp(self):
self.upload = FileUploadMsg.from_dict(data=data)

def test_message_type(self):
self.assertEqual(self.upload.messageType, "workflow_identifier")
self.assertEqual(self.upload.messageType, 'workflow_identifier')

def test_message_id(self):
self.upload.messageId = "abc"
self.assertEqual(self.upload.messageId, "abc")
self.upload.messageId = 'abc'
self.assertEqual(self.upload.messageId, 'abc')

def test_file_upload_path(self):
self.assertEqual(self.upload.data.file_upload_path,
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
53 changes: 11 additions & 42 deletions tests/unit_tests/test_gtfs_flex_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
DOWNLOAD_FILE_PATH = f'{Path.cwd()}/downloads'
SAVED_FILE_PATH = f'{Path.cwd()}/tests/unit_tests/test_files'

SUCCESS_FILE_NAME = 'success_1_all_attrs.zip'
MAC_SUCCESS_FILE_NAME = 'success_2_mac_issue.zip'
SUCCESS_FILE_NAME = 'browncounty-mn-us--flex-v2.zip'
MAC_SUCCESS_FILE_NAME = 'otterexpress-mn-us--flex-v2.zip'
FAILURE_FILE_NAME = 'fail_schema_1.zip'

DATA_TYPE = 'gtfs_flex'
SCHEMA_VERSION = 'v2.0'


class TestSuccessWithMacOSFile(unittest.TestCase):
@patch.object(GTFSFlexValidation, 'download_single_file')
Expand All @@ -35,10 +32,11 @@ def setUp(self, mock_download_single_file):
self.validator.file_relative_path = MAC_SUCCESS_FILE_NAME
self.validator.container_name = None
self.validator.settings = Settings()
self.validator.prefix = self.validator.settings.get_unique_id()
mock_download_single_file.return_value = file_path

def tearDown(self):
pass
GTFSFlexValidation.clean_up(os.path.join(DOWNLOAD_FILE_PATH, self.validator.prefix))

def test_validate_with_valid_file(self):
# Arrange
Expand Down Expand Up @@ -70,15 +68,17 @@ def setUp(self, mock_download_single_file):
os.makedirs(dl_folder_path, exist_ok=True) # Ensure this directory is created in the test

with patch.object(GTFSFlexValidation, '__init__', return_value=None):
self.validator = GTFSFlexValidation(file_path=file_path, storage_client=MagicMock())
self.validator = GTFSFlexValidation(file_path=file_path, storage_client=MagicMock(),
prefix=Settings().get_unique_id())
self.validator.file_path = file_path
self.validator.file_relative_path = SUCCESS_FILE_NAME
self.validator.container_name = None
self.validator.settings = Settings()
self.validator.prefix = self.validator.settings.get_unique_id()
mock_download_single_file.return_value = os.path.join(dl_folder_path, SUCCESS_FILE_NAME)

def tearDown(self):
pass
GTFSFlexValidation.clean_up(os.path.join(DOWNLOAD_FILE_PATH, self.validator.prefix))

def test_validate_with_valid_file(self):
# Arrange
Expand Down Expand Up @@ -128,33 +128,6 @@ def test_download_single_file(self):
content = f.read()
self.assertEqual(content, b'file_content')

def test_download_multiple_file_with_same_name(self):
# Arrange
file_upload_path = DOWNLOAD_FILE_PATH
self.validator.storage_client = MagicMock()
self.validator.storage_client.get_file_from_url = MagicMock()
file = MagicMock()
file.file_path = 'text_file.txt'
file.get_stream = MagicMock(return_value=b'file_content')
self.validator.storage_client.get_file_from_url.return_value = file

# Act
first_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path)
second_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path)

# Assert
self.assertNotEqual(first_downloaded_file_path, second_downloaded_file_path,
"The downloaded file paths should be different for files with the same name.")

# Check if the get_file_from_url was called for both download attempts
self.assertEqual(self.validator.storage_client.get_file_from_url.call_count, 2,
"get_file_from_url should be called twice for two downloads.")
file.get_stream.assert_called()

# Additional assertions to verify that the paths indeed point to different locations
self.assertTrue(first_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH))
self.assertTrue(second_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH))

def test_clean_up_file(self):
# Arrange
file_upload_path = DOWNLOAD_FILE_PATH
Expand Down Expand Up @@ -203,10 +176,11 @@ def setUp(self, mock_download_single_file):
self.validator.file_relative_path = FAILURE_FILE_NAME
self.validator.container_name = None
self.validator.settings = MagicMock()
self.validator.prefix = Settings().get_unique_id()
mock_download_single_file.return_value = file_path

def tearDown(self):
pass
GTFSFlexValidation.clean_up(os.path.join(DOWNLOAD_FILE_PATH, self.validator.prefix))

def test_validate_with_invalid_file(self):
# Arrange
Expand Down Expand Up @@ -259,12 +233,7 @@ def test_download_single_file_exception(self):
file.get_stream = MagicMock(side_effect=FileNotFoundError("Mocked FileNotFoundError"))
self.validator.storage_client.get_file_from_url.return_value = file

# Create the mock folder that would be used
unique_id = "mocked-uuid"
self.validator.settings.get_unique_id = MagicMock()
self.validator.settings.get_unique_id.return_value = unique_id

dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_id)
dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, self.validator.prefix)
os.makedirs(dl_folder_path, exist_ok=True)

# Act & Assert
Expand Down
Loading