From 597e9d826718ee533de6a7324c7410faad75f396 Mon Sep 17 00:00:00 2001 From: Sujata Date: Wed, 21 Aug 2024 12:21:29 +0530 Subject: [PATCH] Fixed 'NoneType' object has no attribute 'client_ready' Updated package dependencies Updated code to handle 2 messages at a time --- README.md | 2 ++ requirements.txt | 8 ++------ src/assets/osw-upload.json | 2 +- src/config.py | 1 + src/osw_validator.py | 24 ++++++++++-------------- 5 files changed, 16 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index a62dfdb..60f6bfe 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,14 @@ VALIDATION_REQ_SUB=xxxx VALIDATION_RES_TOPIC=xxxx CONTAINER_NAME=xxxx AUTH_PERMISSION_URL=xxx +MAX_CONCURRENT_MESSAGES=xxx ``` The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `python-osw-validation` package. `QUEUECONNECTION` is used to send out the messages and listen to messages. +`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 2 ### How to Set up and Build Follow the steps to install the python packages required for both building and running the application diff --git a/requirements.txt b/requirements.txt index 73cac28..38e606f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,6 @@ -psutil==5.9.5 fastapi==0.88.0 -python-dotenv==0.21.0 pydantic==1.10.4 -python-ms-core==0.0.18 +python-ms-core==0.0.21 uvicorn==0.20.0 -coverage==7.2.7 html_testRunner==1.2.1 -httpx==0.24.1 -python-osw-validation==0.2.3 \ No newline at end of file +python-osw-validation==0.2.4 \ No newline at end of file diff --git a/src/assets/osw-upload.json b/src/assets/osw-upload.json index 15e722e..8ab7a85 100644 --- a/src/assets/osw-upload.json +++ b/src/assets/osw-upload.json @@ -2,7 +2,7 @@ "messageId": "c8c76e89f30944d2b2abd2491bd95337", "messageType": "workflow_identifier", "data": { - "file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/osw/test_upload/valid.zip", + "file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/tdei-storage-test/Archivew.zip", "user_id": "c59d29b6-a063-4249-943f-d320d15ac9ab", "tdei_project_group_id": "0b41ebc5-350c-42d3-90af-3af4ad3628fb" } diff --git a/src/config.py b/src/config.py index 3e1accb..7c92892 100644 --- a/src/config.py +++ b/src/config.py @@ -17,6 +17,7 @@ class Settings(BaseSettings): app_name: str = 'python-osw-validation' event_bus = EventBusSettings() auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None) + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2) @property def auth_provider(self) -> str: diff --git a/src/osw_validator.py b/src/osw_validator.py index 2b9a34b..e1dcf4b 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -20,20 +20,17 @@ class OSWValidator: _settings = Settings() def __init__(self): - core = Core() + self.core = Core() options = { 'provider': self._settings.auth_provider, 'api_url': self._settings.auth_permission_url } listening_topic_name = self._settings.event_bus.upload_topic or '' - publishing_topic_name = self._settings.event_bus.validation_topic or '' self.subscription_name = self._settings.event_bus.upload_subscription or '' - self.listening_topic = core.get_topic(topic_name=listening_topic_name) - self.publishing_topic = core.get_topic(topic_name=publishing_topic_name) - self.logger = core.get_logger() - self.storage_client = core.get_storage_client() - self.auth = core.get_authorizer(config=options) - self.container_name = self._settings.event_bus.container_name + self.listening_topic = self.core.get_topic(topic_name=listening_topic_name, max_concurrent_messages=self._settings.max_concurrent_messages) + self.logger = self.core.get_logger() + self.storage_client = self.core.get_storage_client() + self.auth = self.core.get_authorizer(config=options) self.start_listening() def start_listening(self): @@ -41,9 +38,7 @@ def process(message) -> None: if message is not None: queue_message = QueueMessage.to_dict(message) upload_message = Upload.data_from(queue_message) - process_thread = threading.Thread(target=self.validate, args=[upload_message]) - process_thread.start() - # self.validate(upload_message) + self.validate(received_message=upload_message) self.listening_topic.subscribe(subscription=self.subscription_name, callback=process) @@ -89,10 +84,11 @@ def send_status(self, result: ValidationResult, upload_message: Upload): 'data': upload_message.data.to_json() }) try: - self.publishing_topic.publish(data=data) + self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data) + logger.info(f'Publishing message for : {upload_message.message_id}') except Exception as e: - print(e) - logger.info(f'Publishing message for : {upload_message.message_id}') + logger.error(f'Error occurred while publishing message for : {upload_message.message_id} with error: {e}') + def has_permission(self, roles: List[str], queue_message: Upload) -> bool: try: