Skip to content

Commit

Permalink
Allow recovery from restart
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Apr 20, 2023
1 parent 56fd06c commit 73caeee
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions pulsar/client/amqp_exchange.py
Expand Up @@ -14,10 +14,17 @@
except ImportError:
kombu = None

try:
import amqp
import amqp.exceptions
except ImportError:
amqp = None

log = logging.getLogger(__name__)


KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable"
AMQP_UNAVAILABLE = "Attempting to bind to AMQP message queue, but pyampq dependency unavailable"

DEFAULT_EXCHANGE_NAME = "pulsar"
DEFAULT_EXCHANGE_TYPE = "direct"
Expand Down Expand Up @@ -47,7 +54,7 @@ class PulsarExchange:
Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar
should target each AMQP endpoint or care should be taken that unique
manager names are used across Pulsar servers targetting same AMQP endpoint -
manager names are used across Pulsar servers targeting the same AMQP endpoint -
and in particular only one such Pulsar should define an default manager with
name _default_.
"""
Expand All @@ -68,6 +75,8 @@ def __init__(
"""
if not kombu:
raise Exception(KOMBU_UNAVAILABLE)
if not amqp:
raise Exception(AMQP_UNAVAILABLE)
self.__url = url
self.__manager_name = manager_name
self.__amqp_key_prefix = amqp_key_prefix
Expand All @@ -84,7 +93,7 @@ def __init__(
self.consume_uuid_store = consume_uuid_store
self.publish_ack_lock = threading.Lock()
# Ack manager should sleep before checking for
# repbulishes, but if that changes, need to drain the
# republishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.ack_manager_thread = self.__start_ack_manager()
Expand Down Expand Up @@ -119,7 +128,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
connection.drain_events(timeout=self.__timeout)
except socket.timeout:
pass
except OSError as exc:
except (OSError, amqp.exceptions.ConnectionForced, amqp.exceptions.RecoverableChannelError, amqp.exceptions.RecoverableConnectionError) as exc:
self.__handle_io_error(exc, heartbeat_thread)
except BaseException:
log.exception("Problem consuming queue, consumer quitting in problematic fashion!")
Expand Down

0 comments on commit 73caeee

Please sign in to comment.