Skip to content

Commit

Permalink
- removed field to hold the current rate limit in rate limiter
Browse files Browse the repository at this point in the history
- made rate limit a Long and default to Long.MaxValue (consequence of the above)
- removed custom `waitUntil` and replaced it by `eventually`
  • Loading branch information
dragos committed Jul 20, 2015
1 parent cd1397d commit 261a051
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,25 @@ import org.apache.spark.{Logging, SparkConf}
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

// treated as an upper limit
private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0)
private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit)
private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get())
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)

def waitToPush() {
if (currentRateLimit.get() > 0) {
rateLimiter.acquire()
}
rateLimiter.acquire()
}

private[receiver] def updateRate(newRate: Int): Unit =
/**
* Return the current rate limit. If no limit has been set so far, it returns `Long.MaxValue`.
*/
def getCurrentLimit: Long =
rateLimiter.getRate.toLong

private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
try {
if (maxRateLimit > 0) {
currentRateLimit.set(newRate.min(maxRateLimit))
}
else {
currentRateLimit.set(newRate)
}
} finally {
rateLimiter.setRate(currentRateLimit.get())
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}

/** Get the attached executor. */
private[streaming] def executor = {
private[streaming] def executor: ReceiverSupervisor = {
assert(executor_ != null, "Executor has not been attached to this receiver")
executor_
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[streaming] abstract class ReceiverSupervisor(
private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)

/** The current maximum rate limit for this receiver. */
private[streaming] def getCurrentRateLimit: Option[Int] = None
private[streaming] def getCurrentRateLimit: Option[Long] = None

/** Exception associated with the stopping of the receiver */
@volatile protected var stoppingError: Throwable = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
blockGenerator.updateRate(eps.toInt)
logInfo(s"Received a new rate limit: $eps.")
blockGenerator.updateRate(eps)
}
})

Expand All @@ -100,8 +101,8 @@ private[streaming] class ReceiverSupervisorImpl(
}
}, streamId, env.conf)

override private[streaming] def getCurrentRateLimit: Option[Int] =
Some(blockGenerator.currentRateLimit.get)
override private[streaming] def getCurrentRateLimit: Option[Long] =
Some(blockGenerator.getCurrentLimit)

/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
}

/** Update a receiver's maximum rate from an estimator's update */
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
eP.send(UpdateRateLimit(newRate))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,19 +537,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output, expectedOutput, useSet)
}
}

/**
 * Block the calling thread until `cond` evaluates to `true` or `timeout` milliseconds have
 * elapsed, whichever happens first. The condition is re-evaluated after each 100 ms sleep,
 * so the call may return up to roughly 100 ms after the condition became true or the
 * deadline passed.
 *
 * The method returns normally in both cases; it does not signal whether the condition was
 * met, so callers must re-check the condition themselves if they need to know.
 *
 * @param cond    by-name condition to poll; only evaluated while the deadline has not passed
 * @param timeout how many milliseconds to wait before giving up
 */
def waitUntil(cond: => Boolean, timeout: Int): Unit = {
  val deadline = System.currentTimeMillis() + timeout

  // Check the clock before the condition so that `cond` is never evaluated once the
  // deadline has passed (short-circuit `&&`).
  @scala.annotation.tailrec
  def poll(): Unit =
    if (System.currentTimeMillis() < deadline && !cond) {
      Thread.sleep(100)
      poll()
    }

  poll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,20 @@ class RateLimiterSuite extends SparkFunSuite {
val conf = new SparkConf()
val rateLimiter = new RateLimiter(conf){}
rateLimiter.updateRate(105)
assert(rateLimiter.currentRateLimit.get == 105)
assert(rateLimiter.getCurrentLimit == 105)
}

test("rate limiter updates when below maxRate") {
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110")
val rateLimiter = new RateLimiter(conf){}
rateLimiter.updateRate(105)
assert(rateLimiter.currentRateLimit.get == 105)
assert(rateLimiter.getCurrentLimit == 105)
}

test("rate limiter stays below maxRate despite large updates") {
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100")
val rateLimiter = new RateLimiter(conf){}
rateLimiter.updateRate(105)
assert(rateLimiter.currentRateLimit.get == 100)
assert(rateLimiter.getCurrentLimit === 100)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.streaming.scheduler

import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -77,15 +80,18 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}

test("Receiver tracker - propagates rate limit") {
val newRateLimit = 100
val newRateLimit = 100L
val ids = new TestReceiverInputDStream(ssc)
val tracker = new ReceiverTracker(ssc)
tracker.start()
waitUntil(TestDummyReceiver.started, 5000)
eventually(timeout(5 seconds)) {
assert(TestDummyReceiver.started)
}
tracker.sendRateUpdate(ids.id, newRateLimit)
// this is an async message, we need to wait a bit for it to be processed
waitUntil(ids.getRateLimit.get == newRateLimit, 1000)
assert(ids.getRateLimit.get === newRateLimit)
eventually(timeout(3 seconds)) {
assert(ids.getCurrentRateLimit.get === newRateLimit)
}
}
}

Expand All @@ -95,8 +101,9 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)

override def getReceiver(): DummyReceiver = TestDummyReceiver

def getRateLimit: Option[Int] =
def getCurrentRateLimit: Option[Long] = {
TestDummyReceiver.executor.getCurrentRateLimit
}
}

/**
Expand Down

0 comments on commit 261a051

Please sign in to comment.