From 6340f9b9c76f1796d87dfb5813eeb26d350ab511 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 May 2020 14:48:23 -0700 Subject: [PATCH] Fix the migration to store ShuffleDataBlockId, check that data and index blocks have both been migrated, check that RDD blocks are duplicated not just broadcast blocks, make the number of partitions smaller so the test can run faster, avoid the Thread.sleep for all of the tests except for the midflight test where we need it, check for the broadcast blocks landing (further along in scheduling) beyond just task start, force fetching the shuffle block to local disk if in shuffle block test mode, start the job as soon as the first executor comes online. --- .../shuffle/IndexShuffleBlockResolver.scala | 4 +- .../BlockManagerDecommissionSuite.scala | 106 ++++++++++++------ 2 files changed, 74 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index fe39af7b9fd6a..79323be45845c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -180,8 +180,8 @@ private[spark] class IndexShuffleBlockResolver( StreamCallbackWithID = { val file = blockId match { case ShuffleIndexBlockId(shuffleId, mapId, _) => - getIndexFile(shuffleId, mapId) - case ShuffleBlockBatchId(shuffleId, mapId, _, _) => + getIndexFile(shuffleId, mapId) + case ShuffleDataBlockId(shuffleId, mapId, _) => getDataFile(shuffleId, mapId) case _ => throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " + diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index eb0d28aeb2571..2219a32216075 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -24,8 +24,7 @@ import scala.concurrent.duration._ import org.scalatest.concurrent.Eventually -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success, - TestUtils} +import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend @@ -35,33 +34,41 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties with Eventually { val numExecs = 3 + val numParts = 3 test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { - runDecomTest(true, false) + runDecomTest(true, false, true) } test(s"verify that shuffle blocks are migrated.") { - runDecomTest(false, true) + runDecomTest(false, true, false) } test(s"verify that both migrations can work at the same time.") { - runDecomTest(true, true) + runDecomTest(true, true, false) } - private def runDecomTest(persist: Boolean, shuffle: Boolean) = { + private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = { val master = s"local-cluster[${numExecs}, 1, 1024]" val conf = new SparkConf().setAppName("test").setMaster(master) .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_ENABLED, true) .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist) .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle) + // Just replicate blocks as fast as we can during testing, there isn't another + // workload we need to worry about. .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + // Force fetching to local disk + if (shuffle) { + conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1") + } + sc = new SparkContext(master, "test", conf) // Create input RDD with 10 partitions - val input = sc.parallelize(1 to 10, 10) + val input = sc.parallelize(1 to numParts, numParts) val accum = sc.longAccumulator("mapperRunAccumulator") // Do a count to wait for the executors to be registered. input.count() @@ -69,7 +76,9 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Create a new RDD where we have sleep in each partition, we are also increasing // the value of accumulator in each partition val sleepyRdd = input.mapPartitions { x => - Thread.sleep(250) + if (migrateDuring) { + Thread.sleep(500) + } accum.add(1) x.map(y => (y, y)) } @@ -79,12 +88,14 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } // Listen for the job & block updates - val sem = new Semaphore(0) + val taskStartSem = new Semaphore(0) + val broadcastSem = new Semaphore(0) val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated] sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - sem.release() + taskStartSem.release() } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { @@ -92,6 +103,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { + // Once broadcast start landing on the executors we're good to proceed. + // We don't only use task start as it can occur before the work is on the executor. + if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) { + broadcastSem.release() + } blocksUpdated.append(blockUpdated) } }) @@ -102,19 +118,32 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext testRdd.persist() } - // Wait for all of the executors to start + // Wait for the first executor to start TestUtils.waitUntilExecutorsUp(sc = sc, - numExecutors = numExecs, + numExecutors = 1, timeout = 10000) // 10s // Start the computation of RDD - this step will also cache the RDD val asyncCount = testRdd.countAsync() - // Wait for the job to have started - sem.acquire(1) + // Wait for all of the executors to start + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = numExecs, + timeout = 10000) // 10s - // Give Spark a tiny bit to start the tasks after the listener says hello - Thread.sleep(50) + // Wait for the job to have started. + taskStartSem.acquire(1) + // Wait for each executor + driver to have it's broadcast info delivered. + broadcastSem.acquire((numExecs + 1)) + + // Make sure the job is either mid run or otherwise has data to migrate. + if (migrateDuring) { + // Give Spark a tiny bit to start executing after the broadcast blocks land. + // For me this works at 100, set to 300 for system variance. + Thread.sleep(300) + } else { + ThreadUtils.awaitResult(asyncCount, 15.seconds) + } // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] @@ -127,49 +156,58 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for job to finish val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds) - assert(asyncCountResult === 10) - // All 10 tasks finished, so accum should have been increased 10 times - assert(accum.value === 10) + assert(asyncCountResult === numParts) + // All tasks finished, so accum should have been increased numParts times + assert(accum.value === numParts) // All tasks should be successful, nothing should have failed sc.listenerBus.waitUntilEmpty() if (shuffle) { - // 10 mappers & 10 reducers which succeeded - assert(taskEndEvents.count(_.reason == Success) === 20, - s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})") + // mappers & reducers which succeeded + assert(taskEndEvents.count(_.reason == Success) === 2 * numParts, + s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})") } else { - // 10 mappers which executed successfully - assert(taskEndEvents.count(_.reason == Success) === 10, - s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})") + // only mappers which executed successfully + assert(taskEndEvents.count(_.reason == Success) === numParts, + s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})") } // Wait for our respective blocks to have migrated eventually(timeout(15.seconds), interval(10.milliseconds)) { if (persist) { // One of our blocks should have moved. - val blockLocs = blocksUpdated.map{ update => + val rddUpdates = blocksUpdated.filter{update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isRDD} + val blockLocs = rddUpdates.map{ update => (update.blockUpdatedInfo.blockId.name, update.blockUpdatedInfo.blockManagerId)} val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.toSet.size) assert(!blocksToManagers.filter(_._2 > 1).isEmpty, - s"We should have a block that has been on multiple BMs in ${blocksUpdated}") + s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" + + s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}") } // If we're migrating shuffles we look for any shuffle block updates // as there is no block update on the initial shuffle block write. if (shuffle) { - val numLocs = blocksUpdated.filter{ update => + val numDataLocs = blocksUpdated.filter{ update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isInstanceOf[ShuffleDataBlockId] + }.toSet.size + val numIndexLocs = blocksUpdated.filter{ update => val blockId = update.blockUpdatedInfo.blockId - blockId.isShuffle || blockId.isInternalShuffle + blockId.isInstanceOf[ShuffleIndexBlockId] }.toSet.size - assert(numLocs > 0, s"No shuffle block updates in ${blocksUpdated}") + assert(numDataLocs >= 1, s"Expect shuffle data block updates in ${blocksUpdated}") + assert(numIndexLocs >= 1, s"Expect shuffle index block updates in ${blocksUpdated}") } } // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum // should have same value like before - assert(testRdd.count() === 10) - assert(accum.value === 10) + assert(testRdd.count() === numParts) + assert(accum.value === numParts) val storageStatus = sc.env.blockManager.master.getStorageStatus val execIdToBlocksMapping = storageStatus.map( @@ -178,8 +216,8 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), "Cache blocks should be migrated") if (persist) { - // There should still be all 10 RDD blocks cached - assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10) + // There should still be all the RDD blocks cached + assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } } }