diff --git a/lib/pulsar/client/amqp_exchange.py b/lib/pulsar/client/amqp_exchange.py index 31085583f1a7..2ede16d8ddb9 100644 --- a/lib/pulsar/client/amqp_exchange.py +++ b/lib/pulsar/client/amqp_exchange.py @@ -29,6 +29,7 @@ ACK_QUEUE_SUFFIX = "_ack" ACK_UUID_KEY = 'acknowledge_uuid' ACK_QUEUE_KEY = 'acknowledge_queue' +ACK_SUBMIT_QUEUE_KEY = 'acknowledge_submit_queue' ACK_UUID_RESPONSE_KEY = 'acknowledge_uuid_response' ACK_FORCE_NOACK_KEY = 'force_noack' DEFAULT_ACK_MANAGER_SLEEP = 15 @@ -77,6 +78,11 @@ def __init__( self.publish_uuid_store = publish_uuid_store self.consume_uuid_store = consume_uuid_store self.publish_ack_lock = threading.Lock() + # Ack manager should sleep before checking for + # repbulishes, but if that changes, need to drain the + # queue once before the ack manager starts doing its + # thing + self.ack_manager_thread = self.__start_ack_manager() @staticmethod def __publish_errback(exc, interval, publish_log_prefix=""): @@ -103,11 +109,6 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection: with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']): heartbeat_thread = self.__start_heartbeat(queue_name, connection) - # Ack manager should sleep before checking for - # repbulishes, but if that changes, need to drain the - # queue once before the ack manager starts doing its - # thing - self.__start_ack_manager(queue_name) while check and connection.connected: try: connection.drain_events(timeout=self.__timeout) @@ -193,6 +194,7 @@ def publish(self, name, payload): ack_queue = name + ACK_QUEUE_SUFFIX payload[ACK_UUID_KEY] = ack_uuid payload[ACK_QUEUE_KEY] = ack_queue + payload[ACK_SUBMIT_QUEUE_KEY] = name self.publish_uuid_store[ack_uuid] = payload log.debug('Requesting acknowledgement of UUID %s on queue %s', ack_uuid, ack_queue) with self.connection(self.__url) as connection: @@ -209,18 +211,20 @@ def publish(self, name, payload): ) log.debug("%sPublished to key %s", publish_log_prefix, key) - def ack_manager(self, queue_name): + def ack_manager(self): log.debug('Acknowledgement manager thread alive') - resubmit_queue = queue_name[:-len(ACK_QUEUE_SUFFIX)] try: while True: sleep(DEFAULT_ACK_MANAGER_SLEEP) with self.publish_ack_lock: for unack_uuid in self.publish_uuid_store.keys(): if self.publish_uuid_store.get_time(unack_uuid) < time() - self.__republish_time: - log.debug('UUID %s has not been acknowledged, republishing original message', unack_uuid) payload = self.publish_uuid_store[unack_uuid] payload[ACK_FORCE_NOACK_KEY] = True + resubmit_queue = payload[ACK_SUBMIT_QUEUE_KEY] + log.debug('UUID %s has not been acknowledged, ' + 'republishing original message on queue %s', + unack_uuid, resubmit_queue) self.publish(resubmit_queue, payload) self.publish_uuid_store.set_time(unack_uuid) except: @@ -273,10 +277,10 @@ def __start_heartbeat(self, queue_name, connection): thread.start() return thread - def __start_ack_manager(self, queue_name): - if self.acks_enabled and queue_name.endswith(ACK_QUEUE_SUFFIX): - thread_name = "acknowledgement-manager-%s" % (self.__queue_name(queue_name)) - thread = threading.Thread(name=thread_name, target=self.ack_manager, args=(queue_name,)) + def __start_ack_manager(self): + if self.acks_enabled: + thread_name = "acknowledgement-manager" + thread = threading.Thread(name=thread_name, target=self.ack_manager) thread.daemon = True thread.start() return thread