Skip to content

Commit

Permalink
Merge pull request #808 from natefoo/amqp_ack
Browse files Browse the repository at this point in the history
[15.07] [Pulsar] Resubmit AMQP acknowledgement messages to the correct queue
  • Loading branch information
blankenberg committed Sep 29, 2015
2 parents 394c372 + 6fef446 commit 8c9e868
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions lib/pulsar/client/amqp_exchange.py
Expand Up @@ -29,6 +29,7 @@
ACK_QUEUE_SUFFIX = "_ack"
ACK_UUID_KEY = 'acknowledge_uuid'
ACK_QUEUE_KEY = 'acknowledge_queue'
ACK_SUBMIT_QUEUE_KEY = 'acknowledge_submit_queue'
ACK_UUID_RESPONSE_KEY = 'acknowledge_uuid_response'
ACK_FORCE_NOACK_KEY = 'force_noack'
DEFAULT_ACK_MANAGER_SLEEP = 15
Expand Down Expand Up @@ -77,6 +78,11 @@ def __init__(
self.publish_uuid_store = publish_uuid_store
self.consume_uuid_store = consume_uuid_store
self.publish_ack_lock = threading.Lock()
# Ack manager should sleep before checking for
# repbulishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.ack_manager_thread = self.__start_ack_manager()

@staticmethod
def __publish_errback(exc, interval, publish_log_prefix=""):
Expand All @@ -103,11 +109,6 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection:
with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']):
heartbeat_thread = self.__start_heartbeat(queue_name, connection)
# Ack manager should sleep before checking for
# repbulishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.__start_ack_manager(queue_name)
while check and connection.connected:
try:
connection.drain_events(timeout=self.__timeout)
Expand Down Expand Up @@ -193,6 +194,7 @@ def publish(self, name, payload):
ack_queue = name + ACK_QUEUE_SUFFIX
payload[ACK_UUID_KEY] = ack_uuid
payload[ACK_QUEUE_KEY] = ack_queue
payload[ACK_SUBMIT_QUEUE_KEY] = name
self.publish_uuid_store[ack_uuid] = payload
log.debug('Requesting acknowledgement of UUID %s on queue %s', ack_uuid, ack_queue)
with self.connection(self.__url) as connection:
Expand All @@ -209,18 +211,20 @@ def publish(self, name, payload):
)
log.debug("%sPublished to key %s", publish_log_prefix, key)

def ack_manager(self, queue_name):
def ack_manager(self):
log.debug('Acknowledgement manager thread alive')
resubmit_queue = queue_name[:-len(ACK_QUEUE_SUFFIX)]
try:
while True:
sleep(DEFAULT_ACK_MANAGER_SLEEP)
with self.publish_ack_lock:
for unack_uuid in self.publish_uuid_store.keys():
if self.publish_uuid_store.get_time(unack_uuid) < time() - self.__republish_time:
log.debug('UUID %s has not been acknowledged, republishing original message', unack_uuid)
payload = self.publish_uuid_store[unack_uuid]
payload[ACK_FORCE_NOACK_KEY] = True
resubmit_queue = payload[ACK_SUBMIT_QUEUE_KEY]
log.debug('UUID %s has not been acknowledged, '
'republishing original message on queue %s',
unack_uuid, resubmit_queue)
self.publish(resubmit_queue, payload)
self.publish_uuid_store.set_time(unack_uuid)
except:
Expand Down Expand Up @@ -273,10 +277,10 @@ def __start_heartbeat(self, queue_name, connection):
thread.start()
return thread

def __start_ack_manager(self, queue_name):
if self.acks_enabled and queue_name.endswith(ACK_QUEUE_SUFFIX):
thread_name = "acknowledgement-manager-%s" % (self.__queue_name(queue_name))
thread = threading.Thread(name=thread_name, target=self.ack_manager, args=(queue_name,))
def __start_ack_manager(self):
if self.acks_enabled:
thread_name = "acknowledgement-manager"
thread = threading.Thread(name=thread_name, target=self.ack_manager)
thread.daemon = True
thread.start()
return thread

0 comments on commit 8c9e868

Please sign in to comment.