diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9be15f1eaf644..b88934bc1f7d6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1214,9 +1214,9 @@ abstract class RDD[T: ClassTag](
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = {
-    val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT)
+    val short: String = sc.getLocalProperty(Utils.CALL_SITE_SHORT)
     if (short != null) {
-      CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG))
+      CallSite(short, sc.getLocalProperty(Utils.CALL_SITE_LONG))
     } else {
       Utils.getCallSite
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1ef662f421558..bd39c10b50e9d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -838,7 +838,7 @@ private[spark] object Utils extends Logging {
       SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
         AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
         JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
-          lastSparkMethod = if (el.getMethodName == "<init>") {
+          lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
             el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
           } else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a4028c016a904..b287662ff1a7a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -112,7 +112,6 @@ class StreamingContext private[streaming] (
     if (isCheckpointPresent) {
       new SparkContext(cp_.sparkConf)
     } else {
-      sc_.setCallSite(Utils.getCallSite.short)
       sc_
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index ada01cae93f0b..16da46a15de15 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -106,22 +106,25 @@ abstract class DStream[T: ClassTag] (
   /** Return the StreamingContext associated with this DStream */
   def context = ssc

-  private[streaming] val RDD_NAME: String = "rddName";
-
-  @transient var name: String = null
-
-  /** Assign a name to this DStream */
-  def setName(_name: String) = {
-    name = _name
-  }
-
   /* Find the creation callSite */
   val creationSite = Utils.getCallSite

   /* Store the creation callSite in threadlocal */
-  private[streaming] def setCallSite = {
-    ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short)
-    ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long)
+  private[streaming] def setCreationCallSite() = {
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.short)
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.long)
+  }
+
+  /* Store the supplied callSite in threadlocal */
+  private[streaming] def setCallSite(callSite: CallSite) = {
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.short)
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.long)
+  }
+
+  /* Return the current callSite */
+  private[streaming] def getCallSite() = {
+    CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT),
+      ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG))
   }

   /** Persist the RDDs of this DStream with the given storage level */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index dc0e30a5999eb..6dcb07e679ccb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -57,7 +57,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   override def start() { }

   override def stop() { }
-  setName("UnionRDD")

   /**
    * Finds the files that were modified since the last time this method was called and makes
@@ -71,9 +70,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
     assert(validTime.milliseconds >= ignoreTime,
       "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
-
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     // Find new files
     val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
@@ -83,6 +81,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       ignoreTime = minNewFileModTime
     }
     files += ((validTime, newFiles.toArray))
+    setCallSite(prevCallSite)
     Some(filesToRDD(newFiles))
   }

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index 1ed7186044517..c49f620533d85 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -27,16 +27,15 @@ class FilteredDStream[T: ClassTag](
     filterFunc: T => Boolean
   ) extends DStream[T](parent.ssc) {

-  setName("FilteredRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[T]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index 1e2e3053a5bc3..ef9cead0e9bcb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -28,16 +28,15 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     flatMapValueFunc: V => TraversableOnce[U]
   ) extends DStream[(K, U)](parent.ssc) {

-  setName("FlatMappedValuesRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index e3223ccf1be6c..898cf82f5a3ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -27,16 +27,15 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
     flatMapFunc: T => Traversable[U]
   ) extends DStream[U](parent.ssc) {

-  setName("FlatMappedRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[U]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 5d4c9b153b18a..d696d57fac52f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -35,10 +35,8 @@ class ForEachDStream[T: ClassTag] (
   override def compute(validTime: Time): Option[RDD[Unit]] = None

   override def generateJob(time: Time): Option[Job] = {
-    parent.getOrCompute(time) match {
+    return parent.getOrCompute(time) match {
       case Some(rdd) =>
-        //parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
-        //parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
         val jobFunc = () => {
           foreachFunc(rdd, time)
         }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 215626f9416c7..dac1ba817386a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -25,16 +25,15 @@ private[streaming]
 class GlommedDStream[T: ClassTag](parent: DStream[T])
   extends DStream[Array[T]](parent.ssc) {

-  setName("GlommedRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[Array[T]]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom())
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index d179bfc0a0974..a6350bfbf918b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -28,16 +28,15 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
     preservePartitioning: Boolean
   ) extends DStream[U](parent.ssc) {

-  setName("MapPartitionsRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[U]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 6431dd0424a9b..5095aba74995f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -28,16 +28,15 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     mapValueFunc: V => U
   ) extends DStream[(K, U)](parent.ssc) {

-  setName("MappedValuesRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index 55e9fad3e98eb..cad0ee5116700 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.dstream
 import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
+import org.apache.spark.util.Utils

 private[streaming]
 class MappedDStream[T: ClassTag, U: ClassTag] (
@@ -27,16 +28,15 @@ class MappedDStream[T: ClassTag, U: ClassTag] (
     mapFunc: T => U
   ) extends DStream[U](parent.ssc) {

-  setName("MappedRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[U]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 667c68fc3727c..6e39712eba5eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -32,21 +32,20 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {

-  setName("UnionRDD")
-
   override def start() { }

   override def stop() { }

   override def compute(validTime: Time): Option[RDD[T]] = {
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val buffer = new ArrayBuffer[RDD[T]]()
     if (oneAtATime && queue.size > 0) {
       buffer += queue.dequeue()
     } else {
       buffer ++= queue.dequeueAll(_ => true)
     }
+    setCallSite(prevCallSite)
     if (buffer.size > 0) {
       if (oneAtATime) {
         Some(buffer.head)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 8954e0e4506f3..f69f11e7f9f74 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -25,7 +25,6 @@ import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-import org.apache.spark.util.{Utils, CallSite}

 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -46,8 +45,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
   /** This is an unique identifier for the network input stream. */
   val id = ssc.getNewReceiverStreamId()

-  setName("BlockRDD")
-
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
@@ -62,20 +59,22 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont

   /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
   override def compute(validTime: Time): Option[RDD[T]] = {
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     // If this is called for any time before the start time of the context,
     // then this returns an empty RDD. This may happen when recovering from a
     // master failure
+    var blockRDD: Option[RDD[T]] = None
     if (validTime >= graph.startTime) {
       val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
       receivedBlockInfo(validTime) = blockInfo
       val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
-      Some(new BlockRDD[T](ssc.sc, blockIds))
+      blockRDD = Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
-      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+      blockRDD = Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
     }
-
+    setCallSite(prevCallSite)
+    blockRDD
   }

   /** Get information on received blocks. */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 4efac61b98f23..bcb4c0e33f4c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -52,7 +52,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )

-  setName("MappedValuesRDD")
+
+
   // Reduce each batch of data using reduceByKey which will be further reduced by window
   // by ReducedWindowedDStream
   val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@@ -84,8 +85,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
   }

   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val reduceF = reduceFunc
     val invReduceF = invReduceFunc

@@ -170,11 +171,13 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     }

     val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
-
+    var returnRDD: Option[RDD[(K, V)]] = None
     if (filterFunc.isDefined) {
-      Some(mergedValuesRDD.filter(filterFunc.get))
+      returnRDD = Some(mergedValuesRDD.filter(filterFunc.get))
     } else {
-      Some(mergedValuesRDD)
+      returnRDD = Some(mergedValuesRDD)
     }
+    setCallSite(prevCallSite)
+    returnRDD
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 777a87ef20978..e6be835b5fa04 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -33,20 +33,19 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
     mapSideCombine: Boolean = true
   ) extends DStream[(K,C)] (parent.ssc) {

-  setName("ShuffledRDD")
-
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match {
       case Some(rdd) => Some(rdd.combineByKey[C](
           createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
       case None => None
     }
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index a7dd7d81b4e85..b027e376d4e73 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -34,7 +34,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
   ) extends DStream[(K, S)](parent.ssc) {

   super.persist(StorageLevel.MEMORY_ONLY_SER)
-  setName("MapPartitionsRDD")

   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration
@@ -42,8 +41,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
   override val mustCheckpoint = true

   override def compute(validTime: Time): Option[RDD[(K, S)]] = {
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite

     // Try to get the previous state RDD
     getOrCompute(validTime - slideDuration) match {
@@ -71,6 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
         }
         val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
         val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
+        setCallSite(prevCallSite)
         Some(stateRDD)
       }
       case None => {    // If parent RDD does not exist
@@ -82,6 +82,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
           updateFuncLocal(i)
         }
         val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+        setCallSite(prevCallSite)
         Some(stateRDD)
       }
     }
@@ -104,10 +105,12 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
         val groupedRDD = parentRDD.groupByKey(partitioner)
         val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
         // logDebug("Generating state RDD for time " + validTime + " (first)")
+        setCallSite(prevCallSite)
         Some(sessionRDD)
       }
       case None => { // If parent RDD does not exist, then nothing to do!
         // logDebug("Not generating state RDD (no previous state, no parent)")
+        setCallSite(prevCallSite)
         None
       }
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 9bbf9a24deaba..57aea89216b20 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -32,17 +32,16 @@ class TransformedDStream[U: ClassTag] (
   require(parents.map(_.slideDuration).distinct.size == 1,
     "Some of the DStreams have different slide durations")

-  setName("TransformedRDD")
-
   override def dependencies = parents.toList

   override def slideDuration: Duration = parents.head.slideDuration

   override def compute(validTime: Time): Option[RDD[U]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
     val rdd: Option[RDD[U]] = Some(transformFunc(parentRDDs, validTime))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 12d3c5bdd3dea..c159fc9223850 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -40,15 +40,13 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
     throw new IllegalArgumentException("Array of parents have different slide times")
   }

-  setName("UnionRDD")
-
   override def dependencies = parents.toList

   override def slideDuration: Duration = parents.head.slideDuration

   override def compute(validTime: Time): Option[RDD[T]] = {
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdds = new ArrayBuffer[RDD[T]]()
     parents.map(_.getOrCompute(validTime)).foreach(_ match {
       case Some(rdd) => rdds += rdd
@@ -56,8 +54,10 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
         + validTime)
     })
     if (rdds.size > 0) {
+      setCallSite(prevCallSite)
       Some(new UnionRDD(ssc.sc, rdds))
     } else {
+      setCallSite(prevCallSite)
       None
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ff715306316f0..2cf493312cf77 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -44,8 +44,6 @@ class WindowedDStream[T: ClassTag](
   // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)

-  setName("PartitionerAwareUnionRDD")
-
   def windowDuration: Duration = _windowDuration

   override def dependencies = List(parent)
@@ -63,8 +61,8 @@ class WindowedDStream[T: ClassTag](
   }

   override def compute(validTime: Time): Option[RDD[T]] = {
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
     val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
@@ -74,6 +72,7 @@ class WindowedDStream[T: ClassTag](
       logDebug("Using normal union for windowing at " + validTime)
       new UnionRDD(ssc.sc,rddsInWindow)
     }
+    setCallSite(prevCallSite)
    Some(windowRDD)
   }
 }
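
For reference, the sketch below distills the save/set/restore pattern that every `compute` override in this patch now follows: capture the current thread-local call site, install this DStream's `creationSite` so that RDDs constructed inside `compute` pick it up through `RDD.creationSite`, then restore the previous value. This is a minimal, self-contained model, not Spark code: the `CallSite` case class, the literal property keys, and the thread-local map are simplified stand-ins for `org.apache.spark.util.Utils` and `SparkContext`'s local properties, whose real definitions are mostly outside this diff.

    object CallSitePropagationSketch {
      // Simplified stand-ins (assumed names; the real CallSite and
      // CALL_SITE_* constants live in org.apache.spark.util.Utils).
      case class CallSite(short: String, long: String)
      val CALL_SITE_SHORT = "callSite.short" // illustrative key values
      val CALL_SITE_LONG = "callSite.long"

      // Models SparkContext's per-thread local properties.
      private val localProps = new ThreadLocal[Map[String, String]] {
        override def initialValue(): Map[String, String] = Map.empty
      }
      def setLocalProperty(key: String, value: String): Unit =
        localProps.set(localProps.get + (key -> value))
      def getLocalProperty(key: String): String =
        localProps.get.getOrElse(key, null)

      // What RDD.creationSite does after this patch: prefer the thread-local
      // call site set by the DStream, else fall back to a stack walk.
      def rddCreationSite(): CallSite = {
        val short = getLocalProperty(CALL_SITE_SHORT)
        if (short != null) CallSite(short, getLocalProperty(CALL_SITE_LONG))
        else CallSite("<stack walk>", "<stack walk>") // like Utils.getCallSite
      }

      // The save/set/restore pattern each DStream.compute wraps around
      // RDD creation.
      def compute(creationSite: CallSite): CallSite = {
        val prevCallSite = CallSite(getLocalProperty(CALL_SITE_SHORT),
          getLocalProperty(CALL_SITE_LONG))                   // getCallSite
        setLocalProperty(CALL_SITE_SHORT, creationSite.short) // setCreationCallSite
        setLocalProperty(CALL_SITE_LONG, creationSite.long)
        val rddSite = rddCreationSite() // RDDs built here inherit this site
        setLocalProperty(CALL_SITE_SHORT, prevCallSite.short) // setCallSite(prev)
        setLocalProperty(CALL_SITE_LONG, prevCallSite.long)
        rddSite
      }

      def main(args: Array[String]): Unit = {
        val site = compute(CallSite("map at MyApp.scala:42",
          "org.example.MyApp.main(MyApp.scala:42)")) // hypothetical user site
        println(site.short) // prints: map at MyApp.scala:42
      }
    }

The restore step is the point of the change: `compute` runs for every DStream in the graph on the same job-generator thread each batch, so without putting the previous value back, the call site installed by one operator would leak into the RDDs, and hence the Stages UI entries, of the next.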