From 46cd0cddb5df08ee15a98d70c1b8f4e0317cd380 Mon Sep 17 00:00:00 2001 From: Sujata Date: Fri, 16 Aug 2024 18:32:53 +0530 Subject: [PATCH 1/8] Fixed No such file or directory issue --- src/config.py | 4 ++++ src/gtfs_flex_validation.py | 17 ++++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/config.py b/src/config.py index a52beb6..0578a0a 100644 --- a/src/config.py +++ b/src/config.py @@ -1,4 +1,5 @@ import os +import uuid from dotenv import load_dotenv from pydantic import BaseSettings @@ -11,3 +12,6 @@ class Settings(BaseSettings): response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None) request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None) storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex') + + def get_unique_id(self) -> str: + return str(uuid.uuid4()) diff --git a/src/gtfs_flex_validation.py b/src/gtfs_flex_validation.py index 57be9bb..0e1dc43 100644 --- a/src/gtfs_flex_validation.py +++ b/src/gtfs_flex_validation.py @@ -23,8 +23,8 @@ class GTFSFlexValidation: def __init__(self, file_path=None, storage_client=None): - settings = Settings() - self.container_name = settings.storage_container_name + self.settings = Settings() + self.container_name = self.settings.storage_container_name self.storage_client = storage_client self.file_path = file_path self.file_relative_path = file_path.split('/')[-1] @@ -44,9 +44,9 @@ def is_gtfs_flex_valid(self) -> tuple[Union[bool, Any], Union[str, Any]]: validation_message = '' root, ext = os.path.splitext(self.file_relative_path) if ext and ext.lower() == '.zip': + downloaded_file_path = self.download_single_file(self.file_path) + logger.info(f' Downloaded file path: {downloaded_file_path}') try: - downloaded_file_path = self.download_single_file(self.file_path) - logger.info(f' Downloaded file path: {downloaded_file_path}') gcv_test_release.test_release(DATA_TYPE, SCHEMA_VERSION, downloaded_file_path) is_valid = True except Exception as err: @@ -67,14 +67,17 @@ def download_single_file(self, file_upload_path=None) -> str: if not is_exists: os.makedirs(DOWNLOAD_FILE_PATH) + unique_folder = self.settings.get_unique_id() + dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_folder) + file = self.storage_client.get_file_from_url(self.container_name, file_upload_path) try: if file.file_path: file_path = os.path.basename(file.file_path) - with open(f'{DOWNLOAD_FILE_PATH}/{file_path}', 'wb') as blob: + with open(f'{dl_folder_path}/{file_path}', 'wb') as blob: blob.write(file.get_stream()) - logger.info(f' File downloaded to location: {DOWNLOAD_FILE_PATH}/{file_path}') - return f'{DOWNLOAD_FILE_PATH}/{file_path}' + logger.info(f' File downloaded to location: {dl_folder_path}/{file_path}') + return f'{dl_folder_path}/{file_path}' else: logger.info(' File not found!') raise Exception('File not found!') From 39b2322c4616f5a065fae17537cc91dcb761a933 Mon Sep 17 00:00:00 2001 From: Sujata Date: Fri, 16 Aug 2024 19:47:51 +0530 Subject: [PATCH 2/8] Updated unit test cases --- src/gtfs_flex_validation.py | 5 +- tests/unit_tests/test_gtfs_flex_validation.py | 62 ++++++++++++++----- 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/gtfs_flex_validation.py b/src/gtfs_flex_validation.py index 0e1dc43..2ed4057 100644 --- a/src/gtfs_flex_validation.py +++ b/src/gtfs_flex_validation.py @@ -70,6 +70,9 @@ def download_single_file(self, file_upload_path=None) -> str: unique_folder = self.settings.get_unique_id() dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_folder) + # Ensure the unique folder path is created + os.makedirs(dl_folder_path, exist_ok=True) + file = self.storage_client.get_file_from_url(self.container_name, file_upload_path) try: if file.file_path: @@ -80,7 +83,7 @@ def download_single_file(self, file_upload_path=None) -> str: return f'{dl_folder_path}/{file_path}' else: logger.info(' File not found!') - raise Exception('File not found!') + raise Exception('File not found!') except Exception as e: traceback.print_exc() logger.error(e) diff --git a/tests/unit_tests/test_gtfs_flex_validation.py b/tests/unit_tests/test_gtfs_flex_validation.py index f9ea704..f398467 100644 --- a/tests/unit_tests/test_gtfs_flex_validation.py +++ b/tests/unit_tests/test_gtfs_flex_validation.py @@ -4,6 +4,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock from src.gtfs_flex_validation import GTFSFlexValidation +from src.config import Settings DOWNLOAD_FILE_PATH = f'{Path.cwd()}/downloads' SAVED_FILE_PATH = f'{Path.cwd()}/tests/unit_tests/test_files' @@ -16,7 +17,7 @@ SCHEMA_VERSION = 'v2.0' -class TestSuccessWithWithMacOSFile(unittest.TestCase): +class TestSuccessWithMacOSFile(unittest.TestCase): @patch.object(GTFSFlexValidation, 'download_single_file') def setUp(self, mock_download_single_file): os.makedirs(DOWNLOAD_FILE_PATH, exist_ok=True) @@ -33,6 +34,7 @@ def setUp(self, mock_download_single_file): self.validator.file_path = file_path self.validator.file_relative_path = MAC_SUCCESS_FILE_NAME self.validator.container_name = None + self.validator.settings = Settings() mock_download_single_file.return_value = file_path def tearDown(self): @@ -47,11 +49,11 @@ def test_validate_with_valid_file(self): # Act is_valid, _ = self.validator.validate() - (is_valid) # Assert self.assertTrue(is_valid) + class TestSuccessGTFSFlexValidation(unittest.TestCase): @patch.object(GTFSFlexValidation, 'download_single_file') @@ -64,13 +66,16 @@ def setUp(self, mock_download_single_file): shutil.copyfile(source, destination) file_path = f'{DOWNLOAD_FILE_PATH}/{SUCCESS_FILE_NAME}' + dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, 'dummy-uuid') # Mock the UUID generation + os.makedirs(dl_folder_path, exist_ok=True) # Ensure this directory is created in the test with patch.object(GTFSFlexValidation, '__init__', return_value=None): self.validator = GTFSFlexValidation(file_path=file_path, storage_client=MagicMock()) self.validator.file_path = file_path self.validator.file_relative_path = SUCCESS_FILE_NAME self.validator.container_name = None - mock_download_single_file.return_value = file_path + self.validator.settings = Settings() + mock_download_single_file.return_value = os.path.join(dl_folder_path, SUCCESS_FILE_NAME) def tearDown(self): pass @@ -123,6 +128,33 @@ def test_download_single_file(self): content = f.read() self.assertEqual(content, b'file_content') + def test_download_multiple_file_with_same_name(self): + # Arrange + file_upload_path = DOWNLOAD_FILE_PATH + self.validator.storage_client = MagicMock() + self.validator.storage_client.get_file_from_url = MagicMock() + file = MagicMock() + file.file_path = 'text_file.txt' + file.get_stream = MagicMock(return_value=b'file_content') + self.validator.storage_client.get_file_from_url.return_value = file + + # Act + first_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path) + second_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path) + + # Assert + self.assertNotEqual(first_downloaded_file_path, second_downloaded_file_path, + "The downloaded file paths should be different for files with the same name.") + + # Check if the get_file_from_url was called for both download attempts + self.assertEqual(self.validator.storage_client.get_file_from_url.call_count, 2, + "get_file_from_url should be called twice for two downloads.") + file.get_stream.assert_called() + + # Additional assertions to verify that the paths indeed point to different locations + self.assertTrue(first_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH)) + self.assertTrue(second_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH)) + def test_clean_up_file(self): # Arrange file_upload_path = DOWNLOAD_FILE_PATH @@ -149,7 +181,7 @@ def test_clean_up_folder(self): GTFSFlexValidation.clean_up = MagicMock() # Assert - # self.assertFalse(os.path.exists(directory_name)) + self.assertFalse(os.path.exists(directory_name)) class TestFailureGTFSFlexValidation(unittest.TestCase): @@ -170,6 +202,7 @@ def setUp(self, mock_download_single_file): self.validator.file_path = file_path self.validator.file_relative_path = FAILURE_FILE_NAME self.validator.container_name = None + self.validator.settings = MagicMock() mock_download_single_file.return_value = file_path def tearDown(self): @@ -223,19 +256,20 @@ def test_download_single_file_exception(self): self.validator.storage_client.get_file_from_url = MagicMock() file = MagicMock() file.file_path = 'text_file.txt' - file.get_stream = MagicMock(return_value=b'file_content') + file.get_stream = MagicMock(side_effect=FileNotFoundError("Mocked FileNotFoundError")) self.validator.storage_client.get_file_from_url.return_value = file - # Act - downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path) + # Create the mock folder that would be used + unique_id = "mocked-uuid" + self.validator.settings.get_unique_id = MagicMock() + self.validator.settings.get_unique_id.return_value = unique_id - # Assert - self.validator.storage_client.get_file_from_url.assert_called_once_with(self.validator.container_name, - file_upload_path) - file.get_stream.assert_called_once() - with open(downloaded_file_path, 'rb') as f: - content = f.read() - self.assertEqual(content, b'file_content') + dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_id) + os.makedirs(dl_folder_path, exist_ok=True) + + # Act & Assert + with self.assertRaises(FileNotFoundError): + self.validator.download_single_file(file_upload_path=file_upload_path) if __name__ == '__main__': From 5b5517b12127cc051c28a39c936bd1ae09b5eb49 Mon Sep 17 00:00:00 2001 From: Sujata Date: Fri, 16 Aug 2024 19:48:46 +0530 Subject: [PATCH 3/8] Updated unit test cases --- test_report.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test_report.py b/test_report.py index a037021..2038b62 100644 --- a/test_report.py +++ b/test_report.py @@ -4,7 +4,7 @@ # Define your test cases from tests.unit_tests.test_gtfs_flex_serializer import TestGTFSFlexUpload, TestGTFSFlexUploadData, TestRequest, \ TestMeta, TestResponse -from tests.unit_tests.test_gtfs_flex_validation import TestSuccessGTFSFlexValidation, TestFailureGTFSFlexValidation, TestSuccessWithWithMacOSFile +from tests.unit_tests.test_gtfs_flex_validation import TestSuccessGTFSFlexValidation, TestFailureGTFSFlexValidation, TestSuccessWithMacOSFile from tests.unit_tests.test_gtfx_flex_validator import TestGTFSFlexValidator from tests.unit_tests.test_file_upload_msg import TestFileUploadMsg from tests.unit_tests.test_main import TestApp @@ -19,7 +19,7 @@ test_suite.addTest(unittest.makeSuite(TestMeta)) test_suite.addTest(unittest.makeSuite(TestResponse)) test_suite.addTest(unittest.makeSuite(TestSuccessGTFSFlexValidation)) - test_suite.addTest(unittest.makeSuite(TestSuccessWithWithMacOSFile)) + test_suite.addTest(unittest.makeSuite(TestSuccessWithMacOSFile)) test_suite.addTest(unittest.makeSuite(TestFailureGTFSFlexValidation)) test_suite.addTest(unittest.makeSuite(TestGTFSFlexValidator)) test_suite.addTest(unittest.makeSuite(TestApp)) From bcc581fac57669728f9d078289c14d33456ddc7c Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Tue, 20 Aug 2024 18:34:33 +0530 Subject: [PATCH 4/8] Multi thread issue fix - Fixes the issue with multi threaded approach --- requirements.txt | 2 +- src/gtfx_flex_validator.py | 47 ++++++++++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/requirements.txt b/requirements.txt index ba74e61..e7403a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ fastapi~=0.111.1 pydantic==1.10.4 html_testRunner==1.2.1 uvicorn==0.20.0 -python-ms-core==0.0.19 +python-ms-core==0.0.21 tcat-gtfs-csv-validator~=0.0.38 \ No newline at end of file diff --git a/src/gtfx_flex_validator.py b/src/gtfx_flex_validator.py index 135b1ed..4b1734e 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -8,6 +8,7 @@ from .serializer.gtfx_flex_serializer import GTFSFlexUpload from .models.file_upload_msg import FileUploadMsg import threading +import time logging.basicConfig() logger = logging.getLogger('FLEX_VALIDATOR') @@ -18,13 +19,15 @@ class GTFSFlexValidator: _settings = Settings() def __init__(self): - core = Core() - settings = Settings() - self._subscription_name = settings.request_subscription - self.request_topic = core.get_topic(topic_name=settings.request_topic_name) - self.response_topic = core.get_topic(topic_name=settings.response_topic_name) - self.logger = core.get_logger() - self.storage_client = core.get_storage_client() + self.core = Core() + self.settings = Settings() + self._subscription_name = self.settings.request_subscription + self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=1) + self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) + self.logger = self.core.get_logger() + self.storage_client = self.core.get_storage_client() + # self.core = core + # self.settings = settings self.subscribe() def subscribe(self) -> None: @@ -34,9 +37,10 @@ def process(message) -> None: gtfs_upload_message = QueueMessage.to_dict(message) upload_msg = FileUploadMsg.from_dict(gtfs_upload_message) logger.info(upload_msg) - # upload_message = GTFSFlexUpload.data_from(gtfs_upload_message) - process_thread = threading.Thread(target=self.process_message,args=[upload_msg]) - process_thread.start() + # process_thread = threading.Thread(target=self.process_message,args=[upload_msg]) + # process_thread.start() + # process_thread.join() + self.process_message(upload_msg) else: logger.info(' No Message') @@ -76,5 +80,24 @@ def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_mes 'messageType': upload_message.messageType, 'data': response_message }) - self.response_topic.publish(data=data) - return + # self.response_topic.publish(data=data) + self.try_to_send_response(data=data) + + def try_to_send_response(self, data: QueueMessage, retry=3) -> None: + # try to send the message to the response topic + if retry == 0: + logger.error(f' Failed to send message to response topic for ID {data.messageId}') + return + while retry > 0: + try: + logger.error(f'Publishing message for message ID: {data.messageId}') + self.response_topic.publish(data=data) + return + except Exception as e: + logger.error(f' Error while publishing message: {e}') + logger.error(f' Retrying to send message to response topic message ID: {data.messageId}') + # sleep for one second + time.sleep(2) + self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) + retry -= 1 + self.try_to_send_response(data=data, retry=retry) From b5b26c4d5e390cf79f17a76ef27975af2865bee2 Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Wed, 21 Aug 2024 10:36:01 +0530 Subject: [PATCH 5/8] updated and cleaned code code cleaned and updated --- src/config.py | 1 + src/gtfx_flex_validator.py | 54 +++++++++++--------------------------- 2 files changed, 16 insertions(+), 39 deletions(-) diff --git a/src/config.py b/src/config.py index 0578a0a..5f942a4 100644 --- a/src/config.py +++ b/src/config.py @@ -12,6 +12,7 @@ class Settings(BaseSettings): response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None) request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None) storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex') + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) def get_unique_id(self) -> str: return str(uuid.uuid4()) diff --git a/src/gtfx_flex_validator.py b/src/gtfx_flex_validator.py index 4b1734e..96b6097 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -22,12 +22,10 @@ def __init__(self): self.core = Core() self.settings = Settings() self._subscription_name = self.settings.request_subscription - self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=1) - self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) + self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages) + # self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) self.logger = self.core.get_logger() self.storage_client = self.core.get_storage_client() - # self.core = core - # self.settings = settings self.subscribe() def subscribe(self) -> None: @@ -37,9 +35,6 @@ def process(message) -> None: gtfs_upload_message = QueueMessage.to_dict(message) upload_msg = FileUploadMsg.from_dict(gtfs_upload_message) logger.info(upload_msg) - # process_thread = threading.Thread(target=self.process_message,args=[upload_msg]) - # process_thread.start() - # process_thread.join() self.process_message(upload_msg) else: logger.info(' No Message') @@ -60,19 +55,13 @@ def process_message(self, upload_msg: FileUploadMsg) -> None: logger.info(' No file Path found in message!') def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None: - # upload_message.data.stage = 'flex-validation' - # upload_message.data.meta.isValid = valid - # upload_message.data.meta.validationMessage = validation_message or 'Validation successful' - # upload_message.data.response.success = valid - # upload_message.data.response.message = validation_message or 'Validation successful' - # message_id = uuid.uuid1().hex[0:24] response_message = { - "file_upload_path": upload_message.data.file_upload_path, - "user_id": upload_message.data.user_id , - "tdei_project_group_id": upload_message.data.tdei_project_group_id, - "success": valid, - "message": validation_message - } + "file_upload_path": upload_message.data.file_upload_path, + "user_id": upload_message.data.user_id , + "tdei_project_group_id": upload_message.data.tdei_project_group_id, + "success": valid, + "message": validation_message + } logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}') data = QueueMessage.data_from({ 'messageId': upload_message.messageId, @@ -80,24 +69,11 @@ def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_mes 'messageType': upload_message.messageType, 'data': response_message }) - # self.response_topic.publish(data=data) - self.try_to_send_response(data=data) + self.send_response(data=data) - def try_to_send_response(self, data: QueueMessage, retry=3) -> None: - # try to send the message to the response topic - if retry == 0: - logger.error(f' Failed to send message to response topic for ID {data.messageId}') - return - while retry > 0: - try: - logger.error(f'Publishing message for message ID: {data.messageId}') - self.response_topic.publish(data=data) - return - except Exception as e: - logger.error(f' Error while publishing message: {e}') - logger.error(f' Retrying to send message to response topic message ID: {data.messageId}') - # sleep for one second - time.sleep(2) - self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) - retry -= 1 - self.try_to_send_response(data=data, retry=retry) + def send_response(self, data: QueueMessage) -> None: + try: + response_topic = self.core.get_topic(self.settings.response_topic_name) + response_topic.publish(data=data) + except Exception as e: + logger.error(f'Error sending response: {e}') From 8b0b27b1693f5db83f0cae6daf7956caf11d4131 Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Wed, 21 Aug 2024 10:52:40 +0530 Subject: [PATCH 6/8] Update README.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index ae5210d..9974015 100644 --- a/README.md +++ b/README.md @@ -27,10 +27,13 @@ RESPONSE_TOPIC=xxxx REQUEST_SUBSCRIPTION=xxxx QUEUECONNECTION=xxxx STORAGECONNECTION=xxxx +MAX_CONCURRENT_MESSAGES=xx ``` The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `tdei-gtfs-csv-validator` package. `QUEUECONNECTION` is not being used in this application but this is the main requirement for `python-ms-core` package +`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 1 + ### How to Setup and Build Follow the steps to install the node packages required for both building and running the application. From 7f6f5870e021b673b05fe355d5dbba03924090ca Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Wed, 21 Aug 2024 10:54:23 +0530 Subject: [PATCH 7/8] Update gtfx_flex_validator.py try catch exception added. --- src/gtfx_flex_validator.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/gtfx_flex_validator.py b/src/gtfx_flex_validator.py index 96b6097..30342b0 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -42,17 +42,22 @@ def process(message) -> None: self.request_topic.subscribe(subscription=self._subscription_name, callback=process) def process_message(self, upload_msg: FileUploadMsg) -> None: - file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path) - logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}') - logger.info(file_upload_path) - if file_upload_path: - # Do the validation in the other class - validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client) - validation = validator.validate() - self.send_status(valid=validation[0], upload_message=upload_msg, - validation_message=validation[1]) - else: - logger.info(' No file Path found in message!') + + try: + file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path) + logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}') + logger.info(file_upload_path) + if file_upload_path: + # Do the validation in the other class + validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client) + validation = validator.validate() + self.send_status(valid=validation[0], upload_message=upload_msg, + validation_message=validation[1]) + else: + logger.info(' No file Path found in message!') + except Exception as e: + logger.error(f' Error processing message: {e}') + self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e)) def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None: response_message = { From be9668679d1d15a649849459d05e6b5c0763d488 Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Mon, 26 Aug 2024 11:30:26 +0530 Subject: [PATCH 8/8] core upgrade - core upgraded - listens in a different thread - added thread stop at shutdown. --- requirements.txt | 2 +- src/config.py | 2 +- src/gtfx_flex_validator.py | 6 +++++- src/main.py | 8 +++++++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index e7403a4..48d9270 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ fastapi~=0.111.1 pydantic==1.10.4 html_testRunner==1.2.1 uvicorn==0.20.0 -python-ms-core==0.0.21 +python-ms-core==0.0.22 tcat-gtfs-csv-validator~=0.0.38 \ No newline at end of file diff --git a/src/config.py b/src/config.py index 5f942a4..3fa0540 100644 --- a/src/config.py +++ b/src/config.py @@ -12,7 +12,7 @@ class Settings(BaseSettings): response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None) request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None) storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex') - max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2) def get_unique_id(self) -> str: return str(uuid.uuid4()) diff --git a/src/gtfx_flex_validator.py b/src/gtfx_flex_validator.py index 30342b0..2d77f48 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -26,7 +26,8 @@ def __init__(self): # self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) self.logger = self.core.get_logger() self.storage_client = self.core.get_storage_client() - self.subscribe() + self.listening_thread = threading.Thread(target=self.subscribe) + self.listening_thread.start() def subscribe(self) -> None: # Process the incoming message @@ -82,3 +83,6 @@ def send_response(self, data: QueueMessage) -> None: response_topic.publish(data=data) except Exception as e: logger.error(f'Error sending response: {e}') + + def stop_listening(self): + self.listening_thread.join(timeout=0) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 11a2a5c..506b807 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,7 @@ from .gtfx_flex_validator import GTFSFlexValidator app = FastAPI() +app.flex_validator = None prefix_router = APIRouter(prefix='/health') @@ -18,7 +19,7 @@ def get_settings(): @app.on_event('startup') async def startup_event(settings: Settings = Depends(get_settings)) -> None: try: - GTFSFlexValidator() + app.flex_validator = GTFSFlexValidator() except: print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m') print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m') @@ -34,6 +35,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None: child.kill() parent.kill() +@app.on_event('shutdown') +async def shutdown_event(): + if app.flex_validator: + app.flex_validator.shutdown() + @app.get('/', status_code=status.HTTP_200_OK) @prefix_router.get('/', status_code=status.HTTP_200_OK)