diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 07e144ad67..1558dde3bb 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -247,6 +247,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery); */ qd_message_t * qd_get_message_context(pn_delivery_t *delivery); +/** + * Returns true if there is at least one non-empty buffer at the head of the content->buffers list + * or if the content->pending buffer is non-empty. + * + * @param msg A pointer to a message. + */ +bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg); + /** * Send the message outbound on an outgoing link. * diff --git a/src/message.c b/src/message.c index 1679407548..3508a996ed 100644 --- a/src/message.c +++ b/src/message.c @@ -1255,8 +1255,6 @@ void qd_message_add_fanout(qd_message_t *in_msg, // DISPATCH-1590: content->buffers may not be set up yet if // content->pending is the first buffer and it is not yet full. if (!buf) { - // assumption: proton will never signal a readable delivery if there is - // no data at all. assert(content->pending && qd_buffer_size(content->pending) > 0); DEQ_INSERT_TAIL(content->buffers, content->pending); content->pending = 0; @@ -1422,6 +1420,24 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery) return 0; } +bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg) +{ + if (!msg) + return false; + + if (MSG_CONTENT(msg)) { + if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) { + qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers); + if (buf && qd_buffer_size(buf) > 0) + return true; + } + if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0) + return true; + } + + return false; +} + qd_message_t *qd_message_receive(pn_delivery_t *delivery) { diff --git a/src/router_node.c b/src/router_node.c index 70d98f91d7..bccf6cfde2 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -451,6 +451,42 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) qd_message_t *msg = qd_message_receive(pnd); bool receive_complete = qd_message_receive_complete(msg); + // + // The very first time AMQP_rx_handler is called on a PN_DELIVERY event, it calls qd_message_receive(). When qd_message_receive() returns, we check here if + // there are any data in the content buffers. If there is no content in the buffers, there is no reason to route the delivery. We will wait for some data + // in the buffers before we start to route the delivery. + // Notice that the if statement checks for the existence of a delivery (qdr_delivery_t). Existence of a delivery means that the delivery has been routed when + // there was data in the buffers (When a delivery has been routed successfully, the delivery (qdr_delivery_t) will be non null) + // + // The following if statement will deal with the following cases:- + // 1. We receive one empty transfer frame with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false + // In this case, there is no data at all in the message content buffers, we will reject the message when receive_complete=true. We will never route this + // delivery, so core thread will not be involved + // 2. We receive 2 or more empty transfer frames with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false + // This case is similar to #1. We have no content in any of the buffers, we will reject this message after receive_complete=true. We will never route this + // delivery, so core thread will not be involved + // 3. Exactly one empty transfer frame with more=false and abort=false + // In this case, again there is still no content in any of the buffers, we will reject this message. Again, we will not route this message, so the core thread is not involved. + // + if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) { + if (receive_complete) { + // There is no qdr_delivery_t (delivery) yet which means this message has not been routed yet (the first run of this function is not complete yet) and + // the message is fully received (receive_complete=true) but there is no content in the message buffers. + // This is only possible if there were one or more empty transfer frames. + // Since there is nothing in the message, we will reject it (AMQP message must have a non empty message body) + pn_link_flow(pn_link, 1); + if (pn_delivery_aborted(pnd)) + qd_message_set_discard(msg, true); + pn_delivery_update(pnd, PN_REJECTED); + pn_delivery_settle(pnd); + // qd_message_free will free all the associated content buffers and also the content->pending buffer + qd_message_free(msg); + qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to empty message"); + } + + return false; + } + if (!qd_message_oversize(msg)) { // message not rejected as oversize if (receive_complete) { diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 15e00fe10c..2edc5095a1 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -25,6 +25,7 @@ from time import sleep, time from threading import Event from subprocess import PIPE, STDOUT +import socket from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process, TestTimeout, \ AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager @@ -1763,6 +1764,150 @@ def test_DISPATCH_1496(self): self.assertEqual(drain_receiver.error, None) +class EmptyTransferTest(TestCase): + @classmethod + def setUpClass(cls): + super(EmptyTransferTest, cls).setUpClass() + cls.ROUTER_LISTEN_PORT = cls.tester.get_port() + + config = [ + ('router', {'mode': 'standalone', 'id': 'QDR.A'}), + # the client will connect to this listener + ('listener', {'role': 'normal', + 'host': '0.0.0.0', + 'port': cls.ROUTER_LISTEN_PORT, + 'saslMechanisms': 'ANONYMOUS'}), + # to connect to the fake broker + ('connector', {'name': 'broker', + 'role': 'route-container', + 'host': '127.0.0.1', + 'port': cls.tester.get_port(), + 'saslMechanisms': 'ANONYMOUS'}), + ('linkRoute', + {'prefix': 'examples', 'containerId': 'FakeBroker', + 'direction': 'in'}), + ('linkRoute', + {'prefix': 'examples', 'containerId': 'FakeBroker', + 'direction': 'out'}) + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('A', config, wait=False) + + def _fake_broker(self, cls): + """ + Spawn a fake broker listening on the broker's connector + """ + fake_broker = cls(self.router.connector_addresses[0]) + # wait until the connection to the fake broker activates + self.router.wait_connectors() + return fake_broker + + def test_DISPATCH_1988(self): + fake_broker = self._fake_broker(FakeBroker) + AMQP_OPEN_BEGIN_ATTACH = bytearray( + b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00' + b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06' + b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21' + b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00' + b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00' + b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b' + b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72' + b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b' + b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0' + b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70' + b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00') + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # Connect to the router listening port and send an amqp, open, + # begin, attach. The attach is sent on the link + # routed address, "examples" + s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT)) + s.sendall(AMQP_OPEN_BEGIN_ATTACH) + + # Give a second for the attach to propagate to the broker and + # for the broker to send a response attach + sleep(1) + data = s.recv(2048) + self.assertIn("examples", repr(data)) + + # First send a message on link routed address "examples" with + # message body of "message 0" + # Verify the the sent message has been accepted. + TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01' + + b'\xa0\x01\x01\x43\x42' + + b'\x40\x40\x40\x40\x40\x42\x00\x53' + + b'\x73\xc0\x02\x01\x44\x00\x53\x77' + + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' + + b'\x65\x20\x30') + s.sendall(TRANSFER_1) + sleep(0.5) + data = s.recv(1024) + # The delivery has been accepted. + self.assertIn("x00S$E", repr(data)) + + # Test case 1 + # Send an empty transfer frame to the router and you should + # receive a rejected disposition from the router. + # Without the fix for DISPATCH_1988, + # upon sending this EMPTY_TRANSFER + # the router crashes with the following assert + # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed. + # This is the empty transfer frame that is sent to the router. + # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false] + EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52' + + b'\x02\xa0\x01\x02\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') + s.sendall(EMPTY_TRANSFER) + sleep(1) + data = s.recv(1024) + # The delivery has been rejected. + self.assertIn("x00S%E", repr(data)) + + # Let's send another transfer to make sure that the + # router has not crashed. + TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03' + + b'\xa0\x01\x03\x43\x42' + + b'\x40\x40\x40\x40\x40\x42\x00\x53' + + b'\x73\xc0\x02\x01\x44\x00\x53\x77' + + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' + + b'\x65\x20\x30') + s.sendall(TRANSFER_1) + sleep(0.5) + data = s.recv(1024) + # The delivery has been accepted. + self.assertIn("x00S$E", repr(data)) + + # Test case 2 + # Now, send two empty transfer frames, first transfer has + # more=true and the next transfer has more=false. + # This will again be rejected by the router. + # The following are the two transfer frames that will be + # sent to the router. + #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false] + #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false] + EMPTY_TRANSFER_MORE_TRUE = bytearray( + b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' + + b'\xa0\x01\x04\x43\x42' + + b'\x41\x40\x40\x40\x40\x42') + EMPTY_TRANSFER_MORE_FALSE = bytearray( + b'\x00\x00\x00\x1c\x02\x00\x00\x00' + + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' + + b'\xa0\x01\x04\x43\x42' + + b'\x42\x40\x40\x40\x40\x42') + s.sendall(EMPTY_TRANSFER_MORE_TRUE) + s.sendall(EMPTY_TRANSFER_MORE_FALSE) + sleep(0.5) + data = s.recv(1024) + # The delivery has been rejected. + self.assertIn("x00S%E", repr(data)) + + s.close() + + class ConnectionLinkRouteTest(TestCase): """ Test connection scoped link route implementation