Skip to content

Commit

Permalink
Do not un-register shuffle files in case of executor lost
Browse files Browse the repository at this point in the history
  • Loading branch information
Sital Kedia committed Mar 1, 2017
1 parent 74ca88b commit 6898c2b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
25 changes: 11 additions & 14 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ class DAGScheduler(
*/
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val execIdToHost = new HashMap[String, String]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
Expand Down Expand Up @@ -553,10 +552,8 @@ class DAGScheduler(
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
* @throws IllegalArgumentException when partitions ids are illegal
*/
def submitJob[T, U](
Expand Down Expand Up @@ -600,7 +597,6 @@ class DAGScheduler(
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @throws Exception when the job fails
*/
def runJob[T, U](
Expand Down Expand Up @@ -1332,7 +1328,8 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true,
Some(bmAddress.host), Some(task.epoch))
}
}

Expand Down Expand Up @@ -1366,23 +1363,24 @@ class DAGScheduler(
*/
private[scheduler] def handleExecutorLost(
execId: String,
slaveLost: Boolean,
fileLost: Boolean,
hostLost: Boolean = false,
maybeHost: Option[String] = None,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)

if (slaveLost || !env.blockManager.externalShuffleServiceEnabled) {
if (fileLost || hostLost || !env.blockManager.externalShuffleServiceEnabled) {
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
if (slaveLost) {
val host = execIdToHost.get(execId).get
if (hostLost) {
logInfo(("Shuffle files lost for executor: %s (epoch %d)," +
" removing shuffle files on host: %s").format(execId, currentEpoch, host ))
stage.removeOutputsOnHost(host)
} else {
" removing shuffle files on host: %s").format(execId, currentEpoch, maybeHost.get))
stage.removeOutputsOnHost(maybeHost.get)
}
else {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
stage.removeOutputsOnExecutor(execId)
}
Expand All @@ -1408,7 +1406,6 @@ class DAGScheduler(
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
execIdToHost.put(execId, host)
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
scheduler.handleExecutorLost("exec-hostA1", fileLost = false, hostLost = true, Some("hostA"))
runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true)))
val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_!= null)
assert(mapStatus.size === 1)
Expand Down

0 comments on commit 6898c2b

Please sign in to comment.