Skip to content

Commit

Permalink
KAFKA-13234; Transaction system test should clear URPs after broker restarts (#11267)
Browse files Browse the repository at this point in the history

Waiting for under-replicated partitions (URPs) to clear after each broker restart helps ensure that partitions do not become unavailable for longer than necessary as brokers are rolled. This prevents test flakiness caused by transaction timeouts.

Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>
  • Loading branch information
hachikuji committed Sep 1, 2021
1 parent d595fee commit 0eb09d3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tests/docker/ducker-ak
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ ducker_test() {
(test -f ./gradlew || gradle) && ./gradlew systemTestLibs
must_popd
if [[ "${debug}" -eq 1 ]]; then
local ducktape_cmd="python3.7 -m debugpy --listen 0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape"
local ducktape_cmd="python3 -m debugpy --listen 0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape"
else
local ducktape_cmd="ducktape"
fi
Expand Down
43 changes: 43 additions & 0 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,49 @@ def delete_topic(self, topic, node=None):
self.logger.info("Running topic delete command...\n%s" % cmd)
node.account.ssh(cmd)

def has_under_replicated_partitions(self):
    """
    Report whether any partition in the cluster is currently under-replicated.

    Delegates to describe_under_replicated_partitions() and checks whether it
    returned at least one entry.

    :return True if there are under-replicated partitions, False otherwise.
    """
    under_replicated = self.describe_under_replicated_partitions()
    return len(under_replicated) != 0

def await_no_under_replicated_partitions(self, timeout_sec=30):
    """
    Block until the cluster no longer reports any under-replicated partitions.

    The condition is polled through wait_until, which is given the error
    message below to use if the condition is not met in time.

    :param timeout_sec: the maximum time in seconds to wait
    """
    def urps_cleared():
        # The wait condition is the negation of the URP check.
        return not self.has_under_replicated_partitions()

    wait_until(urps_cleared,
               timeout_sec=timeout_sec,
               err_msg="Timed out waiting for under-replicated-partitions to clear")

def describe_under_replicated_partitions(self):
    """
    Run the topic tool on the first node to list the under-replicated partitions in the cluster.

    The ZooKeeper connection is forced when that node's version reports that the
    topic command does not support a bootstrap server.

    :return the under-replicated partitions as a list of dictionaries
            (e.g. [{"topic": "foo", "partition": 1}, {"topic": "bar", "partition": 0}, ... ])
    """
    first_node = self.nodes[0]
    use_zk = not first_node.version.topic_command_supports_bootstrap_server()

    # Build the command: JVM option prefix first, then the topic tool invocation.
    jvm_prefix = fix_opts_for_new_jvm(first_node)
    topics_cmd = self.kafka_topics_cmd_with_optional_security_settings(first_node, use_zk)
    cmd = jvm_prefix + "%s --describe --under-replicated-partitions" % topics_cmd

    self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd)

    # Collect the whole command output before handing it to the parser.
    output = "".join(first_node.account.ssh_capture(cmd))

    urps = self.parse_describe_topic(output)["partitions"]
    self.logger.debug("Found %d under-replicated-partitions" % len(urps))

    return urps

def describe_topic(self, topic, node=None):
if node is None:
node = self.nodes[0]
Expand Down
2 changes: 2 additions & 0 deletions tests/kafkatest/tests/core/group_mode_transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def bounce_brokers(self, clean_shutdown):
time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
self.kafka.start_node(node)

self.kafka.await_no_under_replicated_partitions()

def create_and_start_message_copier(self, input_topic, output_topic, transactional_id):
message_copier = TransactionalMessageCopier(
context=self.test_context,
Expand Down
2 changes: 2 additions & 0 deletions tests/kafkatest/tests/core/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ def bounce_brokers(self, clean_shutdown):
time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
self.kafka.start_node(node)

self.kafka.await_no_under_replicated_partitions()

def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id, use_group_metadata):
message_copier = TransactionalMessageCopier(
context=self.test_context,
Expand Down

0 comments on commit 0eb09d3

Please sign in to comment.