Merge github.com:apache/spark into ui-refactor
andrewor14 committed Apr 3, 2014
2 parents 7d57444 + 47ebea5 commit cd000b0
Showing 111 changed files with 5,198 additions and 1,204 deletions.
core/pom.xml: 2 changes (1 addition, 1 deletion)
@@ -150,7 +150,7 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.6</version>
<!-- see also exclusion for lift-json; this is necessary since it depends on
-      scala-library and scalap 2.10.0, but we use 2.10.3, and only override
+      scala-library and scalap 2.10.0, but we use 2.10.4, and only override
scala-library -->
<exclusions>
<exclusion>
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: 21 changes (10 additions, 11 deletions)
@@ -84,7 +84,7 @@ class DAGScheduler(
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
- private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+ private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

@@ -536,7 +536,7 @@ class DAGScheduler(
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
- stageIdToActiveJob(jobId) = job
+ jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
- stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
+ jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@

case BeginEvent(task, taskInfo) =>
for (
- job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
@@ -697,7 +696,7 @@ class DAGScheduler(
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
- jobsThatUseStage.find(stageIdToActiveJob.contains)
+ jobsThatUseStage.find(jobIdToActiveJob.contains)
} else {
None
}
@@ -750,8 +749,8 @@ class DAGScheduler(
}
}

- val properties = if (stageIdToActiveJob.contains(jobId)) {
-   stageIdToActiveJob(stage.jobId).properties
+ val properties = if (jobIdToActiveJob.contains(jobId)) {
+   jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
@@ -827,7 +826,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- stageIdToActiveJob -= stage.jobId
+ jobIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
- val job = stageIdToActiveJob(jobId)
+ val job = jobIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
- stageIdToActiveJob -= jobId
+ jobIdToActiveJob -= jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
@@ -1011,7 +1010,7 @@
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
- stageIdToActiveJob -= resultStage.jobId
+ jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
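Nearly all of the DAGScheduler diff is one rename: the map was always keyed by job ID, so the old name stageIdToActiveJob invited call sites that passed a stage ID as the key. Below is a minimal, self-contained sketch (hypothetical names, not the real DAGScheduler) of the keying convention the new name makes explicit:

import scala.collection.mutable.HashMap

// Hypothetical sketch, not Spark code: a registry keyed by job ID, as the
// renamed jobIdToActiveJob map is. The type alone (Int -> ActiveJob) cannot
// say which kind of Int is the key; the name has to.
object JobRegistrySketch {
  case class ActiveJob(jobId: Int, properties: Map[String, String])

  private val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  def register(job: ActiveJob): Unit = jobIdToActiveJob(job.jobId) = job

  // Scheduling properties belong to the job that owns a stage, so lookups
  // must use a job ID; a stage ID would silently miss or hit the wrong job.
  def propertiesFor(jobId: Int): Option[Map[String, String]] =
    jobIdToActiveJob.get(jobId).map(_.properties)

  def complete(jobId: Int): Unit = jobIdToActiveJob -= jobId
}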
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala: 77 changes (2 additions, 75 deletions)
@@ -195,7 +195,7 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
val updatedBlocks = taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
("Block ID" -> blockIdToJson(id)) ~
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
}.getOrElse(JNothing)
@@ -284,35 +284,6 @@ private[spark] object JsonProtocol {
("Replication" -> storageLevel.replication)
}

- def blockIdToJson(blockId: BlockId): JValue = {
-   val blockType = Utils.getFormattedClassName(blockId)
-   val json: JObject = blockId match {
-     case rddBlockId: RDDBlockId =>
-       ("RDD ID" -> rddBlockId.rddId) ~
-       ("Split Index" -> rddBlockId.splitIndex)
-     case shuffleBlockId: ShuffleBlockId =>
-       ("Shuffle ID" -> shuffleBlockId.shuffleId) ~
-       ("Map ID" -> shuffleBlockId.mapId) ~
-       ("Reduce ID" -> shuffleBlockId.reduceId)
-     case broadcastBlockId: BroadcastBlockId =>
-       "Broadcast ID" -> broadcastBlockId.broadcastId
-     case broadcastHelperBlockId: BroadcastHelperBlockId =>
-       ("Broadcast Block ID" -> blockIdToJson(broadcastHelperBlockId.broadcastId)) ~
-       ("Helper Type" -> broadcastHelperBlockId.hType)
-     case taskResultBlockId: TaskResultBlockId =>
-       "Task ID" -> taskResultBlockId.taskId
-     case streamBlockId: StreamBlockId =>
-       ("Stream ID" -> streamBlockId.streamId) ~
-       ("Unique ID" -> streamBlockId.uniqueId)
-     case tempBlockId: TempBlockId =>
-       val uuid = UUIDToJson(tempBlockId.id)
-       "Temp ID" -> uuid
-     case testBlockId: TestBlockId =>
-       "Test ID" -> testBlockId.id
-   }
-   ("Type" -> blockType) ~ json
- }

def blockStatusToJson(blockStatus: BlockStatus): JValue = {
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
("Storage Level" -> storageLevel) ~
@@ -513,7 +484,7 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
- val id = blockIdFromJson(block \ "Block ID")
+ val id = BlockId((block \ "Block ID").extract[String])
val status = blockStatusFromJson(block \ "Status")
(id, status)
}
@@ -616,50 +587,6 @@ private[spark] object JsonProtocol {
StorageLevel(useDisk, useMemory, deserialized, replication)
}

- def blockIdFromJson(json: JValue): BlockId = {
-   val rddBlockId = Utils.getFormattedClassName(RDDBlockId)
-   val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId)
-   val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId)
-   val broadcastHelperBlockId = Utils.getFormattedClassName(BroadcastHelperBlockId)
-   val taskResultBlockId = Utils.getFormattedClassName(TaskResultBlockId)
-   val streamBlockId = Utils.getFormattedClassName(StreamBlockId)
-   val tempBlockId = Utils.getFormattedClassName(TempBlockId)
-   val testBlockId = Utils.getFormattedClassName(TestBlockId)
-
-   (json \ "Type").extract[String] match {
-     case `rddBlockId` =>
-       val rddId = (json \ "RDD ID").extract[Int]
-       val splitIndex = (json \ "Split Index").extract[Int]
-       new RDDBlockId(rddId, splitIndex)
-     case `shuffleBlockId` =>
-       val shuffleId = (json \ "Shuffle ID").extract[Int]
-       val mapId = (json \ "Map ID").extract[Int]
-       val reduceId = (json \ "Reduce ID").extract[Int]
-       new ShuffleBlockId(shuffleId, mapId, reduceId)
-     case `broadcastBlockId` =>
-       val broadcastId = (json \ "Broadcast ID").extract[Long]
-       new BroadcastBlockId(broadcastId)
-     case `broadcastHelperBlockId` =>
-       val broadcastBlockId =
-         blockIdFromJson(json \ "Broadcast Block ID").asInstanceOf[BroadcastBlockId]
-       val hType = (json \ "Helper Type").extract[String]
-       new BroadcastHelperBlockId(broadcastBlockId, hType)
-     case `taskResultBlockId` =>
-       val taskId = (json \ "Task ID").extract[Long]
-       new TaskResultBlockId(taskId)
-     case `streamBlockId` =>
-       val streamId = (json \ "Stream ID").extract[Int]
-       val uniqueId = (json \ "Unique ID").extract[Long]
-       new StreamBlockId(streamId, uniqueId)
-     case `tempBlockId` =>
-       val tempId = UUIDFromJson(json \ "Temp ID")
-       new TempBlockId(tempId)
-     case `testBlockId` =>
-       val testId = (json \ "Test ID").extract[String]
-       new TestBlockId(testId)
-   }
- }

def blockStatusFromJson(json: JValue): BlockStatus = {
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val memorySize = (json \ "Memory Size").extract[Long]
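The 75 deleted lines in JsonProtocol replace a hand-written, per-subclass JSON codec with the canonical string form a BlockId already carries: id.toString on the write path and the companion parser BlockId(...) on the read path, both visible in the surviving + lines above. One string field also cannot drift out of sync when a new BlockId subclass is added, which the match-based codec could. A short sketch of the round-trip the new code relies on, assuming a spark-core dependency of matching vintage on the classpath:

import org.apache.spark.storage.{BlockId, RDDBlockId}

// Sketch of the round-trip JsonProtocol now depends on: every BlockId has a
// unique string name, and BlockId.apply parses that name back into the right
// subclass. Version-sensitive: run against the spark-core this commit targets.
object BlockIdRoundTripSketch extends App {
  val original: BlockId = RDDBlockId(1, 2)  // rddId = 1, splitIndex = 2
  val name = original.toString              // e.g. "rdd_1_2"
  val parsed = BlockId(name)                // companion parser used above
  assert(parsed == original, s"round-trip failed for $name")
  println(s"$name parses back to $parsed")
}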
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(scheduler.pendingTasks.isEmpty)
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
- assert(scheduler.stageIdToActiveJob.isEmpty)
+ assert(scheduler.jobIdToActiveJob.isEmpty)
assert(scheduler.jobIdToStageIds.isEmpty)
assert(scheduler.stageIdToJobIds.isEmpty)
assert(scheduler.stageIdToStage.isEmpty)
