Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,6 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
)
self.controller_quorum = self.remote_controller_quorum

if self.quorum_info.using_zk:
raise Exception("No need to test ZK mode")

Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=KafkaService.PERSISTENT_ROOT)
Expand Down Expand Up @@ -865,7 +862,6 @@ def start_node(self, node, timeout_sec=60, **kwargs):
self.logger.info(prop_file)
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))

if self.quorum_info.using_kraft:
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/kafka/quorum.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def for_test(test_context):
default_quorum_type = colocated_kraft
arg_name = 'metadata_quorum'
retval = default_quorum_type if not test_context.injected_args else test_context.injected_args.get(arg_name, default_quorum_type)
if retval not in all:
raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval))
# if retval not in all:
# raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval))
return retval

class ServiceQuorumInfo:
Expand Down
19 changes: 10 additions & 9 deletions tests/kafkatest/services/kafka/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ group.initial.rebalance.delay.ms=100
log.cleaner.dedupe.buffer.size=33554432

############################# Settings for es #############################
create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
#create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
# enable store data in object storage
elasticstream.enable=true
elasticstream.endpoint=s3://
Expand All @@ -163,11 +163,12 @@ s3.stream.object.compaction.interval.minutes=3
s3.stream.object.compaction.max.size.bytes=104857600

############################# Settings for auto balancer #############################
metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
autobalancer.topic="AutoBalancerMetricsReporterTopic"
autobalancer.topic.num.partitions=1
autobalancer.reporter.metrics.reporting.interval.ms=5000
# 10MB/s
autobalancer.reporter.network.in.capacity=10485760
# 10MB/s
autobalancer.reporter.network.out.capacity=10485760
# TODO: autobalancer switch
#metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
#autobalancer.topic="AutoBalancerMetricsReporterTopic"
#autobalancer.topic.num.partitions=1
#autobalancer.reporter.metrics.reporting.interval.ms=5000
## 10MB/s
#autobalancer.reporter.network.in.capacity=10485760
## 10MB/s
#autobalancer.reporter.network.out.capacity=10485760
85 changes: 46 additions & 39 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ def __init__(self, test_context):
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
self.schemas = True

def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False, kraft=True):
self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
topics=self.topics, version=broker_version,
server_prop_overrides=[
["auto.create.topics.enable", str(auto_create_topics)],
["transaction.state.log.replication.factor", str(self.num_brokers)],
["transaction.state.log.min.isr", str(self.num_brokers)]
])
],
allow_zk_with_kraft=kraft
)
if timestamp_type is not None:
for node in self.kafka.nodes:
node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
Expand Down Expand Up @@ -699,51 +701,56 @@ def test_transformations(self, connect_protocol, metadata_quorum):
assert obj['payload'][ts_fieldname] == ts

@cluster(num_nodes=5)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager')
def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol):
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.remote_kraft)
# FIXME: unknown reason kafka 2.x.x broker startup will encounter ducker host resolve fail.
# @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# broker < 1_0 not support jdk17
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.remote_kraft)
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.remote_kraft)
# @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.remote_kraft)
# @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol, metadata_quorum):
"""
Verify that Connect will start up with various broker versions with various configurations.
When Connect distributed starts up, it either creates internal topics (v0.10.1.0 and after)
or relies upon the broker to auto-create the topics (v0.10.0.x and before).
"""
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
kraft = False
if broker_version == str(DEV_BRANCH):
kraft = True
self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True)
security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True, kraft=kraft)
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))

self.cc.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from ducktape.mark import parametrize
from ducktape.mark import ignore
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, test_context):
def setUp(self):
self.zk.start()


@ignore # AutoMQ won't release stream jar, so we only need AutoMQ can support stream
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
Expand Down Expand Up @@ -96,6 +97,7 @@ def test_compatible_brokers_eos_disabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
Expand Down Expand Up @@ -129,6 +131,7 @@ def test_compatible_brokers_eos_alpha_enabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
Expand All @@ -154,6 +157,7 @@ def test_compatible_brokers_eos_v2_enabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
Expand All @@ -172,6 +176,7 @@ def test_fail_fast_on_incompatible_brokers(self, broker_version):

self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_2_4))
@parametrize(broker_version=str(LATEST_2_3))
Expand Down