Skip to content

Commit

Permalink
Merge pull request #324 from mvdbeek/dont_die_on_rabbitmq_restart
Browse files Browse the repository at this point in the history
Allow recovery from rabbitmq restart
  • Loading branch information
jmchilton committed Apr 21, 2023
2 parents 56fd06c + cfede2b commit e487494
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 11 deletions.
4 changes: 4 additions & 0 deletions app.yml.sample
Expand Up @@ -89,6 +89,10 @@
## kombu.Connection's drain_events() method.
#amqp_consumer_timeout: 0.2

## Publishing messages to the queue may hang if the connection becomes invalid.
## This value is passed as the timeout argument to the producer.publish function.
#amqp_publish_timeout: 2.0

# AMQP does not guarantee that a published message is received by the AMQP
# server, so Pulsar can request that the consumer acknowledge messages and will
# resend them if acknowledgement is not received after a configurable timeout
Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Expand Up @@ -35,6 +35,9 @@ ignore_missing_imports = True
[mypy-kombu.*]
ignore_missing_imports = True

[mypy-amqp.*]
ignore_missing_imports = True

[mypy-requests_toolbelt.*]
ignore_missing_imports = True

Expand Down
46 changes: 36 additions & 10 deletions pulsar/client/amqp_exchange.py
Expand Up @@ -7,17 +7,27 @@
sleep,
time,
)
from typing import Optional

from packaging.version import parse as parse_version
try:
import kombu
import kombu.exceptions
from kombu import pools
except ImportError:
kombu = None

try:
import amqp
import amqp.exceptions
except ImportError:
amqp = None

log = logging.getLogger(__name__)


# Error messages raised when binding to an AMQP message queue is attempted
# but one of the optional dependencies (kombu, amqp) could not be imported.
KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable"
AMQP_UNAVAILABLE = "Attempting to bind to AMQP message queue, but pyamqp dependency unavailable"

DEFAULT_EXCHANGE_NAME = "pulsar"
DEFAULT_EXCHANGE_TYPE = "direct"
Expand All @@ -37,6 +47,7 @@
ACK_FORCE_NOACK_KEY = 'force_noack'
DEFAULT_ACK_MANAGER_SLEEP = 15
DEFAULT_REPUBLISH_TIME = 30
MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT = parse_version("5.2.0")


class PulsarExchange:
Expand All @@ -47,7 +58,7 @@ class PulsarExchange:
Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar
should target each AMQP endpoint or care should be taken that unique
manager names are used across Pulsar servers targetting same AMQP endpoint -
manager names are used across Pulsar servers targeting the same AMQP endpoint -
and in particular only one such Pulsar should define a default manager with
name _default_.
"""
Expand All @@ -68,6 +79,17 @@ def __init__(
"""
if not kombu:
raise Exception(KOMBU_UNAVAILABLE)
if not amqp:
raise Exception(AMQP_UNAVAILABLE)
# conditional imports and type checking prevent us from doing this at the module level.
self.recoverable_exceptions = (
socket.timeout,
amqp.exceptions.ConnectionForced, # e.g. connection closed on rabbitmq sigterm
amqp.exceptions.RecoverableConnectionError, # connection closed
amqp.exceptions.RecoverableChannelError, # publish time out
kombu.exceptions.OperationalError, # ConnectionRefusedError, e.g. when getting a new connection while rabbitmq is down
)
self.__kombu_version = parse_version(kombu.__version__)
self.__url = url
self.__manager_name = manager_name
self.__amqp_key_prefix = amqp_key_prefix
Expand All @@ -84,7 +106,7 @@ def __init__(
self.consume_uuid_store = consume_uuid_store
self.publish_ack_lock = threading.Lock()
# Ack manager should sleep before checking for
# repbulishes, but if that changes, need to drain the
# republishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.ack_manager_thread = self.__start_ack_manager()
Expand Down Expand Up @@ -115,11 +137,8 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']):
heartbeat_thread = self.__start_heartbeat(queue_name, connection)
while check and connection.connected:
try:
connection.drain_events(timeout=self.__timeout)
except socket.timeout:
pass
except OSError as exc:
connection.drain_events(timeout=self.__timeout)
except self.recoverable_exceptions as exc:
self.__handle_io_error(exc, heartbeat_thread)
except BaseException:
log.exception("Problem consuming queue, consumer quitting in problematic fashion!")
Expand Down Expand Up @@ -161,7 +180,7 @@ def __ack_callback(self, body, message):
log.warning('Cannot remove UUID %s from store, already removed', ack_uuid)
message.ack()

def __handle_io_error(self, exc, heartbeat_thread):
def __handle_io_error(self, exc: BaseException, heartbeat_thread: Optional[threading.Thread] = None):
# In testing, errno is None
log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc)
try:
Expand Down Expand Up @@ -233,8 +252,12 @@ def ack_manager(self):
log.debug('UUID %s has not been acknowledged, '
'republishing original message on queue %s',
unack_uuid, resubmit_queue)
self.publish(resubmit_queue, payload)
self.publish_uuid_store.set_time(unack_uuid)
try:
self.publish(resubmit_queue, payload)
self.publish_uuid_store.set_time(unack_uuid)
except self.recoverable_exceptions as e:
self.__handle_io_error(e)
continue
except Exception:
log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!")
raise
Expand Down Expand Up @@ -271,6 +294,9 @@ def errback(exc, interval):
publish_kwds["retry_policy"]["errback"] = errback
else:
publish_kwds = self.__publish_kwds
if self.__kombu_version < MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT:
log.warning(f"kombu version {kombu.__version__} does not support timeout argument to publish. Consider updating to 5.2.0 or newer")
publish_kwds.pop("timeout", None)
return publish_kwds

def __publish_log_prefex(self, transaction_uuid=None):
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client/amqp_exchange_factory.py
Expand Up @@ -11,7 +11,7 @@ def get_exchange(url, manager_name, params):
manager_name=manager_name,
amqp_key_prefix=params.get("amqp_key_prefix"),
connect_ssl=connect_ssl,
publish_kwds=parse_amqp_publish_kwds(params)
publish_kwds=parse_amqp_publish_kwds(params),
)
if params.get('amqp_acknowledge', False):
exchange_kwds.update(parse_ack_kwds(params, manager_name))
Expand Down
1 change: 1 addition & 0 deletions pulsar/messaging/bind_amqp.py
Expand Up @@ -15,6 +15,7 @@

TYPED_PARAMS = {
"amqp_consumer_timeout": lambda val: None if str(val) == "None" else float(val),
"amqp_publish_timeout": lambda val: None if str(val) == "None" else float(val),
"amqp_publish_retry": asbool,
"amqp_publish_retry_max_retries": int,
"amqp_publish_retry_interval_start": int,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -104,6 +104,7 @@
include_package_data=True,
install_requires=requirements,
extras_require={
'amqp': ['kombu'],
'web': ['Paste', 'PasteScript'],
'galaxy_extended_metadata': ['galaxy-job-execution', 'galaxy-util[template]'],
},
Expand Down

0 comments on commit e487494

Please sign in to comment.