From 6856f3457702d7855467fbbf6a92228928a8ce9a Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 6 May 2016 20:54:59 -0700 Subject: [PATCH 1/3] Address Connect test failure --- .../tests/connect/connect_rest_test.py | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 69a8cb797020..270d66867d82 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -17,6 +17,7 @@ from kafkatest.services.connect import ConnectDistributedService, ConnectRestError from ducktape.utils.util import wait_until import hashlib, subprocess, json, itertools +import time class ConnectRestApiTest(KafkaTest): """ @@ -65,7 +66,9 @@ def test_rest_api(self): sink_connector_props = self.render("connect-file-sink.properties") for connector_props in [source_connector_props, sink_connector_props]: connector_config = self._config_dict_from_props(connector_props) - self.cc.create_connector(connector_config) + wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.create_connector(connector_config)), timeout_sec=120, err_msg="Create connectors throws exception.") + + wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.list_connectors()), timeout_sec=120, err_msg="List connectors throws exception.") # We should see the connectors appear wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]), @@ -76,7 +79,6 @@ def test_rest_api(self): node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE) wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - # Trying to create the same connector again should cause an error try: self.cc.create_connector(self._config_dict_from_props(source_connector_props)) @@ -97,19 +99,18 @@ def test_rest_api(self): expected_sink_info = { 'name': 'local-file-sink', 'config': self._config_dict_from_props(sink_connector_props), - 'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }] + 'tasks': [{'connector': 'local-file-sink', 'task': 0 }] } sink_info = self.cc.get_connector("local-file-sink") assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info) sink_config = self.cc.get_connector_config("local-file-sink") assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config) - # Validate that we can get info about tasks. This info should definitely be available now without waiting since # we've already seen data appear in files. # TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors expected_source_task_info = [{ - 'id': { 'connector': 'local-file-source', 'task': 0 }, + 'id': {'connector': 'local-file-source', 'task': 0}, 'config': { 'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask', 'file': self.INPUT_FILE, @@ -119,7 +120,7 @@ def test_rest_api(self): source_task_info = self.cc.get_connector_tasks("local-file-source") assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info) expected_sink_task_info = [{ - 'id': { 'connector': 'local-file-sink', 'task': 0 }, + 'id': {'connector': 'local-file-sink', 'task': 0}, 'config': { 'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask', 'file': self.OUTPUT_FILE, @@ -139,8 +140,10 @@ def test_rest_api(self): node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2) wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - self.cc.delete_connector("local-file-source") - self.cc.delete_connector("local-file-sink") + wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.delete_connector("local-file-source")), timeout_sec=120, err_msg="Delete connector throws exception.") + wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.delete_connector("local-file-sink")), timeout_sec=120, err_msg="Delete connector throws exception.") + + wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.list_connectors()), timeout_sec=120, err_msg="List connectors throws exception.") wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") def validate_output(self, input): @@ -151,7 +154,6 @@ def validate_output(self, input): ])) return input_set == output_set - def file_contents(self, node, file): try: # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of @@ -163,3 +165,12 @@ def file_contents(self, node, file): def _config_dict_from_props(self, connector_props): return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')]) + def not_throw_exception(self, fun, exception): + try: + fun() + return True + except exception: + return False + + def not_throw_rest_error(self, fun): + return self.not_throw_exception(fun, ConnectRestError) From 6241de36489cbde99a2dda8b20ac2f69d249419b Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Sun, 8 May 2016 18:48:29 -0700 Subject: [PATCH 2/3] Move retry on exception to util.py --- tests/kafkatest/services/connect.py | 8 +++--- .../tests/connect/connect_rest_test.py | 27 ++++++++----------- tests/kafkatest/utils/util.py | 13 +++++++++ 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 1eb2dd58c646..45f960ee7dbe 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -102,7 +102,6 @@ def clean_node(self, node): def config_filenames(self): return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] - def list_connectors(self, node=None): return self._rest('/connectors', node=node) @@ -144,10 +143,10 @@ def _rest(self, path, body=None, node=None, method="GET"): else: return resp.json() - def _base_url(self, node): return 'http://' + node.account.externally_routable_ip + ':' + '8083' + class ConnectStandaloneService(ConnectServiceBase): """Runs Kafka Connect in standalone mode.""" @@ -223,8 +222,6 @@ def start_node(self, node): raise RuntimeError("No process ids recorded") - - class ConnectRestError(RuntimeError): def __init__(self, status, msg, url): self.status = status @@ -235,7 +232,6 @@ def __unicode__(self): return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message - class VerifiableConnector(object): def messages(self): """ @@ -261,6 +257,7 @@ def stop(self): self.logger.info("Destroying connector %s %s", type(self).__name__, self.name) self.cc.delete_connector(self.name) + class VerifiableSource(VerifiableConnector): """ Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output. @@ -284,6 +281,7 @@ def start(self): 'throughput': self.throughput }) + class VerifiableSink(VerifiableConnector): """ Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output. diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 270d66867d82..a85defe27b14 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -15,9 +15,12 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.connect import ConnectDistributedService, ConnectRestError +from kafkatest.utils.util import retry_on_exception from ducktape.utils.util import wait_until -import hashlib, subprocess, json, itertools -import time +import subprocess +import json +import itertools + class ConnectRestApiTest(KafkaTest): """ @@ -66,9 +69,9 @@ def test_rest_api(self): sink_connector_props = self.render("connect-file-sink.properties") for connector_props in [source_connector_props, sink_connector_props]: connector_config = self._config_dict_from_props(connector_props) - wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.create_connector(connector_config)), timeout_sec=120, err_msg="Create connectors throws exception.") + retry_on_exception(lambda: self.cc.create_connector(connector_config), exception=ConnectRestError, timeout_sec=120, err_msg="Create connectors throws exception.") - wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.list_connectors()), timeout_sec=120, err_msg="List connectors throws exception.") + retry_on_exception(lambda: self.cc.list_connectors(), exception=ConnectRestError, timeout_sec=120, err_msg="List connectors throws exception.") # We should see the connectors appear wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]), @@ -140,10 +143,11 @@ def test_rest_api(self): node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2) wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.delete_connector("local-file-source")), timeout_sec=120, err_msg="Delete connector throws exception.") - wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.delete_connector("local-file-sink")), timeout_sec=120, err_msg="Delete connector throws exception.") + retry_on_exception(lambda: self.cc.delete_connector("local-file-source"), exception=ConnectRestError, timeout_sec=120, err_msg="Delete connector throws exception.") + retry_on_exception(lambda: self.cc.delete_connector("local-file-sink"), exception=ConnectRestError, timeout_sec=120, err_msg="Delete connector throws exception.") + + retry_on_exception(lambda: self.cc.list_connectors(), exception=ConnectRestError, timeout_sec=120, err_msg="List connectors throws exception.") - wait_until(lambda: self.not_throw_rest_error(lambda: self.cc.list_connectors()), timeout_sec=120, err_msg="List connectors throws exception.") wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") def validate_output(self, input): @@ -165,12 +169,3 @@ def file_contents(self, node, file): def _config_dict_from_props(self, connector_props): return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')]) - def not_throw_exception(self, fun, exception): - try: - fun() - return True - except exception: - return False - - def not_throw_rest_error(self, fun): - return self.not_throw_exception(fun, ConnectRestError) diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index 0b10dbf0f445..d0bd08508c96 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -15,6 +15,7 @@ from kafkatest import __version__ as __kafkatest_version__ import re +from ducktape.utils.util import wait_until def kafkatest_version(): @@ -71,3 +72,15 @@ def is_int_with_prefix(msg): raise Exception("Unexpected message format. Message should be of format: integer " "prefix dot integer value, but one of the two parts (before or after dot) " "are not integers. Message: %s" % (msg)) + + +def not_throw_exception(fun, exception): + try: + fun() + return True + except exception: + return False + + +def retry_on_exception(fun, exception, timeout_sec, err_msg): + wait_until(lambda: not_throw_exception(fun, exception), timeout_sec=timeout_sec, err_msg=err_msg) From 1a927f033b5f368795f462ce7b50d00881ea9b90 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Sun, 8 May 2016 21:38:42 -0700 Subject: [PATCH 3/3] Move retry to REST APIs --- tests/kafkatest/services/connect.py | 32 +++++++++++-------- .../tests/connect/connect_rest_test.py | 15 +++------ tests/kafkatest/utils/util.py | 21 ++++++------ 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 45f960ee7dbe..cf67c301b09e 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -22,6 +22,7 @@ from ducktape.errors import DucktapeError from ducktape.services.service import Service from ducktape.utils.util import wait_until +from kafkatest.utils.util import retry_on_exception from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin @@ -102,30 +103,30 @@ def clean_node(self, node): def config_filenames(self): return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] - def list_connectors(self, node=None): - return self._rest('/connectors', node=node) + def list_connectors(self, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff) - def create_connector(self, config, node=None): + def create_connector(self, config, node=None, retries=0, retry_backoff=.01): create_request = { 'name': config['name'], 'config': config } - return self._rest('/connectors', create_request, node=node, method="POST") + return self._rest_with_retry('/connectors', create_request, node=node, method="POST", retries=retries, retry_backoff=retry_backoff) - def get_connector(self, name, node=None): - return self._rest('/connectors/' + name, node=node) + def get_connector(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff) - def get_connector_config(self, name, node=None): - return self._rest('/connectors/' + name + '/config', node=node) + def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries, retry_backoff=retry_backoff) - def set_connector_config(self, name, config, node=None): - return self._rest('/connectors/' + name + '/config', config, node=node, method="PUT") + def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", retries=retries, retry_backoff=retry_backoff) - def get_connector_tasks(self, name, node=None): - return self._rest('/connectors/' + name + '/tasks', node=node) + def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries, retry_backoff=retry_backoff) - def delete_connector(self, name, node=None): - return self._rest('/connectors/' + name, node=node, method="DELETE") + def delete_connector(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff) def _rest(self, path, body=None, node=None, method="GET"): if node is None: @@ -143,6 +144,9 @@ def _rest(self, path, body=None, node=None, method="GET"): else: return resp.json() + def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01): + return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError, retries, retry_backoff) + def _base_url(self, node): return 'http://' + node.account.externally_routable_ip + ':' + '8083' diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index a85defe27b14..63b9bb11a58e 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -69,12 +69,10 @@ def test_rest_api(self): sink_connector_props = self.render("connect-file-sink.properties") for connector_props in [source_connector_props, sink_connector_props]: connector_config = self._config_dict_from_props(connector_props) - retry_on_exception(lambda: self.cc.create_connector(connector_config), exception=ConnectRestError, timeout_sec=120, err_msg="Create connectors throws exception.") - - retry_on_exception(lambda: self.cc.list_connectors(), exception=ConnectRestError, timeout_sec=120, err_msg="List connectors throws exception.") + self.cc.create_connector(connector_config, retries=120, retry_backoff=1) # We should see the connectors appear - wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]), + wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]), timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing") # We'll only do very simple validation that the connectors and tasks really ran. @@ -143,12 +141,9 @@ def test_rest_api(self): node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2) wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - retry_on_exception(lambda: self.cc.delete_connector("local-file-source"), exception=ConnectRestError, timeout_sec=120, err_msg="Delete connector throws exception.") - retry_on_exception(lambda: self.cc.delete_connector("local-file-sink"), exception=ConnectRestError, timeout_sec=120, err_msg="Delete connector throws exception.") - - retry_on_exception(lambda: self.cc.list_connectors(), exception=ConnectRestError, timeout_sec=120, err_msg="List connectors throws exception.") - - wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") + self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1) + self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1) + wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") def validate_output(self, input): input_set = set(input) diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index d0bd08508c96..c043bec743fb 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -15,7 +15,7 @@ from kafkatest import __version__ as __kafkatest_version__ import re -from ducktape.utils.util import wait_until +import time def kafkatest_version(): @@ -74,13 +74,12 @@ def is_int_with_prefix(msg): "are not integers. Message: %s" % (msg)) -def not_throw_exception(fun, exception): - try: - fun() - return True - except exception: - return False - - -def retry_on_exception(fun, exception, timeout_sec, err_msg): - wait_until(lambda: not_throw_exception(fun, exception), timeout_sec=timeout_sec, err_msg=err_msg) +def retry_on_exception(fun, exception, retries, retry_backoff=.01): + exception_to_throw = None + for i in range(0, retries + 1): + try: + return fun() + except exception as e: + exception_to_throw = e + time.sleep(retry_backoff) + raise exception_to_throw