
Commit

Changes for SPARK-1853
mubarak committed Aug 10, 2014
1 parent 1500deb · commit 70f494f
Showing 21 changed files with 82 additions and 88 deletions.
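The changes below follow one recurring pattern: the call-site local properties are no longer keyed per stream (name + Utils.CALL_SITE_SHORT) but use the fixed Utils.CALL_SITE_SHORT / CALL_SITE_LONG keys, the RDD_NAME property and the setName(...) calls are dropped, and each DStream.compute() saves the caller's call site, installs the stream's creation site while the batch RDD is built, then restores the previous value. Below is a minimal, self-contained sketch of that save/restore idea; the thread-local property store, the key strings, and the withCreationCallSite helper are illustrative stand-ins rather than the actual SparkContext API, and the try/finally is an added safeguard the diff itself does not use.

object CallSiteSketch {
  // Simplified stand-in for org.apache.spark.util.CallSite.
  case class CallSite(short: String, long: String)

  // Fixed property keys, mirroring the role of Utils.CALL_SITE_SHORT and
  // Utils.CALL_SITE_LONG (the exact key strings here are made up).
  val CALL_SITE_SHORT = "callSite.short"
  val CALL_SITE_LONG = "callSite.long"

  // Stand-in for SparkContext.setLocalProperty / getLocalProperty, which are
  // backed by thread-local properties in Spark.
  private val props = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }
  def setLocalProperty(key: String, value: String): Unit =
    props.set(props.get + (key -> value))
  def getLocalProperty(key: String): String =
    props.get.getOrElse(key, null)

  def setCallSite(cs: CallSite): Unit = {
    setLocalProperty(CALL_SITE_SHORT, cs.short)
    setLocalProperty(CALL_SITE_LONG, cs.long)
  }
  def getCallSite(): CallSite =
    CallSite(getLocalProperty(CALL_SITE_SHORT), getLocalProperty(CALL_SITE_LONG))

  // The shape each DStream.compute() takes in this commit: remember the
  // caller's call site, install the stream's creation site while the batch
  // RDD is built, then put the previous call site back.
  def withCreationCallSite[T](creationSite: CallSite)(body: => T): T = {
    val prevCallSite = getCallSite()
    setCallSite(creationSite)
    try body
    finally setCallSite(prevCallSite)
  }

  def main(args: Array[String]): Unit = {
    val seen = withCreationCallSite(CallSite("map at Example.scala:42", "full stack trace ...")) {
      // An RDD constructed here would read the two properties and report
      // "map at Example.scala:42" as its creation site in the web UI.
      getCallSite().short
    }
    println(seen)
  }
}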
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1214,9 +1214,9 @@ abstract class RDD[T: ClassTag](

/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = {
val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT)
val short: String = sc.getLocalProperty(Utils.CALL_SITE_SHORT)
if (short != null) {
CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG))
CallSite(short, sc.getLocalProperty(Utils.CALL_SITE_LONG))
} else {
Utils.getCallSite
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -838,7 +838,7 @@ private[spark] object Utils extends Logging {
SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
lastSparkMethod = if (el.getMethodName == "<init>") {
lastSparkMethod = if (el.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
} else {
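For context, the Utils.getCallSite logic touched in the hunk above works by walking the current stack trace: frames whose class names match the Spark/Scala/Akka/Java patterns are treated as framework-internal and keep updating lastSparkMethod (using the class's simple name when the frame is a constructor, as the "<init>" branch above shows), and the first frame outside those patterns supplies the user file and line number for the short call-site string. A rough, self-contained sketch of that walk follows; the pattern list and output format are simplified stand-ins, not the exact Spark implementation.

object CallSiteWalkSketch {
  // Simplified substitute for the SPARK/SCALA/AKKA/JAVA class-name regexes
  // that Utils.getCallSite uses to classify frames as framework-internal.
  private val internalClass =
    """^(org\.apache\.spark\.|scala\.|akka\.|java\.lang\.)""".r

  // Walk the stack: remember the last framework method seen, then report it
  // together with the file and line of the first non-framework (user) frame.
  def shortCallSite(): String = {
    var lastSparkMethod = "<unknown>"
    var firstUserFile = "<unknown>"
    var firstUserLine = 0
    var insideFramework = true
    for (el <- Thread.currentThread.getStackTrace if insideFramework) {
      if (internalClass.findFirstIn(el.getClassName).isDefined) {
        // Still inside framework code: the label is the method name, or the
        // class's simple name when the frame is a constructor.
        lastSparkMethod =
          if (el.getMethodName == "<init>")
            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
          else el.getMethodName
      } else {
        // First frame outside the framework: treat it as the user call site.
        firstUserFile = if (el.getFileName != null) el.getFileName else "<unknown>"
        firstUserLine = el.getLineNumber
        insideFramework = false
      }
    }
    s"$lastSparkMethod at $firstUserFile:$firstUserLine"
  }

  def main(args: Array[String]): Unit =
    println(shortCallSite())
}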
@@ -112,7 +112,6 @@ class StreamingContext private[streaming] (
if (isCheckpointPresent) {
new SparkContext(cp_.sparkConf)
} else {
sc_.setCallSite(Utils.getCallSite.short)
sc_
}
}
@@ -106,22 +106,25 @@ abstract class DStream[T: ClassTag] (
/** Return the StreamingContext associated with this DStream */
def context = ssc

private[streaming] val RDD_NAME: String = "rddName";

@transient var name: String = null

/** Assign a name to this DStream */
def setName(_name: String) = {
name = _name
}

/* Find the creation callSite */
val creationSite = Utils.getCallSite

/* Store the creation callSite in threadlocal */
private[streaming] def setCallSite = {
ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short)
ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long)
private[streaming] def setCreationCallSite() = {
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.short)
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.long)
}

/* Store the supplied callSite in threadlocal */
private[streaming] def setCallSite(callSite: CallSite) = {
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.short)
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.long)
}

/* Return the current callSite */
private[streaming] def getCallSite() = {
CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT),
ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG))
}

/** Persist the RDDs of this DStream with the given storage level */
@@ -57,7 +57,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
override def start() { }

override def stop() { }
setName("UnionRDD")

/**
* Finds the files that were modified since the last time this method was called and makes
@@ -71,9 +70,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
assert(validTime.milliseconds >= ignoreTime,
"Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")

setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val prevCallSite = getCallSite
setCreationCallSite
// Find new files
val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
@@ -83,6 +81,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
ignoreTime = minNewFileModTime
}
files += ((validTime, newFiles.toArray))
setCallSite(prevCallSite)
Some(filesToRDD(newFiles))
}

@@ -27,16 +27,15 @@ class FilteredDStream[T: ClassTag](
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {

setName("FilteredRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[T]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -28,16 +28,15 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {

setName("FlatMappedValuesRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[(K, U)]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -27,16 +27,15 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {

setName("FlatMappedRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -35,10 +35,8 @@ class ForEachDStream[T: ClassTag] (
override def compute(validTime: Time): Option[RDD[Unit]] = None

override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
return parent.getOrCompute(time) match {
case Some(rdd) =>
//parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
//parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
val jobFunc = () => {
foreachFunc(rdd, time)
}
@@ -25,16 +25,15 @@ private[streaming]
class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {

setName("GlommedRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[Array[T]]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom())
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -28,16 +28,15 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {

setName("MapPartitionsRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -28,16 +28,15 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {

setName("MappedValuesRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[(K, U)]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -20,23 +20,23 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.spark.util.Utils

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {

setName("MappedRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc))
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}
@@ -32,21 +32,20 @@ class QueueInputDStream[T: ClassTag](
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {

setName("UnionRDD")

override def start() { }

override def stop() { }

override def compute(validTime: Time): Option[RDD[T]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val prevCallSite = getCallSite
setCreationCallSite
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
buffer += queue.dequeue()
} else {
buffer ++= queue.dequeueAll(_ => true)
}
setCallSite(prevCallSite)
if (buffer.size > 0) {
if (oneAtATime) {
Some(buffer.head)
@@ -25,7 +25,6 @@ import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.util.{Utils, CallSite}

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -46,8 +45,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
/** This is an unique identifier for the network input stream. */
val id = ssc.getNewReceiverStreamId()

setName("BlockRDD")

/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
@@ -62,20 +59,22 @@

/** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val prevCallSite = getCallSite
setCreationCallSite
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
var blockRDD: Option[RDD[T]] = None
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
blockRDD = Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
blockRDD = Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}

setCallSite(prevCallSite)
blockRDD
}

/** Get information on received blocks. */
@@ -52,7 +52,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)

setName("MappedValuesRDD")


// Reduce each batch of data using reduceByKey which will be further reduced by window
// by ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@@ -84,8 +85,8 @@ }
}

override def compute(validTime: Time): Option[RDD[(K, V)]] = {
setCallSite
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
val prevCallSite = getCallSite
setCreationCallSite
val reduceF = reduceFunc
val invReduceF = invReduceFunc

@@ -170,11 +171,13 @@ }
}

val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)

var returnRDD: Option[RDD[(K, V)]] = None
if (filterFunc.isDefined) {
Some(mergedValuesRDD.filter(filterFunc.get))
returnRDD = Some(mergedValuesRDD.filter(filterFunc.get))
} else {
Some(mergedValuesRDD)
returnRDD = Some(mergedValuesRDD)
}
setCallSite(prevCallSite)
returnRDD
}
}
Original file line number Diff line number Diff line change
@@ -33,20 +33,19 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
mapSideCombine: Boolean = true
) extends DStream[(K,C)] (parent.ssc) {

setName("ShuffledRDD")

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[(K,C)]] = {
setCallSite
val prevCallSite = getCallSite
setCreationCallSite
val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](
createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
ssc.sparkContext.setLocalProperty(RDD_NAME, name)
setCallSite(prevCallSite)
return rdd
}
}

