Commit 3d44772
Remove spark.streaming.kafka.maxRetries doc as it is no longer in the code; declare existing Kafka integration non-experimental
srowen committed Oct 13, 2018
1 parent 6e34ce7 commit 3d44772
Showing 7 changed files with 7 additions and 83 deletions.
10 changes: 0 additions & 10 deletions docs/configuration.md
@@ -2029,16 +2029,6 @@ showDF(properties, numRows = 200, truncate = FALSE)
partition when using the new Kafka direct stream API.
</td>
</tr>
<tr>
<td><code>spark.streaming.kafka.maxRetries</code></td>
<td>1</td>
<td>
Maximum number of consecutive retries the driver will make in order to find
the latest offsets on the leader of each partition (a default value of 1
means that the driver will make a maximum of 2 attempts). Only applies to
the new Kafka direct stream API.
</td>
</tr>
<tr>
<td><code>spark.streaming.ui.retainedBatches</code></td>
<td>1000</td>
3 changes: 1 addition & 2 deletions docs/streaming-kafka-0-10-integration.md
@@ -6,8 +6,7 @@ title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or
The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka
partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses
the [new Kafka consumer API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API,
there are notable differences in usage. This version of the integration is marked as experimental, so the API is
potentially subject to change.
there are notable differences in usage.
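
For orientation, the stream this guide describes is created in roughly the following way. This is a minimal sketch only: the broker address, topic names, and group id are illustrative placeholders, not values from this commit.

```scala
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("DirectStreamSketch"), Seconds(5))

    // Placeholder connection settings; replace with real brokers, topics, and group id.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // One Kafka partition becomes one Spark partition (the 1:1 correspondence noted above).
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("topicA", "topicB"), kafkaParams))

    // Offsets and metadata are accessible on each batch via HasOffsetRanges.
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic} ${r.partition} ${r.fromOffset} ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```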

### Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
ConsumerStrategy.scala
@@ -26,19 +26,16 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging

/**
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
* See [[ConsumerStrategies]] to obtain instances.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
abstract class ConsumerStrategy[K, V] {
/**
* Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
@@ -208,13 +205,10 @@ private case class Assign[K, V](
}

/**
* :: Experimental ::
* object for obtaining instances of [[ConsumerStrategy]]
* Object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
@@ -227,7 +221,6 @@ object ConsumerStrategies {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def Subscribe[K, V](
topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object],
@@ -239,7 +232,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
@@ -249,7 +241,6 @@ object ConsumerStrategies {
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def Subscribe[K, V](
topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -260,7 +251,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
@@ -273,7 +263,6 @@ object ConsumerStrategies {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
@@ -282,7 +271,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
@@ -292,14 +280,13 @@ object ConsumerStrategies {
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/** :: Experimental ::
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
@@ -313,7 +300,6 @@ object ConsumerStrategies {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object],
@@ -324,7 +310,7 @@ object ConsumerStrategies {
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/** :: Experimental ::
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
@@ -335,7 +321,6 @@ object ConsumerStrategies {
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -345,7 +330,7 @@ object ConsumerStrategies {
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/** :: Experimental ::
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
@@ -359,15 +344,14 @@ object ConsumerStrategies {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](pattern, kafkaParams, offsets)
}

/** :: Experimental ::
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
@@ -378,7 +362,6 @@ object ConsumerStrategies {
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -389,7 +372,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
@@ -402,7 +384,6 @@ object ConsumerStrategies {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
@@ -414,7 +395,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
@@ -424,7 +404,6 @@ object ConsumerStrategies {
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -435,7 +414,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
@@ -448,7 +426,6 @@ object ConsumerStrategies {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
@@ -457,7 +434,6 @@ object ConsumerStrategies {
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
@@ -467,7 +443,6 @@ object ConsumerStrategies {
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
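
As a rough illustration of the three strategy families documented in this file, here is a hedged sketch; the broker address, topic names, and offsets are placeholders rather than anything taken from the commit.

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010.ConsumerStrategies

object ConsumerStrategySketch {
  // Placeholder Kafka parameters; every strategy requires "bootstrap.servers".
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "example-group")

  // Subscribe: a fixed collection of topics.
  val byTopic = ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams)

  // SubscribePattern: topics matching a regex, re-checked periodically for new matches.
  val byPattern = ConsumerStrategies.SubscribePattern[String, String](
    Pattern.compile("events-.*"), kafkaParams)

  // Assign: explicit TopicPartitions, optionally with per-partition starting offsets.
  val partitions = Seq(new TopicPartition("events", 0), new TopicPartition("events", 1))
  val startOffsets = Map(new TopicPartition("events", 0) -> 42L)
  val byAssignment = ConsumerStrategies.Assign[String, String](partitions, kafkaParams, startOffsets)
}
```

Any of these strategies is then passed to KafkaUtils.createDirectStream, documented in the next file.
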
KafkaUtils.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -32,13 +31,10 @@ import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingCont
import org.apache.spark.streaming.dstream._

/**
* :: Experimental ::
* object for constructing Kafka streams and RDDs
*/
@Experimental
object KafkaUtils extends Logging {
/**
* :: Experimental ::
* Scala constructor for a batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
@@ -52,7 +48,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createRDD[K, V](
sc: SparkContext,
kafkaParams: ju.Map[String, Object],
@@ -75,7 +70,6 @@ object KafkaUtils extends Logging {
}

/**
* :: Experimental ::
* Java constructor for a batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
@@ -89,7 +83,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createRDD[K, V](
jsc: JavaSparkContext,
kafkaParams: ju.Map[String, Object],
@@ -101,7 +94,6 @@ object KafkaUtils extends Logging {
}

/**
* :: Experimental ::
* Scala constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
@@ -114,7 +106,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
@@ -125,7 +116,6 @@ object KafkaUtils extends Logging {
}

/**
* :: Experimental ::
* Scala constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
@@ -137,7 +127,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
@@ -148,7 +137,6 @@ object KafkaUtils extends Logging {
}

/**
* :: Experimental ::
* Java constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
@@ -158,7 +146,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
@@ -170,7 +157,6 @@ object KafkaUtils extends Logging {
}

/**
* :: Experimental ::
* Java constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
@@ -182,7 +168,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
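
Finally, a hedged sketch of the batch-oriented createRDD entry point documented in this file; the topic name, offset ranges, and broker address are illustrative assumptions only.

```scala
import java.{util => ju}

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

object KafkaRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KafkaRDDSketch"))

    // Placeholder connection settings (this createRDD overload takes a java.util.Map).
    val kafkaParams = new ju.HashMap[String, Object]()
    kafkaParams.put("bootstrap.servers", "localhost:9092")
    kafkaParams.put("key.deserializer", classOf[StringDeserializer])
    kafkaParams.put("value.deserializer", classOf[StringDeserializer])
    kafkaParams.put("group.id", "example-group")

    // Fixed start/end offsets per partition: the batch is fully determined in advance,
    // which is what allows exactly-once control for this interface.
    val offsetRanges = Array(
      OffsetRange("events", 0, 0L, 100L),
      OffsetRange("events", 1, 0L, 100L))

    val rdd = KafkaUtils.createRDD[String, String](
      sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)

    rdd.map(record => (record.key, record.value)).take(10).foreach(println)
    sc.stop()
  }
}
```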
