Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ RESPONSE_TOPIC=xxxx
REQUEST_SUBSCRIPTION=xxxx
QUEUECONNECTION=xxxx
STORAGECONNECTION=xxxx
MAX_CONCURRENT_MESSAGES=xx
```
The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `tdei-gtfs-csv-validator` package.
`QUEUECONNECTION` is not being used in this application but this is the main requirement for `python-ms-core` package

`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 1

### How to Setup and Build
Follow the steps to install the node packages required for both building and running the application.

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ fastapi~=0.111.1
pydantic==1.10.4
html_testRunner==1.2.1
uvicorn==0.20.0
python-ms-core==0.0.19
python-ms-core==0.0.21
tcat-gtfs-csv-validator~=0.0.38
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Settings(BaseSettings):
response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None)
request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None)
storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex')
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1)

def get_unique_id(self) -> str:
return str(uuid.uuid4())
74 changes: 39 additions & 35 deletions src/gtfx_flex_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .serializer.gtfx_flex_serializer import GTFSFlexUpload
from .models.file_upload_msg import FileUploadMsg
import threading
import time

logging.basicConfig()
logger = logging.getLogger('FLEX_VALIDATOR')
Expand All @@ -18,13 +19,13 @@ class GTFSFlexValidator:
_settings = Settings()

def __init__(self):
core = Core()
settings = Settings()
self._subscription_name = settings.request_subscription
self.request_topic = core.get_topic(topic_name=settings.request_topic_name)
self.response_topic = core.get_topic(topic_name=settings.response_topic_name)
self.logger = core.get_logger()
self.storage_client = core.get_storage_client()
self.core = Core()
self.settings = Settings()
self._subscription_name = self.settings.request_subscription
self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages)
# self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name)
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.subscribe()

def subscribe(self) -> None:
Expand All @@ -34,47 +35,50 @@ def process(message) -> None:
gtfs_upload_message = QueueMessage.to_dict(message)
upload_msg = FileUploadMsg.from_dict(gtfs_upload_message)
logger.info(upload_msg)
# upload_message = GTFSFlexUpload.data_from(gtfs_upload_message)
process_thread = threading.Thread(target=self.process_message,args=[upload_msg])
process_thread.start()
self.process_message(upload_msg)
else:
logger.info(' No Message')

self.request_topic.subscribe(subscription=self._subscription_name, callback=process)

def process_message(self, upload_msg: FileUploadMsg) -> None:
file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path)
logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}')
logger.info(file_upload_path)
if file_upload_path:
# Do the validation in the other class
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client)
validation = validator.validate()
self.send_status(valid=validation[0], upload_message=upload_msg,
validation_message=validation[1])
else:
logger.info(' No file Path found in message!')

try:
file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path)
logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}')
logger.info(file_upload_path)
if file_upload_path:
# Do the validation in the other class
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client)
validation = validator.validate()
self.send_status(valid=validation[0], upload_message=upload_msg,
validation_message=validation[1])
else:
logger.info(' No file Path found in message!')
except Exception as e:
logger.error(f' Error processing message: {e}')
self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e))

def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None:
# upload_message.data.stage = 'flex-validation'
# upload_message.data.meta.isValid = valid
# upload_message.data.meta.validationMessage = validation_message or 'Validation successful'
# upload_message.data.response.success = valid
# upload_message.data.response.message = validation_message or 'Validation successful'
# message_id = uuid.uuid1().hex[0:24]
response_message = {
"file_upload_path": upload_message.data.file_upload_path,
"user_id": upload_message.data.user_id ,
"tdei_project_group_id": upload_message.data.tdei_project_group_id,
"success": valid,
"message": validation_message
}
"file_upload_path": upload_message.data.file_upload_path,
"user_id": upload_message.data.user_id ,
"tdei_project_group_id": upload_message.data.tdei_project_group_id,
"success": valid,
"message": validation_message
}
logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}')
data = QueueMessage.data_from({
'messageId': upload_message.messageId,
'message': 'Validation complete',
'messageType': upload_message.messageType,
'data': response_message
})
self.response_topic.publish(data=data)
return
self.send_response(data=data)

def send_response(self, data: QueueMessage) -> None:
try:
response_topic = self.core.get_topic(self.settings.response_topic_name)
response_topic.publish(data=data)
except Exception as e:
logger.error(f'Error sending response: {e}')