Changes in Spark Streaming UI
mubarak committed Jul 18, 2014
1 parent 9d38d3c commit 1500deb
Showing 21 changed files with 115 additions and 31 deletions.
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
import org.apache.spark.util.{Utils, BoundedPriorityQueue, CallSite}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -124,7 +124,7 @@ abstract class RDD[T: ClassTag](
val id: Int = sc.newRddId()

/** A friendly name for this RDD */
@transient var name: String = null
@transient var name: String = sc.getLocalProperty("rddName")

/** Assign a name to this RDD */
def setName(_name: String): this.type = {
@@ -1214,14 +1214,11 @@ abstract class RDD[T: ClassTag](

/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = {
val short: String = sc.getLocalProperty("spark.job.callSiteShort")
val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT)
if (short != null) {
CallSite(short, sc.getLocalProperty("spark.job.callSiteLong"))
CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG))
} else {
val callSite: CallSite = Utils.getCallSite
//sc.setLocalProperty("spark.job.callSiteShort", callSite.short)
//sc.setLocalProperty("spark.job.callSiteLong", callSite.long)
callSite
Utils.getCallSite
}
}
private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
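
The change to RDD.creationSite above means an RDD first looks for a call site stashed in the SparkContext's thread-local properties under its name plus the new Utils.CALL_SITE_SHORT / Utils.CALL_SITE_LONG suffixes (added in the Utils.scala change below), and only falls back to walking the stack when nothing was stored. A minimal standalone sketch of that lookup — not Spark's own classes, and "MyApp.scala:42" is a placeholder:

object CallSiteLookupSketch {
  case class CallSite(short: String, long: String)

  val CALL_SITE_SHORT = ".callSite.short"
  val CALL_SITE_LONG  = ".callSite.long"

  // Stands in for SparkContext's thread-local property map.
  private val localProps = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  def setLocalProperty(key: String, value: String): Unit =
    localProps.set(localProps.get + (key -> value))

  def getLocalProperty(key: String): String =
    localProps.get.getOrElse(key, null)

  // Mirrors the new creationSite logic: stored call site first, stack walk as fallback.
  def creationSite(rddName: String): CallSite = {
    val short = getLocalProperty(rddName + CALL_SITE_SHORT)
    if (short != null) {
      CallSite(short, getLocalProperty(rddName + CALL_SITE_LONG))
    } else {
      val frame = Thread.currentThread.getStackTrace
        .drop(2).headOption.map(_.toString).getOrElse("<unknown>")
      CallSite(frame, frame)
    }
  }

  def main(args: Array[String]): Unit = {
    setLocalProperty("MappedRDD" + CALL_SITE_SHORT, "map at MyApp.scala:42")
    setLocalProperty("MappedRDD" + CALL_SITE_LONG, "org.apache.spark.rdd.RDD.map(...)")
    println(creationSite("MappedRDD"))   // uses the stored call site
    println(creationSite("UnnamedRDD"))  // falls back to the stack trace
  }
}
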
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -52,6 +52,9 @@ private[spark] case class CallSite(val short: String, val long: String)
private[spark] object Utils extends Logging {
val random = new Random()

private[spark] val CALL_SITE_SHORT: String = ".callSite.short"
private[spark] val CALL_SITE_LONG: String = ".callSite.long"

def sparkBin(sparkHome: String, which: String): File = {
val suffix = if (isWindows) ".cmd" else ""
new File(sparkHome + File.separator + "bin", which + suffix)
@@ -800,7 +803,7 @@ private[spark] object Utils extends Logging {
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?\.[A-Z]""".r
private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""".r
private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r
private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r
private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r
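
The widened SPARK_CLASS_REGEX adds the streaming connector packages (twitter, kafka, flume, mqtt, zeromq) to the set of "internal" classes skipped when Utils.getCallSite scans the stack for user code. A rough standalone sketch of such a scan, under the simplifying assumption that the scan simply reports the first non-internal frame (helper names here are illustrative, not Spark's):

object CallSiteScanSketch {
  // Same internal-package patterns as above, including the newly added streaming connectors.
  private val internalClassRegexes = Seq(
    ("""^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?""" +
     """(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?""" +
     """(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""").r,
    """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r,
    """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r,
    """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r
  )

  private def isInternal(className: String): Boolean =
    internalClassRegexes.exists(_.findFirstIn(className).isDefined)

  // Report the first stack frame that does not belong to an internal package.
  def firstUserFrame(stack: Seq[StackTraceElement]): Option[StackTraceElement] =
    stack.find(el => !isInternal(el.getClassName))

  def main(args: Array[String]): Unit = {
    val frame = firstUserFrame(Thread.currentThread.getStackTrace.toSeq)
    println(frame.map(f => s"${f.getFileName}:${f.getLineNumber}").getOrElse("<unknown>"))
  }
}
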
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.util.{Utils, MetadataCleaner}
import org.apache.spark.util.{CallSite, Utils, MetadataCleaner}

/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -106,6 +106,24 @@ abstract class DStream[T: ClassTag] (
/** Return the StreamingContext associated with this DStream */
def context = ssc

private[streaming] val RDD_NAME: String = "rddName";

@transient var name: String = null

/** Assign a name to this DStream */
def setName(_name: String) = {
name = _name
}

/* Find the creation callSite */
val creationSite = Utils.getCallSite

/* Store the creation callSite in threadlocal */
private[streaming] def setCallSite = {
ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short)
ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long)
}

/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
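
The new setName / setCallSite hooks are used the same way in every subclass below: name the RDDs the stream produces, capture the DStream's creation site at construction, and publish both into thread-local properties at the start of compute so the RDDs built there pick them up. A hypothetical subclass showing the pattern once — the name UpperCasedDStream is illustrative, and this compiles only against the patched tree, since setName, setCallSite and RDD_NAME are introduced by this commit:

package org.apache.spark.streaming.dstream

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

private[streaming]
class UpperCasedDStream(parent: DStream[String]) extends DStream[String](parent.ssc) {

  // Label the RDDs this stream produces; RDD.name reads it back via the "rddName" property.
  setName("MappedRDD")

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[String]] = {
    // Publish this DStream's creation call site so RDD.creationSite resolves it.
    setCallSite
    val rdd = parent.getOrCompute(validTime).map(_.map(_.toUpperCase))
    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
    rdd
  }
}
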
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -57,6 +57,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
override def start() { }

override def stop() { }
setName("UnionRDD")

/**
* Finds the files that were modified since the last time this method was called and makes
@@ -71,6 +72,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
assert(validTime.milliseconds >= ignoreTime,
"Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")

setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
// Find new files
val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -27,12 +27,17 @@ class FilteredDStream[T: ClassTag](
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {

setName("FilteredRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[T]] = {
parent.getOrCompute(validTime).map(_.filter(filterFunc))
setCallSite
val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -28,11 +28,16 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {

setName("FlatMappedValuesRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
setCallSite
val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}
streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -27,12 +27,17 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {

setName("FlatMappedRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
setCallSite
val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -34,12 +34,11 @@ class ForEachDStream[T: ClassTag] (

override def compute(validTime: Time): Option[RDD[Unit]] = None

//TODO: where to clear up the threadlocal values?
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
//parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
//parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
val jobFunc = () => {
foreachFunc(rdd, time)
}
streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -25,11 +25,16 @@ private[streaming]
class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {

setName("GlommedRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
setCallSite
val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom())
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}
streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -28,12 +28,17 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {

setName("MapPartitionsRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
setCallSite
val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -28,12 +28,17 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {

setName("MappedValuesRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
setCallSite
val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -27,12 +27,17 @@ class MappedDStream[T: ClassTag, U: ClassTag] (
mapFunc: T => U
) extends DStream[U](parent.ssc) {

setName("MappedRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
setCallSite
val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -32,11 +32,15 @@ class QueueInputDStream[T: ClassTag](
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {

setName("UnionRDD")

override def start() { }

override def stop() { }

override def compute(validTime: Time): Option[RDD[T]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
buffer += queue.dequeue()
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -25,6 +25,7 @@ import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.util.{Utils, CallSite}

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -45,6 +46,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
/** This is an unique identifier for the network input stream. */
val id = ssc.getNewReceiverStreamId()

setName("BlockRDD")

/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
@@ -59,6 +62,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont

/** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
@@ -70,6 +75,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}

}

/** Get information on received blocks. */
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -52,6 +52,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)

setName("MappedValuesRDD")
// Reduce each batch of data using reduceByKey which will be further reduced by window
// by ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@@ -83,6 +84,8 @@ }
}

override def compute(validTime: Time): Option[RDD[(K, V)]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val reduceF = reduceFunc
val invReduceF = invReduceFunc

streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -33,15 +33,20 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
mapSideCombine: Boolean = true
) extends DStream[(K,C)] (parent.ssc) {

setName("ShuffledRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
setCallSite
val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](
createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}
streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -34,14 +34,16 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
) extends DStream[(K, S)](parent.ssc) {

super.persist(StorageLevel.MEMORY_ONLY_SER)

setName("MapPartitionsRDD")
override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override val mustCheckpoint = true

override def compute(validTime: Time): Option[RDD[(K, S)]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)

// Try to get the previous state RDD
getOrCompute(validTime - slideDuration) match {
streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -32,12 +32,17 @@ class TransformedDStream[U: ClassTag] (
require(parents.map(_.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")

setName("TransformedRDD")

override def dependencies = parents.toList

override def slideDuration: Duration = parents.head.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
setCallSite
val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
Some(transformFunc(parentRDDs, validTime))
val rdd: Option[RDD[U]] = Some(transformFunc(parentRDDs, validTime))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
return rdd
}
}
streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -40,11 +40,15 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
throw new IllegalArgumentException("Array of parents have different slide times")
}

setName("UnionRDD")

override def dependencies = parents.toList

override def slideDuration: Duration = parents.head.slideDuration

override def compute(validTime: Time): Option[RDD[T]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach(_ match {
case Some(rdd) => rdds += rdd