-
Notifications
You must be signed in to change notification settings - Fork 13.8k
/
test_bounce.py
78 lines (69 loc) · 3.88 KB
/
test_bounce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
class TestBounce(Test):
    """Smoke test: run the verifiable producer against a cluster before and after a cluster roll."""

    def __init__(self, test_context):
        """
        Create the services this test drives, sized from the injected test arguments.

        :param test_context: ducktape test context; ``injected_args`` may carry ``quorum_size``
                             (a quorum size of 1 is used when it is absent)
        :raises Exception: if the resolved quorum size is smaller than 1
        """
        super(TestBounce, self).__init__(test_context)

        size_arg = 'quorum_size'
        fallback_size = 1
        injected = test_context.injected_args
        if injected:
            quorum_size = injected.get(size_arg, fallback_size)
        else:
            # No injected arguments at all: fall back to the default size
            quorum_size = fallback_size
        if quorum_size < 1:
            raise Exception("Illegal %s value provided for the test: %s" % (size_arg, quorum_size))

        self.topic = "topic"

        # A ZooKeeper service is only created when the test runs with the ZooKeeper quorum
        if quorum.for_test(test_context) == quorum.zk:
            self.zk = ZookeeperService(test_context, num_nodes=quorum_size)
        else:
            self.zk = None

        # Only combined KRaft scales the Kafka node count with the quorum size;
        # every other mode uses a single Kafka node
        if quorum.for_test(test_context) == quorum.combined_kraft:
            num_kafka_nodes = quorum_size
        else:
            num_kafka_nodes = 1

        topic_settings = {"partitions": 1, "replication-factor": 1}
        self.kafka = KafkaService(
            test_context,
            num_nodes=num_kafka_nodes,
            zk=self.zk,
            topics={self.topic: topic_settings},
            controller_num_nodes_override=quorum_size,
        )
        self.num_messages = 1000

    def create_producer(self):
        """
        Build a new single-node VerifiableProducer and store it in ``self.producer``.

        The producer targets ``self.kafka`` / ``self.topic``, is capped at ``self.num_messages``
        messages, and is throttled to one tenth of that count (integer division).
        """
        throughput = self.num_messages // 10
        self.producer = VerifiableProducer(
            self.test_context,
            num_nodes=1,
            kafka=self.kafka,
            topic=self.topic,
            max_messages=self.num_messages,
            throughput=throughput,
        )

    def setUp(self):
        """Start ZooKeeper when this test was configured with a ZooKeeper service."""
        if not self.zk:
            return
        self.zk.start()

    # Quorum size 1: ZooKeeper and KRaft
    @cluster(num_nodes=4)
    @matrix(metadata_quorum=quorum.all, quorum_size=[1])
    # Quorum size 3: isolated and combined KRaft
    @cluster(num_nodes=6)
    @matrix(metadata_quorum=quorum.all_kraft, quorum_size=[3])
    def test_simple_run(self, metadata_quorum, quorum_size):
        """
        Produce a small batch of messages, roll the cluster, then produce a second batch.

        Each of the two rounds creates a fresh producer, waits for it to get going, waits for it
        to finish, and asserts that exactly ``self.num_messages`` messages were acked. After the
        first round only, the producer is stopped and the cluster is restarted (the isolated
        controller quorum first, when running KRaft with one).
        """
        self.kafka.start()

        for is_first_round in (True, False):
            self.create_producer()
            self.producer.start()
            wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
                       err_msg="Producer failed to start in a reasonable amount of time.")

            self.producer.wait()
            acked = self.producer.num_acked
            assert acked == self.num_messages, "num_produced: %d, num_messages: %d" % (acked, self.num_messages)

            if not is_first_round:
                # Nothing to roll after the second round
                continue

            self.producer.stop()
            # The controller quorum attribute is consulted only under KRaft
            if self.kafka.quorum_info.using_kraft:
                controller_quorum = self.kafka.isolated_controller_quorum
                if controller_quorum:
                    controller_quorum.restart_cluster()
            self.kafka.restart_cluster()