Remove local execution code in DAGScheduler
JoshRosen committed Jul 17, 2015
1 parent 8975d96 commit b0835dc
Showing 2 changed files with 10 additions and 93 deletions.
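
For context, the feature being removed is the opt-in "local execution" fast path: with spark.localExecution.enabled set (the flag deleted below), a single-partition action with no parent stages, such as first() or take(1), was computed in a driver-side thread instead of being scheduled as a task. A minimal sketch of the old opt-in, assuming a plain local-mode SparkContext; this is illustration only, not part of the commit:

import org.apache.spark.{SparkConf, SparkContext}

object LocalExecutionOptIn {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("local-execution-opt-in")
      .setMaster("local[2]")
      .set("spark.localExecution.enabled", "true") // flag removed by this commit
    val sc = new SparkContext(conf)
    // Before this commit, a single-partition action with no parent stages could run
    // in a driver-side thread; after it, the same call always goes through normal
    // stage submission.
    val first = sc.parallelize(1 to 100, numSlices = 1).take(1)
    println(first.mkString(", "))
    sc.stop()
  }
}
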
core/src/main/scala/org/apache/spark/executor/Executor.scala (2 changes: 0 additions & 2 deletions)

@@ -212,8 +212,6 @@ private[spark] class Executor(
       val (value, accumUpdates) = try {
         task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
       } finally {
-        // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
-        // when changing this, make sure to update both copies.
         val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
         if (freedMemory > 0) {
           val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
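
The only change to Executor.scala is dropping a stale cross-reference: the managed-memory leak check is no longer duplicated in DAGScheduler, so Executor.run keeps the single copy. A self-contained sketch of that check's control flow, using hypothetical stand-ins for TaskMemoryManager and for a flag analogous to the spark.unsafe.exceptionOnMemoryLeak lookup seen in the removed DAGScheduler copy below; only the logic mirrors the hunk above:

object LeakCheckSketch {
  // Hypothetical stand-in for TaskMemoryManager: tracks bytes a task forgot to free.
  final class FakeTaskMemoryManager(private var allocated: Long) {
    def allocate(bytes: Long): Unit = { allocated += bytes }
    def cleanUpAllAllocatedMemory(): Long = { val freed = allocated; allocated = 0L; freed }
  }

  // Mirrors the pattern in Executor.run: always clean up in a finally block, then
  // either throw or log depending on the configured strictness.
  def runWithLeakCheck(taskId: Long, mgr: FakeTaskMemoryManager, exceptionOnLeak: Boolean)(body: => Unit): Unit = {
    try {
      body
    } finally {
      val freedMemory = mgr.cleanUpAllAllocatedMemory()
      if (freedMemory > 0) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (exceptionOnLeak) throw new IllegalStateException(errMsg)
        else Console.err.println(errMsg)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val mgr = new FakeTaskMemoryManager(0L)
    runWithLeakCheck(1L, mgr, exceptionOnLeak = false) {
      mgr.allocate(1024L) // simulate a task that never frees its allocation
    }
  }
}
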
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (101 changes: 10 additions & 91 deletions)

@@ -38,7 +38,6 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
-import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

@@ -128,10 +127,6 @@ class DAGScheduler(
   // This is only safe because DAGScheduler runs in a single thread.
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
 
-
-  /** If enabled, we may run certain actions like take() and first() locally. */
-  private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
-
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
   private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

@@ -654,73 +649,6 @@
     }
   }
 
-  /**
-   * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
-   * We run the operation in a separate thread just in case it takes a bunch of time, so that we
-   * don't block the DAGScheduler event loop or other concurrent jobs.
-   */
-  protected def runLocally(job: ActiveJob) {
-    logInfo("Computing the requested partition locally")
-    new Thread("Local computation of job " + job.jobId) {
-      override def run() {
-        runLocallyWithinThread(job)
-      }
-    }.start()
-  }
-
-  // Broken out for easier testing in DAGSchedulerSuite.
-  protected def runLocallyWithinThread(job: ActiveJob) {
-    var jobResult: JobResult = JobSucceeded
-    try {
-      val rdd = job.finalStage.rdd
-      val split = rdd.partitions(job.partitions(0))
-      val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
-      val taskContext =
-        new TaskContextImpl(
-          job.finalStage.id,
-          job.partitions(0),
-          taskAttemptId = 0,
-          attemptNumber = 0,
-          taskMemoryManager = taskMemoryManager,
-          runningLocally = true)
-      TaskContext.setTaskContext(taskContext)
-      try {
-        val result = job.func(taskContext, rdd.iterator(split, taskContext))
-        job.listener.taskSucceeded(0, result)
-      } finally {
-        taskContext.markTaskCompleted()
-        TaskContext.unset()
-        // Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
-        // make sure to update both copies.
-        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
-        if (freedMemory > 0) {
-          if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
-            throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
-          } else {
-            logError(s"Managed memory leak detected; size = $freedMemory bytes")
-          }
-        }
-      }
-    } catch {
-      case e: Exception =>
-        val exception = new SparkDriverExecutionException(e)
-        jobResult = JobFailed(exception)
-        job.listener.jobFailed(exception)
-      case oom: OutOfMemoryError =>
-        val exception = new SparkException("Local job aborted due to out of memory error", oom)
-        jobResult = JobFailed(exception)
-        job.listener.jobFailed(exception)
-    } finally {
-      val s = job.finalStage
-      // clean up data structures that were populated for a local job,
-      // but that won't get cleaned up via the normal paths through
-      // completion events or stage abort
-      stageIdToStage -= s.id
-      jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
-    }
-  }
-
   /** Finds the earliest-created active job that needs the stage */
   // TODO: Probably should actually find among the active jobs that need this
   // stage the one with the highest priority (highest-priority pool, earliest created).
@@ -801,29 +729,20 @@
     if (finalStage != null) {
       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
       clearCacheLocs()
-      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-        job.jobId, callSite.shortForm, partitions.length, allowLocal))
+      logInfo("Got job %s (%s) with %d output partitions".format(
+        job.jobId, callSite.shortForm, partitions.length))
       logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
       logInfo("Parents of final stage: " + finalStage.parents)
       logInfo("Missing parents: " + getMissingParentStages(finalStage))
-      val shouldRunLocally =
-        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
       val jobSubmissionTime = clock.getTimeMillis()
-      if (shouldRunLocally) {
-        // Compute very short actions like first() or take() with no parent stages locally.
-        listenerBus.post(
-          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
-        runLocally(job)
-      } else {
-        jobIdToActiveJob(jobId) = job
-        activeJobs += job
-        finalStage.resultOfJob = Some(job)
-        val stageIds = jobIdToStageIds(jobId).toArray
-        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
-        listenerBus.post(
-          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
-        submitStage(finalStage)
-      }
+      jobIdToActiveJob(jobId) = job
+      activeJobs += job
+      finalStage.resultOfJob = Some(job)
+      val stageIds = jobIdToStageIds(jobId).toArray
+      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+      submitStage(finalStage)
     }
     submitWaitingStages()
   }
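
One observable consequence of the DAGScheduler change above: every job, including a single-partition take(1), now posts SparkListenerJobStart with its real stage infos and goes through submitStage, instead of the empty Seq that the deleted local-execution path used to post. A hedged sketch of how that can be observed with the standard Spark 1.x listener API; illustration only, not part of the commit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

object TakeSubmitsAStage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("take-submits-a-stage").setMaster("local[2]"))
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // With local execution removed, even take(1) reports its stages here.
        println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")
      }
    })
    sc.parallelize(1 to 10, numSlices = 1).take(1)
    sc.stop()
  }
}
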
