From 3a0f2818b9a06eb29c147bb24a0e974edb6c94d0 Mon Sep 17 00:00:00 2001 From: aliel Date: Wed, 17 Sep 2025 20:26:56 +0200 Subject: [PATCH] Fix RabbitMQ connection timeouts during long operations. --- src/aleph/chains/chain_data_service.py | 1 + src/aleph/config.py | 2 ++ src/aleph/jobs/job_utils.py | 1 + src/aleph/jobs/process_pending_messages.py | 8 +++++++- src/aleph/toolkit/rabbitmq.py | 1 + 5 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/aleph/chains/chain_data_service.py b/src/aleph/chains/chain_data_service.py index 0ce3b3e33..dae5c3f7f 100644 --- a/src/aleph/chains/chain_data_service.py +++ b/src/aleph/chains/chain_data_service.py @@ -231,6 +231,7 @@ async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExcha port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, + heartbeat=config.rabbitmq.heartbeat.value, ) channel = await mq_conn.channel() pending_tx_exchange = await channel.declare_exchange( diff --git a/src/aleph/config.py b/src/aleph/config.py index 4e19bdc71..4acc5f385 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -204,6 +204,8 @@ def get_defaults(): "pending_message_exchange": "aleph-pending-messages", # Name of the RabbitMQ exchange used for sync/message events (input of the TX processor). "pending_tx_exchange": "aleph-pending-txs", + # Heartbeat interval in seconds to prevent connection timeouts during long operations. + "heartbeat": 600, }, "redis": { # Hostname of the Redis service. diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py index 92453cbe7..c4a6b1f6b 100644 --- a/src/aleph/jobs/job_utils.py +++ b/src/aleph/jobs/job_utils.py @@ -41,6 +41,7 @@ async def _make_pending_queue( port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, + heartbeat=config.rabbitmq.heartbeat.value, ) channel = await mq_conn.channel() diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 08bb99397..c78a73c02 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -63,9 +63,14 @@ async def new( mq_password: str, message_exchange_name: str, pending_message_exchange_name: str, + mq_heartbeat: int, ): mq_conn = await aio_pika.connect_robust( - host=mq_host, port=mq_port, login=mq_username, password=mq_password + host=mq_host, + port=mq_port, + login=mq_username, + password=mq_password, + heartbeat=mq_heartbeat, ) channel = await mq_conn.channel() mq_message_exchange = await channel.declare_exchange( @@ -179,6 +184,7 @@ async def fetch_and_process_messages_task(config: Config): mq_password=config.rabbitmq.password.value, message_exchange_name=config.rabbitmq.message_exchange.value, pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value, + mq_heartbeat=config.rabbitmq.heartbeat.value, ) async with pending_message_processor: diff --git a/src/aleph/toolkit/rabbitmq.py b/src/aleph/toolkit/rabbitmq.py index 0de3bd58c..20f81abb9 100644 --- a/src/aleph/toolkit/rabbitmq.py +++ b/src/aleph/toolkit/rabbitmq.py @@ -7,5 +7,6 @@ async def make_mq_conn(config) -> aio_pika.abc.AbstractConnection: port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, password=config.rabbitmq.password.value, + heartbeat=config.rabbitmq.heartbeat.value, ) return mq_conn