Skip to content

Commit

Permalink
Merge branch '3.0' of github.com:celery/kombu into 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 4, 2016
2 parents bf454de + 277309f commit 2a89303
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 56 deletions.
121 changes: 79 additions & 42 deletions kombu/tests/transport/test_qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,20 +1403,9 @@ def setUp(self):
self.patch_b = patch(QPID_MODULE + '.base.Transport.__init__')
self.mock_base_Transport__init__ = self.patch_b.start()

self.patch_c = patch(QPID_MODULE + '.os')
self.mock_os = self.patch_c.start()
self.mock_r = Mock()
self.mock_w = Mock()
self.mock_os.pipe.return_value = self.mock_r, self.mock_w

self.patch_d = patch(QPID_MODULE + '.fcntl')
self.mock_fcntl = self.patch_d.start()

def tearDown(self):
self.patch_a.stop()
self.patch_b.stop()
self.patch_c.stop()
self.patch_d.stop()

def test_Transport___init___calls_verify_runtime_environment(self):
Transport(Mock())
Expand All @@ -1427,20 +1416,9 @@ def test_transport___init___calls_parent_class___init__(self):
Transport(m)
self.mock_base_Transport__init__.assert_called_once_with(m)

def test_transport___init___calls_os_pipe(self):
Transport(Mock())
self.mock_os.pipe.assert_called_once_with()

def test_transport___init___saves_os_pipe_file_descriptors(self):
def test_transport___init___sets_use_async_interface_False(self):
transport = Transport(Mock())
self.assertIs(transport.r, self.mock_r)
self.assertIs(transport._w, self.mock_w)

def test_transport___init___sets_non_blocking_behavior_on_r_fd(self):
Transport(Mock())
self.mock_fcntl.fcntl.assert_called_once_with(
self.mock_r, self.mock_fcntl.F_SETFL, self.mock_os.O_NONBLOCK,
)
self.assertFalse(transport.use_async_interface)


@case_no_python3
Expand Down Expand Up @@ -1769,6 +1747,20 @@ def test_transport_verify_pre_kombu_3_0_exception_labels(self):
@disable_runtime_dependency_check
class TestTransportRegisterWithEventLoop(Case):

def setUp(self):
self.patch_a = patch(QPID_MODULE + '.os')
self.mock_os = self.patch_a.start()
self.mock_r = 1
self.mock_w = 2
self.mock_os.pipe.return_value = self.mock_r, self.mock_w

self.patch_b = patch(QPID_MODULE + '.fcntl')
self.mock_fcntl = self.patch_b.start()

def tearDown(self):
self.patch_a.stop()
self.patch_b.stop()

def test_transport_register_with_event_loop_calls_add_reader(self):
transport = Transport(Mock())
mock_connection = Mock()
Expand All @@ -1778,16 +1770,39 @@ def test_transport_register_with_event_loop_calls_add_reader(self):
transport.r, transport.on_readable, mock_connection, mock_loop,
)

def test_transport___init___calls_os_pipe(self):
transport = Transport(Mock())
transport.register_with_event_loop(Mock(), Mock())
self.mock_os.pipe.assert_called_once_with()

def test_transport___init___saves_os_pipe_file_descriptors(self):
transport = Transport(Mock())
mock_connection = Mock()
mock_loop = Mock()
transport.register_with_event_loop(mock_connection, mock_loop)
self.assertIs(transport.r, self.mock_r)
self.assertIs(transport._w, self.mock_w)

def test_transport___init___sets_non_blocking_behavior_on_r_fd(self):
transport = Transport(Mock())
mock_connection = Mock()
mock_loop = Mock()
transport.register_with_event_loop(mock_connection, mock_loop)
self.mock_fcntl.fcntl.assert_called_once_with(
self.mock_r, self.mock_fcntl.F_SETFL, self.mock_os.O_NONBLOCK,
)


@case_no_python3
@case_no_pypy
@disable_runtime_dependency_check
class TestTransportQpidCallbackHandlers(Case):
class TestTransportQpidCallbackHandlersAsync(Case):

def setUp(self):
self.patch_a = patch(QPID_MODULE + '.os.write')
self.mock_os_write = self.patch_a.start()
self.transport = Transport(Mock())
self.transport.register_with_event_loop(Mock(), Mock())

def tearDown(self):
self.patch_a.stop()
Expand All @@ -1801,6 +1816,28 @@ def test__qpid_async_exception_notify_handler_writes_symbol_to_fd(self):
self.mock_os_write.assert_called_once_with(self.transport._w, 'e')


@case_no_python3
@case_no_pypy
@disable_runtime_dependency_check
class TestTransportQpidCallbackHandlersSync(Case):

