Skip to content

Commit

Permalink
Conditionally enable publish timeout argument
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Apr 21, 2023
1 parent 8b9e2a8 commit cfede2b
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -1,6 +1,6 @@
# Optional requirements used by test cases
pycurl
kombu>=5.2.3
kombu
pykube
boto3

Expand Down
6 changes: 6 additions & 0 deletions pulsar/client/amqp_exchange.py
Expand Up @@ -9,6 +9,7 @@
)
from typing import Optional

from packaging.version import parse as parse_version
try:
import kombu
import kombu.exceptions
Expand Down Expand Up @@ -46,6 +47,7 @@
ACK_FORCE_NOACK_KEY = 'force_noack'
DEFAULT_ACK_MANAGER_SLEEP = 15
DEFAULT_REPUBLISH_TIME = 30
MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT = parse_version("5.2.0")


class PulsarExchange:
Expand Down Expand Up @@ -87,6 +89,7 @@ def __init__(
amqp.exceptions.RecoverableChannelError, # publish time out
kombu.exceptions.OperationalError, # ConnectionRefusedError, e.g. when getting a new connection while rabbitmq is down
)
self.__kombu_version = parse_version(kombu.__version__)
self.__url = url
self.__manager_name = manager_name
self.__amqp_key_prefix = amqp_key_prefix
Expand Down Expand Up @@ -291,6 +294,9 @@ def errback(exc, interval):
publish_kwds["retry_policy"]["errback"] = errback
else:
publish_kwds = self.__publish_kwds
if self.__kombu_version < MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT:
log.warning(f"kombu version {kombu.__version__} does not support timeout argument to publish. Consider updating to 5.2.0 or newer")
publish_kwds.pop("timeout", None)
return publish_kwds

def __publish_log_prefex(self, transaction_uuid=None):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -104,7 +104,7 @@
include_package_data=True,
install_requires=requirements,
extras_require={
'amqp': ['kombu>=5.2.3'],
'amqp': ['kombu'],
'web': ['Paste', 'PasteScript'],
'galaxy_extended_metadata': ['galaxy-job-execution', 'galaxy-util[template]'],
},
Expand Down

0 comments on commit cfede2b

Please sign in to comment.