Skip to content

Commit

Permalink
Use Reflection for accessing truly private executor method and
Browse files Browse the repository at this point in the history
use the listener bus to know when receivers have registered (`onStart`
is called before receivers have registered, leading to flaky behavior).
  • Loading branch information
dragos committed Jul 21, 2015
1 parent 210f495 commit 162d9e5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.streaming.receiver

import java.util.concurrent.atomic.AtomicInteger

import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

import org.apache.spark.{Logging, SparkConf}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}

/** Get the attached executor. */
private[streaming] def executor: ReceiverSupervisor = {
private def executor: ReceiverSupervisor = {
assert(executor_ != null, "Executor has not been attached to this receiver")
executor_
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false

/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) {
eP.send(UpdateRateLimit(newRate))
}
}

/** Add new blocks for the given stream */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,27 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}

test("Receiver tracker - propagates rate limit") {
object streamingListener extends StreamingListener {
@volatile
var started = false

override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
started = true
}
}

ssc.addStreamingListener(streamingListener)
ssc.scheduler.listenerBus.start(ssc.sc)

val newRateLimit = 100L
val ids = new TestReceiverInputDStream(ssc)
val tracker = new ReceiverTracker(ssc)
tracker.start()

// we wait until the Receiver has registered with the tracker,
// otherwise our rate update is lost
eventually(timeout(5 seconds)) {
assert(TestDummyReceiver.started)
assert(streamingListener.started)
}
tracker.sendRateUpdate(ids.id, newRateLimit)
// this is an async message, we need to wait a bit for it to be processed
Expand All @@ -102,7 +117,14 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)
override def getReceiver(): DummyReceiver = TestDummyReceiver

def getCurrentRateLimit: Option[Long] = {
TestDummyReceiver.executor.getCurrentRateLimit
invokeExecutorMethod.getCurrentRateLimit
}

private def invokeExecutorMethod: ReceiverSupervisor = {
val c = classOf[Receiver[_]]
val ex = c.getDeclaredMethod("executor")
ex.setAccessible(true)
ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor]
}
}

Expand All @@ -118,10 +140,7 @@ private object TestDummyReceiver extends DummyReceiver
private class DummyReceiver(host: Option[String] = None)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

var started = false

def onStart() {
started = true
}

def onStop() {
Expand Down

0 comments on commit 162d9e5

Please sign in to comment.