From aa4a70b7daa9820d28b7d41793220c8ff430e737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franc=CC=A7ois=20Garillot?= Date: Tue, 14 Jul 2015 16:53:03 +0200 Subject: [PATCH 01/15] [SPARK-8978][Streaming] Implements the DirectKafkaController --- .../streaming/kafka/DirectKafkaInputDStream.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 48a1933d92f85..3ccb9cfa61e5a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -73,12 +73,19 @@ class DirectKafkaInputDStream[ protected val kc = new KafkaCluster(kafkaParams) - protected val maxMessagesPerPartition: Option[Long] = { - val ratePerSec = context.sparkContext.getConf.getInt( + private val ratePerSec: Int = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRatePerPartition", 0) - if (ratePerSec > 0) { + protected def maxMessagesPerPartition: Option[Long] = { + val estimatedRate = rateController.map(_.getLatestRate().toInt).getOrElse(-1) + val numPartitions = currentOffsets.keys.size + val effectiveRatePerSec = if (estimatedRate > 0) { + (ratePerSec min (estimatedRate / numPartitions)) + } else { + ratePerSec + } + if (effectiveRatePerSec > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - Some((secsPerBatch * ratePerSec).toLong) + Some((secsPerBatch * effectiveRatePerSec).toLong) } else { None } From 19200f5f5fde877c6a36007d56e818d06dae5ee7 Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Mon, 3 Aug 2015 15:35:26 -0500 Subject: [PATCH 02/15] Removed usage of infix notation. Changed a private variable name to be more consistent with usage. 
--- .../streaming/kafka/DirectKafkaInputDStream.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 3ccb9cfa61e5a..f98cf0aa11942 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -73,19 +73,19 @@ class DirectKafkaInputDStream[ protected val kc = new KafkaCluster(kafkaParams) - private val ratePerSec: Int = context.sparkContext.getConf.getInt( + private val ratePerPartition: Int = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRatePerPartition", 0) protected def maxMessagesPerPartition: Option[Long] = { val estimatedRate = rateController.map(_.getLatestRate().toInt).getOrElse(-1) val numPartitions = currentOffsets.keys.size - val effectiveRatePerSec = if (estimatedRate > 0) { - (ratePerSec min (estimatedRate / numPartitions)) + val effectiveRatePerPartition = if (estimatedRate > 0) { + Math.min(ratePerPartition, (estimatedRate / numPartitions)) } else { - ratePerSec + ratePerPartition } - if (effectiveRatePerSec > 0) { + if (effectiveRatePerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - Some((secsPerBatch * effectiveRatePerSec).toLong) + Some((secsPerBatch * effectiveRatePerPartition).toLong) } else { None } From 2795509e563bc5e73ac4fc01ee0a7b54cd36c8c1 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Tue, 4 Aug 2015 09:54:50 +0200 Subject: [PATCH 03/15] Added missing RateController --- .../kafka/DirectKafkaInputDStream.scala | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index f98cf0aa11942..dd37026ec7a89 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.kafka +import org.apache.spark.streaming.scheduler.rate.RateEstimator + import scala.annotation.tailrec import scala.collection.mutable import scala.reflect.ClassTag @@ -29,7 +31,7 @@ import org.apache.spark.{Logging, SparkException} import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset -import org.apache.spark.streaming.scheduler.StreamInputInfo +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} /** * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where @@ -71,6 +73,18 @@ class DirectKafkaInputDStream[ protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData + + /** + * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. 
+ */ + override protected[streaming] val rateController: Option[RateController] = { + if (RateController.isBackPressureEnabled(ssc.conf)) { + RateEstimator.create(ssc.conf).map { new DirectKafkaRateController(id, _) } + } else { + None + } + } + protected val kc = new KafkaCluster(kafkaParams) private val ratePerPartition: Int = context.sparkContext.getConf.getInt( @@ -184,4 +198,12 @@ class DirectKafkaInputDStream[ } } + /** + * A RateController that sends the new rate to receivers, via the receiver tracker. + */ + private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator) + extends RateController(id, estimator) { + override def publish(rate: Long): Unit = () + } + } From 51e78c626321933ea3542075829e794056cef0f6 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Tue, 4 Aug 2015 12:20:06 +0200 Subject: [PATCH 04/15] Rename and fix build failure --- .../streaming/kafka/DirectKafkaInputDStream.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index dd37026ec7a89..d8a316b72f1c9 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -17,8 +17,6 @@ package org.apache.spark.streaming.kafka -import org.apache.spark.streaming.scheduler.rate.RateEstimator - import scala.annotation.tailrec import scala.collection.mutable import scala.reflect.ClassTag @@ -32,6 +30,7 @@ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator /** * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where @@ -79,7 +78,7 @@ class DirectKafkaInputDStream[ */ override protected[streaming] val rateController: Option[RateController] = { if (RateController.isBackPressureEnabled(ssc.conf)) { - RateEstimator.create(ssc.conf).map { new DirectKafkaRateController(id, _) } + Some(new DirectKafkaRateController(id, RateEstimator.create(ssc.conf, ssc_.graph.batchDuration))) } else { None } @@ -87,15 +86,15 @@ class DirectKafkaInputDStream[ protected val kc = new KafkaCluster(kafkaParams) - private val ratePerPartition: Int = context.sparkContext.getConf.getInt( + private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRatePerPartition", 0) protected def maxMessagesPerPartition: Option[Long] = { val estimatedRate = rateController.map(_.getLatestRate().toInt).getOrElse(-1) val numPartitions = currentOffsets.keys.size val effectiveRatePerPartition = if (estimatedRate > 0) { - Math.min(ratePerPartition, (estimatedRate / numPartitions)) + Math.min(maxRateLimitPerPartition, (estimatedRate / numPartitions)) } else { - ratePerPartition + maxRateLimitPerPartition } if (effectiveRatePerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 From 393c580f17da40daf371c41955d9aa92658182a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franc=CC=A7ois=20Garillot?= Date: Tue, 14 Jul 2015 16:53:03 +0200 Subject: [PATCH 05/15] [SPARK-8978][Streaming] Implements the DirectKafkaRateController --- 
.../apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index d8a316b72f1c9..c49d9c194241f 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -78,7 +78,8 @@ class DirectKafkaInputDStream[ */ override protected[streaming] val rateController: Option[RateController] = { if (RateController.isBackPressureEnabled(ssc.conf)) { - Some(new DirectKafkaRateController(id, RateEstimator.create(ssc.conf, ssc_.graph.batchDuration))) + Some(new DirectKafkaRateController(id, + RateEstimator.create(ssc.conf, ssc_.graph.batchDuration))) } else { None } From 3110267eb5d09070a96e3284ec912962225d3148 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Tue, 4 Aug 2015 13:31:52 +0200 Subject: [PATCH 06/15] [SPARK-8978][Streaming] Implements the DirectKafkaRateController --- .../kafka/DirectKafkaInputDStream.scala | 71 +++++++++++++------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 48a1933d92f85..082d29dcbc3c1 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -29,7 +29,8 @@ import org.apache.spark.{Logging, SparkException} import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset -import org.apache.spark.streaming.scheduler.StreamInputInfo +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator /** * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where @@ -52,16 +53,16 @@ import org.apache.spark.streaming.scheduler.StreamInputInfo */ private[streaming] class DirectKafkaInputDStream[ - K: ClassTag, - V: ClassTag, - U <: Decoder[K]: ClassTag, - T <: Decoder[V]: ClassTag, - R: ClassTag]( - @transient ssc_ : StreamingContext, - val kafkaParams: Map[String, String], - val fromOffsets: Map[TopicAndPartition, Long], - messageHandler: MessageAndMetadata[K, V] => R -) extends InputDStream[R](ssc_) with Logging { +K: ClassTag, +V: ClassTag, +U <: Decoder[K]: ClassTag, +T <: Decoder[V]: ClassTag, +R: ClassTag]( + @transient ssc_ : StreamingContext, + val kafkaParams: Map[String, String], + val fromOffsets: Map[TopicAndPartition, Long], + messageHandler: MessageAndMetadata[K, V] => R + ) extends InputDStream[R](ssc_) with Logging { val maxRetries = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRetries", 1) @@ -71,14 +72,34 @@ class DirectKafkaInputDStream[ protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData + + /** + * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. 
+ */ + override protected[streaming] val rateController: Option[RateController] = { + if (RateController.isBackPressureEnabled(ssc.conf)) { + Some(new DirectKafkaRateController(id, + RateEstimator.create(ssc.conf, ssc_.graph.batchDuration))) + } else { + None + } + } + protected val kc = new KafkaCluster(kafkaParams) - protected val maxMessagesPerPartition: Option[Long] = { - val ratePerSec = context.sparkContext.getConf.getInt( - "spark.streaming.kafka.maxRatePerPartition", 0) - if (ratePerSec > 0) { + private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt( + "spark.streaming.kafka.maxRatePerPartition", 0) + protected def maxMessagesPerPartition: Option[Long] = { + val estimatedRate = rateController.map(_.getLatestRate().toInt).getOrElse(-1) + val numPartitions = currentOffsets.keys.size + val effectiveRatePerPartition = if (estimatedRate > 0) { + Math.min(maxRateLimitPerPartition, (estimatedRate / numPartitions)) + } else { + maxRateLimitPerPartition + } + if (effectiveRatePerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - Some((secsPerBatch * ratePerSec).toLong) + Some((secsPerBatch * effectiveRatePerPartition).toLong) } else { None } @@ -106,7 +127,7 @@ class DirectKafkaInputDStream[ // limits the maximum number of messages per partition protected def clamp( - leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = { + leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = { maxMessagesPerPartition.map { mmp => leaderOffsets.map { case (tp, lo) => tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset)) @@ -170,11 +191,19 @@ class DirectKafkaInputDStream[ val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics)) batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => - logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}") - generatedRDDs += t -> new KafkaRDD[K, V, U, T, R]( - context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler) + logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}") + generatedRDDs += t -> new KafkaRDD[K, V, U, T, R]( + context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler) } } } -} + /** + * A RateController that sends the new rate to receivers, via the receiver tracker. 
+ */ + private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator) + extends RateController(id, estimator) { + override def publish(rate: Long): Unit = () + } + +} \ No newline at end of file From d04a288f8ce74e8974235b747357eec1c16b81d0 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Tue, 4 Aug 2015 17:50:36 +0200 Subject: [PATCH 07/15] adding test to make sure rate controller is used to calculate maxMessagesPerPartition --- .../kafka/DirectKafkaStreamSuite.scala | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 5b3c79444aa68..06940b8e86c68 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -20,6 +20,9 @@ package org.apache.spark.streaming.kafka import java.io.File import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.scheduler.rate.RateEstimator + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ @@ -350,6 +353,65 @@ class DirectKafkaStreamSuite ssc.stop() } + test("using rate controller"){ + val topic = "backpressure" + val topicPartition = TopicAndPartition(topic, 0) + kafkaTestUtils.createTopic(topic) + val kafkaParams = Map( + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, + "auto.offset.reset" -> "smallest" + ) + + val collectedData = new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] + // Send data to Kafka and wait for it to be received + def sendDataAndWaitForReceive(data: Seq[Int]) { + val strings = data.map { _.toString} + kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + assert(strings.forall { collectedData.flatten.contains }) + } + } + + val sparkConf = new SparkConf() + .setMaster("local[4]") + .setAppName(this.getClass.getSimpleName) + .set("spark.streaming.kafka.maxRatePerPartition", "100") + + // Setup the streaming context + ssc = new StreamingContext(sparkConf, Milliseconds(100)) + + val kafkaStream = withClue("Error creating direct stream") { + val kc = new KafkaCluster(kafkaParams) + val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) + val m = kc.getEarliestLeaderOffsets(Set(topicPartition)) + .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset)) + + new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( + ssc, kafkaParams, m, messageHandler) { + override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, + new ConstantEstimator(60.0, 40.0, 20.0))) + } + } + // This is to collect the raw data received from Kafka + kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => + val data = rdd.map { _._2 }.collect() + collectedData += data + } + + ssc.start() + + // Send some data and wait for them to be received + sendDataAndWaitForReceive((1 to 100)) + + //assert that rate estimator values are used to determine maxMessagesPerPartition + assert(collectedData.exists(_.size == 10) === true) //maxRatePerPartition 100 * .1 secs + 
assert(collectedData.exists(_.size == 6) === true) //rate estimator 60.0 * .1 secs + assert(collectedData.exists(_.size == 4) === true) //rate estimator 40.0 * .1 secs + assert(collectedData.exists(_.size == 2) === true) //rate estimator 20.0 * .1 secs + + ssc.stop() + } + /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { @@ -381,3 +443,20 @@ object DirectKafkaStreamSuite { } } } + +private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator { + private var idx: Int = 0 + + private def nextRate(): Double = { + val rate = rates(idx) + idx = (idx + 1) % rates.size + rate + } + + def compute( + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(nextRate()) +} + From d3db1ea8ea4f151d65ce6ab839530c9be7d22649 Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Tue, 4 Aug 2015 12:30:03 -0500 Subject: [PATCH 08/15] Fixing stylecheck errors. --- .../streaming/kafka/DirectKafkaStreamSuite.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 06940b8e86c68..7ca94abaadb18 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -362,7 +362,8 @@ class DirectKafkaStreamSuite "auto.offset.reset" -> "smallest" ) - val collectedData = new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] + val collectedData = + new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] // Send data to Kafka and wait for it to be received def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} @@ -403,11 +404,11 @@ class DirectKafkaStreamSuite // Send some data and wait for them to be received sendDataAndWaitForReceive((1 to 100)) - //assert that rate estimator values are used to determine maxMessagesPerPartition - assert(collectedData.exists(_.size == 10) === true) //maxRatePerPartition 100 * .1 secs - assert(collectedData.exists(_.size == 6) === true) //rate estimator 60.0 * .1 secs - assert(collectedData.exists(_.size == 4) === true) //rate estimator 40.0 * .1 secs - assert(collectedData.exists(_.size == 2) === true) //rate estimator 20.0 * .1 secs + // Assert that rate estimator values are used to determine maxMessagesPerPartition + assert(collectedData.exists(_.size == 10) === true) // maxRatePerPartition 100 * .1 secs + assert(collectedData.exists(_.size == 6) === true) // rate estimator 60.0 * .1 secs + assert(collectedData.exists(_.size == 4) === true) // rate estimator 40.0 * .1 secs + assert(collectedData.exists(_.size == 2) === true) // rate estimator 20.0 * .1 secs ssc.stop() } From 9e69e373754c6b4b20dfd6dc310bb5051197860c Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Tue, 4 Aug 2015 14:47:39 -0500 Subject: [PATCH 09/15] Attempt to fix flakey test that fails in CI, but not locally :( --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 7ca94abaadb18..0cb1d34de9e6b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -368,7 +368,7 @@ class DirectKafkaStreamSuite def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap) - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(strings.forall { collectedData.flatten.contains }) } } @@ -402,7 +402,7 @@ class DirectKafkaStreamSuite ssc.start() // Send some data and wait for them to be received - sendDataAndWaitForReceive((1 to 100)) + sendDataAndWaitForReceive((1 to 400)) // Assert that rate estimator values are used to determine maxMessagesPerPartition assert(collectedData.exists(_.size == 10) === true) // maxRatePerPartition 100 * .1 secs From 63724789312513970c6c0653ba1ad69fd44f0473 Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Tue, 4 Aug 2015 16:01:32 -0500 Subject: [PATCH 10/15] Found a few ways to make this test more robust... --- .../streaming/kafka/DirectKafkaStreamSuite.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 0cb1d34de9e6b..440e7b0988473 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -374,7 +374,9 @@ class DirectKafkaStreamSuite } val sparkConf = new SparkConf() - .setMaster("local[4]") + // Safe, even with streaming, because we're using the direct API. + // Using 1 core is necessary to make the test more predictable. + .setMaster("local[1]") .setAppName(this.getClass.getSimpleName) .set("spark.streaming.kafka.maxRatePerPartition", "100") @@ -404,11 +406,13 @@ class DirectKafkaStreamSuite // Send some data and wait for them to be received sendDataAndWaitForReceive((1 to 400)) + def dataToString: String = collectedData.map(_.mkString("[",",","]")).mkString("{",", ","}") + // Assert that rate estimator values are used to determine maxMessagesPerPartition - assert(collectedData.exists(_.size == 10) === true) // maxRatePerPartition 100 * .1 secs - assert(collectedData.exists(_.size == 6) === true) // rate estimator 60.0 * .1 secs - assert(collectedData.exists(_.size == 4) === true) // rate estimator 40.0 * .1 secs - assert(collectedData.exists(_.size == 2) === true) // rate estimator 20.0 * .1 secs + assert(collectedData.exists(_.size == 10), dataToString) // maxRatePerPartition 100 * .1 secs + assert(collectedData.exists(_.size == 6), dataToString) // rate estimator 60.0 * .1 secs + assert(collectedData.exists(_.size == 4), dataToString) // rate estimator 40.0 * .1 secs + assert(collectedData.exists(_.size == 2), dataToString) // rate estimator 20.0 * .1 secs ssc.stop() } From 9615320d491a5d64fcae67acc84f875ab47e25e5 Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Tue, 4 Aug 2015 16:11:59 -0500 Subject: [PATCH 11/15] Give me a break... 
--- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 440e7b0988473..5f1a11d2c2a9a 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -406,7 +406,7 @@ class DirectKafkaStreamSuite // Send some data and wait for them to be received sendDataAndWaitForReceive((1 to 400)) - def dataToString: String = collectedData.map(_.mkString("[",",","]")).mkString("{",", ","}") + def dataToString: String = collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") // Assert that rate estimator values are used to determine maxMessagesPerPartition assert(collectedData.exists(_.size == 10), dataToString) // maxRatePerPartition 100 * .1 secs From ce19d2a2f49682542df32067dfdb1a9862a7e50f Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Tue, 4 Aug 2015 16:38:36 -0500 Subject: [PATCH 12/15] Removing an unreliable assertion. --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 5f1a11d2c2a9a..ebfbbf007b8be 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -409,7 +409,6 @@ class DirectKafkaStreamSuite def dataToString: String = collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") // Assert that rate estimator values are used to determine maxMessagesPerPartition - assert(collectedData.exists(_.size == 10), dataToString) // maxRatePerPartition 100 * .1 secs assert(collectedData.exists(_.size == 6), dataToString) // rate estimator 60.0 * .1 secs assert(collectedData.exists(_.size == 4), dataToString) // rate estimator 40.0 * .1 secs assert(collectedData.exists(_.size == 2), dataToString) // rate estimator 20.0 * .1 secs From e43f67854483ba0a1fa6288d56a72c6061eaf108 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Wed, 5 Aug 2015 10:49:56 +0200 Subject: [PATCH 13/15] fixing doc and nits --- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 2 +- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index cf2b29013c196..e187dd6154dd7 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -199,7 +199,7 @@ class DirectKafkaInputDStream[ } /** - * A RateController that sends the new rate to receivers, via the receiver tracker. + * A RateController to retrieve the rate from RateEstimator. 
*/ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator) extends RateController(id, estimator) { diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index ebfbbf007b8be..297afcd111290 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -353,7 +353,7 @@ class DirectKafkaStreamSuite ssc.stop() } - test("using rate controller"){ + test("using rate controller") { val topic = "backpressure" val topicPartition = TopicAndPartition(topic, 0) kafkaTestUtils.createTopic(topic) @@ -367,7 +367,7 @@ class DirectKafkaStreamSuite // Send data to Kafka and wait for it to be received def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} - kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap) + kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1 }.toMap) eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(strings.forall { collectedData.flatten.contains }) } From 648c8b1c560151add4100f21808cdc6530570018 Mon Sep 17 00:00:00 2001 From: Dean Wampler Date: Wed, 5 Aug 2015 10:14:17 -0500 Subject: [PATCH 14/15] Refactored rate controller test to be more predictable and run faster. --- .../kafka/DirectKafkaStreamSuite.scala | 72 ++++++++++--------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 297afcd111290..02225d5aa7cc5 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -362,26 +362,20 @@ class DirectKafkaStreamSuite "auto.offset.reset" -> "smallest" ) - val collectedData = - new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] - // Send data to Kafka and wait for it to be received - def sendDataAndWaitForReceive(data: Seq[Int]) { - val strings = data.map { _.toString} - kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1 }.toMap) - eventually(timeout(20 seconds), interval(50 milliseconds)) { - assert(strings.forall { collectedData.flatten.contains }) - } - } + val batchIntervalMilliseconds = 100 + val estimator = new ConstantEstimator(100) + val messageKeys = (1 to 200).map(_.toString) + val messages = messageKeys.map((_, 1)).toMap val sparkConf = new SparkConf() // Safe, even with streaming, because we're using the direct API. - // Using 1 core is necessary to make the test more predictable. + // Using 1 core is useful to make the test more predictable. 
.setMaster("local[1]") .setAppName(this.getClass.getSimpleName) .set("spark.streaming.kafka.maxRatePerPartition", "100") // Setup the streaming context - ssc = new StreamingContext(sparkConf, Milliseconds(100)) + ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds)) val kafkaStream = withClue("Error creating direct stream") { val kc = new KafkaCluster(kafkaParams) @@ -391,10 +385,18 @@ class DirectKafkaStreamSuite new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( ssc, kafkaParams, m, messageHandler) { - override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, - new ConstantEstimator(60.0, 40.0, 20.0))) + override protected[streaming] val rateController = + Some(new DirectKafkaRateController(id, estimator)) } } + + val collectedData = + new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] + + // Used for assertion failure messages. + def dataToString: String = + collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") + // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() @@ -403,15 +405,21 @@ class DirectKafkaStreamSuite ssc.start() - // Send some data and wait for them to be received - sendDataAndWaitForReceive((1 to 400)) - - def dataToString: String = collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") - - // Assert that rate estimator values are used to determine maxMessagesPerPartition - assert(collectedData.exists(_.size == 6), dataToString) // rate estimator 60.0 * .1 secs - assert(collectedData.exists(_.size == 4), dataToString) // rate estimator 40.0 * .1 secs - assert(collectedData.exists(_.size == 2), dataToString) // rate estimator 20.0 * .1 secs + // Try different rate limits. + // Send data to Kafka and wait for arrays of data to appear matching the rate. + Seq(100, 50, 20).foreach { rate => + collectedData.clear() // Empty this buffer on each pass. + estimator.updateRate(rate) // Set a new rate. + // Expect blocks of data equal to "rate", scaled by the interval length in secs. + val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) + kafkaTestUtils.sendMessages(topic, messages) + eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { + // Assert that rate estimator values are used to determine maxMessagesPerPartition. + // Funky "-" in message makes the complete assertion message read better. 
+ assert(collectedData.exists(_.size == expectedSize), + s" - No arrays of size $expectedSize for rate $rate found in $dataToString") + } + } ssc.stop() } @@ -448,19 +456,17 @@ object DirectKafkaStreamSuite { } } -private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator { - private var idx: Int = 0 +private[streaming] class ConstantEstimator(@volatile private var rate: Long) + extends RateEstimator { - private def nextRate(): Double = { - val rate = rates(idx) - idx = (idx + 1) % rates.size - rate + def updateRate(newRate: Long): Unit = { + rate = newRate } def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(nextRate()) + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(rate) } From 50d1f218d240e4b697e74d8a2b0a2bbe0e4707b6 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Wed, 5 Aug 2015 23:31:36 +0200 Subject: [PATCH 15/15] Taking care of the remaining nits --- .../kafka/DirectKafkaInputDStream.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index e187dd6154dd7..8a177077775c6 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -90,16 +90,17 @@ class DirectKafkaInputDStream[ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRatePerPartition", 0) protected def maxMessagesPerPartition: Option[Long] = { - val estimatedRate = rateController.map(_.getLatestRate().toInt).getOrElse(-1) + val estimatedRateLimit = rateController.map(_.getLatestRate().toInt) val numPartitions = currentOffsets.keys.size - val effectiveRatePerPartition = if (estimatedRate > 0) { - Math.min(maxRateLimitPerPartition, (estimatedRate / numPartitions)) - } else { - maxRateLimitPerPartition - } - if (effectiveRatePerPartition > 0) { + + val effectiveRateLimitPerPartition = estimatedRateLimit + .filter(_ > 0) + .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) + .getOrElse(maxRateLimitPerPartition) + + if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - Some((secsPerBatch * effectiveRatePerPartition).toLong) + Some((secsPerBatch * effectiveRateLimitPerPartition).toLong) } else { None }
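
The back-pressure cap that the series converges on (PATCH 15/15) can be read in isolation as follows. This is a minimal standalone sketch, not the Spark class itself: the parameter names are illustrative stand-ins for values the real DStream pulls from the RateController, from spark.streaming.kafka.maxRatePerPartition, and from the batch duration.

    // Sketch of the final maxMessagesPerPartition computation (assumes at least one partition).
    object RateLimitSketch {
      def maxMessagesPerPartition(
          estimatedRate: Option[Int],  // latest rate (msgs/sec) from the rate estimator, if any
          maxRatePerPartition: Int,    // static per-partition ceiling; 0 means "not configured"
          numPartitions: Int,
          batchIntervalMs: Long): Option[Long] = {
        // Split the estimated rate evenly across partitions and cap it by the static
        // ceiling; with no positive estimate, fall back to the ceiling alone.
        val effectivePerPartition = estimatedRate
          .filter(_ > 0)
          .map(rate => math.min(maxRatePerPartition, rate / numPartitions))
          .getOrElse(maxRatePerPartition)

        if (effectivePerPartition > 0) {
          val secsPerBatch = batchIntervalMs.toDouble / 1000
          Some((secsPerBatch * effectivePerPartition).toLong)
        } else {
          None // unbounded: the batch reads up to the latest leader offsets
        }
      }
    }

With a 100 ms batch, one partition, a 100 msgs/sec ceiling and an estimated rate of 60 msgs/sec, this yields 6 messages per batch, matching the "rate estimator 60.0 * .1 secs" assertion introduced in PATCH 07/15.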
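
The cap only takes effect because the direct stream clamps each partition's target offset with it. A condensed sketch of the existing clamp step, with plain Longs standing in for TopicAndPartition and LeaderOffset:

    // Sketch of DirectKafkaInputDStream.clamp: bound each partition's target offset by
    // currentOffset + maxMessagesPerPartition; with no cap, keep the leader offsets as-is.
    object ClampSketch {
      def clamp(
          currentOffsets: Map[String, Long],           // partition -> offset consumed so far
          leaderOffsets: Map[String, Long],            // partition -> latest offset on the leader
          maxMessagesPerPartition: Option[Long]): Map[String, Long] = {
        maxMessagesPerPartition.map { mmp =>
          leaderOffsets.map { case (tp, leaderOffset) =>
            tp -> math.min(currentOffsets(tp) + mmp, leaderOffset)
          }
        }.getOrElse(leaderOffsets)
      }
    }

This is why the test suite can assert on the exact size of the collected arrays: each batch contains at most maxMessagesPerPartition records per Kafka partition, no matter how many messages are already waiting in the topic.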