KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (apache#15577)

Added a new optional group_protocol parameter to the test method and passed it down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of simply adding group_protocol=["classic", "consumer"] to the existing blocks 😢
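For context on where group_protocol ends up, here is a minimal sketch of what the consumer_group.maybe_set_group_protocol helper used below presumably does, assuming it only folds the optional protocol into a consumer config dict; the real helper lives in kafkatest/services/kafka/consumer_group.py and its exact signature may differ:

# Illustrative sketch only, not the actual helper from kafkatest/services/kafka/consumer_group.py.
def maybe_set_group_protocol(group_protocol, consumer_properties=None):
    """Return a consumer config dict, setting group.protocol only when a protocol was requested."""
    consumer_properties = {} if consumer_properties is None else consumer_properties
    if group_protocol is not None:
        # "classic" keeps the pre-KIP-848 rebalance protocol; "consumer" opts into KIP-848.
        consumer_properties["group.protocol"] = group_protocol
    return consumer_properties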

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
kirktrue authored and clolov committed Apr 5, 2024
1 parent 69a5f82 commit 01972eb
1 changed file: tests/kafkatest/tests/core/replica_scale_test.py (7 additions, 5 deletions)
@@ -20,7 +20,7 @@
 from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
 from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
-from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.kafka import KafkaService, quorum, consumer_group
 from kafkatest.services.trogdor.trogdor import TrogdorService
 from kafkatest.services.zookeeper import ZookeeperService

@@ -52,18 +52,19 @@ def teardown(self):
         topic_count=[50],
         partition_count=[34],
         replication_factor=[3],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         topic_count=[50],
         partition_count=[34],
         replication_factor=[3],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
     def test_produce_consume(self, topic_count, partition_count, replication_factor,
-                             metadata_quorum=quorum.zk, use_new_coordinator=False):
+                             metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "replicas_produce_consume_%d" % i
@@ -101,12 +102,13 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor,
         produce_workload.wait_for_done(timeout_sec=600)
         print("Completed produce bench", flush=True) # Force some stdout for Travis
 
+        consumer_conf = consumer_group.maybe_set_group_protocol(group_protocol)
         consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                                 consumer_workload_service.consumer_node,
                                                 consumer_workload_service.bootstrap_servers,
                                                 target_messages_per_sec=150000,
                                                 max_messages=3400000,
-                                                consumer_conf={},
+                                                consumer_conf=consumer_conf,
                                                 admin_client_conf={},
                                                 common_client_conf={},
                                                 active_topics=["replicas_produce_consume_[0-2]"])
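To make the effect of the two @matrix blocks above concrete, here is a hypothetical enumeration of the parameter combinations the test now covers, assuming consumer_group.all_group_protocols expands to ["classic", "consumer"] as the commit message implies:

# Hypothetical expansion of the two @matrix blocks; values mirror the diff above.
from itertools import product

zk_and_kraft_block = product(["zk", "isolated_kraft"], [False], [None])
kip848_block = product(["isolated_kraft"], [True], ["classic", "consumer"])

for metadata_quorum, use_new_coordinator, group_protocol in list(zk_and_kraft_block) + list(kip848_block):
    print("metadata_quorum=%s, use_new_coordinator=%s, group_protocol=%s"
          % (metadata_quorum, use_new_coordinator, group_protocol))

# The KIP-848 "consumer" protocol only appears together with use_new_coordinator=True,
# which is why it could not simply be added to the existing blocks.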
