
Commit

Added network receiver information to the Streaming UI.
tdas committed Mar 31, 2014
1 parent 56cc7fb commit 93f1c69
Showing 11 changed files with 301 additions and 108 deletions.
@@ -351,15 +351,6 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.clearMetadata(time))
}

/* Adds metadata to the Stream while it is running.
* This method should be overwritten by sublcasses of InputDStream.
*/
private[streaming] def addMetadata(metadata: Any) {
if (metadata != null) {
logInfo("Dropping Metadata: " + metadata.toString)
}
}

/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
@@ -17,23 +17,23 @@

package org.apache.spark.streaming.dstream

import java.util.concurrent.ArrayBlockingQueue
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag

import akka.actor.{Props, Actor}
import akka.actor.{Actor, Props}
import akka.pattern.ask

import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.rdd.{RDD, BlockRDD}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver}
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -48,8 +48,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

// This is an unique identifier that is used to match the network receiver with the
// corresponding network input stream.
/** Keeps all received blocks information */
private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

/** This is an unique identifier for the network input stream. */
val id = ssc.getNewNetworkStreamId()

/**
@@ -64,23 +66,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte

def stop() {}

/** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)
val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}

/** Get information on received blocks. */
private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo(time)
}

/**
* Clear metadata that are older than `rememberDuration` of this DStream.
* This is an internal method that should not be called directly. This
* implementation overrides the default implementation to clear received
* block information.
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
receivedBlockInfo --= oldReceivedBlocks.keys
logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
}
}


private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any)
extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
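The clearMetadata override above ties the lifetime of this per-batch block information to the stream's rememberDuration, the same window that controls how long generated RDDs are kept. As an illustrative sketch (not part of this commit), an application that wants batches, and therefore their received-block info, retained longer can widen that window on an already-created StreamingContext `ssc`:

import org.apache.spark.streaming.Minutes

// Sketch only: keep each batch's RDDs (and, with this change, the block info
// recorded for it) for at least two minutes after the batch time.
// `ssc` is assumed to be an existing StreamingContext.
ssc.remember(Minutes(2))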

@@ -156,21 +180,20 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
actor ! ReportError(e.toString)
}


/**
* Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
def pushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, metadata)
actor ! ReportBlock(blockId, arrayBuffer.size, metadata)
}

/**
* Pushes a block (as bytes) into the block manager.
*/
def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
def pushBlock(blockId: StreamBlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
env.blockManager.putBytes(blockId, bytes, level)
actor ! ReportBlock(blockId, metadata)
actor ! ReportBlock(blockId, -1 , metadata)
}

/** A helper actor that communicates with the NetworkInputTracker */
@@ -188,8 +211,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
}

override def receive() = {
case ReportBlock(blockId, metadata) =>
tracker ! AddBlocks(streamId, Array(blockId), metadata)
case ReportBlock(blockId, numRecords, metadata) =>
tracker ! AddBlocks(ReceivedBlockInfo(streamId, blockId, numRecords, metadata))
case ReportError(msg) =>
tracker ! DeregisterReceiver(streamId, msg)
case StopReceiver(msg) =>
@@ -211,7 +234,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {

case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)

val clock = new SystemClock()
val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
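The BlockGenerator above reads its batching interval from spark.streaming.blockInterval (default 200, read as a plain long, i.e. milliseconds in this version). A minimal sketch of tuning it when constructing the context; the master, app name and batch duration are illustrative assumptions, not part of this commit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: a smaller block interval makes the receiver emit more, smaller blocks
// per batch, and each block becomes one partition of the BlockRDD built in compute().
val conf = new SparkConf()
  .setMaster("local[2]")                          // assumption: local run with 2 threads
  .setAppName("BlockIntervalExample")             // hypothetical application name
  .set("spark.streaming.blockInterval", "100")    // milliseconds
val ssc = new StreamingContext(conf, Seconds(1))  // hypothetical 1-second batches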
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
*/
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
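With receivedBlockInfo added to BatchInfo, listeners (and the Streaming UI) can derive a batch's input size directly from the batch metadata. A hedged sketch, not taken from this commit; blocks pushed as raw bytes report numRecords = -1, so those are left out of the sum:

import org.apache.spark.streaming.scheduler.BatchInfo

// Sketch: total records received across all network input streams in one batch.
def totalRecords(batch: BatchInfo): Long =
  batch.receivedBlockInfo.values.flatten.map(_.numRecords).filter(_ >= 0).sum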
@@ -147,7 +147,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
jobScheduler.runJobs(time, graph.generateJobs(time))
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)

// Restart the timer
@@ -159,7 +159,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Success(jobs) =>
val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
@@ -82,14 +82,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}

def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
val jobSet = new JobSet(time, jobs)
jobSets.put(time, jobSet)
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + time)
logInfo("Added jobs for time " + jobSet.time)
}
}

@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
* belong to the same batch.
*/
private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
case class JobSet(
time: Time,
jobs: Seq[Job],
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
) {

private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
receivedBlockInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
@@ -17,26 +17,33 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import org.apache.spark.{SparkException, Logging, SparkEnv}
import org.apache.spark.SparkContext._

import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
import scala.concurrent.duration._
import scala.collection.mutable.{HashMap, SynchronizedQueue, SynchronizedMap}

import akka.actor._
import akka.pattern.ask
import akka.dispatch._
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.{Time, StreamingContext}

import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
import org.apache.spark.util.AkkaUtils

/** Information about block received by the network receiver */
case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
metadata: Any
)

/**
* Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
* with each other.
*/
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
extends NetworkInputTrackerMessage
private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
@@ -53,9 +60,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef]
val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)

val listenerBus = ssc.scheduler.listenerBus

// actor is created when generator starts.
// This not being null means the tracker has been started and not stopped
@@ -87,15 +95,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}

/** Return all the blocks received from a receiver. */
def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {
val queue = receivedBlockIds.synchronized {
receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
}
val result = queue.synchronized {
queue.dequeueAll(x => true)
}
logInfo("Stream " + receiverId + " received " + result.size + " blocks")
result.toArray
def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
receivedBlockInfo.toArray
}

private def getReceivedBlockInfoQueue(streamId: Int) = {
receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
}

/** Actor to receive messages from the receivers. */
@@ -110,17 +117,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
+ sender.path.address)
sender ! true
}
case AddBlocks(streamId, blockIds, metadata) => {
val tmp = receivedBlockIds.synchronized {
if (!receivedBlockIds.contains(streamId)) {
receivedBlockIds += ((streamId, new Queue[BlockId]))
}
receivedBlockIds(streamId)
}
tmp.synchronized {
tmp ++= blockIds
}
networkInputStreamMap(streamId).addMetadata(metadata)
case AddBlocks(receivedBlockInfo) => {
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
case DeregisterReceiver(streamId, msg) => {
receiverInfo -= streamId
@@ -23,6 +23,7 @@ import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent

case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent

@@ -34,14 +35,14 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
* computation.
*/
trait StreamingListener {
/**
* Called when processing of a batch has completed
*/

/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

/**
* Called when processing of a batch has started
*/
/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
}

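Putting the pieces together, the new onBatchSubmitted callback and the receivedBlockInfo field make per-receiver activity visible to any listener, not only the Streaming UI. The following is an illustrative sketch rather than code from this commit: the class name is invented, and it would be registered on an existing StreamingContext with ssc.addStreamingListener(new ReceiverInfoLogger):

import org.apache.spark.streaming.scheduler.{StreamingListener,
  StreamingListenerBatchCompleted, StreamingListenerBatchSubmitted}

// Sketch: log how many blocks each network input stream contributed to a batch
// when it is submitted, and how long the batch took once it completes.
class ReceiverInfoLogger extends StreamingListener {

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
    val info = batchSubmitted.batchInfo
    val perStream = info.receivedBlockInfo.map {
      case (streamId, blocks) => "stream " + streamId + ": " + blocks.length + " blocks"
    }
    println("Batch " + info.batchTime + " submitted (" + perStream.mkString(", ") + ")")
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    val took = for {
      start <- info.processingStartTime
      end <- info.processingEndTime
    } yield end - start
    println("Batch " + info.batchTime + " completed" + took.map(" in " + _ + " ms").getOrElse(""))
  }
}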
