diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 0de56ced..4b9399f7 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -14,10 +14,17 @@ except ImportError: kombu = None +try: + import amqp + import amqp.exceptions +except ImportError: + amqp = None + log = logging.getLogger(__name__) KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable" +AMQP_UNAVAILABLE = "Attempting to bind to AMQP message queue, but pyampq dependency unavailable" DEFAULT_EXCHANGE_NAME = "pulsar" DEFAULT_EXCHANGE_TYPE = "direct" @@ -47,7 +54,7 @@ class PulsarExchange: Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar should target each AMQP endpoint or care should be taken that unique - manager names are used across Pulsar servers targetting same AMQP endpoint - + manager names are used across Pulsar servers targeting the same AMQP endpoint - and in particular only one such Pulsar should define an default manager with name _default_. """ @@ -68,6 +75,8 @@ def __init__( """ if not kombu: raise Exception(KOMBU_UNAVAILABLE) + if not amqp: + raise Exception(AMQP_UNAVAILABLE) self.__url = url self.__manager_name = manager_name self.__amqp_key_prefix = amqp_key_prefix @@ -84,7 +93,7 @@ def __init__( self.consume_uuid_store = consume_uuid_store self.publish_ack_lock = threading.Lock() # Ack manager should sleep before checking for - # repbulishes, but if that changes, need to drain the + # republishes, but if that changes, need to drain the # queue once before the ack manager starts doing its # thing self.ack_manager_thread = self.__start_ack_manager() @@ -119,7 +128,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): connection.drain_events(timeout=self.__timeout) except socket.timeout: pass - except OSError as exc: + except (OSError, amqp.exceptions.ConnectionForced, amqp.exceptions.RecoverableChannelError, amqp.exceptions.RecoverableConnectionError) as exc: self.__handle_io_error(exc, heartbeat_thread) except BaseException: log.exception("Problem consuming queue, consumer quitting in problematic fashion!")