[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure #17088

Closed
wants to merge 10 commits
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (3 changes: 2 additions & 1 deletion)
@@ -289,7 +289,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,

// HashMaps for storing mapStatuses and cached serialized statuses in the driver.
// Statuses are dropped only by explicit de-registering.
protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
// Exposed for testing
val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (56 changes: 47 additions & 9 deletions)
@@ -552,10 +552,8 @@ class DAGScheduler(
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
* @throws IllegalArgumentException when partitions ids are illegal
*/
def submitJob[T, U](
@@ -599,7 +597,6 @@
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @throws Exception when the job fails
*/
def runJob[T, U](
@@ -1331,7 +1328,14 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
if (env.blockManager.externalShuffleServiceEnabled) {
removeExecutorAndUnregisterOutputOnHost(bmAddress.executorId,
bmAddress.host, Some(task.epoch))
}
else {
removeExecutorAndUnregisterOutputOnExecutor(bmAddress.executorId,
true, Some(task.epoch))
}
}
}

@@ -1365,18 +1369,25 @@
*/
private[scheduler] def handleExecutorLost(
execId: String,
filesLost: Boolean,
fileLost: Boolean) {
removeExecutorAndUnregisterOutputOnExecutor(execId,
fileLost || !env.blockManager.externalShuffleServiceEnabled, None)
}


private[scheduler] def removeExecutorAndUnregisterOutputOnExecutor(
execId: String,
fileLost: Boolean,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)

if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
if (fileLost) {
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
Contributor: log should be outside the for loop, like it was before.

Author: Ah, my bad, thanks for noticing.
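The corrected placement would presumably hoist the log above the loop, roughly like this (a sketch based on the surrounding diff, not the PR's final code):

    logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
    for ((shuffleId, stage) <- shuffleIdToMapStage) {
      stage.removeOutputsOnExecutor(execId)
      mapOutputTracker.registerMapOutputs(
        shuffleId,
        stage.outputLocInMapOutputTrackerFormat(),
        changeEpoch = true)
    }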

stage.removeOutputsOnExecutor(execId)
mapOutputTracker.registerMapOutputs(
shuffleId,
@@ -1390,7 +1401,34 @@
}
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
"(epoch " + currentEpoch + ")")
Contributor: as long as you're updating this, can you change it to use interpolation instead, just for consistency?

Author: Done.
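The interpolated form the reviewer is asking for would presumably read (illustrative):

    logDebug(s"Additional executor lost message for $execId (epoch $currentEpoch)")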

}
}

private[scheduler] def removeExecutorAndUnregisterOutputOnHost(
execId: String,
host: String,
maybeEpoch: Option[Long] = None) {
Contributor: nit: add return type : Unit =

We didn't adopt that convention (nor did Scala) until after a lot of this class was written, hence the inconsistency in this file, but new changes should follow this style.

Author: Done.
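With the nit applied, the signature (shown alone; the body is unchanged) would read:

    private[scheduler] def removeExecutorAndUnregisterOutputOnHost(
        execId: String,
        host: String,
        maybeEpoch: Option[Long] = None): Unit = {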

val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)
for ((shuffleId, stage) <- shuffleIdToMapStage) {
logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
stage.removeOutputsOnHost(host)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
}
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
}
clearCacheLocs()
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
Contributor: thanks for trying this refactoring, but I don't like the amount of repetition between these two helper methods now. Sorry, this code was very confusing even before, and I haven't given constructive suggestions so far ... what do you think of this version? squito@3e33d5e

Author: Made changes as suggested, thanks!

}
}
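For reference, one way to fold the two helpers above into a single method, along the lines the reviewer links to, could look roughly like this. This is only a sketch; the parameter name hostToUnregisterOutputs and the exact structure are illustrative, not necessarily what was merged:

    private[scheduler] def removeExecutorAndUnregisterOutputs(
        execId: String,
        fileLost: Boolean,
        hostToUnregisterOutputs: Option[String],
        maybeEpoch: Option[Long] = None): Unit = {
      val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
      if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
        failedEpoch(execId) = currentEpoch
        logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
        blockManagerMaster.removeExecutor(execId)
        if (fileLost) {
          hostToUnregisterOutputs match {
            case Some(host) =>
              logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
            case None =>
              logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
          }
          for ((shuffleId, stage) <- shuffleIdToMapStage) {
            // remove outputs for the whole host when they are served by the external
            // shuffle service, otherwise only for the lost executor
            hostToUnregisterOutputs match {
              case Some(host) => stage.removeOutputsOnHost(host)
              case None => stage.removeOutputsOnExecutor(execId)
            }
            mapOutputTracker.registerMapOutputs(
              shuffleId,
              stage.outputLocInMapOutputTrackerFormat(),
              changeEpoch = true)
          }
          if (shuffleIdToMapStage.isEmpty) {
            mapOutputTracker.incrementEpoch()
          }
          clearCacheLocs()
        }
      } else {
        logDebug(s"Additional executor lost message for $execId (epoch $currentEpoch)")
      }
    }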

@@ -132,6 +132,28 @@ private[spark] class ShuffleMapStage(
outputLocs.map(_.headOption.orNull)
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with the executors on this host.
*/
def removeOutputsOnHost(host: String): Unit = {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location.host == host)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
_numAvailableOutputs -= 1
}
}
if (becameUnavailable) {
logInfo("%s is now unavailable on host %s (%d/%d, %s)".format(
this, host, _numAvailableOutputs, numPartitions, isAvailable))
}
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
@@ -394,6 +394,68 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}

test("All shuffle files should on the slave should be cleaned up when slave lost") {
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
conf.set("spark.shuffle.service.enabled", "true")
Contributor: Should we add another test with spark.shuffle.service.enabled = false?

init(conf)
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB"))
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
Contributor: the number of partitions here needs to match the number used in the partitioner from its dependencies. Same below.

I know it doesn't matter in this test, but it becomes hard to understand what is going on in these tests if they have inconsistencies like this.

Author: You are right, it was confusing before. Changed accordingly.
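For example, an RDD fed by a dependency that uses HashPartitioner(2) would itself be declared with 2 partitions (illustrative only, mirroring the reviewer's point; not the PR's final test code):

    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
    val shuffleMapRdd = new MyRDD(sc, 2, List(firstShuffleDep))  // 2 partitions, matching the partitioner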

val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
(Success, makeMapStatus("hostB", 1))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map{_.location.executorId}.toSet ===
Contributor: map{..} => map(..)

Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

val initialMapStatus2 = mapOutputTracker.mapStatuses.get(1).get
assert(initialMapStatus2.count(_ != null) === 3)
assert(initialMapStatus2.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

// reduce stage fails with a fetch failure from one host
complete(taskSets(2), Seq(
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), firstShuffleId, 0, 0, "ignored"),
Contributor: Seems the FetchFailed message should reference shuffleDep.shuffleId instead of firstShuffleId?

null)
))
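The change the reviewer suggests would presumably look like this (illustrative):

    (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), shuffleDep.shuffleId, 0, 0, "ignored"),
      null)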

// Here is the main assertion -- make sure that we de-register
// the map outputs for both map stages from both executors on hostA
val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(mapStatus1.count(_ != null) === 1)
assert(mapStatus1(2).location.executorId === "exec-hostB")
assert(mapStatus1(2).location.host === "hostB")

val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get
assert(mapStatus2.count(_ != null) === 1)
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
}
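A reviewer above asks about a companion test with spark.shuffle.service.enabled set to false. A rough sketch of what such a test might look like, reusing this suite's helpers; it is illustrative only and not part of this PR, and its assertions assume that with the external shuffle service disabled only the failed executor's output is unregistered:

    test("only the failed executor's shuffle files are cleaned up when shuffle service is off") {
      // reset the test context with the external shuffle service disabled
      afterEach()
      val conf = new SparkConf()
      conf.set("spark.shuffle.service.enabled", "false")
      init(conf)
      runEvent(ExecutorAdded("exec-hostA1", "hostA"))
      runEvent(ExecutorAdded("exec-hostA2", "hostA"))
      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
      val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
      submit(reduceRdd, Array(0, 1))
      // map stage completes with one task on each executor, both on hostA
      complete(taskSets(0), Seq(
        (Success, MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](2)(2))),
        (Success, MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](2)(2)))
      ))
      // reduce stage fails with a fetch failure against exec-hostA2
      complete(taskSets(1), Seq(
        (Success, 42),
        (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), shuffleDep.shuffleId, 1, 1, "ignored"),
          null)
      ))
      // without the external shuffle service, exec-hostA1 can still serve its files,
      // so only exec-hostA2's map output should have been unregistered
      val mapStatus = mapOutputTracker.mapStatuses.get(shuffleDep.shuffleId).get
      assert(mapStatus.count(_ != null) === 1)
      assert(mapStatus(0).location.executorId === "exec-hostA1")
    }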

test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None