From aeed622cbab64be443ef318812bf550581b583c3 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 17 Jan 2017 16:42:28 -0800 Subject: [PATCH 1/2] MINOR: refactor streams system test class hierachy --- .../performance/streams_performance.py | 112 ++---------------- tests/kafkatest/services/streams.py | 54 +++++---- .../streams/streams_shutdown_deadlock_test.py | 3 +- 3 files changed, 42 insertions(+), 127 deletions(-) diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py index 0af13f9b349ca..e9fa2a79acd40 100644 --- a/tests/kafkatest/services/performance/streams_performance.py +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -13,115 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os.path -import signal +from kafkatest.services.streams import StreamsTestBaseService -from ducktape.services.service import Service -from ducktape.utils.util import wait_until - -from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin # # Class used to start the simple Kafka Streams benchmark # -class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service): +class StreamsSimpleBenchmarkService(StreamsTestBaseService): """Base class for simple Kafka Streams benchmark""" - PERSISTENT_ROOT = "/mnt/streams" - # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately - LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log") - STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout") - STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr") - LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") - PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid") - - logs = { - "streams_log": { - "path": LOG_FILE, - "collect_default": True}, - "streams_stdout": { - "path": STDOUT_FILE, - "collect_default": True}, - "streams_stderr": { - "path": STDERR_FILE, - "collect_default": True}, - } - - def __init__(self, context, kafka, numrecs): - super(StreamsSimpleBenchmarkService, self).__init__(context, 1) - self.kafka = kafka - self.numrecs = numrecs - - @property - def node(self): - return self.nodes[0] - - def pids(self, node): - try: - return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)] - except: - return [] - - def stop_node(self, node, clean_shutdown=True): - self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping SimpleBenchmark on " + str(node.account)) - pids = self.pids(node) - sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL - - for pid in pids: - node.account.signal(pid, sig, allow_fail=True) - if clean_shutdown: - for pid in pids: - wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit") - - node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False) - - def wait(self): - for node in self.nodes: - for pid in self.pids(node): - wait_until(lambda: not node.account.alive(pid), timeout_sec=600, backoff_sec=1, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit") - - def clean_node(self, node): - node.account.kill_process("streams", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) - - def start_cmd(self, node): - args = {} - args['kafka'] = self.kafka.bootstrap_servers() - args['zk'] = self.kafka.zk.connect_setting() - args['state_dir'] = self.PERSISTENT_ROOT - args['numrecs'] = self.numrecs - args['stdout'] = self.STDOUT_FILE - args['stderr'] = self.STDERR_FILE - args['pidfile'] = self.PID_FILE - args['log4j'] = self.LOG4J_CONFIG_FILE - args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) - - cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ - "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark " \ - " %(kafka)s %(zk)s %(state_dir)s %(numrecs)s " \ - " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args - - return cmd - - def start_node(self, node): - node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) - - node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) - - self.logger.info("Starting SimpleBenchmark process on " + str(node.account)) - results = {} - with node.account.monitor_log(self.STDOUT_FILE) as monitor: - node.account.ssh(self.start_cmd(node)) - monitor.wait_until('SimpleBenchmark instance started', timeout_sec=15, err_msg="Never saw message indicating SimpleBenchmark finished startup on " + str(node.account)) - - if len(self.pids(node)) == 0: - raise RuntimeError("No process ids recorded") - - def collect_data(self, node): - # Collect the data and return it to the framework - output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE) - data = {} - for line in output: - parts = line.split(':') - data[parts[0]] = float(parts[1]) - return data + def __init__(self, test_context, kafka, numrecs): + super(StreamsSimpleBenchmarkService, self).__init__(test_context, + kafka, + "org.apache.kafka.streams.perf.SimpleBenchmark", + numrecs) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 87e441470e425..9250cd7b94375 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -22,8 +22,8 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin -class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service): - """Base class for Streams Smoke Test services providing some common settings and functionality""" +class StreamsTestBaseService(KafkaPathResolverMixin, Service): + """Base class for Streams Test services providing some common settings and functionality""" PERSISTENT_ROOT = "/mnt/streams" # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately @@ -45,10 +45,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service): "collect_default": True}, } - def __init__(self, context, kafka, command): - super(StreamsSmokeTestBaseService, self).__init__(context, 1) + def __init__(self, test_context, kafka, streams_class_name, user_test_args): + super(StreamsTestBaseService, self).__init__(test_context, 1) self.kafka = kafka - self.args = {'command': command} + self.args = {'streams_class_name': streams_class_name, + 'user_test_args': user_test_args} @property def node(self): @@ -65,7 +66,7 @@ def stop_nodes(self, clean_shutdown=True): self.stop_node(node, clean_shutdown) def stop_node(self, node, clean_shutdown=True): - self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account)) + self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Test on " + str(node.account)) pids = self.pids(node) sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL @@ -73,7 +74,7 @@ def stop_node(self, node, clean_shutdown=True): node.account.signal(pid, sig, allow_fail=True) if clean_shutdown: for pid in pids: - wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit") + wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Test process on " + str(node.account) + " took too long to exit") node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False) @@ -95,8 +96,11 @@ def abortThenRestart(self): def wait(self, timeout_sec=360): for node in self.nodes: - for pid in self.pids(node): - wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit") + self.wait_node(node, timeout_sec) + + def wait_node(self, node, timeout_sec=None): + for pid in self.pids(node): + wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Test process on " + str(node.account) + " took too long to exit") def clean_node(self, node): node.account.kill_process("streams", clean_shutdown=False, allow_fail=True) @@ -105,7 +109,6 @@ def clean_node(self, node): def start_cmd(self, node): args = self.args.copy() args['kafka'] = self.kafka.bootstrap_servers() - args['zk'] = self.kafka.zk.connect_setting() args['state_dir'] = self.PERSISTENT_ROOT args['stdout'] = self.STDOUT_FILE args['stderr'] = self.STDERR_FILE @@ -114,8 +117,8 @@ def start_cmd(self, node): args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ - "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest " \ - " %(command)s %(kafka)s %(zk)s %(state_dir)s " \ + "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ + " %(kafka)s %(state_dir)s %(user_test_args)s" \ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args return cmd @@ -125,24 +128,35 @@ def start_node(self, node): node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) - self.logger.info("Starting StreamsSmokeTest process on " + str(node.account)) + self.logger.info("Starting StreamsTest process on " + str(node.account)) with node.account.monitor_log(self.STDOUT_FILE) as monitor: node.account.ssh(self.start_cmd(node)) - monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account)) + monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") +class StreamsSmokeTestBaseService(StreamsTestBaseService): + """Base class for Streams Smoke Test services providing some common settings and functionality""" + + def __init__(self, test_context, kafka, command): + super(StreamsSmokeTestBaseService, self).__init__(test_context, + kafka, + "org.apache.kafka.streams.smoketest.StreamsSmokeTest", + command) + + class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService): - def __init__(self, context, kafka): - super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run") + def __init__(self, test_context, kafka): + super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run") class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService): - def __init__(self, context, kafka): - super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process") + def __init__(self, test_context, kafka): + super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process") + class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService): - def __init__(self, context, kafka): - super(StreamsSmokeTestShutdownDeadlockService, self).__init__(context, kafka, "close-deadlock-test") + def __init__(self, test_context, kafka): + super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test") diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py index 5e4e7f28c683b..482da9c5d85f7 100644 --- a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py +++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py @@ -13,11 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import ignore - from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService + class StreamsShutdownDeadlockTest(KafkaTest): """ Simple test of Kafka Streams. From d36791c546ac1ac3a416f2e02c4258eefe17ccf3 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 17 Jan 2017 22:06:38 -0800 Subject: [PATCH 2/2] fixed SimpleBenchmark startup issue --- .../java/org/apache/kafka/streams/perf/SimpleBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index fb26206fd238b..90226c126a0df 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -108,7 +108,7 @@ public static void main(String[] args) throws Exception { rocksdbDir.mkdir(); // Note: this output is needed for automated tests and must not be removed - System.out.println("SimpleBenchmark instance started"); + System.out.println("StreamsTest instance started"); System.out.println("kafka=" + kafka); System.out.println("stateDir=" + stateDir); System.out.println("numRecords=" + numRecords);