From af0c1daea8a22bca3b7826322205c887370ce247 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 10 Sep 2014 19:54:31 -0700 Subject: [PATCH 01/16] Added replication unit tests to BlockManagerSuite --- .../spark/storage/BlockManagerSuite.scala | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e251660dae5de..2444b79ca3734 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1228,4 +1228,119 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("block replication - 2x") { + testReplication(2) + } + + test("block replication - 3x") { + testReplication(3) + } + + test("block replication - 4x") { + testReplication(4) + } + + /** + * Test replication of blocks with different storage levels (various combinations of + * memory, disk & serialization). For each storage level, this function tests every store + * whether the block is present and also tests the master whether its knowledge of blocks + * is correct. Then it also drops the block from memory of each store (using LRU) and + * again checks whether the master's knowledge gets updated. + */ + def testReplication(replicationFactor: Int) { + import org.apache.spark.storage.StorageLevel._ + + assert(replicationFactor > 1, + s"ReplicationTester cannot test replication factor $replicationFactor") + + // storage levels to test with the given replication factor + val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { + level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, replicationFactor) + } + } + + val storeSize = 10000 + val blockSize = 1000 + + // As many stores as the replication factor + val stores = (1 to replicationFactor).map { + i => makeBlockManager(storeSize, s"store$i") + } + + try { + storageLevels.foreach {storageLevel => + + // Put the block into one of the stores + val blockId = new TestBlockId( + "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) + stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) + + // Assert that master know two locations for the block + assert(master.getLocations(blockId).size === replicationFactor, + s"master did not have $replicationFactor locations for $blockId") + + // Test state of the store for the block + stores.foreach { testStore => + val testStoreName = testStore.blockManagerId.executorId + assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") + assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), + s"master does not have status for ${blockId.name} in $testStoreName") + + val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) + + // Assert that block status in the master for this store has expected storage level + assert( + blockStatus.storageLevel.useDisk === storageLevel.useDisk && + blockStatus.storageLevel.useMemory === storageLevel.useMemory && + blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && + blockStatus.storageLevel.deserialized === storageLevel.deserialized, + s"master does not know correct storage level for ${blockId.name} in 
$testStoreName") + + // Assert that the block status in the master for this store has correct memory usage info + assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, + s"master does not know size of ${blockId.name} stored in memory of $testStoreName") + + + // If the block is supposed to be in memory, then drop the copy of the block in + // this store test whether master is updated with zero memory usage this store + if (storageLevel.useMemory) { + // Force the block to be dropped by adding a number of dummy blocks + (1 to 10).foreach { i => + testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) + } + (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } + + val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + + // Assert that the block status in the master either does not exist (block removed + // from every store) or has zero memory usage for this store + assert( + newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in memory of $testStoreName" + ) + } + + // If the block is supposed to be in disk (after dropping or otherwise, then + // test whether master has correct disk usage for this store + if (storageLevel.useDisk) { + assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in disk of $testStoreName" + ) + } + } + master.removeBlock(blockId) + } + + } finally { + stores.foreach { _.stop() } + master.stop() // to make sure that this master cannot used further in the unit tests + master = null + } + } } + From 9f0ac9fb20660ff183490d13f4e3195b9283bc61 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 01:44:18 -0700 Subject: [PATCH 02/16] Modified replication tests to fail on replication bug. 
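The replication tests no longer hard-code a single replication factor: each test now builds its own list of storage levels and passes it, together with the maximum replication factor it uses, to testReplication. A minimal sketch of that call pattern inside the suite, using the StorageLevel constructor from the diff below (the base levels picked here are only illustrative, not the exact ones in the tests):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.storage.StorageLevel._

    // Re-derive each base level with the desired replication factor, then hand the
    // list to the reworked helper. Any replication factor greater than 1 can be exercised.
    val replication = 3
    val levels = Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY).map { level =>
      StorageLevel(level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, replication)
    }
    testReplication(replication, levels)
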
--- .../spark/storage/BlockManagerSuite.scala | 61 +++++++++++++------ 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2444b79ca3734..4c5fb9ce04604 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -47,6 +47,12 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import org.apache.spark.storage.StorageLevel._ +import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat +import org.apache.spark.storage.BroadcastBlockId +import org.apache.spark.storage.RDDBlockId +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.storage.TestBlockId class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter @@ -120,7 +126,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter System.clearProperty("spark.test.useCompressedOops") } - + /* test("StorageLevel object caching") { val level1 = StorageLevel(false, false, false, false, 3) val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1 @@ -1228,17 +1234,36 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } - + */ test("block replication - 2x") { - testReplication(2) + testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) + ) } test("block replication - 3x") { - testReplication(3) + val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { + level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } + } + testReplication(3, storageLevels) } - test("block replication - 4x") { - testReplication(4) + test("block replication - mixed between 1x to 5x") { + val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, true, false, true, 3), + StorageLevel(true, false, false, false, 4), + StorageLevel(false, false, false, false, 5), + StorageLevel(true, false, false, false, 4), + StorageLevel(true, true, false, true, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY + ) + testReplication(3, storageLevels) } /** @@ -1248,25 +1273,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter * is correct. Then it also drops the block from memory of each store (using LRU) and * again checks whether the master's knowledge gets updated. 
*/ - def testReplication(replicationFactor: Int) { + def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) { import org.apache.spark.storage.StorageLevel._ - assert(replicationFactor > 1, - s"ReplicationTester cannot test replication factor $replicationFactor") + assert(maxReplication > 1, + s"Cannot test replication factor $maxReplication") // storage levels to test with the given replication factor - val storageLevels = { - Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { - level => StorageLevel( - level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, replicationFactor) - } - } val storeSize = 10000 val blockSize = 1000 // As many stores as the replication factor - val stores = (1 to replicationFactor).map { + val stores = (1 to maxReplication).map { i => makeBlockManager(storeSize, s"store$i") } @@ -1279,11 +1298,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) // Assert that master know two locations for the block - assert(master.getLocations(blockId).size === replicationFactor, - s"master did not have $replicationFactor locations for $blockId") + val blockLocations = master.getLocations(blockId).map(_.executorId).toSet + assert(blockLocations.size === storageLevel.replication, + s"master did not have ${storageLevel.replication} locations for $blockId") - // Test state of the store for the block - stores.foreach { testStore => + // Test state of the stores that contain the block + stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId)) + .foreach { testStore => val testStoreName = testStore.blockManagerId.executorId assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), From d081bf60e87689994a006603f84cb8f22ab19c6a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 13:58:14 -0700 Subject: [PATCH 03/16] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn. --- .../apache/spark/storage/BlockManager.scala | 37 +++++- .../spark/storage/BlockManagerMaster.scala | 9 +- .../storage/BlockManagerMasterActor.scala | 24 ++-- .../spark/storage/BlockManagerMessages.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 124 +++++++++++++++--- 5 files changed, 155 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d1bee3d2c033c..933830ee5621c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -37,6 +37,7 @@ import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.util._ +import scala.collection.mutable private[spark] sealed trait BlockValues @@ -111,7 +112,8 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - + private val cachedPeers = new ArrayBuffer[BlockManagerId] + private var lastPeerFetchTime = 0L initialize() /* The compression codec to use. 
Note that the "lazy" val is necessary because we want to delay @@ -786,20 +788,42 @@ private[spark] class BlockManager( updatedBlocks } + /** + * Get peer block managers in the system. + */ + private def getPeers(numPeers: Int): Seq[BlockManagerId] = cachedPeers.synchronized { + val currentTime = System.currentTimeMillis + // If cache is empty or has insufficient number of peers, fetch from master + if (cachedPeers.isEmpty || numPeers > cachedPeers.size) { + cachedPeers.clear() + cachedPeers ++= master.getPeers(blockManagerId) + logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + lastPeerFetchTime = System.currentTimeMillis + } + if (numPeers > cachedPeers.size) { + // if enough peers cannot be provided, return all of them + logDebug(s"Not enough peers - cached peers = ${cachedPeers.size}, required peers = $numPeers") + cachedPeers + } else { + cachedPeers.take(numPeers) + } + } + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) - if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) + val selectedPeers = getPeers(level.replication - 1) + if (selectedPeers.size < level.replication - 1) { + logWarning(s"Failed to replicate block to ${level.replication - 1} peer(s) " + + s"as only ${selectedPeers.size} peer(s) were found") } - for (peer: BlockManagerId <- cachedPeers) { + selectedPeers.foreach { peer => val start = System.nanoTime data.rewind() - logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + + logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} bytes. " + s"To node: $peer") try { @@ -808,6 +832,7 @@ private[spark] class BlockManager( } catch { case e: Exception => logError(s"Failed to replicate block to $peer", e) + cachedPeers.synchronized { cachedPeers.clear() } } logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes." 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 2e262594b3538..d08e1419e3e41 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -84,13 +84,8 @@ class BlockManagerMaster( } /** Get ids of other nodes in the cluster from the driver */ - def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { - val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) - if (result.length != numPeers) { - throw new SparkException( - "Error getting peers, only got " + result.size + " instead of " + numPeers) - } - result + def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { + askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId)) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 1a6c7cb24f9ac..98d4194db10ed 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case GetLocationsMultipleBlockIds(blockIds) => sender ! getLocationsMultipleBlockIds(blockIds) - case GetPeers(blockManagerId, size) => - sender ! getPeers(blockManagerId, size) + case GetPeers(blockManagerId) => + sender ! getPeers(blockManagerId) case GetMemoryStatus => sender ! memoryStatus @@ -403,16 +403,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { - val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - - val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ + private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { + val blockManagerIds = blockManagerInfo.keySet.toArray + val selfIndex = blockManagerIds.indexOf(blockManagerId) if (selfIndex == -1) { - throw new SparkException("Self index for " + blockManagerId + " not found") + logError("Self index for " + blockManagerId + " not found") + Seq.empty + } else { + // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2 + // Then this code will return the list [ id3 id4 id5 id1 ] + Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i => + blockManagerIds((selfIndex + i + 1) % blockManagerIds.size) + } } - - // Note that this logic will select the same node multiple times if there aren't enough peers - Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 2ba16b8476600..3db5dd9774ae8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages { case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster - case class GetPeers(blockManagerId: BlockManagerId, size: 
Int) extends ToBlockManagerMaster + case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class RemoveExecutor(execId: String) extends ToBlockManagerMaster diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 4c5fb9ce04604..6a15c607beaa9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,10 +49,6 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} import org.apache.spark.storage.StorageLevel._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat -import org.apache.spark.storage.BroadcastBlockId -import org.apache.spark.storage.RDDBlockId -import org.apache.spark.storage.ShuffleBlockId -import org.apache.spark.storage.TestBlockId class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter @@ -126,7 +122,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter System.clearProperty("spark.test.useCompressedOops") } - /* + test("StorageLevel object caching") { val level1 = StorageLevel(false, false, false, false, 3) val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1 @@ -195,7 +191,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store = makeBlockManager(2000, "exec1") store2 = makeBlockManager(2000, "exec2") - val peers = master.getPeers(store.blockManagerId, 1) + val peers = master.getPeers(store.blockManagerId) assert(peers.size === 1, "master did not return the other manager as a peer") assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager") @@ -454,7 +450,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val list2DiskGet = store.get("list2disk") assert(list2DiskGet.isDefined, "list2memory expected to be in store") assert(list2DiskGet.get.data.size === 3) - System.out.println(list2DiskGet) // We don't know the exact size of the data on disk, but it should certainly be > 0. 
assert(list2DiskGet.get.inputMetrics.bytesRead > 0) assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk) @@ -1234,7 +1229,44 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } - */ + + test("get peers with store addition and removal") { + val numStores = 4 + val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } + try { + val storeIds = stores.map { _.blockManagerId }.toSet + assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) + assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) + assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + + // Add a new store and test whether get peers returns it + val newStore = makeBlockManager(1000, s"store$numStores") + assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + + // Remove a store and test whether get peers returns it + val storeIdToRemove = stores(0).blockManagerId + master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + + // Test whether asking for peers of a unregistered block manager id returns empty list + assert(master.getPeers(stores(0).blockManagerId).isEmpty) + assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } finally { + stores.foreach { _.stop() } + } + } + test("block replication - 2x") { testReplication(2, Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) @@ -1242,6 +1274,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("block replication - 3x") { + // Generate storage levels with 3x replication val storageLevels = { Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { level => StorageLevel( @@ -1252,18 +1285,77 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("block replication - mixed between 1x to 5x") { + // Generate storage levels with varying replication val storageLevels = Seq( MEMORY_ONLY, MEMORY_ONLY_SER_2, - StorageLevel(true, true, false, true, 3), - StorageLevel(true, false, false, false, 4), - StorageLevel(false, false, false, false, 5), - StorageLevel(true, false, false, false, 4), - StorageLevel(true, true, false, true, 3), + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), MEMORY_ONLY_SER_2, MEMORY_ONLY ) - testReplication(3, storageLevels) + testReplication(5, storageLevels) + } + + test("block replication with addition and removal of executors") { + val 
blockSize = 1000 + val storeSize = 10000 + val allStores = new ArrayBuffer[BlockManager]() + + try { + val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + allStores ++= initialStores + + // 2x replication works + initialStores(0).putSingle("a1", new Array[Byte](blockSize), StorageLevel.MEMORY_AND_DISK_2) + assert(master.getLocations("a1").size === 2) + + // 3x replication should only replicate 2x + initialStores(0).putSingle( + "a2", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3)) + assert(master.getLocations("a2").size === 2) + + val newStore1 = makeBlockManager(storeSize, s"newstore1") + allStores += newStore1 + + // 3x replication should work now + initialStores(0).putSingle( + "a3", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3)) + assert(master.getLocations("a3").size === 3) + + // 4x replication should only replicate 3x + initialStores(0).putSingle( + "a4", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4)) + assert(master.getLocations("a4").size === 3) + + val newStore2 = makeBlockManager(storeSize, s"newstore2") + allStores += newStore2 + + // 4x replication should work now + initialStores(0).putSingle( + "a5", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4)) + assert(master.getLocations("a5").size === 4) + + // Remove all the stores and add new stores + (initialStores ++ Seq(newStore1, newStore2)).map { store => + store.blockManagerId.executorId + }.foreach { execId => + master.removeExecutor(execId) + } + + // Add new stores and test if replication works + val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") } + allStores ++= newStores + + newStores(0).putSingle( + "a6", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3)) + assert(master.getLocations("a6").size === 3) + } finally { + allStores.foreach { _.stop() } + } } /** @@ -1273,7 +1365,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter * is correct. Then it also drops the block from memory of each store (using LRU) and * again checks whether the master's knowledge gets updated. */ - def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) { + private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) { import org.apache.spark.storage.StorageLevel._ assert(maxReplication > 1, @@ -1291,7 +1383,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter try { storageLevels.foreach {storageLevel => - // Put the block into one of the stores val blockId = new TestBlockId( "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) @@ -1364,4 +1455,3 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } } } - From 03de02d532f51b23bc1b79fc76115aacbd64a4b1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 17:46:02 -0700 Subject: [PATCH 04/16] Change replication logic to correctly refetch peers from master on failure and on new worker addition. 
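Replication now retries: instead of fetching a fixed peer list once and writing to it blindly, the block manager repeatedly picks a random peer it has not yet replicated to (and that has not recently failed), re-fetching the peer list from the master whenever an upload fails, until the requested number of replicas is reached or spark.storage.maxReplicationFailures is exceeded. A self-contained sketch of that control flow, simplified from the diff below; fetchPeers and upload are placeholders standing in for master.getPeers and blockTransferService.uploadBlockSync, not real APIs:

    import scala.collection.mutable.HashSet
    import scala.util.Random

    // Simplified restatement of the new replicate() loop; the real code works with
    // BlockManagerId peers and also tracks timing and logging.
    def replicateSketch[Peer](
        fetchPeers: Boolean => Set[Peer],   // forceFetch => current candidate peers
        upload: Peer => Unit,               // throws on failure
        numPeersToReplicateTo: Int,
        maxReplicationFailures: Int): Set[Peer] = {
      val replicatedTo = new HashSet[Peer]
      val failed = new HashSet[Peer]
      var forceFetch = false
      var failures = 0
      var done = false
      while (!done) {
        val candidates = fetchPeers(forceFetch) -- replicatedTo -- failed
        if (candidates.isEmpty) {
          done = true                       // no peer left to try
        } else {
          val peer = candidates.toSeq(Random.nextInt(candidates.size))
          try {
            upload(peer)
            replicatedTo += peer
            forceFetch = false              // cached peer list looks healthy again
            done = replicatedTo.size == numPeersToReplicateTo
          } catch {
            case _: Exception =>
              failures += 1
              forceFetch = true             // peer list may be stale; refresh from master
              failed += peer                // temporarily skip the failed peer
              done = failures > maxReplicationFailures
          }
        }
      }
      replicatedTo.toSet
    }
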
--- .../apache/spark/storage/BlockManager.scala | 104 ++++++++++++------ core/src/test/resources/log4j.properties | 2 +- .../spark/storage/BlockManagerSuite.scala | 65 ++++++----- 3 files changed, 101 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 933830ee5621c..352ce5e41a3c1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -112,8 +112,9 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - private val cachedPeers = new ArrayBuffer[BlockManagerId] + private val cachedPeers = new mutable.HashSet[BlockManagerId] private var lastPeerFetchTime = 0L + initialize() /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay @@ -791,52 +792,85 @@ private[spark] class BlockManager( /** * Get peer block managers in the system. */ - private def getPeers(numPeers: Int): Seq[BlockManagerId] = cachedPeers.synchronized { - val currentTime = System.currentTimeMillis - // If cache is empty or has insufficient number of peers, fetch from master - if (cachedPeers.isEmpty || numPeers > cachedPeers.size) { - cachedPeers.clear() - cachedPeers ++= master.getPeers(blockManagerId) - logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) - lastPeerFetchTime = System.currentTimeMillis - } - if (numPeers > cachedPeers.size) { - // if enough peers cannot be provided, return all of them - logDebug(s"Not enough peers - cached peers = ${cachedPeers.size}, required peers = $numPeers") - cachedPeers - } else { - cachedPeers.take(numPeers) + private def getPeers(forceFetch: Boolean): mutable.Set[BlockManagerId] = { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds + def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + + cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { + cachedPeers.clear() + cachedPeers ++= master.getPeers(blockManagerId) + lastPeerFetchTime = System.currentTimeMillis + logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } } + cachedPeers } /** * Replicate block to another node. 
*/ private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { + val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) + val numPeersToReplicateTo = level.replication - 1 + val peersReplicatedTo = new mutable.HashSet[BlockManagerId] + val peersFailedToReplicateTo = new mutable.HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) - val selectedPeers = getPeers(level.replication - 1) - if (selectedPeers.size < level.replication - 1) { - logWarning(s"Failed to replicate block to ${level.replication - 1} peer(s) " + - s"as only ${selectedPeers.size} peer(s) were found") + val startTime = System.nanoTime + + var forceFetchPeers = false + var failures = 0 + var done = false + + // Get a random peer + def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(Random.nextInt(peers.size))) else None } - selectedPeers.foreach { peer => - val start = System.nanoTime - data.rewind() - logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} bytes. " + - s"To node: $peer") - try { - blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { - case e: Exception => - logError(s"Failed to replicate block to $peer", e) - cachedPeers.synchronized { cachedPeers.clear() } + // One by one choose a random peer and try uploading the block to it + // If replication fails (e.g., target peer is down), force the list of cached peers + // to be re-fetched from driver and then pick another random peer for replication. Also + // temporarily black list the peer for which replication failed. + while (!done) { + getRandomPeer() match { + case Some(peer) => + try { + val onePeerStartTime = System.nanoTime + data.rewind() + logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") + blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) + logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.nanoTime - onePeerStartTime) / 1e6)) + peersReplicatedTo += peer + forceFetchPeers = false + if (peersReplicatedTo.size == numPeersToReplicateTo) { + done = true + } + } catch { + case e: Exception => + logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) + failures += 1 + forceFetchPeers = true + peersFailedToReplicateTo += peer + if (failures > maxReplicationFailures) { + done = true + } + } + case None => + // no peer left to replicate to + done = true } - - logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes." 
- .format(blockId, (System.nanoTime - start) / 1e6, data.limit())) + } + if (peersReplicatedTo.size < numPeersToReplicateTo) { + logError(s"Replicated $blockId of ${data.limit()} bytes to only " + + s"${peersReplicatedTo.size} peer(s) instead of ${numPeersToReplicateTo} " + + s"in ${(System.nanoTime - startTime) / 1e6} ms") + } else { + logDebug(s"Successfully replicated $blockId of ${data.limit()} bytes to " + + s"${peersReplicatedTo.size} peer(s) in ${(System.nanoTime - startTime) / 1e6} ms") } } diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 26b73a1b39744..21b3d3389ea84 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file core/target/unit-tests.log -log4j.rootCategory=INFO, file +log4j.rootCategory=DEBUG, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false log4j.appender.file.file=target/unit-tests.log diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6a15c607beaa9..2deb553f5dd65 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -91,7 +91,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter conf.set("spark.driver.port", boundPort.toString) conf.set("spark.storage.unrollFraction", "0.4") conf.set("spark.storage.unrollMemoryThreshold", "512") - + conf.set("spark.core.connection.ack.wait.timeout", "1") + conf.set("spark.storage.cachedPeersTtl", "10") master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), conf, true) @@ -1300,59 +1301,55 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter testReplication(5, storageLevels) } - test("block replication with addition and removal of executors") { + test("block replication with addition and deletion of executors") { val blockSize = 1000 val storeSize = 10000 val allStores = new ArrayBuffer[BlockManager]() + try { val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } allStores ++= initialStores - // 2x replication works - initialStores(0).putSingle("a1", new Array[Byte](blockSize), StorageLevel.MEMORY_AND_DISK_2) - assert(master.getLocations("a1").size === 2) + def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) + assert(master.getLocations(blockId).size === expectedNumLocations) + master.removeBlock(blockId) + } - // 3x replication should only replicate 2x - initialStores(0).putSingle( - "a2", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3)) - assert(master.getLocations("a2").size === 2) + // 2x replication should work, 3x replication should only replicate 2x + testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) + testPut("a2", StorageLevel(true, true, false, true, 3), 2) + // Add another store, 3x replication should work now, 4x replication should only replicate 3x val newStore1 = makeBlockManager(storeSize, s"newstore1") allStores += newStore1 + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + testPut("a3", StorageLevel(true, true, false, true, 3), 3) + } + testPut("a4",StorageLevel(true, true, false, true, 4), 3) - // 3x replication 
should work now - initialStores(0).putSingle( - "a3", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3)) - assert(master.getLocations("a3").size === 3) - - // 4x replication should only replicate 3x - initialStores(0).putSingle( - "a4", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4)) - assert(master.getLocations("a4").size === 3) - + // Add another store, 4x replication should work now val newStore2 = makeBlockManager(storeSize, s"newstore2") allStores += newStore2 + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + testPut("a5", StorageLevel(true, true, false, true, 4), 4) + } - // 4x replication should work now - initialStores(0).putSingle( - "a5", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4)) - assert(master.getLocations("a5").size === 4) - - // Remove all the stores and add new stores - (initialStores ++ Seq(newStore1, newStore2)).map { store => - store.blockManagerId.executorId - }.foreach { execId => - master.removeExecutor(execId) + // Remove all but the 1st store, 2x replication should fail + (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach { + store => + master.removeExecutor(store.blockManagerId.executorId) + store.stop() } + testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1) - // Add new stores and test if replication works + // Add new stores, 3x replication should work val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") } allStores ++= newStores - - newStores(0).putSingle( - "a6", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3)) - assert(master.getLocations("a6").size === 3) + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + testPut("a7", StorageLevel(true, true, false, true, 3), 3) + } } finally { allStores.foreach { _.stop() } } From 7598f913c52728f25b6bce91dd9ae6879105e261 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 17:52:16 -0700 Subject: [PATCH 05/16] Minor changes. 
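Besides reverting the test log level, this adds comments for the two SparkConf settings the replication tests rely on. For reference, a short sketch of that part of the suite setup, with the values from the diff below (the TTL is in milliseconds per the getInt comment in BlockManager; the ack wait timeout is in seconds in this Spark version):

    // Replication-test tuning in the suite's before() block:
    //  - a 1-second ack wait so an upload to a stopped store fails fast instead of hanging
    //  - a 10 ms TTL so the cached peer list is refreshed on almost every replication attempt
    conf.set("spark.core.connection.ack.wait.timeout", "1")
    conf.set("spark.storage.cachedPeersTtl", "10")
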
--- core/src/test/resources/log4j.properties | 2 +- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 21b3d3389ea84..26b73a1b39744 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file core/target/unit-tests.log -log4j.rootCategory=DEBUG, file +log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false log4j.appender.file.file=target/unit-tests.log diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2deb553f5dd65..1ec405e4a5e6c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -91,8 +91,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter conf.set("spark.driver.port", boundPort.toString) conf.set("spark.storage.unrollFraction", "0.4") conf.set("spark.storage.unrollMemoryThreshold", "512") + + // for block replication test, to make a replication attempt to inactive store fail fast conf.set("spark.core.connection.ack.wait.timeout", "1") + // for block replication test, to make cached peers refresh frequently conf.set("spark.storage.cachedPeersTtl", "10") + master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), conf, true) From 4a205314cf1fc9a7a413bc1fb066fe2bb2c21932 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 18:38:54 -0700 Subject: [PATCH 06/16] Filtered driver block manager from peer list, and also consolidated the use of in BlockManager. --- .../apache/spark/storage/BlockManagerId.scala | 2 + .../storage/BlockManagerMasterActor.scala | 11 +- .../spark/broadcast/BroadcastSuite.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 270 +++++++++--------- 4 files changed, 140 insertions(+), 145 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index d4487fce49ab6..94c5e5e8b9acd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -59,6 +59,8 @@ class BlockManagerId private ( def port: Int = port_ + def isDriver = (executorId == "") + override def writeExternal(out: ObjectOutput) { out.writeUTF(executorId_) out.writeUTF(host_) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 98d4194db10ed..6ab75b4d1e609 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus * from the executors, but not from the driver. 
*/ private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = { - // TODO: Consolidate usages of import context.dispatcher val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver) val requiredBlockManagers = blockManagerInfo.values.filter { info => - removeFromDriver || info.blockManagerId.executorId != "" + removeFromDriver || !info.blockManagerId.isDriver } Future.sequence( requiredBlockManagers.map { bm => @@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus val minSeenTime = now - slaveTimeout val toRemove = new mutable.HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { - if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "") { + if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) { logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") toRemove += info.blockManagerId @@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus */ private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = { if (!blockManagerInfo.contains(blockManagerId)) { - blockManagerId.executorId == "" && !isLocal + blockManagerId.isDriver && !isLocal } else { blockManagerInfo(blockManagerId).updateLastSeenMs() true @@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus tachyonSize: Long) { if (!blockManagerInfo.contains(blockManagerId)) { - if (blockManagerId.executorId == "" && !isLocal) { + if (blockManagerId.isDriver && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. sender ! 
true @@ -405,7 +404,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus /** Get the list of the peers of the given block manager */ private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { - val blockManagerIds = blockManagerInfo.keySet.toArray + val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray val selfIndex = blockManagerIds.indexOf(blockManagerId) if (selfIndex == -1) { logError("Self index for " + blockManagerId + " not found") diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 978a6ded80829..acaf321de52fb 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { val statuses = bmm.getBlockStatus(blockId, askSlaves = true) assert(statuses.size === 1) statuses.head match { case (bm, status) => - assert(bm.executorId === "", "Block should only be on the driver") + assert(bm.isDriver, "Block should only be on the driver") assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK) assert(status.memSize > 0, "Block should be in memory store on the driver") assert(status.diskSize === 0, "Block should not be in disk store on the driver") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 1ec405e4a5e6c..6a632c10fe15d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -64,6 +64,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) + val allStores = new ArrayBuffer[BlockManager] // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test conf.set("spark.kryoserializer.buffer.mb", "1") @@ -75,8 +76,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = { val transfer = new NioBlockTransferService(conf, securityMgr) - new BlockManager(name, actorSystem, master, serializer, maxMem, conf, + val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, mapOutputTracker, shuffleManager, transfer) + allStores += store + store } before { @@ -101,6 +104,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), conf, true) + allStores.clear() + val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() } @@ -114,6 +119,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store2.stop() store2 = null } + allStores.foreach { _.stop() } + allStores.clear() actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null @@ -1238,38 +1245,40 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("get peers with store addition and removal") { val numStores = 4 val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } - try { - val storeIds = stores.map { _.blockManagerId }.toSet - 
assert(master.getPeers(stores(0).blockManagerId).toSet === - storeIds.filterNot { _ == stores(0).blockManagerId }) - assert(master.getPeers(stores(1).blockManagerId).toSet === - storeIds.filterNot { _ == stores(1).blockManagerId }) - assert(master.getPeers(stores(2).blockManagerId).toSet === - storeIds.filterNot { _ == stores(2).blockManagerId }) - - // Add a new store and test whether get peers returns it - val newStore = makeBlockManager(1000, s"store$numStores") - assert(master.getPeers(stores(0).blockManagerId).toSet === - storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) - assert(master.getPeers(stores(1).blockManagerId).toSet === - storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) - assert(master.getPeers(stores(2).blockManagerId).toSet === - storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) - assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) - - // Remove a store and test whether get peers returns it - val storeIdToRemove = stores(0).blockManagerId - master.removeExecutor(storeIdToRemove.executorId) - assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) - assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) - assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) - - // Test whether asking for peers of a unregistered block manager id returns empty list - assert(master.getPeers(stores(0).blockManagerId).isEmpty) - assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) - } finally { - stores.foreach { _.stop() } - } + val storeIds = stores.map { _.blockManagerId }.toSet + assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) + assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) + assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + + // Add driver store and test whether it is filtered out + val driverStore = makeBlockManager(1000, "") + assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) + assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) + assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + + // Add a new store and test whether get peers returns it + val newStore = makeBlockManager(1000, s"store$numStores") + assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + + // Remove a store and test whether get peers returns it + val storeIdToRemove = stores(0).blockManagerId + master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + + // Test whether asking for peers of a unregistered block manager id returns empty list + assert(master.getPeers(stores(0).blockManagerId).isEmpty) + 
assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) } test("block replication - 2x") { @@ -1308,54 +1317,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("block replication with addition and deletion of executors") { val blockSize = 1000 val storeSize = 10000 - val allStores = new ArrayBuffer[BlockManager]() + val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } - - try { - val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } - allStores ++= initialStores - - def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { master.removeBlock(blockId) } + } - // 2x replication should work, 3x replication should only replicate 2x - testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) - testPut("a2", StorageLevel(true, true, false, true, 3), 2) + // 2x replication should work, 3x replication should only replicate 2x + testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) + testPut("a2", StorageLevel(true, true, false, true, 3), 2) - // Add another store, 3x replication should work now, 4x replication should only replicate 3x - val newStore1 = makeBlockManager(storeSize, s"newstore1") - allStores += newStore1 - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - testPut("a3", StorageLevel(true, true, false, true, 3), 3) - } - testPut("a4",StorageLevel(true, true, false, true, 4), 3) + // Add another store, 3x replication should work now, 4x replication should only replicate 3x + val newStore1 = makeBlockManager(storeSize, s"newstore1") + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + testPut("a3", StorageLevel(true, true, false, true, 3), 3) + } + testPut("a4",StorageLevel(true, true, false, true, 4), 3) - // Add another store, 4x replication should work now - val newStore2 = makeBlockManager(storeSize, s"newstore2") - allStores += newStore2 - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - testPut("a5", StorageLevel(true, true, false, true, 4), 4) - } + // Add another store, 4x replication should work now + val newStore2 = makeBlockManager(storeSize, s"newstore2") + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + testPut("a5", StorageLevel(true, true, false, true, 4), 4) + } - // Remove all but the 1st store, 2x replication should fail - (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach { - store => - master.removeExecutor(store.blockManagerId.executorId) - store.stop() - } - testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1) + // Remove all but the 1st store, 2x replication should fail + (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach { + store => + master.removeExecutor(store.blockManagerId.executorId) + store.stop() + } + testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1) - // Add new stores, 3x replication should work - val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") } - allStores ++= newStores - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - testPut("a7", StorageLevel(true, true, false, true, 3), 3) - } - } finally { - allStores.foreach { _.stop() } + // Add new stores, 3x replication should work + val newStores = (3 to 5).map { 
i => makeBlockManager(storeSize, s"newstore$i") } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + testPut("a7", StorageLevel(true, true, false, true, 3), 3) } } @@ -1382,77 +1383,70 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter i => makeBlockManager(storeSize, s"store$i") } - try { - storageLevels.foreach {storageLevel => - // Put the block into one of the stores - val blockId = new TestBlockId( - "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) - stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) - - // Assert that master know two locations for the block - val blockLocations = master.getLocations(blockId).map(_.executorId).toSet - assert(blockLocations.size === storageLevel.replication, - s"master did not have ${storageLevel.replication} locations for $blockId") - - // Test state of the stores that contain the block - stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId)) - .foreach { testStore => - val testStoreName = testStore.blockManagerId.executorId - assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") - assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), - s"master does not have status for ${blockId.name} in $testStoreName") - - val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) - - // Assert that block status in the master for this store has expected storage level - assert( - blockStatus.storageLevel.useDisk === storageLevel.useDisk && - blockStatus.storageLevel.useMemory === storageLevel.useMemory && - blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && - blockStatus.storageLevel.deserialized === storageLevel.deserialized, - s"master does not know correct storage level for ${blockId.name} in $testStoreName") - - // Assert that the block status in the master for this store has correct memory usage info - assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, - s"master does not know size of ${blockId.name} stored in memory of $testStoreName") - - - // If the block is supposed to be in memory, then drop the copy of the block in - // this store test whether master is updated with zero memory usage this store - if (storageLevel.useMemory) { - // Force the block to be dropped by adding a number of dummy blocks - (1 to 10).foreach { i => - testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) - } - (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } - - val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) - - // Assert that the block status in the master either does not exist (block removed - // from every store) or has zero memory usage for this store - assert( - newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in memory of $testStoreName" - ) + storageLevels.foreach {storageLevel => + // Put the block into one of the stores + val blockId = new TestBlockId( + "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) + stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) + + // Assert that master know two locations for the block + val blockLocations = master.getLocations(blockId).map(_.executorId).toSet + assert(blockLocations.size === storageLevel.replication, + s"master did not have ${storageLevel.replication} 
locations for $blockId") + + // Test state of the stores that contain the block + stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId)) + .foreach { testStore => + val testStoreName = testStore.blockManagerId.executorId + assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") + assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), + s"master does not have status for ${blockId.name} in $testStoreName") + + val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) + + // Assert that block status in the master for this store has expected storage level + assert( + blockStatus.storageLevel.useDisk === storageLevel.useDisk && + blockStatus.storageLevel.useMemory === storageLevel.useMemory && + blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && + blockStatus.storageLevel.deserialized === storageLevel.deserialized, + s"master does not know correct storage level for ${blockId.name} in $testStoreName") + + // Assert that the block status in the master for this store has correct memory usage info + assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, + s"master does not know size of ${blockId.name} stored in memory of $testStoreName") + + + // If the block is supposed to be in memory, then drop the copy of the block in + // this store test whether master is updated with zero memory usage this store + if (storageLevel.useMemory) { + // Force the block to be dropped by adding a number of dummy blocks + (1 to 10).foreach { i => + testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) } + (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } - // If the block is supposed to be in disk (after dropping or otherwise, then - // test whether master has correct disk usage for this store - if (storageLevel.useDisk) { - assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in disk of $testStoreName" - ) - } + val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + + // Assert that the block status in the master either does not exist (block removed + // from every store) or has zero memory usage for this store + assert( + newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in memory of $testStoreName" + ) } - master.removeBlock(blockId) - } - } finally { - stores.foreach { _.stop() } - master.stop() // to make sure that this master cannot used further in the unit tests - master = null + // If the block is supposed to be in disk (after dropping or otherwise, then + // test whether master has correct disk usage for this store + if (storageLevel.useDisk) { + assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in disk of $testStoreName" + ) + } + } + master.removeBlock(blockId) } } } From d402506e72b87bb8c8b6505fb60662c693af170c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 18:53:10 -0700 Subject: [PATCH 07/16] Fixed imports. 
--- .../scala/org/apache/spark/storage/BlockManager.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 352ce5e41a3c1..18c2b1c6fbab7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -22,7 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import scala.concurrent.ExecutionContext.Implicits.global -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random @@ -37,7 +37,6 @@ import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.util._ -import scala.collection.mutable private[spark] sealed trait BlockValues @@ -112,7 +111,7 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - private val cachedPeers = new mutable.HashSet[BlockManagerId] + private val cachedPeers = new HashSet[BlockManagerId] private var lastPeerFetchTime = 0L initialize() @@ -792,7 +791,7 @@ private[spark] class BlockManager( /** * Get peer block managers in the system. */ - private def getPeers(forceFetch: Boolean): mutable.Set[BlockManagerId] = { + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl @@ -813,8 +812,8 @@ private[spark] class BlockManager( private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 - val peersReplicatedTo = new mutable.HashSet[BlockManagerId] - val peersFailedToReplicateTo = new mutable.HashSet[BlockManagerId] + val peersReplicatedTo = new HashSet[BlockManagerId] + val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) val startTime = System.nanoTime From 08e5646251e41419c8b2f9d4336d910fc0e6bc24 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Sep 2014 19:02:07 -0700 Subject: [PATCH 08/16] More minor changes. 
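Besides collapsing the duplicate import in BlockManagerSuite, the staleness check in getPeers becomes a val rather than a def, so the comparison against spark.storage.cachedPeersTtl is computed once per call instead of re-reading the clock on every reference. A small self-contained sketch of the TTL-based peer cache this method implements, with String standing in for BlockManagerId and a generic fetch function standing in for the round trip to the master:

    class PeerCacheSketch(ttlMs: Long, fetch: () => Seq[String]) {
      private var cachedPeers: Seq[String] = Nil
      private var lastFetchTime = 0L

      def getPeers(forceFetch: Boolean): Seq[String] = synchronized {
        // A val, so the staleness flag is evaluated exactly once per call
        val expired = System.currentTimeMillis - lastFetchTime > ttlMs
        if (cachedPeers.isEmpty || forceFetch || expired) {
          cachedPeers = fetch()
          lastFetchTime = System.currentTimeMillis
        }
        cachedPeers
      }
    }

    // Example use: refresh at most once per minute unless a fetch is forced
    // val cache = new PeerCacheSketch(60 * 1000, () => Seq("store1", "store2"))
    // cache.getPeers(forceFetch = false)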
--- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 18c2b1c6fbab7..465c72ef685c4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -793,7 +793,7 @@ private[spark] class BlockManager( */ private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds - def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl cachedPeers.synchronized { if (cachedPeers.isEmpty || forceFetch || timeout) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6a632c10fe15d..6a739cea43623 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -46,9 +46,8 @@ import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat -import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} import org.apache.spark.storage.StorageLevel._ -import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat +import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter From 3821ab971bcc85b182288f9039bf38da0acedece Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 22 Sep 2014 01:42:21 +0530 Subject: [PATCH 09/16] Fixes based on PR comments. --- .../apache/spark/storage/BlockManager.scala | 2 +- .../storage/BlockManagerMasterActor.scala | 14 +-- .../spark/storage/BlockManagerSuite.scala | 105 ++++++++++-------- 3 files changed, 62 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 465c72ef685c4..dec8d93b98226 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -792,7 +792,7 @@ private[spark] class BlockManager( * Get peer block managers in the system. 
*/ private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { - val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl cachedPeers.synchronized { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 6ab75b4d1e609..6a06257ed0c08 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -404,17 +404,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus /** Get the list of the peers of the given block manager */ private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { - val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray - val selfIndex = blockManagerIds.indexOf(blockManagerId) - if (selfIndex == -1) { - logError("Self index for " + blockManagerId + " not found") - Seq.empty + val blockManagerIds = blockManagerInfo.keySet + if (blockManagerIds.contains(blockManagerId)) { + blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq } else { - // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2 - // Then this code will return the list [ id3 id4 id5 id1 ] - Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i => - blockManagerIds((selfIndex + i + 1) % blockManagerIds.size) - } + Seq.empty } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6a739cea43623..6313be27b2568 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -63,6 +63,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) + + // List of block manager created during an unit test, so that all of the them can be stopped + // after the unit test. val allStores = new ArrayBuffer[BlockManager] // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test @@ -1241,7 +1244,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } - test("get peers with store addition and removal") { + test("get peers with addition and removal of block managers") { val numStores = 4 val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } val storeIds = stores.map { _.blockManagerId }.toSet @@ -1313,11 +1316,17 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter testReplication(5, storageLevels) } - test("block replication with addition and deletion of executors") { + test("block replication with addition and deletion of block managers") { val blockSize = 1000 val storeSize = 10000 val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + /** + * Function to test whether insert a block with replication achieves the expected replication. 
+ * Since this function can be call on the same block id repeatedly through an `eventually`, + * it needs to be ensured that the method leaves block manager + master in the same state as + * it was before attempting to insert the block. + */ def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { try { initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) @@ -1345,7 +1354,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } // Remove all but the 1st store, 2x replication should fail - (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach { + (initialStores.tail ++ Seq(newStore1, newStore2)).foreach { store => master.removeExecutor(store.blockManagerId.executorId) store.stop() @@ -1394,57 +1403,57 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter s"master did not have ${storageLevel.replication} locations for $blockId") // Test state of the stores that contain the block - stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId)) + stores.filter { testStore => blockLocations.contains(testStore.blockManagerId.executorId) } .foreach { testStore => - val testStoreName = testStore.blockManagerId.executorId - assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") - assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), - s"master does not have status for ${blockId.name} in $testStoreName") - - val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) - - // Assert that block status in the master for this store has expected storage level - assert( - blockStatus.storageLevel.useDisk === storageLevel.useDisk && - blockStatus.storageLevel.useMemory === storageLevel.useMemory && - blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && - blockStatus.storageLevel.deserialized === storageLevel.deserialized, - s"master does not know correct storage level for ${blockId.name} in $testStoreName") - - // Assert that the block status in the master for this store has correct memory usage info - assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, - s"master does not know size of ${blockId.name} stored in memory of $testStoreName") - - - // If the block is supposed to be in memory, then drop the copy of the block in - // this store test whether master is updated with zero memory usage this store - if (storageLevel.useMemory) { - // Force the block to be dropped by adding a number of dummy blocks - (1 to 10).foreach { i => - testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) - } - (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } + val testStoreName = testStore.blockManagerId.executorId + assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") + assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), + s"master does not have status for ${blockId.name} in $testStoreName") - val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) - // Assert that the block status in the master either does not exist (block removed - // from every store) or has zero memory usage for this store + // Assert that block status in the master for this store has expected storage level assert( - newBlockStatusOption.isEmpty || 
newBlockStatusOption.get.memSize === 0, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in memory of $testStoreName" - ) - } + blockStatus.storageLevel.useDisk === storageLevel.useDisk && + blockStatus.storageLevel.useMemory === storageLevel.useMemory && + blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && + blockStatus.storageLevel.deserialized === storageLevel.deserialized, + s"master does not know correct storage level for ${blockId.name} in $testStoreName") + + // Assert that the block status in the master for this store has correct memory usage info + assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, + s"master does not know size of ${blockId.name} stored in memory of $testStoreName") + + + // If the block is supposed to be in memory, then drop the copy of the block in + // this store test whether master is updated with zero memory usage this store + if (storageLevel.useMemory) { + // Force the block to be dropped by adding a number of dummy blocks + (1 to 10).foreach { i => + testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) + } + (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } + + val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + + // Assert that the block status in the master either does not exist (block removed + // from every store) or has zero memory usage for this store + assert( + newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in memory of $testStoreName" + ) + } - // If the block is supposed to be in disk (after dropping or otherwise, then - // test whether master has correct disk usage for this store - if (storageLevel.useDisk) { - assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in disk of $testStoreName" - ) + // If the block is supposed to be in disk (after dropping or otherwise, then + // test whether master has correct disk usage for this store + if (storageLevel.useDisk) { + assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in disk of $testStoreName" + ) + } } - } master.removeBlock(blockId) } } From 08afaa94e0672ae60bee6737c040ec0d9de9d268 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 22 Sep 2014 05:18:42 +0530 Subject: [PATCH 10/16] Made peer selection for replication deterministic to block id --- .../apache/spark/storage/BlockManager.scala | 5 +- .../spark/storage/BlockManagerSuite.scala | 47 +++++++++++++++++-- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index dec8d93b98226..62b28ec70ad26 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -798,7 +798,7 @@ private[spark] class BlockManager( cachedPeers.synchronized { if (cachedPeers.isEmpty || forceFetch || timeout) { cachedPeers.clear() - cachedPeers ++= master.getPeers(blockManagerId) + cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) lastPeerFetchTime = System.currentTimeMillis logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]")) } @@ -817,6 +817,7 @@ private[spark] class BlockManager( val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) val startTime = System.nanoTime + val random = new Random(blockId.hashCode) var forceFetchPeers = false var failures = 0 @@ -825,7 +826,7 @@ private[spark] class BlockManager( // Get a random peer def getRandomPeer(): Option[BlockManagerId] = { val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo - if (!peers.isEmpty) Some(peers.toSeq(Random.nextInt(peers.size))) else None + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } // One by one choose a random peer and try uploading the block to it diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6313be27b2568..7cb8f2ea12a08 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1283,13 +1283,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) } - test("block replication - 2x") { + test("block replication - 2x replication") { testReplication(2, Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) ) } - test("block replication - 3x") { + test("block replication - 3x replication") { // Generate storage levels with 3x replication val storageLevels = { Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { @@ -1316,7 +1316,44 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter testReplication(5, storageLevels) } - test("block replication with addition and deletion of block managers") { + test("block replication - deterministic node selection") { + val blockSize = 1000 + val storeSize = 10000 + val stores = (1 to 5).map { i => makeBlockManager(storeSize, s"store$i") } + val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2 + val storageLevel3x = StorageLevel(true, true, false, true, 3) + val storageLevel4x = StorageLevel(true, true, false, true, 4) + + def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = { + stores.head.putSingle(blockId, new Array[Byte](blockSize), level) + val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet + stores.foreach { _.removeBlock(blockId) } + master.removeBlock(blockId) + locations + } + + val a1Locs = putBlockAndGetLocations("a1", storageLevel2x) + assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs, + "Inserting a 2x replicated block second time gave different locations from the first") + + val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x) + assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x, + "Inserting a 3x replicated block second time gave different locations from the first") + val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x) + assert( + a2Locs2x.subsetOf(a2Locs3x), + "Inserting a with 2x replication gave locations that are not a subset of locations" + + s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}" + ) + val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x) + assert( + a2Locs3x.subsetOf(a2Locs4x), + "Inserting a with 4x replication gave locations that are not a superset of locations " + + s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: 
${a2Locs4x.mkString(",")}" + ) + } + + test("block replication - addition and deletion of block managers") { val blockSize = 1000 val storeSize = 10000 val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } @@ -1329,10 +1366,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter */ def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { try { - initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) + initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel) assert(master.getLocations(blockId).size === expectedNumLocations) } finally { - master.removeBlock(blockId) + allStores.foreach { _.removeBlock(blockId) } } } From 68e2c72fe5b390b9f03e79b84a6f05507af13d2a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 23 Sep 2014 18:40:42 +0530 Subject: [PATCH 11/16] Made replication peer selection logic more efficient. --- .../apache/spark/storage/BlockManager.scala | 61 +++++++++++++------ .../spark/storage/BlockManagerSuite.scala | 17 ++++++ 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 62b28ec70ad26..62e09df4e655c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -22,7 +22,8 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import scala.concurrent.ExecutionContext.Implicits.global -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random @@ -111,7 +112,7 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - private val cachedPeers = new HashSet[BlockManagerId] + private val cachedPeers = new mutable.HashSet[BlockManagerId] private var lastPeerFetchTime = 0L initialize() @@ -791,11 +792,10 @@ private[spark] class BlockManager( /** * Get peer block managers in the system. 
*/ - private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { - val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds - val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl - + private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = { cachedPeers.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl if (cachedPeers.isEmpty || forceFetch || timeout) { cachedPeers.clear() cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) @@ -812,27 +812,52 @@ private[spark] class BlockManager( private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 - val peersReplicatedTo = new HashSet[BlockManagerId] - val peersFailedToReplicateTo = new HashSet[BlockManagerId] + val peersForReplication = new ArrayBuffer[BlockManagerId] + val peersReplicatedTo = new ArrayBuffer[BlockManagerId] + val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) val startTime = System.nanoTime val random = new Random(blockId.hashCode) - var forceFetchPeers = false + var replicationFailed = false var failures = 0 var done = false - // Get a random peer + // Get cached list of peers + peersForReplication ++= getPeers(forceFetch = false) + + // Get a random peer. Note that this selection of a peer is deterministic on the block id. + // So assuming the list of peers does not change and no replication failures, + // if there are multiple attempts in the same node to replicate the same block, + // the same set of peers will be selected. def getRandomPeer(): Option[BlockManagerId] = { - val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo - if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None + // If replication had failed, then force update the cached list of peers and remove the peers + // that have been already used + if (replicationFailed) { + peersForReplication.clear() + peersForReplication ++= getPeers(forceFetch = true) + peersForReplication --= peersReplicatedTo + peersForReplication --= peersFailedToReplicateTo + } + if (!peersForReplication.isEmpty) { + Some(peersForReplication(random.nextInt(peersForReplication.size))) + } else { + None + } } // One by one choose a random peer and try uploading the block to it // If replication fails (e.g., target peer is down), force the list of cached peers // to be re-fetched from driver and then pick another random peer for replication. Also // temporarily black list the peer for which replication failed. 
+ // + // This selection of a peer and replication is continued in a loop until one of the + // following 3 conditions is fulfilled: + // (i) specified number of peers have been replicated to + // (ii) too many failures in replicating to peers + // (iii) no peer left to replicate to + // while (!done) { getRandomPeer() match { case Some(peer) => @@ -845,22 +870,22 @@ private[spark] class BlockManager( logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" .format((System.nanoTime - onePeerStartTime) / 1e6)) peersReplicatedTo += peer - forceFetchPeers = false + peersForReplication -= peer + replicationFailed = false if (peersReplicatedTo.size == numPeersToReplicateTo) { - done = true + done = true // specified number of peers have been replicated to } } catch { case e: Exception => logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) failures += 1 - forceFetchPeers = true + replicationFailed = true peersFailedToReplicateTo += peer - if (failures > maxReplicationFailures) { + if (failures > maxReplicationFailures) { // too many failures in replcating to peers done = true } } - case None => - // no peer left to replicate to + case None => // no peer left to replicate to done = true } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 7cb8f2ea12a08..0d4312e3ed4c2 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1316,6 +1316,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter testReplication(5, storageLevels) } + test("block replication - 2x replication without peers") { + intercept[org.scalatest.exceptions.TestFailedException] { + testReplication(1, + Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3))) + } + } + test("block replication - deterministic node selection") { val blockSize = 1000 val storeSize = 10000 @@ -1332,25 +1339,35 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter locations } + // Test if two attempts to 2x replication returns same set of locations val a1Locs = putBlockAndGetLocations("a1", storageLevel2x) assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs, "Inserting a 2x replicated block second time gave different locations from the first") + // Test if two attempts to 3x replication returns same set of locations val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x) assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x, "Inserting a 3x replicated block second time gave different locations from the first") + + // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x) assert( a2Locs2x.subsetOf(a2Locs3x), "Inserting a with 2x replication gave locations that are not a subset of locations" + s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}" ) + + // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x) assert( a2Locs3x.subsetOf(a2Locs4x), "Inserting a with 4x replication gave locations that are not a superset of locations " + s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}" ) + + // Test if 3x replication of two different blocks gives two 
different sets of locations + val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x) + assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication") } test("block replication - addition and deletion of block managers") { From 89f91a0109bd7d8b988bcaf800170bd82e1678f7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 23 Sep 2014 22:22:40 +0530 Subject: [PATCH 12/16] Minor change. --- .../org/apache/spark/storage/BlockManager.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 62e09df4e655c..39a099e67bb81 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -862,13 +862,13 @@ private[spark] class BlockManager( getRandomPeer() match { case Some(peer) => try { - val onePeerStartTime = System.nanoTime + val onePeerStartTime = System.currentTimeMillis data.rewind() logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") blockTransferService.uploadBlockSync( peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" - .format((System.nanoTime - onePeerStartTime) / 1e6)) + .format((System.currentTimeMillis - onePeerStartTime) / 1e3)) peersReplicatedTo += peer peersForReplication -= peer replicationFailed = false @@ -889,13 +889,12 @@ private[spark] class BlockManager( done = true } } + val timeTakeMs = (System.currentTimeMillis - startTime) / 1e3 + logDebug(s"Replicating $blockId of ${data.limit()} bytes to " + + s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { - logError(s"Replicated $blockId of ${data.limit()} bytes to only " + - s"${peersReplicatedTo.size} peer(s) instead of ${numPeersToReplicateTo} " + - s"in ${(System.nanoTime - startTime) / 1e6} ms") - } else { - logDebug(s"Successfully replicated $blockId of ${data.limit()} bytes to " + - s"${peersReplicatedTo.size} peer(s) in ${(System.nanoTime - startTime) / 1e6} ms") + logWarning(s"Block $blockId replicated to only " + + s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") } } From 012afa32f0f009e43eb6da28036087cc5264a7b3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 27 Sep 2014 04:38:12 +0530 Subject: [PATCH 13/16] Bug fix --- .../org/apache/spark/storage/BlockManager.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 39a099e67bb81..529072f156b0d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -112,7 +112,8 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - private val cachedPeers = new mutable.HashSet[BlockManagerId] + @volatile private var cachedPeers: Seq[BlockManagerId] = _ + private val peerFetchLock = new Object private var lastPeerFetchTime = 0L initialize() @@ -792,18 +793,17 @@ private[spark] class BlockManager( /** * Get peer block managers in the system. 
*/ - private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = { - cachedPeers.synchronized { + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { + peerFetchLock.synchronized { val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl - if (cachedPeers.isEmpty || forceFetch || timeout) { - cachedPeers.clear() - cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) + if (cachedPeers == null || forceFetch || timeout) { + cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) lastPeerFetchTime = System.currentTimeMillis logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) } + cachedPeers } - cachedPeers } /** From a55a65ce5f423d0ddd9ca0808947ad7eeea29daf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 30 Sep 2014 02:49:12 -0700 Subject: [PATCH 14/16] Added a unit test to test replication behavior. --- .../apache/spark/storage/BlockManager.scala | 5 +- .../spark/storage/BlockManagerSuite.scala | 85 ++++++++++++++----- 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 529072f156b0d..a1ff0c5ade1c0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -112,6 +112,8 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) + + // Field related to peer block managers that are necessary for block replication @volatile private var cachedPeers: Seq[BlockManagerId] = _ private val peerFetchLock = new Object private var lastPeerFetchTime = 0L @@ -807,7 +809,8 @@ private[spark] class BlockManager( } /** - * Replicate block to another node. + * Replicate block to another node. Not that this is a blocking call that returns after + * the block has been replicated. */ private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 0d4312e3ed4c2..2dd10af892a4a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -48,6 +48,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.storage.StorageLevel._ import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import org.apache.spark.network.{ManagedBuffer, BlockTransferService} class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter @@ -1370,41 +1371,85 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication") } + test("block replication - replication failures") { + + /* + Create a system of three block managers / stores. One of them (say, failableStore) + cannot receive blocks. So attempts to use that as replication target fails. 
+ + store <-----------/fails/-----------> failableStore + A A + | | + | | + -----/works/-----> store1 <----/fails/------ + + We are first going to add a normal store and a failable store, and test + whether 2x replication fails to create two copies of a block. + Then we are going to add the third normal store, + and test that now 2x replication works as the new store will be used for replication. + */ + + + // Insert a block with 2x replication and return the number of copies of the block + def replicateAndGetNumCopies(blockId: String): Int = { + store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2) + val numLocations = master.getLocations(blockId).size + allStores.foreach { _.removeBlock(blockId) } + numLocations + } + + // Add a normal block manager + store = makeBlockManager(10000, "store") + + // Add a failable block manager with a mock transfer service that does not + // allow receiving of blocks. So attempts to use it as a replication target will fail. + val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work + when(failableTransfer.hostName).thenReturn("some-hostname") + when(failableTransfer.port).thenReturn(1000) + val failableStore = new BlockManager("failable-store", actorSystem, master, serializer, 10000, conf, + mapOutputTracker, shuffleManager, failableTransfer) + allStores += failableStore // so that this gets stopped after test + assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId)) + + // Test that 2x replication fails by creating only one copy of the block + assert(replicateAndGetNumCopies("a1") === 1) + + // Add another normal block manager and test that 2x replication works + store = makeBlockManager(10000, "store2") + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + assert(replicateAndGetNumCopies("a2") === 2) + } + } + test("block replication - addition and deletion of block managers") { val blockSize = 1000 val storeSize = 10000 val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } - /** - * Function to test whether insert a block with replication achieves the expected replication. - * Since this function can be call on the same block id repeatedly through an `eventually`, - * it needs to be ensured that the method leaves block manager + master in the same state as - * it was before attempting to insert the block. 
- */
- def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
- try {
- initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
- assert(master.getLocations(blockId).size === expectedNumLocations)
- } finally {
- allStores.foreach { _.removeBlock(blockId) }
- }
+ // Insert a block with the given replication factor and return the number of copies of the block
+ def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
+ val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+ initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+ val numLocations = master.getLocations(blockId).size
+ allStores.foreach { _.removeBlock(blockId) }
+ numLocations
}
// 2x replication should work, 3x replication should only replicate 2x
- testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
- testPut("a2", StorageLevel(true, true, false, true, 3), 2)
+ assert(replicateAndGetNumCopies("a1", 2) === 2)
+ assert(replicateAndGetNumCopies("a2", 3) === 2)
// Add another store, 3x replication should work now, 4x replication should only replicate 3x
val newStore1 = makeBlockManager(storeSize, s"newstore1")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- testPut("a3", StorageLevel(true, true, false, true, 3), 3)
+ assert(replicateAndGetNumCopies("a3", 3) === 3)
}
- testPut("a4",StorageLevel(true, true, false, true, 4), 3)
+ assert(replicateAndGetNumCopies("a4", 4) === 3)
// Add another store, 4x replication should work now
val newStore2 = makeBlockManager(storeSize, s"newstore2")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- testPut("a5", StorageLevel(true, true, false, true, 4), 4)
+ assert(replicateAndGetNumCopies("a5", 4) === 4)
}
// Remove all but the 1st store, 2x replication should fail
@@ -1413,12 +1458,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
master.removeExecutor(store.blockManagerId.executorId)
store.stop()
}
- testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1)
+ assert(replicateAndGetNumCopies("a6", 2) === 1)
// Add new stores, 3x replication should work
val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") }
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- testPut("a7", StorageLevel(true, true, false, true, 3), 3)
+ assert(replicateAndGetNumCopies("a7", 3) === 3)
}
}

From 06617739863b3c79a7be91e83bf382336b03083e Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 30 Sep 2014 10:34:05 -0700
Subject: [PATCH 15/16] Minor changes based on PR comments.
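One of the changes here switches the replication timing entirely to System.currentTimeMillis: the differences are already in milliseconds, so the leftover / 1e3 scaling from the nanosecond-based version is dropped and the value logged next to "ms" really is milliseconds. A tiny self-contained sketch of the pattern, with Thread.sleep standing in for the upload to a peer:

    object TimingSketch {
      def main(args: Array[String]): Unit = {
        val startTime = System.currentTimeMillis
        Thread.sleep(5) // stand-in for uploading the block to a peer
        val timeTakenMs = System.currentTimeMillis - startTime // already in ms, no division needed
        println(s"replicated in $timeTakenMs ms")
      }
    }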
--- .../scala/org/apache/spark/storage/BlockManager.scala | 6 +++--- .../org/apache/spark/storage/BlockManagerId.scala | 2 +- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++++------ 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a1ff0c5ade1c0..3f5d06e1aeee7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -820,7 +820,7 @@ private[spark] class BlockManager( val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) - val startTime = System.nanoTime + val startTime = System.currentTimeMillis val random = new Random(blockId.hashCode) var replicationFailed = false @@ -871,7 +871,7 @@ private[spark] class BlockManager( blockTransferService.uploadBlockSync( peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" - .format((System.currentTimeMillis - onePeerStartTime) / 1e3)) + .format((System.currentTimeMillis - onePeerStartTime))) peersReplicatedTo += peer peersForReplication -= peer replicationFailed = false @@ -892,7 +892,7 @@ private[spark] class BlockManager( done = true } } - val timeTakeMs = (System.currentTimeMillis - startTime) / 1e3 + val timeTakeMs = (System.currentTimeMillis - startTime) logDebug(s"Replicating $blockId of ${data.limit()} bytes to " + s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 94c5e5e8b9acd..142285094342c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -59,7 +59,7 @@ class BlockManagerId private ( def port: Int = port_ - def isDriver = (executorId == "") + def isDriver: Boolean = (executorId == "") override def writeExternal(out: ObjectOutput) { out.writeUTF(executorId_) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2dd10af892a4a..d7b28b9333be0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1372,7 +1372,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("block replication - replication failures") { - /* Create a system of three block managers / stores. One of them (say, failableStore) cannot receive blocks. So attempts to use that as replication target fails. @@ -1383,13 +1382,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter | | -----/works/-----> store1 <----/fails/------ - We are first going to add a normal store and a failable store, and test - whether 2x replication fails to create two copies of a block. - Then we are going to add the third normal store, + We are first going to add a normal block manager (i.e. store) and the failable block + manager (i.e. failableStore), and test whether 2x replication fails to create two + copies of a block. 
Then we are going to add another normal block manager (i.e., store1), and test that now 2x replication works as the new store will be used for replication. */ - // Insert a block with 2x replication and return the number of copies of the block def replicateAndGetNumCopies(blockId: String): Int = { store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2) @@ -1490,7 +1488,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter i => makeBlockManager(storeSize, s"store$i") } - storageLevels.foreach {storageLevel => + storageLevels.foreach { storageLevel => // Put the block into one of the stores val blockId = new TestBlockId( "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) From 9690f57be441ea15b3c9de040d6ba07bec262e22 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 2 Oct 2014 11:11:00 -0700 Subject: [PATCH 16/16] Moved replication tests to a new BlockManagerReplicationSuite. --- .../BlockManagerReplicationSuite.scala | 418 ++++++++++++++++++ .../spark/storage/BlockManagerSuite.scala | 335 +------------- 2 files changed, 421 insertions(+), 332 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala new file mode 100644 index 0000000000000..1f1d53a1ee3b0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.language.implicitConversions +import scala.language.postfixOps + +import akka.actor.{ActorSystem, Props} +import org.mockito.Mockito.{mock, when} +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester} +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} +import org.apache.spark.network.BlockTransferService +import org.apache.spark.network.nio.NioBlockTransferService +import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.hash.HashShuffleManager +import org.apache.spark.storage.StorageLevel._ +import org.apache.spark.util.{AkkaUtils, SizeEstimator} + +/** Testsuite that tests block replication in BlockManager */ +class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter { + + private val conf = new SparkConf(false) + var actorSystem: ActorSystem = null + var master: BlockManagerMaster = null + val securityMgr = new SecurityManager(conf) + val mapOutputTracker = new MapOutputTrackerMaster(conf) + val shuffleManager = new HashShuffleManager(conf) + + // List of block manager created during an unit test, so that all of the them can be stopped + // after the unit test. + val allStores = new ArrayBuffer[BlockManager] + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + conf.set("spark.kryoserializer.buffer.mb", "1") + val serializer = new KryoSerializer(conf) + + // Implicitly convert strings to BlockIds for test clarity. + implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + + private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = { + val transfer = new NioBlockTransferService(conf, securityMgr) + val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, + mapOutputTracker, shuffleManager, transfer) + allStores += store + store + } + + before { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem( + "test", "localhost", 0, conf = conf, securityManager = securityMgr) + this.actorSystem = actorSystem + + conf.set("spark.authenticate", "false") + conf.set("spark.driver.port", boundPort.toString) + conf.set("spark.storage.unrollFraction", "0.4") + conf.set("spark.storage.unrollMemoryThreshold", "512") + + // to make a replication attempt to inactive store fail fast + conf.set("spark.core.connection.ack.wait.timeout", "1") + // to make cached peers refresh frequently + conf.set("spark.storage.cachedPeersTtl", "10") + + master = new BlockManagerMaster( + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), + conf, true) + allStores.clear() + } + + after { + allStores.foreach { _.stop() } + allStores.clear() + actorSystem.shutdown() + actorSystem.awaitTermination() + actorSystem = null + master = null + } + + + test("get peers with addition and removal of block managers") { + val numStores = 4 + val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } + val storeIds = stores.map { _.blockManagerId }.toSet + assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) + assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) + 
assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + + // Add driver store and test whether it is filtered out + val driverStore = makeBlockManager(1000, "") + assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) + assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) + assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + + // Add a new store and test whether get peers returns it + val newStore = makeBlockManager(1000, s"store$numStores") + assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) + assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + + // Remove a store and test whether get peers returns it + val storeIdToRemove = stores(0).blockManagerId + master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + + // Test whether asking for peers of a unregistered block manager id returns empty list + assert(master.getPeers(stores(0).blockManagerId).isEmpty) + assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + + test("block replication - 2x replication") { + testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) + ) + } + + test("block replication - 3x replication") { + // Generate storage levels with 3x replication + val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { + level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } + } + testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { + // Generate storage levels with varying replication + val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY + ) + testReplication(5, storageLevels) + } + + test("block replication - 2x replication without peers") { + intercept[org.scalatest.exceptions.TestFailedException] { + testReplication(1, + Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3))) + } + } + + test("block replication - deterministic node selection") { + val blockSize = 1000 + val storeSize = 10000 + val stores = (1 to 5).map { + i => makeBlockManager(storeSize, s"store$i") + } + val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2 + val storageLevel3x = StorageLevel(true, true, false, true, 3) + val storageLevel4x = StorageLevel(true, true, false, true, 4) + + def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = { + stores.head.putSingle(blockId, new Array[Byte](blockSize), level) + val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet + stores.foreach { 
_.removeBlock(blockId) } + master.removeBlock(blockId) + locations + } + + // Test if two attempts to 2x replication returns same set of locations + val a1Locs = putBlockAndGetLocations("a1", storageLevel2x) + assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs, + "Inserting a 2x replicated block second time gave different locations from the first") + + // Test if two attempts to 3x replication returns same set of locations + val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x) + assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x, + "Inserting a 3x replicated block second time gave different locations from the first") + + // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication + val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x) + assert( + a2Locs2x.subsetOf(a2Locs3x), + "Inserting a with 2x replication gave locations that are not a subset of locations" + + s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}" + ) + + // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication + val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x) + assert( + a2Locs3x.subsetOf(a2Locs4x), + "Inserting a with 4x replication gave locations that are not a superset of locations " + + s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}" + ) + + // Test if 3x replication of two different blocks gives two different sets of locations + val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x) + assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication") + } + + test("block replication - replication failures") { + /* + Create a system of three block managers / stores. One of them (say, failableStore) + cannot receive blocks. So attempts to use that as replication target fails. + + +-----------/fails/-----------> failableStore + | + normalStore + | + +-----------/works/-----------> anotherNormalStore + + We are first going to add a normal block manager (i.e. normalStore) and the failable block + manager (i.e. failableStore), and test whether 2x replication fails to create two + copies of a block. Then we are going to add another normal block manager + (i.e., anotherNormalStore), and test that now 2x replication works as the + new store will be used for replication. + */ + + // Add a normal block manager + val store = makeBlockManager(10000, "store") + + // Insert a block with 2x replication and return the number of copies of the block + def replicateAndGetNumCopies(blockId: String): Int = { + store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2) + val numLocations = master.getLocations(blockId).size + allStores.foreach { _.removeBlock(blockId) } + numLocations + } + + // Add a failable block manager with a mock transfer service that does not + // allow receiving of blocks. So attempts to use it as a replication target will fail. 
+ val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work + when(failableTransfer.hostName).thenReturn("some-hostname") + when(failableTransfer.port).thenReturn(1000) + val failableStore = new BlockManager("failable-store", actorSystem, master, serializer, + 10000, conf, mapOutputTracker, shuffleManager, failableTransfer) + allStores += failableStore // so that this gets stopped after test + assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId)) + + // Test that 2x replication fails by creating only one copy of the block + assert(replicateAndGetNumCopies("a1") === 1) + + // Add another normal block manager and test that 2x replication works + makeBlockManager(10000, "anotherStore") + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + assert(replicateAndGetNumCopies("a2") === 2) + } + } + + test("block replication - addition and deletion of block managers") { + val blockSize = 1000 + val storeSize = 10000 + val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + + // Insert a block with given replication factor and return the number of copies of the block\ + def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = { + val storageLevel = StorageLevel(true, true, false, true, replicationFactor) + initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel) + val numLocations = master.getLocations(blockId).size + allStores.foreach { _.removeBlock(blockId) } + numLocations + } + + // 2x replication should work, 3x replication should only replicate 2x + assert(replicateAndGetNumCopies("a1", 2) === 2) + assert(replicateAndGetNumCopies("a2", 3) === 2) + + // Add another store, 3x replication should work now, 4x replication should only replicate 3x + val newStore1 = makeBlockManager(storeSize, s"newstore1") + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + assert(replicateAndGetNumCopies("a3", 3) === 3) + } + assert(replicateAndGetNumCopies("a4", 4) === 3) + + // Add another store, 4x replication should work now + val newStore2 = makeBlockManager(storeSize, s"newstore2") + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + assert(replicateAndGetNumCopies("a5", 4) === 4) + } + + // Remove all but the 1st store, 2x replication should fail + (initialStores.tail ++ Seq(newStore1, newStore2)).foreach { + store => + master.removeExecutor(store.blockManagerId.executorId) + store.stop() + } + assert(replicateAndGetNumCopies("a6", 2) === 1) + + // Add new stores, 3x replication should work + val newStores = (3 to 5).map { + i => makeBlockManager(storeSize, s"newstore$i") + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + assert(replicateAndGetNumCopies("a7", 3) === 3) + } + } + + /** + * Test replication of blocks with different storage levels (various combinations of + * memory, disk & serialization). For each storage level, this function tests every store + * whether the block is present and also tests the master whether its knowledge of blocks + * is correct. Then it also drops the block from memory of each store (using LRU) and + * again checks whether the master's knowledge gets updated. 
+   */
+  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+    import org.apache.spark.storage.StorageLevel._
+
+    assert(maxReplication > 1,
+      s"Cannot test replication factor $maxReplication")
+
+    val storeSize = 10000
+    val blockSize = 1000
+
+    // As many stores as the maximum replication factor
+    val stores = (1 to maxReplication).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    storageLevels.foreach { storageLevel =>
+      // Put the block into one of the stores
+      val blockId = new TestBlockId(
+        "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
+      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+      // Assert that the master knows the expected number of locations for the block
+      val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
+      assert(blockLocations.size === storageLevel.replication,
+        s"master did not have ${storageLevel.replication} locations for $blockId")
+
+      // Test the state of the stores that contain the block
+      stores.filter { testStore =>
+        blockLocations.contains(testStore.blockManagerId.executorId)
+      }.foreach { testStore =>
+        val testStoreName = testStore.blockManagerId.executorId
+        assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
+          s"master does not have status for ${blockId.name} in $testStoreName")
+
+        val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
+
+        // Assert that the block status in the master for this store has the expected storage level
+        assert(
+          blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
+          blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
+          blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
+          blockStatus.storageLevel.deserialized === storageLevel.deserialized,
+          s"master does not know correct storage level for ${blockId.name} in $testStoreName")
+
+        // Assert that the block status in the master for this store has correct memory usage info
+        assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
+          s"master does not know size of ${blockId.name} stored in memory of $testStoreName")
+
+        // If the block is supposed to be in memory, then drop the copy of the block in
+        // this store and test whether the master is updated with zero memory usage for this store
+        if (storageLevel.useMemory) {
+          // Force the block to be dropped by adding a number of dummy blocks
+          (1 to 10).foreach { i =>
+            testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+          }
+          (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") }
+
+          val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)
+
+          // Assert that the block status in the master either does not exist (block removed
+          // from every store) or has zero memory usage for this store
+          assert(
+            newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in memory of $testStoreName"
+          )
+        }
+
+        // If the block is supposed to be on disk (after dropping or otherwise), then
+        // test whether the master has correct disk usage for this store
+        if (storageLevel.useDisk) {
+          assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in disk of 
$testStoreName" + ) + } + } + master.removeBlock(blockId) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d7b28b9333be0..9d96202a3e7ac 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -21,8 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.Arrays import java.util.concurrent.TimeUnit -import org.apache.spark.network.nio.NioBlockTransferService - import scala.collection.mutable.ArrayBuffer import scala.concurrent.Await import scala.concurrent.duration._ @@ -35,20 +33,18 @@ import akka.util.Timeout import org.mockito.Mockito.{mock, when} -import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ -import org.scalatest.Matchers import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} import org.apache.spark.executor.DataReadMethod +import org.apache.spark.network.nio.NioBlockTransferService import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat -import org.apache.spark.storage.StorageLevel._ import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} -import org.apache.spark.network.{ManagedBuffer, BlockTransferService} class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter @@ -65,10 +61,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) - // List of block manager created during an unit test, so that all of the them can be stopped - // after the unit test. 
- val allStores = new ArrayBuffer[BlockManager] - // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test conf.set("spark.kryoserializer.buffer.mb", "1") val serializer = new KryoSerializer(conf) @@ -79,10 +71,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = { val transfer = new NioBlockTransferService(conf, securityMgr) - val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, + new BlockManager(name, actorSystem, master, serializer, maxMem, conf, mapOutputTracker, shuffleManager, transfer) - allStores += store - store } before { @@ -98,17 +88,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter conf.set("spark.storage.unrollFraction", "0.4") conf.set("spark.storage.unrollMemoryThreshold", "512") - // for block replication test, to make a replication attempt to inactive store fail fast - conf.set("spark.core.connection.ack.wait.timeout", "1") - // for block replication test, to make cached peers refresh frequently - conf.set("spark.storage.cachedPeersTtl", "10") - master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), conf, true) - allStores.clear() - val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() } @@ -122,8 +105,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store2.stop() store2 = null } - allStores.foreach { _.stop() } - allStores.clear() actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null @@ -1244,314 +1225,4 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } - - test("get peers with addition and removal of block managers") { - val numStores = 4 - val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } - val storeIds = stores.map { _.blockManagerId }.toSet - assert(master.getPeers(stores(0).blockManagerId).toSet === - storeIds.filterNot { _ == stores(0).blockManagerId }) - assert(master.getPeers(stores(1).blockManagerId).toSet === - storeIds.filterNot { _ == stores(1).blockManagerId }) - assert(master.getPeers(stores(2).blockManagerId).toSet === - storeIds.filterNot { _ == stores(2).blockManagerId }) - - // Add driver store and test whether it is filtered out - val driverStore = makeBlockManager(1000, "") - assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) - assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) - assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) - - // Add a new store and test whether get peers returns it - val newStore = makeBlockManager(1000, s"store$numStores") - assert(master.getPeers(stores(0).blockManagerId).toSet === - storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) - assert(master.getPeers(stores(1).blockManagerId).toSet === - storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) - assert(master.getPeers(stores(2).blockManagerId).toSet === - storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) - assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) - - // Remove a store and test whether get peers returns it - val storeIdToRemove = stores(0).blockManagerId - 
master.removeExecutor(storeIdToRemove.executorId) - assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) - assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) - assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) - - // Test whether asking for peers of a unregistered block manager id returns empty list - assert(master.getPeers(stores(0).blockManagerId).isEmpty) - assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) - } - - test("block replication - 2x replication") { - testReplication(2, - Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) - ) - } - - test("block replication - 3x replication") { - // Generate storage levels with 3x replication - val storageLevels = { - Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { - level => StorageLevel( - level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) - } - } - testReplication(3, storageLevels) - } - - test("block replication - mixed between 1x to 5x") { - // Generate storage levels with varying replication - val storageLevels = Seq( - MEMORY_ONLY, - MEMORY_ONLY_SER_2, - StorageLevel(true, false, false, false, 3), - StorageLevel(true, true, false, true, 4), - StorageLevel(true, true, false, false, 5), - StorageLevel(true, true, false, true, 4), - StorageLevel(true, false, false, false, 3), - MEMORY_ONLY_SER_2, - MEMORY_ONLY - ) - testReplication(5, storageLevels) - } - - test("block replication - 2x replication without peers") { - intercept[org.scalatest.exceptions.TestFailedException] { - testReplication(1, - Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3))) - } - } - - test("block replication - deterministic node selection") { - val blockSize = 1000 - val storeSize = 10000 - val stores = (1 to 5).map { i => makeBlockManager(storeSize, s"store$i") } - val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2 - val storageLevel3x = StorageLevel(true, true, false, true, 3) - val storageLevel4x = StorageLevel(true, true, false, true, 4) - - def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = { - stores.head.putSingle(blockId, new Array[Byte](blockSize), level) - val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet - stores.foreach { _.removeBlock(blockId) } - master.removeBlock(blockId) - locations - } - - // Test if two attempts to 2x replication returns same set of locations - val a1Locs = putBlockAndGetLocations("a1", storageLevel2x) - assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs, - "Inserting a 2x replicated block second time gave different locations from the first") - - // Test if two attempts to 3x replication returns same set of locations - val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x) - assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x, - "Inserting a 3x replicated block second time gave different locations from the first") - - // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication - val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x) - assert( - a2Locs2x.subsetOf(a2Locs3x), - "Inserting a with 2x replication gave locations that are not a subset of locations" + - s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}" - ) - - // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication - val a2Locs4x = putBlockAndGetLocations("a2", 
storageLevel4x) - assert( - a2Locs3x.subsetOf(a2Locs4x), - "Inserting a with 4x replication gave locations that are not a superset of locations " + - s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}" - ) - - // Test if 3x replication of two different blocks gives two different sets of locations - val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x) - assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication") - } - - test("block replication - replication failures") { - /* - Create a system of three block managers / stores. One of them (say, failableStore) - cannot receive blocks. So attempts to use that as replication target fails. - - store <-----------/fails/-----------> failableStore - A A - | | - | | - -----/works/-----> store1 <----/fails/------ - - We are first going to add a normal block manager (i.e. store) and the failable block - manager (i.e. failableStore), and test whether 2x replication fails to create two - copies of a block. Then we are going to add another normal block manager (i.e., store1), - and test that now 2x replication works as the new store will be used for replication. - */ - - // Insert a block with 2x replication and return the number of copies of the block - def replicateAndGetNumCopies(blockId: String): Int = { - store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2) - val numLocations = master.getLocations(blockId).size - allStores.foreach { _.removeBlock(blockId) } - numLocations - } - - // Add a normal block manager - store = makeBlockManager(10000, "store") - - // Add a failable block manager with a mock transfer service that does not - // allow receiving of blocks. So attempts to use it as a replication target will fail. - val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work - when(failableTransfer.hostName).thenReturn("some-hostname") - when(failableTransfer.port).thenReturn(1000) - val failableStore = new BlockManager("failable-store", actorSystem, master, serializer, 10000, conf, - mapOutputTracker, shuffleManager, failableTransfer) - allStores += failableStore // so that this gets stopped after test - assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId)) - - // Test that 2x replication fails by creating only one copy of the block - assert(replicateAndGetNumCopies("a1") === 1) - - // Add another normal block manager and test that 2x replication works - store = makeBlockManager(10000, "store2") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - assert(replicateAndGetNumCopies("a2") === 2) - } - } - - test("block replication - addition and deletion of block managers") { - val blockSize = 1000 - val storeSize = 10000 - val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } - - // Insert a block with given replication factor and return the number of copies of the block\ - def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = { - val storageLevel = StorageLevel(true, true, false, true, replicationFactor) - initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel) - val numLocations = master.getLocations(blockId).size - allStores.foreach { _.removeBlock(blockId) } - numLocations - } - - // 2x replication should work, 3x replication should only replicate 2x - assert(replicateAndGetNumCopies("a1", 2) === 2) - assert(replicateAndGetNumCopies("a2", 3) === 2) - - // Add another store, 3x replication should work 
now, 4x replication should only replicate 3x - val newStore1 = makeBlockManager(storeSize, s"newstore1") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - assert(replicateAndGetNumCopies("a3", 3) === 3) - } - assert(replicateAndGetNumCopies("a4", 4) === 3) - - // Add another store, 4x replication should work now - val newStore2 = makeBlockManager(storeSize, s"newstore2") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - assert(replicateAndGetNumCopies("a5", 4) === 4) - } - - // Remove all but the 1st store, 2x replication should fail - (initialStores.tail ++ Seq(newStore1, newStore2)).foreach { - store => - master.removeExecutor(store.blockManagerId.executorId) - store.stop() - } - assert(replicateAndGetNumCopies("a6", 2) === 1) - - // Add new stores, 3x replication should work - val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - assert(replicateAndGetNumCopies("a7", 3) === 3) - } - } - - /** - * Test replication of blocks with different storage levels (various combinations of - * memory, disk & serialization). For each storage level, this function tests every store - * whether the block is present and also tests the master whether its knowledge of blocks - * is correct. Then it also drops the block from memory of each store (using LRU) and - * again checks whether the master's knowledge gets updated. - */ - private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) { - import org.apache.spark.storage.StorageLevel._ - - assert(maxReplication > 1, - s"Cannot test replication factor $maxReplication") - - // storage levels to test with the given replication factor - - val storeSize = 10000 - val blockSize = 1000 - - // As many stores as the replication factor - val stores = (1 to maxReplication).map { - i => makeBlockManager(storeSize, s"store$i") - } - - storageLevels.foreach { storageLevel => - // Put the block into one of the stores - val blockId = new TestBlockId( - "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) - stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) - - // Assert that master know two locations for the block - val blockLocations = master.getLocations(blockId).map(_.executorId).toSet - assert(blockLocations.size === storageLevel.replication, - s"master did not have ${storageLevel.replication} locations for $blockId") - - // Test state of the stores that contain the block - stores.filter { testStore => blockLocations.contains(testStore.blockManagerId.executorId) } - .foreach { testStore => - val testStoreName = testStore.blockManagerId.executorId - assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") - assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), - s"master does not have status for ${blockId.name} in $testStoreName") - - val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) - - // Assert that block status in the master for this store has expected storage level - assert( - blockStatus.storageLevel.useDisk === storageLevel.useDisk && - blockStatus.storageLevel.useMemory === storageLevel.useMemory && - blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && - blockStatus.storageLevel.deserialized === storageLevel.deserialized, - s"master does not know correct storage level for ${blockId.name} in $testStoreName") - - // Assert that the block status in the master 
for this store has correct memory usage info - assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, - s"master does not know size of ${blockId.name} stored in memory of $testStoreName") - - - // If the block is supposed to be in memory, then drop the copy of the block in - // this store test whether master is updated with zero memory usage this store - if (storageLevel.useMemory) { - // Force the block to be dropped by adding a number of dummy blocks - (1 to 10).foreach { i => - testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) - } - (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } - - val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) - - // Assert that the block status in the master either does not exist (block removed - // from every store) or has zero memory usage for this store - assert( - newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in memory of $testStoreName" - ) - } - - // If the block is supposed to be in disk (after dropping or otherwise, then - // test whether master has correct disk usage for this store - if (storageLevel.useDisk) { - assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in disk of $testStoreName" - ) - } - } - master.removeBlock(blockId) - } - } }
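Note (illustration only, not part of the patch): the tests above request replication purely
through StorageLevel. A minimal sketch of the same constructions as they would appear in
application code is shown below; `someRdd` is a hypothetical RDD, and the values mirror those
used in testReplication and the deterministic-node-selection test.

    import org.apache.spark.storage.StorageLevel

    // Built-in level with two replicas (memory first, spilling to disk), as used in the 2x tests
    val level2x = StorageLevel.MEMORY_AND_DISK_2

    // Arbitrary replication factors via the StorageLevel factory:
    // StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
    val level3x = StorageLevel(true, true, false, true, 3)
    val level4x = StorageLevel(true, true, false, true, 4)

    // An application would request a replicated level when caching data, e.g.
    // someRdd.persist(level3x)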