Skip to content

Commit

Permalink
Merge pull request #125 from cloudblue/fix-2/LITE-26790
Browse files Browse the repository at this point in the history
LITE-26790 Better Logging in RabbitMQ producer
  • Loading branch information
maxipavlovic committed Mar 20, 2023
2 parents bd69778 + bf5c760 commit 3ff5271
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 11 deletions.
24 changes: 16 additions & 8 deletions dj_cqrs/transport/rabbit_mq.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,26 @@ def _produce_with_retries(cls, payload, retries):
cls._produce_message(channel, exchange, payload)
cls.log_produced(payload)
except (
exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError,
exceptions.AMQPError,
exceptions.ChannelError,
exceptions.ReentrancyError,
AMQPConnectorException,
):
logger.exception("CQRS couldn't be published: pk = {0} ({1}).{2}".format(
payload.pk, payload.cqrs_id, " Reconnect..." if retries else "",
))

) as e:
# in case of any error - close connection and try to reconnect
cls.clean_connection()

if retries:
cls._produce_with_retries(payload, retries - 1)
base_log_message = "CQRS couldn't be published: pk = {0} ({1}).".format(
payload.pk, payload.cqrs_id,
)
if not retries:
logger.exception(base_log_message)
return

logger.warning('{0} Error: {1}. Reconnect...'.format(
base_log_message, e.__class__.__name__,
))

cls._produce_with_retries(payload, retries - 1)

@classmethod
def _consume_message(cls, ch, method, properties, body, delay_queue):
Expand Down
50 changes: 47 additions & 3 deletions tests/test_transport/test_rabbit_mq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
import ujson
from django.db import DatabaseError
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from pika.exceptions import AMQPError, ChannelError, ReentrancyError
from pika.exceptions import (
AMQPError,
ChannelError,
ReentrancyError,
StreamLostError,
)

from dj_cqrs.constants import (
DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
Expand Down Expand Up @@ -223,8 +228,47 @@ def test_produce_retry_on_error(rabbit_transport, mocker, caplog):
SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1,
),
)
assert "CQRS couldn't be published: pk = 1 (CQRS_ID). Reconnect..." in caplog.text
assert 'CQRS is published: pk = 1 (CQRS_ID)' in caplog.text

assert caplog.record_tuples == [
(
'django-cqrs',
logging.WARNING,
"CQRS couldn't be published: pk = 1 (CQRS_ID)."
" Error: AMQPConnectorException. Reconnect...",
),
(
'django-cqrs',
logging.INFO,
'CQRS is published: pk = 1 (CQRS_ID), correlation_id = None.',
),
]


def test_produce_retry_on_error_1(rabbit_transport, mocker, caplog):
mocker.patch.object(RabbitMQTransport, '_get_producer_rmq_objects', side_effect=[
StreamLostError,
StreamLostError,
])
mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)

rabbit_transport.produce(
TransportPayload(
SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1,
),
)

assert caplog.record_tuples == [
(
'django-cqrs',
logging.WARNING,
"CQRS couldn't be published: pk = 1 (CQRS_ID). Error: StreamLostError. Reconnect...",
),
(
'django-cqrs',
logging.ERROR,
"CQRS couldn't be published: pk = 1 (CQRS_ID).",
),
]


def test_produce_message_ok(mocker):
Expand Down

0 comments on commit 3ff5271

Please sign in to comment.