From 50630af74bcf78abe2b3d2a5c07b3347a914543e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 10 May 2016 17:16:42 -0700 Subject: [PATCH 1/2] KAFKA-3694: Ensure broker Zk deregistration prior to restart in ReplicationTest --- tests/kafkatest/services/kafka/kafka.py | 16 +++++++++++++++- .../services/kafka/templates/kafka.properties | 1 + tests/kafkatest/tests/core/replication_test.py | 11 ++++------- .../kafkatest/tests/produce_consume_validate.py | 2 +- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 334069d99544e..a843a127b74d3 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -66,7 +66,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, - jmx_attributes=[], zk_connect_timeout=5000): + jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000): """ :type context :type zk: ZookeeperService @@ -99,6 +99,11 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI # for this constructor. self.zk_connect_timeout = zk_connect_timeout + # Also allow the session timeout to be provided explicitly, + # primarily so that test cases can depend on it when waiting + # e.g. brokers to deregister after a hard kill. + self.zk_session_timeout = zk_session_timeout + self.port_mappings = { 'PLAINTEXT': Port('PLAINTEXT', 9092, False), 'SSL': Port('SSL', 9093, False), @@ -513,6 +518,15 @@ def controller(self): self.logger.info("Controller's ID: %d" % (controller_idx)) return self.get_node(controller_idx) + def is_registered(self, node): + """ + Check whether a broker is registered in Zookeeper + """ + self.logger.debug("Querying zookeeper to see if broker %s is registered", node) + broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node)) + self.logger.debug("Broker info: %s", broker_info) + return broker_info is not None + def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time): node = self.nodes[0] diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 1e4f17c0d3ca1..1f2371302c8ac 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -72,6 +72,7 @@ zookeeper.set.acl={{zk_set_acl}} {% endif %} zookeeper.connection.timeout.ms={{ zk_connect_timeout }} +zookeeper.session.timeout.ms={{ zk_session_timeout }} {% if replica_lag is defined %} replica.lag.time.max.ms={{replica_lag}} diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index 8e9474aec270a..0681817ee65aa 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -65,15 +65,12 @@ def hard_bounce(test, broker_type): test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL) # Since this is a hard kill, we need to make sure the process is down and that - # zookeeper and the broker cluster have registered the loss of the leader/controller. - # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this. + # zookeeper has registered the loss by expiring the broker's session timeout. - def role_reassigned(): - current_elected_broker = broker_node(test, broker_type) - return current_elected_broker is not None and current_elected_broker != prev_broker_node + wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node), + timeout_sec=test.kafka.zk_session_timeout + 5, + err_msg="Failed to see timely deregistration of hard-killed broker") - wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5) - wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5) test.kafka.start_node(prev_broker_node) failures = { diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index 425b81659576e..a5da7be1a8850 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -35,7 +35,7 @@ def setup_producer_and_consumer(self): def start_producer_and_consumer(self): # Start background producer and consumer self.producer.start() - wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10, + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20, err_msg="Producer failed to start in a reasonable amount of time.") self.consumer.start() wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60, From baf156937e2d6c5edccd74e3d7451d55fe1ad3c1 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 11 May 2016 14:35:47 -0700 Subject: [PATCH 2/2] add node information in deregistration timeout message --- tests/kafkatest/tests/core/replication_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index 0681817ee65aa..f8150341c4999 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -69,7 +69,7 @@ def hard_bounce(test, broker_type): wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node), timeout_sec=test.kafka.zk_session_timeout + 5, - err_msg="Failed to see timely deregistration of hard-killed broker") + err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account)) test.kafka.start_node(prev_broker_node)