diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 3e10b9eee4e2..5d48bc7c9655 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -55,7 +55,8 @@ private class ShuffleStatus(numPartitions: Int) { * locations is so small that we choose to ignore that case and store only a single location * for each output. */ - private[this] val mapStatuses = new Array[MapStatus](numPartitions) + // Exposed for testing + val mapStatuses = new Array[MapStatus](numPartitions) /** * The cached result of serializing the map statuses array. This cache is lazily populated when @@ -105,14 +106,30 @@ private class ShuffleStatus(numPartitions: Int) { } } + /** + * Removes all shuffle outputs associated with this host. Note that this will also remove + * outputs which are served by an external shuffle server (if one exists). + */ + def removeOutputsOnHost(host: String): Unit = { + removeOutputsByFilter(x => x.host == host) + } + /** * Removes all map outputs associated with the specified executor. Note that this will also * remove outputs which are served by an external shuffle server (if one exists), as they are * still registered with that execId. */ def removeOutputsOnExecutor(execId: String): Unit = synchronized { + removeOutputsByFilter(x => x.executorId == execId) + } + + /** + * Removes all shuffle outputs which satisfies the filter. Note that this will also + * remove outputs which are served by an external shuffle server (if one exists). + */ + def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized { for (mapId <- 0 until mapStatuses.length) { - if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId == execId) { + if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) { _numAvailableOutputs -= 1 mapStatuses(mapId) = null invalidateSerializedMapOutputStatusCache() @@ -317,7 +334,8 @@ private[spark] class MapOutputTrackerMaster( // HashMap for storing shuffleStatuses in the driver. // Statuses are dropped only by explicit de-registering. - private val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala + // Exposed for testing + val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) @@ -415,6 +433,15 @@ private[spark] class MapOutputTrackerMaster( } } + /** + * Removes all shuffle outputs associated with this host. Note that this will also remove + * outputs which are served by an external shuffle server (if one exists). + */ + def removeOutputsOnHost(host: String): Unit = { + shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnHost(host) } + incrementEpoch() + } + /** * Removes all shuffle outputs associated with this executor. Note that this will also remove * outputs which are served by an external shuffle server (if one exists), as they are still diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7827e6760f35..84ef57f2d271 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -151,6 +151,14 @@ package object config { .createOptional // End blacklist confs + private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE = + ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost") + .doc("Whether to un-register all the outputs on the host in condition that we receive " + + " a FetchFailure. This is set default to false, which means, we only un-register the " + + " outputs related to the exact executor(instead of the host) on a FetchFailure.") + .booleanConf + .createWithDefault(false) + private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY = ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity") .withAlternative("spark.scheduler.listenerbus.eventqueue.size") diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4149c95a7c6e..fafe9cafdc18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -35,6 +35,7 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.config import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} @@ -193,7 +194,7 @@ class DAGScheduler( * executor(instead of the host) on a FetchFailure. */ private[scheduler] val unRegisterOutputOnHostOnFetchFailure = - sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false) + sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE) /** * Number of consecutive stage attempts allowed before a stage is aborted. @@ -1414,14 +1415,20 @@ class DAGScheduler( failedEpoch(execId) = currentEpoch logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) blockManagerMaster.removeExecutor(execId) - - if (filesLost || !env.blockManager.externalShuffleServiceEnabled) { - logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) - mapOutputTracker.removeOutputsOnExecutor(execId) + if (fileLost) { + hostToUnregisterOutputs match { + case Some(host) => + logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + mapOutputTracker.removeOutputsOnHost(host) + case None => + logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) + mapOutputTracker.removeOutputsOnExecutor(execId) + } clearCacheLocs() + + } else { + logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) } - } else { - logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e580917ff61f..ddd328110674 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -396,7 +396,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - test("All shuffle files should on the slave should be cleaned up when slave lost") { + test("All shuffle files on the slave should be cleaned up when slave lost") { // reset the test context with the right shuffle service config afterEach() val conf = new SparkConf() @@ -411,6 +411,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val secondShuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor @@ -430,12 +431,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou (Success, makeMapStatus("hostB", 1)) )) // make sure our test setup is correct - val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get + val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses + // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus1.count(_ != null) === 3) assert(initialMapStatus1.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) - val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get + val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses + // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus2.count(_ != null) === 3) assert(initialMapStatus2.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) @@ -448,12 +451,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // Here is the main assertion -- make sure that we de-register // the map outputs for both map stage from both executors on hostA - val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get + + val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses assert(mapStatus1.count(_ != null) === 1) assert(mapStatus1(2).location.executorId === "exec-hostB") assert(mapStatus1(2).location.host === "hostB") - val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get + val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses assert(mapStatus2.count(_ != null) === 1) assert(mapStatus2(2).location.executorId === "exec-hostB") assert(mapStatus2(2).location.host === "hostB")