Skip to content

Commit b47c0c1

Browse files
authored
Merge pull request #26 from TaskarCenterAtUW/feature-fix-version
Fixed 'NoneType' object has no attribute 'client_ready'
2 parents c468ab1 + 597e9d8 commit b47c0c1

File tree

5 files changed

+16
-21
lines changed

5 files changed

+16
-21
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ VALIDATION_REQ_SUB=xxxx
3030
VALIDATION_RES_TOPIC=xxxx
3131
CONTAINER_NAME=xxxx
3232
AUTH_PERMISSION_URL=xxx
33+
MAX_CONCURRENT_MESSAGES=xxx
3334

3435
```
3536

3637
The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `python-osw-validation` package.
3738
`QUEUECONNECTION` is used to send out the messages and listen to messages.
3839

40+
`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 2
3941

4042
### How to Set up and Build
4143
Follow the steps to install the python packages required for both building and running the application

requirements.txt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
psutil==5.9.5
21
fastapi==0.88.0
3-
python-dotenv==0.21.0
42
pydantic==1.10.4
5-
python-ms-core==0.0.18
3+
python-ms-core==0.0.21
64
uvicorn==0.20.0
7-
coverage==7.2.7
85
html_testRunner==1.2.1
9-
httpx==0.24.1
10-
python-osw-validation==0.2.3
6+
python-osw-validation==0.2.4

src/assets/osw-upload.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"messageId": "c8c76e89f30944d2b2abd2491bd95337",
33
"messageType": "workflow_identifier",
44
"data": {
5-
"file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/osw/test_upload/valid.zip",
5+
"file_upload_path": "https://tdeisamplestorage.blob.core.windows.net/tdei-storage-test/Archivew.zip",
66
"user_id": "c59d29b6-a063-4249-943f-d320d15ac9ab",
77
"tdei_project_group_id": "0b41ebc5-350c-42d3-90af-3af4ad3628fb"
88
}

src/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class Settings(BaseSettings):
1717
app_name: str = 'python-osw-validation'
1818
event_bus = EventBusSettings()
1919
auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None)
20+
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)
2021

2122
@property
2223
def auth_provider(self) -> str:

src/osw_validator.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,25 @@ class OSWValidator:
2020
_settings = Settings()
2121

2222
def __init__(self):
23-
core = Core()
23+
self.core = Core()
2424
options = {
2525
'provider': self._settings.auth_provider,
2626
'api_url': self._settings.auth_permission_url
2727
}
2828
listening_topic_name = self._settings.event_bus.upload_topic or ''
29-
publishing_topic_name = self._settings.event_bus.validation_topic or ''
3029
self.subscription_name = self._settings.event_bus.upload_subscription or ''
31-
self.listening_topic = core.get_topic(topic_name=listening_topic_name)
32-
self.publishing_topic = core.get_topic(topic_name=publishing_topic_name)
33-
self.logger = core.get_logger()
34-
self.storage_client = core.get_storage_client()
35-
self.auth = core.get_authorizer(config=options)
36-
self.container_name = self._settings.event_bus.container_name
30+
self.listening_topic = self.core.get_topic(topic_name=listening_topic_name, max_concurrent_messages=self._settings.max_concurrent_messages)
31+
self.logger = self.core.get_logger()
32+
self.storage_client = self.core.get_storage_client()
33+
self.auth = self.core.get_authorizer(config=options)
3734
self.start_listening()
3835

3936
def start_listening(self):
4037
def process(message) -> None:
4138
if message is not None:
4239
queue_message = QueueMessage.to_dict(message)
4340
upload_message = Upload.data_from(queue_message)
44-
process_thread = threading.Thread(target=self.validate, args=[upload_message])
45-
process_thread.start()
46-
# self.validate(upload_message)
41+
self.validate(received_message=upload_message)
4742

4843
self.listening_topic.subscribe(subscription=self.subscription_name, callback=process)
4944

@@ -89,10 +84,11 @@ def send_status(self, result: ValidationResult, upload_message: Upload):
8984
'data': upload_message.data.to_json()
9085
})
9186
try:
92-
self.publishing_topic.publish(data=data)
87+
self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data)
88+
logger.info(f'Publishing message for : {upload_message.message_id}')
9389
except Exception as e:
94-
print(e)
95-
logger.info(f'Publishing message for : {upload_message.message_id}')
90+
logger.error(f'Error occurred while publishing message for : {upload_message.message_id} with error: {e}')
91+
9692

9793
def has_permission(self, roles: List[str], queue_message: Upload) -> bool:
9894
try:

0 commit comments

Comments
 (0)