Skip to content

Commit

Permalink
Minor updates to VerboseProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
Geoff Anderson committed May 29, 2015
1 parent 2777712 commit 8b4b1f2
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions core/src/main/scala/kafka/tools/VerboseProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;

/**
* Primarily intended for use with system testing, this producer produces a fixed number of
* increasing integers and prints metadata in the form of JSON to stdout for each failed or
* successful produce attempt.
*/
public class VerboseProducer {
OptionParser commandLineParser;
Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
Expand Down Expand Up @@ -51,15 +56,11 @@ private void configureParser() {
commandLineOptions.put("broker-list", brokerListOpt);


ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
ArgumentAcceptingOptionSpec<Integer> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
.withRequiredArg()
.describedAs("num-messages")
.ofType(String.class);
commandLineOptions.put("num-messages", numMessagesOpt);

// syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()

.ofType(Integer.class);
commandLineOptions.put("num-messages", numMessagesOpt);

OptionSpecBuilder helpOpt
= commandLineParser.accepts("help", "Print this message.");
Expand All @@ -84,7 +85,7 @@ public void parseCommandLineArgs(String[] args) throws IOException {
}
checkRequiredArgs(commandLineParser, options, requiredArgs);

this.numMessages = Integer.parseInt((String) options.valueOf("num-messages"));
this.numMessages = (Integer) options.valueOf("num-messages");
this.topic = (String) options.valueOf("topic");

producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
Expand Down Expand Up @@ -115,7 +116,12 @@ private static void checkRequiredArgs(
*/
public void send(String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
producer.send(record, new PrintInfoCallback(value));
try {
producer.send(record, new PrintInfoCallback(value));
}
catch (Exception e) {
System.out.println(errorString(e, value));
}
}

/** Need to close the producer to flush any remaining messages if we're in async mode. */
Expand Down Expand Up @@ -176,4 +182,4 @@ public static void main(String[] args) throws IOException {

producer.close();
}
}
}

0 comments on commit 8b4b1f2

Please sign in to comment.