[WIP][SPARK-20629][CORE] Copy shuffle data when nodes are being shutdown #28331

Closed
Changes from 31 commits
32 commits
4126c1b
Add support for migrating shuffle files
holdenk Apr 24, 2020
8ee8949
Style fixes
holdenk Apr 24, 2020
afb1b1a
Re-enable the rest of the K8s tests.
holdenk Apr 24, 2020
4071ae2
Python style fix
holdenk Apr 24, 2020
ff620ba
Don't join the block migration thread, we'll block on that
holdenk May 1, 2020
adb03db
We only need two executors for this test.
holdenk May 1, 2020
be2a5e7
Try and update the tests some more, switch migration to not make a ne…
holdenk May 2, 2020
783114b
Code cleanups (swap some maps for foreach where we didn't need the re…
holdenk May 26, 2020
dbe2418
Merge branch 'master' into SPARK-20629-copy-shuffle-data-when-nodes-a…
holdenk May 26, 2020
a240f98
Add missing follow up commit from merge
holdenk May 26, 2020
ef8fcc5
Use NOOP_REDUCE_ID
holdenk May 28, 2020
838a346
Config shorter interval for testing.
holdenk May 28, 2020
e85c8ef
Wait for the desired number of executors to be present.
holdenk May 28, 2020
2da0f2d
Make a MigratableResolver interface so custom shuffle implementations…
holdenk May 28, 2020
9d31746
Add in the trait I refactored the code to use (forgot the git add :p)
holdenk May 28, 2020
38ff8be
Use block updates to make sure our desired blocks are being moved & a…
holdenk May 28, 2020
13ec43a
Don't hardcode the blockId because it is not constant.
holdenk May 28, 2020
a92025c
Test both migrations at the same time & reduce some of the sleeps
holdenk May 28, 2020
fe265d7
Tag new APIs
holdenk May 28, 2020
70c3871
Increase the number of execs and decrease the thread sleep time while…
holdenk May 29, 2020
069dd3b
Update core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlock…
holdenk May 29, 2020
6340f9b
Fix the migration to store ShuffleDataBlockId, check that data and in…
holdenk May 29, 2020
4cb0458
Merge branch 'master' into SPARK-20629-copy-shuffle-data-when-nodes-a…
holdenk Jun 1, 2020
4cfeb8e
We don't need the to set operations, also sleepyRdd isn't always slee…
holdenk Jun 1, 2020
e81aa5a
Use the remoteBlockSize param in the tests instead of conditioning on…
holdenk Jun 1, 2020
841d443
Add a part of the test where we kill the original exec and recount. N…
holdenk Jun 1, 2020
ba20ec0
Add a part of the test where we kill the original exec and recount. N…
holdenk Jun 1, 2020
17a6a3f
Fix the map output update logic which was getting tramped on (also th…
holdenk Jun 1, 2020
155aeb2
Saw a test failure of the executors not coming up in time, bumping th…
holdenk Jun 1, 2020
7f93df6
Bump the timeout, and also don't wait for the full set if we don't ne…
holdenk Jun 2, 2020
7e32341
Return faster with shuffle blocks since we don't need the rest of the…
holdenk Jun 2, 2020
a3aa8eb
Small cleanups
holdenk Jun 2, 2020
33 changes: 30 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
*
* All public methods of this class are thread-safe.
*/
private class ShuffleStatus(numPartitions: Int) {
private class ShuffleStatus(numPartitions: Int) extends Logging {

private val (readLock, writeLock) = {
val lock = new ReentrantReadWriteLock()
@@ -121,12 +121,28 @@ private class ShuffleStatus(numPartitions: Int) {
mapStatuses(mapIndex) = status
}

/**
* Update the map output location (e.g. during migration).
*/
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
mapStatusOpt match {
case Some(mapStatus) =>
logInfo("Updating map output for ${mapId} to ${bmAddress}")
mapStatus.updateLocation(bmAddress)
invalidateSerializedMapOutputStatusCache()
case None =>
logError("Asked to update map output ${mapId} for untracked map status.")
}
}

/**
* Remove the map output which was served by the specified block manager.
* This is a no-op if there is no registered map output or if the registered output is from a
* different block manager.
*/
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
_numAvailableOutputs -= 1
mapStatuses(mapIndex) = null
@@ -139,6 +155,7 @@ private class ShuffleStatus(numPartitions: Int) {
* outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsOnHost(host: String): Unit = withWriteLock {
logDebug(s"Removing outputs for host ${host}")
removeOutputsByFilter(x => x.host == host)
}

@@ -148,6 +165,7 @@ private class ShuffleStatus(numPartitions: Int) {
* still registered with that execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = withWriteLock {
logDebug(s"Removing outputs for execId ${execId}")
removeOutputsByFilter(x => x.executorId == execId)
}

@@ -265,7 +283,6 @@ private[spark] class MapOutputTrackerMasterEndpoint(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
tracker.post(new GetMapOutputMessage(shuffleId, context))

case StopMapOutputTracker =>
@@ -449,7 +466,7 @@ private[spark] class MapOutputTrackerMaster(
val context = data.context
val shuffleId = data.shuffleId
val hostPort = context.senderAddress.hostPort
logDebug("Handling request to send map output locations for shuffle " + shuffleId +
logInfo("Handling request to send map output locations for shuffle " + shuffleId +
" to " + hostPort)
val shuffleStatus = shuffleStatuses.get(shuffleId).head
context.reply(
@@ -479,6 +496,16 @@ private[spark] class MapOutputTrackerMaster(
}
}

def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.updateMapOutput(mapId, bmAddress)
shuffleStatus.invalidateSerializedMapOutputStatusCache()
case None =>
logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
}
}

def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
}
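Illustration (not part of this patch): a rough sketch of how the new updateMapOutput hook could be driven on the driver when a migrated shuffle block is reported; blockId, mapOutputTracker and newLocation are assumed to be in scope and the wiring shown here is hypothetical.

blockId match {
  case ShuffleDataBlockId(shuffleId, mapId, _) =>
    // Point the existing map output at the block manager that now holds the migrated data,
    // instead of forgetting the output and recomputing the map task.
    mapOutputTracker.updateMapOutput(shuffleId, mapId, newLocation)
  case _ =>
    // Non-shuffle block updates are handled as before.
}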
20 changes: 18 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,7 +57,7 @@ import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.shuffle.ShuffleDataIOUtils
import org.apache.spark.shuffle.api.ShuffleDriverComponents
@@ -1586,7 +1586,12 @@ class SparkContext(config: SparkConf) extends Logging {
listenerBus.removeListener(listener)
}

private[spark] def getExecutorIds(): Seq[String] = {
/**
* :: DeveloperApi ::
* Deregister the listener from Spark's listener bus.
*/
@DeveloperApi
def getExecutorIds(): Seq[String] = {
Contributor Author: I'll move this back to private[spark] (also the comment I have seems wrong so revert the comment)
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.getExecutorIds()
@@ -1725,6 +1730,17 @@ class SparkContext(config: SparkConf) extends Logging {
}
}


@DeveloperApi
def decommissionExecutors(executorIds: Seq[String]): Unit = {
Contributor Author: I think private[spark] as well
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
executorIds.foreach(b.decommissionExecutor)
case _ =>
logWarning("Decommissioning executors is not supported by current scheduler.")
}
}

/** The version of Spark on which this application is running. */
def version: String = SPARK_VERSION

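Illustration (not part of this patch): how the new developer API could be called from a driver program; the executor IDs here are just whatever getExecutorIds happens to return.

// Ask the scheduler backend to decommission two registered executors. With a scheduler
// other than CoarseGrainedSchedulerBackend this only logs a warning and does nothing.
val ids: Seq[String] = sc.getExecutorIds().take(2)
sc.decommissionExecutors(ids)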
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -367,7 +367,8 @@ object SparkEnv extends Logging {
externalShuffleClient
} else {
None
}, blockManagerInfo)),
}, blockManagerInfo,
mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -420,6 +420,21 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.shuffle_blocks")
.doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " +
"an indexed shuffle resolver (like sort based shuffe)")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.rdd_blocks")
.doc("Whether to transfer RDD blocks during block manager decommissioning.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
.internal()
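Illustration (not part of this patch): a minimal sketch of switching on the two spark.storage.decommission.* flags added above; both default to false, and the app name is made up.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("decommission-demo")
  .set("spark.storage.decommission.shuffle_blocks", "true") // migrate shuffle files on decommission
  .set("spark.storage.decommission.rdd_blocks", "true")     // migrate cached RDD blocks on decommission
val sc = new SparkContext(conf)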
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -33,9 +33,11 @@ import org.apache.spark.util.Utils
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task was run. */
/** Location where this task output is. */
def location: BlockManagerId

def updateLocation(bm: BlockManagerId): Unit

/**
* Estimated size for the reduce block, in bytes.
*
@@ -126,6 +128,10 @@ private[spark] class CompressedMapStatus(

override def location: BlockManagerId = loc

override def updateLocation(bm: BlockManagerId): Unit = {
loc = bm
}

override def getSizeForBlock(reduceId: Int): Long = {
MapStatus.decompressSize(compressedSizes(reduceId))
}
@@ -178,6 +184,10 @@ private[spark] class HighlyCompressedMapStatus private (

override def location: BlockManagerId = loc

override def updateLocation(bm: BlockManagerId): Unit = {
loc = bm
}

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
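Illustration (not part of this patch): what the new mutator does for any MapStatus instance; migration only rewrites the stored location, the per-reducer sizes are untouched. The destination values are made up.

val migrated = BlockManagerId("exec-2", "host-b", 7337)
mapStatus.updateLocation(migrated)
assert(mapStatus.location == migrated)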
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -44,7 +44,7 @@ private[spark] class StandaloneSchedulerBackend(
with StandaloneAppClientListener
with Logging {

private var client: StandaloneAppClient = null
private[spark] var client: StandaloneAppClient = null
private val stopping = new AtomicBoolean(false)
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sc.conf
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,15 +18,18 @@
package org.apache.spark.shuffle

import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.io.NioBufferedFileInputStream
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.client.StreamCallbackWithID
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExecutorDiskUtils
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
@@ -46,7 +49,7 @@ private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
_blockManager: BlockManager = null)
extends ShuffleBlockResolver
with Logging {
with Logging with MigratableResolver {

private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)

@@ -55,6 +58,25 @@

def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)

/**
* Get the shuffle files that are stored locally. Used for block migrations.
*/
override def getStoredShuffles(): Set[(Int, Long)] = {
// Matches ShuffleIndexBlockId name
val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
val rootDirs = blockManager.diskBlockManager.localDirs
// ExecutorDiskUtils puts things inside one-level hashed subdirectories
val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs
val filenames = searchDirs.flatMap(_.list())
logDebug(s"Got block files ${filenames.toList}")
filenames.flatMap{ fname =>
pattern.findAllIn(fname).matchData.map {
matched => (matched.group(1).toInt, matched.group(2).toLong)
}
}.toSet
}
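Illustration (not part of this patch): what the filename pattern above recovers; the sample name is hypothetical but follows the ShuffleIndexBlockId format shuffle_<shuffleId>_<mapId>_<reduceId>.index.

val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
val parsed = pattern.findAllIn("shuffle_4_7_0.index").matchData.map { m =>
  (m.group(1).toInt, m.group(2).toLong)   // (shuffleId, mapId)
}.toList
// parsed == List((4, 7L)), i.e. shuffle id 4, map id 7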


/**
* Get the shuffle data file.
*
@@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver(
}
}

/**
* Write a provided shuffle block as a stream. Used for block migrations.
* ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock.
* Requires the caller to delete any shuffle index blocks where the shuffle block fails to
* put.
*/
override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
StreamCallbackWithID = {
val file = blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
getIndexFile(shuffleId, mapId)
case ShuffleDataBlockId(shuffleId, mapId, _) =>
getDataFile(shuffleId, mapId)
case _ =>
throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " +
s"${blockId.getClass().getSimpleName()}")
}
val fileTmp = Utils.tempFileWith(file)
val channel = Channels.newChannel(
serializerManager.wrapStream(blockId,
new FileOutputStream(fileTmp)))

new StreamCallbackWithID {

override def getID: String = blockId.name

override def onData(streamId: String, buf: ByteBuffer): Unit = {
while (buf.hasRemaining) {
channel.write(buf)
}
}

override def onComplete(streamId: String): Unit = {
logTrace(s"Done receiving block $blockId, now putting into local shuffle service")
channel.close()
val diskSize = fileTmp.length()
this.synchronized {
if (file.exists()) {
file.delete()
}
Comment on lines +210 to +212

Member: When the file exists, does it mean there is an index/data file with the same shuffle id and map id? When could that happen?

Contributor Author: I suppose this should never happen; I'm not sure though, let me do some thinking on that.

Contributor Author: So this mirrors the logic inside of writeIndexFileAndCommit; the matching check there was introduced in SPARK-17547, which I believe is for the situation where an exception occurred during a previous write and the filesystem is in a dirty state. So I think we should keep it to be safe.
if (!fileTmp.renameTo(file)) {
throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
}
}
blockManager.reportBlockStatus(blockId, BlockStatus(
StorageLevel(
useDisk = true,
useMemory = false,
useOffHeap = false,
deserialized = false,
replication = 0)
, 0, diskSize))
}

override def onFailure(streamId: String, cause: Throwable): Unit = {
// the framework handles the connection itself, we just need to do local cleanup
channel.close()
fileTmp.delete()
}
}
}
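Illustration (not part of this patch): a hedged sketch of the receiving side driving the callback returned above; resolver, serializerManager and chunk are assumed to be in scope, and the block ids are made up.

val callback = resolver.putShuffleBlockAsStream(ShuffleDataBlockId(4, 7L, 0), serializerManager)
callback.onData(callback.getID, chunk)   // invoked once per incoming ByteBuffer chunk
callback.onComplete(callback.getID)      // moves the temp file into place and reports the block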

/**
* Get the index & data block for migration.
*/
def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)] = {
// Load the index block
val indexFile = getIndexFile(shuffleId, mapId)
val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
val indexFileSize = indexFile.length()
val indexBlockData = new FileSegmentManagedBuffer(transportConf, indexFile, 0, indexFileSize)

// Load the data block
val dataFile = getDataFile(shuffleId, mapId)
val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
val dataBlockData = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length())
List((indexBlockId, indexBlockData), (dataBlockId, dataBlockData))
}
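Illustration (not part of this patch): the shape of what getMigrationBlocks returns for one map output; the ids are made up.

val blocks = resolver.getMigrationBlocks(shuffleId = 4, mapId = 7L)
// blocks == List((ShuffleIndexBlockId(4, 7, 0), indexBuffer), (ShuffleDataBlockId(4, 7, 0), dataBuffer))
// The index block precedes the data block it describes; both use NOOP_REDUCE_ID as the reduce id.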


/**
* Write an index file with the offsets of each block, plus a final offset at the end for the
* end of the output file. This will be used by getBlockData to figure out where each block
@@ -169,7 +271,7 @@ private[spark] class IndexShuffleBlockResolver(
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
synchronized {
this.synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,