Skip to content

Commit

Permalink
feat: enhance datagen for use as a load generator (#3230)
Browse files Browse the repository at this point in the history
* feat: enhance datagen for use as a load generator

Resurrecting some ancient enhancements to datagen so that we can use it
to generate load:

- Add a flag to disable printing each row
- Add a flag to control the number of threads producing data
- Add a flag to control the total message rate (msgs/second) across all the
  threads. The rate limiting is implemented using a token bucket.

* review feedback

* Just use an executor

* typo
  • Loading branch information
rodesai committed Aug 22, 2019
1 parent 103c958 commit ddb970b
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 50 deletions.
185 changes: 151 additions & 34 deletions ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package io.confluent.ksql.datagen;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.avro.random.generator.Generator;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlConfig;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
Expand All @@ -28,7 +30,14 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public final class DataGen {

Expand All @@ -42,13 +51,13 @@ public static void main(final String[] args) {
System.err.println(exception.getMessage());
usage();
System.exit(1);
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
System.exit(1);
}
}

static void run(final String... args) throws IOException {
static void run(final String... args) throws Throwable {
final Arguments arguments = new Arguments.Builder()
.parseArgs(args)
.build();
Expand All @@ -58,19 +67,56 @@ static void run(final String... args) throws IOException {
return;
}

final Generator generator = new Generator(arguments.schemaFile, new Random());
final Properties props = getProperties(arguments);
final DataGenProducer dataProducer = ProducerFactory
.getProducer(arguments.keyFormat, arguments.valueFormat, props);

dataProducer.populateTopic(
props,
generator,
arguments.topicName,
arguments.keyName,
arguments.iterations,
arguments.maxInterval
final Optional<RateLimiter> rateLimiter = arguments.msgRate != -1
? Optional.of(RateLimiter.create(arguments.msgRate)) : Optional.empty();

final Executor executor = Executors.newFixedThreadPool(
arguments.numThreads,
r -> {
final Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
);
final CompletionService<Void> service = new ExecutorCompletionService<>(executor);

for (int i = 0; i < arguments.numThreads; i++) {
service.submit(getProducerTask(arguments, dataProducer, props, rateLimiter));
}
for (int i = 0; i < arguments.numThreads; i++) {
try {
service.take().get();
} catch (final InterruptedException e) {
System.err.println("Interrupted waiting for threads to exit.");
System.exit(1);
} catch (final ExecutionException e) {
throw e.getCause();
}
}
}

private static Callable<Void> getProducerTask(
final Arguments arguments,
final DataGenProducer dataProducer,
final Properties props,
final Optional<RateLimiter> rateLimiter) throws IOException {
final Generator generator = new Generator(arguments.schemaFile.get(), new Random());
return () -> {
dataProducer.populateTopic(
props,
generator,
arguments.topicName,
arguments.keyName,
arguments.iterations,
arguments.maxInterval,
arguments.printRows,
rateLimiter
);
return null;
};
}

static Properties getProperties(final Arguments arguments) throws IOException {
Expand Down Expand Up @@ -105,15 +151,18 @@ private static void usage() {
+ "key=<name of key column> " + newLine
+ "[iterations=<number of rows> (defaults to 1,000,000)] " + newLine
+ "[maxInterval=<Max time in ms between rows> (defaults to 500)] " + newLine
+ "[propertiesFile=<file specifying Kafka client properties>]" + newLine
+ "[propertiesFile=<file specifying Kafka client properties>] " + newLine
+ "[nThreads=<number of producer threads to start>] " + newLine
+ "[msgRate=<rate to produce in msgs/second>] " + newLine
+ "[printRows=<true|false>]" + newLine
);
}

static class Arguments {

private final boolean help;
private final String bootstrapServer;
private final InputStream schemaFile;
private final Supplier<InputStream> schemaFile;
private final Format keyFormat;
private final Format valueFormat;
private final String topicName;
Expand All @@ -122,20 +171,26 @@ static class Arguments {
private final long maxInterval;
private final String schemaRegistryUrl;
private final InputStream propertiesFile;
private final int numThreads;
private final int msgRate;
private final boolean printRows;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
Arguments(
final boolean help,
final String bootstrapServer,
final InputStream schemaFile,
final Supplier<InputStream> schemaFile,
final Format keyFormat,
final Format valueFormat,
final String topicName,
final String keyName,
final int iterations,
final long maxInterval,
final String schemaRegistryUrl,
final InputStream propertiesFile
final InputStream propertiesFile,
final int numThreads,
final int msgRate,
final boolean printRows
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.help = help;
Expand All @@ -149,6 +204,9 @@ static class Arguments {
this.maxInterval = maxInterval;
this.schemaRegistryUrl = schemaRegistryUrl;
this.propertiesFile = propertiesFile;
this.numThreads = numThreads;
this.msgRate = msgRate;
this.printRows = printRows;
}

static class ArgumentParseException extends RuntimeException {
Expand All @@ -171,19 +229,22 @@ private static final class Builder {
.put("format", (builder, argVal) -> builder.valueFormat = parseFormat(argVal))
.put("topic", (builder, argVal) -> builder.topicName = argVal)
.put("key", (builder, argVal) -> builder.keyName = argVal)
.put("iterations", (builder, argVal) -> builder.iterations = parseIterations(argVal))
.put("iterations", (builder, argVal) -> builder.iterations = parseInt(argVal, 1))
.put("maxInterval",
(builder, argVal) -> builder.maxInterval = parseIterations(argVal))
(builder, argVal) -> builder.maxInterval = parseInt(argVal, 0))
.put("schemaRegistryUrl", (builder, argVal) -> builder.schemaRegistryUrl = argVal)
.put("propertiesFile",
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal))
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal).get())
.put("msgRate", (builder, argVal) -> builder.msgRate = parseInt(argVal, 1))
.put("nThreads", (builder, argVal) -> builder.numThreads = parseNumThreads(argVal))
.put("printRows", (builder, argVal) -> builder.printRows = parsePrintRows(argVal))
.build();

private Quickstart quickstart;

private boolean help;
private String bootstrapServer;
private InputStream schemaFile;
private Supplier<InputStream> schemaFile;
private Format keyFormat;
private Format valueFormat;
private String topicName;
Expand All @@ -192,6 +253,9 @@ private static final class Builder {
private long maxInterval;
private String schemaRegistryUrl;
private InputStream propertiesFile;
private int msgRate;
private int numThreads;
private boolean printRows;

private Builder() {
quickstart = null;
Expand All @@ -206,6 +270,9 @@ private Builder() {
maxInterval = -1;
schemaRegistryUrl = "http://localhost:8081";
propertiesFile = null;
msgRate = -1;
numThreads = 1;
printRows = true;
}

private enum Quickstart {
Expand All @@ -228,8 +295,8 @@ private enum Quickstart {
this.keyName = keyName;
}

public InputStream getSchemaFile() {
return getClass().getClassLoader().getResourceAsStream(schemaFileName);
public Supplier<InputStream> getSchemaFile() {
return () -> getClass().getClassLoader().getResourceAsStream(schemaFileName);
}

public String getTopicName(final Format format) {
Expand All @@ -251,7 +318,22 @@ public Format getValueFormat() {

Arguments build() {
if (help) {
return new Arguments(true, null, null, null, null,null, null, 0, -1, null, null);
return new Arguments(
true,
null,
null,
null,
null,
null,
null,
0,
-1,
null,
null,
1,
-1,
true
);
}

if (quickstart != null) {
Expand Down Expand Up @@ -282,7 +364,10 @@ Arguments build() {
iterations,
maxInterval,
schemaRegistryUrl,
propertiesFile
propertiesFile,
numThreads,
msgRate,
printRows
);
}

Expand Down Expand Up @@ -342,12 +427,14 @@ private void setArg(final String argName, final String argVal) {
handler.accept(this, argVal);
}

private static FileInputStream toFileInputStream(final String argVal) {
try {
return new FileInputStream(argVal);
} catch (final Exception e) {
throw new IllegalArgumentException("File not found: " + argVal, e);
}
private static Supplier<InputStream> toFileInputStream(final String argVal) {
return () -> {
try {
return new FileInputStream(argVal);
} catch (final FileNotFoundException e) {
throw new IllegalArgumentException("File not found: " + argVal, e);
}
};
}

private static Quickstart parseQuickStart(final String argValue) {
Expand Down Expand Up @@ -375,19 +462,49 @@ private static Format parseFormat(final String formatString) {
}
}

private static int parseIterations(final String iterationsString) {
private static int parseNumThreads(final String numThreadsString) {
try {
final int result = Integer.valueOf(numThreadsString, 10);
if (result < 0) {
throw new ArgumentParseException(String.format(
"Invalid number of threads in '%d'; must be a positive number",
result));
}
return result;
} catch (NumberFormatException e) {
throw new ArgumentParseException(String.format(
"Invalid number of threads in '%s'; must be a positive number",
numThreadsString));
}
}

private static boolean parsePrintRows(final String printRowsString) {
switch (printRowsString.toLowerCase()) {
case "false":
return false;
case "true":
return true;
default:
throw new ArgumentParseException(String.format(
"Invalid value for printRows in '%s'; must be true or false",
printRowsString
));
}
}

private static int parseInt(final String iterationsString, final int minValue) {
try {
final int result = Integer.valueOf(iterationsString, 10);
if (result <= 0) {
if (result < minValue) {
throw new ArgumentParseException(String.format(
"Invalid number of iterations in '%d'; must be a positive number",
result
"Invalid integer value '%d'; must be >= %d",
result, minValue
));
}
return Integer.valueOf(iterationsString, 10);
} catch (final NumberFormatException exception) {
throw new ArgumentParseException(String.format(
"Invalid number of iterations in '%s'; must be a valid base 10 integer",
"Invalid integer value '%s'; must be a valid base 10 integer",
iterationsString
));
}
Expand Down

0 comments on commit ddb970b

Please sign in to comment.