From ddb970b4ad8e6b3dc85ee4d206857827e1fa86ce Mon Sep 17 00:00:00 2001 From: Rohan Date: Wed, 21 Aug 2019 23:50:35 -0700 Subject: [PATCH] feat: enhance datagen for use as a load generator (#3230) * feat: enhance datagen for use as a load generator Resurrecting some ancient enhancements to datagen so that we can use it to generate load: - Add a flag to disable printing each row - Add a flag to control the number of threads producing data - Add a flag to control the total message rate (msgs/second) across all the threads. The rate limiting is implemented using a token bucket. * review feedback * Just use an executor * typo --- .../io/confluent/ksql/datagen/DataGen.java | 185 ++++++++++++++---- .../ksql/datagen/DataGenProducer.java | 24 ++- .../ksql/datagen/DataGenFunctionalTest.java | 12 +- .../confluent/ksql/datagen/DataGenTest.java | 13 +- 4 files changed, 184 insertions(+), 50 deletions(-) diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index 6adf47d04c5c..2141de344a3d 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -16,10 +16,12 @@ package io.confluent.ksql.datagen; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.RateLimiter; import io.confluent.avro.random.generator.Generator; import io.confluent.ksql.serde.Format; import io.confluent.ksql.util.KsqlConfig; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; @@ -28,7 +30,14 @@ import java.util.Optional; import java.util.Properties; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; import java.util.function.BiConsumer; +import java.util.function.Supplier; public final class DataGen { @@ -42,13 +51,13 @@ public static void main(final String[] args) { System.err.println(exception.getMessage()); usage(); System.exit(1); - } catch (final Exception e) { + } catch (final Throwable e) { e.printStackTrace(); System.exit(1); } } - static void run(final String... args) throws IOException { + static void run(final String... args) throws Throwable { final Arguments arguments = new Arguments.Builder() .parseArgs(args) .build(); @@ -58,19 +67,56 @@ static void run(final String... args) throws IOException { return; } - final Generator generator = new Generator(arguments.schemaFile, new Random()); final Properties props = getProperties(arguments); final DataGenProducer dataProducer = ProducerFactory .getProducer(arguments.keyFormat, arguments.valueFormat, props); - - dataProducer.populateTopic( - props, - generator, - arguments.topicName, - arguments.keyName, - arguments.iterations, - arguments.maxInterval + final Optional rateLimiter = arguments.msgRate != -1 + ? Optional.of(RateLimiter.create(arguments.msgRate)) : Optional.empty(); + + final Executor executor = Executors.newFixedThreadPool( + arguments.numThreads, + r -> { + final Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + } ); + final CompletionService service = new ExecutorCompletionService<>(executor); + + for (int i = 0; i < arguments.numThreads; i++) { + service.submit(getProducerTask(arguments, dataProducer, props, rateLimiter)); + } + for (int i = 0; i < arguments.numThreads; i++) { + try { + service.take().get(); + } catch (final InterruptedException e) { + System.err.println("Interrupted waiting for threads to exit."); + System.exit(1); + } catch (final ExecutionException e) { + throw e.getCause(); + } + } + } + + private static Callable getProducerTask( + final Arguments arguments, + final DataGenProducer dataProducer, + final Properties props, + final Optional rateLimiter) throws IOException { + final Generator generator = new Generator(arguments.schemaFile.get(), new Random()); + return () -> { + dataProducer.populateTopic( + props, + generator, + arguments.topicName, + arguments.keyName, + arguments.iterations, + arguments.maxInterval, + arguments.printRows, + rateLimiter + ); + return null; + }; } static Properties getProperties(final Arguments arguments) throws IOException { @@ -105,7 +151,10 @@ private static void usage() { + "key= " + newLine + "[iterations= (defaults to 1,000,000)] " + newLine + "[maxInterval= (defaults to 500)] " + newLine - + "[propertiesFile=]" + newLine + + "[propertiesFile=] " + newLine + + "[nThreads=] " + newLine + + "[msgRate=] " + newLine + + "[printRows=]" + newLine ); } @@ -113,7 +162,7 @@ static class Arguments { private final boolean help; private final String bootstrapServer; - private final InputStream schemaFile; + private final Supplier schemaFile; private final Format keyFormat; private final Format valueFormat; private final String topicName; @@ -122,12 +171,15 @@ static class Arguments { private final long maxInterval; private final String schemaRegistryUrl; private final InputStream propertiesFile; + private final int numThreads; + private final int msgRate; + private final boolean printRows; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck Arguments( final boolean help, final String bootstrapServer, - final InputStream schemaFile, + final Supplier schemaFile, final Format keyFormat, final Format valueFormat, final String topicName, @@ -135,7 +187,10 @@ static class Arguments { final int iterations, final long maxInterval, final String schemaRegistryUrl, - final InputStream propertiesFile + final InputStream propertiesFile, + final int numThreads, + final int msgRate, + final boolean printRows ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.help = help; @@ -149,6 +204,9 @@ static class Arguments { this.maxInterval = maxInterval; this.schemaRegistryUrl = schemaRegistryUrl; this.propertiesFile = propertiesFile; + this.numThreads = numThreads; + this.msgRate = msgRate; + this.printRows = printRows; } static class ArgumentParseException extends RuntimeException { @@ -171,19 +229,22 @@ private static final class Builder { .put("format", (builder, argVal) -> builder.valueFormat = parseFormat(argVal)) .put("topic", (builder, argVal) -> builder.topicName = argVal) .put("key", (builder, argVal) -> builder.keyName = argVal) - .put("iterations", (builder, argVal) -> builder.iterations = parseIterations(argVal)) + .put("iterations", (builder, argVal) -> builder.iterations = parseInt(argVal, 1)) .put("maxInterval", - (builder, argVal) -> builder.maxInterval = parseIterations(argVal)) + (builder, argVal) -> builder.maxInterval = parseInt(argVal, 0)) .put("schemaRegistryUrl", (builder, argVal) -> builder.schemaRegistryUrl = argVal) .put("propertiesFile", - (builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal)) + (builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal).get()) + .put("msgRate", (builder, argVal) -> builder.msgRate = parseInt(argVal, 1)) + .put("nThreads", (builder, argVal) -> builder.numThreads = parseNumThreads(argVal)) + .put("printRows", (builder, argVal) -> builder.printRows = parsePrintRows(argVal)) .build(); private Quickstart quickstart; private boolean help; private String bootstrapServer; - private InputStream schemaFile; + private Supplier schemaFile; private Format keyFormat; private Format valueFormat; private String topicName; @@ -192,6 +253,9 @@ private static final class Builder { private long maxInterval; private String schemaRegistryUrl; private InputStream propertiesFile; + private int msgRate; + private int numThreads; + private boolean printRows; private Builder() { quickstart = null; @@ -206,6 +270,9 @@ private Builder() { maxInterval = -1; schemaRegistryUrl = "http://localhost:8081"; propertiesFile = null; + msgRate = -1; + numThreads = 1; + printRows = true; } private enum Quickstart { @@ -228,8 +295,8 @@ private enum Quickstart { this.keyName = keyName; } - public InputStream getSchemaFile() { - return getClass().getClassLoader().getResourceAsStream(schemaFileName); + public Supplier getSchemaFile() { + return () -> getClass().getClassLoader().getResourceAsStream(schemaFileName); } public String getTopicName(final Format format) { @@ -251,7 +318,22 @@ public Format getValueFormat() { Arguments build() { if (help) { - return new Arguments(true, null, null, null, null,null, null, 0, -1, null, null); + return new Arguments( + true, + null, + null, + null, + null, + null, + null, + 0, + -1, + null, + null, + 1, + -1, + true + ); } if (quickstart != null) { @@ -282,7 +364,10 @@ Arguments build() { iterations, maxInterval, schemaRegistryUrl, - propertiesFile + propertiesFile, + numThreads, + msgRate, + printRows ); } @@ -342,12 +427,14 @@ private void setArg(final String argName, final String argVal) { handler.accept(this, argVal); } - private static FileInputStream toFileInputStream(final String argVal) { - try { - return new FileInputStream(argVal); - } catch (final Exception e) { - throw new IllegalArgumentException("File not found: " + argVal, e); - } + private static Supplier toFileInputStream(final String argVal) { + return () -> { + try { + return new FileInputStream(argVal); + } catch (final FileNotFoundException e) { + throw new IllegalArgumentException("File not found: " + argVal, e); + } + }; } private static Quickstart parseQuickStart(final String argValue) { @@ -375,19 +462,49 @@ private static Format parseFormat(final String formatString) { } } - private static int parseIterations(final String iterationsString) { + private static int parseNumThreads(final String numThreadsString) { + try { + final int result = Integer.valueOf(numThreadsString, 10); + if (result < 0) { + throw new ArgumentParseException(String.format( + "Invalid number of threads in '%d'; must be a positive number", + result)); + } + return result; + } catch (NumberFormatException e) { + throw new ArgumentParseException(String.format( + "Invalid number of threads in '%s'; must be a positive number", + numThreadsString)); + } + } + + private static boolean parsePrintRows(final String printRowsString) { + switch (printRowsString.toLowerCase()) { + case "false": + return false; + case "true": + return true; + default: + throw new ArgumentParseException(String.format( + "Invalid value for printRows in '%s'; must be true or false", + printRowsString + )); + } + } + + private static int parseInt(final String iterationsString, final int minValue) { try { final int result = Integer.valueOf(iterationsString, 10); - if (result <= 0) { + if (result < minValue) { throw new ArgumentParseException(String.format( - "Invalid number of iterations in '%d'; must be a positive number", - result + "Invalid integer value '%d'; must be >= %d", + result, minValue )); } return Integer.valueOf(iterationsString, 10); } catch (final NumberFormatException exception) { throw new ArgumentParseException(String.format( - "Invalid number of iterations in '%s'; must be a valid base 10 integer", + "Invalid integer value '%s'; must be a valid base 10 integer", iterationsString )); } diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java index 68ecdeed0fff..d7cf72105d6b 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java @@ -17,11 +17,13 @@ import static java.util.Objects.requireNonNull; +import com.google.common.util.concurrent.RateLimiter; import io.confluent.avro.random.generator.Generator; import io.confluent.ksql.GenericRow; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.Pair; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; import org.apache.avro.Schema; @@ -35,7 +37,7 @@ public class DataGenProducer { - // Max 100 ms between messsages. + // Max 500 ms between messsages. public static final long INTER_MESSAGE_MAX_INTERVAL = 500; private final SerializerFactory keySerializerFactory; @@ -55,7 +57,9 @@ public void populateTopic( final String kafkaTopicName, final String key, final int messageCount, - final long maxInterval + final long maxInterval, + final boolean printRows, + final Optional rateLimiter ) { final Schema avroSchema = generator.schema(); if (avroSchema.getField(key) == null) { @@ -76,6 +80,7 @@ public void populateTopic( ); for (int i = 0; i < messageCount; i++) { + rateLimiter.ifPresent(RateLimiter::acquire); final Pair genericRowPair = rowGenerator.generateRow(); @@ -88,7 +93,8 @@ public void populateTopic( producer.send(producerRecord, new LoggingCallback(kafkaTopicName, genericRowPair.getLeft(), - genericRowPair.getRight())); + genericRowPair.getRight(), + printRows)); try { final long interval = maxInterval < 0 ? INTER_MESSAGE_MAX_INTERVAL : maxInterval; @@ -121,11 +127,17 @@ private static class LoggingCallback implements Callback { private final String topic; private final String key; private final String value; + private final boolean printOnSuccess; - LoggingCallback(final String topic, final Struct key, final GenericRow value) { + LoggingCallback( + final String topic, + final Struct key, + final GenericRow value, + final boolean printOnSuccess) { this.topic = topic; this.key = formatKey(key); this.value = Objects.toString(value); + this.printOnSuccess = printOnSuccess; } @Override @@ -138,7 +150,9 @@ public void onCompletion(final RecordMetadata metadata, final Exception e) { ); e.printStackTrace(System.err); } else { - System.out.println(key + " --> (" + value + ") ts:" + metadata.timestamp()); + if (printOnSuccess) { + System.out.println(key + " --> (" + value + ") ts:" + metadata.timestamp()); + } } } diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java index 3e7b8cab46ae..b06097ca2bc3 100644 --- a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java +++ b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java @@ -68,7 +68,7 @@ public void setUp() { } @Test - public void shouldWorkWithoutAnyFormatSupplied() throws Exception { + public void shouldWorkWithoutAnyFormatSupplied() throws Throwable { // Given: final Map args = new HashMap<>(DEFAULT_ARGS); args.remove("key-format"); @@ -91,7 +91,7 @@ public void shouldWorkWithoutAnyFormatSupplied() throws Exception { } @Test - public void shouldProduceDataWithKafkaFormatKeys() throws Exception { + public void shouldProduceDataWithKafkaFormatKeys() throws Throwable { // When: runWithArgOverrides(ImmutableMap.of( "key-format", "kafka" @@ -110,7 +110,7 @@ public void shouldProduceDataWithKafkaFormatKeys() throws Exception { } @Test - public void shouldProduceDataWithJsonFormatKeys() throws Exception { + public void shouldProduceDataWithJsonFormatKeys() throws Throwable { // When: runWithArgOverrides(ImmutableMap.of( "key-format", "json" @@ -129,7 +129,7 @@ public void shouldProduceDataWithJsonFormatKeys() throws Exception { } @Test - public void shouldProduceDataWithKJsonFormatValues() throws Exception { + public void shouldProduceDataWithKJsonFormatValues() throws Throwable { // When: runWithArgOverrides(ImmutableMap.of( "value-format", "json" @@ -177,14 +177,14 @@ private static void assertJsonValues( } } - private void runWithArgOverrides(final Map additionalArgs) throws IOException { + private void runWithArgOverrides(final Map additionalArgs) throws Throwable { final Map args = new HashMap<>(DEFAULT_ARGS); args.putAll(additionalArgs); runWithExactArgs(args); } - private void runWithExactArgs(final Map args) throws IOException { + private void runWithExactArgs(final Map args) throws Throwable { args.put("topic", topicName); args.put("bootstrap-server", CLUSTER.bootstrapServers()); diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java index 4a7f0493d5a2..56709d1e0b67 100644 --- a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java +++ b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java @@ -30,7 +30,7 @@ public class DataGenTest { public final ExpectedException expectedException = ExpectedException.none(); @Test(expected = DataGen.Arguments.ArgumentParseException.class) - public void shouldThrowOnUnknownFormat() throws Exception { + public void shouldThrowOnUnknownFormat() throws Throwable { DataGen.run( "format=wtf", "schema=./src/main/resources/purchase.avro", @@ -39,7 +39,7 @@ public void shouldThrowOnUnknownFormat() throws Exception { } @Test - public void shouldThrowIfSchemaFileDoesNotExist() throws Exception { + public void shouldThrowIfSchemaFileDoesNotExist() throws Throwable { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage(containsString("File not found: you/won't/find/me/right?")); @@ -51,7 +51,7 @@ public void shouldThrowIfSchemaFileDoesNotExist() throws Exception { } @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfKeyFieldDoesNotExist() throws Exception { + public void shouldThrowIfKeyFieldDoesNotExist() throws Throwable { DataGen.run( "key=not_a_field", "schema=./src/main/resources/purchase.avro", @@ -60,7 +60,7 @@ public void shouldThrowIfKeyFieldDoesNotExist() throws Exception { } @Test(expected = DataGen.Arguments.ArgumentParseException.class) - public void shouldThrowOnUnknownQuickStart() throws Exception { + public void shouldThrowOnUnknownQuickStart() throws Throwable { DataGen.run( "quickstart=wtf", "format=avro", @@ -80,7 +80,10 @@ public void shouldPassSchemaRegistryUrl() throws Exception { 0, 0L, "srUrl", - null + null, + 1, + -1, + true ); final Properties props = DataGen.getProperties(args);