From f7437644d1a938ab3220aa38dccbdc2627ffaefc Mon Sep 17 00:00:00 2001 From: mick goulish Date: Thu, 29 Jun 2017 02:26:17 -0400 Subject: [PATCH] DISPATCH-209 : linkroute test better for proactor --- tests/system_tests_three_routers.py | 284 ++++++++++++++++++++++++++-- 1 file changed, 264 insertions(+), 20 deletions(-) diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py index cfc210c45a..d7b97a1526 100644 --- a/tests/system_tests_three_routers.py +++ b/tests/system_tests_three_routers.py @@ -18,11 +18,14 @@ # import unittest, os, json -from subprocess import PIPE, STDOUT -from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout -from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process +from subprocess import PIPE, STDOUT +from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout +from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process from proton.handlers import MessagingHandler -from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption +from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node + import time @@ -42,7 +45,7 @@ def setUpClass(cls): """Start a router and a sender-listener client""" super(RouterTest, cls).setUpClass() - def router ( name, connection_1, connection_2=None ): + def router ( name, more_config ): config = [ ('router', @@ -52,6 +55,7 @@ def router ( name, connection_1, connection_2=None ): ), ('listener', {'port' : cls.tester.get_port(), + 'role' : 'normal', 'stripAnnotations' : 'no' } ), @@ -60,10 +64,7 @@ def router ( name, connection_1, connection_2=None ): 'distribution' : 'closest' } ), - ] - config.append ( connection_1 ) - if None != connection_2: - config.append ( connection_2 ) + ] + more_config config = Qdrouterd.Config ( config ) @@ -74,17 +75,43 @@ def router ( name, connection_1, connection_2=None ): inter_router_port_A = cls.tester.get_port() inter_router_port_B = cls.tester.get_port() port_for_sender = cls.tester.get_port() + A_linkroute_listener_port = cls.tester.get_port() + + cls.linkroute_prefix = '0.0.0.0/link' router ( 'A', + [ ( 'listener', {'role': 'inter-router', 'port': inter_router_port_A } + ), + ( 'listener', + { 'name': 'linkroute-listener', + 'role': 'route-container', + 'host': '0.0.0.0', + 'port': A_linkroute_listener_port, + 'saslMechanisms': 'ANONYMOUS' + } + ), + ( 'linkRoute', + { 'prefix': cls.linkroute_prefix, + 'connection': 'linkroute-listener', + 'dir': 'in' + } + ), + ( 'linkRoute', + { 'prefix': cls.linkroute_prefix, + 'connection': 'linkroute-listener', + 'dir': 'out' + } ) + ] ) router ( 'B', + [ ( 'listener', { 'role': 'inter-router', 'port': inter_router_port_B @@ -96,10 +123,22 @@ def router ( name, connection_1, connection_2=None ): 'port': inter_router_port_A, 'verifyHostName': 'no' } + ), + ( 'linkRoute', + { 'prefix': cls.linkroute_prefix, + 'dir': 'in' + } + ), + ( 'linkRoute', + { 'prefix': cls.linkroute_prefix, + 'dir': 'out' + } ) + ] ) router ( 'C', + [ ( 'connector', { 'name': 'connectorToB', 'role': 'inter-router', @@ -111,7 +150,18 @@ def router ( name, connection_1, connection_2=None ): { 'role': 'normal', 'port': port_for_sender } + ), + ( 'linkRoute', + { 'prefix': cls.linkroute_prefix, + 'dir': 'in' + } + ), + ( 'linkRoute', + { 'prefix': cls.linkroute_prefix, + 'dir': 'out' + } ) + ] ) @@ -135,8 +185,11 @@ def router ( name, connection_1, connection_2=None ): # receiver <--- A <--- B <--- C <--- sender # #------------------------------------------------ + cls.C_normal_addr = cls.router_C.addresses[1] cls.send_addr = cls.router_C.addresses[1] cls.recv_addr = cls.router_A.addresses[0] + cls.route_check_address = cls.router_A.addresses[0] + cls.route_container_addr = cls.router_A.addresses[2] def test_01_targeted_sender(self): @@ -144,24 +197,28 @@ def test_01_targeted_sender(self): test.run() self.assertEqual(None, test.error) + def test_02_anonymous_sender(self): test = AnonymousSenderTest ( self.send_addr, self.recv_addr ) test.run() self.assertEqual(None, test.error) + def test_03_dynamic_reply_to(self): test = DynamicReplyTo ( self.send_addr, self.recv_addr ) test.run() self.assertEqual(None, test.error) + def test_04_link_route ( self ): + test = LinkRoute ( self.route_container_addr, + self.linkroute_prefix, + self.route_check_address, + self.C_normal_addr + ) + test.run() + self.assertEqual(None, test.error) -class Timeout(object): - def __init__(self, parent): - self.parent = parent - - def on_timer_task(self, event): - self.parent.timeout() class TargetedSenderTest(MessagingHandler): @@ -185,14 +242,13 @@ def timeout(self): self.recv_conn.close() def on_start(self, event): - self.timer = event.reactor.schedule(10, Timeout(self)) + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) self.send_conn = event.container.connect(self.send_addr) self.recv_conn = event.container.connect(self.recv_addr) self.sender = event.container.create_sender(self.send_conn, self.dest) self.receiver = event.container.create_receiver(self.recv_conn, self.dest) self.receiver.flow(self.n_expected) - def send(self): while self.sender.credit > 0 and self.n_sent < self.n_expected: msg = Message(body=self.n_sent) @@ -253,7 +309,7 @@ def timeout ( self ): self.recv_conn.close() def on_start(self, event): - self.timer = event.reactor.schedule(10, Timeout(self)) + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) self.send_conn = event.container.connect(self.send_addr) self.recv_conn = event.container.connect(self.recv_addr) self.sender = event.container.create_sender(self.send_conn, options=DynamicTarget()) @@ -322,7 +378,7 @@ def timeout(self): def on_start ( self, event ): - self.timer = event.reactor.schedule ( 10, Timeout(self) ) + self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) # separate connections to simulate client and server. self.client_connection = event.container.connect(self.client_addr) @@ -337,9 +393,13 @@ def on_start ( self, event ): def on_sendable(self, event): + reply_to_addr = self.client_receiver.remote_source.address + + if reply_to_addr == None: + return + while event.sender.credit > 0 and self.n_sent < self.n_expected: # We send to server, and tell it how to reply to the client. - reply_to_addr = self.client_receiver.remote_source.address request = Message ( body=self.n_sent, address=self.dest, @@ -373,5 +433,189 @@ def run(self): + +class Entity(object): + def __init__(self, status_code, status_description, attrs): + self.status_code = status_code + self.status_description = status_description + self.attrs = attrs + + def __getattr__(self, key): + return self.attrs[key] + + + + +class RouterProxy(object): + def __init__(self, reply_addr): + self.reply_addr = reply_addr + + def response(self, msg): + ap = msg.properties + return Entity(ap['statusCode'], ap['statusDescription'], msg.body) + + def read_address(self, name): + ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name} + return Message(properties=ap, reply_to=self.reply_addr) + + def query_addresses(self): + ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'} + return Message(properties=ap, reply_to=self.reply_addr) + + + + +class PollTimeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.poll_timeout() + + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class LinkRoute ( MessagingHandler ): + + """ + Set up and use a link-route, to send a message this way: + receiver <--- A <--- B <--- C <--- sender + + Check for appearance of the link-route at router A with a + technique that is appropriate to the 'proactor' style, + i.e. do not take control away from the proactor for an extended + period of time. + """ + + def __init__ ( self, route_container, linkroute_prefix, route_check_address, send_address ): + super(LinkRoute, self).__init__(prefetch=0) + self.route_container = route_container + self.send_address = send_address + self.linkroute_prefix = linkroute_prefix + self.route_check_address = route_check_address + self.true_statement = "This IS the message you are looking for." + self.error = None + self.send_connection = None + self.recv_connection = None + self.route_check_connection = None + self.route_check_sender = None + self.sender = None + self.route_check_timer = None + self.route_check_receiver = None + self.count = 1 + self.sent = 0 + self.route_checks_sent = 0 + + + def bail ( self, error_text ) : + self.error = error_text + self.recv_connection.close() + self.route_check_connection.close() + self.send_connection.close() + self.timer.cancel() + if self.route_check_timer : + self.route_check_timer.cancel() + self.route_check_timer = None + + + def timeout(self): + self.bail ( "Timeout Expired" ) + + + def poll_timeout ( self ) : + self.poll() + + + def on_start(self, event): + # Iff this timer expires, the test fails. + self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) + + # At startup, create only the receivers and the sender that checks for + # the route being ready. Creation of the payload message sender + # has to wait until we know the address is available, to avoid a + # 'no route to destination' error. + self.recv_connection = event.container.connect ( self.route_container ) + + self.route_check_connection = event.container.connect ( self.route_check_address ) + self.route_check_receiver = event.container.create_receiver ( self.route_check_connection, + dynamic=True + ) + self.route_check_sender = event.container.create_sender ( self.route_check_connection, + "$management" + ) + + + + def poll ( self ): + # The router prepends this 'D' to the address. If we want + # to look for it down at router A, we have to do likewise. + doctored_linkroute_prefix = 'D' + self.linkroute_prefix + request = self.proxy.read_address ( doctored_linkroute_prefix ) + self.route_check_sender.send ( request ) + + + + def on_link_opened ( self, event ): + + if event.receiver: + event.receiver.flow ( self.count ) + + if event.receiver == self.route_check_receiver: + self.proxy = RouterProxy ( self.route_check_receiver.remote_source.address ) + self.poll() + + + def on_link_opening ( self, event ): + if event.receiver: + event.receiver.target.address = self.route_container + event.receiver.flow(1) + + + def on_sendable ( self, event ): + if event.sender == self.sender and self.sent < self.count : + self.sender.send ( Message ( address = self.linkroute_prefix, body = self.true_statement ) ) + self.sent += 1 + + + def on_message ( self, event ): + # Check that the incoming message is the one we sent. + if ( event.message.body == self.true_statement ) : + self.bail ( None ) + + if event.receiver == self.route_check_receiver: + response = self.proxy.response(event.message) + if response.status_code == 200 and (response.remoteCount + response.containerCount) > 0: + if self.route_check_timer : + self.route_check_timer.cancel() + self.route_check_timer = None + # After address appears at the far end of my router line, (router A) + # give more time to let it finish propagating. + time.sleep ( 2 ) + self.send_connection = event.container.connect ( self.send_address ) + self.sender = event.container.create_sender ( self.send_connection, self.linkroute_prefix ) + else: + # Try again. + self.route_checks_sent += 1 + if self.route_checks_sent > 10 : + self.bail ( "can't find address" ) + else : + self.route_check_timer = event.reactor.schedule(0.50, PollTimeout(self)) + + + + + def run(self): + Container(self).run() + + + + if __name__ == '__main__': unittest.main(main_module())