[SPARK-19753][CORE] All shuffle files on a host should be removed in case of fetch failure or slave lost
Sital Kedia committed Feb 27, 2017
1 parent eff7b40 commit 74ca88b
Showing 4 changed files with 63 additions and 6 deletions.
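In essence, the patch widens shuffle-file cleanup from a single executor to the whole host when a slave is lost: map outputs registered by every executor on that host (including any served by an external shuffle service there) become unreachable and must be forgotten, not just the outputs of the executor that reported the failure. A minimal, self-contained Scala sketch of that idea follows; OutputLoc, removeOutputsOnExecutor and removeOutputsOnHost are illustrative stand-ins, not the real Spark classes this commit modifies.

// Illustrative model only: Spark's real bookkeeping uses MapStatus/BlockManagerId
// inside ShuffleMapStage; this just contrasts executor-level vs. host-level cleanup.
final case class OutputLoc(executorId: String, host: String)

object HostLevelCleanupSketch {
  // Forget only the outputs registered by one executor (old behaviour).
  def removeOutputsOnExecutor(locs: Seq[OutputLoc], execId: String): Seq[OutputLoc] =
    locs.filterNot(_.executorId == execId)

  // Forget every output on a host (new behaviour when the slave itself is lost).
  def removeOutputsOnHost(locs: Seq[OutputLoc], host: String): Seq[OutputLoc] =
    locs.filterNot(_.host == host)

  def main(args: Array[String]): Unit = {
    val locs = Seq(
      OutputLoc("exec-hostA1", "hostA"),
      OutputLoc("exec-hostA2", "hostA"),
      OutputLoc("exec-hostB", "hostB"))

    // Executor-level removal leaves a stale entry for exec-hostA2 even though
    // hostA's shuffle files are gone along with the host.
    assert(removeOutputsOnExecutor(locs, "exec-hostA1").map(_.executorId) ==
      Seq("exec-hostA2", "exec-hostB"))

    // Host-level removal drops every output that lived on the lost host.
    assert(removeOutputsOnHost(locs, "hostA").map(_.executorId) == Seq("exec-hostB"))
  }
}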
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -289,7 +289,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
 
   // HashMaps for storing mapStatuses and cached serialized statuses in the driver.
   // Statuses are dropped only by explicit de-registering.
-  protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
+  // Exposed for testing
+  val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
   private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
 
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -149,6 +149,7 @@ class DAGScheduler(
    */
   private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
   private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val execIdToHost = new HashMap[String, String]
 
   // Stages we need to run whose parents aren't done
   private[scheduler] val waitingStages = new HashSet[Stage]
@@ -1331,7 +1332,7 @@ class DAGScheduler(
 
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+          handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
         }
       }
 
@@ -1365,19 +1366,26 @@
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      filesLost: Boolean,
+      slaveLost: Boolean,
       maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
 
-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+      if (slaveLost || !env.blockManager.externalShuffleServiceEnabled) {
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleIdToMapStage) {
-          stage.removeOutputsOnExecutor(execId)
+          if (slaveLost) {
+            val host = execIdToHost.get(execId).get
+            logInfo(("Shuffle files lost for executor: %s (epoch %d)," +
+              " removing shuffle files on host: %s").format(execId, currentEpoch, host))
+            stage.removeOutputsOnHost(host)
+          } else {
+            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+            stage.removeOutputsOnExecutor(execId)
+          }
           mapOutputTracker.registerMapOutputs(
             shuffleId,
             stage.outputLocInMapOutputTrackerFormat(),
@@ -1400,6 +1408,7 @@
         logInfo("Host added was in lost list earlier: " + host)
         failedEpoch -= execId
       }
+    execIdToHost.put(execId, host)
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -132,6 +132,28 @@ private[spark] class ShuffleMapStage(
     outputLocs.map(_.headOption.orNull)
   }
 
+  /**
+   * Removes all shuffle outputs associated with this host. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with this execId.
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    var becameUnavailable = false
+    for (partition <- 0 until numPartitions) {
+      val prevList = outputLocs(partition)
+      val newList = prevList.filterNot(_.location.host == host)
+      outputLocs(partition) = newList
+      if (prevList != Nil && newList == Nil) {
+        becameUnavailable = true
+        _numAvailableOutputs -= 1
+      }
+    }
+    if (becameUnavailable) {
+      logInfo("%s is now unavailable on host %s (%d/%d, %s)".format(
+        this, host, _numAvailableOutputs, numPartitions, isAvailable))
+    }
+  }
+
   /**
    * Removes all shuffle outputs associated with this executor. Note that this will also remove
    * outputs which are served by an external shuffle server (if one exists), as they are still
25 changes: 25 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -394,6 +394,31 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
+  test("All shuffle files on the slave should be cleaned up when slave lost") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set("spark.shuffle.service.enabled", "true")
+    init(conf)
+    runEvent(ExecutorAdded("exec-hostA1", "hostA"))
+    runEvent(ExecutorAdded("exec-hostA2", "hostA"))
+    runEvent(ExecutorAdded("exec-hostB", "hostB"))
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+    runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true)))
+    val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_ != null)
+    assert(mapStatus.size === 1)
+    assert(mapStatus(0).location.executorId === "exec-hostB")
+    assert(mapStatus(0).location.host === "hostB")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
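The new DAGSchedulerSuite test above exercises the change end to end: two map outputs land on hostA and one on hostB, a SlaveLost event arrives for exec-hostA1, and only hostB's map status should remain registered. A rough standalone restatement of that expectation follows; Loc and the nulling step are hypothetical stand-ins for the suite's MapStatus bookkeeping, not its real helpers.

// Toy re-statement of the test's final assertions; Loc stands in for BlockManagerId.
final case class Loc(executorId: String, host: String)

object SlaveLostExpectation {
  def main(args: Array[String]): Unit = {
    // Statuses as registered by the three map tasks, indexed by map partition.
    val statuses: Array[Loc] =
      Array(Loc("exec-hostA1", "hostA"), Loc("exec-hostA2", "hostA"), Loc("exec-hostB", "hostB"))

    // Losing the hostA slave should clear every status on that host, mirroring
    // what removeOutputsOnHost does to the stage's output locations.
    val afterLoss: Array[Loc] = statuses.map(s => if (s.host == "hostA") null else s)

    // Only the hostB output survives, as the test asserts against the tracker.
    val remaining = afterLoss.filter(_ != null)
    assert(remaining.length == 1)
    assert(remaining(0).executorId == "exec-hostB")
    assert(remaining(0).host == "hostB")
  }
}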
