From 39a0c287fc450c03a7fd780e6c0197d4ba622796 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 6 Oct 2014 21:41:40 -0700 Subject: [PATCH 1/3] Log more detail when unrolling a block fails --- .../scala/org/apache/spark/CacheManager.scala | 2 - .../apache/spark/storage/MemoryStore.scala | 46 ++++++++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index f8584b90cabe6..d89bb50076c9a 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { arr.iterator.asInstanceOf[Iterator[T]] case Right(it) => // There is not enough space to cache this partition in memory - logWarning(s"Not enough space to cache partition $key in memory! " + - s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.") val returnValues = it.asInstanceOf[Iterator[T]] if (putLevel.useDisk) { logWarning(s"Persisting partition $key to disk instead.") diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 0a09c24d61879..6e076d6a76e5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) PutResult(res.size, res.data, droppedBlocks) case Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable - logWarning(s"Not enough space to store block $blockId in memory! " + - s"Free memory is $freeMemory bytes.") if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) @@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) Left(vector.toArray) } else { // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, vector.estimateSize()) Right(vector.iterator ++ values) } @@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Reserve additional memory for unrolling blocks used by this thread. * Return whether the request is granted. */ - private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { + def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { accountingLock.synchronized { val granted = freeMemory > currentUnrollMemory + memory if (granted) { @@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Release memory used by this thread for unrolling blocks. * If the amount is not specified, remove the current thread's allocation altogether. */ - private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { + def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { if (memory < 0) { @@ -457,16 +456,51 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) /** * Return the amount of memory currently occupied for unrolling blocks across all threads. */ - private[spark] def currentUnrollMemory: Long = accountingLock.synchronized { + def currentUnrollMemory: Long = accountingLock.synchronized { unrollMemoryMap.values.sum } /** * Return the amount of memory currently occupied for unrolling blocks by this thread. */ - private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { + def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) } + + /** + * Return the number of threads currently unrolling blocks. + */ + def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size } + + /** + * Log information about current memory usage. + */ + def logMemoryUsage(): Unit = { + val blocksMemory = currentMemory + val unrollMemory = currentUnrollMemory + val actualFreeMemory = maxMemory - blocksMemory - unrollMemory + logInfo( + s"Max memory is ${Utils.bytesToString(maxMemory)}, of which " + + s"${Utils.bytesToString(blocksMemory)} is occupied by blocks and " + + s"${Utils.bytesToString(unrollMemory)} is reserved for unrolling blocks " + + s"shared across $numThreadsUnrolling thread(s). " + + s"Total free memory left is ${Utils.bytesToString(actualFreeMemory)}." + ) + } + + /** + * Log a warning for failing to unroll a block. + * + * @param blockId ID of the block we are trying to unroll. + * @param finalVectorSize Final size of the vector before unrolling failed. + */ + def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = { + logWarning( + s"Not enough space to store partition $blockId in memory " + + s"(unrolled ${Utils.bytesToString(finalVectorSize)} so far)!" + ) + logMemoryUsage() + } } private[spark] case class ResultWithDroppedBlocks( From 5638c49f1a441b338b8998294aacae27300cd522 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 6 Oct 2014 21:48:14 -0700 Subject: [PATCH 2/3] Grammar --- .../src/main/scala/org/apache/spark/storage/MemoryStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 6e076d6a76e5f..2bad8fc7d5ab7 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -481,8 +481,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) val actualFreeMemory = maxMemory - blocksMemory - unrollMemory logInfo( s"Max memory is ${Utils.bytesToString(maxMemory)}, of which " + - s"${Utils.bytesToString(blocksMemory)} is occupied by blocks and " + - s"${Utils.bytesToString(unrollMemory)} is reserved for unrolling blocks " + + s"${Utils.bytesToString(blocksMemory)} are occupied by blocks and " + + s"${Utils.bytesToString(unrollMemory)} are reserved for unrolling blocks " + s"shared across $numThreadsUnrolling thread(s). " + s"Total free memory left is ${Utils.bytesToString(actualFreeMemory)}." ) From 28e33d60a65c251e7dbcab9d81d013cf705452a2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 7 Oct 2014 11:07:26 -0700 Subject: [PATCH 3/3] Shy away from "unrolling" --- .../org/apache/spark/storage/MemoryStore.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 2bad8fc7d5ab7..edbc729c17ade 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -478,13 +478,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) def logMemoryUsage(): Unit = { val blocksMemory = currentMemory val unrollMemory = currentUnrollMemory - val actualFreeMemory = maxMemory - blocksMemory - unrollMemory + val totalMemory = blocksMemory + unrollMemory logInfo( - s"Max memory is ${Utils.bytesToString(maxMemory)}, of which " + - s"${Utils.bytesToString(blocksMemory)} are occupied by blocks and " + - s"${Utils.bytesToString(unrollMemory)} are reserved for unrolling blocks " + - s"shared across $numThreadsUnrolling thread(s). " + - s"Total free memory left is ${Utils.bytesToString(actualFreeMemory)}." + s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " + + s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " + + s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " + + s"Storage limit = ${Utils.bytesToString(maxMemory)}." ) } @@ -496,8 +495,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) */ def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = { logWarning( - s"Not enough space to store partition $blockId in memory " + - s"(unrolled ${Utils.bytesToString(finalVectorSize)} so far)!" + s"Not enough space to cache $blockId in memory! " + + s"(computed ${Utils.bytesToString(finalVectorSize)} so far)" ) logMemoryUsage() }