SPARK-2387: remove stage barrier #1328

Closed · wants to merge 91 commits

Commits (91):
163302d  minor fix (May 5, 2014)
f81476d  Merge branch 'master' of https://github.com/lirui-intel/spark (May 7, 2014)
3124380  try to locate the point to remove the barrier (May 8, 2014)
8e625c0  apply upstream hot fix (May 8, 2014)
1d5d0f0  RemoveStageBarrier: support partial map outputs (May 9, 2014)
c4f4054  RemoveStageBarrier: build fix (May 11, 2014)
444d2d9  RemoveStageBarrier: register map outputs progressively (May 11, 2014)
2df1d4e  RemoveStageBarrier: increment epoch for progressive registration (May 12, 2014)
9f18dc7  RemoveStageBarrier: fix check free CPUs (May 12, 2014)
7af23c0  RemoveStageBarrier: make reducers refresh map outputs less often (May 13, 2014)
9a32a17  RemoveStageBarrier: start reducers earlier (May 13, 2014)
9ffb208  RemoveStageBarrier: add log info (May 14, 2014)
ef3b043  RemoveStageBarrier: adjust sleep interval (May 14, 2014)
4213d63  RemoveStageBarrier: add a new iterator to manage partial map outputs (May 15, 2014)
376230a  RemoveStageBarrier: minor fixes (May 16, 2014)
efd31ef  RemoveStageBarrier: fix: reducers may fail due to very slow mappers (May 16, 2014)
3cb944c  RemoveStageBarrier: add some log info (May 16, 2014)
641715e  RemoveStageBarrier: stage with a bigger ID should take precedence (May 19, 2014)
b0c2df2  RemoveStageBarrier: track whether map output for a shuffle is partial… (May 23, 2014)
75d2744  RemoveStageBarrier: refine how we get the stage to pre-start (May 23, 2014)
b7f1f84  RemoveStageBarrier: indicate the output is partial for progressive re… (May 23, 2014)
be47408  add some debug info (May 26, 2014)
c88014b  add a new locality level for tasks with no preferred locations (May 26, 2014)
133a356  re-compute pending list when new executor is added (May 26, 2014)
7d92f9a  pendingTasksWithNoPrefs should only contain tasks that really have no… (May 26, 2014)
c1de426  make the delay schedule configurable (May 27, 2014)
e57e081  clean up (May 27, 2014)
fda0281  do some refactor (May 27, 2014)
781861d  RemoveStageBarrier: fix problem with consolidated shuffle file (May 28, 2014)
679813b  RemoveStageBarrier: should fail the pre-started stages if the parent … (May 29, 2014)
563d743  RemoveStageBarrier: fix issue with empty shuffle blocks (May 29, 2014)
2ab311e  RemoveStageBarrier: allow partial map output by default (May 29, 2014)
46da965  RemoveStageBarrier: make sure the feature is enabled before we use pa… (May 29, 2014)
a89c93f  RemoveStageBarrier: partialForShuffle may cause infinite loop (May 30, 2014)
5cfbae8  RemoveStageBarrier: cannot only depend on epoch to determine if the l… (May 30, 2014)
2df5939  RemoveStageBarrier: fix bug (May 30, 2014)
6891d58  RemoveStageBarrier: adjust fetching order of CoGroupedRDD (Jun 3, 2014)
104ebe3  merge upstream master (Jun 4, 2014)
5dd28dc  RemoveStageBarrier: make sure pre-started stage has lower priority to… (Jun 4, 2014)
6cdf2a3  RemoveStageBarrier: revert previous changes to CoGroupedRDD (Jun 9, 2014)
af000f7  RemoveStageBarrier: sleep less waiting for new map outputs (Jun 11, 2014)
8349424  RemoveStageBarrier: don't rely on epoch for updated map statuses (Jun 12, 2014)
28679b9  RemoveStageBarrier: add a proxy to update partial map outputs periodi… (Jun 13, 2014)
71b87e5  RemoveStageBarrier: remove verbose logs (Jun 13, 2014)
04f17e8  RemoveStageBarrier: don't increase epoch for partial map output regis… (Jun 16, 2014)
eafa476  RemoveStageBarrier: don't put partial outputs in cache (Jun 16, 2014)
7547686  RemoveStageBarrier: block reducers waiting for new map outputs (Jun 17, 2014)
ca83d19  RemoveStageBarrier: bug fix (Jun 17, 2014)
539f1a8  RemoveStageBarrier: add API to SchedulerBackend to tell if there's fr… (Jun 17, 2014)
6e10488  RemoveStageBarrier: refine logs (Jun 18, 2014)
a48d592  RemoveStageBarrier: fix the way we compute free slots (Jun 18, 2014)
a418f03  RemoveStageBarrier: when a task finishes, launch new tasks before pop… (Jun 23, 2014)
3ced2bb  RemoveStageBarrier: make offer after successful/failed task is proper… (Jun 24, 2014)
5b0031a  RemoveStageBarrier: handle failed task in a synchronized manner (Jun 25, 2014)
3c52c69  RemoveStageBarrier: add temp test code to detect deadlock (Jun 27, 2014)
0473e3b  RemoveStageBarrier: maintain support for asynchronous handling failed… (Jun 27, 2014)
d267c9b  RemoveStageBarrier: fix previously found problem (Jun 27, 2014)
118914b  RemoveStageBarrier: fix test code (Jun 27, 2014)
fe63024  RemoveStageBarrier: remove temp code (Jun 27, 2014)
a996c77  RemoveStageBarrier: add temp test code (Jun 27, 2014)
cef517b  RemoveStageBarrier: fix shuffle map stage fail over (Jun 27, 2014)
7d9a4a4  RemoveStageBarrier: kill running tasks when resubmit failed stages (Jun 30, 2014)
5697b98  RemoveStageBarrier: refine temp test code (Jun 30, 2014)
4f80b1d  RemoveStageBarrier: fix test code (Jun 30, 2014)
39ddb9d  RemoveStageBarrier: remove temp code (Jun 30, 2014)
8cb8e4c  RemoveStageBarrier: kill running tasks before resubmit failed stages (Jun 30, 2014)
bc69fed  RemoveStageBarrier: add temp test code (Jul 1, 2014)
1e1907d  RemoveStageBarrier: fix test code (Jul 1, 2014)
930136d  RemoveStageBarrier: handle fetch failed task only if it comes from a … (Jul 1, 2014)
b49cbdb  RemoveStageBarrier: kill tasks without interrupting the thread (Jul 1, 2014)
6bcca9b  RemoveStageBarrier: remove test code (Jul 1, 2014)
8fded0e  RemoveStageBarrier: use AKKA actor to access DAGScheduler's data stru… (Jul 2, 2014)
aa2e0f2  RemoveStageBarrier: fix bug (Jul 2, 2014)
0bbdb5d  RemoveStageBarrier: compute sorted task sets without holding a lock o… (Jul 3, 2014)
d941899  RemoveStageBarrier: make the updater sleep a little longer if maps ar… (Jul 3, 2014)
12b8093  RemoveStageBarrier: fix bug (Jul 3, 2014)
c74a876  Revert "RemoveStageBarrier: fix bug" (Jul 4, 2014)
c313fe0  RemoveStageBarrier: pre-start a stage if all of its parents' tasks ha… (Jul 4, 2014)
033ffc0  RemoveStageBarrier: code refactor (Jul 4, 2014)
8a08a6c  RemoveStageBarrier: add some log (Jul 4, 2014)
a8b5d75  RemoveStageBarrier: revert change about tracking waiting tasks (Jul 7, 2014)
f66a8eb  RemoveStageBarrier: code cleanup (Jul 7, 2014)
1521fef  merge upstream master branch (Jul 7, 2014)
9747d6b  RemoveStageBarrier: fix code style (Jul 7, 2014)
8f798d8  RemoveStageBarrier: minor fix (Jul 8, 2014)
1ab7a15  RemoveStageBarrier: minor fix (Jul 8, 2014)
8417ffe  RemoveStageBarrier: let the reducer wake the updater (Jul 8, 2014)
31c4634  RemoveStageBarrier: introduce a min interval to update map status (Jul 8, 2014)
e1c374c  RemoveStageBarrier: fix bug (Jul 8, 2014)
a503508  RemoveStageBarrier: code clean up (Jul 8, 2014)
85a5d85  fix style (Jul 30, 2014)
206 changes: 167 additions & 39 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -19,9 +19,12 @@ package org.apache.spark

import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import java.util.concurrent._
import java.util.Collections

import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
import scala.collection.JavaConversions._

import akka.actor._
import akka.pattern.ask
@@ -30,11 +33,13 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._
import scala.collection.mutable

private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] case class GetShuffleStatus(shuffleId: Int) extends MapOutputTrackerMessage

/** Actor class for MapOutputTrackerMaster */
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
@@ -64,6 +69,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
logInfo("MapOutputTrackerActor stopped!")
sender ! true
context.stop(self)

case GetShuffleStatus(shuffleId: Int) =>
sender ! tracker.completenessForShuffle(shuffleId)
}
}

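The new GetShuffleStatus message lets callers ask the tracker actor how many map outputs have been registered so far for a shuffle. A minimal, self-contained sketch of that request/reply round trip using the same Akka ask pattern the tracker's askTracker helper relies on; ToyTracker, AskDemo, and the hard-coded completeness map are illustrative names, not code from this PR:

```scala
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case class GetShuffleStatus(shuffleId: Int)

// Toy stand-in for MapOutputTrackerMasterActor: replies with the number of
// map outputs it has seen for the given shuffle.
class ToyTracker extends Actor {
  private val completeness = Map(0 -> 7)
  def receive = {
    case GetShuffleStatus(id) => sender ! completeness.getOrElse(id, 0)
  }
}

object AskDemo extends App {
  val system = ActorSystem("demo")
  val tracker = system.actorOf(Props[ToyTracker], "tracker")
  implicit val timeout = Timeout(10.seconds)
  // Mirrors askTracker(GetShuffleStatus(id)).asInstanceOf[Int] in the patch.
  val done = Await.result(tracker ? GetShuffleStatus(0), timeout.duration).asInstanceOf[Int]
  println("shuffle 0 has " + done + " map outputs so far")
  system.shutdown()
}
```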
@@ -87,6 +95,14 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]

// Track if we have partial map outputs for a shuffle
protected val partialForShuffle =
Collections.newSetFromMap[Int](new ConcurrentHashMap[Int, java.lang.Boolean]())

protected val partialEpoch = new mutable.HashMap[Int, Int]()

protected val updaterLock = new ConcurrentHashMap[Int, AnyRef]()

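Since java.util.concurrent has no ConcurrentHashSet, partialForShuffle is a set view derived from a ConcurrentHashMap. A self-contained sketch of the idiom (the demo values are mine):

```scala
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

object ConcurrentSetSketch extends App {
  // A thread-safe Set[Int] backed by a ConcurrentHashMap, as used for partialForShuffle.
  val partialForShuffle =
    Collections.newSetFromMap[Int](new ConcurrentHashMap[Int, java.lang.Boolean]())

  // add() returns true only for the caller that inserts the element first,
  // which is why the patch can use it to start exactly one updater thread.
  println(partialForShuffle.add(5))      // true  -> first claim, spawn updater
  println(partialForShuffle.add(5))      // false -> updater already running
  partialForShuffle.remove(5)            // output for shuffle 5 is now complete
  println(partialForShuffle.contains(5)) // false
}
```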
/**
* Incremented every time a fetch fails so that client nodes know to clear
* their cache of map output locations if this happens.
@@ -126,6 +142,45 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* a given shuffle.
*/
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = getMapStatusesForShuffle(shuffleId, reduceId)
statuses.synchronized {
MapOutputTracker.convertMapStatuses(
shuffleId, reduceId, statuses, isPartial = partialForShuffle.contains(shuffleId))
}
}

/** Called to get current epoch number. */
def getEpoch: Long = {
epochLock.synchronized {
return epoch
}
}

/**
* Called from executors to update the epoch number, potentially clearing old outputs
* because of a fetch failure. Each worker task calls this with the latest epoch
* number on the master at the time it was created.
*/
def updateEpoch(newEpoch: Long) {
epochLock.synchronized {
if (newEpoch > epoch) {
logInfo("Updating epoch from "+epoch+" to " + newEpoch + " and clearing cache")
epoch = newEpoch
mapStatuses.clear()
}
}
}

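These two methods carry the epoch protocol: the master bumps the epoch when outputs are lost, each task ships the master's epoch to its executor, and a worker that sees a newer epoch drops its entire cache. A stripped-down sketch under simplified types (EpochCache and the driver are made-up names, not PR code):

```scala
import scala.collection.mutable

// Simplified stand-in for the tracker's epoch handling.
class EpochCache[K, V] {
  private val epochLock = new AnyRef
  private var epoch = 0L
  private val cache = mutable.HashMap.empty[K, V]

  def getEpoch: Long = epochLock.synchronized { epoch }

  // Called with the epoch that was shipped along with a task.
  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cache.clear() // locations cached before the failure may be stale
    }
  }
}

object EpochDemo extends App {
  val worker = new EpochCache[Int, String]
  var masterEpoch = 0L            // master side: bumped by incrementEpoch()
  masterEpoch += 1                // e.g. a map output was just unregistered
  worker.updateEpoch(masterEpoch) // arrives with the next task launched
  println(worker.getEpoch)        // 1; worker cache is empty and will refetch
}
```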
/** Unregister shuffle data. */
def unregisterShuffle(shuffleId: Int) {
mapStatuses.remove(shuffleId)
}

/** Stop the tracker. */
def stop() { }

// Get map statuses for a shuffle
private def getMapStatusesForShuffle(shuffleId: Int, reduceId: Int): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
@@ -158,7 +213,15 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
try {
val fetchedBytes =
askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
val fetchedResults = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
fetchedStatuses = fetchedResults._1
if (fetchedResults._2) {
if (partialForShuffle.add(shuffleId)) {
new Thread(new MapStatusUpdater(shuffleId)).start()
}
} else {
partialForShuffle -= shuffleId
}
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
} finally {
@@ -169,49 +232,92 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}
if (fetchedStatuses != null) {
fetchedStatuses
} else {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses
}
}

// Clear outdated map outputs for a shuffle
private def clearOutdatedMapStatuses(shuffleId: Int): Boolean = {
if (mapStatuses.contains(shuffleId)) {
val masterCompleteness = askTracker(GetShuffleStatus(shuffleId)).asInstanceOf[Int]
val diff = masterCompleteness - completenessForShuffle(shuffleId)
if (diff > 0) {
logInfo("Master is " + diff + " map statuses ahead of us for shuffle " +
shuffleId + ". Clear local cache.")
mapStatuses -= shuffleId
return true
} else {
return false
}
}
true
}

// Compute the completeness of map statuses for a shuffle
def completenessForShuffle(shuffleId: Int): Int = {
mapStatuses.getOrElse(shuffleId, new Array[MapStatus](0)).count(_ != null)
}

// A proxy to update partial map statuses periodically
class MapStatusUpdater(shuffleId: Int) extends Runnable {
override def run() {
updaterLock.put(shuffleId, new AnyRef)
partialEpoch.synchronized {
if (!partialEpoch.contains(shuffleId)) {
partialEpoch.put(shuffleId, 0)
}
}
logInfo("Updater started for shuffle " + shuffleId + ".")
val minInterval = 1000
val maxInterval = 3000
var lastUpdate = System.currentTimeMillis()
while (partialForShuffle.contains(shuffleId)) {
updaterLock.getOrElseUpdate(shuffleId, new AnyRef).synchronized {
updaterLock(shuffleId).wait(maxInterval)
}
val interval = System.currentTimeMillis() - lastUpdate
if (interval < minInterval) {
Thread.sleep(minInterval - interval)
}
lastUpdate = System.currentTimeMillis()
if (clearOutdatedMapStatuses(shuffleId)) {
getMapStatusesForShuffle(shuffleId, -1)
partialEpoch.synchronized {
partialEpoch.put(shuffleId, partialEpoch.getOrElse(shuffleId, 0) + 1)
partialEpoch.notifyAll()
}
}
}
logInfo("Map status for shuffle " + shuffleId + " is now complete. Updater terminated.")
partialEpoch.synchronized {
partialEpoch.remove(shuffleId)
partialEpoch.notifyAll()
}
}
}

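The updater's timing protocol is the subtle part: it parks on a per-shuffle monitor for at most maxInterval, a reducer can notifyAll to wake it early, and a minInterval floor keeps it from hammering the master. A trimmed-down, runnable sketch of just that loop; Refresher and nudge are my names, not the PR's:

```scala
object Refresher {
  private val lock = new AnyRef
  private val minInterval = 1000L
  private val maxInterval = 3000L

  // Reducer side: wake the refresher instead of letting it sleep out the full
  // maxInterval (this is what "let the reducer wake the updater" refers to).
  def nudge(): Unit = lock.synchronized { lock.notifyAll() }

  // refresh() pulls newer map statuses and returns false once they are complete.
  def run(refresh: () => Boolean): Unit = {
    var lastUpdate = System.currentTimeMillis()
    var partial = true
    while (partial) {
      lock.synchronized { lock.wait(maxInterval) } // or an early nudge()
      val elapsed = System.currentTimeMillis() - lastUpdate
      if (elapsed < minInterval) Thread.sleep(minInterval - elapsed) // rate floor
      lastUpdate = System.currentTimeMillis()
      partial = refresh()
    }
  }
}
```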
def getUpdatedStatus(
shuffleId: Int, reduceId: Int, localEpoch: Int): (Array[(BlockManagerId, Long)], Int) = {
partialEpoch.synchronized {
if (!partialEpoch.contains(shuffleId)) {
return (getServerStatuses(shuffleId, reduceId), 0)
}
if (partialEpoch.get(shuffleId).get <= localEpoch) {
updaterLock.getOrElseUpdate(shuffleId, new AnyRef).synchronized {
updaterLock(shuffleId).notifyAll()
}
logInfo("Reduce "+reduceId+" waiting for map outputs of shuffle "+shuffleId+".")
partialEpoch.wait()
}
(getServerStatuses(shuffleId, reduceId), partialEpoch.getOrElse(shuffleId, 0))
}
}

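For context, this is roughly how a reducer-side fetch loop could drive getUpdatedStatus. The loop is illustrative, not code from this PR, and it assumes private[spark] access to the tracker:

```scala
import org.apache.spark.MapOutputTracker

object PartialFetchLoop {
  def fetchAll(tracker: MapOutputTracker, shuffleId: Int, reduceId: Int) {
    var localEpoch = 0
    var complete = false
    while (!complete) {
      // Blocks while localEpoch is current, so we only loop on new information.
      val (statuses, epoch) = tracker.getUpdatedStatus(shuffleId, reduceId, localEpoch)
      localEpoch = epoch
      complete = statuses.forall(_._1 != null) // nulls mark still-running maps
      // ... fetch blocks for the (BlockManagerId, size) pairs that are known ...
    }
  }
}
```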
}

/**
Expand All @@ -229,7 +335,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]() with
mutable.SynchronizedMap[Int, Array[MapStatus]]
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

// For cleaning up TimeStampedHashMaps
@@ -240,6 +347,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
}
// We allow partial output by default. The flag is set properly later, when map outputs are registered.
partialForShuffle += shuffleId
}

def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
@@ -250,11 +359,18 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}

/** Register multiple map output information for the given shuffle */
def registerMapOutputs(
shuffleId: Int, statuses: Array[MapStatus],
changeEpoch: Boolean = false, isPartial: Boolean = false) {
mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
if (changeEpoch) {
incrementEpoch()
}
if (isPartial) {
partialForShuffle += shuffleId
} else {
partialForShuffle -= shuffleId
}
}

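Master-side lifecycle, sketched: a shuffle now starts out flagged partial, map outputs can be registered progressively, and the final registration clears the flag. An illustrative driver only, assuming access to the private[spark] master (not code from this PR):

```scala
import org.apache.spark.MapOutputTrackerMaster
import org.apache.spark.scheduler.MapStatus

object ProgressiveRegistration {
  def demo(master: MapOutputTrackerMaster, statuses: Array[MapStatus]) {
    val shuffleId = 0
    master.registerShuffle(shuffleId, statuses.length) // partial by default now
    // Early, progressive registration: the array may still contain nulls.
    master.registerMapOutputs(shuffleId, statuses, changeEpoch = false, isPartial = true)
    // Final registration when the map stage completes: clears the flag, so
    // reducers stop polling and the serialized statuses become cacheable.
    master.registerMapOutputs(shuffleId, statuses, changeEpoch = true, isPartial = false)
  }
}
```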
/** Unregister map output information of the given shuffle, mapper and block manager */
@@ -268,14 +384,15 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}
}
incrementEpoch()
partialForShuffle += shuffleId
} else {
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
}
}

/** Unregister shuffle data */
override def unregisterShuffle(shuffleId: Int) {
super.unregisterShuffle(shuffleId)
cachedSerializedStatuses.remove(shuffleId)
}

@@ -309,11 +426,13 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}
// If we got here, we failed to find the serialized locations in the cache, so we pulled
// out a snapshot of the locations as "statuses"; let's serialize and return that
val partial = partialForShuffle.contains(shuffleId)
val bytes = MapOutputTracker.serializeMapStatuses(statuses, isPartial = partial)
logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
// Add them into the table only if the epoch hasn't changed while we were working
epochLock.synchronized {
// Don't put partial outputs in cache
if (epoch == epochGotten && !partial) {
cachedSerializedStatuses(shuffleId) = bytes
}
}
@@ -340,6 +459,7 @@
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
with mutable.SynchronizedMap[Int, Array[MapStatus]]
[Review thread on the line above]

Member: I think ConcurrentHashMap is better in most cases.

Member: Will this PR be merged soon? If not, I hope this line can be merged soon because it solves a critical concurrent issue of mapStatuses.

Author: @zsxwing thanks for the comments. Maybe it's better to make it ConcurrentHashMap in the base class. I don't think this PR can be merged soon... So maybe you can open another JIRA to fix this.

Member: > Maybe it's better to make it ConcurrentHashMap in the base class.
Because MapOutputTrackerMaster uses TimeStampedHashMap which is not a ConcurrentHashMap, MapOutputTracker still needs to use Map. Nevertheless, I can add a comment on MapOutputTracker.mapStatuses to mark that it should be a thread-safe map.
}
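What the reviewers are suggesting, sketched: back the worker-side cache with a ConcurrentHashMap rather than the SynchronizedMap mixin. Hypothetical code, not part of this diff (WorkerStatusCache is a made-up stand-in, with S in place of MapStatus):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import scala.collection.mutable.Map

class WorkerStatusCache[S] {
  // The JavaConversions implicit exposes the Java map as a mutable.Map, so
  // existing call sites typed against Map[Int, Array[S]] compile unchanged.
  protected val mapStatuses: Map[Int, Array[S]] = new ConcurrentHashMap[Int, Array[S]]()
}
```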

private[spark] object MapOutputTracker {
@@ -348,21 +468,24 @@ private[spark] object MapOutputTracker {
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
// generally be pretty compressible because many map outputs will be on the same hostname.
def serializeMapStatuses(statuses: Array[MapStatus], isPartial: Boolean = false): Array[Byte] = {
val out = new ByteArrayOutputStream
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
objOut.writeBoolean(isPartial)
}
objOut.close()
out.toByteArray
}

// Opposite of serializeMapStatuses.
def deserializeMapStatuses(bytes: Array[Byte]): (Array[MapStatus], Boolean) = {
val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
val mapStatuses = objIn.readObject().asInstanceOf[Array[MapStatus]]
val isPartial = objIn.readBoolean()
(mapStatuses, isPartial)
}

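A runnable round trip of the wire format defined by this pair: a GZIP-compressed object stream carrying the status array, now followed by a single isPartial boolean. String stands in for MapStatus to keep the sketch self-contained:

```scala
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object WireFormatSketch extends App {
  def serialize(statuses: Array[String], isPartial: Boolean): Array[Byte] = {
    val out = new ByteArrayOutputStream
    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
    objOut.writeObject(statuses)
    objOut.writeBoolean(isPartial) // appended flag, read back in the same order
    objOut.close()
    out.toByteArray
  }

  def deserialize(bytes: Array[Byte]): (Array[String], Boolean) = {
    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    val statuses = objIn.readObject().asInstanceOf[Array[String]]
    val isPartial = objIn.readBoolean()
    (statuses, isPartial)
  }

  val (statuses, partial) = deserialize(serialize(Array("map0", null), isPartial = true))
  println(statuses.count(_ != null) + " of " + statuses.length + " outputs, partial = " + partial)
}
```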
// Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
@@ -371,13 +494,18 @@ private[spark] object MapOutputTracker {
private def convertMapStatuses(
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus],
isPartial: Boolean = false): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
status =>
if (status == null) {
if (isPartial) {
(null, 0.toLong)
} else {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
}
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
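The diff is truncated here, but the conversion above is the crux of partial fetches: with isPartial set, a missing map output becomes a (null, 0) placeholder instead of a MetadataFetchFailedException, and the reducer retries once the map finishes. A standalone sketch with plain case classes in place of MapStatus and BlockManagerId:

```scala
object ConvertSketch extends App {
  case class Loc(host: String)
  case class Status(location: Loc, size: Long)

  def convert(statuses: Array[Status], isPartial: Boolean): Array[(Loc, Long)] =
    statuses.map { status =>
      if (status == null) {
        if (isPartial) (null, 0L) // placeholder: this map hasn't finished yet
        else throw new RuntimeException("missing an output location")
      } else {
        (status.location, status.size)
      }
    }

  val out = convert(Array(Status(Loc("a"), 10L), null), isPartial = true)
  println(out.mkString(", ")) // (Loc(a),10), (null,0)
}
```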