Skip to content
This repository was archived by the owner on Apr 15, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery);
*/
qd_message_t * qd_get_message_context(pn_delivery_t *delivery);

/**
* Returns true if there is at least one non-empty buffer at the head of the content->buffers list
* or if the content->pending buffer is non-empty.
*
* @param msg A pointer to a message.
*/
bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg);

/**
* Send the message outbound on an outgoing link.
*
Expand Down
20 changes: 18 additions & 2 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1255,8 +1255,6 @@ void qd_message_add_fanout(qd_message_t *in_msg,
// DISPATCH-1590: content->buffers may not be set up yet if
// content->pending is the first buffer and it is not yet full.
if (!buf) {
// assumption: proton will never signal a readable delivery if there is
// no data at all.
assert(content->pending && qd_buffer_size(content->pending) > 0);
DEQ_INSERT_TAIL(content->buffers, content->pending);
content->pending = 0;
Expand Down Expand Up @@ -1422,6 +1420,24 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
return 0;
}

bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg)
{
if (!msg)
return false;

if (MSG_CONTENT(msg)) {
if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) {
qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers);
if (buf && qd_buffer_size(buf) > 0)
return true;
}
if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0)
return true;
}

return false;
}


qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
Expand Down
36 changes: 36 additions & 0 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,42 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
qd_message_t *msg = qd_message_receive(pnd);
bool receive_complete = qd_message_receive_complete(msg);

//
// The very first time AMQP_rx_handler is called on a PN_DELIVERY event, it calls qd_message_receive(). When qd_message_receive() returns, we check here if
// there are any data in the content buffers. If there is no content in the buffers, there is no reason to route the delivery. We will wait for some data
// in the buffers before we start to route the delivery.
// Notice that the if statement checks for the existence of a delivery (qdr_delivery_t). Existence of a delivery means that the delivery has been routed when
// there was data in the buffers (When a delivery has been routed successfully, the delivery (qdr_delivery_t) will be non null)
//
// The following if statement will deal with the following cases:-
// 1. We receive one empty transfer frame with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false
// In this case, there is no data at all in the message content buffers, we will reject the message when receive_complete=true. We will never route this
// delivery, so core thread will not be involved
// 2. We receive 2 or more empty transfer frames with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false
// This case is similar to #1. We have no content in any of the buffers, we will reject this message after receive_complete=true. We will never route this
// delivery, so core thread will not be involved
// 3. Exactly one empty transfer frame with more=false and abort=false
// In this case, again there is still no content in any of the buffers, we will reject this message. Again, we will not route this message, so the core thread is not involved.
//
if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) {
if (receive_complete) {
// There is no qdr_delivery_t (delivery) yet which means this message has not been routed yet (the first run of this function is not complete yet) and
// the message is fully received (receive_complete=true) but there is no content in the message buffers.
// This is only possible if there were one or more empty transfer frames.
// Since there is nothing in the message, we will reject it (AMQP message must have a non empty message body)
pn_link_flow(pn_link, 1);
if (pn_delivery_aborted(pnd))
qd_message_set_discard(msg, true);
pn_delivery_update(pnd, PN_REJECTED);
pn_delivery_settle(pnd);
// qd_message_free will free all the associated content buffers and also the content->pending buffer
qd_message_free(msg);
qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to empty message");
}

return false;
}

if (!qd_message_oversize(msg)) {
// message not rejected as oversize
if (receive_complete) {
Expand Down
145 changes: 145 additions & 0 deletions tests/system_tests_link_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from time import sleep, time
from threading import Event
from subprocess import PIPE, STDOUT
import socket

from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process, TestTimeout, \
AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager
Expand Down Expand Up @@ -1763,6 +1764,150 @@ def test_DISPATCH_1496(self):
self.assertEqual(drain_receiver.error, None)


class EmptyTransferTest(TestCase):
@classmethod
def setUpClass(cls):
super(EmptyTransferTest, cls).setUpClass()
cls.ROUTER_LISTEN_PORT = cls.tester.get_port()

config = [
('router', {'mode': 'standalone', 'id': 'QDR.A'}),
# the client will connect to this listener
('listener', {'role': 'normal',
'host': '0.0.0.0',
'port': cls.ROUTER_LISTEN_PORT,
'saslMechanisms': 'ANONYMOUS'}),
# to connect to the fake broker
('connector', {'name': 'broker',
'role': 'route-container',
'host': '127.0.0.1',
'port': cls.tester.get_port(),
'saslMechanisms': 'ANONYMOUS'}),
('linkRoute',
{'prefix': 'examples', 'containerId': 'FakeBroker',
'direction': 'in'}),
('linkRoute',
{'prefix': 'examples', 'containerId': 'FakeBroker',
'direction': 'out'})
]
config = Qdrouterd.Config(config)
cls.router = cls.tester.qdrouterd('A', config, wait=False)

def _fake_broker(self, cls):
"""
Spawn a fake broker listening on the broker's connector
"""
fake_broker = cls(self.router.connector_addresses[0])
# wait until the connection to the fake broker activates
self.router.wait_connectors()
return fake_broker

def test_DISPATCH_1988(self):
fake_broker = self._fake_broker(FakeBroker)
AMQP_OPEN_BEGIN_ATTACH = bytearray(
b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect to the router listening port and send an amqp, open,
# begin, attach. The attach is sent on the link
# routed address, "examples"
s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
s.sendall(AMQP_OPEN_BEGIN_ATTACH)

# Give a second for the attach to propagate to the broker and
# for the broker to send a response attach
sleep(1)
data = s.recv(2048)
self.assertIn("examples", repr(data))

# First send a message on link routed address "examples" with
# message body of "message 0"
# Verify the the sent message has been accepted.
TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
+ b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
+ b'\xa0\x01\x01\x43\x42'
+ b'\x40\x40\x40\x40\x40\x42\x00\x53'
+ b'\x73\xc0\x02\x01\x44\x00\x53\x77'
+ b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
+ b'\x65\x20\x30')
s.sendall(TRANSFER_1)
sleep(0.5)
data = s.recv(1024)
# The delivery has been accepted.
self.assertIn("x00S$E", repr(data))

# Test case 1
# Send an empty transfer frame to the router and you should
# receive a rejected disposition from the router.
# Without the fix for DISPATCH_1988,
# upon sending this EMPTY_TRANSFER
# the router crashes with the following assert
# qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed.
# This is the empty transfer frame that is sent to the router.
# [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false]
EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00'
+ b'\x00\x53\x14\xc0\x0f\x0b\x43\x52'
+ b'\x02\xa0\x01\x02\x43\x42'
+ b'\x42\x40\x40\x40\x40\x42')
s.sendall(EMPTY_TRANSFER)
sleep(1)
data = s.recv(1024)
# The delivery has been rejected.
self.assertIn("x00S%E", repr(data))

# Let's send another transfer to make sure that the
# router has not crashed.
TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
+ b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03'
+ b'\xa0\x01\x03\x43\x42'
+ b'\x40\x40\x40\x40\x40\x42\x00\x53'
+ b'\x73\xc0\x02\x01\x44\x00\x53\x77'
+ b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
+ b'\x65\x20\x30')
s.sendall(TRANSFER_1)
sleep(0.5)
data = s.recv(1024)
# The delivery has been accepted.
self.assertIn("x00S$E", repr(data))

# Test case 2
# Now, send two empty transfer frames, first transfer has
# more=true and the next transfer has more=false.
# This will again be rejected by the router.
# The following are the two transfer frames that will be
# sent to the router.
#[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false]
#[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false]
EMPTY_TRANSFER_MORE_TRUE = bytearray(
b'\x00\x00\x00\x1c\x02\x00\x00\x00'
+ b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
+ b'\xa0\x01\x04\x43\x42'
+ b'\x41\x40\x40\x40\x40\x42')
EMPTY_TRANSFER_MORE_FALSE = bytearray(
b'\x00\x00\x00\x1c\x02\x00\x00\x00'
+ b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
+ b'\xa0\x01\x04\x43\x42'
+ b'\x42\x40\x40\x40\x40\x42')
s.sendall(EMPTY_TRANSFER_MORE_TRUE)
s.sendall(EMPTY_TRANSFER_MORE_FALSE)
sleep(0.5)
data = s.recv(1024)
# The delivery has been rejected.
self.assertIn("x00S%E", repr(data))

s.close()


class ConnectionLinkRouteTest(TestCase):
"""
Test connection scoped link route implementation
Expand Down