
[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure #17088

Closed
Wants to merge 10 commits
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (3 changes: 2 additions & 1 deletion)
@@ -289,7 +289,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,

// HashMaps for storing mapStatuses and cached serialized statuses in the driver.
// Statuses are dropped only by explicit de-registering.
-  protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
+  // Exposed for testing
+  val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (19 changes: 14 additions & 5 deletions)
@@ -149,6 +149,7 @@ class DAGScheduler(
*/
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val execIdToHost = new HashMap[String, String]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
@@ -1331,7 +1332,7 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
-        handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+        handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
Contributor:
you could use bmAddress.host, then you wouldn't need to store another execToHost map (though it would require a little more refactoring)
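
A rough sketch of what that refactoring might look like; the host-accepting handleExecutorLost variant here is an assumed signature for illustration, not part of this PR:

    // Hypothetical: take the host straight from the FetchFailed block manager address,
    // so the scheduler would not need to maintain a separate execIdToHost map.
    if (bmAddress != null) {
      val hostLost =
        if (env.blockManager.externalShuffleServiceEnabled) Some(bmAddress.host) else None
      // assumed overload: handleExecutorLost(execId, hostToUnregister: Option[String], epoch)
      handleExecutorLost(bmAddress.executorId, hostLost, Some(task.epoch))
    }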

Contributor @kayousterhout (Feb 28, 2017):
I'm not sure it's correct to assume that a FetchFailure means that all of the executors on the slave were lost. You could have a failure because one executor died, but the other executors on the host are OK, right? (UPDATED: I realized this is the same comment @mridulm made above)

Author:

@kayousterhout - This change applies only when the external shuffle service is enabled; in that case, a fetch failure means that the external shuffle service is unavailable, so we should remove all the output on that host, right? When the shuffle service is not enabled, this change should be a no-op.
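
For reference, a minimal configuration sketch of the scenario being described; the host-level cleanup only applies with the external shuffle service turned on (the keys are standard Spark settings, the values shown are illustrative):

    import org.apache.spark.SparkConf

    // Shuffle blocks are served by a per-node service, so they outlive any single executor;
    // a fetch failure from that node then implies the whole host's shuffle output is gone.
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true") // enables the external shuffle service
      .set("spark.shuffle.service.port", "7337")    // default service port, shown explicitly
    // With spark.shuffle.service.enabled=false, the host-level un-registration should not apply.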

Author:
@squito - Good point, will do.

}
}

@@ -1365,19 +1366,26 @@
*/
private[scheduler] def handleExecutorLost(
execId: String,
-      filesLost: Boolean,
+      slaveLost: Boolean,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)

-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+      if (slaveLost || !env.blockManager.externalShuffleServiceEnabled) {
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
-          stage.removeOutputsOnExecutor(execId)
+          if (slaveLost) {
+            val host = execIdToHost.get(execId).get
+            logInfo(("Shuffle files lost for executor: %s (epoch %d)," +
+              " removing shuffle files on host: %s").format(execId, currentEpoch, host))
+            stage.removeOutputsOnHost(host)
+          } else {
+            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+            stage.removeOutputsOnExecutor(execId)
+          }
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
@@ -1400,6 +1408,7 @@
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
+    execIdToHost.put(execId, host)
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
@@ -132,6 +132,28 @@ private[spark] class ShuffleMapStage(
outputLocs.map(_.headOption.orNull)
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with this execId.
*/
def removeOutputsOnHost(host: String): Unit = {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location.host == host)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
_numAvailableOutputs -= 1
}
}
if (becameUnavailable) {
logInfo("%s is now unavailable on host %s (%d/%d, %s)".format(
this, host, _numAvailableOutputs, numPartitions, isAvailable))
}
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
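To make the filtering in removeOutputsOnHost easier to follow, here is a small self-contained sketch of the same idea on a toy structure (an illustration only, not the ShuffleMapStage code):

    // Toy model: each partition keeps a list of output locations (executorId, host).
    case class Loc(executorId: String, host: String)

    val outputLocs: Array[List[Loc]] = Array(
      List(Loc("exec-hostA1", "hostA")),                            // partition 0
      List(Loc("exec-hostA2", "hostA"), Loc("exec-hostB", "hostB")) // partition 1
    )

    def removeOutputsOnHost(host: String): Int = {
      var becameUnavailable = 0
      for (partition <- outputLocs.indices) {
        val prevList = outputLocs(partition)
        val newList = prevList.filterNot(_.host == host) // same predicate shape as the PR
        outputLocs(partition) = newList
        if (prevList.nonEmpty && newList.isEmpty) becameUnavailable += 1
      }
      becameUnavailable
    }

    // Dropping hostA empties partition 0, while partition 1 is still served by hostB.
    assert(removeOutputsOnHost("hostA") == 1)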
@@ -394,6 +394,31 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}

test("All shuffle files should on the slave should be cleaned up when slave lost") {
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
conf.set("spark.shuffle.service.enabled", "true")
Contributor:
Should we add another test with spark.shuffle.service.enabled = false?
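
A rough sketch of what such a test could look like, using the same suite helpers (init, runEvent, complete) and explicit MapStatus locations. The assertion encodes the behavior the author describes above (only the failed executor's output is dropped when the service is disabled), so it is an assumption about intended behavior and may fail against this draft until the fetch-failure path distinguishes the two cases:

    test("only the failed executor's shuffle output is removed when shuffle service is disabled") {
      afterEach()
      val conf = new SparkConf() // spark.shuffle.service.enabled left at its default (false)
      init(conf)
      runEvent(ExecutorAdded("exec-hostA1", "hostA"))
      runEvent(ExecutorAdded("exec-hostA2", "hostA"))
      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
      val shuffleId = shuffleDep.shuffleId
      val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
      submit(reduceRdd, Array(0))
      complete(taskSets(0), Seq(
        (Success, MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
        (Success, MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2)))))
      // reduce task fails to fetch from exec-hostA1
      complete(taskSets(1), Seq(
        (FetchFailed(BlockManagerId("exec-hostA1", "hostA", 12345), shuffleId, 0, 0, "ignored"),
          null)))
      // exec-hostA2 on the same host should keep its map output
      val statuses = mapOutputTracker.mapStatuses.get(shuffleId).get
      assert(statuses.count(_ != null) === 1)
      assert(statuses.filter(_ != null).head.location.executorId === "exec-hostA2")
    }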

init(conf)
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB"))
val shuffleMapRdd = new MyRDD(sc, 3, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true)))
val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_ != null)
Contributor:
I think there are a couple of problems with this test.

  • you are trying to change the behavior on a fetch failure, so really you should have tasks completing with a FetchFailed
  • makeMapStatus is actually doing the wrong thing in this case, since it's expecting executor ids to be "exec-$host", but you've got a "1" or "2" appended to some of them

I think this is better:

    submit(reduceRdd, Array(0))
    // map stage completes successfully, with one task on each executor
    complete(taskSets(0), Seq(
      (Success,
        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
      (Success,
        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
      (Success, makeMapStatus("hostB", 1))
    ))
    // make sure our test setup is correct
    val initialMapStatus = mapOutputTracker.mapStatuses.get(0).get
    assert(initialMapStatus.count(_ != null) === 3)
    assert(initialMapStatus.map{_.location.executorId}.toSet ===
      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
    // reduce stage fails with a fetch failure from one host
    complete(taskSets(1), Seq(
      (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), shuffleId, 0, 0, "ignored"),
        null)
    ))
    // Here is the main assertion -- make sure that we de-register the map output from both executors on hostA
    val mapStatus = mapOutputTracker.mapStatuses.get(0).get
    assert(mapStatus.count(_ != null) === 1)
    assert(mapStatus(2).location.executorId === "exec-hostB")
    assert(mapStatus(2).location.host === "hostB")

this version fails until you reverse the if / else I pointed out in the dagscheduler.

it would also be nice if this included map output from multiple stages registered on the given host, so you could check that all output is deregistered, not just the one shuffleId which had an error.

Author:
Thanks for providing a better test case; I also modified it to include map output from multiple stages.
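
A sketch of how that multi-stage coverage might look, with two chained shuffles so both map stages register output before the fetch failure (illustrative; the PR's final test may differ in its details):

    test("shuffle files from multiple stages on the same host are all de-registered") {
      afterEach()
      val conf = new SparkConf()
      conf.set("spark.shuffle.service.enabled", "true")
      init(conf)
      runEvent(ExecutorAdded("exec-hostA1", "hostA"))
      runEvent(ExecutorAdded("exec-hostA2", "hostA"))
      runEvent(ExecutorAdded("exec-hostB", "hostB"))
      // two chained shuffles, so both map stages complete (and register output) before the reduce
      val rdd1 = new MyRDD(sc, 2, Nil)
      val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
      val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
      val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(1))
      val reduceRdd = new MyRDD(sc, 1, List(dep2), tracker = mapOutputTracker)
      submit(reduceRdd, Array(0))
      // first map stage: both outputs live on hostA
      complete(taskSets(0), Seq(
        (Success, MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](2)(2))),
        (Success, MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](2)(2)))))
      // second map stage: output on hostA and hostB
      complete(taskSets(1), Seq(
        (Success, MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
        (Success, makeMapStatus("hostB", 1))))
      // reduce stage fails to fetch dep2's output from hostA
      complete(taskSets(2), Seq(
        (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), dep2.shuffleId, 0, 0, "ignored"),
          null)))
      // outputs on hostA should be de-registered for *both* shuffles, not just the failed one
      assert(mapOutputTracker.mapStatuses.get(dep1.shuffleId).get.count(_ != null) === 0)
      assert(mapOutputTracker.mapStatuses.get(dep2.shuffleId).get.filter(_ != null)
        .forall(_.location.host == "hostB"))
    }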

assert(mapStatus.size === 1)
assert(mapStatus(0).location.executorId === "exec-hostB")
assert(mapStatus(0).location.host === "hostB")
}

test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None