From be9668679d1d15a649849459d05e6b5c0763d488 Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Mon, 26 Aug 2024 11:30:26 +0530 Subject: [PATCH] core upgrade - core upgraded - listens in a different thread - added thread stop at shutdown. --- requirements.txt | 2 +- src/config.py | 2 +- src/gtfx_flex_validator.py | 6 +++++- src/main.py | 8 +++++++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index e7403a4..48d9270 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ fastapi~=0.111.1 pydantic==1.10.4 html_testRunner==1.2.1 uvicorn==0.20.0 -python-ms-core==0.0.21 +python-ms-core==0.0.22 tcat-gtfs-csv-validator~=0.0.38 \ No newline at end of file diff --git a/src/config.py b/src/config.py index 5f942a4..3fa0540 100644 --- a/src/config.py +++ b/src/config.py @@ -12,7 +12,7 @@ class Settings(BaseSettings): response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None) request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None) storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex') - max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2) def get_unique_id(self) -> str: return str(uuid.uuid4()) diff --git a/src/gtfx_flex_validator.py b/src/gtfx_flex_validator.py index 30342b0..2d77f48 100644 --- a/src/gtfx_flex_validator.py +++ b/src/gtfx_flex_validator.py @@ -26,7 +26,8 @@ def __init__(self): # self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name) self.logger = self.core.get_logger() self.storage_client = self.core.get_storage_client() - self.subscribe() + self.listening_thread = threading.Thread(target=self.subscribe) + self.listening_thread.start() def subscribe(self) -> None: # Process the incoming message @@ -82,3 +83,6 @@ def send_response(self, data: QueueMessage) -> None: response_topic.publish(data=data) except Exception as e: logger.error(f'Error sending response: {e}') + + def stop_listening(self): + self.listening_thread.join(timeout=0) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 11a2a5c..506b807 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,7 @@ from .gtfx_flex_validator import GTFSFlexValidator app = FastAPI() +app.flex_validator = None prefix_router = APIRouter(prefix='/health') @@ -18,7 +19,7 @@ def get_settings(): @app.on_event('startup') async def startup_event(settings: Settings = Depends(get_settings)) -> None: try: - GTFSFlexValidator() + app.flex_validator = GTFSFlexValidator() except: print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m') print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m') @@ -34,6 +35,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None: child.kill() parent.kill() +@app.on_event('shutdown') +async def shutdown_event(): + if app.flex_validator: + app.flex_validator.shutdown() + @app.get('/', status_code=status.HTTP_200_OK) @prefix_router.get('/', status_code=status.HTTP_200_OK)