From 4721c7d9394b7917b2be8fb7df5e4eb1c31d68df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Mon, 13 Jul 2015 13:26:57 +0200 Subject: [PATCH 01/17] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator --- .../streaming/receiver/RateLimiter.scala | 24 ++++++++++++++++--- .../streaming/receiver/ReceiverMessage.scala | 3 ++- .../receiver/ReceiverSupervisorImpl.scala | 2 ++ .../streaming/scheduler/ReceiverTracker.scala | 8 ++++++- 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 8df542b367d27..356ae340387df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.receiver +import java.util.concurrent.atomic.AtomicInteger + import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.{Logging, SparkConf} @@ -34,12 +36,28 @@ import org.apache.spark.{Logging, SparkConf} */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { - private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) - private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate) + // treated as an upper limit + private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0) + private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit) + private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get()) def waitToPush() { - if (desiredRate > 0) { + if (currentRateLimit.get() > 0) { rateLimiter.acquire() } } + + private[receiver] def updateRate(newRate: Int): Unit = + if (newRate > 0) { + try { + if (maxRateLimit > 0) { + currentRateLimit.set(newRate.min(maxRateLimit)) + } + else { + currentRateLimit.set(newRate) + } + } finally { + rateLimiter.setRate(currentRateLimit.get()) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala index 7bf3c33319491..1eb55affaa9d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time private[streaming] sealed trait ReceiverMessage extends Serializable private[streaming] object StopReceiver extends ReceiverMessage private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage - +private[streaming] case class UpdateRateLimit(elementsPerSecond: Long) + extends ReceiverMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 6078cdf8f8790..6e819460b1b23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,6 +77,8 @@ private[streaming] class ReceiverSupervisorImpl( case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) + case UpdateRateLimit(eps) => + 
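+        // A rate update pushed from the driver; forward it to the block generator.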
blockGenerator.updateRate(eps.toInt) } }) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 644e581cd8279..604d1a0dae289 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, - StopReceiver} + StopReceiver, UpdateRateLimit} import org.apache.spark.util.SerializableConfiguration /** @@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logError(s"Deregistered receiver for stream $streamId: $messageWithError") } + /** Update a receiver's maximum rate from an estimator's update */ + def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { + for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) + eP.send(UpdateRateLimit(newRate)) + } + /** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { receivedBlockTracker.addBlock(receivedBlockInfo) From d15de422b973a020d5aa9035016c1274262631fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Wed, 15 Jul 2015 13:14:31 +0200 Subject: [PATCH 02/17] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate --- .../streaming/receiver/RateLimiterSuite.scala | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala new file mode 100644 index 0000000000000..904c7773c5f2c --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.receiver + +import org.apache.spark.SparkConf +import org.apache.spark.SparkFunSuite + +/** Testsuite for testing the network receiver behavior */ +class RateLimiterSuite extends SparkFunSuite { + + test("rate limiter initializes even without a maxRate set") { + val conf = new SparkConf() + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.currentRateLimit.get == 105) + } + + test("rate limiter updates when below maxRate") { + val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110") + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.currentRateLimit.get == 105) + } + + test("rate limiter stays below maxRate despite large updates") { + val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100") + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.currentRateLimit.get == 100) + } + +} From cd1397d141eda98ded62491c0f2d90a2b47e56c5 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Fri, 17 Jul 2015 16:38:10 +0200 Subject: [PATCH 03/17] Add a test for the propagation of a new rate limit from driver to receivers. --- .../spark/streaming/receiver/Receiver.scala | 2 +- .../receiver/ReceiverSupervisor.scala | 3 ++ .../receiver/ReceiverSupervisorImpl.scala | 3 ++ .../spark/streaming/TestSuiteBase.scala | 15 ++++++++ .../scheduler/ReceiverTrackerSuite.scala | 34 +++++++++++++++++++ 5 files changed, 56 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 5b5a3fe648602..c3078cd4ad35f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. */ - private def executor = { + private[streaming] def executor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index eeb14ca3a49e9..944d893b9bbf7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -58,6 +58,9 @@ private[streaming] abstract class ReceiverSupervisor( /** Time between a receiver is stopped and started again */ private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000) + /** The current maximum rate limit for this receiver. 
*/ + private[streaming] def getCurrentRateLimit: Option[Int] = None + /** Exception associated with the stopping of the receiver */ @volatile protected var stoppingError: Throwable = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 6e819460b1b23..edb0fc3718fc7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -100,6 +100,9 @@ private[streaming] class ReceiverSupervisorImpl( } }, streamId, env.conf) + override private[streaming] def getCurrentRateLimit: Option[Int] = + Some(blockGenerator.currentRateLimit.get) + /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { blockGenerator.addData(data) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 0d58a7b54412f..d0ac371db9aad 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -537,4 +537,19 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { verifyOutput[W](output, expectedOutput, useSet) } } + + /** + * Wait until `cond` becomes true, or timeout ms have passed. This method checks the condition + * every 100ms, so it won't wait more than 100ms more than necessary. + * + * @param cond A boolean that should become `true` + * @param timemout How many millis to wait before giving up + */ + def waitUntil(cond: => Boolean, timeout: Int): Unit = { + val start = System.currentTimeMillis() + val end = start + timeout + while ((System.currentTimeMillis() < end) && !cond) { + Thread.sleep(100) + } + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index a6e783861dbe6..9da851b5e6c1e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -22,6 +22,9 @@ import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils +import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ class ReceiverTrackerSuite extends TestSuiteBase { @@ -72,15 +75,46 @@ class ReceiverTrackerSuite extends TestSuiteBase { assert(locations(0).length === 1) assert(locations(3).length === 1) } + + test("Receiver tracker - propagates rate limit") { + val newRateLimit = 100 + val ids = new TestReceiverInputDStream(ssc) + val tracker = new ReceiverTracker(ssc) + tracker.start() + waitUntil(TestDummyReceiver.started, 5000) + tracker.sendRateUpdate(ids.id, newRateLimit) + // this is an async message, we need to wait a bit for it to be processed + waitUntil(ids.getRateLimit.get == newRateLimit, 1000) + assert(ids.getRateLimit.get === newRateLimit) + } +} + +/** An input DStream with a hard-coded receiver that gives access to internals for testing. 
*/ +private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) + extends ReceiverInputDStream[Int](ssc_) { + + override def getReceiver(): DummyReceiver = TestDummyReceiver + + def getRateLimit: Option[Int] = + TestDummyReceiver.executor.getCurrentRateLimit } +/** + * We need the receiver to be an object, otherwise serialization will create another one + * and we won't be able to read its rate limit. + */ +private object TestDummyReceiver extends DummyReceiver + /** * Dummy receiver implementation */ private class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { + var started = false + def onStart() { + started = true } def onStop() { From 261a05128ec1e1e055c62a6afd44fef39fb711c1 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Mon, 20 Jul 2015 14:28:46 +0200 Subject: [PATCH 04/17] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually` --- .../streaming/receiver/RateLimiter.scala | 30 +++++++++---------- .../spark/streaming/receiver/Receiver.scala | 2 +- .../receiver/ReceiverSupervisor.scala | 2 +- .../receiver/ReceiverSupervisorImpl.scala | 7 +++-- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../spark/streaming/TestSuiteBase.scala | 15 ---------- .../streaming/receiver/RateLimiterSuite.scala | 7 ++--- .../scheduler/ReceiverTrackerSuite.scala | 17 +++++++---- 8 files changed, 36 insertions(+), 46 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 356ae340387df..0f15ddc7288ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -37,27 +37,25 @@ import org.apache.spark.{Logging, SparkConf} private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit - private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0) - private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit) - private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get()) + private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) + private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) def waitToPush() { - if (currentRateLimit.get() > 0) { - rateLimiter.acquire() - } + rateLimiter.acquire() } - private[receiver] def updateRate(newRate: Int): Unit = + /** + * Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}. 
+ */ + def getCurrentLimit: Long = + rateLimiter.getRate.toLong + + private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { - try { - if (maxRateLimit > 0) { - currentRateLimit.set(newRate.min(maxRateLimit)) - } - else { - currentRateLimit.set(newRate) - } - } finally { - rateLimiter.setRate(currentRateLimit.get()) + if (maxRateLimit > 0) { + rateLimiter.setRate(newRate.min(maxRateLimit)) + } else { + rateLimiter.setRate(newRate) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index c3078cd4ad35f..c8ccfce3902c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. */ - private[streaming] def executor = { + private[streaming] def executor: ReceiverSupervisor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 944d893b9bbf7..18a5bd7519fef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -59,7 +59,7 @@ private[streaming] abstract class ReceiverSupervisor( private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000) /** The current maximum rate limit for this receiver. */ - private[streaming] def getCurrentRateLimit: Option[Int] = None + private[streaming] def getCurrentRateLimit: Option[Long] = None /** Exception associated with the stopping of the receiver */ @volatile protected var stoppingError: Throwable = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index edb0fc3718fc7..0ada69b6e1aa1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -78,7 +78,8 @@ private[streaming] class ReceiverSupervisorImpl( logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => - blockGenerator.updateRate(eps.toInt) + logInfo(s"Received a new rate limit: $eps.") + blockGenerator.updateRate(eps) } }) @@ -100,8 +101,8 @@ private[streaming] class ReceiverSupervisorImpl( } }, streamId, env.conf) - override private[streaming] def getCurrentRateLimit: Option[Int] = - Some(blockGenerator.currentRateLimit.get) + override private[streaming] def getCurrentRateLimit: Option[Long] = + Some(blockGenerator.getCurrentLimit) /** Push a single record of received data into block generator. 
*/ def pushSingle(data: Any) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 604d1a0dae289..af37b4876ffc2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -180,7 +180,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logError(s"Deregistered receiver for stream $streamId: $messageWithError") } - /** Update a receiver's maximum rate from an estimator's update */ + /** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) eP.send(UpdateRateLimit(newRate)) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index d0ac371db9aad..0d58a7b54412f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -537,19 +537,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { verifyOutput[W](output, expectedOutput, useSet) } } - - /** - * Wait until `cond` becomes true, or timeout ms have passed. This method checks the condition - * every 100ms, so it won't wait more than 100ms more than necessary. - * - * @param cond A boolean that should become `true` - * @param timemout How many millis to wait before giving up - */ - def waitUntil(cond: => Boolean, timeout: Int): Unit = { - val start = System.currentTimeMillis() - val end = start + timeout - while ((System.currentTimeMillis() < end) && !cond) { - Thread.sleep(100) - } - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index 904c7773c5f2c..c6330eb3673fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -27,21 +27,20 @@ class RateLimiterSuite extends SparkFunSuite { val conf = new SparkConf() val rateLimiter = new RateLimiter(conf){} rateLimiter.updateRate(105) - assert(rateLimiter.currentRateLimit.get == 105) + assert(rateLimiter.getCurrentLimit == 105) } test("rate limiter updates when below maxRate") { val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110") val rateLimiter = new RateLimiter(conf){} rateLimiter.updateRate(105) - assert(rateLimiter.currentRateLimit.get == 105) + assert(rateLimiter.getCurrentLimit == 105) } test("rate limiter stays below maxRate despite large updates") { val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100") val rateLimiter = new RateLimiter(conf){} rateLimiter.updateRate(105) - assert(rateLimiter.currentRateLimit.get == 100) + assert(rateLimiter.getCurrentLimit === 100) } - } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 9da851b5e6c1e..41d92fb5db32f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.streaming.scheduler +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ import org.apache.spark.streaming._ import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel @@ -77,15 +80,18 @@ class ReceiverTrackerSuite extends TestSuiteBase { } test("Receiver tracker - propagates rate limit") { - val newRateLimit = 100 + val newRateLimit = 100L val ids = new TestReceiverInputDStream(ssc) val tracker = new ReceiverTracker(ssc) tracker.start() - waitUntil(TestDummyReceiver.started, 5000) + eventually(timeout(5 seconds)) { + assert(TestDummyReceiver.started) + } tracker.sendRateUpdate(ids.id, newRateLimit) // this is an async message, we need to wait a bit for it to be processed - waitUntil(ids.getRateLimit.get == newRateLimit, 1000) - assert(ids.getRateLimit.get === newRateLimit) + eventually(timeout(3 seconds)) { + assert(ids.getCurrentRateLimit.get === newRateLimit) + } } } @@ -95,8 +101,9 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) override def getReceiver(): DummyReceiver = TestDummyReceiver - def getRateLimit: Option[Int] = + def getCurrentRateLimit: Option[Long] = { TestDummyReceiver.executor.getCurrentRateLimit + } } /** From 0c51959c9315f63bc80a7ff5b716f48f907b1152 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Mon, 20 Jul 2015 16:40:01 +0200 Subject: [PATCH 05/17] =?UTF-8?q?Added=20a=20few=20tests=20that=20measure?= =?UTF-8?q?=20the=20receiver=E2=80=99s=20rate.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As I mentioned before, I don’t think this is a great idea: - such tests are flaky (original test in ReceiverSuite was ignored for that reason) - Guava’s code has its own test suite, so we can assume it implements `setRate` correctly I noticed one flaky failure in about 10 runs on my machine (receiver got 1 message less than the lower bound, which is within 5% of the nominal rate). --- .../spark/streaming/ReceiverSuite.scala | 96 ++++------------ .../streaming/receiver/RateLimiterSuite.scala | 108 ++++++++++++++++++ .../scheduler/ReceiverTrackerSuite.scala | 1 - 3 files changed, 130 insertions(+), 75 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 5d7127627eea5..c096a251374b6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -155,63 +155,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { assert(recordedData.toSet === generatedData.toSet) } - ignore("block generator throttling") { - val blockGeneratorListener = new FakeBlockGeneratorListener - val blockIntervalMs = 100 - val maxRate = 1001 - val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). 
- set("spark.streaming.receiver.maxRate", maxRate.toString) - val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) - val expectedBlocks = 20 - val waitTime = expectedBlocks * blockIntervalMs - val expectedMessages = maxRate * waitTime / 1000 - val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 - val generatedData = new ArrayBuffer[Int] - - // Generate blocks - val startTime = System.currentTimeMillis() - blockGenerator.start() - var count = 0 - while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator.addData(count) - generatedData += count - count += 1 - } - blockGenerator.stop() - - val recordedBlocks = blockGeneratorListener.arrayBuffers - val recordedData = recordedBlocks.flatten - assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") - assert(recordedData.toSet === generatedData.toSet, "Received data not same") - - // recordedData size should be close to the expected rate; use an error margin proportional to - // the value, so that rate changes don't cause a brittle test - val minExpectedMessages = expectedMessages - 0.05 * expectedMessages - val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages - val numMessages = recordedData.size - assert( - numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, - s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" - ) - - // XXX Checking every block would require an even distribution of messages across blocks, - // which throttling code does not control. Therefore, test against the average. - val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock - val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock - val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") - - // the first and last block may be incomplete, so we slice them out - val validBlocks = recordedBlocks.drop(1).dropRight(1) - val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size - - assert( - averageBlockSize >= minExpectedMessagesPerBlock && - averageBlockSize <= maxExpectedMessagesPerBlock, - s"# records in received blocks = [$receivedBlockSizes], not between " + - s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" - ) - } - /** * Test whether write ahead logs are generated by received, * and automatically cleaned up. The clean up must be aware of the @@ -347,28 +290,33 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { errors += throwable } } +} - /** - * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. - */ - class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { - // buffer of data received as ArrayBuffers - val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] - val errors = new ArrayBuffer[Throwable] +/** + * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. 
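+ * Pushed blocks land in `arrayBuffers` and reported failures in `errors`, so tests
+ * can assert on the achieved throughput.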
+ */ +class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { + // buffer of data received as ArrayBuffers + val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] + val errors = new ArrayBuffer[Throwable] - def onAddData(data: Any, metadata: Any) { } + def onAddData(data: Any, metadata: Any) {} - def onGenerateBlock(blockId: StreamBlockId) { } + def onGenerateBlock(blockId: StreamBlockId) {} - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { - val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) - arrayBuffers += bufferOfInts - Thread.sleep(0) - } + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) + arrayBuffers += bufferOfInts + Thread.sleep(0) + } - def onError(message: String, throwable: Throwable) { - errors += throwable - } + def onError(message: String, throwable: Throwable) { + errors += throwable + } + + def reset(): Unit = { + arrayBuffers.clear() + errors.clear() } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index c6330eb3673fb..e58baed5f205a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -17,8 +17,12 @@ package org.apache.spark.streaming.receiver +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.FakeBlockGeneratorListener /** Testsuite for testing the network receiver behavior */ class RateLimiterSuite extends SparkFunSuite { @@ -43,4 +47,108 @@ class RateLimiterSuite extends SparkFunSuite { rateLimiter.updateRate(105) assert(rateLimiter.getCurrentLimit === 100) } + + def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = { + val blockGeneratorListener = new FakeBlockGeneratorListener + val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms") + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + (blockGenerator, blockGeneratorListener) + } + + test("throttling block generator") { + val blockIntervalMs = 100 + val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) + val maxRate = 1000 + blockGenerator.updateRate(maxRate) + blockGenerator.start() + throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs) + blockGenerator.stop() + } + + test("throttling block generator changes rate up") { + val blockIntervalMs = 100 + val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) + val maxRate1 = 1000 + blockGenerator.start() + blockGenerator.updateRate(maxRate1) + throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) + + blockGeneratorListener.reset() + val maxRate2 = 5000 + blockGenerator.updateRate(maxRate2) + throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) + blockGenerator.stop() + } + + test("throttling block generator changes rate up and down") { + val blockIntervalMs = 100 + val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) + val maxRate1 = 1000 + blockGenerator.updateRate(maxRate1) + blockGenerator.start() + throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, 
blockIntervalMs) + + blockGeneratorListener.reset() + val maxRate2 = 5000 + blockGenerator.updateRate(maxRate2) + throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) + + blockGeneratorListener.reset() + val maxRate3 = 1000 + blockGenerator.updateRate(maxRate3) + throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs) + blockGenerator.stop() + } + + def throttlingTest( + maxRate: Long, + blockGenerator: BlockGenerator, + blockGeneratorListener: FakeBlockGeneratorListener, + blockIntervalMs: Int) { + val expectedBlocks = 20 + val waitTime = expectedBlocks * blockIntervalMs + val expectedMessages = maxRate * waitTime / 1000 + val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator.addData(count) + generatedData += count + count += 1 + } + + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + + // recordedData size should be close to the expected rate; use an error margin proportional to + // the value, so that rate changes don't cause a brittle test + val minExpectedMessages = expectedMessages - 0.05 * expectedMessages + val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + // XXX Checking every block would require an even distribution of messages across blocks, + // which throttling code does not control. Therefore, test against the average. 
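+    // Worked example: with maxRate = 1000 and blockIntervalMs = 100, waitTime is
+    // 20 * 100 = 2000 ms, so expectedMessages = 1000 * 2000 / 1000 = 2000 and
+    // expectedMessagesPerBlock = 1000 * 100 / 1000 = 100; the 5% margins below then
+    // accept 1900..2100 records in total and a per-block average of 95..105.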
+ val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + + // the first and last block may be incomplete, so we slice them out + val validBlocks = recordedBlocks.drop(1).dropRight(1) + val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size + + assert( + averageBlockSize >= minExpectedMessagesPerBlock && + averageBlockSize <= maxExpectedMessagesPerBlock, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" + ) + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 41d92fb5db32f..46d7bc479b5ff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils import org.apache.spark.streaming.dstream.InputDStream -import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ From 210f495fff34f25caaba41a8db720c1e3a63fa95 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Tue, 21 Jul 2015 14:29:04 +0200 Subject: [PATCH 06/17] =?UTF-8?q?Revert=20"Added=20a=20few=20tests=20that?= =?UTF-8?q?=20measure=20the=20receiver=E2=80=99s=20rate."?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 0c51959c9315f63bc80a7ff5b716f48f907b1152. --- .../spark/streaming/ReceiverSuite.scala | 96 ++++++++++++---- .../streaming/receiver/RateLimiterSuite.scala | 108 ------------------ .../scheduler/ReceiverTrackerSuite.scala | 1 + 3 files changed, 75 insertions(+), 130 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index c096a251374b6..5d7127627eea5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -155,6 +155,63 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { assert(recordedData.toSet === generatedData.toSet) } + ignore("block generator throttling") { + val blockGeneratorListener = new FakeBlockGeneratorListener + val blockIntervalMs = 100 + val maxRate = 1001 + val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). 
+ set("spark.streaming.receiver.maxRate", maxRate.toString) + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + val expectedBlocks = 20 + val waitTime = expectedBlocks * blockIntervalMs + val expectedMessages = maxRate * waitTime / 1000 + val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + blockGenerator.start() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator.addData(count) + generatedData += count + count += 1 + } + blockGenerator.stop() + + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + assert(recordedData.toSet === generatedData.toSet, "Received data not same") + + // recordedData size should be close to the expected rate; use an error margin proportional to + // the value, so that rate changes don't cause a brittle test + val minExpectedMessages = expectedMessages - 0.05 * expectedMessages + val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + // XXX Checking every block would require an even distribution of messages across blocks, + // which throttling code does not control. Therefore, test against the average. + val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + + // the first and last block may be incomplete, so we slice them out + val validBlocks = recordedBlocks.drop(1).dropRight(1) + val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size + + assert( + averageBlockSize >= minExpectedMessagesPerBlock && + averageBlockSize <= maxExpectedMessagesPerBlock, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" + ) + } + /** * Test whether write ahead logs are generated by received, * and automatically cleaned up. The clean up must be aware of the @@ -290,33 +347,28 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { errors += throwable } } -} -/** - * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. - */ -class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { - // buffer of data received as ArrayBuffers - val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] - val errors = new ArrayBuffer[Throwable] - - def onAddData(data: Any, metadata: Any) {} + /** + * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. 
+ */ + class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { + // buffer of data received as ArrayBuffers + val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] + val errors = new ArrayBuffer[Throwable] - def onGenerateBlock(blockId: StreamBlockId) {} + def onAddData(data: Any, metadata: Any) { } - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { - val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) - arrayBuffers += bufferOfInts - Thread.sleep(0) - } + def onGenerateBlock(blockId: StreamBlockId) { } - def onError(message: String, throwable: Throwable) { - errors += throwable - } + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) + arrayBuffers += bufferOfInts + Thread.sleep(0) + } - def reset(): Unit = { - arrayBuffers.clear() - errors.clear() + def onError(message: String, throwable: Throwable) { + errors += throwable + } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index e58baed5f205a..c6330eb3673fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -17,12 +17,8 @@ package org.apache.spark.streaming.receiver -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.FakeBlockGeneratorListener /** Testsuite for testing the network receiver behavior */ class RateLimiterSuite extends SparkFunSuite { @@ -47,108 +43,4 @@ class RateLimiterSuite extends SparkFunSuite { rateLimiter.updateRate(105) assert(rateLimiter.getCurrentLimit === 100) } - - def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = { - val blockGeneratorListener = new FakeBlockGeneratorListener - val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms") - val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) - (blockGenerator, blockGeneratorListener) - } - - test("throttling block generator") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate = 1000 - blockGenerator.updateRate(maxRate) - blockGenerator.start() - throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - test("throttling block generator changes rate up") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate1 = 1000 - blockGenerator.start() - blockGenerator.updateRate(maxRate1) - throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate2 = 5000 - blockGenerator.updateRate(maxRate2) - throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - test("throttling block generator changes rate up and down") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate1 = 1000 - blockGenerator.updateRate(maxRate1) - blockGenerator.start() - throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) - - 
blockGeneratorListener.reset() - val maxRate2 = 5000 - blockGenerator.updateRate(maxRate2) - throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate3 = 1000 - blockGenerator.updateRate(maxRate3) - throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - def throttlingTest( - maxRate: Long, - blockGenerator: BlockGenerator, - blockGeneratorListener: FakeBlockGeneratorListener, - blockIntervalMs: Int) { - val expectedBlocks = 20 - val waitTime = expectedBlocks * blockIntervalMs - val expectedMessages = maxRate * waitTime / 1000 - val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 - val generatedData = new ArrayBuffer[Int] - - // Generate blocks - val startTime = System.currentTimeMillis() - var count = 0 - while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator.addData(count) - generatedData += count - count += 1 - } - - val recordedBlocks = blockGeneratorListener.arrayBuffers - val recordedData = recordedBlocks.flatten - assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") - - // recordedData size should be close to the expected rate; use an error margin proportional to - // the value, so that rate changes don't cause a brittle test - val minExpectedMessages = expectedMessages - 0.05 * expectedMessages - val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages - val numMessages = recordedData.size - assert( - numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, - s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" - ) - - // XXX Checking every block would require an even distribution of messages across blocks, - // which throttling code does not control. Therefore, test against the average. 
- val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock - val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock - val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") - - // the first and last block may be incomplete, so we slice them out - val validBlocks = recordedBlocks.drop(1).dropRight(1) - val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size - - assert( - averageBlockSize >= minExpectedMessagesPerBlock && - averageBlockSize <= maxExpectedMessagesPerBlock, - s"# records in received blocks = [$receivedBlockSizes], not between " + - s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" - ) - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 46d7bc479b5ff..41d92fb5db32f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ From 162d9e598040b5b2bad36fa0f7139a99df95e79d Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Tue, 21 Jul 2015 15:06:13 +0200 Subject: [PATCH 07/17] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior). --- .../streaming/receiver/RateLimiter.scala | 2 -- .../spark/streaming/receiver/Receiver.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 3 +- .../scheduler/ReceiverTrackerSuite.scala | 29 +++++++++++++++---- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 0f15ddc7288ab..23a676e97c2c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -17,8 +17,6 @@ package org.apache.spark.streaming.receiver -import java.util.concurrent.atomic.AtomicInteger - import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.{Logging, SparkConf} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index c8ccfce3902c0..7504fa44d9fae 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. 
*/ - private[streaming] def executor: ReceiverSupervisor = { + private def executor: ReceiverSupervisor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index af37b4876ffc2..b0469ebccecc2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -182,8 +182,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { - for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) + for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) { eP.send(UpdateRateLimit(newRate)) + } } /** Add new blocks for the given stream */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 41d92fb5db32f..2f0a13e060be0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -80,12 +80,27 @@ class ReceiverTrackerSuite extends TestSuiteBase { } test("Receiver tracker - propagates rate limit") { + object streamingListener extends StreamingListener { + @volatile + var started = false + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + started = true + } + } + + ssc.addStreamingListener(streamingListener) + ssc.scheduler.listenerBus.start(ssc.sc) + val newRateLimit = 100L val ids = new TestReceiverInputDStream(ssc) val tracker = new ReceiverTracker(ssc) tracker.start() + + // we wait until the Receiver has registered with the tracker, + // otherwise our rate update is lost eventually(timeout(5 seconds)) { - assert(TestDummyReceiver.started) + assert(streamingListener.started) } tracker.sendRateUpdate(ids.id, newRateLimit) // this is an async message, we need to wait a bit for it to be processed @@ -102,7 +117,14 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) override def getReceiver(): DummyReceiver = TestDummyReceiver def getCurrentRateLimit: Option[Long] = { - TestDummyReceiver.executor.getCurrentRateLimit + invokeExecutorMethod.getCurrentRateLimit + } + + private def invokeExecutorMethod: ReceiverSupervisor = { + val c = classOf[Receiver[_]] + val ex = c.getDeclaredMethod("executor") + ex.setAccessible(true) + ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor] } } @@ -118,10 +140,7 @@ private object TestDummyReceiver extends DummyReceiver private class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { - var started = false - def onStart() { - started = true } def onStop() { From 8941cf91b03aa7835a78bc756bee1f32cb7bb1d8 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Wed, 22 Jul 2015 16:45:57 +0200 Subject: [PATCH 08/17] Renames and other nitpicks. 
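For reference, the limiter contract that patches 01-08 converge on can be sketched
standalone (illustrative only, not the Spark class itself; `maxRateLimit` plays the
role of `spark.streaming.receiver.maxRate`, defaulting to Long.MaxValue):

    import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

    // Minimal sketch of the clamped rate limiter; the real code is RateLimiter.scala above.
    class ClampedRateLimiter(maxRateLimit: Long) {
      private val limiter = GuavaRateLimiter.create(maxRateLimit.toDouble)

      // Blocks until one more event may be pushed downstream.
      def waitToPush(): Unit = limiter.acquire()

      // The current effective bound, in events per second.
      def getCurrentLimit: Long = limiter.getRate.toLong

      // Non-positive rates are ignored; positive ones never exceed the ceiling.
      def updateRate(newRate: Long): Unit =
        if (newRate > 0) {
          if (maxRateLimit > 0) limiter.setRate(newRate.min(maxRateLimit))
          else limiter.setRate(newRate)
        }
    }

A receiver calls waitToPush() before storing each record, while the driver pushes new
bounds asynchronously through the UpdateRateLimit message handled by the supervisor.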
--- .../streaming/receiver/RateLimiter.scala | 6 +++++ .../scheduler/ReceiverTrackerSuite.scala | 26 ++++++++++--------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 23a676e97c2c7..f663def4c0511 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -48,6 +48,12 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { def getCurrentLimit: Long = rateLimiter.getRate.toLong + /** + * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by + * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. + * + * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. + */ private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { if (maxRateLimit > 0) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 2f0a13e060be0..aadb7231757b8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -80,7 +80,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { } test("Receiver tracker - propagates rate limit") { - object streamingListener extends StreamingListener { + object ReceiverStartedWaiter extends StreamingListener { @volatile var started = false @@ -89,32 +89,32 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } - ssc.addStreamingListener(streamingListener) + ssc.addStreamingListener(ReceiverStartedWaiter) ssc.scheduler.listenerBus.start(ssc.sc) val newRateLimit = 100L - val ids = new TestReceiverInputDStream(ssc) + val inputDStream = new RateLimitInputDStream(ssc) val tracker = new ReceiverTracker(ssc) tracker.start() // we wait until the Receiver has registered with the tracker, // otherwise our rate update is lost eventually(timeout(5 seconds)) { - assert(streamingListener.started) + assert(ReceiverStartedWaiter.started) } - tracker.sendRateUpdate(ids.id, newRateLimit) + tracker.sendRateUpdate(inputDStream.id, newRateLimit) // this is an async message, we need to wait a bit for it to be processed eventually(timeout(3 seconds)) { - assert(ids.getCurrentRateLimit.get === newRateLimit) + assert(inputDStream.getCurrentRateLimit.get === newRateLimit) } } } /** An input DStream with a hard-coded receiver that gives access to internals for testing. 
*/ -private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) +private class RateLimitInputDStream(@transient ssc_ : StreamingContext) extends ReceiverInputDStream[Int](ssc_) { - override def getReceiver(): DummyReceiver = TestDummyReceiver + override def getReceiver(): DummyReceiver = SingletonDummyReceiver def getCurrentRateLimit: Option[Long] = { invokeExecutorMethod.getCurrentRateLimit @@ -124,15 +124,17 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) val c = classOf[Receiver[_]] val ex = c.getDeclaredMethod("executor") ex.setAccessible(true) - ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor] + ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor] } } /** - * We need the receiver to be an object, otherwise serialization will create another one - * and we won't be able to read its rate limit. + * A Receiver as an object so we can read its rate limit. + * + * @note It's necessary to be a top-level object, or else serialization would create another + * one on the executor side and we won't be able to read its rate limit. */ -private object TestDummyReceiver extends DummyReceiver +private object SingletonDummyReceiver extends DummyReceiver /** * Dummy receiver implementation From d32ca3697ab18ba8db9905c81d57559bf8472195 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Tue, 14 Jul 2015 13:11:01 +0200 Subject: [PATCH 09/17] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController --- .../streaming/dstream/InputDStream.scala | 17 +++++ .../dstream/ReceiverInputDStream.scala | 11 ++- .../streaming/scheduler/JobScheduler.scala | 2 + .../streaming/scheduler/RateController.scala | 69 +++++++++++++++++++ .../scheduler/rate/RateEstimator.scala | 46 +++++++++++++ 5 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index d58c99a8ff321..3ff360d1dbefa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -22,6 +22,8 @@ import scala.reflect.ClassTag import org.apache.spark.SparkContext import org.apache.spark.rdd.RDDOperationScope import org.apache.spark.streaming.{Time, Duration, StreamingContext} +import org.apache.spark.streaming.scheduler.RateController +import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator import org.apache.spark.util.Utils /** @@ -47,6 +49,21 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) /** This is an unique identifier for the input stream. */ val id = ssc.getNewInputStreamId() + /** + * A rate estimator configured by the user to compute a dynamic ingestion bound for this stream. 
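+   * The estimator is chosen through the `spark.streaming.RateEstimator` configuration
+   * key; as of this patch every value falls back to a `NoopRateEstimator`.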
+ * @see `RateEstimator` + */ + protected [streaming] val rateEstimator = ssc.conf + .getOption("spark.streaming.RateEstimator") + .getOrElse("noop") match { + case _ => new NoopRateEstimator() + } + + // Keep track of the freshest rate for this stream using the rateEstimator + protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) { + override def publish(rate: Long): Unit = () + } + /** A human-readable name of this InputDStream */ private[streaming] def name: String = { // e.g. FlumePollingDStream -> "Flume polling stream" diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index a50f0efc030ce..39e3694763826 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -24,7 +24,8 @@ import org.apache.spark.storage.BlockId import org.apache.spark.streaming._ import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.streaming.scheduler.StreamInputInfo +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator import org.apache.spark.streaming.util.WriteAheadLogUtils /** @@ -40,6 +41,14 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { + /** + * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. + */ + override val rateController: RateController = new RateController(id, rateEstimator) { + override def publish(rate: Long): Unit = + ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) + } + /** * Gets the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 4af9b6d3b56ab..d3f257429c952 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -66,6 +66,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } eventLoop.start() + // Estimators receive updates from batch completion + ssc.graph.getInputStreams.map(_.rateController).foreach(ssc.addStreamingListener(_)) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala new file mode 100644 index 0000000000000..82244498cc05c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.util.ThreadUtils + +import scala.concurrent.{ExecutionContext, Future} + +/** + * :: DeveloperApi :: + * A StreamingListener that receives batch completion updates, and maintains + * an estimate of the speed at which this stream should ingest messages, + * given an estimate computation from a `RateEstimator` + */ +@DeveloperApi +private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) + extends StreamingListener with Serializable { + + protected def publish(rate: Long): Unit + + // Used to compute & publish the rate update asynchronously + @transient private val executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update")) + + private val rateLimit : AtomicLong = new AtomicLong(-1L) + + // Asynchronous computation of the rate update + private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = + Future[Unit] { + val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay) + newSpeed foreach { s => + rateLimit.set(s.toLong) + publish(getLatestRate()) + } + } (executionContext) + + def getLatestRate(): Long = rateLimit.get() + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){ + val elements = batchCompleted.batchInfo.streamIdToInputInfo + + for ( + processingEnd <- batchCompleted.batchInfo.processingEndTime; + workDelay <- batchCompleted.batchInfo.processingDelay; + waitDelay <- batchCompleted.batchInfo.schedulingDelay; + elems <- elements.get(streamUID).map(_.numRecords) + ) computeAndPublish(processingEnd, elems, workDelay, waitDelay) + } + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala new file mode 100644 index 0000000000000..1e1ccf135ad70 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler.rate + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * A component that estimates the rate at which an InputDStream should ingest + * elements, based on updates at every batch completion. + */ +@DeveloperApi +private[streaming] trait RateEstimator extends Serializable { + + /** + * Computes the number of elements the stream attached to this `RateEstimator` + * should ingest per second, given an update on the size and completion + * times of the latest batch. + */ + def compute(time: Long, elements: Long, + processingDelay: Long, schedulingDelay: Long): Option[Double] +} + +/** + * The trivial rate estimator never sends an update + */ +private[streaming] class NoopRateEstimator extends RateEstimator { + + def compute(time: Long, elements: Long, - processingDelay: Long, schedulingDelay: Long): Option[Double] = None +} From 34a389dda362467879bb8c87dd047e41a1a931ca Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Wed, 22 Jul 2015 23:13:24 +0200 Subject: [PATCH 10/17] Various style changes and a first test for the rate controller. --- .../streaming/dstream/InputDStream.scala | 16 ++-- .../dstream/ReceiverInputDStream.scala | 6 +- .../streaming/scheduler/JobScheduler.scala | 2 +- .../streaming/scheduler/RateController.scala | 24 +++--- .../scheduler/rate/RateEstimator.scala | 21 +++-- .../scheduler/RateControllerSuite.scala | 78 +++++++++++++++++++ 6 files changed, 122 insertions(+), 25 deletions(-) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 3ff360d1dbefa..2b2389624610d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -53,11 +53,17 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) * A rate estimator configured by the user to compute a dynamic ingestion bound for this stream. * @see `RateEstimator` */ - protected [streaming] val rateEstimator = ssc.conf - .getOption("spark.streaming.RateEstimator") - .getOrElse("noop") match { - case _ => new NoopRateEstimator() - } + protected [streaming] val rateEstimator = newEstimator() + + /** + * Return the configured estimator, or `noop` if none was specified.
+ */ + private def newEstimator() = + ssc.conf.get("spark.streaming.RateEstimator", "noop") match { + case "noop" => new NoopRateEstimator() + case estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator") + } + // Keep track of the freshest rate for this stream using the rateEstimator protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 39e3694763826..606aaa51367f1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -45,9 +45,9 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. */ override val rateController: RateController = new RateController(id, rateEstimator) { - override def publish(rate: Long): Unit = - ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) - } + override def publish(rate: Long): Unit = + ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) + } /** * Gets the receiver object that will be sent to the worker nodes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index d3f257429c952..006abccdeae0e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -67,7 +67,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { eventLoop.start() // Estimators receive updates from batch completion - ssc.graph.getInputStreams.map(_.rateController).foreach(ssc.addStreamingListener(_)) + ssc.graph.getInputStreams.foreach(is => ssc.addStreamingListener(is.rateController)) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala index 82244498cc05c..8fedf571d4c59 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -19,12 +19,12 @@ package org.apache.spark.streaming.scheduler import java.util.concurrent.atomic.AtomicLong +import scala.concurrent.{ExecutionContext, Future} + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.util.ThreadUtils -import scala.concurrent.{ExecutionContext, Future} - /** * :: DeveloperApi :: * A StreamingListener that receives batch completion updates, and maintains @@ -38,12 +38,15 @@ private [streaming] abstract class RateController(val streamUID: Int, rateEstima protected def publish(rate: Long): Unit // Used to compute & publish the rate update asynchronously - @transient private val executionContext = ExecutionContext.fromExecutorService( + @transient + implicit private val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update")) - 
private val rateLimit : AtomicLong = new AtomicLong(-1L) + private val rateLimit: AtomicLong = new AtomicLong(-1L) - // Asynchronous computation of the rate update + /** + * Compute the new rate limit and publish it asynchronously. + */ private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay) @@ -51,19 +54,18 @@ private [streaming] abstract class RateController(val streamUID: Int, rateEstima rateLimit.set(s.toLong) publish(getLatestRate()) } - } (executionContext) + } def getLatestRate(): Long = rateLimit.get() - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){ - val elements = batchCompleted.batchInfo.streamIdToInputInfo + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { + val elements = batchCompleted.batchInfo.streamIdToInputInfo - for ( + for { processingEnd <- batchCompleted.batchInfo.processingEndTime; workDelay <- batchCompleted.batchInfo.processingDelay; waitDelay <- batchCompleted.batchInfo.schedulingDelay; elems <- elements.get(streamUID).map(_.numRecords) - ) computeAndPublish(processingEnd, elems, workDelay, waitDelay) + } computeAndPublish(processingEnd, elems, workDelay, waitDelay) } - } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index 1e1ccf135ad70..d44bedf5f7d91 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -31,16 +31,27 @@ private[streaming] trait RateEstimator extends Serializable { * Computes the number of elements the stream attached to this `RateEstimator` * should ingest per second, given an update on the size and completion * times of the latest batch. + * + * @param time The timestamp of the current batch interval that just finished + * @param elements The number of elements that were processed in this batch + * @param processingDelay The time in ms it took for the job to complete + * @param schedulingDelay The time in ms that the job spent in the scheduling queue */ - def compute(time: Long, elements: Long, - processingDelay: Long, schedulingDelay: Long): Option[Double] + def compute( + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] } /** - * The trivial rate estimator never sends an update + * The trivial rate estimator never sends an update */ private[streaming] class NoopRateEstimator extends RateEstimator { - def compute(time: Long, elements: Long, - processingDelay: Long, schedulingDelay: Long): Option[Double] = None + def compute( + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = None } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala new file mode 100644 index 0000000000000..934c2a6102bef --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{StreamingContext, TestOutputStreamWithPartitions, TestSuiteBase, Time} +import org.apache.spark.streaming.dstream.InputDStream +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +class RateControllerSuite extends TestSuiteBase { + + test("rate controller publishes updates") { + val ssc = new StreamingContext(conf, batchDuration) + val dstream = new MockRateLimitDStream(ssc) + val output = new TestOutputStreamWithPartitions(dstream) + output.register() + runStreams(ssc, 1, 1) + + eventually(timeout(2.seconds)) { + assert(dstream.publishCalls === 1) + } + } +} + +/** + * An InputDStream that counts how often its rate controller `publish` method was called. + */ +private class MockRateLimitDStream(@transient ssc: StreamingContext) + extends InputDStream[Int](ssc) { + + @volatile + var publishCalls = 0 + + private object ConstantEstimator extends RateEstimator { + def compute( + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = { + Some(100.0) + } + } + + override val rateController: RateController = new RateController(id, ConstantEstimator) { + override def publish(rate: Long): Unit = { + publishCalls += 1 + } + } + + def compute(validTime: Time): Option[RDD[Int]] = { + val data = Seq(1) + ssc.scheduler.inputInfoTracker.reportInfo(validTime, StreamInputInfo(id, data.size)) + Some(ssc.sc.parallelize(data)) + } + + def stop(): Unit = {} + + def start(): Unit = {} +} From b425d32808c5325b8e1bc73159a08b47b8c5a03a Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Thu, 23 Jul 2015 16:30:49 +0200 Subject: [PATCH 11/17] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator. 
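Before the diff itself, a condensed and self-contained model of the controller/estimator contract may help; the names echo the series, but the synchronous onBatchCompleted stand-in and the sketch classes are simplifications for illustration, not code from any commit:

import java.util.concurrent.atomic.AtomicLong

// Condensed model of the RateController/RateEstimator contract (sketch only):
// an estimator proposes a rate from batch statistics, the controller records
// it and pushes it out through publish().
trait EstimatorSketch extends Serializable {
  def compute(time: Long, elements: Long,
      processingDelay: Long, schedulingDelay: Long): Option[Double]
}

abstract class ControllerSketch(streamUID: Int, estimator: EstimatorSketch) {
  protected def publish(rate: Long): Unit

  private val rateLimit = new AtomicLong(-1L)

  def getLatestRate(): Long = rateLimit.get()

  // Synchronous stand-in for the Future-based computeAndPublish in the patch.
  def onBatchCompleted(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    estimator.compute(time, elems, workDelay, waitDelay).foreach { rate =>
      rateLimit.set(rate.toLong)
      publish(getLatestRate())
    }
}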
--- .../streaming/dstream/InputDStream.scala | 29 +++++-------------- .../dstream/ReceiverInputDStream.scala | 15 ++++++---- .../streaming/scheduler/JobScheduler.scala | 6 +++- .../streaming/scheduler/RateController.scala | 3 -- .../scheduler/rate/RateEstimator.scala | 26 +++++++++-------- .../scheduler/RateControllerSuite.scala | 11 +++---- 6 files changed, 42 insertions(+), 48 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 2b2389624610d..8aad39dacca3f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -21,9 +21,9 @@ import scala.reflect.ClassTag import org.apache.spark.SparkContext import org.apache.spark.rdd.RDDOperationScope -import org.apache.spark.streaming.{Time, Duration, StreamingContext} +import org.apache.spark.streaming.{Duration, StreamingContext, Time} import org.apache.spark.streaming.scheduler.RateController -import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator +import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.util.Utils /** @@ -49,26 +49,13 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) /** This is an unique identifier for the input stream. */ val id = ssc.getNewInputStreamId() - /** - * A rate estimator configured by the user to compute a dynamic ingestion bound for this stream. - * @see `RateEstimator` - */ - protected [streaming] val rateEstimator = newEstimator() - - /** - * Return the configured estimator, or `noop` if none was specified. - */ - private def newEstimator() = - ssc.conf.get("spark.streaming.RateEstimator", "noop") match { - case "noop" => new NoopRateEstimator() - case estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator") - } - - // Keep track of the freshest rate for this stream using the rateEstimator - protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) { - override def publish(rate: Long): Unit = () - } + protected[streaming] val rateController: Option[RateController] = + RateEstimator.makeEstimator(ssc.conf).map { estimator => + new RateController(id, estimator) { + override def publish(rate: Long): Unit = () + } + } /** A human-readable name of this InputDStream */ private[streaming] def name: String = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 606aaa51367f1..ff99930bf00e6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -21,11 +21,11 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.BlockId -import org.apache.spark.streaming._ +import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} -import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator +import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.streaming.util.WriteAheadLogUtils 
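// (Sketch, not part of the diff.) The Option-valued override that follows
// changes the meaning of "no estimator configured": instead of installing a
// controller around a NoopRateEstimator, the stream simply gets no controller
// and no listener. A hypothetical minimal estimator shows the smallest value
// makeEstimator could eventually produce:
//
//   private class FixedRateEstimator(rate: Double) extends RateEstimator {
//     def compute(time: Long, elements: Long,
//         processingDelay: Long, schedulingDelay: Long): Option[Double] = Some(rate)
//   }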
/** @@ -44,10 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. */ - override val rateController: RateController = new RateController(id, rateEstimator) { - override def publish(rate: Long): Unit = - ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) - } + override protected[streaming] val rateController: Option[RateController] = + RateEstimator.makeEstimator(ssc.conf).map { estimator => + new RateController(id, estimator) { + override def publish(rate: Long): Unit = + ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) + } + } /** * Gets the receiver object that will be sent to the worker nodes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 006abccdeae0e..4aed1aa1d92d2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -67,7 +67,11 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { eventLoop.start() // Estimators receive updates from batch completion - ssc.graph.getInputStreams.foreach(is => ssc.addStreamingListener(is.rateController)) + for { + inputDStream <- ssc.graph.getInputStreams + rateController <- inputDStream.rateController + } ssc.addStreamingListener(rateController) + listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala index 8fedf571d4c59..0fea6838da032 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -21,17 +21,14 @@ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ExecutionContext, Future} -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.util.ThreadUtils /** - * :: DeveloperApi :: * A StreamingListener that receives batch completion updates, and maintains * an estimate of the speed at which this stream should ingest messages, * given an estimate computation from a `RateEstimator` */ -@DeveloperApi private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index d44bedf5f7d91..592d173e99bdc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -17,14 +17,13 @@ package org.apache.spark.streaming.scheduler.rate -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.SparkConf +import org.apache.spark.SparkException /** - * :: DeveloperApi :: * A component that estimates the rate at which an InputDStream should ingest * elements, based on updates at every batch completion.
*/ -@DeveloperApi private[streaming] trait RateEstimator extends Serializable { /** @@ -44,14 +43,17 @@ private[streaming] trait RateEstimator extends Serializable { schedulingDelay: Long): Option[Double] } -/** - * The trivial rate estimator never sends an update - */ -private[streaming] class NoopRateEstimator extends RateEstimator { +object RateEstimator { - def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = None + /** + * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`. + * + * @return None if there is no configured estimator, otherwise an instance of RateEstimator + * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any + * known estimators. + */ + def makeEstimator(conf: SparkConf): Option[RateEstimator] = + conf.getOption("spark.streaming.RateEstimator") map { estimator => + throw new IllegalArgumentException(s"Unknown rate estimator: $estimator") + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala index 934c2a6102bef..8466e1878b7f7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala @@ -60,11 +60,12 @@ private class MockRateLimitDStream(@transient ssc: StreamingContext) } } - override val rateController: RateController = new RateController(id, ConstantEstimator) { - override def publish(rate: Long): Unit = { - publishCalls += 1 - } - } + override val rateController: Option[RateController] = + Some(new RateController(id, ConstantEstimator) { + override def publish(rate: Long): Unit = { + publishCalls += 1 + } + }) def compute(validTime: Time): Option[RDD[Int]] = { val data = Seq(1) From e57c66b945a9cd3364f75c8ff7c1855031059dd4 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Thu, 23 Jul 2015 18:36:22 +0200 Subject: [PATCH 12/17] Added a couple of tests for the full scenario from driver to receivers, with several rate updates. --- .../dstream/ReceiverInputDStream.scala | 17 ++- .../scheduler/RateControllerSuite.scala | 113 ++++++++++++++---- .../scheduler/ReceiverTrackerSuite.scala | 23 ++-- 3 files changed, 114 insertions(+), 39 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index ff99930bf00e6..5b4df67d9ce11 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -45,12 +45,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/ override protected[streaming] val rateController: Option[RateController] = - RateEstimator.makeEstimator(ssc.conf).map { estimator => - new RateController(id, estimator) { - override def publish(rate: Long): Unit = - ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) - } - } + RateEstimator.makeEstimator(ssc.conf).map { new ReceiverRateController(id, _) } /** * Gets the receiver object that will be sent to the worker nodes @@ -122,4 +117,14 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont } Some(blockRDD) } + + /** + * A RateController that sends the new rate to receivers, via the receiver tracker. + */ + private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator) + extends RateController(id, estimator) { + override def publish(rate: Long): Unit = + ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) + } } + diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala index 8466e1878b7f7..c6258a32c6cae 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala @@ -17,20 +17,26 @@ package org.apache.spark.streaming.scheduler +import scala.collection.mutable +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.scalatest.Matchers._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{StreamingContext, TestOutputStreamWithPartitions, TestSuiteBase, Time} -import org.apache.spark.streaming.dstream.InputDStream +import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.rate.RateEstimator + + class RateControllerSuite extends TestSuiteBase { + override def actuallyWait: Boolean = true + test("rate controller publishes updates") { val ssc = new StreamingContext(conf, batchDuration) - val dstream = new MockRateLimitDStream(ssc) + val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1) val output = new TestOutputStreamWithPartitions(dstream) output.register() runStreams(ssc, 1, 1) @@ -39,41 +45,98 @@ class RateControllerSuite extends TestSuiteBase { assert(dstream.publishCalls === 1) } } + + test("receiver rate controller updates reach receivers") { + val ssc = new StreamingContext(conf, batchDuration) + + val dstream = new RateLimitInputDStream(ssc) { + override val rateController = + Some(new ReceiverRateController(id, new ConstantEstimator(200.0))) + } + SingletonDummyReceiver.reset() + + val output = new TestOutputStreamWithPartitions(dstream) + output.register() + runStreams(ssc, 2, 2) + + eventually(timeout(5.seconds)) { + assert(dstream.getCurrentRateLimit === Some(200)) + } + } + + test("multiple rate controller updates reach receivers") { + val ssc = new StreamingContext(conf, batchDuration) + val rates = Seq(100L, 200L, 300L) + + val dstream = new RateLimitInputDStream(ssc) { + override val rateController = + Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*))) + } + SingletonDummyReceiver.reset() + + val output = new TestOutputStreamWithPartitions(dstream) + output.register() + + val observedRates = mutable.HashSet.empty[Long] + + @volatile var done = false + runInBackground { + while (!done) { + try { + 
dstream.getCurrentRateLimit.foreach(observedRates += _) + } catch { + case NonFatal(_) => () // don't stop if the executor wasn't installed yet + } + Thread.sleep(20) + } + } + runStreams(ssc, 4, 4) + done = true + + // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver + observedRates should contain theSameElementsAs (rates :+ Long.MaxValue) + } + + private def runInBackground(f: => Unit): Unit = { + new Thread { + override def run(): Unit = { + f + } + }.start() + } } /** * An InputDStream that counts how often its rate controller `publish` method was called. */ -private class MockRateLimitDStream(@transient ssc: StreamingContext) - extends InputDStream[Int](ssc) { +private class MockRateLimitDStream[T: ClassTag]( + @transient ssc: StreamingContext, + input: Seq[Seq[T]], + numPartitions: Int) extends TestInputStream[T](ssc, input, numPartitions) { @volatile var publishCalls = 0 - private object ConstantEstimator extends RateEstimator { - def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = { - Some(100.0) - } - } - override val rateController: Option[RateController] = - Some(new RateController(id, ConstantEstimator) { + Some(new RateController(id, new ConstantEstimator(100.0)) { override def publish(rate: Long): Unit = { publishCalls += 1 } }) +} - def compute(validTime: Time): Option[RDD[Int]] = { - val data = Seq(1) - ssc.scheduler.inputInfoTracker.reportInfo(validTime, StreamInputInfo(id, data.size)) - Some(ssc.sc.parallelize(data)) - } +private class ConstantEstimator(rates: Double*) extends RateEstimator { + private var idx: Int = 0 - def stop(): Unit = {} + private def nextRate(): Double = { + val rate = rates(idx) + idx = (idx + 1) % rates.size + rate + } - def start(): Unit = {} + def compute( + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(nextRate()) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index aadb7231757b8..8457aa78de5f9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -18,16 +18,15 @@ package org.apache.spark.streaming.scheduler import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark.streaming._ + import org.apache.spark.SparkConf +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.receiver._ -import org.apache.spark.util.Utils -import org.apache.spark.streaming.dstream.InputDStream -import scala.reflect.ClassTag +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestSuiteBase} import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisor} + /** Testsuite for receiver scheduling */ class ReceiverTrackerSuite extends TestSuiteBase { @@ -129,12 +128,20 @@ private class RateLimitInputDStream(@transient ssc_ : StreamingContext) } /** - * A Receiver as an object so we can read its rate limit. + * A Receiver as an object so we can read its rate limit. 
Make sure to call `reset()` when + * reusing this receiver, otherwise a non-null `executor_` field will prevent it from being + * serialized when receivers are installed on executors. * * @note It's necessary to be a top-level object, or else serialization would create another * one on the executor side and we won't be able to read its rate limit. */ -private object SingletonDummyReceiver extends DummyReceiver +private object SingletonDummyReceiver extends DummyReceiver { + + /** Reset the object to be usable in another test. */ + def reset(): Unit = { + executor_ = null + } +} /** * Dummy receiver implementation From 715437af0ddb00c90739f818aab2b56934084b93 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Fri, 24 Jul 2015 16:26:09 +0200 Subject: [PATCH 13/17] Review comments and added a `reset` call in ReceiverTrackerTest. --- .../apache/spark/streaming/dstream/InputDStream.scala | 7 +------ .../spark/streaming/dstream/ReceiverInputDStream.scala | 2 +- .../spark/streaming/scheduler/JobScheduler.scala | 2 +- .../spark/streaming/scheduler/RateController.scala | 10 ++++++---- .../spark/streaming/scheduler/rate/RateEstimator.scala | 4 ++-- .../streaming/scheduler/ReceiverTrackerSuite.scala | 10 +++++++++- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 8aad39dacca3f..a6c4cd220e42f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -50,12 +50,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) val id = ssc.getNewInputStreamId() // Keep track of the freshest rate for this stream using the rateEstimator - protected[streaming] val rateController: Option[RateController] = - RateEstimator.makeEstimator(ssc.conf).map { estimator => - new RateController(id, estimator) { - override def publish(rate: Long): Unit = () - } - } + protected[streaming] val rateController: Option[RateController] = None /** A human-readable name of this InputDStream */ private[streaming] def name: String = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 5b4df67d9ce11..e79ba5018d9fd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -45,7 +45,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. 
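One review change below is easiest to read in isolation: the factory is renamed from makeEstimator to create and is keyed off spark.streaming.backpressure.rateEstimator. A minimal sketch of its shape after this patch, where EstimatorStub and CreateSketch are stand-ins so the sketch compiles on its own:

import org.apache.spark.SparkConf

// Stand-in for the private[streaming] RateEstimator trait, so the sketch
// is self-contained.
trait EstimatorStub

object CreateSketch {
  // Mirrors RateEstimator.create after this patch: None when the new key is
  // unset; an IllegalArgumentException for any value, since no concrete
  // estimator exists yet at this point in the series.
  def create(conf: SparkConf): Option[EstimatorStub] =
    conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator =>
      throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
}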
*/ override protected[streaming] val rateController: Option[RateController] = - RateEstimator.makeEstimator(ssc.conf).map { new ReceiverRateController(id, _) } + RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) } /** * Gets the receiver object that will be sent to the worker nodes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 4aed1aa1d92d2..58bdda7794bf2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -66,7 +66,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } eventLoop.start() - // Estimators receive updates from batch completion + // attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala index 0fea6838da032..f1e75da1644f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ExecutionContext, Future} +import org.apache.spark.SparkConf +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.util.ThreadUtils @@ -29,8 +31,8 @@ import org.apache.spark.util.ThreadUtils * an estimate of the speed at which this stream should ingest messages, * given an estimate computation from a `RateEstimator` */ -private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) - extends StreamingListener with Serializable { +private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) + extends StreamingListener with Serializable { protected def publish(rate: Long): Unit @@ -46,8 +48,8 @@ private [streaming] abstract class RateController(val streamUID: Int, rateEstima */ private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { - val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay) - newSpeed foreach { s => + val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) + newRate.foreach { s => rateLimit.set(s.toLong) publish(getLatestRate()) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index 592d173e99bdc..a08685119e5d5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -52,8 +52,8 @@ object RateEstimator { * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any * known estimators. 
*/ - def makeEstimator(conf: SparkConf): Option[RateEstimator] = - conf.getOption("spark.streaming.RateEstimator") map { estimator => + def create(conf: SparkConf): Option[RateEstimator] = + conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 8457aa78de5f9..3136cba8b4f63 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -90,6 +90,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { ssc.addStreamingListener(ReceiverStartedWaiter) ssc.scheduler.listenerBus.start(ssc.sc) + SingletonDummyReceiver.reset() val newRateLimit = 100L val inputDStream = new RateLimitInputDStream(ssc) @@ -109,7 +110,14 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } -/** An input DStream with a hard-coded receiver that gives access to internals for testing. */ +/** + * An input DStream with a hard-coded receiver that gives access to internals for testing. + * + * @note Make sure to call {{{SingletonDummyReceiver.reset()}}} before using this in a test, + * otherwise you may get {{{NotSerializableException}}} when trying to serialize + * the receiver. + * @see [[SingletonDummyReceiver]]. + */ private class RateLimitInputDStream(@transient ssc_ : StreamingContext) extends ReceiverInputDStream[Int](ssc_) { From e9fb45ebfce956005ca2830bc6fbc100f090ed53 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Fri, 24 Jul 2015 17:34:08 +0200 Subject: [PATCH 14/17] - Add a test for checkpointing - fixed serialization for RateController.executionContext --- .../streaming/scheduler/RateController.scala | 27 ++++++++++++++---- .../spark/streaming/CheckpointSuite.scala | 28 +++++++++++++++++++ .../scheduler/RateControllerSuite.scala | 4 +-- .../scheduler/ReceiverTrackerSuite.scala | 6 ++-- 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala index f1e75da1644f3..d0b5923b617ec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -17,14 +17,14 @@ package org.apache.spark.streaming.scheduler +import java.io.ObjectInputStream import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ExecutionContext, Future} -import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.streaming.scheduler.rate.RateEstimator -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ThreadUtils, Utils} /** * A StreamingListener that receives batch completion updates, and maintains @@ -34,14 +34,29 @@ import org.apache.spark.util.ThreadUtils private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable { + init() + protected def publish(rate: Long): Unit - // Used to compute & publish the rate update asynchronously @transient - implicit private val executionContext = ExecutionContext.fromExecutorService( -
ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update")) + implicit private var executionContext: ExecutionContext = _ + + @transient + private var rateLimit: AtomicLong = _ + + /** + * An initialization method called both from the constructor and Serialization code. + */ + private def init() { + executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update")) + rateLimit = new AtomicLong(-1L) + } - private val rateLimit: AtomicLong = new AtomicLong(-1L) + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { + ois.defaultReadObject() + init() + } /** * Compute the new rate limit and publish it asynchronously. diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index d308ac05a54fe..5a524751fa0dc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -30,8 +30,10 @@ import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} +import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonDummyReceiver} import org.apache.spark.util.{Clock, ManualClock, Utils} /** @@ -391,6 +393,32 @@ class CheckpointSuite extends TestSuiteBase { testCheckpointedOperation(input, operation, output, 7) } + test("recovery maintains rate controller") { + ssc = new StreamingContext(conf, batchDuration) + ssc.checkpoint(checkpointDir) + + val dstream = new RateLimitInputDStream(ssc) { + override val rateController = + Some(new ReceiverRateController(id, new ConstantEstimator(200.0))) + } + SingletonDummyReceiver.reset() + + val output = new TestOutputStreamWithPartitions(dstream.checkpoint(batchDuration * 2)) + output.register() + runStreams(ssc, 5, 5) + + SingletonDummyReceiver.reset() + ssc = new StreamingContext(checkpointDir) + ssc.start() + val outputNew = advanceTimeWithRealDelay(ssc, 2) + + eventually(timeout(5.seconds)) { + assert(dstream.getCurrentRateLimit === Some(200)) + } + ssc.stop() + ssc = null + } + // This tests whether file input stream remembers what files were seen before // the master failure and uses them again to process a large window operation. 
// It also tests whether batches, whose processing was incomplete due to the diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala index c6258a32c6cae..2a7a9ee8fd81b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala @@ -28,8 +28,6 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.rate.RateEstimator - - class RateControllerSuite extends TestSuiteBase { override def actuallyWait: Boolean = true @@ -125,7 +123,7 @@ private class MockRateLimitDStream[T: ClassTag]( }) } -private class ConstantEstimator(rates: Double*) extends RateEstimator { +private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator { private var idx: Int = 0 private def nextRate(): Double = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 3136cba8b4f63..9f10f8c77d333 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -118,7 +118,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { * the receiver. * @see [[[SingletonDummyReceiver]]]. */ -private class RateLimitInputDStream(@transient ssc_ : StreamingContext) +private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContext) extends ReceiverInputDStream[Int](ssc_) { override def getReceiver(): DummyReceiver = SingletonDummyReceiver @@ -143,7 +143,7 @@ private class RateLimitInputDStream(@transient ssc_ : StreamingContext) * @note It's necessary to be a top-level object, or else serialization would create another * one on the executor side and we won't be able to read its rate limit. */ -private object SingletonDummyReceiver extends DummyReceiver { +private[streaming] object SingletonDummyReceiver extends DummyReceiver { /** Reset the object to be usable in another test. */ def reset(): Unit = { @@ -154,7 +154,7 @@ private object SingletonDummyReceiver extends DummyReceiver { /** * Dummy receiver implementation */ -private class DummyReceiver(host: Option[String] = None) +private[streaming] class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { def onStart() { From 475e3464b073c309f08812694d739495c6bcaad8 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Tue, 28 Jul 2015 11:47:19 +0200 Subject: [PATCH 15/17] Latest round of reviews. 
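The substantive change in this round is that receiver-side rate control becomes opt-in behind a configuration flag. A minimal sketch of the gate as this patch spells it; BackpressureFlagSketch is illustrative only, and note the key is spelled "enable" at this point in the series:

import org.apache.spark.SparkConf

object BackpressureFlagSketch {
  // Same default as RateController.isBackPressureEnabled in this patch:
  // off unless explicitly enabled.
  def isBackPressureEnabled(conf: SparkConf): Boolean =
    conf.getBoolean("spark.streaming.backpressure.enable", false)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.streaming.backpressure.enable", "true")
    assert(isBackPressureEnabled(conf))
  }
}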
--- .../dstream/ReceiverInputDStream.scala | 5 +- .../streaming/scheduler/RateController.scala | 7 +- .../scheduler/RateControllerSuite.scala | 91 ++++++++++--------- 3 files changed, 58 insertions(+), 45 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index e79ba5018d9fd..405554d6db051 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -45,7 +45,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. */ override protected[streaming] val rateController: Option[RateController] = - RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) } + if (RateController.isBackPressureEnabled(ssc.conf)) + RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) } + else + None /** * Gets the receiver object that will be sent to the worker nodes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala index d0b5923b617ec..882ca0676b6ad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ExecutionContext, Future} -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.SparkConf import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.util.{ThreadUtils, Utils} @@ -83,3 +83,8 @@ private[streaming] abstract class RateController(val streamUID: Int, rateEstimat } computeAndPublish(processingEnd, elems, workDelay, waitDelay) } } + +object RateController { + def isBackPressureEnabled(conf: SparkConf): Boolean = + conf.getBoolean("spark.streaming.backpressure.enable", false) +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala index 2a7a9ee8fd81b..f25ac5f8ddd41 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala @@ -34,65 +34,70 @@ class RateControllerSuite extends TestSuiteBase { test("rate controller publishes updates") { val ssc = new StreamingContext(conf, batchDuration) - val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1) - val output = new TestOutputStreamWithPartitions(dstream) - output.register() - runStreams(ssc, 1, 1) - - eventually(timeout(2.seconds)) { - assert(dstream.publishCalls === 1) + withStreamingContext(ssc) { ssc => + val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1) + val output = new TestOutputStreamWithPartitions(dstream) + output.register() + runStreams(ssc, 1, 1) + + eventually(timeout(2.seconds)) { + assert(dstream.publishCalls === 1) + } } } test("receiver rate controller updates reach receivers") { val ssc = new StreamingContext(conf, batchDuration) + withStreamingContext(ssc) { ssc => + val dstream = new RateLimitInputDStream(ssc) { + override val 
rateController = + Some(new ReceiverRateController(id, new ConstantEstimator(200.0))) + } + SingletonDummyReceiver.reset() - val dstream = new RateLimitInputDStream(ssc) { - override val rateController = - Some(new ReceiverRateController(id, new ConstantEstimator(200.0))) - } - SingletonDummyReceiver.reset() - - val output = new TestOutputStreamWithPartitions(dstream) - output.register() - runStreams(ssc, 2, 2) + val output = new TestOutputStreamWithPartitions(dstream) + output.register() + runStreams(ssc, 2, 2) - eventually(timeout(5.seconds)) { - assert(dstream.getCurrentRateLimit === Some(200)) + eventually(timeout(5.seconds)) { + assert(dstream.getCurrentRateLimit === Some(200)) + } } } test("multiple rate controller updates reach receivers") { val ssc = new StreamingContext(conf, batchDuration) - val rates = Seq(100L, 200L, 300L) + withStreamingContext(ssc) { ssc => + val rates = Seq(100L, 200L, 300L) - val dstream = new RateLimitInputDStream(ssc) { - override val rateController = - Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*))) - } - SingletonDummyReceiver.reset() - - val output = new TestOutputStreamWithPartitions(dstream) - output.register() - - val observedRates = mutable.HashSet.empty[Long] - - @volatile var done = false - runInBackground { - while (!done) { - try { - dstream.getCurrentRateLimit.foreach(observedRates += _) - } catch { - case NonFatal(_) => () // don't stop if the executor wasn't installed yet + val dstream = new RateLimitInputDStream(ssc) { + override val rateController = + Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*))) + } + SingletonDummyReceiver.reset() + + val output = new TestOutputStreamWithPartitions(dstream) + output.register() + + val observedRates = mutable.HashSet.empty[Long] + + @volatile var done = false + runInBackground { + while (!done) { + try { + dstream.getCurrentRateLimit.foreach(observedRates += _) + } catch { + case NonFatal(_) => () // don't stop if the executor wasn't installed yet + } + Thread.sleep(20) } - Thread.sleep(20) } - } - runStreams(ssc, 4, 4) - done = true + runStreams(ssc, 4, 4) + done = true - // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver - observedRates should contain theSameElementsAs (rates :+ Long.MaxValue) + // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver + observedRates should contain theSameElementsAs (rates :+ Long.MaxValue) + } } private def runInBackground(f: => Unit): Unit = { From 5125e60eb89de0ed5f1c89e4e19985dbf0000989 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Tue, 28 Jul 2015 12:06:34 +0200 Subject: [PATCH 16/17] Fix style. --- .../spark/streaming/dstream/ReceiverInputDStream.scala | 8 +++++--- .../spark/streaming/scheduler/RateControllerSuite.scala | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 405554d6db051..646a8c3530a62 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -44,11 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. 
From 5125e60eb89de0ed5f1c89e4e19985dbf0000989 Mon Sep 17 00:00:00 2001
From: Iulian Dragos
Date: Tue, 28 Jul 2015 12:06:34 +0200
Subject: [PATCH 16/17] Fix style.

---
 .../spark/streaming/dstream/ReceiverInputDStream.scala   | 8 +++++---
 .../spark/streaming/scheduler/RateControllerSuite.scala  | 4 ++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 405554d6db051..646a8c3530a62 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -44,11 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
   /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
    */
-  override protected[streaming] val rateController: Option[RateController] =
-    if (RateController.isBackPressureEnabled(ssc.conf))
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
       RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
-    else
+    } else {
       None
+    }
+  }
 
   /**
    * Gets the receiver object that will be sent to the worker nodes
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
index f25ac5f8ddd41..838a4c099ee64 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
@@ -46,7 +46,7 @@ class RateControllerSuite extends TestSuiteBase {
     }
   }
 
-  test("receiver rate controller updates reach receivers") {
+  test("publish rates reach receivers") {
     val ssc = new StreamingContext(conf, batchDuration)
     withStreamingContext(ssc) { ssc =>
       val dstream = new RateLimitInputDStream(ssc) {
@@ -65,7 +65,7 @@ class RateControllerSuite extends TestSuiteBase {
     }
   }
 
-  test("multiple rate controller updates reach receivers") {
+  test("multiple publish rates reach receivers") {
     val ssc = new StreamingContext(conf, batchDuration)
     withStreamingContext(ssc) { ssc =>
       val rates = Seq(100L, 200L, 300L)

From f168c9476fc7104d3d3f92702793e0d9116117d0 Mon Sep 17 00:00:00 2001
From: Iulian Dragos
Date: Wed, 29 Jul 2015 11:59:49 +0200
Subject: [PATCH 17/17] Latest review round.

---
 .../spark/streaming/CheckpointSuite.scala     |  6 +-
 .../scheduler/RateControllerSuite.scala       | 76 +++++--------------
 .../ReceiverSchedulingPolicySuite.scala       | 10 +--
 .../scheduler/ReceiverTrackerSuite.scala      | 21 +++--
 4 files changed, 41 insertions(+), 72 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 5a524751fa0dc..67c2d900940ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonDummyReceiver}
+import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonTestRateReceiver}
 import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
@@ -401,13 +401,13 @@ class CheckpointSuite extends TestSuiteBase {
       override val rateController =
         Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
     }
-    SingletonDummyReceiver.reset()
+    SingletonTestRateReceiver.reset()
 
     val output = new TestOutputStreamWithPartitions(dstream.checkpoint(batchDuration * 2))
     output.register()
     runStreams(ssc, 5, 5)
 
-    SingletonDummyReceiver.reset()
+    SingletonTestRateReceiver.reset()
     ssc = new StreamingContext(checkpointDir)
     ssc.start()
     val outputNew = advanceTimeWithRealDelay(ssc, 2)
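Both reset() calls in the checkpoint test matter: SingletonTestRateReceiver is a single JVM-wide object (defined in ReceiverTrackerSuite further below), so state it accumulated during the pre-checkpoint run would otherwise leak into the recovered run. Schematically, with the suite's helpers assumed:

    SingletonTestRateReceiver.reset()           // clean slate for the pre-checkpoint run
    runStreams(ssc, 5, 5)                       // checkpoints written every 2 batches

    SingletonTestRateReceiver.reset()           // clean slate again before recovery
    ssc = new StreamingContext(checkpointDir)   // rebuild the graph, rate controller included
    ssc.start()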
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
index 838a4c099ee64..921da773f6c11 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
@@ -30,18 +30,17 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 class RateControllerSuite extends TestSuiteBase {
 
-  override def actuallyWait: Boolean = true
+  override def useManualClock: Boolean = false
 
   test("rate controller publishes updates") {
     val ssc = new StreamingContext(conf, batchDuration)
     withStreamingContext(ssc) { ssc =>
-      val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
-      val output = new TestOutputStreamWithPartitions(dstream)
-      output.register()
-      runStreams(ssc, 1, 1)
+      val dstream = new RateLimitInputDStream(ssc)
+      dstream.register()
+      ssc.start()
 
-      eventually(timeout(2.seconds)) {
-        assert(dstream.publishCalls === 1)
+      eventually(timeout(10.seconds)) {
+        assert(dstream.publishCalls > 0)
       }
     }
   }
@@ -53,13 +52,11 @@ class RateControllerSuite extends TestSuiteBase {
       override val rateController =
         Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
       }
-      SingletonDummyReceiver.reset()
+      dstream.register()
+      SingletonTestRateReceiver.reset()
+      ssc.start()
 
-      val output = new TestOutputStreamWithPartitions(dstream)
-      output.register()
-      runStreams(ssc, 2, 2)
-
-      eventually(timeout(5.seconds)) {
+      eventually(timeout(10.seconds)) {
         assert(dstream.getCurrentRateLimit === Some(200))
       }
     }
@@ -74,58 +71,19 @@ class RateControllerSuite extends TestSuiteBase {
       override val rateController =
         Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
       }
-      SingletonDummyReceiver.reset()
-
-      val output = new TestOutputStreamWithPartitions(dstream)
-      output.register()
+      SingletonTestRateReceiver.reset()
+      dstream.register()
 
       val observedRates = mutable.HashSet.empty[Long]
+      ssc.start()
 
-      @volatile var done = false
-      runInBackground {
-        while (!done) {
-          try {
-            dstream.getCurrentRateLimit.foreach(observedRates += _)
-          } catch {
-            case NonFatal(_) => () // don't stop if the executor wasn't installed yet
-          }
-          Thread.sleep(20)
-        }
+      eventually(timeout(20.seconds)) {
+        dstream.getCurrentRateLimit.foreach(observedRates += _)
+        // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
+        observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
       }
-      runStreams(ssc, 4, 4)
-      done = true
-
-      // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
-      observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
     }
   }
-
-  private def runInBackground(f: => Unit): Unit = {
-    new Thread {
-      override def run(): Unit = {
-        f
-      }
-    }.start()
-  }
-}
-
-/**
- * An InputDStream that counts how often its rate controller `publish` method was called.
- */
-private class MockRateLimitDStream[T: ClassTag](
-    @transient ssc: StreamingContext,
-    input: Seq[Seq[T]],
-    numPartitions: Int) extends TestInputStream[T](ssc, input, numPartitions) {
-
-  @volatile
-  var publishCalls = 0
-
-  override val rateController: Option[RateController] =
-    Some(new RateController(id, new ConstantEstimator(100.0)) {
-      override def publish(rate: Long): Unit = {
-        publishCalls += 1
-      }
-    })
 }
 
 private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator {
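This review round replaces the hand-rolled polling thread with ScalaTest's eventually, which simply re-runs the assertion block until it passes or the timeout expires, so no done flag or cleanup thread is needed. The pattern in isolation (timeout and interval values are arbitrary; ready stands in for "the receiver saw the new rate"):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallySketch {
      @volatile var ready = false

      def main(args: Array[String]): Unit = {
        new Thread {
          override def run(): Unit = { Thread.sleep(500); ready = true }
        }.start()
        // A failed assert inside the block just schedules another attempt
        // after the interval, instead of failing the test immediately.
        eventually(timeout(10.seconds), interval(50.millis)) {
          assert(ready)
        }
      }
    }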
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index 93f920fdc71f1..0418d776ecc9a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -64,7 +64,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
 
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more receivers than executors") {
-    val receivers = (0 until 6).map(new DummyReceiver(_))
+    val receivers = (0 until 6).map(new RateTestReceiver(_))
     val executors = (10000 until 10003).map(port => s"localhost:${port}")
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
@@ -79,7 +79,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
 
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more executors than receivers") {
-    val receivers = (0 until 3).map(new DummyReceiver(_))
+    val receivers = (0 until 3).map(new RateTestReceiver(_))
     val executors = (10000 until 10006).map(port => s"localhost:${port}")
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
@@ -94,8 +94,8 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
   }
 
   test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
-    val receivers = (0 until 3).map(new DummyReceiver(_)) ++
-      (3 until 6).map(new DummyReceiver(_, Some("localhost")))
+    val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
+      (3 until 6).map(new RateTestReceiver(_, Some("localhost")))
     val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
       (10003 until 10006).map(port => s"localhost2:${port}")
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
@@ -121,7 +121,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
   }
 
   test("scheduleReceivers: return empty scheduled executors if no executors") {
-    val receivers = (0 until 3).map(new DummyReceiver(_))
+    val receivers = (0 until 3).map(new RateTestReceiver(_))
    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
     scheduledExecutors.foreach { case (receiverId, executors) =>
       assert(executors.isEmpty)
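In the ReceiverTrackerSuite changes below, getCurrentRateLimit has to reach the receiver's ReceiverSupervisor through the private executor method, so the test goes in through Java reflection. The shape of that trick in isolation (Hidden and secret are illustrative names; the suite applies the same steps to Receiver[_] and executor):

    // Call a private, zero-argument method on an instance via reflection.
    class Hidden {
      private def secret(): String = "rate limit"
    }

    object ReflectionSketch {
      def main(args: Array[String]): Unit = {
        val m = classOf[Hidden].getDeclaredMethod("secret")
        m.setAccessible(true) // bypass the access check, as invokeExecutorMethod does
        println(m.invoke(new Hidden())) // prints "rate limit"
      }
    }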
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 19092042d25e4..d88df81ac65f5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -43,7 +43,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
     ssc.addStreamingListener(ReceiverStartedWaiter)
     ssc.scheduler.listenerBus.start(ssc.sc)
-    SingletonDummyReceiver.reset()
+    SingletonTestRateReceiver.reset()
 
     val newRateLimit = 100L
     val inputDStream = new RateLimitInputDStream(ssc)
@@ -74,17 +74,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
 private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContext)
   extends ReceiverInputDStream[Int](ssc_) {
 
-  override def getReceiver(): DummyReceiver = SingletonDummyReceiver
+  override def getReceiver(): RateTestReceiver = SingletonTestRateReceiver
 
   def getCurrentRateLimit: Option[Long] = {
     invokeExecutorMethod.getCurrentRateLimit
   }
 
+  @volatile
+  var publishCalls = 0
+
+  override val rateController: Option[RateController] = {
+    Some(new RateController(id, new ConstantEstimator(100.0)) {
+      override def publish(rate: Long): Unit = {
+        publishCalls += 1
+      }
+    })
+  }
+
   private def invokeExecutorMethod: ReceiverSupervisor = {
     val c = classOf[Receiver[_]]
     val ex = c.getDeclaredMethod("executor")
     ex.setAccessible(true)
-    ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor]
+    ex.invoke(SingletonTestRateReceiver).asInstanceOf[ReceiverSupervisor]
   }
 }
 
@@ -96,7 +107,7 @@ private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContex
  * @note It's necessary to be a top-level object, or else serialization would create another
  *       one on the executor side and we won't be able to read its rate limit.
  */
-private[streaming] object SingletonDummyReceiver extends DummyReceiver(0) {
+private[streaming] object SingletonTestRateReceiver extends RateTestReceiver(0) {
 
   /** Reset the object to be usable in another test. */
   def reset(): Unit = {
@@ -107,7 +118,7 @@ private[streaming] object SingletonDummyReceiver extends DummyReceiver(0) {
 /**
  * Dummy receiver implementation
  */
-private[streaming] class DummyReceiver(receiverId: Int, host: Option[String] = None)
+private[streaming] class RateTestReceiver(receiverId: Int, host: Option[String] = None)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
 
   setReceiverId(receiverId)