Skip to content

Commit be96686

Browse files
committed
core upgrade
- core upgraded - listens in a different thread - added thread stop at shutdown.
1 parent 44ec3d3 commit be96686

File tree

4 files changed

+14
-4
lines changed

4 files changed

+14
-4
lines changed

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ fastapi~=0.111.1
22
pydantic==1.10.4
33
html_testRunner==1.2.1
44
uvicorn==0.20.0
5-
python-ms-core==0.0.21
5+
python-ms-core==0.0.22
66
tcat-gtfs-csv-validator~=0.0.38

src/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class Settings(BaseSettings):
1212
response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None)
1313
request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None)
1414
storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex')
15-
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1)
15+
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)
1616

1717
def get_unique_id(self) -> str:
1818
return str(uuid.uuid4())

src/gtfx_flex_validator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ def __init__(self):
2626
# self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name)
2727
self.logger = self.core.get_logger()
2828
self.storage_client = self.core.get_storage_client()
29-
self.subscribe()
29+
self.listening_thread = threading.Thread(target=self.subscribe)
30+
self.listening_thread.start()
3031

3132
def subscribe(self) -> None:
3233
# Process the incoming message
@@ -82,3 +83,6 @@ def send_response(self, data: QueueMessage) -> None:
8283
response_topic.publish(data=data)
8384
except Exception as e:
8485
logger.error(f'Error sending response: {e}')
86+
87+
def stop_listening(self):
88+
self.listening_thread.join(timeout=0)

src/main.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .gtfx_flex_validator import GTFSFlexValidator
77

88
app = FastAPI()
9+
app.flex_validator = None
910

1011
prefix_router = APIRouter(prefix='/health')
1112

@@ -18,7 +19,7 @@ def get_settings():
1819
@app.on_event('startup')
1920
async def startup_event(settings: Settings = Depends(get_settings)) -> None:
2021
try:
21-
GTFSFlexValidator()
22+
app.flex_validator = GTFSFlexValidator()
2223
except:
2324
print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
2425
print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
@@ -34,6 +35,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
3435
child.kill()
3536
parent.kill()
3637

38+
@app.on_event('shutdown')
39+
async def shutdown_event():
40+
if app.flex_validator:
41+
app.flex_validator.shutdown()
42+
3743

3844
@app.get('/', status_code=status.HTTP_200_OK)
3945
@prefix_router.get('/', status_code=status.HTTP_200_OK)

0 commit comments

Comments
 (0)