From 17b90151a5f853b2cdabdd27bb50e11e54e1fc5e Mon Sep 17 00:00:00 2001 From: hushan Date: Wed, 14 Oct 2015 16:33:57 +0800 Subject: [PATCH] [SPARK-6157][CORE] Unrolling with MEMORY_AND_DISK should always release memory --- .../scala/org/apache/spark/CacheManager.scala | 9 ++- .../apache/spark/storage/MemoryStore.scala | 65 ++++++++++--------- .../spark/storage/BlockManagerSuite.scala | 14 +++- 3 files changed, 52 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 4d20c7369376e..b8ae9bb81f490 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -181,8 +181,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logWarning(s"Persisting partition $key to disk instead.") val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false, putLevel.replication) - putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel)) + val results = + putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel)) + // After putting this block on disk, we no longer need the array that stored the + // partial values, so we should release the unroll memory occupied + // in the process (SPARK-6157). 
+ blockManager.memoryStore.releasePendingUnrollMemoryForThisTask() + results } else { + blockManager.memoryStore.reservePendingUnrollMemoryForThisTask() returnValues } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 4dbac388e098b..16ef74a58fa92 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -43,16 +43,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo private val conf = blockManager.conf private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) - // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) + // A mapping from taskAttemptId to amount of memory used for these unrolled blocks (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` private val unrollMemoryMap = mutable.HashMap[Long, Long]() - // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. - // Pending unroll memory refers to the intermediate memory occupied by a task - // after the unroll but before the actual putting of the block in the cache. - // This chunk of memory is expected to be released *as soon as* we finish - // caching the corresponding block as opposed to until after the task finishes. - // This is only used if a block is successfully unrolled in its entirety in - // memory (SPARK-4777). + // Refers to the unrolling memory for the current processing block. 
private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() // Initial memory to request before unrolling any block @@ -178,8 +172,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) + // After putting this block on disk, we no longer need the array that stored the partial + // values, so we should release the unroll memory occupied in the process (SPARK-6157). + releasePendingUnrollMemoryForThisTask() PutResult(res.size, res.data, droppedBlocks) } else { + reservePendingUnrollMemoryForThisTask() PutResult(0, Left(iteratorValues), droppedBlocks) } } @@ -265,7 +263,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 // Previous unroll memory held by this task, for releasing later (only at the very end) - val previousMemoryReserved = currentUnrollMemoryForThisTask // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] @@ -303,26 +300,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo logUnrollFailureMessage(blockId, vector.estimateSize()) Right(vector.iterator ++ values) } - - } finally { - // If we return an array, the values returned here will be cached in `tryToPut` later. - // In this case, we should release the memory only after we cache the block there. - if (keepUnrolling) { - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - // Since we continue to hold onto the array until we actually cache it, we cannot - // release the unroll memory yet. Instead, we transfer it to pending unroll memory - // so `tryToPut` can further transfer it to normal storage memory later. 
- // TODO: we can probably express this without pending unroll memory (SPARK-10907) - val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved - unrollMemoryMap(taskAttemptId) -= amountToTransferToPending - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending - } - } else { - // Otherwise, if we return an iterator, we can only release the unroll memory when - // the task finishes since we don't know when the iterator will be consumed. - } } } @@ -523,6 +500,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo /** * Reserve memory for unrolling the given block for this task. + * There are three ways this memory is released, covering three situations: + * 1. Unroll succeeded. We expect to cache this block in `tryToPut`. We do not release and + * re-acquire memory from the MemoryManager in order to avoid race conditions where + * another component steals the memory that we're trying to transfer. (SPARK-4777) + * 2. Unroll failed for a MEMORY_AND_DISK block. We should release the memory after putting + * this block into the disk store, so it can be re-acquired for other purposes. (SPARK-6157) + * 3. Unroll failed for a MEMORY_ONLY block. + * We do not release the memory until the task finishes. + * @return whether the request is granted. 
 */ def reserveUnrollMemoryForThisTask( @@ -533,7 +518,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) if (success) { val taskAttemptId = currentTaskAttemptId() - unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory } success } @@ -560,7 +546,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } /** - * Release pending unroll memory of current unroll successful block used by this task + * Release pending unroll memory of the block currently being processed */ def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { val taskAttemptId = currentTaskAttemptId() @@ -578,6 +564,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } + /** + * Transfer pending unroll memory to `unrollMemoryMap`; it is not released until the task finishes + */ + def reservePendingUnrollMemoryForThisTask(): Unit = { + val taskAttemptId = currentTaskAttemptId() + memoryManager.synchronized { + if (pendingUnrollMemoryMap.contains(taskAttemptId)) { + unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + + pendingUnrollMemoryMap(taskAttemptId) + pendingUnrollMemoryMap.remove(taskAttemptId) + } + } + } + /** * Return the amount of memory currently occupied for unrolling blocks across all tasks. */ @@ -589,7 +589,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * Return the amount of memory currently occupied for unrolling blocks by this task. 
*/ def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized { - unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) + unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) + + pendingUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f3fab33ca2e31..382a783f43dfa 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1055,10 +1055,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Reserve assert(reserveUnrollMemoryForThisTask(100)) + memoryStore.reservePendingUnrollMemoryForThisTask() assert(memoryStore.currentUnrollMemoryForThisTask === 100) assert(reserveUnrollMemoryForThisTask(200)) + memoryStore.reservePendingUnrollMemoryForThisTask() assert(memoryStore.currentUnrollMemoryForThisTask === 300) assert(reserveUnrollMemoryForThisTask(500)) + memoryStore.reservePendingUnrollMemoryForThisTask() assert(memoryStore.currentUnrollMemoryForThisTask === 800) assert(!reserveUnrollMemoryForThisTask(1000000)) assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted @@ -1069,6 +1072,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 600) // Reserve again assert(reserveUnrollMemoryForThisTask(4400)) + memoryStore.reservePendingUnrollMemoryForThisTask() assert(memoryStore.currentUnrollMemoryForThisTask === 5000) assert(!reserveUnrollMemoryForThisTask(20000)) assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted @@ -1112,19 +1116,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with all the space in the world. This should succeed and return an array. 
var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) + assert(memoryStore.currentUnrollMemoryForThisTask !== 0) memoryStore.releasePendingUnrollMemoryForThisTask() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) - assert(memoryStore.currentUnrollMemoryForThisTask === 0) + assert(memoryStore.currentUnrollMemoryForThisTask !== 0) assert(droppedBlocks.size === 1) assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) droppedBlocks.clear() memoryStore.releasePendingUnrollMemoryForThisTask() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. 
@@ -1136,6 +1142,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(droppedBlocks.size === 1) assert(droppedBlocks.head._1 === TestBlockId("someBlock2")) droppedBlocks.clear() + memoryStore.reservePendingUnrollMemoryForThisTask() + assert(memoryStore.currentUnrollMemoryForThisTask > 0) } test("safely unroll blocks through putIterator") { @@ -1233,7 +1241,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(diskStore.contains("b2")) assert(!diskStore.contains("b3")) assert(diskStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // unroll memory was released after spilling to disk (SPARK-6157) } test("multiple unrolls by the same thread") {