From 2d11daa575696402423c25fb5eab74abd91b8e2b Mon Sep 17 00:00:00 2001
From: Jason White
Date: Tue, 1 Dec 2015 01:13:49 -0500
Subject: [PATCH] backpressure rate controller consumes events preferentially
 from lagging partitions

---
 .../kafka/DirectKafkaInputDStream.scala       | 39 +++++++++++--------
 .../streaming/kafka/KafkaTestUtils.scala      |  6 +--
 .../kafka/JavaDirectKafkaStreamSuite.java     |  2 +-
 .../streaming/kafka/JavaKafkaRDDSuite.java    |  2 +-
 .../streaming/kafka/JavaKafkaStreamSuite.java |  2 +-
 .../kafka/DirectKafkaStreamSuite.scala        | 17 ++++----
 6 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1000094e93cb3..ae267dbc04f00 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -89,23 +89,29 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
       "spark.streaming.kafka.maxRatePerPartition", 0)
-  protected def maxMessagesPerPartition: Option[Long] = {
+  protected def maxMessagesPerPartition(leaderOffsets: Map[TopicAndPartition, LeaderOffset])
+    : Option[Map[TopicAndPartition, Long]] = {
     val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
-    val numPartitions = currentOffsets.keys.size
-
-    val effectiveRateLimitPerPartition = estimatedRateLimit
-      .filter(_ > 0)
-      .map { limit =>
-        if (maxRateLimitPerPartition > 0) {
-          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
-        } else {
-          limit / numPartitions
+
+    // calculate a per-partition rate limit based on current lag
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+      case Some(rate) =>
+        val lagPerPartition = leaderOffsets.map { case (tp, lo) =>
+          tp -> Math.max(lo.offset - currentOffsets(tp), 0)
+        }
+        val totalLag = lagPerPartition.values.sum.toFloat
+
+        lagPerPartition.map { case (tp, lag) =>
+          tp -> Math.round(lag / Math.max(totalLag, 1) * rate)
         }
-      }.getOrElse(maxRateLimitPerPartition)
+      case None => leaderOffsets.map { case (tp, lo) => tp -> maxRateLimitPerPartition }
+    }
 
-    if (effectiveRateLimitPerPartition > 0) {
+    if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
-      Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+      })
     } else {
       None
     }
   }
@@ -134,9 +140,10 @@
   // limits the maximum number of messages per partition
   protected def clamp(
     leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
-    maxMessagesPerPartition.map { mmp =>
-      leaderOffsets.map { case (tp, lo) =>
-        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+    maxMessagesPerPartition(leaderOffsets).map { mmp =>
+      mmp.map { case (tp, messages) =>
+        val lo = leaderOffsets(tp)
+        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
       }
     }.getOrElse(leaderOffsets)
   }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index c9fd715d3d554..b1965df7465f0 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -152,10 +152,10 @@ private[kafka] class KafkaTestUtils extends Logging {
   }
 
   /** Create a Kafka topic and wait until it propagated to the whole cluster */
-  def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+  def createTopic(topic: String, partitions: Int = 1): Unit = {
+    AdminUtils.createTopic(zkClient, topic, partitions, 1)
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(topic, 0)
+    (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
   }
 
   /** Java-friendly function for sending messages to the Kafka broker */
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 9db07d0507fea..e4f6207e40fc8 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -169,7 +169,7 @@ private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long off
 
   private String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, data);
     return data;
   }
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index a9dc6e50613ca..fce5afe3f057b 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -148,7 +148,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
 
   private String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, data);
     return data;
   }
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index e4c659215b767..ef8987774a429 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -75,7 +75,7 @@ public void testKafkaStream() throws InterruptedException {
     sent.put("b", 3);
     sent.put("c", 10);
 
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, sent);
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 02225d5aa7cc5..2b68381685ba9 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.kafka.KafkaCluster
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 import scala.collection.mutable
@@ -36,9 +36,10 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.{kafka, Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.util.Utils
 
 class DirectKafkaStreamSuite
@@ -355,8 +356,8 @@ class DirectKafkaStreamSuite
 
   test("using rate controller") {
     val topic = "backpressure"
-    val topicPartition = TopicAndPartition(topic, 0)
-    kafkaTestUtils.createTopic(topic)
+    val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
+    kafkaTestUtils.createTopic(topic, 2)
     val kafkaParams = Map(
       "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "smallest"
@@ -364,8 +365,8 @@ class DirectKafkaStreamSuite
 
     val batchIntervalMilliseconds = 100
     val estimator = new ConstantEstimator(100)
-    val messageKeys = (1 to 200).map(_.toString)
-    val messages = messageKeys.map((_, 1)).toMap
+    val messages = Map("foo" -> 200)
+    kafkaTestUtils.sendMessages(topic, messages)
 
     val sparkConf = new SparkConf()
       // Safe, even with streaming, because we're using the direct API.
@@ -380,7 +381,7 @@ class DirectKafkaStreamSuite
     val kafkaStream = withClue("Error creating direct stream") {
       val kc = new KafkaCluster(kafkaParams)
       val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
-      val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
         .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
 
       new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
@@ -412,7 +413,6 @@ class DirectKafkaStreamSuite
       estimator.updateRate(rate)  // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      kafkaTestUtils.sendMessages(topic, messages)
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
@@ -469,4 +469,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
       processingDelay: Long,
       schedulingDelay: Long): Option[Double] = Some(rate)
 }
-
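
Note (reviewer sketch, not part of the patch): the heart of this change is the
lag-proportional split in maxMessagesPerPartition. The standalone Scala sketch
below mirrors that arithmetic under simplified, assumed types, using plain
(topic, partition) tuples and Long offsets in place of TopicAndPartition and
LeaderOffset; the object and method names are hypothetical:

import scala.math.{max, round}

// Hypothetical helper, not part of the patch: splits a global rate estimate
// (messages/sec) across partitions in proportion to each partition's lag.
object LagProportionalAllocation {
  type TP = (String, Int)  // simplified stand-in for TopicAndPartition

  def allocate(
      currentOffsets: Map[TP, Long],  // last consumed offset per partition
      leaderOffsets: Map[TP, Long],   // head-of-log offset per partition
      rate: Int): Map[TP, Int] = {
    // Lag = how far consumption trails the leader, floored at zero.
    val lagPerPartition = leaderOffsets.map { case (tp, leader) =>
      tp -> max(leader - currentOffsets(tp), 0L)
    }
    val totalLag = lagPerPartition.values.sum.toFloat
    // Proportional share of the rate; max(totalLag, 1) avoids dividing
    // by zero when every partition is fully caught up.
    lagPerPartition.map { case (tp, lag) =>
      tp -> round(lag / max(totalLag, 1) * rate)
    }
  }

  def main(args: Array[String]): Unit = {
    val current = Map(("backpressure", 0) -> 100L, ("backpressure", 1) -> 100L)
    val leader  = Map(("backpressure", 0) -> 1000L, ("backpressure", 1) -> 200L)
    // Lags are 900 and 100, so a rate of 100 msg/sec splits 90/10:
    println(allocate(current, leader, 100))
    // Map((backpressure,0) -> 90, (backpressure,1) -> 10)
  }
}

As in the patch, clamp then caps each per-partition allocation at that
partition's leader offset (Math.min(currentOffsets(tp) + messages, lo.offset)),
so a lag-weighted share can never schedule reads past the head of the log.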