[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream #4384

Closed · wants to merge 10 commits · Changes from 6 commits
DirectKafkaWordCount.scala (new file)
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokerList, topics) = args

    // Create a streaming context with a 2 second batch interval.
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Create the direct Kafka stream with the brokers and topics; each record is a
    // (key, message) tuple, of which only the message is needed here.
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList)
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)

    // Split each line into words, count the words in each batch, and print.
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
DirectKafkaInputDStream.scala
@@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._
  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
  *   starting point of the stream
  * @param messageHandler function for translating each message into the desired type
- * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
  */
 private[streaming]
 class DirectKafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag,
+  U <: Decoder[K]: ClassTag,
+  T <: Decoder[V]: ClassTag,
   R: ClassTag](
     @transient ssc_ : StreamingContext,
     val kafkaParams: Map[String, String],
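
The fromOffsets and messageHandler parameters documented above surface in KafkaUtils.createDirectStream as well. The following is a minimal sketch of how a driver might use them, assuming the createDirectStream overload that takes explicit starting offsets and a message handler; the object name, broker addresses, topic, partition, and offset values are placeholders.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamFromOffsets {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("DirectStreamFromOffsets")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // Resume topic1, partition 0 from offset 1000 (inclusive), e.g. as recovered
    // from the application's own offset store.
    val fromOffsets = Map(TopicAndPartition("topic1", 0) -> 1000L)

    // messageHandler turns each MessageAndMetadata into the desired record type;
    // here it keeps both the key and the message.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}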
KafkaCluster.scala
@@ -332,6 +332,9 @@ object KafkaCluster
       extends ConsumerConfig(originalProps) {
     val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
       val hpa = hp.split(":")
+      if (hpa.size == 1) {
+        throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
+      }
       (hpa(0), hpa(1).toInt)
     }
   }
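
The added check turns a malformed broker list into an immediate, descriptive failure instead of a bare array-index error. Below is a standalone sketch of the same parsing logic; the object and helper names are made up for illustration, and it throws IllegalArgumentException rather than the SparkException used in the real config class above.

object BrokerListCheck {
  // Hypothetical standalone helper mirroring the parsing shown in the diff above.
  def parseBrokers(brokers: String): Array[(String, Int)] =
    brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
        throw new IllegalArgumentException(
          s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)
    }

  def main(args: Array[String]) {
    println(parseBrokers("host1:9092,host2:9092").toList) // List((host1,9092), (host2,9092))

    // A missing port now fails fast with a clear message; previously hpa(1)
    // would have thrown an ArrayIndexOutOfBoundsException.
    println(parseBrokers("host1:9092,host2").toList) // throws
  }
}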
KafkaRDD.scala
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.

[Review thread on the kafkaParams doc change]

Contributor: You removed the "not zookeeper servers" note here, but left it in for the other methods that take kafkaParams.

Contributor (Author): The only reason we were writing "not zookeeper servers" was to make the difference from the earlier stream clear, for people who want to switch from the old stream to the new one. That applies to the public API; this is internal, private API. I can add it back, no issues.

+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
  * @param messageHandler function for translating each message into the desired type
  */
-private[spark]
+private[kafka]
 class KafkaRDD[
   K: ClassTag,
   V: ClassTag,
@@ -183,7 +181,7 @@ class KafkaRDD[
   }
 }

-private[spark]
+private[kafka]
 object KafkaRDD {
   import KafkaCluster.LeaderOffset

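
For readers following the "not zookeeper servers" discussion above: the direct API and KafkaRDD are configured with broker addresses, whereas the older receiver-based stream is configured via a ZooKeeper quorum. A small illustrative comparison, with placeholder host names:

// Direct stream / KafkaRDD: point at the Kafka brokers themselves.
val directParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092")  // or "bootstrap.servers"

// Receiver-based stream (KafkaUtils.createStream): configured via ZooKeeper instead.
val receiverParams = Map[String, String](
  "zookeeper.connect" -> "zk1:2181,zk2:2181",
  "group.id" -> "my-consumer-group")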
KafkaRDDPartition.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Partition
  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
  * @param port preferred kafka host's port
  */
-private[spark]
+private[kafka]
 class KafkaRDDPartition(
   val index: Int,
   val topic: String,
@@ -36,24 +36,3 @@ class KafkaRDDPartition(
   val host: String,
   val port: Int
 ) extends Partition
-
-private[spark]
-object KafkaRDDPartition {
-  def apply(
-    index: Int,
-    topic: String,
-    partition: Int,
-    fromOffset: Long,
-    untilOffset: Long,
-    host: String,
-    port: Int
-  ): KafkaRDDPartition = new KafkaRDDPartition(
-    index,
-    topic,
-    partition,
-    fromOffset,
-    untilOffset,
-    host,
-    port
-  )
-}
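
The removed companion object was only a pass-through factory, so the change is mechanical: callers construct partitions with new directly. The sketch below is illustrative only, since the class is now private[kafka] and this compiles only inside that package; all values are placeholders.

val part = new KafkaRDDPartition(
  index = 0,
  topic = "topic1",
  partition = 0,
  fromOffset = 1000L,
  untilOffset = 2000L,
  host = "broker1",
  port = 9092)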