Skip to content

Commit

Permalink
Rebase with master
Browse files Browse the repository at this point in the history
  • Loading branch information
Sital Kedia committed Jun 14, 2017
1 parent b26e99d commit 74f285a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 15 deletions.
33 changes: 30 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ private class ShuffleStatus(numPartitions: Int) {
* locations is so small that we choose to ignore that case and store only a single location
* for each output.
*/
private[this] val mapStatuses = new Array[MapStatus](numPartitions)
// Exposed for testing
val mapStatuses = new Array[MapStatus](numPartitions)

/**
* The cached result of serializing the map statuses array. This cache is lazily populated when
Expand Down Expand Up @@ -105,14 +106,30 @@ private class ShuffleStatus(numPartitions: Int) {
}
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsOnHost(host: String): Unit = {
removeOutputsByFilter(x => x.host == host)
}

/**
* Removes all map outputs associated with the specified executor. Note that this will also
* remove outputs which are served by an external shuffle server (if one exists), as they are
* still registered with that execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = synchronized {
removeOutputsByFilter(x => x.executorId == execId)
}

/**
* Removes all shuffle outputs which satisfies the filter. Note that this will also
* remove outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized {
for (mapId <- 0 until mapStatuses.length) {
if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId == execId) {
if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) {
_numAvailableOutputs -= 1
mapStatuses(mapId) = null
invalidateSerializedMapOutputStatusCache()
Expand Down Expand Up @@ -317,7 +334,8 @@ private[spark] class MapOutputTrackerMaster(

// HashMap for storing shuffleStatuses in the driver.
// Statuses are dropped only by explicit de-registering.
private val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
// Exposed for testing
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)

Expand Down Expand Up @@ -415,6 +433,15 @@ private[spark] class MapOutputTrackerMaster(
}
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsOnHost(host: String): Unit = {
shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnHost(host) }
incrementEpoch()
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ package object config {
.createOptional
// End blacklist confs

private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost")
.doc("Whether to un-register all the outputs on the host in condition that we receive " +
" a FetchFailure. This is set default to false, which means, we only un-register the " +
" outputs related to the exact executor(instead of the host) on a FetchFailure.")
.booleanConf
.createWithDefault(false)

private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
.withAlternative("spark.scheduler.listenerbus.eventqueue.size")
Expand Down
21 changes: 14 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
Expand Down Expand Up @@ -193,7 +194,7 @@ class DAGScheduler(
* executor(instead of the host) on a FetchFailure.
*/
private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false)
sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE)

/**
* Number of consecutive stage attempts allowed before a stage is aborted.
Expand Down Expand Up @@ -1414,14 +1415,20 @@ class DAGScheduler(
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)

if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
mapOutputTracker.removeOutputsOnExecutor(execId)
if (fileLost) {
hostToUnregisterOutputs match {
case Some(host) =>
logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
mapOutputTracker.removeOutputsOnHost(host)
case None =>
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
mapOutputTracker.removeOutputsOnExecutor(execId)
}
clearCacheLocs()

} else {
logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
}
} else {
logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}

test("All shuffle files should on the slave should be cleaned up when slave lost") {
test("All shuffle files on the slave should be cleaned up when slave lost") {
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
Expand All @@ -411,6 +411,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val secondShuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// map stage1 completes successfully, with one task on each executor
Expand All @@ -430,12 +431,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
// val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get
val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
// val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(initialMapStatus2.count(_ != null) === 3)
assert(initialMapStatus2.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
Expand All @@ -448,12 +451,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou

// Here is the main assertion -- make sure that we de-register
// the map outputs for both map stage from both executors on hostA
val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get

val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
assert(mapStatus1.count(_ != null) === 1)
assert(mapStatus1(2).location.executorId === "exec-hostB")
assert(mapStatus1(2).location.host === "hostB")

val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get
val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(mapStatus2.count(_ != null) === 1)
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
Expand Down

0 comments on commit 74f285a

Please sign in to comment.