Skip to content

Commit

Permalink
MINOR: Add 2.0, 2.1 and 2.2 to broker and client compat tests
Browse files Browse the repository at this point in the history
These are important to ensure we don't break compatibility.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #6794 from ijuma/update-version-compat-tests
  • Loading branch information
ijuma authored and gwenshap committed May 23, 2019
1 parent 3696b98 commit c823f32
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 6 deletions.
2 changes: 2 additions & 0 deletions tests/docker/Dockerfile
Expand Up @@ -54,6 +54,8 @@ RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFK
RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1"
RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1"
RUN mkdir -p "/opt/kafka-2.1.0" && chmod a+rw /opt/kafka-2.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.0"
RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1"
RUN mkdir -p "/opt/kafka-2.2.0" && chmod a+rw /opt/kafka-2.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.0"

# Streams test dependencies
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
Expand Down
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from ducktape.tests.test import Test
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, V_0_11_0_0, V_0_10_1_0, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion

def get_broker_features(broker_version):
features = {}
Expand Down Expand Up @@ -104,6 +104,9 @@ def invoke_compatibility_program(self, features):
@parametrize(broker_version=str(LATEST_0_11_0))
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_2))
def run_compatibility_test(self, broker_version):
self.zk.start()
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, KafkaVersion

class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
"""
Expand Down Expand Up @@ -59,6 +59,9 @@ def min_cluster_size(self):
@parametrize(broker_version=str(LATEST_0_11_0))
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_2))
def test_produce_consume(self, broker_version):
print("running producer_consumer_compat with broker_version = %s" % broker_version)
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, DEV_BRANCH, KafkaVersion

# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
Expand All @@ -48,6 +48,9 @@ def setUp(self):
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_2_2), consumer_version=str(LATEST_2_2), compression_types=["none"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_2_1), consumer_version=str(LATEST_2_1), compression_types=["zstd"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_2_0), consumer_version=str(LATEST_2_0), compression_types=["snappy"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_1_1), consumer_version=str(LATEST_1_1), compression_types=["lz4"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_1_0), consumer_version=str(LATEST_1_0), compression_types=["none"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_0_11_0), consumer_version=str(LATEST_0_11_0), compression_types=["gzip"], timestamp_type=str("CreateTime"))
Expand Down
8 changes: 7 additions & 1 deletion tests/kafkatest/tests/core/upgrade_test.py
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, V_0_9_0_0, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, V_0_9_0_0, DEV_BRANCH, KafkaVersion

class TestUpgrade(ProduceConsumeValidateTest):

Expand Down Expand Up @@ -60,6 +60,12 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.kafka.start_node(node)

@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_2_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_2_2), to_message_format_version=None, compression_types=["zstd"])
@parametrize(from_kafka_version=str(LATEST_2_1), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_2_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_2_0), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_2_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["none"])
Expand Down
6 changes: 5 additions & 1 deletion tests/kafkatest/version.py
Expand Up @@ -116,4 +116,8 @@ def get_version(node=None):

# 2.1.x versions
V_2_1_0 = KafkaVersion("2.1.0")
LATEST_2_1 = V_2_1_0
V_2_1_1 = KafkaVersion("2.1.1")
LATEST_2_1 = V_2_1_1

V_2_2_0 = KafkaVersion("2.2.0")
LATEST_2_2 = V_2_2_0
5 changes: 4 additions & 1 deletion vagrant/base.sh
Expand Up @@ -131,7 +131,10 @@ get_kafka 2.0.1 2.12
chmod a+rw /opt/kafka-2.0.1
get_kafka 2.1.0 2.12
chmod a+rw /opt/kafka-2.1.0

get_kafka 2.1.1 2.12
chmod a+rw /opt/kafka-2.1.1
get_kafka 2.2.0 2.12
chmod a+rw /opt/kafka-2.2.0

# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
# VMs, we can just create it if it doesn't exist and use it like we'd use
Expand Down

0 comments on commit c823f32

Please sign in to comment.