Skip to content

Commit

Permalink
Added optional --topic-name to the Generator. Various tweaks to both
Browse files Browse the repository at this point in the history
utilities.

#27
#28
  • Loading branch information
julianharty committed Mar 12, 2018
1 parent 2ee54c4 commit b81d726
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 21 deletions.
12 changes: 7 additions & 5 deletions src/main/java/com/gslab/pepper/PepperBoxLoadConsumer.java
Expand Up @@ -78,7 +78,7 @@ private boolean TopicNameIsOk(String topicName) {
}
} else {
if (!TopicNameIsOk(topicInPropertiesFile)) {
LOGGER.warning("No valid topic name, found: " + topicInPropertiesFile);
LOGGER.severe("No valid topic name, found: " + topicInPropertiesFile);
System.exit(2);
}
perThreadTopic = topicInPropertiesFile + "." + topicId.toString();
Expand Down Expand Up @@ -154,9 +154,11 @@ public void run() {
long endTime = durationInMillis + System.currentTimeMillis();
int previousCount = -1;

String resultsFilename = "results-" + perThreadTopic + "-of-" + (offset + totalThreads) + ".csv";
// In the following, allow for the zero-based numbering.
String resultsFilename = "results-" + perThreadTopic +
".of[" + offset + ".." +(offset + (totalThreads - 1)) + "].csv";
// Create/open the results file and write the header row.
LOGGER.info("Creating File:" + resultsFilename);
LOGGER.info("Creating File: " + resultsFilename);
FileOutputStream f = new FileOutputStream(resultsFilename, true);
PrintStream p = new PrintStream(f);
p.println("batchReceived,messageGenerated,consumerLag,messageId,recordOffset,messageSize,recordTimestamp");
Expand Down Expand Up @@ -232,7 +234,7 @@ public static void main(String args[]) {
.withRequiredArg()
.describedAs("consumers")
.ofType(Integer.class);
ArgumentAcceptingOptionSpec<String> topicName = parser.accepts("topic-name", "REQUIRED: core topic name to read from.")
ArgumentAcceptingOptionSpec<String> topicName = parser.accepts("topic-name", "OPTIONAL: core topic name to read from.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
Expand All @@ -241,7 +243,7 @@ public static void main(String args[]) {
.describedAs("create a topic per thread")
.defaultsTo("NO")
.ofType(String.class);
ArgumentAcceptingOptionSpec<Integer> startingOffset = parser.accepts("starting-offset", "OPTIONAL: Starting count for separate topics, default 0.")
ArgumentAcceptingOptionSpec<Integer> startingOffset = parser.accepts("starting-offset", "OPTIONAL: Starting count for separate topics.")
.withOptionalArg().ofType(Integer.class).defaultsTo(Integer.valueOf(0), new Integer[0])
.describedAs("starting offset for the topic per thread")
;
Expand Down
65 changes: 49 additions & 16 deletions src/main/java/com/gslab/pepper/PepperBoxLoadGenerator.java
Expand Up @@ -11,7 +11,6 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.server.KafkaConfig;
import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -49,30 +48,57 @@ public class PepperBoxLoadGenerator extends Thread {
private RateLimiter limiter;
private Iterator iterator = null;
private KafkaProducer<String, String> producer;
private String topic;
private static int offset;
private static String topic;
String perThreadTopic;
private long durationInMillis;
private long messageCount = 0;

private boolean TopicNameIsOk(String topicName) {
if (topicName != null && topicName.length() > 1) {
return true;
} else {
return false;
}
}

/**
* Start kafka load generator from input properties and schema
*
* @param threadIn the logical thread ID, starting with 0
* @param thread the logical thread ID, starting with 0
* @param schemaFile
* @param producerProps
* @param throughput
* @param duration
* @throws PepperBoxException
*/
PepperBoxLoadGenerator(Integer threadIn, String schemaFile, String producerProps, Integer throughput, Integer duration) throws PepperBoxException {
PepperBoxLoadGenerator(Integer thread, String schemaFile, String producerProps, Integer throughput, Integer duration) throws PepperBoxException {
Integer topicId = thread + offset;
Thread t = currentThread();
t.setName(threadIn.toString());
t.setName(thread.toString());

Properties inputProps = populateProducerProps(producerProps);

String perThreadTopic = inputProps.getProperty(ProducerKeys.KAFKA_TOPIC_CONFIG) + "." + threadIn.toString();
inputProps.setProperty(ProducerKeys.KAFKA_TOPIC_CONFIG, perThreadTopic);
LOGGER.log(Level.INFO, "Thread [" + threadIn.toString() + "] using topic [" +
inputProps.getProperty(ProducerKeys.KAFKA_TOPIC_CONFIG) + "]");
String topicInPropertiesFile = inputProps.getProperty(ProducerKeys.KAFKA_TOPIC_CONFIG);
TopicNameIsOk(topicInPropertiesFile);

if (TopicNameIsOk(topic)) {
// If the topic name is provided on the command-line, use it.
perThreadTopic = topic + "." + topicId.toString();
// Inform the user if topic name is in both the file and on the command line.
if (TopicNameIsOk(topicInPropertiesFile)) {
LOGGER.warning(String.format("Using topic=%s provided on command-line, not=%s found in file=%s",
topic, topicInPropertiesFile, producerProps));
}
} else {
if (!TopicNameIsOk(topicInPropertiesFile)) {
LOGGER.severe("No valid topic name, found: " + topicInPropertiesFile);
System.exit(2);
}
perThreadTopic = topicInPropertiesFile + "." + topicId.toString();
}

LOGGER.log(Level.INFO, "Thread [" + thread.toString() + "] using topic [" + perThreadTopic + "]");

createProducer(schemaFile, throughput, duration, inputProps);
}
Expand All @@ -89,6 +115,9 @@ public class PepperBoxLoadGenerator extends Thread {
PepperBoxLoadGenerator(String schemaFile, String producerProps, Integer throughput, Integer duration) throws PepperBoxException {

Properties inputProps = populateProducerProps(producerProps);
String topicInPropertiesFile = inputProps.getProperty(ProducerKeys.KAFKA_TOPIC_CONFIG);
TopicNameIsOk(topicInPropertiesFile);
perThreadTopic = topicInPropertiesFile;
createProducer(schemaFile, throughput, duration, inputProps);
}

Expand Down Expand Up @@ -165,7 +194,6 @@ private void createProducer(String schemaFile, Integer throughput, Integer durat
}
});

topic = inputProps.getProperty(ProducerKeys.KAFKA_TOPIC_CONFIG);
producer = new KafkaProducer<>(brokerProps);
}

Expand Down Expand Up @@ -234,15 +262,15 @@ public void run() {
while (endTime > System.currentTimeMillis()) {
sendMessage();
}
System.out.println("{status:testCompleted}, {thread:" + Thread.currentThread().getId() + "}, {topic:" + topic + "}, {messages:" + messageCount + "}");
System.out.println("{status:testCompleted}, {thread:" + Thread.currentThread().getId() + "}, {topic:" + perThreadTopic + "}, {messages:" + messageCount + "}");
producer.close();
}

public void sendMessage() {
limiter.acquire();
ProducerRecord<String, String> keyedMsg = new ProducerRecord<>(topic, iterator.next().toString());
ProducerRecord<String, String> keyedMsg = new ProducerRecord<>(perThreadTopic, iterator.next().toString());
if (++messageCount % 100 == 1) {
LOGGER.info("{status:interim}, {topic:" +topic + "}, {messages:" + messageCount + "}");
LOGGER.info("{status:interim}, {topic:" + perThreadTopic + "}, {messages:" + messageCount + "}");
}
producer.send(keyedMsg);
}
Expand Down Expand Up @@ -277,12 +305,16 @@ public static void main(String[] args) {
.withRequiredArg()
.describedAs("producers")
.ofType(Integer.class);
ArgumentAcceptingOptionSpec<String> topicName = parser.accepts("topic-name", "OPTIONAL: core topic name to write to.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
ArgumentAcceptingOptionSpec<String> aTopicPerThread = parser.accepts("per-thread-topics", "OPTIONAL: Create a separate topic per producer")
.withRequiredArg()
.describedAs("create a topic per thread")
.ofType(String.class)
.defaultsTo("NO");
ArgumentAcceptingOptionSpec<Integer> startingOffset = parser.accepts("starting-offset", "OPTIONAL: Starting count for separate topics, default 0")
ArgumentAcceptingOptionSpec<Integer> startingOffset = parser.accepts("starting-offset", "OPTIONAL: Starting count for separate topics.")
.withOptionalArg().ofType(Integer.class).defaultsTo(Integer.valueOf(0), new Integer[0])
.describedAs("starting offset for the topic per thread")
;
Expand All @@ -292,14 +324,15 @@ public static void main(String[] args) {
}
OptionSet options = parser.parse(args);
checkRequiredArgs(parser, options, schemaFile, producerConfig, throughput, duration, threadCount);
topic = options.valueOf(topicName);
offset = options.valueOf(startingOffset);
LOGGER.info("starting-offset: " + options.valueOf(startingOffset));
try {
int totalThreads = options.valueOf(threadCount);
for (int i = 0; i < totalThreads; i++) {
PepperBoxLoadGenerator jsonProducer;
if (options.valueOf(aTopicPerThread).equalsIgnoreCase("YES")) {
int topicId = i + options.valueOf(startingOffset);
jsonProducer = new PepperBoxLoadGenerator(topicId, options.valueOf(schemaFile), options.valueOf(producerConfig), options.valueOf(throughput), options.valueOf(duration));
jsonProducer = new PepperBoxLoadGenerator(i, options.valueOf(schemaFile), options.valueOf(producerConfig), options.valueOf(throughput), options.valueOf(duration));
} else {
jsonProducer = new PepperBoxLoadGenerator(options.valueOf(schemaFile), options.valueOf(producerConfig), options.valueOf(throughput), options.valueOf(duration));
}
Expand Down

0 comments on commit b81d726

Please sign in to comment.