Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15093: Add 3.4 and 3.5 to core upgrade and compatibility tests #13859

Merged
merged 1 commit into from Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,7 +28,7 @@
from ducktape.tests.test import Test
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, \
LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, \
LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, V_0_11_0_0, V_0_10_1_0, KafkaVersion
LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, V_0_11_0_0, V_0_10_1_0, KafkaVersion

def get_broker_features(broker_version):
features = {}
Expand Down Expand Up @@ -136,6 +136,8 @@ def invoke_compatibility_program(self, features):
@parametrize(broker_version=str(LATEST_3_1))
@parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_5))
def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
if self.zk:
self.zk.start()
Expand Down
Expand Up @@ -25,7 +25,7 @@
from kafkatest.utils import is_int_with_prefix
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, \
LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, \
LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, KafkaVersion
LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, KafkaVersion

class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
"""
Expand Down Expand Up @@ -77,6 +77,8 @@ def min_cluster_size(self):
@parametrize(broker_version=str(LATEST_3_1))
@parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_5))
def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
Expand Up @@ -23,7 +23,7 @@
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, \
LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_BRANCH, KafkaVersion
LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, DEV_BRANCH, KafkaVersion

# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
Expand Down Expand Up @@ -60,6 +60,8 @@ def setUp(self):
@matrix(producer_version=[str(LATEST_3_1)], consumer_version=[str(LATEST_3_1)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_2)], consumer_version=[str(LATEST_3_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_3)], consumer_version=[str(LATEST_3_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_4)], consumer_version=[str(LATEST_3_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_5)], consumer_version=[str(LATEST_3_5)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
Expand Down
8 changes: 7 additions & 1 deletion tests/kafkatest/tests/core/downgrade_test.py
Expand Up @@ -20,7 +20,7 @@
from kafkatest.services.kafka import config_property
from kafkatest.tests.end_to_end import EndToEndTest
from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, \
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_BRANCH, KafkaVersion
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, DEV_BRANCH, KafkaVersion

class TestDowngrade(EndToEndTest):
PARTITIONS = 3
Expand Down Expand Up @@ -80,6 +80,12 @@ def wait_until_rejoin(self):
timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

@cluster(num_nodes=7)
@parametrize(version=str(LATEST_3_5), compression_types=["snappy"])
@parametrize(version=str(LATEST_3_5), compression_types=["zstd"], security_protocol="SASL_SSL")
@matrix(version=[str(LATEST_3_5)], compression_types=[["none"]], static_membership=[False, True])
@parametrize(version=str(LATEST_3_4), compression_types=["snappy"])
@parametrize(version=str(LATEST_3_4), compression_types=["zstd"], security_protocol="SASL_SSL")
@matrix(version=[str(LATEST_3_4)], compression_types=[["none"]], static_membership=[False, True])
@parametrize(version=str(LATEST_3_3), compression_types=["snappy"])
@parametrize(version=str(LATEST_3_3), compression_types=["zstd"], security_protocol="SASL_SSL")
@matrix(version=[str(LATEST_3_3)], compression_types=[["none"]], static_membership=[False, True])
Expand Down
8 changes: 7 additions & 1 deletion tests/kafkatest/tests/core/upgrade_test.py
Expand Up @@ -26,7 +26,7 @@
from kafkatest.utils.remote_account import java_version
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, \
LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, \
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, V_0_11_0_0, V_2_8_0, V_3_0_0, \
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, V_0_11_0_0, V_2_8_0, V_3_0_0, \
DEV_BRANCH, KafkaVersion
from kafkatest.services.kafka.util import new_jdk_not_supported

Expand Down Expand Up @@ -94,6 +94,12 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.wait_until_rejoin()

@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["snappy"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general question because I have been going into this part of the codebase - is there a reason why the other two compression types are not tested (i.e. gzip, zstd)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I don't know. At this point, we've copied the same lines as the previous releases. Maybe this could be improved. If so I'd rather separate it from this PR and handle this out of a release.

Copy link
Collaborator

@clolov clolov Jun 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I agree, this definitely needs to not be in this pull request, I was just wondering in case the answer was on top of your head

@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["snappy"])
Expand Down