Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a problem where the RabbitMQPublisher could deadlock #43

Merged
merged 4 commits into from
Jan 20, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 48 additions & 24 deletions eiffellib/publishers/rabbitmq_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import logging
import warnings
from threading import RLock
from copy import deepcopy

import pika
Expand All @@ -35,6 +36,8 @@ class RabbitMQPublisher(EiffelPublisher, BaseRabbitMQ):
_nacks = 0
_delivered = 0
_last_delivered_tag = 0
# RLock is used so that a thread can acquire a lock multiple times without blocking.
_lock = RLock()

# pylint:disable=too-many-arguments
def __init__(self, host, exchange, routing_key="eiffel",
Expand Down Expand Up @@ -101,13 +104,28 @@ def _publisher_canceled(self, method_frame):

def _resend_nacked_deliveries(self):
"""Resend all NACKed deliveries. This method loops forever."""
deliveries = self._nacked_deliveries.copy()
if deliveries:
_LOG.info("Resending %i NACKed deliveries", len(deliveries))
self._nacked_deliveries.clear()
for event in deliveries:
self.send_event(event)
self._connection.ioloop.call_later(1, self._resend_nacked_deliveries)
if not self.is_alive() or (self._channel is None or not self._channel.is_open):
_LOG.warning("Publisher is not ready. Retry resending NACKed deliveries in 1s")
self._connection.ioloop.call_later(1, self._resend_nacked_deliveries)
return

# If we cannot acquire the lock here, retry later otherwise call the send_event method.
acquired = self._lock.acquire(blocking=False)
if not acquired:
self._connection.ioloop.call_later(1, self._resend_nacked_deliveries)
try:
deliveries = self._nacked_deliveries.copy()
if deliveries:
_LOG.info("Resending %i NACKed deliveries", len(deliveries))
for event in deliveries:
# It is safe to remove the event here since if it fails delivery
# in send_event it will be re-added to _nacked_deliveries.
self._nacked_deliveries.remove(event)
# Never block in an ioloop method
self.send_event(event, block=False)
finally:
self._lock.release()
self._connection.ioloop.call_later(1, self._resend_nacked_deliveries)

def _confirm_delivery(self, method_frame):
"""Confirm the delivery of events and make sure we resend NACKed events.
Expand All @@ -127,25 +145,31 @@ def _confirm_delivery(self, method_frame):
else:
number_of_acks = delivery_tag - self._last_delivered_tag

if confirmation_type == 'ack':
self._acks += number_of_acks
elif confirmation_type == 'nack':
self._nacks += number_of_acks
# Since _resend_nacked_deliveries runs in a thread we must protect this
# part that modifies class attributes.
with self._lock:
if confirmation_type == 'ack':
self._acks += number_of_acks
elif confirmation_type == 'nack':
self._nacks += number_of_acks

if delivery_tag == 0:
if confirmation_type == "nack":
self._nacked_deliveries.extend(self._deliveries.values())
self._deliveries.clear()
else:
for tag in range(self._last_delivered_tag + 1, delivery_tag + 1):
if delivery_tag == 0:
if confirmation_type == "nack":
self._nacked_deliveries.append(self._deliveries[tag])
self._deliveries.pop(tag)
self._last_delivered_tag = delivery_tag

_LOG.debug('Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked', self._acks+self._nacks,
len(self._deliveries), self._acks, self._nacks)
self._nacked_deliveries.extend(self._deliveries.values())
self._deliveries.clear()
else:
for tag in range(self._last_delivered_tag + 1, delivery_tag + 1):
if confirmation_type == "nack":
self._nacked_deliveries.append(self._deliveries[tag])
try:
self._deliveries.pop(tag)
except KeyError:
_LOG.warning("KeyError when attempting to pop tag %i")
self._last_delivered_tag = delivery_tag

_LOG.debug('Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked', self._acks+self._nacks,
len(self._deliveries), self._acks, self._nacks)

def send_event(self, event, block=True):
"""Validate and send an eiffel event to the rabbitmq server.
Expand Down