-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown #28708
Changes from 39 commits
094d584
cd0781f
8e0304f
ac31c90
4eaf4dc
fd3354c
8054404
dcff412
dc9d648
493a298
c9139ef
0eef6fa
b50de4e
e2d1057
31dc836
a2d5e64
20655a4
5c131ef
5e11f1b
57965c8
93a8a4d
4560ebb
13aaa49
7af7492
a7d9238
953e5f2
d63ca07
197a524
5855eb4
c3f8658
206a3c3
f45a89c
4d7b5b8
d23cbbf
e573116
9503ca5
968418e
2d61c41
ac096f4
ebec1a7
d97c6ee
56a9903
5a0cd2a
546d953
b63808a
fe5ba7b
eb43f20
9d210f5
2467732
16b7376
8494bdd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,7 +49,7 @@ import org.apache.spark.util._ | |
* | ||
* All public methods of this class are thread-safe. | ||
*/ | ||
private class ShuffleStatus(numPartitions: Int) { | ||
private class ShuffleStatus(numPartitions: Int) extends Logging { | ||
|
||
private val (readLock, writeLock) = { | ||
val lock = new ReentrantReadWriteLock() | ||
|
@@ -121,12 +121,28 @@ private class ShuffleStatus(numPartitions: Int) { | |
mapStatuses(mapIndex) = status | ||
} | ||
|
||
/** | ||
* Update the map output location (e.g. during migration). | ||
*/ | ||
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock { | ||
val mapStatusOpt = mapStatuses.find(_.mapId == mapId) | ||
mapStatusOpt match { | ||
case Some(mapStatus) => | ||
logInfo(s"Updating map output for ${mapId} to ${bmAddress}") | ||
mapStatus.updateLocation(bmAddress) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we add an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no reason to, this is pretty close to a noop if we get a duplicated update message. |
||
invalidateSerializedMapOutputStatusCache() | ||
case None => | ||
logError(s"Asked to update map output ${mapId} for untracked map status.") | ||
} | ||
} | ||
|
||
/** | ||
* Remove the map output which was served by the specified block manager. | ||
* This is a no-op if there is no registered map output or if the registered output is from a | ||
* different block manager. | ||
*/ | ||
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock { | ||
logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}") | ||
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) { | ||
_numAvailableOutputs -= 1 | ||
mapStatuses(mapIndex) = null | ||
|
@@ -139,6 +155,7 @@ private class ShuffleStatus(numPartitions: Int) { | |
* outputs which are served by an external shuffle server (if one exists). | ||
*/ | ||
def removeOutputsOnHost(host: String): Unit = withWriteLock { | ||
logDebug(s"Removing outputs for host ${host}") | ||
removeOutputsByFilter(x => x.host == host) | ||
} | ||
|
||
|
@@ -148,6 +165,7 @@ private class ShuffleStatus(numPartitions: Int) { | |
* still registered with that execId. | ||
*/ | ||
def removeOutputsOnExecutor(execId: String): Unit = withWriteLock { | ||
logDebug(s"Removing outputs for execId ${execId}") | ||
removeOutputsByFilter(x => x.executorId == execId) | ||
} | ||
|
||
|
@@ -265,7 +283,7 @@ private[spark] class MapOutputTrackerMasterEndpoint( | |
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { | ||
case GetMapOutputStatuses(shuffleId: Int) => | ||
val hostPort = context.senderAddress.hostPort | ||
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) | ||
logInfo(s"Asked to send map output locations for shuffle ${shuffleId} to ${hostPort}") | ||
tracker.post(new GetMapOutputMessage(shuffleId, context)) | ||
|
||
case StopMapOutputTracker => | ||
|
@@ -479,6 +497,15 @@ private[spark] class MapOutputTrackerMaster( | |
} | ||
} | ||
|
||
def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for this one, too. |
||
shuffleStatuses.get(shuffleId) match { | ||
case Some(shuffleStatus) => | ||
shuffleStatus.updateMapOutput(mapId, bmAddress) | ||
case None => | ||
logError(s"Asked to update map output for unknown shuffle ${shuffleId}") | ||
} | ||
} | ||
|
||
def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = { | ||
shuffleStatuses(shuffleId).addMapOutput(mapIndex, status) | ||
} | ||
|
@@ -775,7 +802,12 @@ private[spark] class MapOutputTrackerMaster( | |
override def stop(): Unit = { | ||
mapOutputRequests.offer(PoisonPill) | ||
threadpool.shutdown() | ||
sendTracker(StopMapOutputTracker) | ||
try { | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sendTracker(StopMapOutputTracker) | ||
} catch { | ||
case e: SparkException => | ||
logError("Could not tell tracker we are stopping.", e) | ||
} | ||
trackerEndpoint = null | ||
shuffleStatuses.clear() | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -420,6 +420,30 @@ package object config { | |
.booleanConf | ||
.createWithDefault(false) | ||
|
||
private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED = | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled") | ||
tgravescs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " + | ||
"a migratable shuffle resolver (like sort based shuffe)") | ||
.version("3.1.0") | ||
.booleanConf | ||
.createWithDefault(false) | ||
|
||
private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS = | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads") | ||
.doc("Maximum number of threads to use in migrating shuffle files.") | ||
.version("3.1.0") | ||
.intConf | ||
.checkValue(_ > 0, "The maximum number of threads should be positive") | ||
.createWithDefault(10) | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: redundant blank line |
||
private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED = | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ConfigBuilder("spark.storage.decommission.rddBlocks.enabled") | ||
.doc("Whether to transfer RDD blocks during block manager decommissioning.") | ||
.version("3.1.0") | ||
.booleanConf | ||
.createWithDefault(false) | ||
|
||
private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = | ||
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") | ||
.internal() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,7 +168,8 @@ private[spark] class NettyBlockTransferService( | |
// Everything else is encoded using our binary protocol. | ||
val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) | ||
|
||
val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) | ||
val asStream = (blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) || | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: A parentheses isn't quite needed, but even if it is, then would it be easier to read this as: val asStream = (blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) || blockId.isShuffle There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's easier to read as is. |
||
blockId.isInternalShuffle || blockId.isShuffle) | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val callback = new RpcResponseCallback { | ||
override def onSuccess(response: ByteBuffer): Unit = { | ||
logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,9 +33,11 @@ import org.apache.spark.util.Utils | |
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
private[spark] sealed trait MapStatus { | ||
/** Location where this task was run. */ | ||
/** Location where this task output is. */ | ||
def location: BlockManagerId | ||
|
||
def updateLocation(bm: BlockManagerId): Unit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for extending this, @holdenk . |
||
|
||
/** | ||
* Estimated size for the reduce block, in bytes. | ||
* | ||
|
@@ -126,6 +128,10 @@ private[spark] class CompressedMapStatus( | |
|
||
override def location: BlockManagerId = loc | ||
|
||
override def updateLocation(bm: BlockManagerId): Unit = { | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
loc = bm | ||
} | ||
|
||
override def getSizeForBlock(reduceId: Int): Long = { | ||
MapStatus.decompressSize(compressedSizes(reduceId)) | ||
} | ||
|
@@ -178,6 +184,10 @@ private[spark] class HighlyCompressedMapStatus private ( | |
|
||
override def location: BlockManagerId = loc | ||
|
||
override def updateLocation(bm: BlockManagerId): Unit = { | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
loc = bm | ||
} | ||
|
||
override def getSizeForBlock(reduceId: Int): Long = { | ||
assert(hugeBlockSizes != null) | ||
if (emptyBlocks.contains(reduceId)) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,18 @@ | |
package org.apache.spark.shuffle | ||
|
||
import java.io._ | ||
import java.nio.ByteBuffer | ||
import java.nio.channels.Channels | ||
import java.nio.file.Files | ||
|
||
import org.apache.spark.{SparkConf, SparkEnv} | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.io.NioBufferedFileInputStream | ||
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} | ||
import org.apache.spark.network.client.StreamCallbackWithID | ||
import org.apache.spark.network.netty.SparkTransportConf | ||
import org.apache.spark.network.shuffle.ExecutorDiskUtils | ||
import org.apache.spark.serializer.SerializerManager | ||
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID | ||
import org.apache.spark.storage._ | ||
import org.apache.spark.util.Utils | ||
|
@@ -44,9 +47,9 @@ import org.apache.spark.util.Utils | |
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData(). | ||
private[spark] class IndexShuffleBlockResolver( | ||
conf: SparkConf, | ||
_blockManager: BlockManager = null) | ||
var _blockManager: BlockManager = null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a var for testing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah it's for the new tests in the |
||
extends ShuffleBlockResolver | ||
with Logging { | ||
with Logging with MigratableResolver { | ||
|
||
private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager) | ||
|
||
|
@@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver( | |
|
||
def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None) | ||
|
||
/** | ||
* Get the shuffle files that are stored locally. Used for block migrations. | ||
*/ | ||
override def getStoredShuffles(): Set[ShuffleBlockInfo] = { | ||
// Matches ShuffleIndexBlockId name | ||
val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val rootDirs = blockManager.diskBlockManager.localDirs | ||
// ExecutorDiskUtil puts things inside one level hashed sub directories | ||
val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val filenames = searchDirs.flatMap(_.list()) | ||
logDebug(s"Got block files ${filenames.toList}") | ||
filenames.flatMap { fname => | ||
pattern.findAllIn(fname).matchData.map { | ||
matched => ShuffleBlockInfo(matched.group(1).toInt, matched.group(2).toLong) | ||
} | ||
}.toSet | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: redundant blank line. |
||
/** | ||
* Get the shuffle data file. | ||
* | ||
|
@@ -148,6 +170,82 @@ private[spark] class IndexShuffleBlockResolver( | |
} | ||
} | ||
|
||
/** | ||
* Write a provided shuffle block as a stream. Used for block migrations. | ||
* ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock. | ||
* Requires the caller to delete any shuffle index blocks where the shuffle block fails to | ||
* put. | ||
*/ | ||
override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): | ||
StreamCallbackWithID = { | ||
val file = blockId match { | ||
case ShuffleIndexBlockId(shuffleId, mapId, _) => | ||
getIndexFile(shuffleId, mapId) | ||
case ShuffleDataBlockId(shuffleId, mapId, _) => | ||
getDataFile(shuffleId, mapId) | ||
case _ => | ||
throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " + | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s"${blockId.getClass().getSimpleName()}") | ||
} | ||
val fileTmp = Utils.tempFileWith(file) | ||
val channel = Channels.newChannel( | ||
serializerManager.wrapStream(blockId, | ||
new FileOutputStream(fileTmp))) | ||
|
||
new StreamCallbackWithID { | ||
|
||
override def getID: String = blockId.name | ||
|
||
override def onData(streamId: String, buf: ByteBuffer): Unit = { | ||
while (buf.hasRemaining) { | ||
channel.write(buf) | ||
} | ||
} | ||
|
||
override def onComplete(streamId: String): Unit = { | ||
logTrace(s"Done receiving shuffle block $blockId, now storing on local disk.") | ||
channel.close() | ||
val diskSize = fileTmp.length() | ||
this.synchronized { | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (file.exists()) { | ||
file.delete() | ||
} | ||
if (!fileTmp.renameTo(file)) { | ||
throw new IOException(s"fail to rename file ${fileTmp} to ${file}") | ||
} | ||
} | ||
blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize)) | ||
} | ||
|
||
override def onFailure(streamId: String, cause: Throwable): Unit = { | ||
// the framework handles the connection itself, we just need to do local cleanup | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logWarning(s"Error while uploading $blockId", cause) | ||
channel.close() | ||
fileTmp.delete() | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Get the index & data block for migration. | ||
*/ | ||
def getMigrationBlocks(shuffleBlockInfo: ShuffleBlockInfo): List[(BlockId, ManagedBuffer)] = { | ||
val shuffleId = shuffleBlockInfo.shuffleId | ||
val mapId = shuffleBlockInfo.mapId | ||
// Load the index block | ||
val indexFile = getIndexFile(shuffleId, mapId) | ||
val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID) | ||
val indexFileSize = indexFile.length() | ||
val indexBlockData = new FileSegmentManagedBuffer(transportConf, indexFile, 0, indexFileSize) | ||
|
||
// Load the data block | ||
val dataFile = getDataFile(shuffleId, mapId) | ||
val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) | ||
val dataBlockData = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length()) | ||
List((indexBlockId, indexBlockData), (dataBlockId, dataBlockData)) | ||
} | ||
|
||
|
||
/** | ||
* Write an index file with the offsets of each block, plus a final offset at the end for the | ||
* end of the output file. This will be used by getBlockData to figure out where each block | ||
|
@@ -169,7 +267,7 @@ private[spark] class IndexShuffleBlockResolver( | |
val dataFile = getDataFile(shuffleId, mapId) | ||
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure | ||
// the following check and rename are atomic. | ||
synchronized { | ||
this.synchronized { | ||
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) | ||
if (existingLengths != null) { | ||
// Another attempt for the same task has already written our map outputs successfully, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle | ||
|
||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.network.buffer.ManagedBuffer | ||
import org.apache.spark.network.client.StreamCallbackWithID | ||
import org.apache.spark.serializer.SerializerManager | ||
import org.apache.spark.storage.BlockId | ||
|
||
/** | ||
* :: Experimental :: | ||
* An experimental trait to allow Spark to migrate shuffle blocks. | ||
*/ | ||
@Experimental | ||
trait MigratableResolver { | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/** | ||
* Get the shuffle ids that are stored locally. Used for block migrations. | ||
holdenk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
def getStoredShuffles(): Set[ShuffleBlockInfo] | ||
|
||
/** | ||
* Write a provided shuffle block as a stream. Used for block migrations. | ||
*/ | ||
def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): | ||
StreamCallbackWithID | ||
|
||
/** | ||
* Get the blocks for migration for a particular shuffle and map. | ||
*/ | ||
def getMigrationBlocks(shuffleBlockInfo: ShuffleBlockInfo): List[(BlockId, ManagedBuffer)] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for adding this API.