Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

KAFKA-808 Migration tool internal queue between consumer and producer…

… threads should be configurable; reviewed by Jun Rao
  • Loading branch information...
commit 7fd9268f7b89afa2e41b526f1f4d7bfc90a519cf 1 parent 3b3fb7f
@nehanarkhede nehanarkhede authored
Showing with 9 additions and 1 deletion.
  1. +9 −1 core/src/main/scala/kafka/tools/KafkaMigrationTool.java
View
10 core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -134,6 +134,13 @@ public static void main(String[] args) throws InterruptedException, IOException
.describedAs("Java regex (String)")
.ofType(String.class);
+ ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
+ = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
+ .withRequiredArg()
+ .describedAs("Queue size in terms of number of messages")
+ .ofType(Integer.class)
+ .defaultsTo(10000);
+
OptionSpecBuilder helpOpt
= parser.accepts("help", "Print this message.");
@@ -212,7 +219,8 @@ public static void main(String[] args) throws InterruptedException, IOException
kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
// create a producer channel instead
- ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(numProducers);
+ int queueSize = options.valueOf(queueSizeOpt);
+ ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
int threadId = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
Please sign in to comment.
Something went wrong with that request. Please try again.