Skip to content

Commit

Permalink
Merge pull request #719 from natefoo/amqp_ack
Browse files Browse the repository at this point in the history
[15.07] Pulsar client: Fix deepcopy() error with copying the errback method from PulsarExchange.
  • Loading branch information
martenson committed Sep 11, 2015
2 parents 358dc40 + 819c7dc commit 88abd34
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions lib/pulsar/client/amqp_exchange.py
Expand Up @@ -73,13 +73,16 @@ def __init__(
if publish_kwds.get("retry", False):
if "retry_policy" not in publish_kwds:
publish_kwds["retry_policy"] = {}
if "errback" not in publish_kwds["retry_policy"]:
publish_kwds["retry_policy"]["errback"] = self.__publish_errback
self.__publish_kwds = publish_kwds
self.publish_uuid_store = publish_uuid_store
self.consume_uuid_store = consume_uuid_store
self.publish_ack_lock = threading.Lock()

@staticmethod
def __publish_errback(exc, interval, publish_log_prefix=""):
log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1)
log.info("%sRetrying in %s seconds", publish_log_prefix, interval)

@property
def url(self):
return self.__url
Expand Down Expand Up @@ -230,16 +233,12 @@ def __prepare_publish_kwds(self, publish_log_prefix):
publish_kwds = copy.deepcopy(self.__publish_kwds)

def errback(exc, interval):
return self.__publish_errback(exc, interval, publish_log_prefix)
return PulsarExchange.__publish_errback(exc, interval, publish_log_prefix)
publish_kwds["retry_policy"]["errback"] = errback
else:
publish_kwds = self.__publish_kwds
return publish_kwds

def __publish_errback(self, exc, interval, publish_log_prefix=""):
log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1)
log.info("%sRetrying in %s seconds", publish_log_prefix, interval)

def __publish_log_prefex(self, transaction_uuid=None):
prefix = ""
if transaction_uuid:
Expand Down

0 comments on commit 88abd34

Please sign in to comment.