Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16810: Add flag to read from specific partitions while checking consumer performance #15905

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static joptsimple.util.RegexMatcher.regex;

Expand Down Expand Up @@ -133,8 +137,29 @@ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
SimpleDateFormat dateFormat = options.dateFormat();
consumer.subscribe(options.topic(),
new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));

Optional<PartitionAndOffsets> partitionAssignment = options.partitionAssignment();

if (partitionAssignment.isPresent()) {
PartitionAndOffsets assignment = partitionAssignment.get();

// TODO: Support multiple topics
String topic = options.topic().iterator().next();

List<TopicPartition> partitions = Arrays.stream(assignment.partitions)
.mapToObj(partition -> new TopicPartition(topic, partition))
.collect(Collectors.toList());

consumer.assign(partitions);
if (assignment.offsets != null) {
for (int i = 0; i < partitions.size(); i++) {
consumer.seek(partitions.get(i), assignment.offsets[i]);
}
}
} else {
consumer.subscribe(options.topic(),
new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));
}

// now start the benchmark
long currentTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -260,6 +285,8 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions {
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
private final OptionSpec<String> partitionsOpt;
private final OptionSpec<String> offsetsOpt;

public ConsumerPerfOptions(String[] args) {
super(args);
Expand Down Expand Up @@ -332,6 +359,19 @@ public ConsumerPerfOptions(String[] args) {
.ofType(String.class)
.defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats");
partitionsOpt = parser.accepts("partitions", "List of partitions to fetch data from. " +
"Multiple partitions can be provided as a comma-separated list. e.g. --partitions 0,1,2")
.withRequiredArg()
.describedAs("partitions")
.ofType(String.class);
offsetsOpt = parser.accepts("offsets", "List of offsets to start reading from. " +
"If specified, the list must have the same number of elements as the partitions list in the same sequence. " +
"If not specified, starts reading from earliest unless --from-latest is specified. " +
"Only works with --partitions option. ")
.withRequiredArg()
.describedAs("offsets")
.ofType(String.class);

try {
options = parser.parse(args);
} catch (OptionException e) {
Expand Down Expand Up @@ -402,5 +442,37 @@ public boolean hideHeader() {
public long recordFetchTimeoutMs() {
return options.valueOf(recordFetchTimeoutOpt);
}

public Optional<PartitionAndOffsets> partitionAssignment() {
if (!options.has(partitionsOpt))
return Optional.empty();

int[] partitions = Arrays.stream(options.valueOf(partitionsOpt).split(","))
.mapToInt(Integer::parseInt)
.toArray();
long[] offsets = null;

if (options.has(offsetsOpt)) {
offsets = Arrays.stream(options.valueOf(offsetsOpt).split(","))
.mapToLong(Long::parseLong)
.toArray();

if (partitions.length != offsets.length) {
throw new IllegalArgumentException("The number of partitions and offsets must be the same.");
}
}

return Optional.of(new PartitionAndOffsets(partitions, offsets));
}
}

protected static class PartitionAndOffsets {
int[] partitions;
long[] offsets;

public PartitionAndOffsets(int[] partitions, long[] offsets) {
this.partitions = partitions;
this.offsets = offsets;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ConsumerPerformanceTest {
Expand Down Expand Up @@ -156,6 +159,38 @@ public void testDefaultClientId() throws IOException {
assertEquals("perf-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}

@Test
public void testPartitionAssignment() {
String[] args = new String[]{
"--broker-list", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--partitions", "0,1,2",
"--offsets", "2,1,0",
};

ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
Optional<ConsumerPerformance.PartitionAndOffsets> assignment = config.partitionAssignment();

assertTrue(assignment.isPresent());
assertArrayEquals(new int[]{0, 1, 2}, assignment.get().partitions);
assertArrayEquals(new long[]{2, 1, 0}, assignment.get().offsets);
}

@Test
public void testPartitionAssignmentThrowsExceptionIfPartitionsAndOffsetsAreNotEqual() {
String[] args = new String[]{
"--broker-list", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--partitions", "0,1,2",
"--offsets", "2,1",
};

ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertThrows(IllegalArgumentException.class, config::partitionAssignment);
}

private void testHeaderMatchContent(boolean detailed, int expectedOutputLineCount, Runnable runnable) {
String out = ToolsTestUtils.captureStandardOut(() -> {
ConsumerPerformance.printHeader(detailed);
Expand Down