Skip to content

Commit 942b11b

Browse files
KAFKA-19321: Added share_consumer_performance.py and related system tests (#19836)
This PR includes some performance system tests utilizing the kafka-share-consumer-perf-test.sh tool for share groups. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 8d0097f commit 942b11b

File tree

5 files changed

+309
-9
lines changed

5 files changed

+309
-9
lines changed

tests/kafkatest/benchmarks/core/benchmark_test.py

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,20 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from ducktape.utils.util import wait_until
1617
from ducktape.mark import matrix
1718
from ducktape.mark import parametrize
1819
from ducktape.mark.resource import cluster
1920
from ducktape.services.service import Service
2021
from ducktape.tests.test import Test
2122

2223
from kafkatest.services.kafka import KafkaService, quorum
23-
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
24+
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, ShareConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
25+
from kafkatest.services.security.security_config import SecurityConfig
2426
from kafkatest.version import DEV_BRANCH, KafkaVersion
2527

28+
import os
29+
2630
TOPIC_REP_ONE = "topic-replication-factor-one"
2731
TOPIC_REP_THREE = "topic-replication-factor-three"
2832
DEFAULT_RECORD_SIZE = 100 # bytes
@@ -232,6 +236,71 @@ def test_producer_and_consumer(self, compression_type="none", security_protocol=
232236
str(data)]
233237
self.logger.info("\n".join(summary))
234238
return data
239+
240+
@cluster(num_nodes=8)
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
        compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True])
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft],
        use_share_groups=[True])
def test_producer_and_share_consumer(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
                                     interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH),
                                     metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
    """
    Setup: 3 node kafka cluster
    Concurrently produce and consume 1e6 messages with a single producer and a single share consumer.

    Return aggregate throughput statistics for both producer and share consumer.

    (Under the hood, this runs ProducerPerformance.java, and ShareConsumerPerformance.java)

    :param compression_type: value used for the producer's 'compression.type' setting
    :param security_protocol: client security protocol; also used between brokers when
        interbroker_security_protocol is None
    :param tls_version: forwarded to start_kafka
    :param interbroker_security_protocol: forwarded to start_kafka; defaults to security_protocol
    :param client_version: version string for the producer, must be <= broker_version
    :param broker_version: version string for the brokers
    :param metadata_quorum: not read in this method body (presumably consumed via the test context - confirm)
    :param use_share_groups: not read in this method body (presumably consumed via the test context - confirm)
    :return: dict with keys "producer" and "share_consumer" holding the aggregate throughput of each service
    """
    client_version = KafkaVersion(client_version)
    broker_version = KafkaVersion(broker_version)
    self.validate_versions(client_version, broker_version)
    if interbroker_security_protocol is None:
        interbroker_security_protocol = security_protocol
    self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
    num_records = 1000 * 1000  # 1e6

    self.producer = ProducerPerformanceService(
        self.test_context, 1, self.kafka,
        topic=TOPIC_REP_THREE,
        num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
        settings={
            'acks': 1,
            'compression.type': compression_type,
            'batch.size': self.batch_size,
            'buffer.memory': self.buffer_memory
        }
    )

    share_group = "perf-share-consumer"

    # Only hand a --command-config file to the config tool when one was actually written.
    # The protocol is compared by value (!=): an identity check on strings depends on
    # interning and is not a reliable way to detect PLAINTEXT.
    command_config = None
    if security_protocol != SecurityConfig.PLAINTEXT:
        kafka_node = self.kafka.nodes[0]
        persistent_root = "/mnt/share_consumer_performance"
        command_config = os.path.join(persistent_root, "command.properties")
        prop_file = str(self.kafka.security_config.client_config())
        self.logger.debug(prop_file)
        kafka_node.account.ssh("mkdir -p %s" % persistent_root, allow_fail=False)
        kafka_node.account.create_file(command_config, prop_file)

    # The share group must read from the earliest offset; retry until the config change is acknowledged.
    wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=share_group, strategy="earliest", command_config=command_config),
               timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")

    self.share_consumer = ShareConsumerPerformanceService(
        self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, messages=num_records, group=share_group, timeout=20000)
    Service.run_parallel(self.producer, self.share_consumer)

    data = {
        "producer": compute_aggregate_throughput(self.producer),
        "share_consumer": compute_aggregate_throughput(self.share_consumer)
    }
    summary = [
        "Producer + share_consumer:",
        str(data)]
    self.logger.info("\n".join(summary))
    return data
235304

