From e90fc60a4de5e2f4fe67e9ef9275a63aea31c62b Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Fri, 10 Jun 2016 09:13:12 -0400 Subject: [PATCH] DISPATCH-367 - Swap link pointers so that the distribution will be balanced for synchronous senders --- src/router_core/forwarder.c | 14 ++++++++ tests/system_tests_one_router.py | 59 +++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 996788b3b1..705aa557d0 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -454,6 +454,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core, // Start with the local links // qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); + while (link_ref) { qdr_link_t *link = link_ref->link; uint32_t value = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled); @@ -523,6 +524,18 @@ int qdr_forward_balanced_CT(qdr_core_t *core, } } } + else if (best_eligible_link) { + // + // If we have found a best eligible local link, swap the head and the tail rlinks so that the next message can go + // to another rlink of equal cost. + // + if (DEQ_SIZE(addr->rlinks) > 1) { // Must have more than one local consumer for the swap to be meaningful + qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); + DEQ_REMOVE_HEAD(addr->rlinks); + DEQ_INSERT_TAIL(addr->rlinks, link_ref); + } + + } qdr_link_t *chosen_link = 0; int chosen_link_bit = -1; @@ -555,6 +568,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core, addr->deliveries_transit++; else addr->deliveries_egress++; + return 1; } diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 130c05a6e0..a1dbfc9f80 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -21,6 +21,7 @@ from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED from system_test import TestCase, Qdrouterd, main_module from proton.handlers import MessagingHandler +from proton.utils import BlockingConnection from proton.reactor import Container, AtMostOnce, AtLeastOnce @@ -46,6 +47,7 @@ def setUpClass(cls): ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'both'}), ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'out'}), ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'in'}), + ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 1}), ('address', {'prefix': 'closest', 'distribution': 'closest'}), ('address', {'prefix': 'spread', 'distribution': 'balanced'}), @@ -987,6 +989,7 @@ def test_12_semantics_spread(self): pass self.assertEqual(30, len(rx_set)) + self.assertTrue(ca > 0) self.assertTrue(cb > 0) self.assertTrue(cc > 0) @@ -1000,6 +1003,10 @@ def test_12_semantics_spread(self): M3.stop() M4.stop() + def test_12a_semantics_balanced(self): + test = BalancedHandlerTest(self.address) + test.run() + self.assertTrue(test.success) def test_13_to_override(self): addr = self.address+"/toov/1" @@ -1113,6 +1120,57 @@ def on_message(self, event): def run(self): Container(self).run() +class BalancedHandlerTest(MessagingHandler): + """ + Send two synchronous messages to a balanced address that has two receivers. Each of the receiver should receive + one message. + """ + def __init__(self, address): + super(BalancedHandlerTest, self).__init__() + self.address = address + self.recv_a_count = 0 + self.recv_b_count = 0 + self.sender = None + self.receiver_b = None + self.receiver_a = None + self.n_sent = 0 + self.success = False + self.dest = "spread.2" + + def on_start(self, event): + conn = event.container.connect(self.address) + self.sender = event.container.create_sender(conn, self.dest) + self.receiver_a = event.container.create_receiver(conn, self.dest, name='A') + self.receiver_b = event.container.create_receiver(conn, self.dest, name='B') + + def on_message(self, event): + if event.receiver == self.receiver_a: + self.recv_a_count += 1 + elif event.receiver == self.receiver_b: + self.recv_b_count += 1 + + if self.recv_a_count == 1 and self.recv_b_count == 1: + self.success = True + self.receiver_a.close() + self.receiver_b.close() + event.connection.close() + + def on_settled(self, event): + # Send another message after the first one has been settled. This emulates a synchronous sender + if self.n_sent < 2: + msg = Message(body="Hello World 2") + event.sender.send(msg) + self.n_sent += 1 + + def on_sendable(self, event): + if self.n_sent < 1: + msg = Message(body="Hello World 1") + event.sender.send(msg) + self.n_sent += 1 + + def run(self): + Container(self).run() + class ExcessDeliveriesReleasedTest(MessagingHandler): def __init__(self, address): @@ -1307,6 +1365,5 @@ def on_released(self, event): def run(self): Container(self).run() - if __name__ == '__main__': unittest.main(main_module())