diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py index bd306d83c204..2ec453a90243 100644 --- a/tests/kafkatest/tests/core/downgrade_test.py +++ b/tests/kafkatest/tests/core/downgrade_test.py @@ -41,7 +41,7 @@ def upgrade_from(self, kafka_version): node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(kafka_version) node.config[config_property.MESSAGE_FORMAT_VERSION] = str(kafka_version) self.kafka.start_node(node) - self.kafka.await_no_under_replicated_partitions(timeout_sec=60) + self.wait_until_rejoin() def downgrade_to(self, kafka_version): for node in self.kafka.nodes: @@ -50,7 +50,7 @@ def downgrade_to(self, kafka_version): del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] del node.config[config_property.MESSAGE_FORMAT_VERSION] self.kafka.start_node(node) - self.kafka.await_no_under_replicated_partitions(timeout_sec=60) + self.wait_until_rejoin() def setup_services(self, kafka_version, compression_types, security_protocol, static_membership): self.create_zookeeper_if_necessary() @@ -73,6 +73,11 @@ def setup_services(self, kafka_version, compression_types, security_protocol, st self.consumer.start() + def wait_until_rejoin(self): + for partition in range(0, self.PARTITIONS): + wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.REPLICATION_FACTOR, + timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time") + @cluster(num_nodes=7) @parametrize(version=str(LATEST_2_8), compression_types=["snappy"]) @parametrize(version=str(LATEST_2_8), compression_types=["zstd"], security_protocol="SASL_SSL") diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index 339424c64b83..7fd4ab84a96b 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -42,6 +42,11 @@ def setUp(self): self.num_producers = 1 self.num_consumers = 1 + def wait_until_rejoin(self): + for partition in range(0, self.partitions): + wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60, + backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time") + def perform_upgrade(self, from_kafka_version, to_message_format_version=None): self.logger.info("Upgrade ZooKeeper from %s to %s" % (str(self.zk.nodes[0].version), str(DEV_BRANCH))) self.zk.set_version(DEV_BRANCH) @@ -69,7 +74,7 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None): node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version self.kafka.start_node(node) - self.kafka.await_no_under_replicated_partitions(timeout_sec=60) + self.wait_until_rejoin() self.logger.info("Third pass bounce - remove inter.broker.protocol.version config") for node in self.kafka.nodes: @@ -83,7 +88,7 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None): node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version self.kafka.start_node(node) - self.kafka.await_no_under_replicated_partitions(timeout_sec=60) + self.wait_until_rejoin() @cluster(num_nodes=6) @parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["none"])