diff --git a/README.md b/README.md index ae5210d..9974015 100644 --- a/README.md +++ b/README.md @@ -27,10 +27,13 @@ RESPONSE_TOPIC=xxxx REQUEST_SUBSCRIPTION=xxxx QUEUECONNECTION=xxxx STORAGECONNECTION=xxxx +MAX_CONCURRENT_MESSAGES=xx ``` The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `tdei-gtfs-csv-validator` package. `QUEUECONNECTION` is not being used in this application but this is the main requirement for `python-ms-core` package +`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 1 + ### How to Setup and Build Follow the steps to install the node packages required for both building and running the application. diff --git a/requirements.txt b/requirements.txt index ba74e61..e7403a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ fastapi~=0.111.1 pydantic==1.10.4 html_testRunner==1.2.1 uvicorn==0.20.0 -python-ms-core==0.0.19 +python-ms-core==0.0.21 tcat-gtfs-csv-validator~=0.0.38 \ No newline at end of file diff --git a/src/config.py b/src/config.py index 0578a0a..5f942a4 100644 --- a/src/config.py +++ b/src/config.py @@ -12,6 +12,7 @@ class Settings(BaseSettings): response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None) request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None) storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex') + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) def get_unique_id(self) -> str: return str(uuid.uuid4()) diff --git a/src/gtfx_flex_validator.py b/src/gtfx_flex_validator.py index 135b1ed..30342b0 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -8,6 +8,7 @@ from .serializer.gtfx_flex_serializer import GTFSFlexUpload from .models.file_upload_msg import FileUploadMsg import threading +import time logging.basicConfig() logger = logging.getLogger('FLEX_VALIDATOR') @@ -18,13 +19,13 @@ class GTFSFlexValidator: _settings = Settings() def __init__(self): - core = Core() - settings = Settings() - self._subscription_name = settings.request_subscription - self.request_topic = core.get_topic(topic_name=settings.request_topic_name) - self.response_topic = core.get_topic(topic_name=settings.response_topic_name) - self.logger = core.get_logger() - self.storage_client = core.get_storage_client() + self.core = Core() + self.settings = Settings() + self._subscription_name = self.settings.request_subscription + self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages) + # self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) + self.logger = self.core.get_logger() + self.storage_client = self.core.get_storage_client() self.subscribe() def subscribe(self) -> None: @@ -34,41 +35,38 @@ def process(message) -> None: gtfs_upload_message = QueueMessage.to_dict(message) upload_msg = FileUploadMsg.from_dict(gtfs_upload_message) logger.info(upload_msg) - # upload_message = GTFSFlexUpload.data_from(gtfs_upload_message) - process_thread = threading.Thread(target=self.process_message,args=[upload_msg]) - process_thread.start() + self.process_message(upload_msg) else: logger.info(' No Message') self.request_topic.subscribe(subscription=self._subscription_name, callback=process) def process_message(self, upload_msg: FileUploadMsg) -> None: - file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path) - logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}') - logger.info(file_upload_path) - if file_upload_path: - # Do the validation in the other class - validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client) - validation = validator.validate() - self.send_status(valid=validation[0], upload_message=upload_msg, - validation_message=validation[1]) - else: - logger.info(' No file Path found in message!') + + try: + file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path) + logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}') + logger.info(file_upload_path) + if file_upload_path: + # Do the validation in the other class + validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client) + validation = validator.validate() + self.send_status(valid=validation[0], upload_message=upload_msg, + validation_message=validation[1]) + else: + logger.info(' No file Path found in message!') + except Exception as e: + logger.error(f' Error processing message: {e}') + self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e)) def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None: - # upload_message.data.stage = 'flex-validation' - # upload_message.data.meta.isValid = valid - # upload_message.data.meta.validationMessage = validation_message or 'Validation successful' - # upload_message.data.response.success = valid - # upload_message.data.response.message = validation_message or 'Validation successful' - # message_id = uuid.uuid1().hex[0:24] response_message = { - "file_upload_path": upload_message.data.file_upload_path, - "user_id": upload_message.data.user_id , - "tdei_project_group_id": upload_message.data.tdei_project_group_id, - "success": valid, - "message": validation_message - } + "file_upload_path": upload_message.data.file_upload_path, + "user_id": upload_message.data.user_id , + "tdei_project_group_id": upload_message.data.tdei_project_group_id, + "success": valid, + "message": validation_message + } logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}') data = QueueMessage.data_from({ 'messageId': upload_message.messageId, @@ -76,5 +74,11 @@ def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_mes 'messageType': upload_message.messageType, 'data': response_message }) - self.response_topic.publish(data=data) - return + self.send_response(data=data) + + def send_response(self, data: QueueMessage) -> None: + try: + response_topic = self.core.get_topic(self.settings.response_topic_name) + response_topic.publish(data=data) + except Exception as e: + logger.error(f'Error sending response: {e}')