From 9a9e5c1d9a889ab5a7627da210f055f5be3e30c8 Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Thu, 4 Mar 2021 12:42:00 -0500 Subject: [PATCH 1/4] DISPATCH-1988: Added code to start routing a delivery only if there is some data in the content->buffers or in content->pending. If we never receive any data and the receive is complete, the delivery is rejected --- include/qpid/dispatch/message.h | 8 ++ src/message.c | 20 ++++- src/router_node.c | 36 ++++++++ tests/system_tests_link_routes.py | 135 ++++++++++++++++++++++++++++++ 4 files changed, 197 insertions(+), 2 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 07e144ad67..1558dde3bb 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -247,6 +247,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery); */ qd_message_t * qd_get_message_context(pn_delivery_t *delivery); +/** + * Returns true if there is at least one non-empty buffer at the head of the content->buffers list + * or if the content->pending buffer is non-empty. + * + * @param msg A pointer to a message. + */ +bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg); + /** * Send the message outbound on an outgoing link. * diff --git a/src/message.c b/src/message.c index 1679407548..3508a996ed 100644 --- a/src/message.c +++ b/src/message.c @@ -1255,8 +1255,6 @@ void qd_message_add_fanout(qd_message_t *in_msg, // DISPATCH-1590: content->buffers may not be set up yet if // content->pending is the first buffer and it is not yet full. if (!buf) { - // assumption: proton will never signal a readable delivery if there is - // no data at all. assert(content->pending && qd_buffer_size(content->pending) > 0); DEQ_INSERT_TAIL(content->buffers, content->pending); content->pending = 0; @@ -1422,6 +1420,24 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery) return 0; } +bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg) +{ + if (!msg) + return false; + + if (MSG_CONTENT(msg)) { + if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) { + qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers); + if (buf && qd_buffer_size(buf) > 0) + return true; + } + if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0) + return true; + } + + return false; +} + qd_message_t *qd_message_receive(pn_delivery_t *delivery) { diff --git a/src/router_node.c b/src/router_node.c index 70d98f91d7..bccf6cfde2 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -451,6 +451,42 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) qd_message_t *msg = qd_message_receive(pnd); bool receive_complete = qd_message_receive_complete(msg); + // + // The very first time AMQP_rx_handler is called on a PN_DELIVERY event, it calls qd_message_receive(). When qd_message_receive() returns, we check here if + // there are any data in the content buffers. If there is no content in the buffers, there is no reason to route the delivery. We will wait for some data + // in the buffers before we start to route the delivery. + // Notice that the if statement checks for the existence of a delivery (qdr_delivery_t). Existence of a delivery means that the delivery has been routed when + // there was data in the buffers (When a delivery has been routed successfully, the delivery (qdr_delivery_t) will be non null) + // + // The following if statement will deal with the following cases:- + // 1. We receive one empty transfer frame with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false + // In this case, there is no data at all in the message content buffers, we will reject the message when receive_complete=true. We will never route this + // delivery, so core thread will not be involved + // 2. We receive 2 or more empty transfer frames with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false + // This case is similar to #1. We have no content in any of the buffers, we will reject this message after receive_complete=true. We will never route this + // delivery, so core thread will not be involved + // 3. Exactly one empty transfer frame with more=false and abort=false + // In this case, again there is still no content in any of the buffers, we will reject this message. Again, we will not route this message, so the core thread is not involved. + // + if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) { + if (receive_complete) { + // There is no qdr_delivery_t (delivery) yet which means this message has not been routed yet (the first run of this function is not complete yet) and + // the message is fully received (receive_complete=true) but there is no content in the message buffers. + // This is only possible if there were one or more empty transfer frames. + // Since there is nothing in the message, we will reject it (AMQP message must have a non empty message body) + pn_link_flow(pn_link, 1); + if (pn_delivery_aborted(pnd)) + qd_message_set_discard(msg, true); + pn_delivery_update(pnd, PN_REJECTED); + pn_delivery_settle(pnd); + // qd_message_free will free all the associated content buffers and also the content->pending buffer + qd_message_free(msg); + qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to empty message"); + } + + return false; + } + if (!qd_message_oversize(msg)) { // message not rejected as oversize if (receive_complete) { diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 15e00fe10c..c2bb0d63fa 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -25,6 +25,7 @@ from time import sleep, time from threading import Event from subprocess import PIPE, STDOUT +import socket from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process, TestTimeout, \ AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager @@ -1763,6 +1764,140 @@ def test_DISPATCH_1496(self): self.assertEqual(drain_receiver.error, None) +class EmptyTransferTest(TestCase): + @classmethod + def setUpClass(cls): + super(EmptyTransferTest, cls).setUpClass() + cls.ROUTER_LISTEN_PORT = cls.tester.get_port() + + config = [ + ('router', {'mode': 'standalone', 'id': 'QDR.A'}), + # the client will connect to this listener + ('listener', {'role': 'normal', + 'host': '0.0.0.0', + 'port': cls.ROUTER_LISTEN_PORT, + 'saslMechanisms': 'ANONYMOUS'}), + + # to connect to the fake broker + ('connector', {'name': 'broker', + 'role': 'route-container', + 'host': '127.0.0.1', + 'port': cls.tester.get_port(), + 'saslMechanisms': 'ANONYMOUS'}), + + ('linkRoute', + {'prefix': 'examples', 'containerId': 'FakeBroker', + 'direction': 'in'}), + ('linkRoute', + {'prefix': 'examples', 'containerId': 'FakeBroker', + 'direction': 'out'}) + ] + + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('A', config, wait=False) + + def _fake_broker(self, cls): + """ + Spawn a fake broker listening on the broker's connector + """ + fake_broker = cls(self.router.connector_addresses[0]) + # wait until the connection to the fake broker activates + self.router.wait_connectors() + return fake_broker + + def test_DISPATCH_1988(self): + fake_broker = self._fake_broker(FakeBroker) + AMQP_OPEN_BEGIN_ATTACH = bytearray( + b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00') + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # Connect to the router listening port and send an amqp, open, + # begin, attach. The attach is sent on the link + # routed address, "examples" + s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT)) + s.sendall(AMQP_OPEN_BEGIN_ATTACH) + + # Give half a second for the attach to propagate to the broker and + # for the broker to send a response attach + sleep(1) + data = s.recv(2048) + self.assertIn("examples", repr(data)) + + # First send a message on link routed address "examples" with + # message body of "message 0" + # Verify the the sent message has been accepted. + TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01' + + b'\xa0\x01\x01\x43\x42' + + b'\x40\x40\x40\x40\x40\x42\x00\x53' + + b'\x73\xc0\x02\x01\x44\x00\x53\x77' + + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' + + b'\x65\x20\x30') + s.sendall(TRANSFER_1) + sleep(0.5) + data = s.recv(1024) + # The delivery has been accepted. + self.assertIn("x00S$E", repr(data)) + + # Test case 1 + # Send an empty transfer frame to the router and you should + # receive a rejected disposition from the router. + # Without the fix for DISPATCH_1988, + # upon sending this EMPTY_TRANSFER + # the router crashes with the following assert + # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed. + # This is the empty transfer frame that is sent to the router. + # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false] + EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x02' + + b'\xa0\x01\x02\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') + s.sendall(EMPTY_TRANSFER) + sleep(1) + data = s.recv(1024) + # The delivery has been rejected. + self.assertIn("x00S%E", repr(data)) + + # Let's send another transfer to make sure that the + # router has not crashed. + TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03' + + b'\xa0\x01\x03\x43\x42' + + b'\x40\x40\x40\x40\x40\x42\x00\x53' + + b'\x73\xc0\x02\x01\x44\x00\x53\x77' + + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' + + b'\x65\x20\x30') + s.sendall(TRANSFER_1) + sleep(0.5) + data = s.recv(1024) + # The delivery has been accepted. + self.assertIn("x00S$E", repr(data)) + + # Now, send two emptry transfer frames, first transfer has + # more=true and the next transfer has more=false. + # This will again be rejected by the router. + # The following are the two transfer frames that will be + # sent to the router. + #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false] + #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false] + EMPTY_TRANSFER_MORE_TRUE = bytearray( + b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' + + b'\xa0\x01\x04\x43\x42' + + b'\x41\x40\x40\x40\x40\x42') + EMPTY_TRANSFER_MORE_FALSE = bytearray( + b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' + + b'\xa0\x01\x04\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') + s.sendall(EMPTY_TRANSFER_MORE_TRUE) + s.sendall(EMPTY_TRANSFER_MORE_FALSE) + sleep(0.5) + data = s.recv(1024) + # The delivery has been rejected. + self.assertIn("x00S%E", repr(data)) + + class ConnectionLinkRouteTest(TestCase): """ Test connection scoped link route implementation From b1139f6e734a56d309ea0cfa3543fbcbea5d488a Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Tue, 9 Mar 2021 11:54:29 -0500 Subject: [PATCH 2/4] DISPATCH-1988: Fixed typos and added comment --- tests/system_tests_link_routes.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index c2bb0d63fa..159b2935fe 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -1817,7 +1817,7 @@ def test_DISPATCH_1988(self): s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT)) s.sendall(AMQP_OPEN_BEGIN_ATTACH) - # Give half a second for the attach to propagate to the broker and + # Give a second for the attach to propagate to the broker and # for the broker to send a response attach sleep(1) data = s.recv(2048) @@ -1873,7 +1873,8 @@ def test_DISPATCH_1988(self): # The delivery has been accepted. self.assertIn("x00S$E", repr(data)) - # Now, send two emptry transfer frames, first transfer has + # Test case 2 + # Now, send two empty transfer frames, first transfer has # more=true and the next transfer has more=false. # This will again be rejected by the router. # The following are the two transfer frames that will be From 8454b67d77b0a22d4c76b4b13555ddb959d7f1f0 Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Tue, 9 Mar 2021 12:44:37 -0500 Subject: [PATCH 3/4] DISPATCH-1988: python-checker fixes --- tests/system_tests_link_routes.py | 53 +++++++++++++++++-------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 159b2935fe..1f191bf3ea 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -1772,27 +1772,24 @@ def setUpClass(cls): config = [ ('router', {'mode': 'standalone', 'id': 'QDR.A'}), - # the client will connect to this listener - ('listener', {'role': 'normal', - 'host': '0.0.0.0', - 'port': cls.ROUTER_LISTEN_PORT, + # the client will connect to this listener + ('listener', {'role': 'normal', + 'host': '0.0.0.0', + 'port': cls.ROUTER_LISTEN_PORT, + 'saslMechanisms': 'ANONYMOUS'}), + # to connect to the fake broker + ('connector', {'name': 'broker', + 'role': 'route-container', + 'host': '127.0.0.1', + 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}), - - # to connect to the fake broker - ('connector', {'name': 'broker', - 'role': 'route-container', - 'host': '127.0.0.1', - 'port': cls.tester.get_port(), - 'saslMechanisms': 'ANONYMOUS'}), - - ('linkRoute', - {'prefix': 'examples', 'containerId': 'FakeBroker', - 'direction': 'in'}), - ('linkRoute', - {'prefix': 'examples', 'containerId': 'FakeBroker', - 'direction': 'out'}) + ('linkRoute', + {'prefix': 'examples', 'containerId': 'FakeBroker', + 'direction': 'in'}), + ('linkRoute', + {'prefix': 'examples', 'containerId': 'FakeBroker', + 'direction': 'out'}) ] - config = Qdrouterd.Config(config) cls.router = cls.tester.qdrouterd('A', config, wait=False) @@ -1808,7 +1805,17 @@ def _fake_broker(self, cls): def test_DISPATCH_1988(self): fake_broker = self._fake_broker(FakeBroker) AMQP_OPEN_BEGIN_ATTACH = bytearray( - b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00') + b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00' + b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06' + b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21' + b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00' + b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00' + b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b' + b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72' + b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b' + b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0' + b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70' + b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00') with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # Connect to the router listening port and send an amqp, open, @@ -1849,9 +1856,9 @@ def test_DISPATCH_1988(self): # This is the empty transfer frame that is sent to the router. # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false] EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00' - + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x02' - + b'\xa0\x01\x02\x43\x42' - + b'\x42\x40\x40\x40\x40\x42') + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52' + + b'\x02\xa0\x01\x02\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') s.sendall(EMPTY_TRANSFER) sleep(1) data = s.recv(1024) From ec561122f7361996682b940d6d469f3a4de4a0cc Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Tue, 9 Mar 2021 15:26:34 -0500 Subject: [PATCH 4/4] DISPATCH-1988: Use regular socket creation instead of using the with statement --- tests/system_tests_link_routes.py | 176 +++++++++++++++--------------- 1 file changed, 89 insertions(+), 87 deletions(-) diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 1f191bf3ea..2edc5095a1 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -1817,93 +1817,95 @@ def test_DISPATCH_1988(self): b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70' b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00') - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - # Connect to the router listening port and send an amqp, open, - # begin, attach. The attach is sent on the link - # routed address, "examples" - s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT)) - s.sendall(AMQP_OPEN_BEGIN_ATTACH) - - # Give a second for the attach to propagate to the broker and - # for the broker to send a response attach - sleep(1) - data = s.recv(2048) - self.assertIn("examples", repr(data)) - - # First send a message on link routed address "examples" with - # message body of "message 0" - # Verify the the sent message has been accepted. - TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' - + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01' - + b'\xa0\x01\x01\x43\x42' - + b'\x40\x40\x40\x40\x40\x42\x00\x53' - + b'\x73\xc0\x02\x01\x44\x00\x53\x77' - + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' - + b'\x65\x20\x30') - s.sendall(TRANSFER_1) - sleep(0.5) - data = s.recv(1024) - # The delivery has been accepted. - self.assertIn("x00S$E", repr(data)) - - # Test case 1 - # Send an empty transfer frame to the router and you should - # receive a rejected disposition from the router. - # Without the fix for DISPATCH_1988, - # upon sending this EMPTY_TRANSFER - # the router crashes with the following assert - # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed. - # This is the empty transfer frame that is sent to the router. - # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false] - EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00' - + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52' - + b'\x02\xa0\x01\x02\x43\x42' - + b'\x42\x40\x40\x40\x40\x42') - s.sendall(EMPTY_TRANSFER) - sleep(1) - data = s.recv(1024) - # The delivery has been rejected. - self.assertIn("x00S%E", repr(data)) - - # Let's send another transfer to make sure that the - # router has not crashed. - TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' - + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03' - + b'\xa0\x01\x03\x43\x42' - + b'\x40\x40\x40\x40\x40\x42\x00\x53' - + b'\x73\xc0\x02\x01\x44\x00\x53\x77' - + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' - + b'\x65\x20\x30') - s.sendall(TRANSFER_1) - sleep(0.5) - data = s.recv(1024) - # The delivery has been accepted. - self.assertIn("x00S$E", repr(data)) - - # Test case 2 - # Now, send two empty transfer frames, first transfer has - # more=true and the next transfer has more=false. - # This will again be rejected by the router. - # The following are the two transfer frames that will be - # sent to the router. - #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false] - #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false] - EMPTY_TRANSFER_MORE_TRUE = bytearray( - b'\x00\x00\x00\x1c\x02\x00\x00\x00' - + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' - + b'\xa0\x01\x04\x43\x42' - + b'\x41\x40\x40\x40\x40\x42') - EMPTY_TRANSFER_MORE_FALSE = bytearray( - b'\x00\x00\x00\x1c\x02\x00\x00\x00' - + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' - + b'\xa0\x01\x04\x43\x42' - + b'\x42\x40\x40\x40\x40\x42') - s.sendall(EMPTY_TRANSFER_MORE_TRUE) - s.sendall(EMPTY_TRANSFER_MORE_FALSE) - sleep(0.5) - data = s.recv(1024) - # The delivery has been rejected. - self.assertIn("x00S%E", repr(data)) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # Connect to the router listening port and send an amqp, open, + # begin, attach. The attach is sent on the link + # routed address, "examples" + s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT)) + s.sendall(AMQP_OPEN_BEGIN_ATTACH) + + # Give a second for the attach to propagate to the broker and + # for the broker to send a response attach + sleep(1) + data = s.recv(2048) + self.assertIn("examples", repr(data)) + + # First send a message on link routed address "examples" with + # message body of "message 0" + # Verify the the sent message has been accepted. + TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01' + + b'\xa0\x01\x01\x43\x42' + + b'\x40\x40\x40\x40\x40\x42\x00\x53' + + b'\x73\xc0\x02\x01\x44\x00\x53\x77' + + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' + + b'\x65\x20\x30') + s.sendall(TRANSFER_1) + sleep(0.5) + data = s.recv(1024) + # The delivery has been accepted. + self.assertIn("x00S$E", repr(data)) + + # Test case 1 + # Send an empty transfer frame to the router and you should + # receive a rejected disposition from the router. + # Without the fix for DISPATCH_1988, + # upon sending this EMPTY_TRANSFER + # the router crashes with the following assert + # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed. + # This is the empty transfer frame that is sent to the router. + # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false] + EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52' + + b'\x02\xa0\x01\x02\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') + s.sendall(EMPTY_TRANSFER) + sleep(1) + data = s.recv(1024) + # The delivery has been rejected. + self.assertIn("x00S%E", repr(data)) + + # Let's send another transfer to make sure that the + # router has not crashed. + TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03' + + b'\xa0\x01\x03\x43\x42' + + b'\x40\x40\x40\x40\x40\x42\x00\x53' + + b'\x73\xc0\x02\x01\x44\x00\x53\x77' + + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' + + b'\x65\x20\x30') + s.sendall(TRANSFER_1) + sleep(0.5) + data = s.recv(1024) + # The delivery has been accepted. + self.assertIn("x00S$E", repr(data)) + + # Test case 2 + # Now, send two empty transfer frames, first transfer has + # more=true and the next transfer has more=false. + # This will again be rejected by the router. + # The following are the two transfer frames that will be + # sent to the router. + #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false] + #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false] + EMPTY_TRANSFER_MORE_TRUE = bytearray( + b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' + + b'\xa0\x01\x04\x43\x42' + + b'\x41\x40\x40\x40\x40\x42') + EMPTY_TRANSFER_MORE_FALSE = bytearray( + b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' + + b'\xa0\x01\x04\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') + s.sendall(EMPTY_TRANSFER_MORE_TRUE) + s.sendall(EMPTY_TRANSFER_MORE_FALSE) + sleep(0.5) + data = s.recv(1024) + # The delivery has been rejected. + self.assertIn("x00S%E", repr(data)) + + s.close() class ConnectionLinkRouteTest(TestCase):