Skip to content

Commit

Permalink
[Pulsar] Resubmit AMQP acknowledgement messages to the correct queue
Browse files Browse the repository at this point in the history
when consuming more than one. Also, only start one acknowledgement
manager thread rather than one per consumer.
  • Loading branch information
natefoo committed Sep 28, 2015
1 parent 394c372 commit 6fef446
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions lib/pulsar/client/amqp_exchange.py
Expand Up @@ -29,6 +29,7 @@
ACK_QUEUE_SUFFIX = "_ack"
ACK_UUID_KEY = 'acknowledge_uuid'
ACK_QUEUE_KEY = 'acknowledge_queue'
ACK_SUBMIT_QUEUE_KEY = 'acknowledge_submit_queue'
ACK_UUID_RESPONSE_KEY = 'acknowledge_uuid_response'
ACK_FORCE_NOACK_KEY = 'force_noack'
DEFAULT_ACK_MANAGER_SLEEP = 15
Expand Down Expand Up @@ -77,6 +78,11 @@ def __init__(
self.publish_uuid_store = publish_uuid_store
self.consume_uuid_store = consume_uuid_store
self.publish_ack_lock = threading.Lock()
# Ack manager should sleep before checking for
# repbulishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.ack_manager_thread = self.__start_ack_manager()

@staticmethod
def __publish_errback(exc, interval, publish_log_prefix=""):
Expand All @@ -103,11 +109,6 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection:
with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']):
heartbeat_thread = self.__start_heartbeat(queue_name, connection)
# Ack manager should sleep before checking for
# repbulishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.__start_ack_manager(queue_name)
while check and connection.connected:
try:
connection.drain_events(timeout=self.__timeout)
Expand Down Expand Up @@ -193,6 +194,7 @@ def publish(self, name, payload):
ack_queue = name + ACK_QUEUE_SUFFIX
payload[ACK_UUID_KEY] = ack_uuid
payload[ACK_QUEUE_KEY] = ack_queue
payload[ACK_SUBMIT_QUEUE_KEY] = name
self.publish_uuid_store[ack_uuid] = payload
log.debug('Requesting acknowledgement of UUID %s on queue %s', ack_uuid, ack_queue)
with self.connection(self.__url) as connection:
Expand All @@ -209,18 +211,20 @@ def publish(self, name, payload):
)
log.debug("%sPublished to key %s", publish_log_prefix, key)

def ack_manager(self, queue_name):
def ack_manager(self):
log.debug('Acknowledgement manager thread alive')
resubmit_queue = queue_name[:-len(ACK_QUEUE_SUFFIX)]
try:
while True:
sleep(DEFAULT_ACK_MANAGER_SLEEP)
with self.publish_ack_lock:
for unack_uuid in self.publish_uuid_store.keys():
if self.publish_uuid_store.get_time(unack_uuid) < time() - self.__republish_time:
log.debug('UUID %s has not been acknowledged, republishing original message', unack_uuid)
payload = self.publish_uuid_store[unack_uuid]
payload[ACK_FORCE_NOACK_KEY] = True
resubmit_queue = payload[ACK_SUBMIT_QUEUE_KEY]
log.debug('UUID %s has not been acknowledged, '
'republishing original message on queue %s',
unack_uuid, resubmit_queue)
self.publish(resubmit_queue, payload)
self.publish_uuid_store.set_time(unack_uuid)
except:
Expand Down Expand Up @@ -273,10 +277,10 @@ def __start_heartbeat(self, queue_name, connection):
thread.start()
return thread

def __start_ack_manager(self, queue_name):
if self.acks_enabled and queue_name.endswith(ACK_QUEUE_SUFFIX):
thread_name = "acknowledgement-manager-%s" % (self.__queue_name(queue_name))
thread = threading.Thread(name=thread_name, target=self.ack_manager, args=(queue_name,))
def __start_ack_manager(self):
if self.acks_enabled:
thread_name = "acknowledgement-manager"
thread = threading.Thread(name=thread_name, target=self.ack_manager)
thread.daemon = True
thread.start()
return thread

0 comments on commit 6fef446

Please sign in to comment.