Skip to content

Commit

Permalink
Merge pull request #451 from bmbouter/qpid-exception-handling-fixes
Browse files Browse the repository at this point in the history
Adds pre 3.0 error classes and removes a custom Exception handler
  • Loading branch information
bmbouter committed Feb 6, 2015
2 parents d9bbe3c + 5bb9bdf commit fbae151
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 131 deletions.
54 changes: 10 additions & 44 deletions kombu/tests/transport/test_qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from kombu.five import Empty, keys, range, monotonic
from kombu.transport.qpid import (AuthenticationFailure, Channel, Connection,
ConnectionError, Message, NotFound, QoS,
QpidMessagingExceptionHandler,
ReceiversMonitor, Transport)
from kombu.transport.virtual import Base64
from kombu.tests.case import Case, Mock, case_no_pypy, case_no_python3
Expand Down Expand Up @@ -89,44 +88,6 @@ class BreakOutException(Exception):
pass


@case_no_python3
@case_no_pypy
class TestQpidMessagingExceptionHandler(Case):

allowed_string = 'object in use'
not_allowed_string = 'a different string'

def setUp(self):
"""Create a mock ExceptionHandler for testing by this object."""
self.handler = QpidMessagingExceptionHandler(self.allowed_string)

def test_string_stored(self):
"""Assert that the allowed_exception_string is stored correctly"""
handler_string = self.handler.allowed_exception_string
self.assertEqual(self.allowed_string, handler_string)

def test_exception_positive(self):
"""Assert that an exception is silenced if it contains the
allowed_string text."""
exception_to_raise = Exception(self.allowed_string)

def exception_raise_fun():
raise exception_to_raise
decorated_fun = self.handler(exception_raise_fun)
decorated_fun()

def test_exception_negative(self):
"""Assert that an exception that does not contain the
allowed_string text is properly raised."""
exception_to_raise = Exception(self.not_allowed_string)

def exception_raise_fun():
raise exception_to_raise
decorated_fun = self.handler(exception_raise_fun)
with self.assertRaises(Exception):
decorated_fun()