def setUp(self):
self.patch_a = patch(QPID_MODULE + '.os.write')
self.mock_os_write = self.patch_a.start()
self.transport = Transport(Mock())

def tearDown(self):
self.patch_a.stop()

def test__qpid_message_ready_handler_dows_not_write(self):
self.transport._qpid_message_ready_handler(Mock())
self.assertTrue(not self.mock_os_write.called)

def test__qpid_async_exception_notify_handler_does_not_write(self):
self.transport._qpid_async_exception_notify_handler(Mock(), Mock())
self.assertTrue(not self.mock_os_write.called)


@case_no_python3
@case_no_pypy
@disable_runtime_dependency_check
Expand All @@ -1809,9 +1846,11 @@ class TestTransportOnReadable(Case):
def setUp(self):
self.patch_a = patch(QPID_MODULE + '.os.read')
self.mock_os_read = self.patch_a.start()

self.patch_b = patch.object(Transport, 'drain_events')
self.mock_drain_events = self.patch_b.start()
self.transport = Transport(Mock())
self.transport.register_with_event_loop(Mock(), Mock())

def tearDown(self):
self.patch_a.stop()
Expand Down Expand Up @@ -1903,25 +1942,23 @@ def test_default_connection_params(self):
result_params = my_transport.default_connection_params
self.assertDictEqual(correct_params, result_params)

@patch('os.close')
def test_del(self, close):
@patch(QPID_MODULE + '.os.close')
def test_del_sync(self, close):
my_transport = Transport(self.mock_client)
my_transport.__del__()
self.assertEqual(
close.call_args_list,
[
((my_transport.r,), {}),
((my_transport._w,), {}),
])

@patch('os.close')
def test_del_failed(self, close):
self.assertFalse(close.called)

@patch(QPID_MODULE + '.os.close')
def test_del_async(self, close):
my_transport = Transport(self.mock_client)
my_transport.register_with_event_loop(Mock(), Mock())
my_transport.__del__()
self.assertTrue(close.called)

@patch(QPID_MODULE + '.os.close')
def test_del_async_failed(self, close):
close.side_effect = OSError()
my_transport = Transport(self.mock_client)
my_transport.register_with_event_loop(Mock(), Mock())
my_transport.__del__()
self.assertEqual(
close.call_args_list,
[
((my_transport.r,), {}),
((my_transport._w,), {}),
])
self.assertTrue(close.called)
33 changes: 19 additions & 14 deletions kombu/transport/qpid.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,9 +1428,7 @@ class Transport(base.Transport):
def __init__(self, *args, **kwargs):
self.verify_runtime_environment()
super(Transport, self).__init__(*args, **kwargs)
self.r, self._w = os.pipe()
if fcntl is not None:
fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
self.use_async_interface = False

def verify_runtime_environment(self):
"""Verify that the runtime environment is acceptable.
Expand Down Expand Up @@ -1471,10 +1469,12 @@ def verify_runtime_environment(self):
'qpid-python`.')

def _qpid_message_ready_handler(self, session):
os.write(self._w, '0')
if self.use_async_interface:
os.write(self._w, '0')

def _qpid_async_exception_notify_handler(self, obj_with_exception, exc):
os.write(self._w, 'e')
if self.use_async_interface:
os.write(self._w, 'e')

def on_readable(self, connection, loop):
"""Handle any messages associated with this Transport.
Expand All @@ -1486,9 +1486,9 @@ def on_readable(self, connection, loop):
all available events are drained through a call to
:meth:`drain_events`.
The behavior of self.r is adjusted in __init__ to be non-blocking,
ensuring that an accidental call to this method when no more messages
will arrive will not cause indefinite blocking.
The file descriptor self.r is modified to be non-blocking, ensuring
that an accidental call to this method when no more messages will
not cause indefinite blocking.
Nothing is expected to be returned from :meth:`drain_events` because
:meth:`drain_events` handles messages by calling callbacks that are
Expand Down Expand Up @@ -1557,6 +1557,10 @@ def register_with_event_loop(self, connection, loop):
:type loop: kombu.async.hub.Hub
"""
self.r, self._w = os.pipe()
if fcntl is not None:
fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
self.use_async_interface = True
loop.add_reader(self.r, self.on_readable, connection, loop)

def establish_connection(self):
Expand Down Expand Up @@ -1727,9 +1731,10 @@ def default_connection_params(self):

def __del__(self):
"""Ensure file descriptors opened in __init__() are closed."""
for fd in (self.r, self._w):
try:
os.close(fd)
except OSError:
# ignored
pass
if self.use_async_interface:
for fd in (self.r, self._w):
try:
os.close(fd)
except OSError:
# ignored
pass

0 comments on commit 2a89303

Please sign in to comment.