236305
@cluster(num_nodes=8)
237306
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
@@ -273,6 +342,62 @@ def test_consumer_throughput(self, compression_type="none", security_protocol="P
273342
self.consumer.group = "test-consumer-group"
274343
self.consumer.run()
275344
return compute_aggregate_throughput(self.consumer)
345+
346+
@cluster(num_nodes=8)
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
        compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True])
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft],
        use_share_groups=[True])
def test_share_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
                                   interbroker_security_protocol=None, num_consumers=1, client_version=str(DEV_BRANCH),
                                   broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
    """
    Consume 1e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
    and report throughput.

    The topic is first seeded by a single producer run to completion; the share consumer
    service is started only afterwards.

    :param compression_type: value used for the seeding producer's 'compression.type' setting
    :param security_protocol: client security protocol; also used between brokers when
        interbroker_security_protocol is None
    :param tls_version: forwarded to start_kafka
    :param interbroker_security_protocol: forwarded to start_kafka; defaults to security_protocol
    :param num_consumers: number of nodes given to the share consumer performance service
    :param client_version: version string for the producer, must be <= broker_version
    :param broker_version: version string for the brokers
    :param metadata_quorum: not read in this method body (presumably consumed via the test context - confirm)
    :param use_share_groups: not read in this method body (presumably consumed via the test context - confirm)
    :return: aggregate throughput of the share consumer service
    """
    client_version = KafkaVersion(client_version)
    broker_version = KafkaVersion(broker_version)
    self.validate_versions(client_version, broker_version)
    if interbroker_security_protocol is None:
        interbroker_security_protocol = security_protocol
    self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
    num_records = 1000 * 1000  # 1e6

    # seed kafka w/messages
    self.producer = ProducerPerformanceService(
        self.test_context, 1, self.kafka,
        topic=TOPIC_REP_THREE,
        num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
        settings={
            'acks': 1,
            'compression.type': compression_type,
            'batch.size': self.batch_size,
            'buffer.memory': self.buffer_memory
        }
    )
    self.producer.run()

    share_group = "test-share-consumer-group"

    # Only hand a --command-config file to the config tool when one was actually written.
    # The protocol is compared by value (!=): an identity check on strings depends on
    # interning and is not a reliable way to detect PLAINTEXT.
    command_config = None
    if security_protocol != SecurityConfig.PLAINTEXT:
        kafka_node = self.kafka.nodes[0]
        persistent_root = "/mnt/share_consumer_performance"
        command_config = os.path.join(persistent_root, "command.properties")
        prop_file = str(self.kafka.security_config.client_config())
        self.logger.debug(prop_file)
        kafka_node.account.ssh("mkdir -p %s" % persistent_root, allow_fail=False)
        kafka_node.account.create_file(command_config, prop_file)

    # The records were produced before the group exists, so the group must start from the
    # earliest offset; retry until the config change is acknowledged.
    wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=share_group, strategy="earliest", command_config=command_config),
               timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")

    # consume
    self.share_consumer = ShareConsumerPerformanceService(
        self.test_context, num_consumers, self.kafka,
        topic=TOPIC_REP_THREE, messages=num_records, group=share_group, timeout=20000)
    self.share_consumer.run()
    return compute_aggregate_throughput(self.share_consumer)
276401

277402
def validate_versions(self, client_version, broker_version):
    """Assert that the client version is not newer than the broker version.

    :param client_version: version of the client under test
    :param broker_version: version of the brokers under test
    :raises AssertionError: if client_version > broker_version
    """
    # The '%' operator is required here: without it the message expression tries to call
    # the string and raises TypeError instead of reporting the version mismatch.
    assert client_version <= broker_version, \
        "Client version %s should be <= than broker version %s" % (client_version, broker_version)

tests/kafkatest/sanity_checks/test_performance_services.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from ducktape.utils.util import wait_until
1617
from ducktape.mark import matrix, parametrize
1718
from ducktape.mark.resource import cluster
1819
from ducktape.tests.test import Test
1920

2021
from kafkatest.services.kafka import KafkaService, quorum
21-
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
22+
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService, ShareConsumerPerformanceService
2223
from kafkatest.services.performance import latency, compute_aggregate_throughput
23-
from kafkatest.version import DEV_BRANCH, LATEST_2_1, KafkaVersion
24+
from kafkatest.version import DEV_BRANCH, LATEST_2_1, V_4_1_0, KafkaVersion
2425

2526

2627
class PerformanceServiceTest(Test):
@@ -30,9 +31,10 @@ def __init__(self, test_context):
3031
self.num_records = 10000
3132
self.topic = "topic"
3233

