From 54def21db24837dbdf45e27815d7eb7d073d67a0 Mon Sep 17 00:00:00 2001 From: mgoulish Date: Wed, 4 Apr 2018 13:06:06 -0400 Subject: [PATCH] No JIRA yet -- example perf test in unit test environment --- tests/CMakeLists.txt | 1 + tests/router_network.py | 346 ++++++++++++++++++++++++++++++ tests/system_tests_performance.py | 316 +++++++++++++++++++++++++++ 3 files changed, 663 insertions(+) create mode 100644 tests/router_network.py create mode 100644 tests/system_tests_performance.py diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 939b51f365..58aae71320 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -109,6 +109,7 @@ foreach(py_test_module system_tests_disallow_link_resumable_link_route system_tests_exchange_bindings system_tests_cmdline_parsing + system_tests_performance ${SYSTEM_TESTS_HTTP} ) diff --git a/tests/router_network.py b/tests/router_network.py new file mode 100644 index 0000000000..90444431af --- /dev/null +++ b/tests/router_network.py @@ -0,0 +1,346 @@ +#! /usr/bin/python + +import os +import subprocess +import time +import datetime +import socket +import contextlib +import string +import signal + + + + +def get_open_port ( ): + with contextlib.closing ( socket.socket ( socket.AF_INET, + socket.SOCK_STREAM + ) + ) as sock: + sock.bind ( ('localhost', 0) ) + return sock.getsockname()[1] + + + + +class Client ( object ) : + + def __init__ ( self, + action = None, + name = None, + port = None, + addr = None, + n_messages = None, + body_size = None + ) : + self.action = action + self.name = name + self.port = port + self.addr = addr + self.n_messages = n_messages + self.body_size = body_size + + self.process = None + + + +class Listener ( object ) : + + def __init__ ( self, role ) : + self.role = role + self.port = get_open_port ( ) + + + + +class Connector ( object ) : + + def __init__ ( self, target_port ) : + self.target_port = target_port + + + + +class Router ( object ) : + + def __init__ ( self, + name = 'router', + mode = 'interior', + workerThreads = '4', + path = None, + config_path = None + ) : + self.name = name + self.mode = mode + self.workerThreads = workerThreads + self.path = path + self.config_path = config_path + + self.process = None + self.stdout = None + self.stderr = None + self.listeners = list() + self.connectors = list() + + + def write_config_script ( self ) : + with open ( self.config_path, 'w+') as config_file : + print >> config_file , 'router {' + print >> config_file , ' mode: %s' % self.mode + print >> config_file , ' id: %s' % self.name + print >> config_file , ' workerThreads: %s' % self.workerThreads + print >> config_file , '}' + + print >> config_file , 'address {' + print >> config_file , ' prefix: closest' + print >> config_file , ' distribution: closest' + print >> config_file , '}' + + for l in self.listeners : + print >> config_file , 'listener {' + print >> config_file , ' role:', l.role + print >> config_file , ' port:', l.port + print >> config_file , ' saslMechanisms: ANONYMOUS' + print >> config_file , '}' + + for c in self.connectors : + print >> config_file , 'connector {' + print >> config_file , ' role: inter-router' + print >> config_file , ' host: 0.0.0.0' + print >> config_file , ' port: ', c.target_port + print >> config_file , ' saslMechanisms: ANONYMOUS' + print >> config_file , '}' + + + + def run ( self ) : + if not self.path : + # print "Router::run error: no path" + return + + self.write_config_script ( ) + + self.process = subprocess.Popen ( [ self.path, + '--config', + self.config_path + ], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE + ) + # print "router", self.name, "has started as process ", self.process.pid + + + + def add_inter_router_listener ( self ) : + for l in self.listeners : + if l.role == 'inter-router' : + # print "router ", self.name, "already has an inter-router listener." + return + self.add_listener ( 'inter-router' ) + + + + def get_inter_router_port ( self ) : + for l in self.listeners : + if l.role == 'inter-router' : + return l.port + return None + + + + def get_client_port ( self ) : + for l in self.listeners : + if l.role == 'normal' : + return l.port + return None + + + + def add_normal_listener ( self ) : + for l in self.listeners : + if l.role == 'normal' : + # Only need one. + return + self.add_listener ( 'normal' ) + + + + def add_listener ( self, role ) : + if not ( (role == 'normal') or (role == 'inter-router') ) : + # print "add_inter_router_listener error: bad role: |%s|" % role + return + self.listeners.append ( Listener ( role ) ) + # print "added", role, "listener to router", self.name + + + + def add_connector ( self, target ) : + target_port = target.get_inter_router_port() + self.connectors.append ( Connector ( target_port ) ) + + + + def check ( self ) : + if self.process.poll() is None : + return "running" + else : + (self.stdout, self.stderr) = self.process.communicate ( ) + return "stopped" + + + + def halt ( self ) : + os.kill ( self.process.pid, signal.SIGTERM ) + #(self.stdout, self.stderr) = self.process.communicate ( ) + #print "router ", self.name, "=============================================" + #print "stdout -----------------------" + #print self.stdout + #print "stderr -----------------------" + #print self.stderr + + + + + +class RouterNetwork ( object ) : + + def __init__ ( self, + n_nodes = 1, + router_path = None, + test_path = None, + client_path = None, + ) : + + self.router_path = router_path + self.test_path = test_path + self.client_path = client_path + + self.routers = dict() + self.clients = list() + + config_dir_path = test_path + "/config/" + + if not os.path.exists ( config_dir_path ) : + os.makedirs ( config_dir_path ) + + for i in xrange ( n_nodes ) : + router_name = string.ascii_uppercase [ i ] + + router_config_path = config_dir_path + router_name + '.conf' + self.routers [ router_name ] = Router ( name=router_name, + path=self.router_path , + config_path = router_config_path + ) + + + + def connect ( self, from_name, to_name ) : + self.routers[to_name].add_inter_router_listener() + to_router = self.routers[to_name] + self.routers[from_name].add_connector(to_router) + + + + def add_client ( self, + router_name = None, # Name line 'A', 'B' + action = None, + name = None, + addr = None, + n_messages = None, + body_size = None + ) : + target_router = self.routers[router_name] + port = target_router.get_client_port ( ) + self.clients.append ( Client ( action = action, + name = name, + port = port, + addr = addr, + n_messages = n_messages, + body_size = body_size + ) + ) + + + + def add_client_listener ( self, router_name ) : + target_router = self.routers [ router_name ] + target_router.add_listener ( 'normal' ) + + + + def halt ( self ) : + self.halt_clients ( ) + for r in self.routers.values() : + r.halt ( ) + + + + def check_clients ( self ) : + clients_still_running = list() + for c in self.clients : + if c.process.poll() is None : + clients_still_running.append ( c.name ) + return clients_still_running + + + + def get_client_output ( self, target ) : + for c in self.clients : + if c.name == target : + (stdout, stderr) = c.process.communicate ( ) + return stdout + + + + def halt_clients ( self ) : + for c in self.clients : + proc = c.process + try : + os.kill ( proc.pid, signal.SIGTERM ) + #(stdout, stderr) = proc.communicate ( ) + #print "client", c.name, "-------------------------------" + #print "STDOUT " + #print stdout + #print "\nSTDERR" + #print stderr + except OSError : + pass # The client has already terminated. + + + + def run ( self ) : + for r in self.routers.keys() : + self.routers[r].run ( ) + time.sleep ( 5 ) + + body_size = '100' + credit_window = '100' + tx_size = '0' + flags = 'none' + + for c in self.clients : + args = [ self.client_path, + 'client', + 'active', + c.action, + c.name, + '0.0.0.0', + str(c.port), + c.addr, + str(c.n_messages), + body_size, + credit_window, + tx_size, + flags + ] + process = subprocess.Popen ( args, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE + ) + c.process = process + + + + + + diff --git a/tests/system_tests_performance.py b/tests/system_tests_performance.py new file mode 100644 index 0000000000..4ad039c0ba --- /dev/null +++ b/tests/system_tests_performance.py @@ -0,0 +1,316 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest, os, json +import subprocess +from subprocess import PIPE, STDOUT +from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout +from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process +from proton.handlers import MessagingHandler +from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node + +import time +import datetime +import pdb +import inspect +import numpy + +import time +import math +import router_network +import json + + + +# PROTON-828: +try: + from proton import MODIFIED +except ImportError: + from proton import PN_STATUS_MODIFIED as MODIFIED + + + + +#------------------------------------------------ +# Helper classes for all tests. +#------------------------------------------------ + +class Timeout(object): + """ + Named timeout object can handle multiple simultaneous + timers, by telling the parent which one fired. + """ + def __init__ ( self, parent, name ): + self.parent = parent + self.name = name + + def on_timer_task ( self, event ): + #print "MDEBUG: Timeout::on_timer_task -- ", self.name + self.parent.timeout ( self.name ) + + + +class ManagementMessageHelper ( object ) : + """ + Format management messages. + """ + def __init__ ( self, reply_addr ) : + self.reply_addr = reply_addr + + def make_connector_query ( self, connector_name ) : + props = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name } + msg = Message ( properties=props, reply_to=self.reply_addr ) + return msg + + def make_connector_delete_command ( self, connector_name ) : + props = {'operation': 'DELETE', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name } + msg = Message ( properties=props, reply_to=self.reply_addr ) + return msg + + def make_router_link_query ( self ) : + props = { 'count': '100', + 'operation': 'QUERY', + 'entityType': 'org.apache.qpid.dispatch.router.link', + 'name': 'self', + 'type': 'org.amqp.management' + } + attrs = [] + attrs.append ( unicode('linkType') ) + attrs.append ( unicode('linkDir') ) + attrs.append ( unicode('linkName') ) + attrs.append ( unicode('owningAddr') ) + attrs.append ( unicode('capacity') ) + attrs.append ( unicode('undeliveredCount') ) + attrs.append ( unicode('unsettledCount') ) + attrs.append ( unicode('acceptedCount') ) + attrs.append ( unicode('rejectedCount') ) + attrs.append ( unicode('releasedCount') ) + attrs.append ( unicode('modifiedCount') ) + + msg_body = { } + msg_body [ 'attributeNames' ] = attrs + return Message ( body=msg_body, properties=props, reply_to=self.reply_addr ) + + +#------------------------------------------------ +# END Helper classes for all tests. +#------------------------------------------------ + + + + + +#================================================================ +# Setup +#================================================================ + +class PerfTests ( TestCase ): + + @classmethod + def setUpClass(cls): + super(PerfTests, cls).setUpClass() + + # 1 means skip that test. + cls.skip = { 'test_01_latency_01' : 0, + } + + + def test_01_latency ( self ): + name = 'test_01_latency_01' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + + test = Latency ( name, 'closest/latency_01' ) + test.run() + self.assertEqual ( None, test.error ) + + + + + +#================================================================ +# Tests +#================================================================ + +class Latency ( MessagingHandler ): + def __init__ ( self, test_name, addr ): + super ( Latency, self).__init__(prefetch=100) + self.test_name = test_name + self.addr = addr + + self.n_messages = 2000 + self.body_size = 100 + self.network = None + self.error = None + self.test_timer = None + self.send_timer = None + self.deadline = 60 + self.router_path = os.getenv ( 'DISPATCH_PERF_ROUTER_PATH' ) + self.client_path = os.getenv ( 'DISPATCH_PERF_CLIENT_PATH' ) + self.results_root = os.getenv ( 'DISPATCH_PERF_RESULTS_ROOT', default='/tmp' ) + + now = datetime.datetime.now() + self.timestamp = "%d_%02d_%02d_%02d_%02d" % (now.year, now.month, now.day, now.hour, now.minute) + self.results_path = self.results_root + '/' + self.timestamp + + self.debug = False + + self.test_description = dict() + self.test_description [ 'test' ] = 'latency' + self.test_description [ 'client' ] = 'proton-c' + self.test_description [ 'timestamp' ] = self.timestamp + self.test_description [ 'n-messages'] = self.n_messages + + + def debug_print ( self, text ) : + if self.debug == True: + print "%.6lf %s" % ( time.time(), text ) + + + # Shut down everything and exit. + def bail ( self, text ): + self.test_timer.cancel ( ) + self.error = text + + + def timeout ( self, name ): + if name == 'test': + self.bail ( "Test is hanging -- timeout expired." ) + + + def on_start ( self, event ): + self.test_timer = event.reactor.schedule ( self.deadline, Timeout(self, "test") ) + + if self.client_path == None : + self.bail ( 'need env var DISPATCH_PERF_CLIENT_PATH' ) + return + + if self.router_path == None : + self.bail ( 'need env var DISPATCH_PERF_ROUTER_PATH' ) + return + + if os.path.exists ( self.results_path ) : + self.bail ( 'Results path already exists: |%s|' % self.results_path ) + return + os.mkdir( self.results_path, 0755 ) + + #==================================================== + # Configure the network. + #==================================================== + self.network = router_network.RouterNetwork ( n_nodes = 3, + router_path = self.router_path, + test_path = './test', + client_path = self.client_path + ) + self.network.add_client_listener ( 'A' ) + self.network.add_client_listener ( 'B' ) + self.network.add_client_listener ( 'C' ) + self.network.connect ( 'A', 'B' ) + self.network.connect ( 'B', 'C' ) + + #==================================================== + # Add the clients. + #==================================================== + router_name = 'A' + client_name = 'receiver' + action = 'receive' + + self.network.add_client ( router_name = router_name, + action = action, + name = client_name, + addr = self.addr, + n_messages = self.n_messages, + body_size = self.body_size + ) + + router_name = 'C' + client_name = 'sender' + action = 'send' + + self.network.add_client ( router_name = router_name, + action = action, + name = client_name, + addr = self.addr, + n_messages = self.n_messages, + body_size = self.body_size + ) + + #==================================================== + # Run everything. + #==================================================== + self.network.run ( ) + + sleeps = 0 + output = None + + while True : + self.debug_print ( "main program running" ) + time.sleep ( 3 ) + sleeps += 1 + + still_running = self.network.check_clients ( ) + self.debug_print ( "still running: %s" % still_running ) + + if len ( still_running ) < 2 : + self.debug_print ( "halting network" ) + self.network.halt ( ) + output = self.network.get_client_output ( 'receiver' ) + break + + #==================================================== + # Process the output. + #==================================================== + flight_times = list() + sum = 0 + + for result in output.split ( ) : + _, start_time, stop_time = result.split ( ',' ) + msec = int(stop_time) - int(start_time) + sum += msec + flight_times.append ( float(msec) ) + + n = len ( flight_times ) + mean = float(sum) / float ( n ) + variances = [ float(msec - mean) for msec in flight_times ] + squared_variances = [ v**2 for v in variances ] + sum_sq_var = 0 + for sv in squared_variances : + sum_sq_var += sv + sigma = math.sqrt ( sum_sq_var / n ) + + result_file_name = self.results_path + '/result' + with open ( result_file_name, 'w+' ) as file : + self.test_description [ 'mean' ] = mean + self.test_description [ 'sigma' ] = sigma + json.dump ( self.test_description, file, indent=4, sort_keys=True ) + self.debug_print ( 'results written to |%s|' % result_file_name ) + self.bail ( None ) + + + def run(self): + Container(self).run() + + + + + +if __name__ == '__main__': + unittest.main(main_module())