Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ private void run() {
LOGGER.info("Starting perf test with config: {}", jsonStringify(config));
TimerUtil timer = new TimerUtil();

if (config.reset) {
LOGGER.info("Deleting all topics...");
int deleted = topicService.deleteTopics();
LOGGER.info("Deleted all topics ({} in total), took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}

LOGGER.info("Creating topics...");
List<Topic> topics = topicService.createTopics(config.topicsConfig());
LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
Expand All @@ -94,7 +100,7 @@ private void run() {
collectStats(Duration.ofMinutes(config.warmupDurationMinutes));
}

Result result = null;
Result result;
if (config.backlogDurationSeconds > 0) {
LOGGER.info("Pausing consumers for {} seconds to build up backlog...", config.backlogDurationSeconds);
consumerService.pause();
Expand Down Expand Up @@ -144,12 +150,13 @@ private void waitTopicsReady() {
boolean ready = false;
while (System.nanoTime() < start + TOPIC_READY_TIMEOUT_NANOS) {
long received = stats.toCumulativeStats().totalMessagesReceived;
LOGGER.info("Waiting for topics to be ready... sent: {}, received: {}", sent, received);
if (received >= sent) {
ready = true;
break;
}
try {
Thread.sleep(1000);
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@

package org.apache.kafka.tools.automq.perf;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand All @@ -50,9 +56,11 @@ public class ConsumerService implements AutoCloseable {

private final Admin admin;
private final List<Group> groups = new ArrayList<>();
private final String groupSuffix;

/**
 * Creates a service backed by an {@link Admin} client against the given bootstrap server.
 *
 * @param bootstrapServer Kafka bootstrap server address
 */
public ConsumerService(String bootstrapServer) {
    this.admin = Admin.create(Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer));
    // Run-scoped time suffix ("HHmmss") appended to every consumer group id so that
    // successive perf runs do not reuse the committed offsets of a previous run.
    // DateTimeFormatter replaces SimpleDateFormat: it is thread-safe and avoids the
    // obscure DateFormat.format(Object) overload that accepted a raw epoch-millis long.
    this.groupSuffix = DateTimeFormatter.ofPattern("HHmmss").format(LocalTime.now());
}

/**
Expand All @@ -75,7 +83,11 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
}

/**
 * Starts every consumer group and blocks until each group reports that all of its
 * consumers have joined (i.e. received a partition assignment).
 *
 * @param callback invoked for every record received by any consumer
 */
public void start(ConsumerCallback callback) {
    CompletableFuture<?>[] pending = groups.stream()
        .map(g -> g.start(callback))
        .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(pending).join();
}

public void pause() {
Expand All @@ -87,11 +99,12 @@ public void resume() {
}

/**
 * Resets each group's committed offsets to a timestamp, staggering the groups:
 * group i seeks to {@code startMillis + i * intervalMillis}. All seeks run
 * concurrently; this method blocks until every group has been repositioned.
 *
 * NOTE(review): this span contained both the superseded synchronous loop and its
 * async replacement (diff residue); only the merged async version is kept.
 *
 * @param startMillis    timestamp for the first group
 * @param intervalMillis spacing between consecutive groups' timestamps
 */
public void resetOffset(long startMillis, long intervalMillis) {
    // AtomicLong is used as a mutable counter inside the (sequential) stream,
    // not for thread-safety: getAndAdd hands each group the next timestamp.
    AtomicLong start = new AtomicLong(startMillis);
    CompletableFuture.allOf(
        groups.stream()
            .map(group -> group.seek(start.getAndAdd(intervalMillis)))
            .toArray(CompletableFuture[]::new)
    ).join();
}

@Override
Expand Down Expand Up @@ -132,19 +145,25 @@ private class Group implements AutoCloseable {

/**
 * Builds one consumer group: {@code consumersPerGroup} consumers are created for
 * EACH topic, all sharing this group's id (see {@code groupId(Topic)}).
 *
 * NOTE(review): the span contained a leftover pre-change call to the renamed
 * {@code createConsumer}; only the merged {@code newConsumer} call is kept.
 *
 * @param index             ordinal of this group, used in the group id
 * @param consumersPerGroup number of consumers to create per topic
 * @param topics            topics to subscribe to
 * @param config            consumer configuration shared by all consumers
 */
public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config) {
    this.index = index;

    Properties common = toProperties(config);
    for (Topic topic : topics) {
        List<Consumer> topicConsumers = new ArrayList<>();
        for (int c = 0; c < consumersPerGroup; c++) {
            topicConsumers.add(newConsumer(topic, common));
        }
        consumers.put(topic, topicConsumers);
    }
}

public void start(ConsumerCallback callback) {
public CompletableFuture<Void> start(ConsumerCallback callback) {
consumers().forEach(consumer -> consumer.start(callback));

// wait for all consumers to join the group
return CompletableFuture.allOf(consumers()
.map(Consumer::started)
.toArray(CompletableFuture[]::new));
}

public void pause() {
Expand All @@ -155,21 +174,17 @@ public void resume() {
consumers().forEach(Consumer::resume);
}

public void seek(long timestamp) {
try {
var offsetMap = admin.listOffsets(listOffsetsRequest(timestamp)).all().get();

List<AlterConsumerGroupOffsetsResult> futures = new ArrayList<>();
for (Topic topic : consumers.keySet()) {
var future = admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetMap));
futures.add(future);
}
for (AlterConsumerGroupOffsetsResult future : futures) {
future.all().get();
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
public CompletableFuture<Void> seek(long timestamp) {
return admin.listOffsets(listOffsetsRequest(timestamp))
.all()
.toCompletionStage()
.toCompletableFuture()
.thenCompose(offsetMap -> CompletableFuture.allOf(consumers.keySet().stream()
.map(topic -> admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetMap)))
.map(AlterConsumerGroupOffsetsResult::all)
.map(KafkaFuture::toCompletionStage)
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new)));
}

public int size() {
Expand All @@ -193,21 +208,19 @@ private Properties toProperties(ConsumersConfig config) {
return properties;
}

private Consumer createConsumer(Topic topic, Properties common) {
Properties properties = new Properties(common);
private Consumer newConsumer(Topic topic, Properties common) {
Properties properties = new Properties();
properties.putAll(common);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId(topic));

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of(topic.name));
return new Consumer(consumer);
return new Consumer(properties, topic.name);
}

/** All consumers across every topic of this group, flattened into one stream. */
private Stream<Consumer> consumers() {
    return consumers.values()
        .stream()
        .flatMap(Collection::stream);
}

/**
 * Group id for {@code topic}: {@code sub-<topic>-<runSuffix>-<index>}. The
 * run-scoped time suffix keeps ids unique across perf runs so a rerun does not
 * inherit committed offsets from a previous run's groups.
 *
 * NOTE(review): the pre-change format string (without the suffix) was interleaved
 * here (diff residue); only the merged version is kept.
 */
private String groupId(Topic topic) {
    return String.format("sub-%s-%s-%03d", topic.name, groupSuffix, index);
}

private Map<TopicPartition, OffsetSpec> listOffsetsRequest(long timestamp) {
Expand Down Expand Up @@ -237,18 +250,25 @@ private static class Consumer {
private final KafkaConsumer<String, byte[]> consumer;
private final ExecutorService executor;
private Future<?> task;
private final CompletableFuture<Void> started = new CompletableFuture<>();
private boolean paused = false;
private volatile boolean closing = false;

/**
 * Creates the underlying KafkaConsumer, a dedicated single-thread executor for
 * polling, and subscribes to {@code topic} with a rebalance listener that
 * completes {@code started} on first partition assignment.
 *
 * NOTE(review): the old constructor taking a pre-built KafkaConsumer was
 * interleaved in this span (diff residue); only the merged version is kept.
 *
 * @param properties fully-resolved consumer config, including group.id
 * @param topic      topic name to subscribe to
 */
public Consumer(Properties properties, String topic) {
    this.consumer = new KafkaConsumer<>(properties);
    this.executor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("perf-consumer", false));

    consumer.subscribe(List.of(topic), subscribeListener());
}

/**
 * Begins polling on this consumer's dedicated thread; the returned Future is
 * retained in {@code task} so it can be cancelled later.
 *
 * @param callback invoked for every record received
 */
public void start(ConsumerCallback callback) {
    task = executor.submit(() -> pollRecords(consumer, callback));
}

/**
 * Future completed (with {@code null}) once this consumer receives its first
 * partition assignment, i.e. it has joined its consumer group.
 */
public CompletableFuture<Void> started() {
return started;
}

// Ask the poll loop to stop delivering records.
// NOTE(review): 'paused' is a plain (non-volatile) boolean set from a thread other
// than the poll thread; visibility presumably relies on the poll loop's other
// operations — confirm against pollRecords (not fully visible in this view).
public void pause() {
paused = true;
}
Expand All @@ -257,6 +277,21 @@ public void resume() {
paused = false;
}

/**
 * Rebalance listener used at subscribe time solely to detect group membership:
 * the first partition assignment completes the {@code started} future.
 * Revocations are ignored — this consumer keeps no per-partition state here.
 */
private ConsumerRebalanceListener subscribeListener() {
return new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// do nothing
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// once partitions are assigned, it means the consumer has joined the group
started.complete(null);
}
};
}

private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback) {
while (!closing) {
try {
Expand Down
Loading