Skip to content

Commit

Permalink
Merge pull request #485 from bmbouter/455-close-qpid-when-session-closed
Browse files Browse the repository at this point in the history
Causes the monitoring thread to exit if SessionClosed is raised.
  • Loading branch information
bmbouter committed Jun 11, 2015
2 parents 7a510ab + 986e289 commit 6f51a3a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
13 changes: 13 additions & 0 deletions kombu/tests/transport/test_qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,19 @@ def test_receivers_monitor_run_calls_monitor_receivers(
self.monitor.run()
mock_monitor_receivers.assert_called_once_with()

@patch(QPID_MODULE + '.SessionClosed', new=QpidException)
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
def test_receivers_monitor_run_exits_on_session_closed(
self, mock_sleep, mock_monitor_receivers):
mock_monitor_receivers.side_effect = QpidException()
try:
self.monitor.run()
except Exception:
self.fail('No exception should be raised here')
mock_monitor_receivers.assert_called_once_with()
mock_sleep.has_calls([])

@patch.object(Transport, 'connection_errors', new=(BreakOutException, ))
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
Expand Down
8 changes: 8 additions & 0 deletions kombu/transport/qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
try:
from qpid.messaging.exceptions import ConnectionError, NotFound
from qpid.messaging.exceptions import Empty as QpidEmpty
from qpid.messaging.exceptions import SessionClosed
except ImportError: # pragma: no cover
ConnectionError = None
NotFound = None
QpidEmpty = None
SessionClosed = None

try:
import qpid
Expand Down Expand Up @@ -1338,6 +1340,10 @@ def run(self):
non connection errors. This guards against unexpected exceptions
which could cause this thread to exit unexpectedly.
A :class:`qpid.messaging.exceptions.SessionClosed` exception should
cause this thread to exit. This is a normal exit condition and the
thread is no longer needed.
If a connection error occurs, the exception needs to be propagated
to MainThread where the kombu exception handler can properly handle
it. The exception is stored as saved_exception on the self._session
Expand All @@ -1351,6 +1357,8 @@ def run(self):
self._session.saved_exception = exc
os.write(self._w_fd, 'e')
break
except SessionClosed:
break
except Exception as exc:
logger.error(exc, exc_info=1)
time.sleep(10)
Expand Down

0 comments on commit 6f51a3a

Please sign in to comment.