@case_no_python3
@case_no_pypy
class TestQoS__init__(Case):
Expand Down Expand Up @@ -399,7 +360,7 @@ def test_mutates_ConnError_by_code(self, mock_qpid, mock_exc_info):
@patch(QPID_MODULE + '.sys.exc_info')
@patch(QPID_MODULE + '.qpid')
def test_connection__init__mutates_ConnError_by_message2(self, mock_qpid,
mock_exc_info):
mock_exc_info):
"""
Test for PLAIN connection via python-saslwrapper, sans cyrus-sasl-plain
Expand All @@ -422,7 +383,6 @@ def test_connection__init__mutates_ConnError_by_message2(self, mock_qpid,
else:
self.fail('ConnectionError type was not mutated correctly')


@patch(QPID_MODULE + '.ConnectionError', new=(QpidException, ))
@patch(QPID_MODULE + '.sys.exc_info')
@patch(QPID_MODULE + '.qpid')
Expand Down Expand Up @@ -1499,7 +1459,7 @@ def return_once_raise_on_second_call(*args):
self.monitor.run()
mock_monitor_receivers.has_calls([call(), call()])

@patch.object(Transport, 'connection_errors', new=(QpidException, ))
@patch.object(Transport, 'recoverable_connection_errors', new=(QpidException, ))
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
@patch(QPID_MODULE + '.logger')
Expand All @@ -1513,7 +1473,7 @@ def test_receivers_monitor_exits_when_recoverable_exception_raised(
self.monitor.run()
self.assertFalse(mock_logger.error.called)

@patch.object(Transport, 'connection_errors', new=(QpidException, ))
@patch.object(Transport, 'recoverable_connection_errors', new=(QpidException, ))
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
@patch(QPID_MODULE + '.logger')
Expand All @@ -1529,7 +1489,7 @@ def test_receivers_monitor_saves_exception_when_recoverable_exc_raised(
mock_monitor_receivers.side_effect,
)

@patch.object(Transport, 'connection_errors', new=(QpidException, ))
@patch.object(Transport, 'recoverable_connection_errors', new=(QpidException, ))
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
@patch(QPID_MODULE + '.logger')
Expand Down Expand Up @@ -1916,6 +1876,12 @@ def test_transport_verify_recoverable_channel_errors(self):
channel_errors = Transport.recoverable_channel_errors
self.assertIn(NotFound, channel_errors)

def test_transport_verify_pre_kombu_3_0_exception_labels(self):
self.assertEqual(Transport.recoverable_channel_errors,
Transport.channel_errors)
self.assertEqual(Transport.recoverable_connection_errors,
Transport.connection_errors)


@case_no_python3
@case_no_pypy
Expand Down
127 changes: 40 additions & 87 deletions kombu/transport/qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@
qpidtoollibs = None # noqa

try:
from qpid.messaging.exceptions import ConnectionError
from qpid.messaging.exceptions import ConnectionError, NotFound
from qpid.messaging.exceptions import Empty as QpidEmpty
from qpid.messaging.exceptions import NotFound
except ImportError: # pragma: no cover
ConnectionError = None
NotFound = None
Expand Down Expand Up @@ -100,59 +99,6 @@ class AuthenticationFailure(Exception):
pass


class QpidMessagingExceptionHandler(object):
"""An exception handling decorator that silences some exceptions.
An exception handling class designed to silence specific exceptions
that qpid.messaging raises as part of normal operation. qpid.messaging
exceptions require string parsing, and are not machine consumable.
This is designed to be used as a decorator, and accepts a whitelist
string as an argument.
Usage:
@QpidMessagingExceptionHandler('whitelist string goes here')
"""

def __init__(self, allowed_exception_string):
"""Instantiate a QpidMessagingExceptionHandler object.
:param allowed_exception_string: a string that, if present in the
exception message, will be silenced.
:type allowed_exception_string: str
"""
self.allowed_exception_string = allowed_exception_string

def __call__(self, original_fun):
"""The decorator method.
Method that wraps the actual function with exception silencing
functionality. Any exception that contains the string
self.allowed_exception_string in the message will be silenced.
:param original_fun: function that is automatically passed in
when this object is used as a decorator.
:type original_fun: function
:return: A function that decorates (wraps) the original function.
:rtype: function
"""

def decorator(*args, **kwargs):
"""A runtime-built function that will be returned which contains
a reference to the original function, and wraps a call to it in
a try/except block that can silence errors.
"""
try:
return original_fun(*args, **kwargs)
except Exception as exc:
if self.allowed_exception_string not in str(exc):
raise

return decorator


class QoS(object):
"""A helper object for message prefetch and ACKing purposes.
Expand Down Expand Up @@ -419,6 +365,8 @@ def _get(self, queue):
:return: The received message.
:rtype: :class:`qpid.messaging.Message`
:raises: :class:`qpid.messaging.exceptions.Empty` if no
message is available.
"""
rx = self.transport.session.receiver(queue)
try:
Expand Down Expand Up @@ -688,7 +636,6 @@ def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs):
return
self._delete(queue)

@QpidMessagingExceptionHandler(OBJECT_ALREADY_EXISTS_STRING)
def exchange_declare(self, exchange='', type='direct', durable=False,
**kwargs):
"""Create a new exchange.
Expand All @@ -715,7 +662,11 @@ def exchange_declare(self, exchange='', type='direct', durable=False,
:type durable: bool
"""
options = {'durable': durable}
self._broker.addExchange(type, exchange, options)
try:
self._broker.addExchange(type, exchange, options)
except Exception as exc:
if OBJECT_ALREADY_EXISTS_STRING not in str(exc):
raise exc

def exchange_delete(self, exchange_name, **kwargs):
"""Delete an exchange specified by name
Expand Down Expand Up @@ -1352,9 +1303,7 @@ class ReceiversMonitor(threading.Thread):
session.next_receiver() forever.
The entry point of the thread is :meth:`run` which calls
:meth:`monitor_receivers` and catches and logs all exceptions raised.
After an exception is logged, the method sleeps for 10 seconds, and
re-enters :meth:`monitor_receivers`
:meth:`monitor_receivers`.
The thread is designed to be daemonized, and will be forcefully killed
when all non-daemon threads have already exited.
Expand All @@ -1377,27 +1326,20 @@ def __init__(self, session, w):
def run(self):
"""Thread entry point for ReceiversMonitor
Calls :meth:`monitor_receivers` with a log-and-reenter behavior. This
guards against unexpected exceptions which could cause this thread to
exit unexpectedly.
If a recoverable error occurs, then the exception needs to be
propagated to the Main Thread where an exception handler can properly
handle it. An Exception is checked if it is recoverable, and if so,
it is stored as saved_exception on the self._session object. The
character 'e' is then written to the self.w_fd file descriptor
causing Main Thread to raise the saved exception. Once the Exception
info is saved and the file descriptor is written, this Thread
gracefully exits.
Typically recoverable errors are connection errors, and can be
recovered through a call to Transport.establish_connection which will
spawn a new ReceiversMonitor Thread.
Calls :meth:`monitor_receivers` with a log-and-reenter behavior for
non connection errors. This guards against unexpected exceptions
which could cause this thread to exit unexpectedly.
If a connection error occurs, the exception needs to be propagated
to MainThread where the kombu exception handler can properly handle
it. The exception is stored as saved_exception on the self._session
object. The character 'e' is then written to the self.w_fd file
descriptor and then this thread exits.
"""
while True:
try:
self.monitor_receivers()
except Transport.connection_errors as exc:
except Transport.recoverable_connection_errors as exc:
self._session.saved_exception = exc
os.write(self._w_fd, 'e')
break
Expand Down Expand Up @@ -1438,12 +1380,17 @@ class Transport(base.Transport):
The Transport can create :class:`Channel` objects to communicate with the
broker with using the :meth:`create_channel` method.
The Transport identifies recoverable errors, allowing for error recovery
when certain exceptions occur. These exception types are stored in the
Transport class attribute connection_errors. This adds support for Kombu
to retry an operation if a ConnectionError occurs. ConnectionErrors occur
when the Transport cannot communicate with the Qpid broker.
The Transport identifies recoverable connection errors and recoverable
channel errors according to the Kombu 3.0 interface. These exception are
listed as tuples and store in the Transport class attribute
`recoverable_connection_errors` and `recoverable_channel_errors`
respectively. Any exception raised that is not a member of one of these
tuples is considered non-recoverable. This allows Kombu support for
automatic retry of certain operations to function correctly.
For backwards compatibility to the pre Kombu 3.0 exception interface, the
recoverable errors are also listed as `connection_errors` and
`channel_errors`.
"""

# Reference to the class that should be used as the Connection object
Expand Down Expand Up @@ -1475,6 +1422,12 @@ class Transport(base.Transport):
NotFound,
)

# Support the pre 3.0 Kombu exception labeling interface which treats
# connection_errors and channel_errors both as recoverable via a
# reconnect.
connection_errors = recoverable_connection_errors
channel_errors = recoverable_channel_errors

def __init__(self, *args, **kwargs):
"""Instantiate a Transport object.
Expand Down Expand Up @@ -1537,10 +1490,10 @@ def on_readable(self, connection, loop):
ensuring that an accidental call to this method when no more messages
will arrive will not cause indefinite blocking.
If the self.r file descriptor returns the character 'e', a
recoverable error occurred in the background thread, and this thread
should raise the saved exception. The exception is stored as
saved_exception on the session object.
If the self.r file descriptor receives the character 'e', an error
occurred in the background thread, and this thread should raise the
saved exception. The exception is stored as saved_exception on the
session object.
Nothing is expected to be returned from :meth:`drain_events` because
:meth:`drain_events` handles messages by calling callbacks that are
Expand Down

0 comments on commit fbae151

Please sign in to comment.