Skip to content

Commit

Permalink
[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when dere…
Browse files Browse the repository at this point in the history
…gisterReceivering since we may reuse it later

`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing <zsxwing@gmail.com>

Closes #8538 from zsxwing/SPARK-10369.
  • Loading branch information
zsxwing authored and tdas committed Aug 31, 2015
1 parent 72f6dbf commit 4a5fe09
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ReceiverTrackingInfo(
streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
}
receiverTrackingInfos -= streamId
receiverTrackingInfos(streamId) = newReceiverTrackingInfo
listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
Expand Down Expand Up @@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
context.reply(true)
// Local messages
case AllReceiverIds =>
context.reply(receiverTrackingInfos.keys.toSeq)
context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)

This comment has been minimized.

Copy link
@jaceklaskowski

jaceklaskowski Sep 1, 2015

Contributor

filterNot perhaps to improve readability?

case StopAllReceivers =>
assert(isTrackerStopping || isTrackerStopped)
stopReceivers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}

test("should restart receiver after stopping it") {
withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
@volatile var startTimes = 0
ssc.addStreamingListener(new StreamingListener {
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
startTimes += 1
}
})
val input = ssc.receiverStream(new StoppableReceiver)
val output = new TestOutputStream(input)
output.register()
ssc.start()
StoppableReceiver.shouldStop = true
eventually(timeout(10 seconds), interval(10 millis)) {
// The receiver is stopped once, so if it's restarted, it should be started twice.
assert(startTimes === 2)
}
}
}
}

/** An input DStream with for testing rate controlling */
Expand Down Expand Up @@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {

def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
}

/**
* A custom receiver that could be stopped via StoppableReceiver.shouldStop
*/
class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

var receivingThreadOption: Option[Thread] = None

def onStart() {
val thread = new Thread() {
override def run() {
while (!StoppableReceiver.shouldStop) {
Thread.sleep(10)
}
StoppableReceiver.this.stop("stop")
}
}
thread.start()
}

def onStop() {
StoppableReceiver.shouldStop = true
receivingThreadOption.foreach(_.join())
// Reset it so as to restart it
StoppableReceiver.shouldStop = false
}
}

object StoppableReceiver {
@volatile var shouldStop = false
}

0 comments on commit 4a5fe09

Please sign in to comment.