33-
@cluster(num_nodes=5)
34-
@matrix(version=[str(LATEST_2_1), str(DEV_BRANCH)], metadata_quorum=quorum.all_kraft)
35-
def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk):
34+
@cluster(num_nodes=6)
35+
@matrix(version=[str(LATEST_2_1)], metadata_quorum=quorum.all_kraft)
36+
@matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_kraft, use_share_groups=[True])
37+
def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk, use_share_groups=False):
3638
"""
3739
Sanity check out producer performance service - verify that we can run the service with a small
3840
number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
@@ -73,8 +75,24 @@ def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk):
7375
consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
7476
assert consumer_perf_data['records_per_sec'] > 0
7577

76-
return {
78+
results = {
7779
"producer_performance": producer_perf_data,
7880
"end_to_end_latency": end_to_end_data,
79-
"consumer_performance": consumer_perf_data
81+
"consumer_performance": consumer_perf_data,
8082
}
83+
84+
if version >= V_4_1_0:
85+
# check basic run of share consumer performance service
86+
self.share_consumer_perf = ShareConsumerPerformanceService(
87+
self.test_context, 1, self.kafka,
88+
topic=self.topic, version=version, messages=self.num_records)
89+
share_group = "test-share-consumer-group"
90+
self.share_consumer_perf.group = share_group
91+
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=share_group, strategy="earliest"),
92+
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
93+
self.share_consumer_perf.run()
94+
share_consumer_perf_data = compute_aggregate_throughput(self.share_consumer_perf)
95+
assert share_consumer_perf_data['records_per_sec'] > 0
96+
results["share_consumer_performance"] = share_consumer_perf_data
97+
98+
return results

tests/kafkatest/services/kafka/kafka.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1728,6 +1728,29 @@ def check_protocol_errors(self, node):
17281728
return False
17291729
return True
17301730

1731+
def set_share_group_offset_reset_strategy(self, group, strategy=None, node=None, command_config=None):
    """Set share.auto.offset.reset for the given group using kafka-configs.sh.

    :param group: name of the group whose config is altered
    :param strategy: value for share.auto.offset.reset; when None nothing is run and None is returned
    :param node: node on which to run the tool; defaults to the first node of this service
    :param command_config: optional path passed to the tool via --command-config
    :return: True if the tool output contains "Completed", False otherwise (None if strategy is None)
    """
    if strategy is None:
        return

    target = self.nodes[0] if node is None else node
    config_script = self.path.script("kafka-configs.sh", target)

    # The option is appended as the last token; it is an empty string when no file is given.
    config_opt = "" if command_config is None else "--command-config " + command_config

    cmd = fix_opts_for_new_jvm(target)
    cmd += "%s --bootstrap-server %s --group %s --alter --add-config \"share.auto.offset.reset=%s\" %s" % (
        config_script,
        self.bootstrap_servers(self.security_protocol),
        group,
        strategy,
        config_opt,
    )
    output = self.run_cli_tool(target, cmd)
    return "Completed" in output
1753+
17311754
def list_consumer_groups(self, node=None, command_config=None, state=None, type=None):
17321755
""" Get list of consumer groups.
17331756
"""
@@ -1750,7 +1773,7 @@ def list_consumer_groups(self, node=None, command_config=None, state=None, type=
17501773
if type is not None:
17511774
cmd += " --type %s" % type
17521775
return self.run_cli_tool(node, cmd)
1753-
1776+
17541777
def list_share_groups(self, node=None, command_config=None, state=None):
17551778
""" Get list of share groups.
17561779
"""

tests/kafkatest/services/performance/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@
1717
from .end_to_end_latency import EndToEndLatencyService
1818
from .producer_performance import ProducerPerformanceService
1919
from .consumer_performance import ConsumerPerformanceService
20+
from .share_consumer_performance import ShareConsumerPerformanceService
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
import os
18+
19+
from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config_for_tools
20+
from kafkatest.services.performance import PerformanceService
21+
from kafkatest.version import DEV_BRANCH
22+
23+
24+
class ShareConsumerPerformanceService(PerformanceService):
    """
    Service that runs kafka-share-consumer-perf-test.sh on each of its nodes and records
    throughput figures parsed from the last line of the tool's output.

    See ShareConsumerPerformance tool as the source of truth on these settings, but for reference:

    "topic", "REQUIRED: The topic to consume from."

    "group", "The group id to consume on."

    "fetch-size", "The amount of data to fetch in a single request."

    "socket-buffer-size", "The size of the tcp RECV size."

    "consumer.config", "Consumer config properties file."
    """

    # Root directory for persistent output
    PERSISTENT_ROOT = "/mnt/share_consumer_performance"
    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "share_consumer_performance.stdout")
    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "share_consumer_performance.stderr")
    LOG_FILE = os.path.join(LOG_DIR, "share_consumer_performance.log")
    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "share_consumer.properties")

    logs = {
        "share_consumer_performance_output": {
            "path": STDOUT_CAPTURE,
            "collect_default": True},
        "share_consumer_performance_stderr": {
            "path": STDERR_CAPTURE,
            "collect_default": True},
        "share_consumer_performance_log": {
            "path": LOG_FILE,
            "collect_default": True}
    }

    def __init__(self, context, num_nodes, kafka, topic, messages, group="perf-share-consumer", version=DEV_BRANCH, timeout=10000, settings=None):
        """
        :param context: test context, forwarded to the base service
        :param num_nodes: number of nodes to run the tool on, forwarded to the base service
        :param kafka: Kafka service providing bootstrap servers and the client security config
        :param topic: topic to consume from
        :param messages: value passed to the tool's --messages option
        :param group: share group id passed to the tool's --group option
        :param version: version assigned to every node of this service
        :param timeout: value passed to the tool's --timeout option
        :param settings: optional dict of extra key=value tokens appended to the command line;
            a fresh empty dict is used when omitted
        """
        super(ShareConsumerPerformanceService, self).__init__(context, num_nodes)
        self.kafka = kafka
        self.security_config = kafka.security_config.client_config()
        self.topic = topic
        self.messages = messages
        # A None sentinel (rather than a `{}` default) keeps instances from sharing one dict.
        self.settings = {} if settings is None else settings
        self.group = group
        self.timeout = timeout

        # These less-frequently used settings can be updated manually after instantiation
        self.fetch_size = None
        self.socket_buffer_size = None

        for node in self.nodes:
            node.version = version

    def args(self):
        """Dictionary of arguments used to start the Share Consumer Performance script."""
        args = {
            'topic': self.topic,
            'messages': self.messages,
            'bootstrap-server': self.kafka.bootstrap_servers(self.security_config.security_protocol),
            'group': self.group,
            'timeout': self.timeout
        }

        # Optional arguments are only emitted when they were set after construction.
        if self.fetch_size is not None:
            args['fetch-size'] = self.fetch_size

        if self.socket_buffer_size is not None:
            args['socket-buffer-size'] = self.socket_buffer_size

        return args

    def start_cmd(self, node):
        """Build the shell command line that runs the share consumer perf tool on the given node.

        stderr is appended to STDERR_CAPTURE; stdout is piped through `tee -a` into STDOUT_CAPTURE.
        """
        cmd = fix_opts_for_new_jvm(node)
        cmd += "export LOG_DIR=%s;" % ShareConsumerPerformanceService.LOG_DIR
        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
        cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\";" % (get_log4j_config_param(node), get_log4j_config_for_tools(node))
        cmd += " %s" % self.path.script("kafka-share-consumer-perf-test.sh", node)
        for key, value in self.args().items():
            cmd += " --%s %s" % (key, value)

        cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE

        for key, value in self.settings.items():
            cmd += " %s=%s" % (str(key), str(value))

        cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': ShareConsumerPerformanceService.STDOUT_CAPTURE,
                                                        'stderr': ShareConsumerPerformanceService.STDERR_CAPTURE}
        return cmd

    def _worker(self, idx, node):
        """Prepare the node, run the tool, and store the stats parsed from its last output line.

        The last captured line is split on ','; the fields at index 2, 3 and 5 are stored as
        'total_mb', 'mbps' and 'records_per_sec' in self.results[idx-1].
        """
        node.account.ssh("mkdir -p %s" % ShareConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)

        log_config = self.render(get_log4j_config_for_tools(node), log_file=ShareConsumerPerformanceService.LOG_FILE)
        node.account.create_file(get_log4j_config_for_tools(node), log_config)
        node.account.create_file(ShareConsumerPerformanceService.CONFIG_FILE, str(self.security_config))
        self.security_config.setup_node(node)

        cmd = self.start_cmd(node)
        self.logger.debug("Share consumer performance %d command: %s", idx, cmd)
        last = None
        for line in node.account.ssh_capture(cmd):
            last = line

        # Parse and save the last line's information
        if last is not None:
            parts = last.split(',')
            # NOTE(review): idx appears to be 1-based here, hence idx-1 - confirm against the base service.
            self.results[idx-1] = {
                'total_mb': float(parts[2]),
                'mbps': float(parts[3]),
                'records_per_sec': float(parts[5]),
            }

0 commit comments

Comments
 (0)