Skip to content

Commit

Permalink
Revert unstable changes to upgrade/downgrade test
Browse files Browse the repository at this point in the history
  • Loading branch information
hachikuji committed Sep 1, 2021
1 parent ed177fb commit 9813c52
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
9 changes: 7 additions & 2 deletions tests/kafkatest/tests/core/downgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def upgrade_from(self, kafka_version):
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(kafka_version)
node.config[config_property.MESSAGE_FORMAT_VERSION] = str(kafka_version)
self.kafka.start_node(node)
self.kafka.await_no_under_replicated_partitions(timeout_sec=60)
self.wait_until_rejoin()

def downgrade_to(self, kafka_version):
for node in self.kafka.nodes:
Expand All @@ -50,7 +50,7 @@ def downgrade_to(self, kafka_version):
del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
del node.config[config_property.MESSAGE_FORMAT_VERSION]
self.kafka.start_node(node)
self.kafka.await_no_under_replicated_partitions(timeout_sec=60)
self.wait_until_rejoin()

def setup_services(self, kafka_version, compression_types, security_protocol, static_membership):
self.create_zookeeper_if_necessary()
Expand All @@ -73,6 +73,11 @@ def setup_services(self, kafka_version, compression_types, security_protocol, st

self.consumer.start()

def wait_until_rejoin(self):
for partition in range(0, self.PARTITIONS):
wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.REPLICATION_FACTOR,
timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

@cluster(num_nodes=7)
@parametrize(version=str(LATEST_2_8), compression_types=["snappy"])
@parametrize(version=str(LATEST_2_8), compression_types=["zstd"], security_protocol="SASL_SSL")
Expand Down
9 changes: 7 additions & 2 deletions tests/kafkatest/tests/core/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ def setUp(self):
self.num_producers = 1
self.num_consumers = 1

def wait_until_rejoin(self):
for partition in range(0, self.partitions):
wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.logger.info("Upgrade ZooKeeper from %s to %s" % (str(self.zk.nodes[0].version), str(DEV_BRANCH)))
self.zk.set_version(DEV_BRANCH)
Expand Down Expand Up @@ -69,7 +74,7 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
self.kafka.start_node(node)
self.kafka.await_no_under_replicated_partitions(timeout_sec=60)
self.wait_until_rejoin()

self.logger.info("Third pass bounce - remove inter.broker.protocol.version config")
for node in self.kafka.nodes:
Expand All @@ -83,7 +88,7 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0)
node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
self.kafka.start_node(node)
self.kafka.await_no_under_replicated_partitions(timeout_sec=60)
self.wait_until_rejoin()

@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["none"])
Expand Down

0 comments on commit 9813c52

Please sign in to comment.