From 597e9d826718ee533de6a7324c7410faad75f396 Mon Sep 17 00:00:00 2001 From: Sujata Date: Wed, 21 Aug 2024 12:21:29 +0530 Subject: [PATCH 1/3] Fixed 'NoneType' object has no attribute 'client_ready' Updated package dependencies Updated code to handle 2 messages at a time --- README.md | 2 ++ requirements.txt | 8 ++------ src/assets/osw-upload.json | 2 +- src/config.py | 1 + src/osw_validator.py | 24 ++++++++++-------------- 5 files changed, 16 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index a62dfdb..60f6bfe 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,14 @@ VALIDATION_REQ_SUB=xxxx VALIDATION_RES_TOPIC=xxxx CONTAINER_NAME=xxxx AUTH_PERMISSION_URL=xxx +MAX_CONCURRENT_MESSAGES=xxx ``` The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `python-osw-validation` package. `QUEUECONNECTION` is used to send out the messages and listen to messages. +`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 2 ### How to Set up and Build Follow the steps to install the python packages required for both building and running the application diff --git a/requirements.txt b/requirements.txt index 73cac28..38e606f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,6 @@ -psutil==5.9.5 fastapi==0.88.0 -python-dotenv==0.21.0 pydantic==1.10.4 -python-ms-core==0.0.18 +python-ms-core==0.0.21 uvicorn==0.20.0 -coverage==7.2.7 html_testRunner==1.2.1 -httpx==0.24.1 -python-osw-validation==0.2.3 \ No newline at end of file +python-osw-validation==0.2.4 \ No newline at end of file diff --git a/src/assets/osw-upload.json b/src/assets/osw-upload.json index 15e722e..8ab7a85 100644 --- a/src/assets/osw-upload.json +++ b/src/assets/osw-upload.json @@ -2,7 +2,7 @@ "messageId": "c8c76e89f30944d2b2abd2491bd95337", "messageType": "workflow_identifier", "data": { - "file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/osw/test_upload/valid.zip", + "file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/tdei-storage-test/Archivew.zip", "user_id": "c59d29b6-a063-4249-943f-d320d15ac9ab", "tdei_project_group_id": "0b41ebc5-350c-42d3-90af-3af4ad3628fb" } diff --git a/src/config.py b/src/config.py index 3e1accb..7c92892 100644 --- a/src/config.py +++ b/src/config.py @@ -17,6 +17,7 @@ class Settings(BaseSettings): app_name: str = 'python-osw-validation' event_bus = EventBusSettings() auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None) + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2) @property def auth_provider(self) -> str: diff --git a/src/osw_validator.py b/src/osw_validator.py index 2b9a34b..e1dcf4b 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -20,20 +20,17 @@ class OSWValidator: _settings = Settings() def __init__(self): - core = Core() + self.core = Core() options = { 'provider': self._settings.auth_provider, 'api_url': self._settings.auth_permission_url } listening_topic_name = self._settings.event_bus.upload_topic or '' - publishing_topic_name = self._settings.event_bus.validation_topic or '' self.subscription_name = self._settings.event_bus.upload_subscription or '' - self.listening_topic = core.get_topic(topic_name=listening_topic_name) - self.publishing_topic = core.get_topic(topic_name=publishing_topic_name) - self.logger = core.get_logger() - self.storage_client = core.get_storage_client() - self.auth = core.get_authorizer(config=options) - self.container_name = self._settings.event_bus.container_name + self.listening_topic = self.core.get_topic(topic_name=listening_topic_name, max_concurrent_messages=self._settings.max_concurrent_messages) + self.logger = self.core.get_logger() + self.storage_client = self.core.get_storage_client() + self.auth = self.core.get_authorizer(config=options) self.start_listening() def start_listening(self): @@ -41,9 +38,7 @@ def process(message) -> None: if message is not None: queue_message = QueueMessage.to_dict(message) upload_message = Upload.data_from(queue_message) - process_thread = threading.Thread(target=self.validate, args=[upload_message]) - process_thread.start() - # self.validate(upload_message) + self.validate(received_message=upload_message) self.listening_topic.subscribe(subscription=self.subscription_name, callback=process) @@ -89,10 +84,11 @@ def send_status(self, result: ValidationResult, upload_message: Upload): 'data': upload_message.data.to_json() }) try: - self.publishing_topic.publish(data=data) + self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data) + logger.info(f'Publishing message for : {upload_message.message_id}') except Exception as e: - print(e) - logger.info(f'Publishing message for : {upload_message.message_id}') + logger.error(f'Error occurred while publishing message for : {upload_message.message_id} with error: {e}') + def has_permission(self, roles: List[str], queue_message: Upload) -> bool: try: From 4a704365e7487f44ef9a5cc06550e43b598aaadd Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Wed, 21 Aug 2024 17:51:35 +0530 Subject: [PATCH 2/3] Update Dockerfile minor bug fix for docker --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index bd8eee7..21808b7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,4 +4,4 @@ COPY ./requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt COPY ./src /code/src EXPOSE 8080 -CMD ["uvicorn", "src.main:app", "--reload", "--host", "0.0.0.0", "--port", "8080"] +CMD ["uvicorn", "src.main:app","--host", "0.0.0.0", "--port", "8080"] From 467c252ce07c5bd05f48aeedf978cce9b3090bd6 Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Mon, 26 Aug 2024 10:51:00 +0530 Subject: [PATCH 3/3] core upgrade - Core upgraded - max 2 concurrent messages at a time --- requirements.txt | 2 +- src/main.py | 10 +++++++++- src/osw_validator.py | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index 38e606f..c8de9a2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ fastapi==0.88.0 pydantic==1.10.4 -python-ms-core==0.0.21 +python-ms-core==0.0.22 uvicorn==0.20.0 html_testRunner==1.2.1 python-osw-validation==0.2.4 \ No newline at end of file diff --git a/src/main.py b/src/main.py index 41b15d9..b8cfc2e 100644 --- a/src/main.py +++ b/src/main.py @@ -9,6 +9,8 @@ prefix_router = APIRouter(prefix='/health') +# Have a reference to validator in the app object +app.validator = None @lru_cache() def get_settings(): @@ -18,7 +20,8 @@ def get_settings(): @app.on_event('startup') async def startup_event(settings: Settings = Depends(get_settings)) -> None: try: - OSWValidator() + # OSWValidator() + app.validator = OSWValidator() except: print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m') print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m') @@ -34,6 +37,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None: child.kill() parent.kill() +@app.on_event('shutdown') +async def shutdown_event() -> None: + print('Shutting down the application') + if app.validator: + app.validator.stop_listening() @app.get('/', status_code=status.HTTP_200_OK) @prefix_router.get('/', status_code=status.HTTP_200_OK) diff --git a/src/osw_validator.py b/src/osw_validator.py index e1dcf4b..d5467b0 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -31,7 +31,8 @@ def __init__(self): self.logger = self.core.get_logger() self.storage_client = self.core.get_storage_client() self.auth = self.core.get_authorizer(config=options) - self.start_listening() + self.listener_thread = threading.Thread(target=self.start_listening) + self.listener_thread.start() def start_listening(self): def process(message) -> None: @@ -103,3 +104,6 @@ def has_permission(self, roles: List[str], queue_message: Upload) -> bool: except Exception as error: print('Error validating the request authorization:', error) return False + + def stop_listening(self): + self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work. \ No newline at end of file