Skip to content

Commit

Permalink
backpressure rate controller consumes events preferentially from lagg…
Browse files Browse the repository at this point in the history
…ing partitions
  • Loading branch information
JasonMWhite committed Dec 1, 2015
1 parent 50b9c6b commit 2d11daa
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,29 @@ class DirectKafkaInputDStream[

private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
protected def maxMessagesPerPartition: Option[Long] = {
protected def maxMessagesPerPartition(leaderOffsets: Map[TopicAndPartition, LeaderOffset])
: Option[Map[TopicAndPartition, Long]] = {
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
val numPartitions = currentOffsets.keys.size

val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map { limit =>
if (maxRateLimitPerPartition > 0) {
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else {
limit / numPartitions

// calculate a per-partition rate limit based on current lag
val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
case Some(rate) =>
val lagPerPartition = leaderOffsets.map { case (tp, lo) =>
tp -> Math.max(lo.offset - currentOffsets(tp), 0)
}
val totalLag = lagPerPartition.values.sum.toFloat

lagPerPartition.map { case (tp, lag) =>
tp -> Math.round(lag / Math.max(totalLag, 1) * rate)
}
}.getOrElse(maxRateLimitPerPartition)
case None => leaderOffsets.map { case (tp, lo) => tp -> maxRateLimitPerPartition }
}

if (effectiveRateLimitPerPartition > 0) {
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
Expand Down Expand Up @@ -134,9 +140,10 @@ class DirectKafkaInputDStream[
// limits the maximum number of messages per partition
protected def clamp(
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
maxMessagesPerPartition.map { mmp =>
leaderOffsets.map { case (tp, lo) =>
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
maxMessagesPerPartition(leaderOffsets).map { mmp =>
mmp.map { case (tp, messages) =>
val lo = leaderOffsets(tp)
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
}
}.getOrElse(leaderOffsets)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ private[kafka] class KafkaTestUtils extends Logging {
}

/** Create a Kafka topic and wait until it propagated to the whole cluster */
def createTopic(topic: String): Unit = {
AdminUtils.createTopic(zkClient, topic, 1, 1)
def createTopic(topic: String, partitions: Int = 1): Unit = {
AdminUtils.createTopic(zkClient, topic, partitions, 1)
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
(0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
}

/** Java-friendly function for sending messages to the Kafka broker */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long off

private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
kafkaTestUtils.createTopic(topic);
kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, data);
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception

private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
kafkaTestUtils.createTopic(topic);
kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, data);
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testKafkaStream() throws InterruptedException {
sent.put("b", 3);
sent.put("c", 10);

kafkaTestUtils.createTopic(topic);
kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, sent);

HashMap<String, String> kafkaParams = new HashMap<String, String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.KafkaCluster
import org.apache.spark.streaming.scheduler.rate.RateEstimator

import scala.collection.mutable
Expand All @@ -36,9 +36,10 @@ import org.scalatest.concurrent.Eventually

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.{kafka, Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.util.Utils

class DirectKafkaStreamSuite
Expand Down Expand Up @@ -355,17 +356,17 @@ class DirectKafkaStreamSuite

test("using rate controller") {
val topic = "backpressure"
val topicPartition = TopicAndPartition(topic, 0)
kafkaTestUtils.createTopic(topic)
val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
kafkaTestUtils.createTopic(topic, 2)
val kafkaParams = Map(
"metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
)

val batchIntervalMilliseconds = 100
val estimator = new ConstantEstimator(100)
val messageKeys = (1 to 200).map(_.toString)
val messages = messageKeys.map((_, 1)).toMap
val messages = Map("foo" -> 200)
kafkaTestUtils.sendMessages(topic, messages)

val sparkConf = new SparkConf()
// Safe, even with streaming, because we're using the direct API.
Expand All @@ -380,7 +381,7 @@ class DirectKafkaStreamSuite
val kafkaStream = withClue("Error creating direct stream") {
val kc = new KafkaCluster(kafkaParams)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
val m = kc.getEarliestLeaderOffsets(topicPartitions)
.fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))

new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
Expand Down Expand Up @@ -412,7 +413,6 @@ class DirectKafkaStreamSuite
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
kafkaTestUtils.sendMessages(topic, messages)
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
Expand Down Expand Up @@ -469,4 +469,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}

0 comments on commit 2d11daa

Please sign in to comment.