Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ fastapi~=0.111.1
pydantic==1.10.4
html_testRunner==1.2.1
uvicorn==0.20.0
python-ms-core==0.0.21
python-ms-core==0.0.22
tcat-gtfs-csv-validator~=0.0.38
2 changes: 1 addition & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Settings(BaseSettings):
response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None)
request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None)
storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex')
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1)
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)

def get_unique_id(self) -> str:
return str(uuid.uuid4())
6 changes: 5 additions & 1 deletion src/gtfx_flex_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def __init__(self):
# self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name)
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.subscribe()
self.listening_thread = threading.Thread(target=self.subscribe)
self.listening_thread.start()

def subscribe(self) -> None:
# Process the incoming message
Expand Down Expand Up @@ -82,3 +83,6 @@ def send_response(self, data: QueueMessage) -> None:
response_topic.publish(data=data)
except Exception as e:
logger.error(f'Error sending response: {e}')

def stop_listening(self):
self.listening_thread.join(timeout=0)
8 changes: 7 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .gtfx_flex_validator import GTFSFlexValidator

app = FastAPI()
app.flex_validator = None

prefix_router = APIRouter(prefix='/health')

Expand All @@ -18,7 +19,7 @@ def get_settings():
@app.on_event('startup')
async def startup_event(settings: Settings = Depends(get_settings)) -> None:
try:
GTFSFlexValidator()
app.flex_validator = GTFSFlexValidator()
except:
print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
Expand All @@ -34,6 +35,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
child.kill()
parent.kill()

@app.on_event('shutdown')
async def shutdown_event():
if app.flex_validator:
app.flex_validator.shutdown()


@app.get('/', status_code=status.HTTP_200_OK)
@prefix_router.get('/', status_code=status.HTTP_200_OK)
Expand Down