[SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager #10776

Closed · wants to merge 9 commits
20 changes: 6 additions & 14 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -18,7 +18,6 @@
package org.apache.spark

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -68,12 +67,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
// Otherwise, cache the values
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)

} finally {
@@ -135,7 +130,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
key: BlockId,
values: Iterator[T],
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

val putLevel = effectiveStorageLevel.getOrElse(level)
@@ -144,8 +138,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
@@ -163,11 +156,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
blockManager.memoryStore.unrollSafely(key, values) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
@@ -176,7 +168,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,

[Inline review comment from the PR author] This update has been pushed further down into the BlockManager code. See the changes to doPut(). The same holds for the putIterator() call above.

useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
} else {
returnValues
}
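The author's inline comment above notes that the block-status bookkeeping removed from CacheManager now happens inside BlockManager (see doPut() in the full PR). The helper below is only a rough sketch of that pattern, with a hypothetical name not taken from this diff; it mirrors the task-metrics update that the removed CacheManager and StorageMemoryPool code used to perform by hand.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.storage.{BlockId, BlockStatus}

object UpdatedBlockStatusSketch {
  // Record a block-status change against the running task's metrics at the point where
  // the block is actually written, so callers no longer need to thread an
  // ArrayBuffer[(BlockId, BlockStatus)] through putIterator()/putArray()/unrollSafely().
  def recordUpdatedBlockStatus(blockId: BlockId, status: BlockStatus): Unit = {
    Option(TaskContext.get()).foreach { tc =>
      val metrics = tc.taskMetrics()
      val previous = metrics.updatedBlocks.getOrElse(Seq.empty[(BlockId, BlockStatus)])
      metrics.updatedBlocks = Some(previous ++ Seq((blockId, status)))
    }
  }
}
```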
18 changes: 3 additions & 15 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,10 +19,8 @@ package org.apache.spark.memory

import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
import org.apache.spark.storage.{BlockId, MemoryStore}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator

@@ -67,32 +65,22 @@ private[spark] abstract class MemoryManager(
storageMemoryPool.setMemoryStore(store)
}

// TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return whether all N bytes were successfully granted.
*/
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean

/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
*
* This extra method allows subclasses to differentiate behavior between acquiring storage
* memory and acquiring unroll memory. For instance, the memory management model in Spark
* 1.5 and before places a limit on the amount of space that can be freed from unrolling.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
*
* @return whether all N bytes were successfully granted.
*/
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean

/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
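With the evictedBlocks parameter gone, the abstract contract above reduces to a boolean grant. A minimal, hypothetical caller against the simplified signatures (the trait, object, and method names below are stand-ins for illustration, not code from this PR; the real MemoryManager is a private[spark] abstract class):

```scala
// Stand-in trait with the simplified contract shown in the diff above.
trait SimpleMemoryManager {
  def acquireStorageMemory(blockId: String, numBytes: Long): Boolean
  def acquireUnrollMemory(blockId: String, numBytes: Long): Boolean
}

object MemoryManagerCallerSketch {
  // A caller just branches on the boolean grant; it no longer allocates or threads an
  // evicted-blocks buffer, since eviction bookkeeping stays inside the storage layer.
  def tryToCache(mm: SimpleMemoryManager, blockId: String, sizeInBytes: Long): Boolean =
    mm.acquireStorageMemory(blockId, sizeInBytes)
}
```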
core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -17,10 +17,8 @@

package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.storage.BlockId

/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
@@ -53,24 +51,18 @@ private[spark] class StaticMemoryManager(
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
false
} else {
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
storageMemoryPool.acquireMemory(blockId, numBytes)
}
}

override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = storageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
@@ -80,7 +72,7 @@
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}

private[memory]
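To make the unroll sizing in acquireUnrollMemory above concrete, here is a small worked example, REPL-style, with made-up sizes:

```scala
// Illustrative numbers for StaticMemoryManager.acquireUnrollMemory:
val maxUnrollMemory     = 100L << 20  // cap on memory that may be freed for unrolling (100 MB)
val currentUnrollMemory = 30L << 20   // already used for unrolling across tasks (30 MB)
val freeMemory          = 20L << 20   // currently free in the storage pool (20 MB)
val numBytes            = 50L << 20   // requested for this block (50 MB)

val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)  // 50 MB
val numBytesToFree    = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))  // 30 MB
// The pool may evict at most 30 MB of cached blocks; combined with the 20 MB already
// free, that covers the 50 MB request.
```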
core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -19,11 +19,8 @@ package org.apache.spark.memory

import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
import org.apache.spark.Logging
import org.apache.spark.storage.{BlockId, MemoryStore}

/**
* Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -58,15 +55,11 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
val numBytesToFree = math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
acquireMemory(blockId, numBytes, numBytesToFree)
}

/**
@@ -80,19 +73,12 @@
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -129,9 +115,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
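The call sites above imply that evictBlocksToFreeSpace no longer fills a caller-supplied buffer but instead returns the number of bytes it freed, and shrinkPoolToFreeSpace shrinks the pool by that return value. A small standalone sketch of that design choice, with simplified names of my own and the assumption that eviction reports block-status updates to task metrics internally:

```scala
import org.apache.spark.storage.BlockId

// Simplified, illustrative interface: eviction returns the bytes it actually freed.
trait EvictingStore {
  def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long
}

object ShrinkPoolSketch {
  // Mirrors the shrinkPoolToFreeSpace logic above: once unused memory has been reclaimed,
  // evict blocks and shrink the pool by exactly the amount the store reports as freed.
  def shrink(store: EvictingStore, remainingSpaceToFree: Long)(decrementPoolSize: Long => Unit): Unit = {
    if (remainingSpaceToFree > 0) {
      val spaceFreedByEviction = store.evictBlocksToFreeSpace(None, remainingSpaceToFree)
      decrementPoolSize(spaceFreedByEviction)
    }
  }
}
```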
core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -17,10 +17,8 @@

package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.storage.BlockId

/**
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
@@ -133,10 +131,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
}
}

override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
@@ -152,14 +147,11 @@
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
storageMemoryPool.acquireMemory(blockId, numBytes)
}

override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, evictedBlocks)
override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes)
}
}

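The line computing memoryBorrowedFromExecution is collapsed in this excerpt; the borrow is taken from whatever the on-heap execution pool currently has free before delegating to storageMemoryPool.acquireMemory. A worked example with illustrative sizes, REPL-style (the min-based cap below is stated as an assumption about this version of the unified model, not quoted from the diff):

```scala
// Illustrative only; the exact borrowing rule lives in the collapsed diff line.
val storageFree   = 10L << 20  // free in the storage pool (10 MB)
val executionFree = 50L << 20  // free in the on-heap execution pool (50 MB)
val numBytes      = 30L << 20  // needed to cache the block (30 MB)

// Storage cannot satisfy the request on its own, so capacity is borrowed from execution
// (assumed capped at what execution has free), shrinking one pool and growing the other:
val memoryBorrowedFromExecution = math.min(executionFree, numBytes)  // 30 MB
// onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
// storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
// The storage pool now has 10 MB + 30 MB = 40 MB of free capacity, so
// storageMemoryPool.acquireMemory(blockId, numBytes) can grant the 30 MB without evicting.
```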