diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b9501c3e4bbdb..3113d4a3149fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -139,6 +139,11 @@ private[spark] class BlockManager(
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
 
+  // Fields related to peer block managers that are necessary for block replication
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
+  private val peerFetchLock = new Object
+  private var lastPeerFetchTime = 0L
+
   initialize()
 
   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -822,28 +827,111 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node.
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
+   * Replicate block to another node. Note that this is a blocking call that returns after
+   * the block has been replicated.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
+    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.currentTimeMillis
+    val random = new Random(blockId.hashCode)
+
+    var replicationFailed = false
+    var failures = 0
+    var done = false
+
+    // Get cached list of peers
+    peersForReplication ++= getPeers(forceFetch = false)
+
+    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+    // So assuming the list of peers does not change and there are no replication failures,
+    // if there are multiple attempts in the same node to replicate the same block,
+    // the same set of peers will be selected.
+    def getRandomPeer(): Option[BlockManagerId] = {
+      // If replication has failed, then force update the cached list of peers and remove the
+      // peers that have already been used
+      if (replicationFailed) {
+        peersForReplication.clear()
+        peersForReplication ++= getPeers(forceFetch = true)
+        peersForReplication --= peersReplicatedTo
+        peersForReplication --= peersFailedToReplicateTo
+      }
+      if (peersForReplication.nonEmpty) {
+        Some(peersForReplication(random.nextInt(peersForReplication.size)))
+      } else {
+        None
+      }
     }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")
-      val putBlock = PutBlock(blockId, data, tLevel)
-      val cmId = new ConnectionManagerId(peer.host, peer.port)
-      val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
-      if (!syncPutBlockSuccess) {
-        logError(s"Failed to call syncPutBlock to $peer")
+
+    // One by one choose a random peer and try uploading the block to it.
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from the driver and then pick another random peer for replication. Also
+    // temporarily blacklist the peer for which replication failed.
+    //
+    // This selection of a peer and replication is continued in a loop until one of the
+    // following 3 conditions is fulfilled:
+    // (i) specified number of peers have been replicated to
+    // (ii) too many failures in replicating to peers
+    // (iii) no peer left to replicate to
+    //
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          val onePeerStartTime = System.currentTimeMillis
+          data.rewind()
+          logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+          val putBlock = PutBlock(blockId, data, tLevel)
+          val cmId = new ConnectionManagerId(peer.host, peer.port)
+          val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
+          if (syncPutBlockSuccess) {
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
+              .format(System.currentTimeMillis - onePeerStartTime))
+            peersReplicatedTo += peer
+            peersForReplication -= peer
+            replicationFailed = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true  // specified number of peers have been replicated to
+            }
+          } else {
+            failures += 1
+            logWarning(s"Failed to replicate $blockId to $peer, failure #$failures")
+            replicationFailed = true
+            peersFailedToReplicateTo += peer
+            if (failures > maxReplicationFailures) {  // too many failures in replicating to peers
+              done = true
+            }
+          }
+        case None =>  // no peer left to replicate to
+          done = true
       }
-      logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
-        .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+    }
+    val timeTakenMs = System.currentTimeMillis - startTime
+    logTrace(s"Replicating $blockId of ${data.limit()} bytes to " +
+      s"${peersReplicatedTo.size} peer(s) took $timeTakenMs ms")
+    if (peersReplicatedTo.size < numPeersToReplicateTo) {
+      logWarning(s"Block $blockId replicated to only " +
+        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index ffd2a4d3ef7a5..fb9305ac33943 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -62,7 +62,9 @@ class BlockManagerId private (
 
   def nettyPort: Int = nettyPort_
 
+  def isDriver: Boolean = (executorId == "")
+
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
     out.writeInt(port_)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e67b3dc5ce02e..c2365ca643200 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -80,13 +80,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
+  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index dc80148e137ed..ad1b68b2ac120 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetLocationsMultipleBlockIds(blockIds) =>
       sender ! getLocationsMultipleBlockIds(blockIds)
 
-    case GetPeers(blockManagerId, size) =>
-      sender ! getPeers(blockManagerId, size)
+    case GetPeers(blockManagerId) =>
+      sender ! getPeers(blockManagerId)
 
     case GetMemoryStatus =>
       sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    * from the executors, but not from the driver.
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
-    // TODO: Consolidate usages of
     import context.dispatcher
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
-      removeFromDriver || info.blockManagerId.executorId != ""
+      removeFromDriver || !info.blockManagerId.isDriver
     }
     Future.sequence(
       requiredBlockManagers.map { bm =>
@@ -213,7 +212,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     val minSeenTime = now - slaveTimeout
     val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "") {
+      if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
         logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
           + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
@@ -233,7 +232,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
     if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.executorId == "" && !isLocal
+      blockManagerId.isDriver && !isLocal
     } else {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
       true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       tachyonSize: Long) {
 
     if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.executorId == "" && !isLocal) {
+      if (blockManagerId.isDriver && !isLocal) {
         // We intentionally do not register the master (except in local mode),
         // so we should not indicate failure.
         sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
-    if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet
+    if (blockManagerIds.contains(blockManagerId)) {
+      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
+    } else {
+      Seq.empty
     }
-
-    // Note that this logic will select the same node multiple times if there aren't enough peers
-    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 03ba898f038be..291ddfcc113ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -90,7 +90,7 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
-  case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 978a6ded80829..acaf321de52fb 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
       assert(statuses.size === 1)
       statuses.head match { case (bm, status) =>
-        assert(bm.executorId === "", "Block should only be on the driver")
+        assert(bm.isDriver, "Block should only be on the driver")
         assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
         assert(status.memSize > 0, "Block should be in memory store on the driver")
         assert(status.diskSize === 0, "Block should not be in disk store on the driver")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
new file mode 100644
index 0000000000000..1f013d78060e1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.{implicitConversions, postfixOps}
+
+import akka.actor.{ActorSystem, Props}
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.AkkaUtils
+
+/** Test suite that tests block replication in BlockManager */
+class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter {
+
+  private val conf = new SparkConf(false)
+  var actorSystem: ActorSystem = null
+  var master: BlockManagerMaster = null
+  val securityMgr = new SecurityManager(conf)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf)
+  val shuffleManager = new HashShuffleManager(conf)
+
+  // List of block managers created during a unit test, so that all of them can be stopped
+  // after the unit test.
+  val allStores = new ArrayBuffer[BlockManager]
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+  conf.set("spark.kryoserializer.buffer.mb", "1")
+  val serializer = new KryoSerializer(conf)
+
+  // Implicitly convert strings to BlockIds for test clarity.
+  implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+
+  private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = {
+    val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, securityMgr,
+      mapOutputTracker, shuffleManager)
+    allStores += store
+    store
+  }
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+    this.actorSystem = actorSystem
+
+    conf.set("spark.authenticate", "false")
+    conf.set("spark.driver.port", boundPort.toString)
+    conf.set("spark.storage.unrollFraction", "0.4")
+    conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+    // to make a replication attempt to an inactive store fail fast
+    conf.set("spark.core.connection.ack.wait.timeout", "1")
+    // to make cached peers refresh frequently
+    conf.set("spark.storage.cachedPeersTtl", "10")
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
+    allStores.clear()
+  }
+
+  after {
+    allStores.foreach { _.stop() }
+    allStores.clear()
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+    master = null
+  }
+
+  test("get peers with addition and removal of block managers") {
+    val numStores = 4
+    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
+    val storeIds = stores.map { _.blockManagerId }.toSet
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId })
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId })
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId })
+
+    // Add the driver store and test whether it is filtered out
+    val driverStore = makeBlockManager(1000, "")
+    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+    // Add a new store and test whether get peers returns it
+    val newStore = makeBlockManager(1000, s"store$numStores")
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+    // Remove a store and test whether get peers no longer returns it
+    val storeIdToRemove = stores(0).blockManagerId
+    master.removeExecutor(storeIdToRemove.executorId)
+    assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+    // Test whether asking for peers of an unregistered block manager id returns an empty list
+    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+    assert(master.getPeers(BlockManagerId("", "", 1, 0)).isEmpty)
+  }
+
+  test("block replication - 2x replication") {
+    testReplication(2,
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
+    )
+  }
+
+  test("block replication - 3x replication") {
+    // Generate storage levels with 3x replication
+    val storageLevels = {
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
+        level => StorageLevel(
+          level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3)
+      }
+    }
+    testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+    // Generate storage levels with varying replication
+    val storageLevels = Seq(
+      MEMORY_ONLY,
+      MEMORY_ONLY_SER_2,
+      StorageLevel(true, false, false, false, 3),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, true, false, false, 5),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, false, false, false, 3),
+      MEMORY_ONLY_SER_2,
+      MEMORY_ONLY
+    )
+    testReplication(5, storageLevels)
+  }
+
+  test("block replication - 2x replication without peers") {
+    intercept[org.scalatest.exceptions.TestFailedException] {
+      testReplication(1,
+        Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3)))
+    }
+  }
+
+  test("block replication - deterministic node selection") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val stores = (1 to 5).map { i => makeBlockManager(storeSize, s"store$i") }
+    val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2
+    val storageLevel3x = StorageLevel(true, true, false, true, 3)
+    val storageLevel4x = StorageLevel(true, true, false, true, 4)
+
+    def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
+      stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+      val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
+      stores.foreach { _.removeBlock(blockId) }
+      master.removeBlock(blockId)
+      locations
+    }
+
+    // Test if two attempts at 2x replication return the same set of locations
+    val a1Locs = putBlockAndGetLocations("a1", storageLevel2x)
+    assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs,
+      "Inserting a 2x replicated block second time gave different locations from the first")
+
+    // Test if two attempts at 3x replication return the same set of locations
+    val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x)
+    assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x,
+      "Inserting a 3x replicated block second time gave different locations from the first")
+
+    // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication
+    val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x)
+    assert(
+      a2Locs2x.subsetOf(a2Locs3x),
+      "Inserting a block with 2x replication gave locations that are not a subset of locations" +
+        s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}]"
+    )
+
+    // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication
+    val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x)
+    assert(
+      a2Locs3x.subsetOf(a2Locs4x),
+      "Inserting a block with 4x replication gave locations that are not a superset of locations " +
+        s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}]"
+    )
+
+    // Test if 3x replication of two different blocks gives two different sets of locations
+    val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x)
+    assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication")
+  }
+
+  test("block replication - replication failures") {
+    /*
+      Create a system of three block managers / stores. One of them (say, failableStore)
+      cannot receive blocks. So attempts to use that as a replication target fail.
+
+            +-----------/fails/-----------> failableStore
+            |
+        normalStore
+            |
+            +-----------/works/-----------> anotherNormalStore
+
+      We are first going to add a normal block manager (i.e. normalStore) and the failable block
+      manager (i.e. failableStore), and test whether 2x replication fails to create two
+      copies of a block. Then we are going to add another normal block manager
+      (i.e., anotherNormalStore), and test that now 2x replication works as the
+      new store will be used for replication.
+     */
+
+    // Add a normal block manager
+    val store = makeBlockManager(10000, "store")
+
+    // Insert a block with 2x replication and return the number of copies of the block
+    def replicateAndGetNumCopies(blockId: String): Int = {
+      store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+      val numLocations = master.getLocations(blockId).size
+      allStores.foreach { _.removeBlock(blockId) }
+      numLocations
+    }
+
+    // Add a failable block manager with a mock transfer service that does not
+    // allow receiving of blocks. So attempts to use it as a replication target will fail.
+    val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
+      10000, conf, securityMgr, mapOutputTracker, shuffleManager)
+    failableStore.connectionManager.stop()  // To disable any transfer to this store
+    allStores += failableStore  // so that this gets stopped after the test
+    assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
+
+    // Test that 2x replication fails by creating only one copy of the block
+    assert(replicateAndGetNumCopies("a1") === 1)
+
+    // Add another normal block manager and test that 2x replication works
+    makeBlockManager(10000, "anotherStore")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a2") === 2)
+    }
+  }
+
+  test("block replication - addition and deletion of block managers") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    // Insert a block with the given replication factor and return the number of copies
+    def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
+      val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+      initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val numLocations = master.getLocations(blockId).size
+      allStores.foreach { _.removeBlock(blockId) }
+      numLocations
+    }
+
+    // 2x replication should work, 3x replication should only replicate 2x
+    assert(replicateAndGetNumCopies("a1", 2) === 2)
+    assert(replicateAndGetNumCopies("a2", 3) === 2)
+
+    // Add another store, 3x replication should work now, 4x replication should only replicate 3x
+    val newStore1 = makeBlockManager(storeSize, s"newstore1")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a3", 3) === 3)
+    }
+    assert(replicateAndGetNumCopies("a4", 4) === 3)
+
+    // Add another store, 4x replication should work now
+    val newStore2 = makeBlockManager(storeSize, s"newstore2")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a5", 4) === 4)
+    }
+
+    // Remove all but the 1st store, 2x replication should fail
+    (initialStores.tail ++ Seq(newStore1, newStore2)).foreach { store =>
+      master.removeExecutor(store.blockManagerId.executorId)
+      store.stop()
+    }
+    assert(replicateAndGetNumCopies("a6", 2) === 1)
+
+    // Add new stores, 3x replication should work
+    val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a7", 3) === 3)
+    }
+  }
+
+  /**
+   * Test replication of blocks with different storage levels (various combinations of
+   * memory, disk & serialization). For each storage level, this function tests every store
+   * whether the block is present and also tests the master whether its knowledge of blocks
+   * is correct. Then it also drops the block from memory of each store (using LRU) and
+   * again checks whether the master's knowledge gets updated.
+   */
+  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+    import org.apache.spark.storage.StorageLevel._
+
+    assert(maxReplication > 1, s"Cannot test replication factor $maxReplication")
+
+    val storeSize = 10000
+    val blockSize = 1000
+
+    // As many stores as the replication factor
+    val stores = (1 to maxReplication).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    storageLevels.foreach { storageLevel =>
+      // Put the block into one of the stores
+      val blockId = new TestBlockId(
+        "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
+      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+      // Assert that the master knows the expected number of locations for the block
+      val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
+      assert(blockLocations.size === storageLevel.replication,
+        s"master did not have ${storageLevel.replication} locations for $blockId, " + blockLocations)
+
+      // Test the state of the stores that contain the block
+      stores.filter { testStore =>
+        blockLocations.contains(testStore.blockManagerId.executorId)
+      }.foreach { testStore =>
+        val testStoreName = testStore.blockManagerId.executorId
+        assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
+          s"master does not have status for ${blockId.name} in $testStoreName")
+
+        val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
+
+        // Assert that the block status in the master for this store has the expected storage level
+        assert(
+          blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
+          blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
+          blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
+          blockStatus.storageLevel.deserialized === storageLevel.deserialized,
+          s"master does not know correct storage level for ${blockId.name} in $testStoreName")
+
+        // Assert that the block status in the master for this store has correct memory usage info
+        assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
+          s"master does not know size of ${blockId.name} stored in memory of $testStoreName")
+
+        // If the block is supposed to be in memory, then drop the copy of the block in
+        // this store and test whether the master is updated with zero memory usage for this store
+        if (storageLevel.useMemory) {
+          // Force the block to be dropped by adding a number of dummy blocks
+          (1 to 10).foreach { i =>
+            testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+          }
+          (1 to 10).foreach { i =>
+            testStore.removeBlock(s"dummy-block-$i")
+          }
+
+          val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)
+
+          // Assert that the block status in the master either does not exist (block removed
+          // from every store) or has zero memory usage for this store
+          assert(
+            newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in memory of $testStoreName"
+          )
+        }
+
+        // If the block is supposed to be on disk (after dropping or otherwise), then
+        // test whether the master has the correct disk usage for this store
+        if (storageLevel.useDisk) {
+          assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in disk of $testStoreName"
+          )
+        }
+      }
+      master.removeBlock(blockId)
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f32ce6f9fcc7f..48c45bfe6b3e9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -189,7 +189,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store = makeBlockManager(2000, "exec1")
     store2 = makeBlockManager(2000, "exec2")
 
-    val peers = master.getPeers(store.blockManagerId, 1)
+    val peers = master.getPeers(store.blockManagerId)
     assert(peers.size === 1, "master did not return the other manager as a peer")
     assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
 
@@ -448,7 +448,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
-    System.out.println(list2DiskGet)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
     assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
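
A note on the replication strategy introduced above: `replicate()` combines a TTL-cached peer list (force-refreshed on failure) with a random generator seeded by the block id's hash, so repeated attempts to replicate the same block pick the same peers as long as cluster membership is stable. The following standalone sketch illustrates just those two mechanisms outside of Spark; `PeerSelector`, `PeerId`, and `fetchFromMaster` are illustrative names invented for this example, not part of the patch.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical stand-in for BlockManagerId, just for this sketch.
case class PeerId(executorId: String)

class PeerSelector(fetchFromMaster: () => Seq[PeerId], ttlMs: Long = 60 * 1000) {
  @volatile private var cachedPeers: Seq[PeerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  // Mirrors getPeers() in the patch: return the cached peer list, refreshing it
  // when it is missing, explicitly invalidated, or older than the TTL.
  def getPeers(forceFetch: Boolean): Seq[PeerId] = peerFetchLock.synchronized {
    val expired = System.currentTimeMillis - lastPeerFetchTime > ttlMs
    if (cachedPeers == null || forceFetch || expired) {
      cachedPeers = fetchFromMaster().sortBy(_.hashCode)
      lastPeerFetchTime = System.currentTimeMillis
    }
    cachedPeers
  }

  // Mirrors the deterministic selection in replicate(): seeding the RNG with the
  // block id's hash makes repeated attempts for the same block pick the same
  // peers, as long as the peer list is unchanged and no failures occur.
  def choosePeers(blockIdHash: Int, numPeers: Int): Seq[PeerId] = {
    val candidates = ArrayBuffer(getPeers(forceFetch = false): _*)
    val random = new Random(blockIdHash)
    val chosen = new ArrayBuffer[PeerId]
    while (chosen.size < numPeers && candidates.nonEmpty) {
      val peer = candidates(random.nextInt(candidates.size))
      chosen += peer
      candidates -= peer  // select without replacement, like peersForReplication
    }
    chosen
  }
}

object PeerSelectorDemo extends App {
  val peers = Seq("exec1", "exec2", "exec3", "exec4").map(PeerId(_))
  val selector = new PeerSelector(() => peers)
  // Two attempts for the same block hash select the same peers.
  assert(selector.choosePeers("rdd_0_0".hashCode, 2) ==
    selector.choosePeers("rdd_0_0".hashCode, 2))
  println(selector.choosePeers("rdd_0_0".hashCode, 2))
}
```

Because the seeded RNG draws peers in the same order regardless of how many are requested, the first k picks for (k+1)-x replication coincide with the picks for k-x replication. This is the property the new `BlockManagerReplicationSuite` test "block replication - deterministic node selection" exercises: re-inserting the same block yields the same locations, and the 2x locations form a subset of the 3x locations.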