From f9adda322b01f75558e56187490feefd17df7270 Mon Sep 17 00:00:00 2001 From: junhao Date: Thu, 26 Nov 2015 11:11:28 +0800 Subject: [PATCH 1/7] change the Configuration Item for backpressure initialRate name --- .../spark/streaming/receiver/RateLimiter.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index bca1fbc8fda2..3da03bca6724 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.receiver import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} +import org.apache.spark.streaming.scheduler.RateController import org.apache.spark.{Logging, SparkConf} @@ -61,4 +62,21 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { rateLimiter.setRate(newRate) } } + + /** + * Get the initial rateLimit to initial rateLimiter + * @return + */ + def getInitialRateLimit() : Long = { + if (RateController.isBackPressureEnabled(conf)) { + val initialRate = conf.getLong("spark.streaming.backpressure.initialRate", 1000) + if (initialRate < maxRateLimit) { + initialRate + } else { + maxRateLimit + } + } else { + maxRateLimit + } + } } From e6f9db9351cc0c031816acfe59109d95b3c65030 Mon Sep 17 00:00:00 2001 From: junhao Date: Thu, 26 Nov 2015 11:23:36 +0800 Subject: [PATCH 2/7] add initial input rate limit for spark streaming backpressure --- .../scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 3da03bca6724..32ef5f6c41c4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -37,7 +37,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) - private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) + private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit()) def waitToPush() { rateLimiter.acquire() From 8dbfb2d18040ab8aeb129d80a64ae14bd0544c1c Mon Sep 17 00:00:00 2001 From: junhao Date: Thu, 26 Nov 2015 11:29:31 +0800 Subject: [PATCH 3/7] fix add initial input rate limit for spark streaming backpressure bugs --- .../scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 32ef5f6c41c4..8850b26ce5eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -37,7 +37,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) - private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit()) + private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble) def waitToPush() { rateLimiter.acquire() From 89fa959a67eb4289edbc57ec31b20242535ba0a4 Mon Sep 17 00:00:00 2001 From: junhao Date: Fri, 4 Dec 2015 18:48:32 +0800 Subject: [PATCH 4/7] add spark.streaming.backpressure.initialRate to the configuration docs --- docs/configuration.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 85e7d1202d2a..098eafe8d66d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1523,6 +1523,15 @@ Apart from these, the following properties are also available, and may be useful if they are set (see below). + + spark.streaming.backpressure.initialRate + 1000 + + Initial rate for backpressure mechanism (since 1.5). This provides maximum receiving rate of + receivers in the first batch when enables the backpressure mechanism, then the maximum receiving + rate will compute dynamically based on the current batch scheduling delays and processing times. + + spark.streaming.blockInterval 200ms From 476d80472710522514bf9ba70eb1cf42017a0071 Mon Sep 17 00:00:00 2001 From: junhao Date: Thu, 17 Dec 2015 13:13:28 +0800 Subject: [PATCH 5/7] optimize add initial input rate limit for spark streaming backpressure --- .../spark/streaming/receiver/RateLimiter.scala | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 8850b26ce5eb..6a1b672220bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming.receiver import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} -import org.apache.spark.streaming.scheduler.RateController import org.apache.spark.{Logging, SparkConf} @@ -65,18 +64,8 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { /** * Get the initial rateLimit to initial rateLimiter - * @return */ - def getInitialRateLimit() : Long = { - if (RateController.isBackPressureEnabled(conf)) { - val initialRate = conf.getLong("spark.streaming.backpressure.initialRate", 1000) - if (initialRate < maxRateLimit) { - initialRate - } else { - maxRateLimit - } - } else { - maxRateLimit - } + private def getInitialRateLimit(): Long = { + math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit) } } From f2a8ac78190308cee406db926e4a0a5c52a8ba9e Mon Sep 17 00:00:00 2001 From: junhao Date: Thu, 17 Dec 2015 15:17:09 +0800 Subject: [PATCH 6/7] update the configuration doc for spark.streaming.backpressure.initialRate --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 098eafe8d66d..1646b34925a8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1525,7 +1525,7 @@ Apart from these, the following properties are also available, and may be useful spark.streaming.backpressure.initialRate - 1000 + not set Initial rate for backpressure mechanism (since 1.5). This provides maximum receiving rate of receivers in the first batch when enables the backpressure mechanism, then the maximum receiving From 2d750c4c1cedaff9849137710b58242bcd15bef9 Mon Sep 17 00:00:00 2001 From: junhao Date: Mon, 21 Dec 2015 09:56:27 +0800 Subject: [PATCH 7/7] modify the configuration description of spark.streaming.backpressure.initialRate --- docs/configuration.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1646b34925a8..c8ee644e936c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1527,9 +1527,8 @@ Apart from these, the following properties are also available, and may be useful spark.streaming.backpressure.initialRate not set - Initial rate for backpressure mechanism (since 1.5). This provides maximum receiving rate of - receivers in the first batch when enables the backpressure mechanism, then the maximum receiving - rate will compute dynamically based on the current batch scheduling delays and processing times. + This is the initial maximum receiving rate at which each receiver will receive data for the + first batch when the backpressure mechanism is enabled.