From 72ffc9259db3285960898a4f6b8444c1c8dddcf1 Mon Sep 17 00:00:00 2001 From: cluo <0512lc@163.com> Date: Thu, 5 Jul 2018 16:30:31 +0800 Subject: [PATCH 1/3] Update the JavaDirectKafkaWordCount example to support the new API of Kafka --- .../streaming/JavaDirectKafkaWordCount.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index b6b163fa8b2cd..a8155eef77085 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -26,8 +26,10 @@ import scala.Tuple2; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka010.ConsumerStrategies; @@ -50,17 +52,19 @@ public final class JavaDirectKafkaWordCount { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: JavaDirectKafkaWordCount \n" + - " is a list of one or more Kafka brokers\n" + - " is a list of one or more kafka topics to consume from\n\n"); + if (args.length < 3) { + System.err.println("Usage: JavaDirectKafkaWordCount \n" + + " is a list of one or more Kafka brokers\n" + + " is a consumer group name to consume from topics\n" + + " is a list of one or more kafka topics to consume from\n\n"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); String brokers = args[0]; - String topics = args[1]; + String groupId = args[1]; + String topics = args[2]; // Create context with a 2 seconds batch interval SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); @@ -68,7 +72,10 @@ public static void main(String[] args) throws Exception { Set topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); Map kafkaParams = new HashMap<>(); - kafkaParams.put("metadata.broker.list", brokers); + kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Create direct kafka stream with brokers and topics JavaInputDStream> messages = KafkaUtils.createDirectStream( From 55ed0e5a3420e8accd122724b00f03cf94f9728e Mon Sep 17 00:00:00 2001 From: cluo <0512lc@163.com> Date: Thu, 5 Jul 2018 17:42:39 +0800 Subject: [PATCH 2/3] review fixes --- .../spark/examples/streaming/JavaDirectKafkaWordCount.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index a8155eef77085..424321d9722e6 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -28,8 +28,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; - import org.apache.kafka.common.serialization.StringDeserializer; + import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka010.ConsumerStrategies; @@ -53,7 +53,7 @@ public final class JavaDirectKafkaWordCount { public static void main(String[] args) throws Exception { if (args.length < 3) { - System.err.println("Usage: JavaDirectKafkaWordCount \n" + + System.err.println("Usage: JavaDirectKafkaWordCount \n" + " is a list of one or more Kafka brokers\n" + " is a consumer group name to consume from topics\n" + " is a list of one or more kafka topics to consume from\n\n"); From c1e42cb692e061c39278c7e1c86b4496f54a499a Mon Sep 17 00:00:00 2001 From: cluo <0512lc@163.com> Date: Thu, 5 Jul 2018 18:32:45 +0800 Subject: [PATCH 3/3] review fixes --- .../spark/examples/streaming/JavaDirectKafkaWordCount.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index 424321d9722e6..748bf58f30350 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -39,13 +39,14 @@ /** * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: JavaDirectKafkaWordCount + * Usage: JavaDirectKafkaWordCount * is a list of one or more Kafka brokers + * is a consumer group name to consume from topics * is a list of one or more kafka topics to consume from * * Example: * $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \ - * topic1,topic2 + * consumer-group topic1,topic2 */ public final class JavaDirectKafkaWordCount {