diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ec8621bc55cf3..1e4b9906e0137 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
  *
  * All public methods of this class are thread-safe.
  */
-private class ShuffleStatus(numPartitions: Int) {
+private class ShuffleStatus(numPartitions: Int) extends Logging {
 
   private val (readLock, writeLock) = {
     val lock = new ReentrantReadWriteLock()
@@ -121,12 +121,28 @@ private class ShuffleStatus(numPartitions: Int) {
     mapStatuses(mapIndex) = status
   }
 
+  /**
+   * Update the map output location (e.g. during migration).
+   */
+  def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
+    val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
+    mapStatusOpt match {
+      case Some(mapStatus) =>
+        logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
+        mapStatus.updateLocation(bmAddress)
+        invalidateSerializedMapOutputStatusCache()
+      case None =>
+        logError(s"Asked to update map output ${mapId} for untracked map status.")
+    }
+  }
+
   /**
    * Remove the map output which was served by the specified block manager.
    * This is a no-op if there is no registered map output or if the registered output is from a
    * different block manager.
    */
   def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
+    logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
     if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
       _numAvailableOutputs -= 1
       mapStatuses(mapIndex) = null
@@ -139,6 +155,7 @@ private class ShuffleStatus(numPartitions: Int) {
    * outputs which are served by an external shuffle server (if one exists).
    */
   def removeOutputsOnHost(host: String): Unit = withWriteLock {
+    logDebug(s"Removing outputs for host ${host}")
     removeOutputsByFilter(x => x.host == host)
   }
 
@@ -148,6 +165,7 @@ private class ShuffleStatus(numPartitions: Int) {
    * still registered with that execId.
    */
   def removeOutputsOnExecutor(execId: String): Unit = withWriteLock {
+    logDebug(s"Removing outputs for execId ${execId}")
     removeOutputsByFilter(x => x.executorId == execId)
   }
 
@@ -265,7 +283,7 @@ private[spark] class MapOutputTrackerMasterEndpoint(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = context.senderAddress.hostPort
-      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
+      logInfo(s"Asked to send map output locations for shuffle ${shuffleId} to ${hostPort}")
       tracker.post(new GetMapOutputMessage(shuffleId, context))
 
     case StopMapOutputTracker =>
@@ -479,6 +497,15 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = {
+    shuffleStatuses.get(shuffleId) match {
+      case Some(shuffleStatus) =>
+        shuffleStatus.updateMapOutput(mapId, bmAddress)
+      case None =>
+        logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
+    }
+  }
+
   def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
     shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 38d7319b1f0ef..0d4151f29c855 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,7 +57,7 @@ import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.ShuffleDataIOUtils
 import org.apache.spark.shuffle.api.ShuffleDriverComponents
@@ -1725,6 +1725,17 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+
+  private[spark] def decommissionExecutors(executorIds: Seq[String]): Unit = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        executorIds.foreach(b.decommissionExecutor)
+      case _ =>
+        logWarning(s"Decommissioning executors is not supported by current scheduler " +
+          s"${schedulerBackend}")
+    }
+  }
+
   /** The version of Spark on which this application is running. */
   def version: String = SPARK_VERSION
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8ba1739831803..d543359f4dedf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -367,7 +367,8 @@ object SparkEnv extends Logging {
             externalShuffleClient
           } else {
             None
-          }, blockManagerInfo)),
+          }, blockManagerInfo,
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 18305ad3746a6..41b4be960136f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -158,8 +158,6 @@ private[deploy] object DeployMessages {
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
 
-  case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
-
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index aa8c46fc68315..862e685c2dce6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case DecommissionSelf =>
+    case WorkerDecommission(_, _) =>
       decommissionSelf()
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6625457749f6a..9b93cc14e8df6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -210,6 +210,10 @@ private[spark] class CoarseGrainedExecutorBackend(
     case UpdateDelegationTokens(tokenBytes) =>
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+
+    case DecommissionSelf =>
+      logInfo("Received decommission self")
+      decommissionSelf()
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
     System.exit(code)
   }
 
-  private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
-    try {
-      decommissioned = true
-      // Tell master we are are decommissioned so it stops trying to schedule us
-      if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+  private var previousAllBlocksMigrated = false
+  private def shutdownIfDone(): Unit = {
+    val numRunningTasks = executor.numRunningTasks
+    logInfo(s"Checking to see if we can shut down: ${numRunningTasks} running tasks.")
+    if (executor.numRunningTasks == 0) {
+      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+        val allBlocksMigrated = env.blockManager.decommissionManager match {
+          case Some(m) => m.allBlocksMigrated
+          case None => false // We haven't started migrations yet.
+        }
+        if (allBlocksMigrated && previousAllBlocksMigrated) {
+          logInfo("No running tasks, all blocks migrated, stopping.")
+          exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+        }
+        previousAllBlocksMigrated = allBlocksMigrated
       } else {
-        logError("No driver to message decommissioning.")
+        logInfo("No running tasks, no block migration configured, stopping.")
+        exitExecutor(0, "Finished decommissioning", notifyDriver = true)
       }
-      if (executor != null) {
-        executor.decommission()
+    } else {
+      // If there's a running task it could store blocks.
+      previousAllBlocksMigrated = false
+    }
+  }
+
+  private def decommissionSelf(): Boolean = {
+    if (!decommissioned) {
+      logInfo("Decommissioning self and starting background thread to exit when done.")
+      try {
+        decommissioned = true
+        // Tell master we are decommissioned so it stops trying to schedule us
+        if (driver.nonEmpty) {
+          driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+        } else {
+          logError("No driver to message decommissioning.")
+        }
+        if (executor != null) {
+          executor.decommission()
+        }
+        // Shut down the executor once all tasks are gone :)
+        val shutdownThread = new Thread() {
+          override def run(): Unit = {
+            while (true) {
+              shutdownIfDone()
+              Thread.sleep(1000) // 1s
+            }
+          }
+        }
+        shutdownThread.setDaemon(true)
+        shutdownThread.setName("decommission-shutdown-thread")
+        shutdownThread.start()
+        logInfo("Done decommissioning self.")
+        // Return true since we are handling a signal
+        true
+      } catch {
+        case e: Exception =>
+          logError(s"Error ${e} during attempt to decommission self")
+          false
       }
-      logInfo("Done decommissioning self.")
-      // Return true since we are handling a signal
+    } else {
       true
-    } catch {
-      case e: Exception =>
-        logError(s"Error ${e} during attempt to decommission self")
-        false
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 45cec726c4ca7..6a2dcd4b850f9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -230,9 +230,10 @@ private[spark] class Executor(
   private[executor] def numRunningTasks: Int = runningTasks.size()
 
   /**
-   * Mark an executor for decommissioning and avoid launching new tasks.
+   * Mark an executor for decommissioning.
    */
   private[spark] def decommission(): Unit = {
+    logInfo("Executor asked to decommission.")
     decommissioned = true
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8ef0c37198568..6c2b90cd8d83b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -420,6 +420,21 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.storage.decommission.shuffle_blocks")
+      .doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " +
+        "an indexed shuffle resolver (like the sort-based shuffle).")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.storage.decommission.rdd_blocks")
+      .doc("Whether to transfer RDD blocks during block manager decommissioning.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
     ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 3de7377f99202..2069f812e067e 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -168,7 +168,8 @@ private[spark] class NettyBlockTransferService(
     // Everything else is encoded using our binary protocol.
     val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
 
-    val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+    val asStream = (blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) ||
+      blockId.isInternalShuffle || blockId.isShuffle)
     val callback = new RpcResponseCallback {
       override def onSuccess(response: ByteBuffer): Unit = {
         logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 7f8893ff3b9d8..9dee1f779bcb5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -33,9 +33,11 @@ import org.apache.spark.util.Utils
  * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
  */
 private[spark] sealed trait MapStatus {
-  /** Location where this task was run. */
+  /** Location where this task output is. */
   def location: BlockManagerId
 
+  def updateLocation(bm: BlockManagerId): Unit
+
   /**
    * Estimated size for the reduce block, in bytes.
    *
@@ -126,6 +128,10 @@ private[spark] class CompressedMapStatus(
 
   override def location: BlockManagerId = loc
 
+  override def updateLocation(bm: BlockManagerId): Unit = {
+    loc = bm
+  }
+
   override def getSizeForBlock(reduceId: Int): Long = {
     MapStatus.decompressSize(compressedSizes(reduceId))
   }
@@ -178,6 +184,10 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def location: BlockManagerId = loc
 
+  override def updateLocation(bm: BlockManagerId): Unit = {
+    loc = bm
+  }
+
   override def getSizeForBlock(reduceId: Int): Long = {
     assert(hugeBlockSizes != null)
     if (emptyBlocks.contains(reduceId)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 465c0d20de481..b961ac3a32b1e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class UpdateDelegationTokens(tokens: Array[Byte]) extends CoarseGrainedClusterMessage
 
+  case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.
+
   // Executors to driver
   case class RegisterExecutor(
       executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 67638a5f9593c..43f17c0477797 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import scala.concurrent.Future
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{DeployMessage, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.executor.ExecutorLogUrlHandler
 import org.apache.spark.internal.Logging
@@ -432,7 +432,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     if (shouldDisable) {
       logInfo(s"Starting decommissioning executor $executorId.")
       try {
+        // Stop making offers on this executor
        scheduler.executorDecommission(executorId)
+        // Send a decommission message to the executor (the request could have originated
+        // on the executor itself, but not necessarily).
+        executorDataMap.get(executorId) match {
+          case Some(executorInfo) =>
+            executorInfo.executorEndpoint.send(DecommissionSelf)
+          case None =>
+            // Ignoring the executor since it is not registered.
+            logWarning(s"Attempted to decommission unknown executor $executorId.")
+        }
       } catch {
         case e: Exception =>
           logError(s"Unexpected error during decommissioning ${e.toString}", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 42c46464d79e1..ff41957779c83 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -44,7 +44,7 @@ private[spark] class StandaloneSchedulerBackend(
   with StandaloneAppClientListener
   with Logging {
 
-  private var client: StandaloneAppClient = null
+  private[spark] var client: StandaloneAppClient = null
   private val stopping = new AtomicBoolean(false)
   private val launcherBackend = new LauncherBackend() {
     override protected def conf: SparkConf = sc.conf
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index af2c82e771970..7886e02b8b10d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle
 
 import java.io._
+import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
 
@@ -25,8 +26,10 @@ import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.client.StreamCallbackWithID
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExecutorDiskUtils
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
@@ -46,7 +49,7 @@ private[spark] class IndexShuffleBlockResolver(
     conf: SparkConf,
     _blockManager: BlockManager = null)
   extends ShuffleBlockResolver
-  with Logging {
+  with Logging with MigratableResolver {
 
   private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
 
@@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver(
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)
 
+  /**
+   * Get the shuffle files that are stored locally. Used for block migrations.
+   */
+  override def getStoredShuffles(): Set[(Int, Long)] = {
+    // Matches the ShuffleIndexBlockId name
+    val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
+    val rootDirs = blockManager.diskBlockManager.localDirs
+    // ExecutorDiskUtils places files inside one level of hashed subdirectories
+    val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs
+    val filenames = searchDirs.flatMap(_.list())
+    logDebug(s"Got block files ${filenames.toList}")
+    filenames.flatMap { fname =>
+      pattern.findAllIn(fname).matchData.map {
+        matched => (matched.group(1).toInt, matched.group(2).toLong)
+      }
+    }.toSet
+  }
+
+
   /**
    * Get the shuffle data file.
    *
@@ -148,6 +170,87 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * Write a provided shuffle block as a stream. Used for block migrations.
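+   * Accepts ShuffleIndexBlockId and ShuffleDataBlockId; any other block id is rejected.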
+   * ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock.
+   * Requires the caller to delete any shuffle index blocks where the shuffle block fails to
+   * put.
+   */
+  override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
+      StreamCallbackWithID = {
+    val file = blockId match {
+      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+        getIndexFile(shuffleId, mapId)
+      case ShuffleDataBlockId(shuffleId, mapId, _) =>
+        getDataFile(shuffleId, mapId)
+      case _ =>
+        throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " +
+          s"${blockId.getClass().getSimpleName()}")
+    }
+    val fileTmp = Utils.tempFileWith(file)
+    val channel = Channels.newChannel(
+      serializerManager.wrapStream(blockId,
+        new FileOutputStream(fileTmp)))
+
+    new StreamCallbackWithID {
+
+      override def getID: String = blockId.name
+
+      override def onData(streamId: String, buf: ByteBuffer): Unit = {
+        while (buf.hasRemaining) {
+          channel.write(buf)
+        }
+      }
+
+      override def onComplete(streamId: String): Unit = {
+        logTrace(s"Done receiving shuffle block $blockId, now storing on local disk.")
+        channel.close()
+        val diskSize = fileTmp.length()
+        this.synchronized {
+          if (file.exists()) {
+            file.delete()
+          }
+          if (!fileTmp.renameTo(file)) {
+            throw new IOException(s"Failed to rename file ${fileTmp} to ${file}")
+          }
+        }
+        blockManager.reportBlockStatus(blockId, BlockStatus(
+          StorageLevel(
+            useDisk = true,
+            useMemory = false,
+            useOffHeap = false,
+            deserialized = false,
+            replication = 0),
+          0, diskSize))
+      }
+
+      override def onFailure(streamId: String, cause: Throwable): Unit = {
+        // the framework handles the connection itself, we just need to do local cleanup
+        logWarning(s"Error while uploading $blockId", cause)
+        channel.close()
+        fileTmp.delete()
+      }
+    }
+  }
+
+  /**
+   * Get the index & data block for migration.
+   */
+  def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)] = {
+    // Load the index block
+    val indexFile = getIndexFile(shuffleId, mapId)
+    val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    val indexFileSize = indexFile.length()
+    val indexBlockData = new FileSegmentManagedBuffer(transportConf, indexFile, 0, indexFileSize)
+
+    // Load the data block
+    val dataFile = getDataFile(shuffleId, mapId)
+    val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    val dataBlockData = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length())
+    List((indexBlockId, indexBlockData), (dataBlockId, dataBlockData))
+  }
+
+
   /**
    * Write an index file with the offsets of each block, plus a final offset at the end for the
    * end of the output file. This will be used by getBlockData to figure out where each block
@@ -169,7 +272,7 @@ private[spark] class IndexShuffleBlockResolver(
     val dataFile = getDataFile(shuffleId, mapId)
     // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
     // the following check and rename are atomic.
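+    // (A bare `synchronized` block already locks `this`; the explicit receiver below just
+    // makes the lock target obvious now that migration code also writes these files.)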
-    synchronized {
+    this.synchronized {
       val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
       if (existingLengths != null) {
         // Another attempt for the same task has already written our map outputs successfully,
diff --git a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
new file mode 100644
index 0000000000000..768d4f8db4364
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.storage.BlockId
+
+/**
+ * :: Experimental ::
+ * An experimental trait to allow Spark to migrate shuffle blocks.
+ */
+@Experimental
+trait MigratableResolver {
+  /**
+   * Get the (shuffleId, mapId) pairs for shuffles that are stored locally.
+   * Used for block migrations.
+   */
+  def getStoredShuffles(): Set[(Int, Long)]
+
+  /**
+   * Write a provided shuffle block as a stream. Used for block migrations.
+   */
+  def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
+      StreamCallbackWithID
+
+  /**
+   * Get the blocks for migration for a particular shuffle and map.
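+   * Returns (BlockId, ManagedBuffer) pairs suitable for upload to a peer block manager.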
+   */
+  def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)]
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 68ed3aa5b062f..398db1f4875ee 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -40,6 +40,9 @@ sealed abstract class BlockId {
   def isRDD: Boolean = isInstanceOf[RDDBlockId]
   def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId]
   def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
+  def isInternalShuffle: Boolean = {
+    isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId]
+  }
 
   override def toString: String = name
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e0478ad09601d..29fd9ef462d97 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -24,6 +24,7 @@ import java.nio.channels.Channels
 import java.util.Collections
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
 import scala.concurrent.{ExecutionContext, Future}
@@ -53,6 +54,7 @@ import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
-import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
 import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.storage.memory._
@@ -243,7 +245,7 @@ private[spark] class BlockManager(
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
   private var blockManagerDecommissioning: Boolean = false
-  private var decommissionManager: Option[BlockManagerDecommissionManager] = None
+  private[spark] var decommissionManager: Option[BlockManagerDecommissionManager] = None
 
   // A DownloadFileManager used to track all the files of remote blocks which are above the
   // specified memory threshold. Files will be deleted automatically based on weak reference.
@@ -254,6 +256,17 @@ private[spark] class BlockManager(
 
   var hostLocalDirManager: Option[HostLocalDirManager] = None
 
+  private lazy val migratableResolver: MigratableResolver = {
+    shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver]
+  }
+
+  // Shuffles which are either in queue for migrations or migrated
+  private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+  // Shuffles which have migrated
+  private val migratedShuffles = mutable.HashSet[(Int, Long)]()
+  // Shuffles which are queued for migration
+  private val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
   /**
    * Abstraction for storing blocks from bytes, whether they start in memory or on disk.
    *
@@ -617,6 +630,7 @@ private[spark] class BlockManager(
    */
   override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
     if (blockId.isShuffle) {
+      logInfo(s"Getting local shuffle block ${blockId}")
       shuffleManager.shuffleBlockResolver.getBlockData(blockId)
     } else {
       getLocalBytes(blockId) match {
@@ -650,6 +664,23 @@ private[spark] class BlockManager(
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[_]): StreamCallbackWithID = {
+
+    if (blockManagerDecommissioning) {
+      throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+    }
+
+    if (blockId.isShuffle || blockId.isInternalShuffle) {
+      logInfo(s"Putting shuffle block ${blockId}")
+      try {
+        return migratableResolver.putShuffleBlockAsStream(blockId, serializerManager)
+      } catch {
+        case e: ClassCastException => throw new Exception(
+          s"Unexpected shuffle block ${blockId} with unsupported shuffle " +
+          s"resolver ${shuffleManager.shuffleBlockResolver}", e)
+      }
+    }
+    logInfo(s"Putting regular block ${blockId}")
+    // All other blocks
     val (_, tmpFile) = diskBlockManager.createTempLocalBlock()
     val channel = new CountingWritableChannel(
       Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
@@ -720,7 +751,7 @@ private[spark] class BlockManager(
    * it is still valid). This ensures that update in master will compensate for the increase in
    * memory on slave.
    */
-  private def reportBlockStatus(
+  private[spark] def reportBlockStatus(
       blockId: BlockId,
       status: BlockStatus,
       droppedMemorySize: Long = 0L): Unit = {
@@ -1285,6 +1316,9 @@ private[spark] class BlockManager(
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
+    if (blockManagerDecommissioning) {
+      throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+    }
 
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, classTag, tellMaster)
@@ -1781,7 +1815,7 @@ private[spark] class BlockManager(
 
   def decommissionBlockManager(): Unit = {
     if (!blockManagerDecommissioning) {
-      logInfo("Starting block manager decommissioning process")
+      logInfo("Starting block manager decommissioning process...")
       blockManagerDecommissioning = true
       decommissionManager = Some(new BlockManagerDecommissionManager(conf))
       decommissionManager.foreach(_.start())
@@ -1790,19 +1824,128 @@ private[spark] class BlockManager(
     }
   }
 
+
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable {
+    @volatile var running = true
+    override def run(): Unit = {
+      var migrating: Option[(Int, Long)] = None
+      val storageLevel = StorageLevel(
+        useDisk = true,
+        useMemory = false,
+        useOffHeap = false,
+        deserialized = false,
+        replication = 1)
+      logInfo(s"Starting migration thread for ${peer}")
+      // Once a block fails to transfer to an executor, stop trying to transfer more blocks.
+      try {
+        while (running) {
+          migrating = Option(shufflesToMigrate.poll())
+          migrating match {
+            case None =>
+              logInfo("Nothing to migrate")
+              // Nothing to do right now, but maybe a transfer will fail or a new block
+              // will finish being committed.
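+              // Poll with a sleep, rather than a blocking take, so the running flag
+              // is re-checked on every iteration.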
+              val SLEEP_TIME_SECS = 1
+              Thread.sleep(SLEEP_TIME_SECS * 1000L)
+            case Some((shuffleId, mapId)) =>
+              logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to ${peer}")
+              val blocks =
+                migratableResolver.getMigrationBlocks(shuffleId, mapId)
+              logInfo(s"Got migration sub-blocks ${blocks}")
+              blocks.foreach { case (blockId, buffer) =>
+                logInfo(s"Migrating sub-block ${blockId}")
+                blockTransferService.uploadBlockSync(
+                  peer.host,
+                  peer.port,
+                  peer.executorId,
+                  blockId,
+                  buffer,
+                  storageLevel,
+                  null) // class tag: not needed for shuffle blocks
+                logInfo(s"Migrated sub-block ${blockId}")
+              }
+              logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+              migratedShuffles += ((shuffleId, mapId))
+          }
+        }
+        // This catch is intentionally outside of the while running block.
+        // If we encounter errors migrating to an executor we want to stop.
+      } catch {
+        case e: Exception =>
+          migrating match {
+            case Some(shuffleMap) =>
+              logError(s"Error ${e} during migration, adding ${shuffleMap} back to migration queue")
+              shufflesToMigrate.add(shuffleMap)
+            case None =>
+              logError(s"Error ${e} while waiting for block to migrate")
+          }
+      }
+    }
+  }
+
+  private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
+
+  /**
+   * Tries to offload all shuffle blocks that are registered with the shuffle service locally.
+   * Note: this does not delete the shuffle files in case there is an in-progress fetch
+   * but rather shadows them.
+   * Requires an index-based shuffle resolver (see MigratableResolver).
+   *
+   * @return true if we have not migrated all shuffle blocks, false otherwise.
+   */
+  def offloadShuffleBlocks(): Boolean = {
+    // Update the queue of shuffles to be migrated
+    logInfo("Offloading shuffle blocks")
+    val localShuffles = migratableResolver.getStoredShuffles()
+    logInfo(s"My local shuffles are ${localShuffles.toList}")
+    val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+    logInfo(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+    shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+    migratingShuffles ++= newShufflesToMigrate
+
+    // Update the threads doing migrations
+    // TODO: Sort & only start as many threads as min(||blocks||, ||targets||) using location pref
+    val livePeerSet = getPeers(false).toSet
+    val currentPeerSet = migrationPeers.keys.toSet
+    val deadPeers = currentPeerSet.&~(livePeerSet)
+    val newPeers = livePeerSet.&~(currentPeerSet)
+    migrationPeers ++= newPeers.map { peer =>
+      logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
+      val executor = ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
+      val runnable = new ShuffleMigrationRunnable(peer)
+      executor.submit(runnable)
+      (peer, runnable)
+    }
+    // A peer may have entered a decommissioning state, don't transfer any new blocks
+    deadPeers.foreach { peer =>
+      migrationPeers.get(peer).foreach(_.running = false)
+    }
+    // True if we found any new shuffles to migrate or otherwise have not migrated everything.
+    newShufflesToMigrate.nonEmpty || (migratingShuffles.&~(migratedShuffles)).nonEmpty
+  }
+
+
+  /**
+   * Stop migrating shuffle blocks.
+   */
+  def stopOffloadingShuffleBlocks(): Unit = {
+    logInfo("Stopping offloading shuffle blocks")
+    migrationPeers.values.foreach(_.running = false)
+  }
+
   /**
    * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
    * Visible for testing
    */
-  def decommissionRddCacheBlocks(): Unit = {
+  private[spark] def decommissionRddCacheBlocks(): Boolean = {
     val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
 
     if (replicateBlocksInfo.nonEmpty) {
-      logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
+      logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
         "for block manager decommissioning")
     } else {
       logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
-      return
+      return false
     }
 
     // Maximum number of storage replication failure which replicateBlock can handle
@@ -1831,6 +1974,7 @@ private[spark] class BlockManager(
       logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
     }
+    return true
   }
 
   /**
@@ -1905,25 +2049,52 @@ private[spark] class BlockManager(
    * Class to handle block manager decommissioning retries
    * It creates a Thread to retry offloading all RDD cache blocks
    */
-  private class BlockManagerDecommissionManager(conf: SparkConf) {
+  private[spark] class BlockManagerDecommissionManager(conf: SparkConf) {
     @volatile private var stopped = false
-    private val sleepInterval = conf.get(
-      config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
+    // Since running tasks can add more blocks this can change.
+    @volatile var allBlocksMigrated = false
+    var previousBlocksLeft = true
+    private val blockMigrationThread = new Thread {
+      val sleepInterval = conf.get(
+        config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
 
-    private val blockReplicationThread = new Thread {
       override def run(): Unit = {
         var failures = 0
         while (blockManagerDecommissioning
             && !stopped
             && !Thread.interrupted()
            && failures < 20) {
+          logInfo("Iterating on migrating from the block manager.")
           try {
-            logDebug("Attempting to replicate all cached RDD blocks")
-            decommissionRddCacheBlocks()
-            logInfo("Attempt to replicate all cached blocks done")
+            var blocksLeft = false
+            // If enabled we migrate shuffle blocks first as they are more expensive.
+            if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
+              logDebug("Attempting to replicate all shuffle blocks")
+              blocksLeft = offloadShuffleBlocks() || blocksLeft
+              logInfo("Done starting workers to migrate shuffle blocks")
+            }
+            if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) {
+              logDebug("Attempting to replicate all cached RDD blocks")
+              blocksLeft = decommissionRddCacheBlocks() || blocksLeft
+              logInfo("Attempt to replicate all cached blocks done")
+            }
+            logInfo(s"We have blocksLeft: ${blocksLeft}")
+            // Avoid the situation where a block was added during the loop:
+            // only report all-migrated after two consecutive clean sweeps.
+            allBlocksMigrated = !blocksLeft && !previousBlocksLeft
+            previousBlocksLeft = blocksLeft
+            if (!conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED) &&
+                !conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
+              logWarning("Decommissioning, but no migration configured; set one or both:\n" +
+                "spark.storage.decommission.shuffle_blocks\n" +
+                "spark.storage.decommission.rdd_blocks")
+              allBlocksMigrated = true
+              stopped = true
+            }
+            logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
             Thread.sleep(sleepInterval)
           } catch {
-            case _: InterruptedException =>
+            case e: InterruptedException =>
               logInfo("Interrupted during migration, will not refresh migrations.")
               stopped = true
             case NonFatal(e) =>
@@ -1934,24 +2105,26 @@ private[spark] class BlockManager(
         }
       }
     }
-    blockReplicationThread.setDaemon(true)
-    blockReplicationThread.setName("block-replication-thread")
+    blockMigrationThread.setDaemon(true)
+    blockMigrationThread.setName("block-migration-thread")
 
     def start(): Unit = {
       logInfo("Starting block replication thread")
-      blockReplicationThread.start()
+      blockMigrationThread.start()
     }
 
     def stop(): Unit = {
       if (!stopped) {
         stopped = true
         logInfo("Stopping block replication thread")
-        blockReplicationThread.interrupt()
+        blockMigrationThread.interrupt()
+        stopOffloadingShuffleBlocks()
       }
     }
   }
 
   def stop(): Unit = {
+    logInfo("Stopping decommission manager")
     decommissionManager.foreach(_.stop())
     blockTransferService.close()
     if (blockStoreClient ne blockTransferService) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3cfa5d2a25818..ff0b099e44b51 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.storage
 
 import scala.collection.generic.CanBuildFrom
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d936420a99276..1986b62e0ed19 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -28,7 +28,7 @@ import scala.util.Random
 
 import com.google.common.cache.CacheBuilder
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
@@ -48,7 +48,8 @@ class BlockManagerMasterEndpoint(
     conf: SparkConf,
     listenerBus: LiveListenerBus,
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
-    blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo])
+    blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
+    mapOutputTracker: MapOutputTrackerMaster)
   extends IsolatedRpcEndpoint with Logging {
 
   // Mapping from executor id to the block manager's local disk directories.
@@ -157,7 +158,8 @@ class BlockManagerMasterEndpoint(
       context.reply(true)
 
     case DecommissionBlockManagers(executorIds) =>
-      decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
+      val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get)
+      decommissionBlockManagers(bmIds)
       context.reply(true)
 
     case GetReplicateInfoForRDDBlocks(blockManagerId) =>
@@ -334,13 +336,14 @@ class BlockManagerMasterEndpoint(
     val info = blockManagerInfo(blockManagerId)
 
     val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-    rddBlocks.map { blockId =>
+    val result = rddBlocks.map { blockId =>
       val currentBlockLocations = blockLocations.get(blockId)
       val maxReplicas = currentBlockLocations.size + 1
       val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
       val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
       replicateMsg
     }.toSeq
+    result
   }
 
   // Remove a block from the slaves that have it. This can only be used to remove
@@ -489,6 +492,24 @@ class BlockManagerMasterEndpoint(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long): Boolean = {
+    logInfo(s"Updating block info on master ${blockId} for ${blockManagerId}")
+
+    if (blockId.isInternalShuffle) {
+      blockId match {
+        case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+          // Don't update the map output on just the index block
+          logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, ignoring.")
+          return true
+        case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
+          logInfo(s"Received shuffle data block update for ${shuffleId} ${mapId}, updating.")
+          mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+          return true
+        case _ =>
+          logDebug(s"Unexpected shuffle block type ${blockId} " +
+            s"as ${blockId.getClass().getSimpleName()}")
+          return false
+      }
+    }
 
     if (!blockManagerInfo.contains(blockManagerId)) {
       if (blockManagerId.isDriver && !isLocal) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockSavedOnDecommissionedBlockManagerException.scala b/core/src/main/scala/org/apache/spark/storage/BlockSavedOnDecommissionedBlockManagerException.scala
new file mode 100644
index 0000000000000..4684d9c67754d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockSavedOnDecommissionedBlockManagerException.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+class BlockSavedOnDecommissionedBlockManagerException(blockId: BlockId)
+  extends Exception(s"Block $blockId cannot be saved on decommissioned executor")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 148d20ee659a2..cd3ab4db77f85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -58,7 +58,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     })
     TestUtils.waitUntilExecutorsUp(sc = sc,
       numExecutors = 2,
-      timeout = 10000) // 10s
+      timeout = 30000) // 30s
     val sleepyRdd = input.mapPartitions{ x =>
       Thread.sleep(5000) // 5s
       x
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 27bb06b4e0636..b4092be466e27 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -48,6 +48,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
     when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
+    when(diskBlockManager.localDirs).thenReturn(Array(tempDir))
   }
 
   override def afterEach(): Unit = {
@@ -73,6 +74,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     }
 
     resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)
+    val storedShuffles = resolver.getStoredShuffles()
 
     val indexFile = new File(tempDir.getAbsolutePath, idxName)
     val dataFile = resolver.getDataFile(shuffleId, mapId)
@@ -81,6 +83,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp.exists())
+    assert(storedShuffles === Set((1, 2)))
 
     val lengths2 = new Array[Long](3)
     val dataTmp2 = File.createTempFile("shuffle", null, tempDir)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
index 7456ca7f02a2e..df356df0cdbf3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
@@ -22,85 +22,215 @@ import java.util.concurrent.Semaphore
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
 
 class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
-  with ResetSystemProperties {
+  with ResetSystemProperties with Eventually {
+
+  val numExecs = 3
+  val numParts = 3
+
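+  // Each test drives runDecomTest(persist, shuffle, migrateDuring), defined below.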
test(s"verify that an already running task which is going to cache data succeeds " + + s"on a decommissioned executor") { + runDecomTest(true, false, true) + } + + test(s"verify that shuffle blocks are migrated") { + runDecomTest(false, true, false) + } + + test(s"verify that both migrations can work at the same time.") { + runDecomTest(true, true, false) + } - override def beforeEach(): Unit = { - val conf = new SparkConf().setAppName("test") + private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = { + + val master = s"local-cluster[${numExecs}, 1, 1024]" + val conf = new SparkConf().setAppName("test").setMaster(master) .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_ENABLED, true) + .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist) + .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle) + // Just replicate blocks as fast as we can during testing, there isn't another + // workload we need to worry about. + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) - sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) - } + sc = new SparkContext(master, "test", conf) - test(s"verify that an already running task which is going to cache data succeeds " + - s"on a decommissioned executor") { // Create input RDD with 10 partitions - val input = sc.parallelize(1 to 10, 10) + val input = sc.parallelize(1 to numParts, numParts) val accum = sc.longAccumulator("mapperRunAccumulator") // Do a count to wait for the executors to be registered. input.count() // Create a new RDD where we have sleep in each partition, we are also increasing // the value of accumulator in each partition - val sleepyRdd = input.mapPartitions { x => - Thread.sleep(500) + val baseRdd = input.mapPartitions { x => + if (migrateDuring) { + Thread.sleep(500) + } accum.add(1) - x + x.map(y => (y, y)) + } + val testRdd = shuffle match { + case true => baseRdd.reduceByKey(_ + _) + case false => baseRdd } - // Listen for the job - val sem = new Semaphore(0) + // Listen for the job & block updates + val taskStartSem = new Semaphore(0) + val broadcastSem = new Semaphore(0) + val executorRemovedSem = new Semaphore(0) val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] + val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated] sc.addSparkListener(new SparkListener { + + override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = { + executorRemovedSem.release() + } + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - sem.release() + taskStartSem.release() } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEndEvents.append(taskEnd) } + + override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { + // Once broadcast start landing on the executors we're good to proceed. + // We don't only use task start as it can occur before the work is on the executor. 
+ if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) { + broadcastSem.release() + } + blocksUpdated.append(blockUpdated) + } }) + // Cache the RDD lazily - sleepyRdd.persist() + if (persist) { + testRdd.persist() + } - // Start the computation of RDD - this step will also cache the RDD - val asyncCount = sleepyRdd.countAsync() + // Wait for the first executor to start + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = 1, + timeout = 20000) // 20s - // Wait for the job to have started - sem.acquire(1) + // Start the computation of RDD - this step will also cache the RDD + val asyncCount = testRdd.countAsync() + + // Wait for all of the executors to start + TestUtils.waitUntilExecutorsUp(sc = sc, + // We need to make sure there is the original plus one exec to migrate too, we don't need + // the full set. + numExecutors = 2, + timeout = 30000) // 30s + + // Wait for the job to have started. + taskStartSem.acquire(1) + // Wait for each executor + driver to have it's broadcast info delivered. + broadcastSem.acquire((numExecs + 1)) + + // Make sure the job is either mid run or otherwise has data to migrate. + if (migrateDuring) { + // Give Spark a tiny bit to start executing after the broadcast blocks land. + // For me this works at 100, set to 300 for system variance. + Thread.sleep(300) + } else { + ThreadUtils.awaitResult(asyncCount, 15.seconds) + } - // Give Spark a tiny bit to start the tasks after the listener says hello - Thread.sleep(100) // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}") + assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}") + val execToDecommission = execs.head + logDebug(s"Decommissioning executor ${execToDecommission}") sched.decommissionExecutor(execToDecommission) // Wait for job to finish - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds) - assert(asyncCountResult === 10) - // All 10 tasks finished, so accum should have been increased 10 times - assert(accum.value === 10) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds) + assert(asyncCountResult === numParts) + // All tasks finished, so accum should have been increased numParts times + assert(accum.value === numParts) // All tasks should be successful, nothing should have failed sc.listenerBus.waitUntilEmpty() - assert(taskEndEvents.size === 10) // 10 mappers - assert(taskEndEvents.map(_.reason).toSet === Set(Success)) + if (shuffle) { + // mappers & reducers which succeeded + assert(taskEndEvents.count(_.reason == Success) === 2 * numParts, + s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})") + } else { + // only mappers which executed successfully + assert(taskEndEvents.count(_.reason == Success) === numParts, + s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})") + } - // Since the RDD is cached, so further usage of same RDD should use the + // Wait for our respective blocks to have migrated + eventually(timeout(15.seconds), interval(10.milliseconds)) { + if (persist) { + // One of our blocks should have moved. 
+ val rddUpdates = blocksUpdated.filter { update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isRDD} + val blockLocs = rddUpdates.map { update => + (update.blockUpdatedInfo.blockId.name, + update.blockUpdatedInfo.blockManagerId)} + val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size) + assert(!blocksToManagers.filter(_._2 > 1).isEmpty, + s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" + + s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}") + } + // If we're migrating shuffles we look for any shuffle block updates + // as there is no block update on the initial shuffle block write. + if (shuffle) { + val numDataLocs = blocksUpdated.filter { update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isInstanceOf[ShuffleDataBlockId] + }.size + val numIndexLocs = blocksUpdated.filter { update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isInstanceOf[ShuffleIndexBlockId] + }.size + assert(numDataLocs >= 1, s"Expect shuffle data block updates in ${blocksUpdated}") + assert(numIndexLocs >= 1, s"Expect shuffle index block updates in ${blocksUpdated}") + } + } + + // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum // should have same value like before - assert(sleepyRdd.count() === 10) - assert(accum.value === 10) + assert(testRdd.count() === numParts) + assert(accum.value === numParts) + + val storageStatus = sc.env.blockManager.master.getStorageStatus + val execIdToBlocksMapping = storageStatus.map( + status => (status.blockManagerId.executorId, status.blocks)).toMap + // No cached blocks should be present on executor which was decommissioned + assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), + "Cache blocks should be migrated") + if (persist) { + // There should still be all the RDD blocks cached + assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) + } + + // Wait for the executor to be removed after blocks are migrated. + executorRemovedSem.acquire(1) + + // Since the RDD is cached or shuffled so further usage of same RDD should use the + // cached data. Original RDD partitions should not be recomputed i.e. 
accum + // should have same value like before + assert(testRdd.count() === numParts) + assert(accum.value === numParts) + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 660bfcfc48267..d18d84dfaa9e5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -103,7 +103,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(conf), None, blockManagerInfo)), + new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true) allStores.clear() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index bfef8f1ab29d8..4143129810aa3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -149,7 +149,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE liveListenerBus = spy(new LiveListenerBus(conf)) master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - liveListenerBus, None, blockManagerInfo)), + liveListenerBus, None, blockManagerInfo, mapOutputTracker)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index becf9415c7506..e71d9ea127d25 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -28,18 +28,29 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true") .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.container.image", pyImage) + .set("spark.storage.decommission.enabled", "true") + .set("spark.storage.decommission.shuffle_blocks", "true") + .set("spark.storage.decommission.shuffle_blocks", "true") + // Ensure we have somewhere to migrate our data too + .set("spark.executor.instances", "3") + // The default of 30 seconds is fine, but for testing we just want to get this done fast. 
+ .set("spark.storage.decommission.replicationReattemptInterval", "1s") + .set("spark.storage.decommission.rdd_blocks", "true") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_DECOMISSIONING, mainClass = "", expectedLogOnCompletion = Seq( "Finished waiting, stopping Spark", - "decommissioning executor"), + "decommissioning executor", + "Final accumulator value is: 100"), appArgs = Array.empty[String], driverPodChecker = doBasicDriverPyPodCheck, executorPodChecker = doBasicExecutorPyPodCheck, appLocator = appLocator, isJVM = false, + pyFiles = None, + executorPatience = None, decommissioningTest = true) } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 65a2f1ff79697..bb7126057bf60 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -325,21 +325,36 @@ class KubernetesSuite extends SparkFunSuite val result = checkPodReady(namespace, name) result shouldBe (true) } - // Look for the string that indicates we're good to clean up - // on the driver + // Look for the string that indicates we're good to trigger decom on the driver logDebug("Waiting for first collect...") Eventually.eventually(TIMEOUT, INTERVAL) { assert(kubernetesTestComponents.kubernetesClient .pods() .withName(driverPodName) .getLog - .contains("Waiting to give nodes time to finish."), + .contains("Waiting to give nodes time to finish migration, decom exec 1."), "Decommission test did not complete first collect.") } // Delete the pod to simulate cluster scale down/migration. - val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) + // This will allow the pod to remain up for the grace period + val pod = kubernetesTestComponents.kubernetesClient.pods() + .withName(name) pod.delete() logDebug(s"Triggered pod decom/delete: $name deleted") + // Look for the string that indicates we should force kill the first + // Executor. This simulates the pod being fully lost. + logDebug("Waiting for second collect...") + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .getLog + .contains("Waiting some more, please kill exec 1."), + "Decommission test did not complete second collect.") + } + logDebug("Force deleting") + val podNoGrace = pod.withGracePeriod(0) + podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) @@ -365,9 +380,10 @@ class KubernetesSuite extends SparkFunSuite .get(0) driverPodChecker(driverPod) - // If we're testing decommissioning we delete all the executors, but we should have - // an executor at some point. - Eventually.eventually(patienceTimeout, patienceInterval) { + + // If we're testing decommissioning we an executors, but we should have an executor + // at some point. 
diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
index f68f24d49763d..84dc514b8988b 100644
--- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
+++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
@@ -31,14 +31,29 @@
         .appName("PyMemoryTest") \
         .getOrCreate()
     sc = spark._sc
-    rdd = sc.parallelize(range(10))
+    acc = sc.accumulator(0)
+
+    def addToAcc(x):
+        acc.add(1)
+        return x
+
+    initialRdd = sc.parallelize(range(100), 5)
+    accRdd = initialRdd.map(addToAcc)
+    # Trigger a shuffle so there are shuffle blocks to migrate
+    rdd = accRdd.map(lambda x: (x, x)).groupByKey()
     rdd.collect()
-    print("Waiting to give nodes time to finish.")
+    print("1st accumulator value is: " + str(acc.value))
+    print("Waiting to give nodes time to finish migration, decom exec 1.")
+    print("...")
     time.sleep(5)
+    rdd.count()
+    print("Waiting some more, please kill exec 1.")
+    print("...")
+    time.sleep(5)
+    print("Executor node should be deleted now")
+    rdd.count()
     rdd.collect()
-    print("Waiting some more....")
-    time.sleep(10)
-    rdd.collect()
+    print("Final accumulator value is: " + str(acc.value))
     print("Finished waiting, stopping Spark.")
     spark.stop()
     print("Done, exiting Python")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 15cc3109da3f7..bf362b9da30f8 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -68,8 +68,6 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ
     // Missing UDF
     "postgreSQL/boolean.sql",
     "postgreSQL/case.sql",
-    // SPARK-28624
-    "date.sql",
     // SPARK-28620
     "postgreSQL/float4.sql",
     // SPARK-28636
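For reference, the accumulator check in decommissioning.py translates directly to Scala: bump an accumulator once per element in the map stage, force a shuffle, and verify that a second action leaves the count unchanged (the map stage is skipped when shuffle output survives migration). A sketch under the assumption that an executor is decommissioned between the two actions (all names are hypothetical, not part of the patch):

    import org.apache.spark.sql.SparkSession

    object AccumulatorMigrationCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("AccumulatorMigrationCheck").getOrCreate()
        val sc = spark.sparkContext
        val acc = sc.longAccumulator("mapComputations")
        val grouped = sc.parallelize(1 to 100, 5)
          .map { x => acc.add(1); (x % 10, x) } // one bump per element, map side
          .groupByKey()
        grouped.count() // materializes the shuffle; acc.value == 100
        // ...decommission an executor here; its shuffle blocks should migrate...
        grouped.count() // reuses the migrated shuffle output; acc.value stays 100
        println(s"Final accumulator value is: ${acc.value}")
        spark.stop()
      }
    }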
rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)