From bdd8462c2ee4040123584d1ca876cd4e95dd14d0 Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Fri, 20 May 2016 12:59:48 -0400 Subject: [PATCH 1/2] DISPATCH-341 - Added drain support for message routes and link routes --- src/router_core/connections.c | 5 +- src/router_core/transfer.c | 60 +++++++++-------- tests/CMakeLists.txt | 1 + tests/system_tests_drain.py | 55 +++++++++++++++ tests/system_tests_drain_support.py | 101 ++++++++++++++++++++++++++++ tests/system_tests_link_routes.py | 15 ++++- 6 files changed, 207 insertions(+), 30 deletions(-) create mode 100644 tests/system_tests_drain.py create mode 100644 tests/system_tests_drain_support.py diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 90dd7789f7..00ba986b42 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -188,7 +188,10 @@ int qdr_connection_process(qdr_connection_t *conn) sys_mutex_unlock(conn->work_lock); if (link) { - core->flow_handler(core->user_context, link, link->incremental_credit); + if (link->drain_mode) + core->drained_handler(core->user_context, link); + else + core->flow_handler(core->user_context, link, link->incremental_credit); link->incremental_credit = 0; event_count++; } diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 8a0387569e..944b4c6622 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -103,40 +103,46 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) { qdr_connection_t *conn = link->conn; qdr_delivery_t *dlv; - bool drained = false; int offer = -1; bool settled = false; + bool drain_mode = link->drain_mode; if (link->link_direction == QD_OUTGOING) { - while (credit > 0 && !drained) { + + while (credit > 0) { sys_mutex_lock(conn->work_lock); + dlv = DEQ_HEAD(link->undelivered); - if (dlv) { - DEQ_REMOVE_HEAD(link->undelivered); - settled = dlv->settled; - if (!settled) { - DEQ_INSERT_TAIL(link->unsettled, dlv); - dlv->where = QDR_DELIVERY_IN_UNSETTLED; - } else - dlv->where = QDR_DELIVERY_NOWHERE; - credit--; - link->total_deliveries++; - offer = DEQ_SIZE(link->undelivered); + + if (!dlv) { + sys_mutex_unlock(conn->work_lock); + break; + } + + DEQ_REMOVE_HEAD(link->undelivered); + settled = dlv->settled; + + if (!settled) { + DEQ_INSERT_TAIL(link->unsettled, dlv); + dlv->where = QDR_DELIVERY_IN_UNSETTLED; } else - drained = true; + dlv->where = QDR_DELIVERY_NOWHERE; + + credit--; + link->total_deliveries++; + offer = DEQ_SIZE(link->undelivered); + sys_mutex_unlock(conn->work_lock); - if (dlv) { - link->credit_to_core--; - core->deliver_handler(core->user_context, link, dlv, settled); - if (settled) - qdr_delivery_free(dlv); - } + link->credit_to_core--; + core->deliver_handler(core->user_context, link, dlv, settled); + if (settled) + qdr_delivery_free(dlv); } - if (drained) + if (drain_mode) core->drained_handler(core->user_context, link); - else if (offer != -1) + if (offer > 0) core->offer_handler(core->user_context, link, offer); } @@ -327,6 +333,11 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar bool drain = action->args.connection.drain; bool activate = false; + // + // Record the drain mode for the link + // + link->drain_mode = drain; + // // If this is an attach-routed link, propagate the flow data downrange. // Note that the credit value is incremental. @@ -346,11 +357,6 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar sys_mutex_unlock(link->conn->work_lock); } - // - // Record the drain mode for the link - // - link->drain_mode = drain; - if (activate) qdr_connection_activate_CT(core, link->conn); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eef79007a2..1c1be1c043 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -72,6 +72,7 @@ add_test(router_policy_test ${TEST_WRAP} -m unittest -v router_policy_test) foreach(py_test_module # system_tests_broker system_tests_link_routes + system_tests_drain system_tests_management system_tests_one_router system_tests_policy diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py new file mode 100644 index 0000000000..ba503a13cd --- /dev/null +++ b/tests/system_tests_drain.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest + +from system_test import TestCase, Qdrouterd, main_module +from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler + +class DrainSupportTest(TestCase): + + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(DrainSupportTest, cls).setUpClass() + name = "test-router" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages + ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}), + + ]) + + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_drain_support_all_messages(self): + drain_support = DrainMessagesHandler(self.address) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + + def test_drain_support_one_message(self): + drain_support = DrainOneMessageHandler(self.address) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + +if __name__ == '__main__': + unittest.main(main_module()) \ No newline at end of file diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py new file mode 100644 index 0000000000..6192a7cbf7 --- /dev/null +++ b/tests/system_tests_drain_support.py @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton.handlers import MessagingHandler +from proton.reactor import Container +from proton import Message + +class DrainMessagesHandler(MessagingHandler): + def __init__(self, address): + # prefetch is set to zero so that proton does not automatically issue 10 credits. + super(DrainMessagesHandler, self).__init__(prefetch=0) + self.conn = None + self.sender = None + self.receiver = None + self.sent_count = 0 + self.received_count = 0 + self.address = address + self.drain_successful = False + + def on_start(self, event): + self.conn = event.container.connect(self.address) + + # Create a sender and a receiver. They are both listening on the same address + self.receiver = event.container.create_receiver(self.conn, "org.apache.dev") + self.sender = event.container.create_sender(self.conn, "org.apache.dev") + self.receiver.flow(1) + + def on_sendable(self, event): + if self.sent_count < 10: + msg = Message(body="Hello World") + dlv = event.sender.send(msg) + dlv.settle() + self.sent_count += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + if "Hello World" == event.message.body: + self.received_count += 1 + + if self.received_count < 4: + event.receiver.flow(1) + elif self.received_count == 4: + # We are issuing a drain of 20. This means that we will receive all the 10 messages + # that the sender is sending. The router will also send back a response flow frame with + # drain=True but I don't have any way of making sure that the response frame reached the + # receiver + event.receiver.drain(20) + + # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more + # messages. That along with 10 messages received indicates that the drain worked and we can + # declare that the test is successful + if self.received_count == 10 and event.link.credit == 0: + self.drain_successful = True + self.receiver.close() + self.sender.close() + self.conn.close() + + def run(self): + Container(self).run() + +class DrainOneMessageHandler(DrainMessagesHandler): + def __init__(self, address): + super(DrainOneMessageHandler, self).__init__(address) + + def on_message(self, event): + if event.receiver == self.receiver: + if "Hello World" == event.message.body: + self.received_count += 1 + + if self.received_count < 4: + event.receiver.flow(1) + elif self.received_count == 4: + # We are issuing a drain of 1 after we receive the 4th message. + # This means that going forward, we will receive only one more message. + event.receiver.drain(1) + + # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more + # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1) + # indicates that the drain worked and we can declare that the test is successful + if self.received_count == 5 and event.link.credit == 0: + self.drain_successful = True + self.receiver.close() + self.sender.close() + self.conn.close() + diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 8f2ee8d723..37317f9cf4 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -28,6 +28,8 @@ from proton.reactor import AtMostOnce, Container from proton.utils import BlockingConnection, LinkDetached +from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler + from qpid_dispatch.management.client import Node class LinkRoutePatternTest(TestCase): @@ -443,6 +445,16 @@ def test_close_with_unsettled(self): test.run() self.assertEqual(None, test.error) + def test_www_drain_support_all_messages(self): + drain_support = DrainMessagesHandler(self.routers[2].addresses[1]) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + + def test_www_drain_support_one_message(self): + drain_support = DrainOneMessageHandler(self.routers[2].addresses[1]) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + class DeliveryTagsTest(MessagingHandler): def __init__(self, sender_address, listening_address, qdstat_address): @@ -565,7 +577,6 @@ def on_message(self, event): def run(self): Container(self).run() - - if __name__ == '__main__': unittest.main(main_module()) + From 124fddaad1c47e277af7ce18b0e4b4c09138c09b Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Fri, 20 May 2016 13:58:36 -0400 Subject: [PATCH 2/2] DISPATCH-341 - Added drain support for message routes and link routes --- src/router_core/connections.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 00ba986b42..6b91d25f2d 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -190,7 +190,7 @@ int qdr_connection_process(qdr_connection_t *conn) if (link) { if (link->drain_mode) core->drained_handler(core->user_context, link); - else + if (link->incremental_credit > 0) core->flow_handler(core->user_context, link, link->incremental_credit); link->incremental_credit = 0; event_count++;