diff --git a/README.md b/README.md index ae5210d..9974015 100644 --- a/README.md +++ b/README.md @@ -27,10 +27,13 @@ RESPONSE_TOPIC=xxxx REQUEST_SUBSCRIPTION=xxxx QUEUECONNECTION=xxxx STORAGECONNECTION=xxxx +MAX_CONCURRENT_MESSAGES=xx ``` The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `tdei-gtfs-csv-validator` package. `QUEUECONNECTION` is not being used in this application but this is the main requirement for `python-ms-core` package +`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, it defaults to 2. + ### How to Setup and Build Follow the steps to install the node packages required for both building and running the application. diff --git a/requirements.txt b/requirements.txt index ba74e61..48d9270 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ fastapi~=0.111.1 pydantic==1.10.4 html_testRunner==1.2.1 uvicorn==0.20.0 -python-ms-core==0.0.19 +python-ms-core==0.0.22 tcat-gtfs-csv-validator~=0.0.38 \ No newline at end of file diff --git a/src/config.py b/src/config.py index a52beb6..3fa0540 100644 --- a/src/config.py +++ b/src/config.py @@ -1,4 +1,5 @@ import os +import uuid from dotenv import load_dotenv from pydantic import BaseSettings @@ -11,3 +12,7 @@ class Settings(BaseSettings): response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None) request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None) storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex') + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2) + + def get_unique_id(self) -> str: + return str(uuid.uuid4()) diff --git a/src/gtfs_flex_validation.py b/src/gtfs_flex_validation.py index 57be9bb..2ed4057 100644 --- a/src/gtfs_flex_validation.py +++ b/src/gtfs_flex_validation.py @@ -23,8 +23,8 @@ class GTFSFlexValidation: def __init__(self, file_path=None, storage_client=None): - 
settings = Settings() - self.container_name = settings.storage_container_name + self.settings = Settings() + self.container_name = self.settings.storage_container_name self.storage_client = storage_client self.file_path = file_path self.file_relative_path = file_path.split('/')[-1] @@ -44,9 +44,9 @@ def is_gtfs_flex_valid(self) -> tuple[Union[bool, Any], Union[str, Any]]: validation_message = '' root, ext = os.path.splitext(self.file_relative_path) if ext and ext.lower() == '.zip': + downloaded_file_path = self.download_single_file(self.file_path) + logger.info(f' Downloaded file path: {downloaded_file_path}') try: - downloaded_file_path = self.download_single_file(self.file_path) - logger.info(f' Downloaded file path: {downloaded_file_path}') gcv_test_release.test_release(DATA_TYPE, SCHEMA_VERSION, downloaded_file_path) is_valid = True except Exception as err: @@ -67,17 +67,23 @@ def download_single_file(self, file_upload_path=None) -> str: if not is_exists: os.makedirs(DOWNLOAD_FILE_PATH) + unique_folder = self.settings.get_unique_id() + dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_folder) + + # Ensure the unique folder path is created + os.makedirs(dl_folder_path, exist_ok=True) + file = self.storage_client.get_file_from_url(self.container_name, file_upload_path) try: if file.file_path: file_path = os.path.basename(file.file_path) - with open(f'{DOWNLOAD_FILE_PATH}/{file_path}', 'wb') as blob: + with open(f'{dl_folder_path}/{file_path}', 'wb') as blob: blob.write(file.get_stream()) - logger.info(f' File downloaded to location: {DOWNLOAD_FILE_PATH}/{file_path}') - return f'{DOWNLOAD_FILE_PATH}/{file_path}' + logger.info(f' File downloaded to location: {dl_folder_path}/{file_path}') + return f'{dl_folder_path}/{file_path}' else: logger.info(' File not found!') - raise Exception('File not found!') + raise Exception('File not found!') except Exception as e: traceback.print_exc() logger.error(e) diff --git a/src/gtfx_flex_validator.py 
b/src/gtfx_flex_validator.py index 135b1ed..2d77f48 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -8,6 +8,7 @@ from .serializer.gtfx_flex_serializer import GTFSFlexUpload from .models.file_upload_msg import FileUploadMsg import threading +import time logging.basicConfig() logger = logging.getLogger('FLEX_VALIDATOR') @@ -18,14 +19,15 @@ class GTFSFlexValidator: _settings = Settings() def __init__(self): - core = Core() - settings = Settings() - self._subscription_name = settings.request_subscription - self.request_topic = core.get_topic(topic_name=settings.request_topic_name) - self.response_topic = core.get_topic(topic_name=settings.response_topic_name) - self.logger = core.get_logger() - self.storage_client = core.get_storage_client() - self.subscribe() + self.core = Core() + self.settings = Settings() + self._subscription_name = self.settings.request_subscription + self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages) + # self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) + self.logger = self.core.get_logger() + self.storage_client = self.core.get_storage_client() + self.listening_thread = threading.Thread(target=self.subscribe) + self.listening_thread.start() def subscribe(self) -> None: # Process the incoming message @@ -34,41 +36,38 @@ def process(message) -> None: gtfs_upload_message = QueueMessage.to_dict(message) upload_msg = FileUploadMsg.from_dict(gtfs_upload_message) logger.info(upload_msg) - # upload_message = GTFSFlexUpload.data_from(gtfs_upload_message) - process_thread = threading.Thread(target=self.process_message,args=[upload_msg]) - process_thread.start() + self.process_message(upload_msg) else: logger.info(' No Message') self.request_topic.subscribe(subscription=self._subscription_name, callback=process) def process_message(self, upload_msg: FileUploadMsg) -> None: - file_upload_path = 
urllib.parse.unquote(upload_msg.data.file_upload_path) - logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}') - logger.info(file_upload_path) - if file_upload_path: - # Do the validation in the other class - validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client) - validation = validator.validate() - self.send_status(valid=validation[0], upload_message=upload_msg, - validation_message=validation[1]) - else: - logger.info(' No file Path found in message!') + + try: + file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path) + logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}') + logger.info(file_upload_path) + if file_upload_path: + # Do the validation in the other class + validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client) + validation = validator.validate() + self.send_status(valid=validation[0], upload_message=upload_msg, + validation_message=validation[1]) + else: + logger.info(' No file Path found in message!') + except Exception as e: + logger.error(f' Error processing message: {e}') + self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e)) def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None: - # upload_message.data.stage = 'flex-validation' - # upload_message.data.meta.isValid = valid - # upload_message.data.meta.validationMessage = validation_message or 'Validation successful' - # upload_message.data.response.success = valid - # upload_message.data.response.message = validation_message or 'Validation successful' - # message_id = uuid.uuid1().hex[0:24] response_message = { - "file_upload_path": upload_message.data.file_upload_path, - "user_id": upload_message.data.user_id , - "tdei_project_group_id": upload_message.data.tdei_project_group_id, - "success": valid, - "message": validation_message - } + 
"file_upload_path": upload_message.data.file_upload_path, + "user_id": upload_message.data.user_id , + "tdei_project_group_id": upload_message.data.tdei_project_group_id, + "success": valid, + "message": validation_message + } logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}') data = QueueMessage.data_from({ 'messageId': upload_message.messageId, @@ -76,5 +75,14 @@ def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_mes 'messageType': upload_message.messageType, 'data': response_message }) - self.response_topic.publish(data=data) - return + self.send_response(data=data) + + def send_response(self, data: QueueMessage) -> None: + try: + response_topic = self.core.get_topic(self.settings.response_topic_name) + response_topic.publish(data=data) + except Exception as e: + logger.error(f'Error sending response: {e}') + + def stop_listening(self): + self.listening_thread.join(timeout=0) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 11a2a5c..506b807 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,7 @@ from .gtfx_flex_validator import GTFSFlexValidator app = FastAPI() +app.flex_validator = None prefix_router = APIRouter(prefix='/health') @@ -18,7 +19,7 @@ def get_settings(): @app.on_event('startup') async def startup_event(settings: Settings = Depends(get_settings)) -> None: try: - GTFSFlexValidator() + app.flex_validator = GTFSFlexValidator() except: print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m') print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m') @@ -34,6 +35,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None: child.kill() parent.kill() +@app.on_event('shutdown') +async def shutdown_event(): + if app.flex_validator: + app.flex_validator.stop_listening() + @app.get('/', status_code=status.HTTP_200_OK) 
@prefix_router.get('/', status_code=status.HTTP_200_OK) diff --git a/test_report.py b/test_report.py index a037021..2038b62 100644 --- a/test_report.py +++ b/test_report.py @@ -4,7 +4,7 @@ # Define your test cases from tests.unit_tests.test_gtfs_flex_serializer import TestGTFSFlexUpload, TestGTFSFlexUploadData, TestRequest, \ TestMeta, TestResponse -from tests.unit_tests.test_gtfs_flex_validation import TestSuccessGTFSFlexValidation, TestFailureGTFSFlexValidation, TestSuccessWithWithMacOSFile +from tests.unit_tests.test_gtfs_flex_validation import TestSuccessGTFSFlexValidation, TestFailureGTFSFlexValidation, TestSuccessWithMacOSFile from tests.unit_tests.test_gtfx_flex_validator import TestGTFSFlexValidator from tests.unit_tests.test_file_upload_msg import TestFileUploadMsg from tests.unit_tests.test_main import TestApp @@ -19,7 +19,7 @@ test_suite.addTest(unittest.makeSuite(TestMeta)) test_suite.addTest(unittest.makeSuite(TestResponse)) test_suite.addTest(unittest.makeSuite(TestSuccessGTFSFlexValidation)) - test_suite.addTest(unittest.makeSuite(TestSuccessWithWithMacOSFile)) + test_suite.addTest(unittest.makeSuite(TestSuccessWithMacOSFile)) test_suite.addTest(unittest.makeSuite(TestFailureGTFSFlexValidation)) test_suite.addTest(unittest.makeSuite(TestGTFSFlexValidator)) test_suite.addTest(unittest.makeSuite(TestApp)) diff --git a/tests/unit_tests/test_gtfs_flex_validation.py b/tests/unit_tests/test_gtfs_flex_validation.py index f9ea704..f398467 100644 --- a/tests/unit_tests/test_gtfs_flex_validation.py +++ b/tests/unit_tests/test_gtfs_flex_validation.py @@ -4,6 +4,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock from src.gtfs_flex_validation import GTFSFlexValidation +from src.config import Settings DOWNLOAD_FILE_PATH = f'{Path.cwd()}/downloads' SAVED_FILE_PATH = f'{Path.cwd()}/tests/unit_tests/test_files' @@ -16,7 +17,7 @@ SCHEMA_VERSION = 'v2.0' -class TestSuccessWithWithMacOSFile(unittest.TestCase): +class 
TestSuccessWithMacOSFile(unittest.TestCase): @patch.object(GTFSFlexValidation, 'download_single_file') def setUp(self, mock_download_single_file): os.makedirs(DOWNLOAD_FILE_PATH, exist_ok=True) @@ -33,6 +34,7 @@ def setUp(self, mock_download_single_file): self.validator.file_path = file_path self.validator.file_relative_path = MAC_SUCCESS_FILE_NAME self.validator.container_name = None + self.validator.settings = Settings() mock_download_single_file.return_value = file_path def tearDown(self): @@ -47,11 +49,11 @@ def test_validate_with_valid_file(self): # Act is_valid, _ = self.validator.validate() - (is_valid) # Assert self.assertTrue(is_valid) + class TestSuccessGTFSFlexValidation(unittest.TestCase): @patch.object(GTFSFlexValidation, 'download_single_file') @@ -64,13 +66,16 @@ def setUp(self, mock_download_single_file): shutil.copyfile(source, destination) file_path = f'{DOWNLOAD_FILE_PATH}/{SUCCESS_FILE_NAME}' + dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, 'dummy-uuid') # Mock the UUID generation + os.makedirs(dl_folder_path, exist_ok=True) # Ensure this directory is created in the test with patch.object(GTFSFlexValidation, '__init__', return_value=None): self.validator = GTFSFlexValidation(file_path=file_path, storage_client=MagicMock()) self.validator.file_path = file_path self.validator.file_relative_path = SUCCESS_FILE_NAME self.validator.container_name = None - mock_download_single_file.return_value = file_path + self.validator.settings = Settings() + mock_download_single_file.return_value = os.path.join(dl_folder_path, SUCCESS_FILE_NAME) def tearDown(self): pass @@ -123,6 +128,33 @@ def test_download_single_file(self): content = f.read() self.assertEqual(content, b'file_content') + def test_download_multiple_file_with_same_name(self): + # Arrange + file_upload_path = DOWNLOAD_FILE_PATH + self.validator.storage_client = MagicMock() + self.validator.storage_client.get_file_from_url = MagicMock() + file = MagicMock() + file.file_path = 'text_file.txt' + 
file.get_stream = MagicMock(return_value=b'file_content') + self.validator.storage_client.get_file_from_url.return_value = file + + # Act + first_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path) + second_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path) + + # Assert + self.assertNotEqual(first_downloaded_file_path, second_downloaded_file_path, + "The downloaded file paths should be different for files with the same name.") + + # Check if the get_file_from_url was called for both download attempts + self.assertEqual(self.validator.storage_client.get_file_from_url.call_count, 2, + "get_file_from_url should be called twice for two downloads.") + file.get_stream.assert_called() + + # Additional assertions to verify that the paths indeed point to different locations + self.assertTrue(first_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH)) + self.assertTrue(second_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH)) + def test_clean_up_file(self): # Arrange file_upload_path = DOWNLOAD_FILE_PATH @@ -149,7 +181,7 @@ def test_clean_up_folder(self): GTFSFlexValidation.clean_up = MagicMock() # Assert - # self.assertFalse(os.path.exists(directory_name)) + self.assertFalse(os.path.exists(directory_name)) class TestFailureGTFSFlexValidation(unittest.TestCase): @@ -170,6 +202,7 @@ def setUp(self, mock_download_single_file): self.validator.file_path = file_path self.validator.file_relative_path = FAILURE_FILE_NAME self.validator.container_name = None + self.validator.settings = MagicMock() mock_download_single_file.return_value = file_path def tearDown(self): @@ -223,19 +256,20 @@ def test_download_single_file_exception(self): self.validator.storage_client.get_file_from_url = MagicMock() file = MagicMock() file.file_path = 'text_file.txt' - file.get_stream = MagicMock(return_value=b'file_content') + file.get_stream = MagicMock(side_effect=FileNotFoundError("Mocked FileNotFoundError")) 
self.validator.storage_client.get_file_from_url.return_value = file - # Act - downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path) + # Create the mock folder that would be used + unique_id = "mocked-uuid" + self.validator.settings.get_unique_id = MagicMock() + self.validator.settings.get_unique_id.return_value = unique_id - # Assert - self.validator.storage_client.get_file_from_url.assert_called_once_with(self.validator.container_name, - file_upload_path) - file.get_stream.assert_called_once() - with open(downloaded_file_path, 'rb') as f: - content = f.read() - self.assertEqual(content, b'file_content') + dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_id) + os.makedirs(dl_folder_path, exist_ok=True) + + # Act & Assert + with self.assertRaises(FileNotFoundError): + self.validator.download_single_file(file_upload_path=file_upload_path) if __name__ == '__main__':