diff --git a/lib/pulsar/client/amqp_exchange.py b/lib/pulsar/client/amqp_exchange.py index ef26563d2d3f..31085583f1a7 100644 --- a/lib/pulsar/client/amqp_exchange.py +++ b/lib/pulsar/client/amqp_exchange.py @@ -73,13 +73,16 @@ def __init__( if publish_kwds.get("retry", False): if "retry_policy" not in publish_kwds: publish_kwds["retry_policy"] = {} - if "errback" not in publish_kwds["retry_policy"]: - publish_kwds["retry_policy"]["errback"] = self.__publish_errback self.__publish_kwds = publish_kwds self.publish_uuid_store = publish_uuid_store self.consume_uuid_store = consume_uuid_store self.publish_ack_lock = threading.Lock() + @staticmethod + def __publish_errback(exc, interval, publish_log_prefix=""): + log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1) + log.info("%sRetrying in %s seconds", publish_log_prefix, interval) + @property def url(self): return self.__url @@ -230,16 +233,12 @@ def __prepare_publish_kwds(self, publish_log_prefix): publish_kwds = copy.deepcopy(self.__publish_kwds) def errback(exc, interval): - return self.__publish_errback(exc, interval, publish_log_prefix) + return PulsarExchange.__publish_errback(exc, interval, publish_log_prefix) publish_kwds["retry_policy"]["errback"] = errback else: publish_kwds = self.__publish_kwds return publish_kwds - def __publish_errback(self, exc, interval, publish_log_prefix=""): - log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1) - log.info("%sRetrying in %s seconds", publish_log_prefix, interval) - def __publish_log_prefex(self, transaction_uuid=None): prefix = "" if transaction_uuid: