Skip to content

Commit

Permalink
Merge branch '3.0' of github.com:celery/kombu into 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jun 12, 2015
2 parents e199eab + 6f51a3a commit 543988b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
36 changes: 36 additions & 0 deletions kombu/tests/transport/test_qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,19 @@ def test_receivers_monitor_run_calls_monitor_receivers(
self.monitor.run()
mock_monitor_receivers.assert_called_once_with()

@patch(QPID_MODULE + '.SessionClosed', new=QpidException)
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
def test_receivers_monitor_run_exits_on_session_closed(
self, mock_sleep, mock_monitor_receivers):
mock_monitor_receivers.side_effect = QpidException()
try:
self.monitor.run()
except Exception:
self.fail('No exception should be raised here')
mock_monitor_receivers.assert_called_once_with()
mock_sleep.has_calls([])

@patch.object(Transport, 'connection_errors', new=(BreakOutException, ))
@patch.object(ReceiversMonitor, 'monitor_receivers')
@patch(QPID_MODULE + '.time.sleep')
Expand Down Expand Up @@ -2023,3 +2036,26 @@ def test_default_connection_params(self):
my_transport = Transport(self.mock_client)
result_params = my_transport.default_connection_params
self.assertDictEqual(correct_params, result_params)

@patch('os.close')
def test_del(self, close):
my_transport = Transport(self.mock_client)
my_transport.__del__()
self.assertEqual(
close.call_args_list,
[
((my_transport.r,), {}),
((my_transport._w,), {}),
])

@patch('os.close')
def test_del_failed(self, close):
close.side_effect = OSError()
my_transport = Transport(self.mock_client)
my_transport.__del__()
self.assertEqual(
close.call_args_list,
[
((my_transport.r,), {}),
((my_transport._w,), {}),
])
19 changes: 19 additions & 0 deletions kombu/transport/qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
try:
from qpid.messaging.exceptions import ConnectionError, NotFound
from qpid.messaging.exceptions import Empty as QpidEmpty
from qpid.messaging.exceptions import SessionClosed
except ImportError: # pragma: no cover
ConnectionError = None
NotFound = None
QpidEmpty = None
SessionClosed = None

try:
import qpid
Expand Down Expand Up @@ -1338,6 +1340,10 @@ def run(self):
non connection errors. This guards against unexpected exceptions
which could cause this thread to exit unexpectedly.
A :class:`qpid.messaging.exceptions.SessionClosed` exception should
cause this thread to exit. This is a normal exit condition and the
thread is no longer needed.
If a connection error occurs, the exception needs to be propagated
to MainThread where the kombu exception handler can properly handle
it. The exception is stored as saved_exception on the self._session
Expand All @@ -1351,6 +1357,8 @@ def run(self):
self._session.saved_exception = exc
os.write(self._w_fd, 'e')
break
except SessionClosed:
break
except Exception as exc:
logger.error(exc, exc_info=1)
time.sleep(10)
Expand Down Expand Up @@ -1707,3 +1715,14 @@ def default_connection_params(self):
return {'userid': 'guest', 'password': '',
'port': self.default_port, 'virtual_host': '',
'hostname': 'localhost', 'sasl_mechanisms': 'PLAIN ANONYMOUS'}

def __del__(self):
"""
Ensure file descriptors opened in __init__() are closed.
"""
for fd in (self.r, self._w):
try:
os.close(fd)
except OSError:
# ignored
pass

0 comments on commit 543988b

Please sign in to comment.