Skip to content

Commit

Permalink
feat(engine): Attempt to fix rabbitmq connectivity issues
Browse files Browse the repository at this point in the history
  • Loading branch information
daryllimyt committed Apr 23, 2024
1 parent c43e47c commit 5e7fd58
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions tracecat/messaging/consumer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from aio_pika import Channel, ExchangeType
from aio_pika.abc import AbstractIncomingMessage
from aio_pika.pool import Pool
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from tracecat.logger import standard_logger
from tracecat.messaging.common import RABBITMQ_RUNNER_EVENTS_EXCHANGE
Expand Down Expand Up @@ -62,20 +69,37 @@ async def event_consumer(
async def subscribe(
pool: Pool[Channel], *, routing_keys: list[str]
) -> AsyncGenerator[str, None]:
"""Subscribe to events for a user.
"""Subscribe to events for a user with retry mechanism.
The routing key is the user_id.
Users only receive events that are published to their user_id.
"""
try:

@retry(
retry=retry_if_exception_type(
Exception
), # Specify the type of exceptions to retry on
wait=wait_exponential(
multiplier=1, min=4, max=10
), # Exponential backoff strategy
stop=stop_after_attempt(5), # Retry up to 5 times
)
async def _subscribe():
logger.info("Preparing to subscribe to events...")
async with pool.acquire() as channel:
await asyncio.sleep(3)
logger.info("Subscribing to events...")
async for event in event_consumer(
channel=channel,
exchange=RABBITMQ_RUNNER_EVENTS_EXCHANGE,
routing_keys=routing_keys,
):
out = str(event.body + b"\n", "utf-8")
yield out

try:
async for message in _subscribe():
yield message
except Exception as e:
logger.error(f"Error in event subscription: {e}", exc_info=True)
finally:
Expand Down

0 comments on commit 5e7fd58

Please sign in to comment.