Skip to content

Commit

Permalink
Properly send SparkListenerStageSubmitted and SparkListenerStageCompl…
Browse files Browse the repository at this point in the history
…eted.
  • Loading branch information
rxin committed Jul 30, 2014
1 parent bab1d8b commit 797c247
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import java.io.{NotSerializableException}
import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -696,10 +696,25 @@ class DAGScheduler(
stage.pendingTasks.clear()
var tasks = ArrayBuffer[Task[_]]()

val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
}

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))

var broadcastRddBinary: Broadcast[Array[Byte]] = null
try {
broadcastRddBinary = stage.rdd.createBroadcastBinary()
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
Expand Down Expand Up @@ -727,21 +742,7 @@ class DAGScheduler(
}
}

val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
}

if (tasks.size > 0) {
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))

// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
// down the road, where we have several different implementations for local scheduler and
Expand All @@ -766,6 +767,9 @@ class DAGScheduler(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stage.info.submissionTime = Some(clock.getTime())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
listenerBus.post(SparkListenerStageCompleted(stage.info))
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
runningStages -= stage
Expand Down

0 comments on commit 797c247

Please sign in to comment.