diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java index 5c1ec8dc35..9e1480e4f2 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java @@ -69,6 +69,12 @@ private void run() { LOGGER.info("Starting perf test with config: {}", jsonStringify(config)); TimerUtil timer = new TimerUtil(); + if (config.reset) { + LOGGER.info("Deleting all topics..."); + int deleted = topicService.deleteTopics(); + LOGGER.info("Deleted all topics ({} in total), took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); + } + LOGGER.info("Creating topics..."); List topics = topicService.createTopics(config.topicsConfig()); LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); @@ -94,7 +100,7 @@ private void run() { collectStats(Duration.ofMinutes(config.warmupDurationMinutes)); } - Result result = null; + Result result; if (config.backlogDurationSeconds > 0) { LOGGER.info("Pausing consumers for {} seconds to build up backlog...", config.backlogDurationSeconds); consumerService.pause(); @@ -144,12 +150,13 @@ private void waitTopicsReady() { boolean ready = false; while (System.nanoTime() < start + TOPIC_READY_TIMEOUT_NANOS) { long received = stats.toCumulativeStats().totalMessagesReceived; + LOGGER.info("Waiting for topics to be ready... sent: {}, received: {}", sent, received); if (received >= sent) { ready = true; break; } try { - Thread.sleep(1000); + Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java index b2459d0b36..99d5bef116 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java @@ -11,16 +11,20 @@ package org.apache.kafka.tools.automq.perf; +import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.admin.Admin; @@ -28,10 +32,12 @@ import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -50,9 +56,11 @@ public class ConsumerService implements AutoCloseable { private final Admin admin; private final List groups = new ArrayList<>(); + private final String groupSuffix; public ConsumerService(String bootstrapServer) { this.admin = Admin.create(Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)); + this.groupSuffix = new SimpleDateFormat("HHmmss").format(System.currentTimeMillis()); } /** @@ -75,7 +83,11 @@ public int createConsumers(List topics, ConsumersConfig config) { } public void start(ConsumerCallback callback) { - groups.forEach(group -> group.start(callback)); + CompletableFuture.allOf( + groups.stream() + .map(group -> group.start(callback)) + .toArray(CompletableFuture[]::new) + ).join(); } public void pause() { @@ -87,11 +99,12 @@ public void resume() { } public void resetOffset(long startMillis, long intervalMillis) { - for (Group group : groups) { - // TODO make this async - group.seek(startMillis); - startMillis += intervalMillis; - } + AtomicLong start = new AtomicLong(startMillis); + CompletableFuture.allOf( + groups.stream() + .map(group -> group.seek(start.getAndAdd(intervalMillis))) + .toArray(CompletableFuture[]::new) + ).join(); } @Override @@ -132,19 +145,25 @@ private class Group implements AutoCloseable { public Group(int index, int consumersPerGroup, List topics, ConsumersConfig config) { this.index = index; + Properties common = toProperties(config); for (Topic topic : topics) { List topicConsumers = new ArrayList<>(); for (int c = 0; c < consumersPerGroup; c++) { - Consumer consumer = createConsumer(topic, common); + Consumer consumer = newConsumer(topic, common); topicConsumers.add(consumer); } consumers.put(topic, topicConsumers); } } - public void start(ConsumerCallback callback) { + public CompletableFuture start(ConsumerCallback callback) { consumers().forEach(consumer -> consumer.start(callback)); + + // wait for all consumers to join the group + return CompletableFuture.allOf(consumers() + .map(Consumer::started) + .toArray(CompletableFuture[]::new)); } public void pause() { @@ -155,21 +174,17 @@ public void resume() { consumers().forEach(Consumer::resume); } - public void seek(long timestamp) { - try { - var offsetMap = admin.listOffsets(listOffsetsRequest(timestamp)).all().get(); - - List futures = new ArrayList<>(); - for (Topic topic : consumers.keySet()) { - var future = admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetMap)); - futures.add(future); - } - for (AlterConsumerGroupOffsetsResult future : futures) { - future.all().get(); - } - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + public CompletableFuture seek(long timestamp) { + return admin.listOffsets(listOffsetsRequest(timestamp)) + .all() + .toCompletionStage() + .toCompletableFuture() + .thenCompose(offsetMap -> CompletableFuture.allOf(consumers.keySet().stream() + .map(topic -> admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetMap))) + .map(AlterConsumerGroupOffsetsResult::all) + .map(KafkaFuture::toCompletionStage) + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new))); } public int size() { @@ -193,13 +208,11 @@ private Properties toProperties(ConsumersConfig config) { return properties; } - private Consumer createConsumer(Topic topic, Properties common) { - Properties properties = new Properties(common); + private Consumer newConsumer(Topic topic, Properties common) { + Properties properties = new Properties(); + properties.putAll(common); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId(topic)); - - KafkaConsumer consumer = new KafkaConsumer<>(properties); - consumer.subscribe(List.of(topic.name)); - return new Consumer(consumer); + return new Consumer(properties, topic.name); } private Stream consumers() { @@ -207,7 +220,7 @@ private Stream consumers() { } private String groupId(Topic topic) { - return String.format("sub-%s-%03d", topic.name, index); + return String.format("sub-%s-%s-%03d", topic.name, groupSuffix, index); } private Map listOffsetsRequest(long timestamp) { @@ -237,18 +250,25 @@ private static class Consumer { private final KafkaConsumer consumer; private final ExecutorService executor; private Future task; + private final CompletableFuture started = new CompletableFuture<>(); private boolean paused = false; private volatile boolean closing = false; - public Consumer(KafkaConsumer consumer) { - this.consumer = consumer; + public Consumer(Properties properties, String topic) { + this.consumer = new KafkaConsumer<>(properties); this.executor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("perf-consumer", false)); + + consumer.subscribe(List.of(topic), subscribeListener()); } public void start(ConsumerCallback callback) { this.task = this.executor.submit(() -> pollRecords(consumer, callback)); } + public CompletableFuture started() { + return started; + } + public void pause() { paused = true; } @@ -257,6 +277,21 @@ public void resume() { paused = false; } + private ConsumerRebalanceListener subscribeListener() { + return new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + // do nothing + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + // once partitions are assigned, it means the consumer has joined the group + started.complete(null); + } + }; + } + private void pollRecords(KafkaConsumer consumer, ConsumerCallback callback) { while (!closing) { try { diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java index 1d1b7d75ca..6f82acdcec 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java @@ -16,6 +16,8 @@ import java.util.Map; import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.helper.HelpScreenException; +import net.sourceforge.argparse4j.impl.type.ReflectArgumentType; +import net.sourceforge.argparse4j.inf.Argument; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; @@ -24,11 +26,16 @@ import org.apache.kafka.tools.automq.perf.ProducerService.ProducersConfig; import org.apache.kafka.tools.automq.perf.TopicService.TopicsConfig; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; +import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.nonNegativeInteger; +import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.positiveInteger; + public class PerfConfig { public final String bootstrapServer; public final Map topicConfigs; public final Map producerConfigs; public final Map consumerConfigs; + public final boolean reset; public final String topicPrefix; public final int topics; public final int partitionsPerTopic; @@ -57,11 +64,13 @@ public PerfConfig(String[] args) { parser.handleError(e); Exit.exit(1); } + assert ns != null; bootstrapServer = ns.getString("bootstrapServer"); topicConfigs = parseConfigs(ns.getList("topicConfigs")); producerConfigs = parseConfigs(ns.getList("producerConfigs")); consumerConfigs = parseConfigs(ns.getList("consumerConfigs")); + reset = ns.getBoolean("reset"); topicPrefix = ns.getString("topicPrefix"); topics = ns.getInt("topics"); partitionsPerTopic = ns.getInt("partitionsPerTopic"); @@ -78,11 +87,9 @@ public PerfConfig(String[] args) { testDurationMinutes = ns.getInt("testDurationMinutes"); reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds"); - // TODO add more checker - - if (backlogDurationSeconds <= producersPerTopic * groupStartDelaySeconds) { - throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should be greater than PRODUCERS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)", - backlogDurationSeconds, producersPerTopic, groupStartDelaySeconds)); + if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) { + throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)", + backlogDurationSeconds, groupsPerTopic, groupStartDelaySeconds)); } } @@ -116,6 +123,10 @@ public static ArgumentParser parser() { .dest("consumerConfigs") .metavar("CONSUMER_CONFIG") .help("The consumer configurations."); + parser.addArgument("--reset") + .action(storeTrue()) + .dest("reset") + .help("delete all topics before running the test."); parser.addArgument("-X", "--topic-prefix") .setDefault("test-topic") .type(String.class) @@ -124,37 +135,37 @@ public static ArgumentParser parser() { .help("The topic prefix."); parser.addArgument("-t", "--topics") .setDefault(1) - .type(Integer.class) + .type(positiveInteger()) .dest("topics") .metavar("TOPICS") .help("The number of topics."); parser.addArgument("-n", "--partitions-per-topic") .setDefault(1) - .type(Integer.class) + .type(positiveInteger()) .dest("partitionsPerTopic") .metavar("PARTITIONS_PER_TOPIC") .help("The number of partitions per topic."); parser.addArgument("-p", "--producers-per-topic") .setDefault(1) - .type(Integer.class) + .type(positiveInteger()) .dest("producersPerTopic") .metavar("PRODUCERS_PER_TOPIC") .help("The number of producers per topic."); parser.addArgument("-g", "--groups-per-topic") .setDefault(1) - .type(Integer.class) + .type(nonNegativeInteger()) .dest("groupsPerTopic") .metavar("GROUPS_PER_TOPIC") .help("The number of consumer groups per topic."); parser.addArgument("-c", "--consumers-per-group") .setDefault(1) - .type(Integer.class) + .type(positiveInteger()) .dest("consumersPerGroup") .metavar("CONSUMERS_PER_GROUP") .help("The number of consumers per group."); parser.addArgument("-s", "--record-size") .setDefault(1024) - .type(Integer.class) + .type(positiveInteger()) .dest("recordSize") .metavar("RECORD_SIZE") .help("The record size in bytes."); @@ -166,43 +177,43 @@ public static ArgumentParser parser() { .help("The ratio of random payloads. The value should be between 0.0 and 1.0."); parser.addArgument("-S", "--random-pool-size") .setDefault(1000) - .type(Integer.class) + .type(positiveInteger()) .dest("randomPoolSize") .metavar("RANDOM_POOL_SIZE") .help("The count of random payloads. Only used when random ratio is greater than 0.0."); parser.addArgument("-r", "--send-rate") .setDefault(1000) - .type(Integer.class) + .type(positiveInteger()) .dest("sendRate") .metavar("SEND_RATE") .help("The send rate in messages per second."); parser.addArgument("-b", "--backlog-duration") - .setDefault(500) - .type(Integer.class) + .setDefault(0) + .type(nonNegativeInteger()) .dest("backlogDurationSeconds") .metavar("BACKLOG_DURATION_SECONDS") - .help("The backlog duration in seconds. Should be greater than PRODUCERS_PER_TOPIC * GROUP_START_DELAY_SECONDS."); + .help("The backlog duration in seconds, and zero means no backlog. Should not be less than GROUPS_PER_TOPIC * GROUP_START_DELAY_SECONDS."); parser.addArgument("-G", "--group-start-delay") .setDefault(0) - .type(Integer.class) + .type(nonNegativeInteger()) .dest("groupStartDelaySeconds") .metavar("GROUP_START_DELAY_SECONDS") .help("The group start delay in seconds."); parser.addArgument("-w", "--warmup-duration") .setDefault(1) - .type(Integer.class) + .type(nonNegativeInteger()) .dest("warmupDurationMinutes") .metavar("WARMUP_DURATION_MINUTES") .help("The warmup duration in minutes."); parser.addArgument("-d", "--test-duration") .setDefault(5) - .type(Integer.class) + .type(positiveInteger()) .dest("testDurationMinutes") .metavar("TEST_DURATION_MINUTES") .help("The test duration in minutes."); parser.addArgument("-i", "--reporting-interval") .setDefault(1) - .type(Integer.class) + .type(positiveInteger()) .dest("reportingIntervalSeconds") .metavar("REPORTING_INTERVAL_SECONDS") .help("The reporting interval in seconds."); @@ -253,4 +264,40 @@ private Map parseConfigs(List configs) { } return map; } + + static class IntegerArgumentType extends ReflectArgumentType { + + private final IntegerValidator validator; + + public IntegerArgumentType(IntegerValidator validator) { + super(Integer.class); + this.validator = validator; + } + + @Override + public Integer convert(ArgumentParser parser, Argument arg, String value) throws ArgumentParserException { + Integer result = super.convert(parser, arg, value); + String message = validator.validate(result); + if (message != null) { + throw new ArgumentParserException(message, parser, arg); + } + return result; + } + + public static IntegerArgumentType nonNegativeInteger() { + return new IntegerArgumentType(value -> value < 0 ? "expected a non-negative integer, but got " + value : null); + } + + public static IntegerArgumentType positiveInteger() { + return new IntegerArgumentType(value -> value <= 0 ? "expected a positive integer, but got " + value : null); + } + } + + @FunctionalInterface + interface IntegerValidator { + /** + * Validate the given value. Return an error message if the value is invalid, otherwise return null. + */ + String validate(Integer value); + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java index 7ee4c17cd4..42ec6a1367 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java @@ -14,12 +14,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; @@ -52,6 +54,23 @@ public List createTopics(TopicsConfig config) { .collect(Collectors.toList()); } + /** + * Delete all topics except internal topics (starting with '__'). + */ + public int deleteTopics() { + ListTopicsResult result = admin.listTopics(); + try { + Set topics = result.names().get(); + topics.removeIf(name -> name.startsWith("__")); + admin.deleteTopics(topics).all().get(); + return topics.size(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ignored) { + } + return 0; + } + /** * Wait until the topic is created, or throw an exception if it fails. * Note: It is okay if the topic already exists.