2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -33,7 +33,7 @@ class TaskContext(
val attemptId: Long,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
) extends Serializable {

@deprecated("use partitionId", "0.8.1")
@@ -84,7 +84,7 @@ class TaskMetrics extends Serializable {
}

private[spark] object TaskMetrics {
def empty(): TaskMetrics = new TaskMetrics
def empty: TaskMetrics = new TaskMetrics
}


25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import java.util.Random

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

@@ -235,6 +236,30 @@ abstract class RDD[T: ClassTag](
}
}

/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
* no ordering on the RDDs returned.
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
val ancestors = new mutable.HashSet[RDD[_]]

def visit(rdd: RDD[_]) {
val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
val narrowParents = narrowDependencies.map(_.rdd)
val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}

visit(this)

// In case there is a cycle, do not include the root itself
ancestors.filterNot(_ == this).toSeq
}
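
An illustrative sketch of what this traversal yields (not part of the patch). Since getNarrowAncestors is private[spark], assume the snippet lives in code compiled inside the org.apache.spark package, with a SparkContext named sc in scope:

    val base = sc.parallelize(1 to 100)          // no parents
    val mapped = base.map(x => (x % 10, x))      // one-to-one (narrow) dependency on base
    val filtered = mapped.filter(_._2 > 5)       // one-to-one (narrow) dependency on mapped
    val shuffled = filtered.reduceByKey(_ + _)   // shuffle dependency on filtered

    filtered.getNarrowAncestors                  // contains mapped and base, in no particular order
    shuffled.getNarrowAncestors                  // empty: its only dependency is a shuffle, which is not traversed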

/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
@@ -207,7 +207,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val taskInfo = taskEnd.taskInfo
var taskStatus = "TASK_TYPE=%s".format(taskEnd.taskType)
val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty()
val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty
taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskMetrics)
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.RDDInfo
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -41,12 +41,17 @@ class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddIn
}
}

private[spark]
object StageInfo {
private[spark] object StageInfo {
/**
* Construct a StageInfo from a Stage.
*
* Each Stage is associated with one or many RDDs, with the boundary of a Stage marked by
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
def fromStage(stage: Stage): StageInfo = {
val rdd = stage.rdd
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
val rddInfo = new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfo)
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
}
}
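
Editorial note, illustrative only: with this change the stage's own RDD is always the first entry in rddInfos, followed by its narrow ancestors in no particular order. Continuing the hypothetical lineage sketched under getNarrowAncestors, a StageInfo for a stage whose final RDD is filtered could be built the same way fromStage does (the stage id, name, and task count below are made up):

    val rddInfos = Seq(RDDInfo.fromRdd(filtered)) ++ filtered.getNarrowAncestors.map(RDDInfo.fromRdd)
    val info = new StageInfo(0, "filtered stage", 4, rddInfos)
    info.rddInfos.map(_.id)   // filtered's id first, then the ids of mapped and base in unspecified order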
55 changes: 55 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -0,0 +1,55 @@
Contributor Author: Moved from StorageUtils.scala

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

@DeveloperApi
class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
var tachyonSize = 0L

override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
"TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
this.id - that.id
}
}

private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
}
}
44 changes: 7 additions & 37 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,60 +21,30 @@ import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

private[spark]
class StorageStatus(
/** Storage information for each BlockManager. */
private[spark] class StorageStatus(
val blockManagerId: BlockManagerId,
val maxMem: Long,
val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {

def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

def memUsedByRDD(rddId: Int) =
rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)

def diskUsedByRDD(rddId: Int) =
rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)

def memRemaining : Long = maxMem - memUsed()
def memRemaining: Long = maxMem - memUsed

def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
}
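
A small usage sketch of the now parameterless accessors (illustrative only; StorageStatus is private[spark], and bmId stands in for an existing BlockManagerId):

    val status = new StorageStatus(bmId, maxMem = 512L * 1024 * 1024)
    status.memUsed        // 0L while no blocks are registered
    status.diskUsed       // 0L as well
    status.memRemaining   // maxMem - memUsed, i.e. the full 512 MB here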

@DeveloperApi
private[spark]
class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
var tachyonSize = 0L

override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" +
"TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
this.id - that.id
}
}

/* Helper methods for storage-related objects */
private[spark]
object StorageUtils {
/** Helper methods for storage-related objects. */
private[spark] object StorageUtils {

/**
* Returns basic information of all RDDs persisted in the given SparkContext. This does not
@@ -32,7 +32,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
@@ -106,9 +106,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.blocks.size
val memUsed = status.memUsed()
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed()
val diskUsed = status.diskUsed
val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
@@ -66,8 +66,8 @@ private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val rddInfo = stageSubmitted.stageInfo.rddInfo
_rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
22 changes: 12 additions & 10 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -176,7 +176,7 @@ private[spark] object JsonProtocol {
* -------------------------------------------------------------------- */

def stageInfoToJson(stageInfo: StageInfo): JValue = {
val rddInfo = rddInfoToJson(stageInfo.rddInfo)
val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -208,7 +208,8 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
val shuffleWriteMetrics =
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
val updatedBlocks = taskMetrics.updatedBlocks.map { blocks =>
val updatedBlocks =
taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
@@ -467,13 +468,13 @@ private[spark] object JsonProtocol {
val stageId = (json \ "Stage ID").extract[Int]
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfo = rddInfoFromJson(json \ "RDD Info")
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]

val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfo)
val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
@@ -518,13 +519,14 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
val id = BlockId((block \ "Block ID").extract[String])
val status = blockStatusFromJson(block \ "Status")
(id, status)
metrics.updatedBlocks =
Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
val id = BlockId((block \ "Block ID").extract[String])
val status = blockStatusFromJson(block \ "Status")
(id, status)
}
}
}
metrics
}

6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -60,7 +60,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar

whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
@@ -73,7 +73,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar

whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
@@ -87,7 +87,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar

whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -179,7 +179,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = TaskMetrics.empty())
taskMetrics = TaskMetrics.empty)
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")