Skip to content

Commit

Permalink
Update JavaDirectKerberizedKafkaWordCount.java
Browse the repository at this point in the history
  • Loading branch information
hddong committed Aug 13, 2019
1 parent cb62731 commit b9937f3
Showing 1 changed file with 40 additions and 40 deletions.
Expand Up @@ -56,52 +56,52 @@
*/

public final class JavaDirectKerberizedKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println(
"Usage: JavaDirectKerberizedKafkaWordCount <brokers> <groupId> <topics>\n" +
" <brokers> is a list of one or more Kafka brokers\n" +
" <groupId> is a consumer group name to consume from topics\n" +
" <topics> is a list of one or more kafka topics to consume from\n\n");
System.exit(1);
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println(
"Usage: JavaDirectKerberizedKafkaWordCount <brokers> <groupId> <topics>\n" +
" <brokers> is a list of one or more Kafka brokers\n" +
" <groupId> is a consumer group name to consume from topics\n" +
" <topics> is a list of one or more kafka topics to consume from\n\n");
System.exit(1);
}

StreamingExamples.setStreamingLogLevels();
StreamingExamples.setStreamingLogLevels();

String brokers = args[0];
String groupId = args[1];
String topics = args[2];
String brokers = args[0];
String groupId = args[1];
String topics = args[2];

// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKerberizedKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKerberizedKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_PLAINTEXT.name);
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_PLAINTEXT.name);

// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

// Get the lines, split them into words, count the words and print
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
// Get the lines, split them into words, count the words and print
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();

// Start the computation
jssc.start();
jssc.awaitTermination();
}
// Start the computation
jssc.start();
jssc.awaitTermination();
}
}

0 comments on commit b9937f3

Please sign in to comment.