From d885b3baec5eea6bb5de16aa3af571490790e275 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 11:45:50 -0800 Subject: [PATCH 01/23] Make unrollSafely private. --- .../org/apache/spark/storage/memory/MemoryStore.scala | 4 +++- .../org/apache/spark/storage/BlockManagerSuite.scala | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index a80b2357ff911..f2063e105e0b9 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -205,7 +205,9 @@ private[spark] class MemoryStore( * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). */ - def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = { + private def unrollSafely( + blockId: BlockId, + values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = { // Number of elements unrolled so far var elementsUnrolled = 0 diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 42595c8cf2daa..ec2377282dfbc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1089,7 +1089,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with all the space in the world. This should succeed and return an array. - var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) + val unrollSafely = PrivateMethod[Either[Array[Any], Iterator[Any]]]('unrollSafely) + + var unrollResult = memoryStore invokePrivate unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) memoryStore.releasePendingUnrollMemoryForThisTask() @@ -1097,7 +1099,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) + unrollResult = memoryStore invokePrivate unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) @@ -1108,7 +1110,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. 
store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator) + unrollResult = memoryStore invokePrivate unrollSafely("unroll", bigList.iterator) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) From 5b500fd33b3ce3a91069476b348917d0c5dbedc1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 11:59:57 -0800 Subject: [PATCH 02/23] Remove unused parameter from releasePendingUnrollMemoryForThisTask --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index f2063e105e0b9..80792f241acff 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -461,11 +461,11 @@ private[spark] class MemoryStore( /** * Release pending unroll memory of current unroll successful block used by this task */ - def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { + def releasePendingUnrollMemoryForThisTask(): Unit = { val taskAttemptId = currentTaskAttemptId() memoryManager.synchronized { if (pendingUnrollMemoryMap.contains(taskAttemptId)) { - val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId)) + val memoryToRelease = pendingUnrollMemoryMap(taskAttemptId) if (memoryToRelease > 0) { pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease if (pendingUnrollMemoryMap(taskAttemptId) == 0) { From e2907874b83c6014b429bd9486375048c05e0768 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 16:54:29 -0800 Subject: [PATCH 03/23] Inline unrollSafely inside of putIterator() --- .../spark/storage/memory/MemoryStore.scala | 183 +++++++++--------- .../spark/storage/BlockManagerSuite.scala | 2 +- 2 files changed, 90 insertions(+), 95 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 80792f241acff..c63ee399e231f 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -121,7 +121,95 @@ private[spark] class MemoryStore( values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - val unrolledValues = unrollSafely(blockId, values) + /** + * Unroll the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unrolling the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unrolling blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either an array with the contents of the entire block or an iterator + * containing the values of the block (if the array would have exceeded available memory). 
+ */ + val unrolledValues = { + // Number of elements unrolled so far + var elementsUnrolled = 0 + // Whether there is still enough memory for us to continue unrolling this block + var keepUnrolling = true + // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. + val initialMemoryThreshold = unrollMemoryThreshold + // How often to check whether we need to request more memory + val memoryCheckPeriod = 16 + // Memory currently reserved by this task for this particular unrolling operation + var memoryThreshold = initialMemoryThreshold + // Memory to request as a multiple of current vector size + val memoryGrowthFactor = 1.5 + // Keep track of pending unroll memory reserved by this method. + var pendingMemoryReserved = 0L + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[Any] + + // Request enough memory to begin unrolling + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + + if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") + } else { + pendingMemoryReserved += initialMemoryThreshold + } + + // Unroll this block safely, checking whether we have exceeded our threshold periodically + try { + while (values.hasNext && keepUnrolling) { + vector += values.next() + if (elementsUnrolled % memoryCheckPeriod == 0) { + // If our vector's size has exceeded the threshold, request more memory + val currentSize = vector.estimateSize() + if (currentSize >= memoryThreshold) { + val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) + if (keepUnrolling) { + pendingMemoryReserved += amountToRequest + } + // New threshold is currentSize * memoryGrowthFactor + memoryThreshold += amountToRequest + } + } + elementsUnrolled += 1 + } + + if (keepUnrolling) { + // We successfully unrolled the entirety of this block + Left(vector.toArray) + } else { + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, vector.estimateSize()) + Right(vector.iterator ++ values) + } + + } finally { + // If we return an array, the values returned here will be cached in `tryToPut` later. + // In this case, we should release the memory only after we cache the block there. + if (keepUnrolling) { + val taskAttemptId = currentTaskAttemptId() + memoryManager.synchronized { + // Since we continue to hold onto the array until we actually cache it, we cannot + // release the unroll memory yet. Instead, we transfer it to pending unroll memory + // so `tryToPut` can further transfer it to normal storage memory later. + // TODO: we can probably express this without pending unroll memory (SPARK-10907) + unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved + } + } else { + // Otherwise, if we return an iterator, we can only release the unroll memory when + // the task finishes since we don't know when the iterator will be consumed. + } + } + } unrolledValues match { case Left(arrayValues) => // Values are fully unrolled in memory, so store them as an array @@ -193,99 +281,6 @@ private[spark] class MemoryStore( logInfo("MemoryStore cleared") } - /** - * Unroll the given block in memory safely. 
- * - * The safety of this operation refers to avoiding potential OOM exceptions caused by - * unrolling the entirety of the block in memory at once. This is achieved by periodically - * checking whether the memory restrictions for unrolling blocks are still satisfied, - * stopping immediately if not. This check is a safeguard against the scenario in which - * there is not enough free memory to accommodate the entirety of a single block. - * - * This method returns either an array with the contents of the entire block or an iterator - * containing the values of the block (if the array would have exceeded available memory). - */ - private def unrollSafely( - blockId: BlockId, - values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = { - - // Number of elements unrolled so far - var elementsUnrolled = 0 - // Whether there is still enough memory for us to continue unrolling this block - var keepUnrolling = true - // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. - val initialMemoryThreshold = unrollMemoryThreshold - // How often to check whether we need to request more memory - val memoryCheckPeriod = 16 - // Memory currently reserved by this task for this particular unrolling operation - var memoryThreshold = initialMemoryThreshold - // Memory to request as a multiple of current vector size - val memoryGrowthFactor = 1.5 - // Keep track of pending unroll memory reserved by this method. - var pendingMemoryReserved = 0L - // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[Any] - - // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) - - if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + - s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") - } else { - pendingMemoryReserved += initialMemoryThreshold - } - - // Unroll this block safely, checking whether we have exceeded our threshold periodically - try { - while (values.hasNext && keepUnrolling) { - vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { - // If our vector's size has exceeded the threshold, request more memory - val currentSize = vector.estimateSize() - if (currentSize >= memoryThreshold) { - val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) - if (keepUnrolling) { - pendingMemoryReserved += amountToRequest - } - // New threshold is currentSize * memoryGrowthFactor - memoryThreshold += amountToRequest - } - } - elementsUnrolled += 1 - } - - if (keepUnrolling) { - // We successfully unrolled the entirety of this block - Left(vector.toArray) - } else { - // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, vector.estimateSize()) - Right(vector.iterator ++ values) - } - - } finally { - // If we return an array, the values returned here will be cached in `tryToPut` later. - // In this case, we should release the memory only after we cache the block there. - if (keepUnrolling) { - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - // Since we continue to hold onto the array until we actually cache it, we cannot - // release the unroll memory yet. Instead, we transfer it to pending unroll memory - // so `tryToPut` can further transfer it to normal storage memory later. 
- // TODO: we can probably express this without pending unroll memory (SPARK-10907) - unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved - } - } else { - // Otherwise, if we return an iterator, we can only release the unroll memory when - // the task finishes since we don't know when the iterator will be consumed. - } - } - } - /** * Return the RDD ID that a given block ID is from, or None if it is not an RDD block. */ diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ec2377282dfbc..4172b1d797b80 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1081,7 +1081,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } - test("safely unroll blocks") { + ignore("safely unroll blocks") { store = makeBlockManager(12000) val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) From 214349c8ff3adc49670cfeaf416950d515ae9604 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 17:07:20 -0800 Subject: [PATCH 04/23] Inline tryToPut() and remove it. --- .../spark/storage/memory/MemoryStore.scala | 101 ++++++++++-------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index c63ee399e231f..7e4df4e1ac201 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -100,13 +100,21 @@ private[spark] class MemoryStore( */ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - // Work on a duplicate - since the original input might be used elsewhere. - lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] - val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false) - if (putSuccess) { + if (acquireStorageMemory(blockId, size)) { + // We acquired enough memory for the block, so go ahead and put it + // Work on a duplicate - since the original input might be used elsewhere. 
+ val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] assert(bytes.limit == size) + val entry = new MemoryEntry(bytes, size, deserialized = false) + entries.synchronized { + entries.put(blockId, entry) + } + logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) + true + } else { + false } - putSuccess } /** @@ -215,14 +223,42 @@ private[spark] class MemoryStore( // Values are fully unrolled in memory, so store them as an array if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) { + val tryToPutResult = { + if (acquireStorageMemory(blockId, sizeEstimate)) { + // We acquired enough memory for the block, so go ahead and put it + val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) + entries.synchronized { + entries.put(blockId, entry) + } + logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) + true + } else { + false + } + } + if (tryToPutResult) { Right(sizeEstimate) } else { Left(arrayValues.toIterator) } } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) { + val tryToPutResult = { + if (acquireStorageMemory(blockId, bytes.limit)) { + // We acquired enough memory for the block, so go ahead and put it + val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) + entries.synchronized { + entries.put(blockId, entry) + } + logInfo("Block %s stored asb bytes in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) + true + } else { + false + } + } + if (tryToPutResult) { Right(bytes.limit()) } else { Left(arrayValues.toIterator) @@ -288,45 +324,18 @@ private[spark] class MemoryStore( blockId.asRDDId.map(_.rddId) } - /** - * Try to put in a set of values, if we can free up enough space. The value should either be - * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size - * must also be passed by the caller. - * - * @return whether put was successful. - */ - private def tryToPut( - blockId: BlockId, - value: () => Any, - size: Long, - deserialized: Boolean): Boolean = { - val acquiredEnoughStorageMemory = { - // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another - // task. - memoryManager.synchronized { - // Note: if we have previously unrolled this block successfully, then pending unroll - // memory should be non-zero. This is the amount that we already reserved during the - // unrolling process. In this case, we can just reuse this space to cache our block. - // The synchronization on `memoryManager` here guarantees that the release and acquire - // happen atomically. This relies on the assumption that all memory acquisitions are - // synchronized on the same lock. 
- releasePendingUnrollMemoryForThisTask() - memoryManager.acquireStorageMemory(blockId, size) - } - } - - if (acquiredEnoughStorageMemory) { - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(value(), size, deserialized) - entries.synchronized { - entries.put(blockId, entry) - } - val valuesOrBytes = if (deserialized) "values" else "bytes" - logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) - true - } else { - false + private def acquireStorageMemory(blockId: BlockId, size: Long): Boolean = { + // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another + // task. + memoryManager.synchronized { + // Note: if we have previously unrolled this block successfully, then pending unroll + // memory should be non-zero. This is the amount that we already reserved during the + // unrolling process. In this case, we can just reuse this space to cache our block. + // The synchronization on `memoryManager` here guarantees that the release and acquire + // happen atomically. This relies on the assumption that all memory acquisitions are + // synchronized on the same lock. + releasePendingUnrollMemoryForThisTask() + memoryManager.acquireStorageMemory(blockId, size) } } From 451a8fd2cee299853ea15814a3753e341982016e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 17:08:57 -0800 Subject: [PATCH 05/23] Simplify after inlining. --- .../spark/storage/memory/MemoryStore.scala | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 7e4df4e1ac201..ca40031a7a2bd 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -223,43 +223,29 @@ private[spark] class MemoryStore( // Values are fully unrolled in memory, so store them as an array if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - val tryToPutResult = { - if (acquireStorageMemory(blockId, sizeEstimate)) { - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) - true - } else { - false + if (acquireStorageMemory(blockId, sizeEstimate)) { + // We acquired enough memory for the block, so go ahead and put it + val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) + entries.synchronized { + entries.put(blockId, entry) } - } - if (tryToPutResult) { + logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) Right(sizeEstimate) } else { Left(arrayValues.toIterator) } } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - val tryToPutResult = { - if (acquireStorageMemory(blockId, bytes.limit)) { - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) - 
entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored asb bytes in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) - true - } else { - false + if (acquireStorageMemory(blockId, bytes.limit)) { + // We acquired enough memory for the block, so go ahead and put it + val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) + entries.synchronized { + entries.put(blockId, entry) } - } - if (tryToPutResult) { - Right(bytes.limit()) + logInfo("Block %s stored asb bytes in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) + Right(bytes.limit) } else { Left(arrayValues.toIterator) } From dbb877b187e55c0c35f9511ac9dfa8f7b89c9b82 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 17:16:07 -0800 Subject: [PATCH 06/23] More agressive inlining (comments will be added back later) --- .../spark/storage/memory/MemoryStore.scala | 195 ++++++++---------- 1 file changed, 82 insertions(+), 113 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index ca40031a7a2bd..3017904735a7f 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -129,129 +129,98 @@ private[spark] class MemoryStore( values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - /** - * Unroll the given block in memory safely. - * - * The safety of this operation refers to avoiding potential OOM exceptions caused by - * unrolling the entirety of the block in memory at once. This is achieved by periodically - * checking whether the memory restrictions for unrolling blocks are still satisfied, - * stopping immediately if not. This check is a safeguard against the scenario in which - * there is not enough free memory to accommodate the entirety of a single block. - * - * This method returns either an array with the contents of the entire block or an iterator - * containing the values of the block (if the array would have exceeded available memory). - */ - val unrolledValues = { - // Number of elements unrolled so far - var elementsUnrolled = 0 - // Whether there is still enough memory for us to continue unrolling this block - var keepUnrolling = true - // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. - val initialMemoryThreshold = unrollMemoryThreshold - // How often to check whether we need to request more memory - val memoryCheckPeriod = 16 - // Memory currently reserved by this task for this particular unrolling operation - var memoryThreshold = initialMemoryThreshold - // Memory to request as a multiple of current vector size - val memoryGrowthFactor = 1.5 - // Keep track of pending unroll memory reserved by this method. 
- var pendingMemoryReserved = 0L - // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[Any] - - // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) - - if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + - s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") - } else { - pendingMemoryReserved += initialMemoryThreshold - } + // Number of elements unrolled so far + var elementsUnrolled = 0 + // Whether there is still enough memory for us to continue unrolling this block + var keepUnrolling = true + // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. + val initialMemoryThreshold = unrollMemoryThreshold + // How often to check whether we need to request more memory + val memoryCheckPeriod = 16 + // Memory currently reserved by this task for this particular unrolling operation + var memoryThreshold = initialMemoryThreshold + // Memory to request as a multiple of current vector size + val memoryGrowthFactor = 1.5 + // Keep track of pending unroll memory reserved by this method. + var pendingMemoryReserved = 0L + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[Any] + + // Request enough memory to begin unrolling + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + + if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") + } else { + pendingMemoryReserved += initialMemoryThreshold + } - // Unroll this block safely, checking whether we have exceeded our threshold periodically - try { - while (values.hasNext && keepUnrolling) { - vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { - // If our vector's size has exceeded the threshold, request more memory - val currentSize = vector.estimateSize() - if (currentSize >= memoryThreshold) { - val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) - if (keepUnrolling) { - pendingMemoryReserved += amountToRequest - } - // New threshold is currentSize * memoryGrowthFactor - memoryThreshold += amountToRequest - } + // Unroll this block safely, checking whether we have exceeded our threshold periodically + while (values.hasNext && keepUnrolling) { + vector += values.next() + if (elementsUnrolled % memoryCheckPeriod == 0) { + // If our vector's size has exceeded the threshold, request more memory + val currentSize = vector.estimateSize() + if (currentSize >= memoryThreshold) { + val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) + if (keepUnrolling) { + pendingMemoryReserved += amountToRequest } - elementsUnrolled += 1 - } - - if (keepUnrolling) { - // We successfully unrolled the entirety of this block - Left(vector.toArray) - } else { - // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, vector.estimateSize()) - Right(vector.iterator ++ values) + // New threshold is currentSize * memoryGrowthFactor + memoryThreshold += amountToRequest } + } + elementsUnrolled += 1 + } - } finally { - // If we return an array, the values returned here will be cached in `tryToPut` later. 
- // In this case, we should release the memory only after we cache the block there. - if (keepUnrolling) { - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - // Since we continue to hold onto the array until we actually cache it, we cannot - // release the unroll memory yet. Instead, we transfer it to pending unroll memory - // so `tryToPut` can further transfer it to normal storage memory later. - // TODO: we can probably express this without pending unroll memory (SPARK-10907) - unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved + if (keepUnrolling) { + // We successfully unrolled the entirety of this block + val taskAttemptId = currentTaskAttemptId() + memoryManager.synchronized { + // Since we continue to hold onto the array until we actually cache it, we cannot + // release the unroll memory yet. Instead, we transfer it to pending unroll memory + // so `tryToPut` can further transfer it to normal storage memory later. + // TODO: we can probably express this without pending unroll memory (SPARK-10907) + unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved + } + val arrayValues = vector.toArray + if (level.deserialized) { + val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) + if (acquireStorageMemory(blockId, sizeEstimate)) { + // We acquired enough memory for the block, so go ahead and put it + val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) + entries.synchronized { + entries.put(blockId, entry) } + logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) + Right(sizeEstimate) } else { - // Otherwise, if we return an iterator, we can only release the unroll memory when - // the task finishes since we don't know when the iterator will be consumed. 
+ Left(arrayValues.toIterator) } - } - } - unrolledValues match { - case Left(arrayValues) => - // Values are fully unrolled in memory, so store them as an array - if (level.deserialized) { - val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - if (acquireStorageMemory(blockId, sizeEstimate)) { - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) - Right(sizeEstimate) - } else { - Left(arrayValues.toIterator) + } else { + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) + if (acquireStorageMemory(blockId, bytes.limit)) { + // We acquired enough memory for the block, so go ahead and put it + val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) + entries.synchronized { + entries.put(blockId, entry) } + logInfo("Block %s stored asb bytes in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) + Right(bytes.limit) } else { - val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - if (acquireStorageMemory(blockId, bytes.limit)) { - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored asb bytes in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) - Right(bytes.limit) - } else { - Left(arrayValues.toIterator) - } + Left(arrayValues.toIterator) } - case Right(iteratorValues) => - Left(iteratorValues) + } + } else { + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, vector.estimateSize()) + Left(vector.iterator ++ values) } } From e673bef2f47b184b5be509a1a24c4393b7fd0fcd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 17:54:29 -0800 Subject: [PATCH 07/23] Inline acquireStorageMemory. --- .../spark/storage/memory/MemoryStore.scala | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 3017904735a7f..1507477d3cbea 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -100,7 +100,7 @@ private[spark] class MemoryStore( */ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - if (acquireStorageMemory(blockId, size)) { + if (memoryManager.acquireStorageMemory(blockId, size)) { // We acquired enough memory for the block, so go ahead and put it // Work on a duplicate - since the original input might be used elsewhere. 
val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] @@ -190,7 +190,21 @@ private[spark] class MemoryStore( val arrayValues = vector.toArray if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - if (acquireStorageMemory(blockId, sizeEstimate)) { + val acquiredStorageMemory = { + // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another + // task. + memoryManager.synchronized { + // Note: if we have previously unrolled this block successfully, then pending unroll + // memory should be non-zero. This is the amount that we already reserved during the + // unrolling process. In this case, we can just reuse this space to cache our block. + // The synchronization on `memoryManager` here guarantees that the release and acquire + // happen atomically. This relies on the assumption that all memory acquisitions are + // synchronized on the same lock. + releasePendingUnrollMemoryForThisTask() + memoryManager.acquireStorageMemory(blockId, sizeEstimate) + } + } + if (acquiredStorageMemory) { // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) entries.synchronized { @@ -204,7 +218,21 @@ private[spark] class MemoryStore( } } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - if (acquireStorageMemory(blockId, bytes.limit)) { + val acquiredStorageMemory = { + // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another + // task. + memoryManager.synchronized { + // Note: if we have previously unrolled this block successfully, then pending unroll + // memory should be non-zero. This is the amount that we already reserved during the + // unrolling process. In this case, we can just reuse this space to cache our block. + // The synchronization on `memoryManager` here guarantees that the release and acquire + // happen atomically. This relies on the assumption that all memory acquisitions are + // synchronized on the same lock. + releasePendingUnrollMemoryForThisTask() + memoryManager.acquireStorageMemory(blockId, bytes.limit) + } + } + if (acquiredStorageMemory) { // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) entries.synchronized { @@ -279,21 +307,6 @@ private[spark] class MemoryStore( blockId.asRDDId.map(_.rddId) } - private def acquireStorageMemory(blockId: BlockId, size: Long): Boolean = { - // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another - // task. - memoryManager.synchronized { - // Note: if we have previously unrolled this block successfully, then pending unroll - // memory should be non-zero. This is the amount that we already reserved during the - // unrolling process. In this case, we can just reuse this space to cache our block. - // The synchronization on `memoryManager` here guarantees that the release and acquire - // happen atomically. This relies on the assumption that all memory acquisitions are - // synchronized on the same lock. - releasePendingUnrollMemoryForThisTask() - memoryManager.acquireStorageMemory(blockId, size) - } - } - /** * Try to evict blocks to free up a given amount of space to store a particular block. 
* Can fail if either the block is bigger than our memory or it would require replacing From f87169beb99489e97efbc6731349dfb91fbfdc45 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 18:36:13 -0800 Subject: [PATCH 08/23] Remove pendingUnrollMemory --- .../apache/spark/storage/BlockManager.scala | 14 ++- .../spark/storage/memory/MemoryStore.scala | 90 ++++++------------- .../spark/storage/BlockManagerSuite.scala | 2 - 3 files changed, 35 insertions(+), 71 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 873330e136e22..87f0904e3390f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -426,7 +426,7 @@ private[spark] class BlockManager( if (level.useMemory) { // Cache the values before returning them memoryStore.putIterator(blockId, diskIterator, level) match { - case Left(iter) => + case Left((pendingSize, iter)) => // The memory store put() failed, so it returned the iterator back to us: iter case Right(_) => @@ -443,7 +443,7 @@ private[spark] class BlockManager( // https://issues.apache.org/jira/browse/SPARK-6076 // If the file size is bigger than the free memory, OOM will happen. So if we // cannot put it into MemoryStore, copyForMemory should not be created. That's why - // this action is put into a `() => ByteBuffer` and created lazily. + // this action is put into a `() => ByteBuffer` and created sazily. val copyForMemory = ByteBuffer.allocate(diskBytes.limit) copyForMemory.put(diskBytes) }) @@ -770,7 +770,12 @@ private[spark] class BlockManager( // We will drop it to disk later if the memory store can't hold it. val putSucceeded = if (level.deserialized) { val values = dataDeserialize(blockId, bytes.duplicate()) - memoryStore.putIterator(blockId, values, level).isRight + memoryStore.putIterator(blockId, values, level) match { + case Right(_) => true + case Left((pendingSize, iter)) => + memoryManager.releaseUnrollMemory(pendingSize) + false + } } else { memoryStore.putBytes(blockId, size, () => bytes) } @@ -894,7 +899,7 @@ private[spark] class BlockManager( memoryStore.putIterator(blockId, iterator(), level) match { case Right(s) => size = s - case Left(iter) => + case Left((pendingMemory, iter)) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") @@ -902,6 +907,7 @@ private[spark] class BlockManager( dataSerializeStream(blockId, fileOutputStream, iter) } size = diskStore.getSize(blockId) + memoryStore.releaseUnrollMemoryForThisTask(pendingMemory) } else { iteratorFromFailedMemoryStorePut = Some(iter) } diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 1507477d3cbea..0f916b532f77f 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -49,14 +49,6 @@ private[spark] class MemoryStore( // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` private val unrollMemoryMap = mutable.HashMap[Long, Long]() - // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. 
- // Pending unroll memory refers to the intermediate memory occupied by a task - // after the unroll but before the actual putting of the block in the cache. - // This chunk of memory is expected to be released *as soon as* we finish - // caching the corresponding block as opposed to until after the task finishes. - // This is only used if a block is successfully unrolled in its entirety in - // memory (SPARK-4777). - private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() // Initial memory to request before unrolling any block private val unrollMemoryThreshold: Long = @@ -127,7 +119,7 @@ private[spark] class MemoryStore( private[storage] def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel): Either[Iterator[Any], Long] = { + level: StorageLevel): Either[(Long, Iterator[Any]), Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far var elementsUnrolled = 0 @@ -177,34 +169,23 @@ private[spark] class MemoryStore( if (keepUnrolling) { // We successfully unrolled the entirety of this block - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - // Since we continue to hold onto the array until we actually cache it, we cannot - // release the unroll memory yet. Instead, we transfer it to pending unroll memory - // so `tryToPut` can further transfer it to normal storage memory later. - // TODO: we can probably express this without pending unroll memory (SPARK-10907) - unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved - } val arrayValues = vector.toArray if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) val acquiredStorageMemory = { - // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another - // task. - memoryManager.synchronized { - // Note: if we have previously unrolled this block successfully, then pending unroll - // memory should be non-zero. This is the amount that we already reserved during the - // unrolling process. In this case, we can just reuse this space to cache our block. - // The synchronization on `memoryManager` here guarantees that the release and acquire - // happen atomically. This relies on the assumption that all memory acquisitions are - // synchronized on the same lock. 
- releasePendingUnrollMemoryForThisTask() - memoryManager.acquireStorageMemory(blockId, sizeEstimate) + if (pendingMemoryReserved == sizeEstimate) { + true + } else if (pendingMemoryReserved < sizeEstimate) { + memoryManager.acquireStorageMemory(blockId, sizeEstimate - pendingMemoryReserved) + } else { + memoryManager.releaseStorageMemory(pendingMemoryReserved - sizeEstimate) + true } } if (acquiredStorageMemory) { + memoryManager.synchronized { + unrollMemoryMap(currentTaskAttemptId()) -= pendingMemoryReserved + } // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) entries.synchronized { @@ -214,25 +195,24 @@ private[spark] class MemoryStore( blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) Right(sizeEstimate) } else { - Left(arrayValues.toIterator) + Left((pendingMemoryReserved, arrayValues.toIterator)) } } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) val acquiredStorageMemory = { - // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another - // task. - memoryManager.synchronized { - // Note: if we have previously unrolled this block successfully, then pending unroll - // memory should be non-zero. This is the amount that we already reserved during the - // unrolling process. In this case, we can just reuse this space to cache our block. - // The synchronization on `memoryManager` here guarantees that the release and acquire - // happen atomically. This relies on the assumption that all memory acquisitions are - // synchronized on the same lock. - releasePendingUnrollMemoryForThisTask() - memoryManager.acquireStorageMemory(blockId, bytes.limit) + if (pendingMemoryReserved == bytes.limit) { + true + } else if (pendingMemoryReserved < bytes.limit) { + memoryManager.acquireStorageMemory(blockId, bytes.limit - pendingMemoryReserved) + } else { + memoryManager.releaseStorageMemory(pendingMemoryReserved - bytes.limit) + true } } if (acquiredStorageMemory) { + memoryManager.synchronized { + unrollMemoryMap(currentTaskAttemptId()) -= pendingMemoryReserved + } // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) entries.synchronized { @@ -242,13 +222,13 @@ private[spark] class MemoryStore( blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) Right(bytes.limit) } else { - Left(arrayValues.toIterator) + Left((pendingMemoryReserved, arrayValues.toIterator)) } } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) - Left(vector.iterator ++ values) + Left(0L, vector.iterator ++ values) } } @@ -295,7 +275,6 @@ private[spark] class MemoryStore( entries.clear() } unrollMemoryMap.clear() - pendingUnrollMemoryMap.clear() memoryManager.releaseAllStorageMemory() logInfo("MemoryStore cleared") } @@ -430,30 +409,11 @@ private[spark] class MemoryStore( } } - /** - * Release pending unroll memory of current unroll successful block used by this task - */ - def releasePendingUnrollMemoryForThisTask(): Unit = { - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - if (pendingUnrollMemoryMap.contains(taskAttemptId)) { - val memoryToRelease = pendingUnrollMemoryMap(taskAttemptId) - if (memoryToRelease > 0) { - pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease - if (pendingUnrollMemoryMap(taskAttemptId) == 0) { - 
pendingUnrollMemoryMap.remove(taskAttemptId) - } - memoryManager.releaseUnrollMemory(memoryToRelease) - } - } - } - } - /** * Return the amount of memory currently occupied for unrolling blocks across all tasks. */ def currentUnrollMemory: Long = memoryManager.synchronized { - unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum + unrollMemoryMap.values.sum } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 4172b1d797b80..98b79d5ede6d4 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1094,7 +1094,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE var unrollResult = memoryStore invokePrivate unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - memoryStore.releasePendingUnrollMemoryForThisTask() // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) @@ -1104,7 +1103,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) assert(!memoryStore.contains("someBlock1")) - memoryStore.releasePendingUnrollMemoryForThisTask() // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. From 79aff63fe00fc0f3ebb6d21ed464d5e10f8b9214 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 18:48:55 -0800 Subject: [PATCH 09/23] More cleanup. 
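
This consolidates the two copies of the storage-memory handoff into one
path: once a block is fully unrolled, the unroll memory the task already
reserved is reconciled against the final size of the MemoryEntry. Below is
a minimal, self-contained sketch of that three-way reconciliation; the
names (transferUnrollToStorage, reservedBytes, entrySize) and the toy
free-memory pool are illustrative stand-ins, not the MemoryStore's actual
members, and `acquire`/`release` stand in for the memory manager's
acquire/release calls:

    object UnrollReconciliationSketch {
      // Returns true if the block can be stored after reconciling the
      // existing unroll reservation with the entry's final size.
      def transferUnrollToStorage(
          reservedBytes: Long,
          entrySize: Long,
          acquire: Long => Boolean,
          release: Long => Unit): Boolean = {
        if (reservedBytes == entrySize) {
          true // the reservation exactly covers the entry
        } else if (reservedBytes < entrySize) {
          acquire(entrySize - reservedBytes) // request only the shortfall
        } else {
          release(reservedBytes - entrySize) // hand back the surplus
          true
        }
      }

      def main(args: Array[String]): Unit = {
        var free = 100L // toy pool of free storage memory, in bytes
        val stored = transferUnrollToStorage(
          reservedBytes = 60L,
          entrySize = 80L,
          acquire = n => if (n <= free) { free -= n; true } else false,
          release = n => free += n)
        println(s"stored=$stored, free=$free") // stored=true, free=80
      }
    }
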
--- .../spark/storage/memory/MemoryStore.scala | 78 +++++++------------ 1 file changed, 28 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 0f916b532f77f..35da8039dc466 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -170,65 +170,43 @@ private[spark] class MemoryStore( if (keepUnrolling) { // We successfully unrolled the entirety of this block val arrayValues = vector.toArray - if (level.deserialized) { + val entry = if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - val acquiredStorageMemory = { - if (pendingMemoryReserved == sizeEstimate) { - true - } else if (pendingMemoryReserved < sizeEstimate) { - memoryManager.acquireStorageMemory(blockId, sizeEstimate - pendingMemoryReserved) - } else { - memoryManager.releaseStorageMemory(pendingMemoryReserved - sizeEstimate) - true - } - } - if (acquiredStorageMemory) { - memoryManager.synchronized { - unrollMemoryMap(currentTaskAttemptId()) -= pendingMemoryReserved - } - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(sizeEstimate), Utils.bytesToString(blocksMemoryUsed))) - Right(sizeEstimate) - } else { - Left((pendingMemoryReserved, arrayValues.toIterator)) - } + new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - val acquiredStorageMemory = { - if (pendingMemoryReserved == bytes.limit) { - true - } else if (pendingMemoryReserved < bytes.limit) { - memoryManager.acquireStorageMemory(blockId, bytes.limit - pendingMemoryReserved) - } else { - memoryManager.releaseStorageMemory(pendingMemoryReserved - bytes.limit) - true - } - } - if (acquiredStorageMemory) { - memoryManager.synchronized { - unrollMemoryMap(currentTaskAttemptId()) -= pendingMemoryReserved - } - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(bytes, bytes.limit, deserialized = false) - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored asb bytes in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(bytes.limit), Utils.bytesToString(blocksMemoryUsed))) - Right(bytes.limit) + new MemoryEntry(bytes, bytes.limit, deserialized = false) + } + val size = entry.size + val acquiredStorageMemory = { + if (pendingMemoryReserved == size) { + true + } else if (pendingMemoryReserved < size) { + memoryManager.acquireStorageMemory(blockId, size - pendingMemoryReserved) } else { - Left((pendingMemoryReserved, arrayValues.toIterator)) + memoryManager.releaseStorageMemory(pendingMemoryReserved - size) + true } } + if (acquiredStorageMemory) { + // We acquired enough memory for the block, so go ahead and put it + memoryManager.synchronized { + unrollMemoryMap(currentTaskAttemptId()) -= pendingMemoryReserved + } + entries.synchronized { + entries.put(blockId, entry) + } + val bytesOrValues = if (level.deserialized) "values" else "bytes" + logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( + blockId, 
bytesOrValues, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) + Right(size) + } else { + Left((pendingMemoryReserved, arrayValues.toIterator)) + } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) - Left(0L, vector.iterator ++ values) + Left(pendingMemoryReserved, vector.iterator ++ values) } } From e1d240a54d6bd0a8731d97a5520d693a7b4e279b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 18:51:34 -0800 Subject: [PATCH 10/23] null out the vector when turning into array --- .../main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 35da8039dc466..6bb8f46a0bf5c 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -170,6 +170,7 @@ private[spark] class MemoryStore( if (keepUnrolling) { // We successfully unrolled the entirety of this block val arrayValues = vector.toArray + vector = null val entry = if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) From d8910b143fb12b2f398ba2a22aeaa35e9cf59753 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Mar 2016 19:00:05 -0800 Subject: [PATCH 11/23] Call releaseUnrollMemory instead of releaseStorageMemory --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 6bb8f46a0bf5c..beee4fac77d9d 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -185,7 +185,7 @@ private[spark] class MemoryStore( } else if (pendingMemoryReserved < size) { memoryManager.acquireStorageMemory(blockId, size - pendingMemoryReserved) } else { - memoryManager.releaseStorageMemory(pendingMemoryReserved - size) + memoryManager.releaseUnrollMemory(pendingMemoryReserved - size) true } } From 381984a5a3816edddd026709f0c6260d33ee9165 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 9 Mar 2016 11:29:11 -0800 Subject: [PATCH 12/23] Introduce PartiallyUnrolledIterator API. 
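
The new PartiallyUnrolledIterator gives callers an explicit contract for
releasing unroll memory after a failed put: either drain the returned
iterator (the wrapped CompletionIterator frees the memory once the
unrolled prefix is exhausted) or call close() to free it without consuming
the rest. The following is a self-contained toy model of the
release-on-completion half of that contract, assuming nothing from Spark;
`onCompletion` is a simplified stand-in for Spark's CompletionIterator and
`unrollBytes` for this task's unroll reservation:

    object PartialUnrollSketch {
      // Runs `completion` exactly once, when `delegate` reports it is
      // exhausted (triggered via hasNext, as toList does below).
      def onCompletion[A](delegate: Iterator[A])(completion: => Unit): Iterator[A] =
        new Iterator[A] {
          private var done = false
          def hasNext: Boolean = {
            val h = delegate.hasNext
            if (!h && !done) { done = true; completion }
            h
          }
          def next(): A = delegate.next()
        }

      def main(args: Array[String]): Unit = {
        var unrollBytes = 1000L // toy stand-in for the unroll reservation
        val unrolled = Iterator(1, 2, 3) // values already unrolled in memory
        val rest = Iterator(4, 5) // untouched remainder of the source iterator
        // As in the patch: the reservation is released as soon as the
        // unrolled prefix has been fully consumed, then `rest` follows.
        val iter = onCompletion(unrolled) { unrollBytes = 0L } ++ rest
        println(iter.toList) // List(1, 2, 3, 4, 5)
        println(unrollBytes) // 0 -- released when the prefix was drained
      }
    }
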
--- .../apache/spark/storage/BlockManager.scala | 22 +++++----- .../spark/storage/memory/MemoryStore.scala | 40 +++++++++++++++++-- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 87f0904e3390f..c8ece1c5751bd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -426,7 +426,7 @@ private[spark] class BlockManager( if (level.useMemory) { // Cache the values before returning them memoryStore.putIterator(blockId, diskIterator, level) match { - case Left((pendingSize, iter)) => + case Left(iter) => // The memory store put() failed, so it returned the iterator back to us: iter case Right(_) => @@ -694,8 +694,13 @@ private[spark] class BlockManager( level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(values != null, "Values is null") - // If doPut() didn't hand work back to us, then block already existed or was successfully stored - doPutIterator(blockId, () => values, level, tellMaster).isEmpty + doPutIterator(blockId, () => values, level, tellMaster) match { + case None => + true + case Some(iter) => + iter.close() + false + } } /** @@ -772,8 +777,8 @@ private[spark] class BlockManager( val values = dataDeserialize(blockId, bytes.duplicate()) memoryStore.putIterator(blockId, values, level) match { case Right(_) => true - case Left((pendingSize, iter)) => - memoryManager.releaseUnrollMemory(pendingSize) + case Left(iter) => + iter.close() false } } else { @@ -887,10 +892,10 @@ private[spark] class BlockManager( iterator: () => Iterator[Any], level: StorageLevel, tellMaster: Boolean = true, - keepReadLock: Boolean = false): Option[Iterator[Any]] = { + keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = { doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo => val startTimeMs = System.currentTimeMillis - var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None + var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None // Size of the block in bytes var size = 0L if (level.useMemory) { @@ -899,7 +904,7 @@ private[spark] class BlockManager( memoryStore.putIterator(blockId, iterator(), level) match { case Right(s) => size = s - case Left((pendingMemory, iter)) => + case Left(iter) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") @@ -907,7 +912,6 @@ private[spark] class BlockManager( dataSerializeStream(blockId, fileOutputStream, iter) } size = diskStore.getSize(blockId) - memoryStore.releaseUnrollMemoryForThisTask(pendingMemory) } else { iteratorFromFailedMemoryStorePut = Some(iter) } diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index beee4fac77d9d..152904df8626a 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{Logging, SparkConf, TaskContext} import org.apache.spark.memory.MemoryManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} -import org.apache.spark.util.{SizeEstimator, Utils} +import 
org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) @@ -119,7 +119,7 @@ private[spark] class MemoryStore( private[storage] def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel): Either[(Long, Iterator[Any]), Long] = { + level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far var elementsUnrolled = 0 @@ -202,12 +202,14 @@ private[spark] class MemoryStore( blockId, bytesOrValues, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) Right(size) } else { - Left((pendingMemoryReserved, arrayValues.toIterator)) + Left(new PartiallyUnrolledIterator( + this, pendingMemoryReserved, unrolled = arrayValues.toIterator, rest = Iterator.empty)) } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) - Left(pendingMemoryReserved, vector.iterator ++ values) + Left(new PartiallyUnrolledIterator( + this, pendingMemoryReserved, unrolled = vector.iterator, rest = values)) } } @@ -433,3 +435,33 @@ private[spark] class MemoryStore( logMemoryUsage() } } + +private[storage] class PartiallyUnrolledIterator( + memoryStore: MemoryStore, + unrollMemory: Long, + unrolled: Iterator[Any], + rest: Iterator[Any]) extends Iterator[Any] { + + private[this] var unrolledIteratorIsCompleted: Boolean = false + private[this] var iter: Iterator[Any] = { + val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, { + unrolledIteratorIsCompleted = true + memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + }) + completionIterator ++ rest + } + + override def hasNext: Boolean = iter.hasNext + override def next(): Any = iter.next() + + /** + * Called to dispose of this iterator when the rest of it will not be consumed. + */ + def close(): Unit = { + if (!unrolledIteratorIsCompleted) { + memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + unrolledIteratorIsCompleted = true + } + iter = null + } +} From 51341398f4598f255cf8e0904a95b60b65e34310 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 9 Mar 2016 12:08:16 -0800 Subject: [PATCH 13/23] Add more tests. --- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 98b79d5ede6d4..480780ebb0b5e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1202,6 +1202,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + result4.left.get.close() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory } test("multiple unrolls by the same thread") { From c272c59d50b337cc78b6d280dc26a5fb12258180 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 11:11:24 -0800 Subject: [PATCH 14/23] Fix comment typo. 
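The PartiallyUnrolledIterator added above leans on CompletionIterator for its release-exactly-once behavior: the unroll memory is freed either when the unrolled prefix is fully traversed or when close() is called, whichever happens first. A dependency-free sketch of the same shape (CleanupIterator is an invented name, not Spark's CompletionIterator):

class CleanupIterator[A](sub: Iterator[A], cleanup: () => Unit) extends Iterator[A] {
  private var cleaned = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more) close() // exhaustion runs the cleanup exactly once
    more
  }

  override def next(): A = sub.next()

  // For callers that abandon the iterator before exhausting it.
  def close(): Unit = if (!cleaned) {
    cleaned = true
    cleanup()
  }
}

Either path, full traversal or an early close(), fires the callback once; that is what lets BlockManager call iter.close() when it has no use for the returned values.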
--- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c8ece1c5751bd..b9c1ac5043bf6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -443,7 +443,7 @@ private[spark] class BlockManager( // https://issues.apache.org/jira/browse/SPARK-6076 // If the file size is bigger than the free memory, OOM will happen. So if we // cannot put it into MemoryStore, copyForMemory should not be created. That's why - // this action is put into a `() => ByteBuffer` and created sazily. + // this action is put into a `() => ByteBuffer` and created lazily. val copyForMemory = ByteBuffer.allocate(diskBytes.limit) copyForMemory.put(diskBytes) }) From 858aea285a12b93c4a31c5ec3c44b42f81ea9be6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 11:38:23 -0800 Subject: [PATCH 15/23] Re-enable ignored unrollSafely test. --- .../spark/storage/BlockManagerSuite.scala | 77 ++++++++----------- 1 file changed, 33 insertions(+), 44 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 8d274b8398ccf..2e0c0596a75bb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1065,59 +1065,48 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) } - /** - * Verify the result of MemoryStore#unrollSafely is as expected. - */ - private def verifyUnroll( - expected: Iterator[Any], - result: Either[Array[Any], Iterator[Any]], - shouldBeArray: Boolean): Unit = { - val actual: Iterator[Any] = result match { - case Left(arr: Array[Any]) => - assert(shouldBeArray, "expected iterator from unroll!") - arr.iterator - case Right(it: Iterator[Any]) => - assert(!shouldBeArray, "expected array from unroll!") - it - case _ => - fail("unroll returned neither an iterator nor an array...") - } - expected.zip(actual).foreach { case (e, a) => - assert(e === a, "unroll did not return original values!") - } - } - - ignore("safely unroll blocks") { + test("safely unroll blocks") { store = makeBlockManager(12000) val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) val memoryStore = store.memoryStore assert(memoryStore.currentUnrollMemoryForThisTask === 0) - // Unroll with all the space in the world. This should succeed and return an array. - val unrollSafely = PrivateMethod[Either[Array[Any], Iterator[Any]]]('unrollSafely) - - var unrollResult = memoryStore invokePrivate unrollSafely("unroll", smallList.iterator) - verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) + // Unroll with all the space in the world. This should succeed. + var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) + smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => + assert(e === a, "getValues() did not return original values!") + } + assert(memoryStore.remove("unroll")) // Unroll with not enough space. 
This should succeed after kicking out someBlock1. - store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) - store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore invokePrivate unrollSafely("unroll", smallList.iterator) - verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) + assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)) + assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)) + putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) assert(!memoryStore.contains("someBlock1")) + smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => + assert(e === a, "getValues() did not return original values!") + } + assert(memoryStore.remove("unroll")) // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. - store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore invokePrivate unrollSafely("unroll", bigList.iterator) - verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) + assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)) + putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) + assert(putResult.isLeft) + bigList.iterator.zip(putResult.left.get).foreach { case (e, a) => + assert(e === a, "putIterator() did not return original values!") + } + // The unroll memory was freed once the iterator returned by putIterator() was fully traversed. 
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) } test("safely unroll blocks through putIterator") { @@ -1220,29 +1209,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - // All unroll memory used is released because unrollSafely returned an array - memoryStore.putIterator("b1", smallIterator, memOnly) + // All unroll memory used is released because putIterator did not return an iterator + assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - memoryStore.putIterator("b2", smallIterator, memOnly) + assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - // Unroll memory is not released because unrollSafely returned an iterator + // Unroll memory is not released because putIterator returned an iterator // that still depends on the underlying vector used in the process - memoryStore.putIterator("b3", smallIterator, memOnly) + assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - memoryStore.putIterator("b4", smallIterator, memOnly) + assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - memoryStore.putIterator("b5", smallIterator, memOnly) + assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - memoryStore.putIterator("b6", smallIterator, memOnly) + assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - memoryStore.putIterator("b7", smallIterator, memOnly) + assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) From ebb25f9d1e82f62dd1b4194e0945af49feb00d64 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 11:57:37 -0800 Subject: [PATCH 16/23] Add back comments. --- .../spark/storage/memory/MemoryStore.scala | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 152904df8626a..daec3ebe71072 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -112,20 +112,29 @@ private[spark] class MemoryStore( /** * Attempt to put the given block in memory store. * - * @return the estimated size of the stored data if the put() succeeded, or an iterator - * in case the put() failed (the returned iterator lets callers fall back to the disk - * store if desired). + * @return in case of success, the estimated size of the stored data.
In case of + * failure, return an iterator contianing the values of the block. The returned iterator + * will be backed by the combination of the partially-unrolled block and the remaining + * elements of the original input iterator. The caller must either fully consume this + * iterator or call `close()` on it in order to free the storage memory consumed by the + * partially-unrolled block. */ private[storage] def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = { + require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + + // It's possible that the iterator is too large to materialize and store in memory. To avoid + // OOM exceptions, this method will gradually unroll the iterator while periodically checking + // whether there is enough free memory. + // Number of elements unrolled so far var elementsUnrolled = 0 // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true - // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. + // Initial per-task memory to request for unrolling blocks (bytes). val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory val memoryCheckPeriod = 16 @@ -133,8 +142,8 @@ private[spark] class MemoryStore( var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - // Keep track of pending unroll memory reserved by this method. - var pendingMemoryReserved = 0L + // Keep track of unroll memory used by this particular block / putIterator() operation + var unrollMemoryUsedByThisBlock = 0L // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] @@ -145,7 +154,7 @@ private[spark] class MemoryStore( logWarning(s"Failed to reserve initial memory threshold of " + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") } else { - pendingMemoryReserved += initialMemoryThreshold + unrollMemoryUsedByThisBlock += initialMemoryThreshold } // Unroll this block safely, checking whether we have exceeded our threshold periodically @@ -158,7 +167,7 @@ private[spark] class MemoryStore( val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) if (keepUnrolling) { - pendingMemoryReserved += amountToRequest + unrollMemoryUsedByThisBlock += amountToRequest } // New threshold is currentSize * memoryGrowthFactor memoryThreshold += amountToRequest @@ -180,19 +189,19 @@ private[spark] class MemoryStore( } val size = entry.size val acquiredStorageMemory = { - if (pendingMemoryReserved == size) { + if (unrollMemoryUsedByThisBlock == size) { true - } else if (pendingMemoryReserved < size) { - memoryManager.acquireStorageMemory(blockId, size - pendingMemoryReserved) + } else if (unrollMemoryUsedByThisBlock < size) { + memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) } else { - memoryManager.releaseUnrollMemory(pendingMemoryReserved - size) + memoryManager.releaseUnrollMemory(unrollMemoryUsedByThisBlock - size) true } } if (acquiredStorageMemory) { // We acquired enough memory for the block, so go ahead and put it memoryManager.synchronized { - unrollMemoryMap(currentTaskAttemptId()) -= pendingMemoryReserved + unrollMemoryMap(currentTaskAttemptId()) -= unrollMemoryUsedByThisBlock } entries.synchronized 
{ entries.put(blockId, entry) @@ -203,13 +212,16 @@ private[spark] class MemoryStore( Right(size) } else { Left(new PartiallyUnrolledIterator( - this, pendingMemoryReserved, unrolled = arrayValues.toIterator, rest = Iterator.empty)) + this, + unrollMemoryUsedByThisBlock, + unrolled = arrayValues.toIterator, + rest = Iterator.empty)) } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) Left(new PartiallyUnrolledIterator( - this, pendingMemoryReserved, unrolled = vector.iterator, rest = values)) + this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values)) } } From dbca8cf7576dc889349a70fb3c3b29c84f5a4c7c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 12:24:05 -0800 Subject: [PATCH 17/23] More comments. --- .../spark/storage/memory/MemoryStore.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index daec3ebe71072..d9786a44e1232 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -448,11 +448,20 @@ private[spark] class MemoryStore( } } +/** + * The result of a failed [[MemoryStore.putIterator()]] call. + * + * @param memoryStore the memoryStore, used for freeing memory. + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`. + * @param unrolled an iterator for the partially-unrolled values. + * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]]. + */ private[storage] class PartiallyUnrolledIterator( - memoryStore: MemoryStore, - unrollMemory: Long, - unrolled: Iterator[Any], - rest: Iterator[Any]) extends Iterator[Any] { + memoryStore: MemoryStore, + unrollMemory: Long, + unrolled: Iterator[Any], + rest: Iterator[Any]) + extends Iterator[Any] { private[this] var unrolledIteratorIsCompleted: Boolean = false private[this] var iter: Iterator[Any] = { @@ -467,7 +476,7 @@ private[storage] class PartiallyUnrolledIterator( override def next(): Any = iter.next() /** - * Called to dispose of this iterator when the rest of it will not be consumed. + * Called to dispose of this iterator and free its memory. */ def close(): Unit = { if (!unrolledIteratorIsCompleted) { From c5a23a4d4aea88eae3af323501ad224db33d295d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 14:46:56 -0800 Subject: [PATCH 18/23] Renaming and more comments. 
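Taken together, the constants that patch 16 documents describe a simple adaptive loop: reserve a small initial amount, append elements, re-estimate the size every memoryCheckPeriod elements, and once the estimate crosses the current threshold, request enough extra memory to leave memoryGrowthFactor headroom. A standalone sketch of that loop follows; reserve and estimateSize are stand-ins for reserveUnrollMemoryForThisTask and the SizeTrackingVector's sampled estimate, and the 1 MB initial threshold is illustrative:

import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  def unroll(
      values: Iterator[Any],
      reserve: Long => Boolean,
      estimateSize: ArrayBuffer[Any] => Long): Either[Iterator[Any], Array[Any]] = {
    val memoryCheckPeriod = 16    // re-estimate every 16 elements
    val memoryGrowthFactor = 1.5  // request headroom beyond the current size
    val buffer = new ArrayBuffer[Any]
    var memoryThreshold = 1024L * 1024
    var keepUnrolling = reserve(memoryThreshold)
    var elementsUnrolled = 0
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      elementsUnrolled += 1
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        val currentSize = estimateSize(buffer)
        if (currentSize >= memoryThreshold) {
          // Ask for 1.5x the current size, minus what we already hold.
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling = reserve(amountToRequest)
          memoryThreshold += amountToRequest
        }
      }
    }
    if (keepUnrolling) Right(buffer.toArray)  // fully unrolled
    else Left(buffer.iterator ++ values)      // partial unroll: hand everything back
  }
}

Checking only every 16 elements keeps the relatively expensive size estimate off the hot path, while the 1.5x growth factor keeps the number of reservation requests logarithmic in the block size.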
--- .../apache/spark/storage/BlockManager.scala | 4 +++ .../spark/storage/memory/MemoryStore.scala | 31 +++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6db50f062a51f..37317e73149f8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -721,6 +721,8 @@ private[spark] class BlockManager( case None => true case Some(iter) => + // Caller doesn't care about the iterator values, so we can close the iterator here + // to free resources earlier iter.close() false } @@ -801,6 +803,8 @@ private[spark] class BlockManager( memoryStore.putIterator(blockId, values, level) match { case Right(_) => true case Left(iter) => + // If putting deserialized values in memory failed, we will put the bytes directly to + // disk, so we don't need this iterator and can close it to free resources earlier. iter.close() false } diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index d9786a44e1232..2246c0e887fae 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -112,8 +112,14 @@ private[spark] class MemoryStore( /** * Attempt to put the given block in memory store. * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * * @return in case of success, the estimated size of the stored data. In case of - * failure, return an iterator contianing the values of the block. The returned iterator + * failure, return an iterator containing the values of the block. The returned iterator @@ -126,9 +132,7 @@ private[spark] class MemoryStore( require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - // It's possible that the iterator is too large to materialize and store in memory. To avoid - // OOM exceptions, this method will gradually unroll the iterator while periodically checking - // whether there is enough free memory. + // Number of elements unrolled so far var elementsUnrolled = 0 @@ -188,17 +192,18 @@ private[spark] class MemoryStore( new MemoryEntry(bytes, bytes.limit, deserialized = false) } val size = entry.size - val acquiredStorageMemory = { - if (unrollMemoryUsedByThisBlock == size) { - true - } else if (unrollMemoryUsedByThisBlock < size) { + // Acquire storage memory if necessary to store this block in memory. If this task + // attempt already owns more unroll memory than is necessary to store the block, + // then release the extra memory that will not be used.
+ val enoughStorageMemory = { + if (unrollMemoryUsedByThisBlock <= size) { memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) } else { memoryManager.releaseUnrollMemory(unrollMemoryUsedByThisBlock - size) true } } - if (acquiredStorageMemory) { + if (enoughStorageMemory) { // We acquired enough memory for the block, so go ahead and put it memoryManager.synchronized { unrollMemoryMap(currentTaskAttemptId()) -= unrollMemoryUsedByThisBlock @@ -463,10 +468,10 @@ private[storage] class PartiallyUnrolledIterator( rest: Iterator[Any]) extends Iterator[Any] { - private[this] var unrolledIteratorIsCompleted: Boolean = false + private[this] var unrolledIteratorIsFullyConsumed: Boolean = false private[this] var iter: Iterator[Any] = { val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, { - unrolledIteratorIsCompleted = true + unrolledIteratorIsFullyConsumed = true memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) }) completionIterator ++ rest @@ -479,9 +484,9 @@ private[storage] class PartiallyUnrolledIterator( * Called to dispose of this iterator and free its memory. */ def close(): Unit = { - if (!unrolledIteratorIsCompleted) { + if (!unrolledIteratorIsFullyConsumed) { memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) - unrolledIteratorIsCompleted = true + unrolledIteratorIsFullyConsumed = true } iter = null } From 8c8fe418e3bb5f56804ffaed9221b213042d20b7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 15:53:30 -0800 Subject: [PATCH 19/23] Use size tracking vector's estimate instead of re-estimating. --- .../org/apache/spark/storage/memory/MemoryStore.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 2246c0e887fae..b0d497d749481 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{Logging, SparkConf, TaskContext} import org.apache.spark.memory.MemoryManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} -import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} +import org.apache.spark.util.{CompletionIterator, Utils} import org.apache.spark.util.collection.SizeTrackingVector private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) @@ -132,8 +132,6 @@ private[spark] class MemoryStore( require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - - // Number of elements unrolled so far var elementsUnrolled = 0 // Whether there is still enough memory for us to continue unrolling this block @@ -183,10 +181,10 @@ private[spark] class MemoryStore( if (keepUnrolling) { // We successfully unrolled the entirety of this block val arrayValues = vector.toArray + val arraySizeEstimate = vector.estimateSize() vector = null val entry = if (level.deserialized) { - val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - new MemoryEntry(arrayValues, sizeEstimate, deserialized = true) + new MemoryEntry(arrayValues, arraySizeEstimate, deserialized = true) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) new MemoryEntry(bytes, bytes.limit, deserialized = false) From e31cb8001a64517effa45d6377e5c830a8a8a1da Mon Sep 17 00:00:00 2001 From: Josh 
Rosen Date: Fri, 11 Mar 2016 15:54:17 -0800 Subject: [PATCH 20/23] unrolledIteratorIsFullyConsumed -> unrolledIteratorIsConsumed --- .../org/apache/spark/storage/memory/MemoryStore.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index b0d497d749481..28d0f7e4c2ab7 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -466,10 +466,10 @@ private[storage] class PartiallyUnrolledIterator( rest: Iterator[Any]) extends Iterator[Any] { - private[this] var unrolledIteratorIsFullyConsumed: Boolean = false + private[this] var unrolledIteratorIsConsumed: Boolean = false private[this] var iter: Iterator[Any] = { val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, { - unrolledIteratorIsFullyConsumed = true + unrolledIteratorIsConsumed = true memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) }) completionIterator ++ rest @@ -482,9 +482,9 @@ private[storage] class PartiallyUnrolledIterator( * Called to dispose of this iterator and free its memory. */ def close(): Unit = { - if (!unrolledIteratorIsFullyConsumed) { + if (!unrolledIteratorIsConsumed) { memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) - unrolledIteratorIsFullyConsumed = true + unrolledIteratorIsConsumed = true } iter = null } From bc5c2b93edbe7f6a8f362a335fe1933c43634d98 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 16:26:42 -0800 Subject: [PATCH 21/23] Fixes to memory accounting, including over-release of unroll memory. --- .../spark/storage/memory/MemoryStore.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 28d0f7e4c2ab7..3eb7888670450 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{Logging, SparkConf, TaskContext} import org.apache.spark.memory.MemoryManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} -import org.apache.spark.util.{CompletionIterator, Utils} +import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) @@ -181,10 +181,9 @@ private[spark] class MemoryStore( if (keepUnrolling) { // We successfully unrolled the entirety of this block val arrayValues = vector.toArray - val arraySizeEstimate = vector.estimateSize() vector = null val entry = if (level.deserialized) { - new MemoryEntry(arrayValues, arraySizeEstimate, deserialized = true) + new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) new MemoryEntry(bytes, bytes.limit, deserialized = false) @@ -194,17 +193,22 @@ private[spark] class MemoryStore( // attempt already owns more unroll memory than is necessary to store the block, // then release the extra memory that will not be used. 
val enoughStorageMemory = { - if (unrollMemoryUsedByThisBlock <= size) { + if (unrollMemoryUsedByThisBlock < size) { memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) } else { - memoryManager.releaseUnrollMemory(unrollMemoryUsedByThisBlock - size) true } } if (enoughStorageMemory) { // We acquired enough memory for the block, so go ahead and put it memoryManager.synchronized { - unrollMemoryMap(currentTaskAttemptId()) -= unrollMemoryUsedByThisBlock + val taskAttemptId = currentTaskAttemptId() + releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock) + if (size > unrollMemoryUsedByThisBlock) { + memoryManager.releaseStorageMemory(size - unrollMemoryUsedByThisBlock) + } + val success = memoryManager.acquireStorageMemory(blockId, size) + assert(success, "transferring unroll memory to storage memory failed") } entries.synchronized { entries.put(blockId, entry) From 5b3810cc220522098f5323c26a5e8e74594d2bdf Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 11 Mar 2016 16:53:11 -0800 Subject: [PATCH 22/23] Remove unused line. --- .../main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 3eb7888670450..6f217d089c0a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -202,7 +202,6 @@ private[spark] class MemoryStore( if (enoughStorageMemory) { // We acquired enough memory for the block, so go ahead and put it memoryManager.synchronized { - val taskAttemptId = currentTaskAttemptId() releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock) if (size > unrollMemoryUsedByThisBlock) { memoryManager.releaseStorageMemory(size - unrollMemoryUsedByThisBlock) From 319fb55b5f9b2ecf3cc58ce3ad812e365943bf54 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 14 Mar 2016 11:46:11 -0700 Subject: [PATCH 23/23] Clean up memory transfer code --- .../spark/storage/memory/MemoryStore.scala | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 6f217d089c0a5..02d44dc732951 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -189,26 +189,33 @@ private[spark] class MemoryStore( new MemoryEntry(bytes, bytes.limit, deserialized = false) } val size = entry.size - // Acquire storage memory if necessary to store this block in memory. If this task - // attempt already owns more unroll memory than is necessary to store the block, - // then release the extra memory that will not be used. + def transferUnrollToStorage(amount: Long): Unit = { + // Synchronize so that transfer is atomic + memoryManager.synchronized { + releaseUnrollMemoryForThisTask(amount) + val success = memoryManager.acquireStorageMemory(blockId, amount) + assert(success, "transferring unroll memory to storage memory failed") + } + } + // Acquire storage memory if necessary to store this block in memory. 
val enoughStorageMemory = { - if (unrollMemoryUsedByThisBlock < size) { - memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) - } else { + if (unrollMemoryUsedByThisBlock <= size) { + val acquiredExtra = + memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) + if (acquiredExtra) { + transferUnrollToStorage(unrollMemoryUsedByThisBlock) + } + acquiredExtra + } else { // unrollMemoryUsedByThisBlock > size + // If this task attempt already owns more unroll memory than is necessary to store the + // block, then release the extra memory that will not be used. + val excessUnrollMemory = unrollMemoryUsedByThisBlock - size + releaseUnrollMemoryForThisTask(excessUnrollMemory) + transferUnrollToStorage(size) true } } if (enoughStorageMemory) { - // We acquired enough memory for the block, so go ahead and put it - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock) - if (size > unrollMemoryUsedByThisBlock) { - memoryManager.releaseStorageMemory(size - unrollMemoryUsedByThisBlock) - } - val success = memoryManager.acquireStorageMemory(blockId, size) - assert(success, "transferring unroll memory to storage memory failed") - } entries.synchronized { entries.put(blockId, entry) } @@ -217,6 +224,8 @@ private[spark] class MemoryStore( blockId, bytesOrValues, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) Right(size) } else { + assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, + "released too much unroll memory") Left(new PartiallyUnrolledIterator( this, unrollMemoryUsedByThisBlock,
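The heart of this final patch is transferUnrollToStorage(): releasing unroll memory and re-acquiring it as storage memory under one memoryManager lock, so that no concurrent task can grab the freed bytes in between. A toy ledger, with all names invented and far simpler than Spark's MemoryManager, makes that invariant concrete:

class MemoryLedger(totalMemory: Long) {
  private var unrollUsed = 0L
  private var storageUsed = 0L

  private def free: Long = totalMemory - unrollUsed - storageUsed

  def reserveUnroll(amount: Long): Boolean = synchronized {
    if (free >= amount) { unrollUsed += amount; true } else false
  }

  // Release unroll memory and immediately re-acquire it as storage memory
  // inside one lock, so another thread can never observe (or steal) the
  // transiently freed bytes.
  def transferUnrollToStorage(amount: Long): Unit = synchronized {
    require(unrollUsed >= amount, "released too much unroll memory")
    unrollUsed -= amount
    storageUsed += amount // total usage is unchanged; the bytes change owner
  }
}

Done as two separately locked steps, the release would briefly raise the free count and let a racing reserveUnroll() succeed, after which the re-acquire could fail even though the block's memory had already been accounted for; the single memoryManager.synchronized block in the patch closes exactly that window.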