From 5e7fd58357604626d2bafbed4133f020e0896d9f Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Wed, 24 Apr 2024 00:06:33 +0100 Subject: [PATCH] feat(engine): Attempt to fix rabbitmq connectivity issues --- tracecat/messaging/consumer.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/tracecat/messaging/consumer.py b/tracecat/messaging/consumer.py index fb0db8ba..c013327a 100644 --- a/tracecat/messaging/consumer.py +++ b/tracecat/messaging/consumer.py @@ -1,11 +1,18 @@ from __future__ import annotations +import asyncio from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from aio_pika import Channel, ExchangeType from aio_pika.abc import AbstractIncomingMessage from aio_pika.pool import Pool +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) from tracecat.logger import standard_logger from tracecat.messaging.common import RABBITMQ_RUNNER_EVENTS_EXCHANGE @@ -62,13 +69,26 @@ async def event_consumer( async def subscribe( pool: Pool[Channel], *, routing_keys: list[str] ) -> AsyncGenerator[str, None]: - """Subscribe to events for a user. + """Subscribe to events for a user with retry mechanism. The routing key is the user_id. Users only receive events that are published to their user_id. """ - try: + + @retry( + retry=retry_if_exception_type( + Exception + ), # Specify the type of exceptions to retry on + wait=wait_exponential( + multiplier=1, min=4, max=10 + ), # Exponential backoff strategy + stop=stop_after_attempt(5), # Retry up to 5 times + ) + async def _subscribe(): + logger.info("Preparing to subscribe to events...") async with pool.acquire() as channel: + await asyncio.sleep(3) + logger.info("Subscribing to events...") async for event in event_consumer( channel=channel, exchange=RABBITMQ_RUNNER_EVENTS_EXCHANGE, @@ -76,6 +96,10 @@ async def subscribe( ): out = str(event.body + b"\n", "utf-8") yield out + + try: + async for message in _subscribe(): + yield message except Exception as e: logger.error(f"Error in event subscription: {e}", exc_info=True) finally: