From 0522bb8d459e341e6003cc8cc4a0481bc5d5713a Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 21 Sep 2017 14:12:24 -0500
Subject: [PATCH 1/4] [SPARK-22083][CORE] Release locks in
 MemoryStore.evictBlocksToFreeSpace

MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks it
intends to evict up front. If there is a failure to evict blocks (e.g., a
failure dropping a block to disk), then we have to release those locks.
Otherwise the locks are never released, and an executor trying to acquire one
of them will wait forever.
---
 .../spark/storage/memory/MemoryStore.scala    | 45 +++++++----
 .../spark/storage/MemoryStoreSuite.scala      | 75 +++++++++++++++++++
 2 files changed, 107 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index eb2201d142ffb..4ca939fa466c3 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
+import org.apache.spark.storage._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -544,20 +544,39 @@ private[spark] class MemoryStore(
     }
 
     if (freedMemory >= space) {
-      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
-        s"(${Utils.bytesToString(freedMemory)} bytes)")
-      for (blockId <- selectedBlocks) {
-        val entry = entries.synchronized { entries.get(blockId) }
-        // This should never be null as only one task should be dropping
-        // blocks and removing entries. However the check is still here for
-        // future safety.
-        if (entry != null) {
-          dropBlock(blockId, entry)
-        }
-      }
-      logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
-        s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
-      freedMemory
+      var exceptionWasThrown: Boolean = true
+      try {
+        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+          s"(${Utils.bytesToString(freedMemory)} bytes)")
+        for (blockId <- selectedBlocks) {
+          val entry = entries.synchronized {
+            entries.get(blockId)
+          }
+          // This should never be null as only one task should be dropping
+          // blocks and removing entries. However the check is still here for
+          // future safety.
+          if (entry != null) {
+            dropBlock(blockId, entry)
+          }
+        }
+        exceptionWasThrown = false
+        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
+        freedMemory
+      } finally {
+        // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
+        // with InterruptedException
+        if (exceptionWasThrown) {
+          selectedBlocks.foreach { id =>
+            // some of the blocks may have already been unlocked, or completely removed
+            blockInfoManager.get(id).foreach { info =>
+              if (info.readerCount > 0 || info.writerTask != BlockInfo.NO_WRITER) {
+                blockInfoManager.unlock(id)
+              }
+            }
+          }
+        }
+      }
     } else {
       blockId.foreach { id =>
         logInfo(s"Will not store $id")
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 9929ea033a99f..ffdf212df1735 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -407,4 +407,79 @@ class MemoryStoreSuite
     })
     assert(memoryStore.getSize(blockId) === 10000)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+    // Setup a memory store with many blocks cached, and then one request which leads to multiple
+    // blocks getting evicted. We'll make the eviction throw an exception, and make sure that
+    // all locks are released.
+    val ct = implicitly[ClassTag[Array[Byte]]]
+    def testWithFailureOnNthDrop(failAfterDroppingNBlocks: Int): Unit = {
+      val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
+      val blockInfoManager = new BlockInfoManager
+      var droppedSoFar = 0
+      val blockEvictionHandler = new BlockEvictionHandler {
+        var memoryStore: MemoryStore = _
+
+        override private[storage] def dropFromMemory[T: ClassTag](
+            blockId: BlockId,
+            data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+          if (droppedSoFar < failAfterDroppingNBlocks) {
+            droppedSoFar += 1
+            memoryStore.remove(blockId)
+            StorageLevel.NONE
+          } else {
+            throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
+          }
+        }
+      }
+      val memoryStore =
+        new MemoryStore(conf, blockInfoManager, serializerManager, memManager, blockEvictionHandler)
+      blockEvictionHandler.memoryStore = memoryStore
+      memManager.setMemoryStore(memoryStore)
+
+      // Put in some small blocks to fill up the memory store
+      val initialBlocks = (1 to 10).map { id =>
+        val blockId = BlockId(s"rdd_1_$id")
+        val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+        val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
+        assert(initialWriteLock)
+        val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(10))
+        })
+        assert(success)
+        blockInfoManager.unlock(blockId, None)
+      }
+      assert(blockInfoManager.size === 10)
+
+
+      // Add one big block, which will require evicting everything in the memorystore. However our
+      // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
+ logWarning("trying to store large block") + val largeBlockId = BlockId(s"rdd_2_1") + val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) + val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo) + assert(initialWriteLock) + val exc = intercept[RuntimeException] { + memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(100)) + }) + } + assert(exc.getMessage().startsWith("Mock error dropping block")) + // BlockManager.doPut takes care of releasing the lock for the newly written block -- not + // testing that here, so do it manually + blockInfoManager.removeBlock(largeBlockId) + + assert(blockInfoManager.size === (10 - failAfterDroppingNBlocks)) + + val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) => + assert(info.writerTask === BlockInfo.NO_WRITER, id) + assert(info.readerCount === 0, id) + memoryStore.contains(id) + } + assert(blocksStillInMemory.size === (10 - failAfterDroppingNBlocks)) + } + + testWithFailureOnNthDrop(0) + testWithFailureOnNthDrop(3) + } } From dfd125833f887dcf0e72e8d5be95ba9cedea4ac2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 22 Sep 2017 13:02:42 -0500 Subject: [PATCH 2/4] handle race if another thread grabs lock after block is dropped --- .../spark/storage/memory/MemoryStore.scala | 13 ++-- .../spark/storage/MemoryStoreSuite.scala | 72 ++++++++++++++----- 2 files changed, 65 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 4ca939fa466c3..33f447a6035b8 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -544,7 +544,7 @@ private[spark] class MemoryStore( } if (freedMemory >= space) { - var exceptionWasThrown: Boolean = true + val successfulBlocks = ArrayBuffer[BlockId]() try { logInfo(s"${selectedBlocks.size} blocks selected for dropping " + s"(${Utils.bytesToString(freedMemory)} bytes)") @@ -557,17 +557,19 @@ private[spark] class MemoryStore( // future safety. 
           if (entry != null) {
             dropBlock(blockId, entry)
+            afterDropAction(blockId)
           }
+          successfulBlocks += blockId
         }
-        exceptionWasThrown = false
         logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
           s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
         freedMemory
       } finally {
         // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
         // with InterruptedException
-        if (exceptionWasThrown) {
-          selectedBlocks.foreach { id =>
+        if (successfulBlocks.size != selectedBlocks.size) {
+          val blocksToClean = selectedBlocks -- successfulBlocks
+          blocksToClean.foreach { id =>
             // some of the blocks may have already been unlocked, or completely removed
             blockInfoManager.get(id).foreach { info =>
               if (info.readerCount > 0 || info.writerTask != BlockInfo.NO_WRITER) {
@@ -589,6 +591,9 @@ private[spark] class MemoryStore(
     }
   }
 
+  // hook for testing, so we can simulate a race
+  protected def afterDropAction(blockId: BlockId): Unit = {}
+
   def contains(blockId: BlockId): Boolean = {
     entries.synchronized { entries.containsKey(blockId) }
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index ffdf212df1735..8ac221a88472d 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -413,9 +413,11 @@ class MemoryStoreSuite
     // blocks getting evicted. We'll make the eviction throw an exception, and make sure that
     // all locks are released.
     val ct = implicitly[ClassTag[Array[Byte]]]
-    def testWithFailureOnNthDrop(failAfterDroppingNBlocks: Int): Unit = {
+    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+      val tc = TaskContext.empty()
       val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
       val blockInfoManager = new BlockInfoManager
+      blockInfoManager.registerTask(tc.taskAttemptId)
       var droppedSoFar = 0
       val blockEvictionHandler = new BlockEvictionHandler {
         var memoryStore: MemoryStore = _
@@ -426,14 +428,30 @@ class MemoryStoreSuite
           if (droppedSoFar < failAfterDroppingNBlocks) {
             droppedSoFar += 1
             memoryStore.remove(blockId)
-            StorageLevel.NONE
+            if (readLockAfterDrop) {
+              // for testing purposes, we act like another thread gets the read lock on the new
+              // block
+              StorageLevel.DISK_ONLY
+            } else {
+              StorageLevel.NONE
+            }
           } else {
             throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
           }
         }
       }
-      val memoryStore =
-        new MemoryStore(conf, blockInfoManager, serializerManager, memManager, blockEvictionHandler)
+      val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+        blockEvictionHandler) {
+        override def afterDropAction(blockId: BlockId): Unit = {
+          if (readLockAfterDrop) {
+            // pretend that we get a read lock on the block (now on disk) in another thread
+            TaskContext.setTaskContext(tc)
+            blockInfoManager.lockForReading(blockId)
+            TaskContext.unset()
+          }
+        }
+      }
+
       blockEvictionHandler.memoryStore = memoryStore
       memManager.setMemoryStore(memoryStore)
@@ -454,32 +472,54 @@ class MemoryStoreSuite
       // Add one big block, which will require evicting everything in the memorystore. However our
       // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
- logWarning("trying to store large block") val largeBlockId = BlockId(s"rdd_2_1") val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo) assert(initialWriteLock) - val exc = intercept[RuntimeException] { + if (failAfterDroppingNBlocks < 10) { + val exc = intercept[RuntimeException] { + memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(100)) + }) + } + assert(exc.getMessage().startsWith("Mock error dropping block"), exc) + // BlockManager.doPut takes care of releasing the lock for the newly written block -- not + // testing that here, so do it manually + blockInfoManager.removeBlock(largeBlockId) + } else { memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { new ChunkedByteBuffer(ByteBuffer.allocate(100)) }) + // BlockManager.doPut takes care of releasing the lock for the newly written block -- not + // testing that here, so do it manually + blockInfoManager.unlock(largeBlockId) } - assert(exc.getMessage().startsWith("Mock error dropping block")) - // BlockManager.doPut takes care of releasing the lock for the newly written block -- not - // testing that here, so do it manually - blockInfoManager.removeBlock(largeBlockId) - assert(blockInfoManager.size === (10 - failAfterDroppingNBlocks)) + val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0 + val expBlocks = 10 + + (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) + + largeBlockInMemory + assert(blockInfoManager.size === expBlocks) val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) => assert(info.writerTask === BlockInfo.NO_WRITER, id) - assert(info.readerCount === 0, id) - memoryStore.contains(id) + // in this test, all the blocks in memory have no reader, but everything dropped to disk + // had another thread read the block. We shouldn't lose the other thread's reader lock. 
+        if (memoryStore.contains(id)) {
+          assert(info.readerCount === 0, id)
+          true
+        } else {
+          assert(info.readerCount === 1, id)
+          false
+        }
       }
-      assert(blocksStillInMemory.size === (10 - failAfterDroppingNBlocks))
+      assert(blocksStillInMemory.size === (10 - failAfterDroppingNBlocks + largeBlockInMemory))
     }
 
-    testWithFailureOnNthDrop(0)
-    testWithFailureOnNthDrop(3)
+    Seq(0, 3, 10).foreach { failAfterDropping =>
+      Seq(true, false).foreach { readLockAfterDropping =>
+        testFailureOnNthDrop(failAfterDropping, readLockAfterDropping)
+      }
+    }
   }
 }

From 7ce162d133daabf247f65a0c18d333b28cbdb645 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 22 Sep 2017 13:45:15 -0500
Subject: [PATCH 3/4] cleanup

---
 .../spark/storage/memory/MemoryStore.scala    | 21 ++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 33f447a6035b8..651e9c7b2ab61 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -544,11 +544,12 @@ private[spark] class MemoryStore(
     }
 
     if (freedMemory >= space) {
-      val successfulBlocks = ArrayBuffer[BlockId]()
+      var lastSuccessfulBlock = -1
       try {
         logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
           s"(${Utils.bytesToString(freedMemory)} bytes)")
-        for (blockId <- selectedBlocks) {
+        (0 until selectedBlocks.size).foreach { idx =>
+          val blockId = selectedBlocks(idx)
           val entry = entries.synchronized {
             entries.get(blockId)
           }
@@ -559,7 +560,7 @@ private[spark] class MemoryStore(
             dropBlock(blockId, entry)
             afterDropAction(blockId)
           }
-          successfulBlocks += blockId
+          lastSuccessfulBlock = idx
         }
         logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
           s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
         freedMemory
@@ -567,15 +568,11 @@ private[spark] class MemoryStore(
       } finally {
         // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
         // with InterruptedException
-        if (successfulBlocks.size != selectedBlocks.size) {
-          val blocksToClean = selectedBlocks -- successfulBlocks
-          blocksToClean.foreach { id =>
-            // some of the blocks may have already been unlocked, or completely removed
-            blockInfoManager.get(id).foreach { info =>
-              if (info.readerCount > 0 || info.writerTask != BlockInfo.NO_WRITER) {
-                blockInfoManager.unlock(id)
-              }
-            }
+        if (lastSuccessfulBlock != selectedBlocks.size - 1) {
+          // the blocks we didn't process successfully are still locked, so we have to unlock them
+          (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
+            val blockId = selectedBlocks(idx)
+            blockInfoManager.unlock(blockId)
           }
         }
       }

From dcb7f73939af52e25fb1aae77062e8546aa92ec8 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 25 Sep 2017 10:46:07 -0500
Subject: [PATCH 4/4] review feedback

---
 .../spark/storage/MemoryStoreSuite.scala      | 38 ++++++++++---------
 1 file changed, 21 insertions(+), 17 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 8ac221a88472d..7274072e5049a 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -413,9 +413,12 @@ class MemoryStoreSuite
     // blocks getting evicted. We'll make the eviction throw an exception, and make sure that
     // all locks are released.
     val ct = implicitly[ClassTag[Array[Byte]]]
-    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+    val numInitialBlocks = 10
+    val memStoreSize = 100
+    val bytesPerSmallBlock = memStoreSize / numInitialBlocks
+    def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = {
       val tc = TaskContext.empty()
-      val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
+      val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1)
       val blockInfoManager = new BlockInfoManager
       blockInfoManager.registerTask(tc.taskAttemptId)
       var droppedSoFar = 0
@@ -425,7 +428,7 @@ class MemoryStoreSuite
         override private[storage] def dropFromMemory[T: ClassTag](
             blockId: BlockId,
             data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
-          if (droppedSoFar < failAfterDroppingNBlocks) {
+          if (droppedSoFar < numValidBlocks) {
             droppedSoFar += 1
             memoryStore.remove(blockId)
             if (readLockAfterDrop) {
@@ -456,18 +459,18 @@ class MemoryStoreSuite
       memManager.setMemoryStore(memoryStore)
 
       // Put in some small blocks to fill up the memory store
-      val initialBlocks = (1 to 10).map { id =>
+      val initialBlocks = (1 to numInitialBlocks).map { id =>
         val blockId = BlockId(s"rdd_1_$id")
         val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
         val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
         assert(initialWriteLock)
-        val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => {
-          new ChunkedByteBuffer(ByteBuffer.allocate(10))
+        val success = memoryStore.putBytes(blockId, bytesPerSmallBlock, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(bytesPerSmallBlock))
         })
         assert(success)
         blockInfoManager.unlock(blockId, None)
       }
-      assert(blockInfoManager.size === 10)
+      assert(blockInfoManager.size === numInitialBlocks)
 
 
       // Add one big block, which will require evicting everything in the memorystore. However our
@@ -476,10 +479,10 @@ class MemoryStoreSuite
       val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
       val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
       assert(initialWriteLock)
-      if (failAfterDroppingNBlocks < 10) {
+      if (numValidBlocks < numInitialBlocks) {
         val exc = intercept[RuntimeException] {
-          memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
-            new ChunkedByteBuffer(ByteBuffer.allocate(100))
+          memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
+            new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
           })
         }
         assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
@@ -487,17 +490,17 @@ class MemoryStoreSuite
         // testing that here, so do it manually
         blockInfoManager.removeBlock(largeBlockId)
       } else {
-        memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
-          new ChunkedByteBuffer(ByteBuffer.allocate(100))
+        memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
         })
         // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
         // testing that here, so do it manually
         blockInfoManager.unlock(largeBlockId)
       }
 
-      val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0
-      val expBlocks = 10 +
-        (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) +
+      val largeBlockInMemory = if (numValidBlocks == numInitialBlocks) 1 else 0
+      val expBlocks = numInitialBlocks +
+        (if (readLockAfterDrop) 0 else -numValidBlocks) +
         largeBlockInMemory
       assert(blockInfoManager.size === expBlocks)
 
@@ -513,10 +516,11 @@ class MemoryStoreSuite
           false
         }
       }
-      assert(blocksStillInMemory.size === (10 - failAfterDroppingNBlocks + largeBlockInMemory))
+      assert(blocksStillInMemory.size ===
+        (numInitialBlocks - numValidBlocks + largeBlockInMemory))
     }
 
-    Seq(0, 3, 10).foreach { failAfterDropping =>
+    Seq(0, 3, numInitialBlocks).foreach { failAfterDropping =>
       Seq(true, false).foreach { readLockAfterDropping =>
         testFailureOnNthDrop(failAfterDropping, readLockAfterDropping)
       }
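
Note: the locking discipline this series converges on in PATCH 3/4 can be
summarized in a self-contained sketch. The code below is illustrative only --
SimpleLockManager, evictAll, and drop are hypothetical stand-ins for
BlockInfoManager, evictBlocksToFreeSpace, and dropBlock, and unlike the real
code (where dropBlock itself releases each processed block's lock), the unlock
on success is explicit here:

import scala.collection.mutable

// Hypothetical stand-in for BlockInfoManager: just tracks which ids are locked.
class SimpleLockManager {
  private val locked = mutable.Set[String]()
  def lockForWriting(id: String): Unit = synchronized { locked += id }
  def unlock(id: String): Unit = synchronized { locked -= id }
  def lockedBlocks: Set[String] = synchronized { locked.toSet }
}

object EvictionSketch {
  // Shape of the final fix: locks are taken up front, progress is tracked by
  // index, and the finally block unlocks only the blocks that were never
  // processed. Using a finally rather than a catch means InterruptedException
  // and other throwables propagate untouched while the cleanup still runs.
  def evictAll(selected: IndexedSeq[String], locks: SimpleLockManager)
      (drop: String => Unit): Unit = {
    selected.foreach(locks.lockForWriting)
    var lastSuccessfulBlock = -1
    try {
      selected.indices.foreach { idx =>
        drop(selected(idx)) // may throw, e.g. a failure writing to disk
        locks.unlock(selected(idx)) // done inside dropBlock in the real code
        lastSuccessfulBlock = idx
      }
    } finally {
      if (lastSuccessfulBlock != selected.size - 1) {
        (lastSuccessfulBlock + 1 until selected.size).foreach { idx =>
          locks.unlock(selected(idx))
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val locks = new SimpleLockManager
    val blocks = IndexedSeq("rdd_1_1", "rdd_1_2", "rdd_1_3")
    try {
      evictAll(blocks, locks) { id =>
        if (id == "rdd_1_2") throw new RuntimeException("mock drop failure")
      }
    } catch {
      case _: RuntimeException => // expected: the mock failure above
    }
    // No locks may be leaked, even though eviction failed partway through.
    assert(locks.lockedBlocks.isEmpty)
    println("no leaked locks")
  }
}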