diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak index 15c4b7c65392..8047b8fa63e8 100755 --- a/tests/docker/ducker-ak +++ b/tests/docker/ducker-ak @@ -480,7 +480,7 @@ ducker_test() { (test -f ./gradlew || gradle) && ./gradlew systemTestLibs must_popd if [[ "${debug}" -eq 1 ]]; then - local ducktape_cmd="python3.7 -m debugpy --listen 0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape" + local ducktape_cmd="python3 -m debugpy --listen 0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape" else local ducktape_cmd="ducktape" fi diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 99013620cc63..e0570154aa64 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1125,6 +1125,49 @@ def delete_topic(self, topic, node=None): self.logger.info("Running topic delete command...\n%s" % cmd) node.account.ssh(cmd) + def has_under_replicated_partitions(self): + """ + Check whether the cluster has under-replicated partitions. + + :return True if there are under-replicated partitions, False otherwise. + """ + return len(self.describe_under_replicated_partitions()) > 0 + + def await_no_under_replicated_partitions(self, timeout_sec=30): + """ + Wait for all under-replicated partitions to clear. + + :param timeout_sec: the maximum time in seconds to wait + """ + wait_until(lambda: not self.has_under_replicated_partitions(), + timeout_sec = timeout_sec, + err_msg="Timed out waiting for under-replicated-partitions to clear") + + def describe_under_replicated_partitions(self): + """ + Use the topic tool to find the under-replicated partitions in the cluster. + + :return the under-replicated partitions as a list of dictionaries + (e.g. [{"topic": "foo", "partition": 1}, {"topic": "bar", "partition": 0}, ... ]) + """ + + node = self.nodes[0] + force_use_zk_connection = not node.version.topic_command_supports_bootstrap_server() + + cmd = fix_opts_for_new_jvm(node) + cmd += "%s --describe --under-replicated-partitions" % \ + self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection) + + self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + + under_replicated_partitions = self.parse_describe_topic(output)["partitions"] + self.logger.debug("Found %d under-replicated-partitions" % len(under_replicated_partitions)) + + return under_replicated_partitions + def describe_topic(self, topic, node=None): if node is None: node = self.nodes[0] diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py index 3b3fd1fdc659..bb2749e21075 100644 --- a/tests/kafkatest/tests/core/group_mode_transactions_test.py +++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py @@ -110,6 +110,8 @@ def bounce_brokers(self, clean_shutdown): time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) self.kafka.start_node(node) + self.kafka.await_no_under_replicated_partitions() + def create_and_start_message_copier(self, input_topic, output_topic, transactional_id): message_copier = TransactionalMessageCopier( context=self.test_context, diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index cfb3b55260f9..1f565b5196fd 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -108,6 +108,8 @@ def bounce_brokers(self, clean_shutdown): time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) self.kafka.start_node(node) + self.kafka.await_no_under_replicated_partitions() + def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id, use_group_metadata): message_copier = TransactionalMessageCopier( context=self.test_context,