[SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager
This patch refactors portions of the BlockManager and CacheManager in order to avoid having to pass `evictedBlocks` lists throughout the code. It appears that these lists were only consumed by `TaskContext.taskMetrics`, so the new code now directly updates the metrics from the lower-level BlockManager methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10776 from JoshRosen/SPARK-10985.
JoshRosen authored and Andrew Or committed Jan 18, 2016
1 parent 302bb56 commit b8cb548
Showing 14 changed files with 170 additions and 241 deletions.
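
At a high level, the patch turns signatures like `acquireStorageMemory(blockId, numBytes, evictedBlocks)` into `acquireStorageMemory(blockId, numBytes)` and pushes the metrics bookkeeping down into the code that actually evicts blocks. A minimal sketch of that shape follows; it is not code from the patch, `EvictedBlocksSketch` and `recordEvictedBlock` are hypothetical names, and it assumes the Spark 1.6 `TaskMetrics` API where `updatedBlocks` is a settable `Option` (the same API used by the code removed from StorageMemoryPool further down this diff).

import scala.collection.mutable

import org.apache.spark.TaskContext
import org.apache.spark.storage.{BlockId, BlockStatus}

object EvictedBlocksSketch {
  // Before: every layer threaded a mutable buffer of evicted blocks upward,
  // and the caller merged it into the task metrics itself.
  def acquireStorageMemoryBefore(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = ???

  // After: the buffer parameter disappears from the signature...
  def acquireStorageMemoryAfter(blockId: BlockId, numBytes: Long): Boolean = ???

  // ...because the eviction path records dropped blocks directly, mirroring
  // the metrics update that the removed StorageMemoryPool code performed.
  def recordEvictedBlock(blockId: BlockId, status: BlockStatus): Unit = {
    Option(TaskContext.get()).foreach { tc =>
      val metrics = tc.taskMetrics()
      val previous = metrics.updatedBlocks.getOrElse(Seq.empty[(BlockId, BlockStatus)])
      metrics.updatedBlocks = Some(previous :+ (blockId -> status))
    }
  }
}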
20 changes: 6 additions & 14 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -18,7 +18,6 @@
 package org.apache.spark
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
@@ -68,12 +67,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           logInfo(s"Partition $key not found, computing it")
           val computedValues = rdd.computeOrReadCheckpoint(partition, context)
 
-          // Otherwise, cache the values and keep track of any updates in block statuses
-          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
-          val metrics = context.taskMetrics
-          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
+          // Otherwise, cache the values
+          val cachedValues = putInBlockManager(key, computedValues, storageLevel)
           new InterruptibleIterator(context, cachedValues)
 
         } finally {
@@ -135,7 +130,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       key: BlockId,
       values: Iterator[T],
       level: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
       effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
 
     val putLevel = effectiveStorageLevel.getOrElse(level)
@@ -144,8 +138,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * This RDD is not to be cached in memory, so we can just pass the computed values as an
        * iterator directly to the BlockManager rather than first fully unrolling it in memory.
        */
-      updatedBlocks ++=
-        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
+      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
      blockManager.get(key) match {
        case Some(v) => v.data.asInstanceOf[Iterator[T]]
        case None =>
@@ -163,11 +156,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * single partition. Instead, we unroll the values cautiously, potentially aborting and
        * dropping the partition to disk if applicable.
        */
-      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+      blockManager.memoryStore.unrollSafely(key, values) match {
         case Left(arr) =>
           // We have successfully unrolled the entire partition, so cache it in memory
-          updatedBlocks ++=
-            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
           arr.iterator.asInstanceOf[Iterator[T]]
         case Right(it) =>
           // There is not enough space to cache this partition in memory
@@ -176,7 +168,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
             logWarning(s"Persisting partition $key to disk instead.")
             val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
               useOffHeap = false, deserialized = false, putLevel.replication)
-            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+            putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
           } else {
             returnValues
           }
18 changes: 3 additions & 15 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,10 +19,8 @@ package org.apache.spark.memory
 
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable
-
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.storage.{BlockId, MemoryStore}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.memory.MemoryAllocator
 
@@ -67,32 +65,22 @@ private[spark] abstract class MemoryManager(
     storageMemoryPool.setMemoryStore(store)
   }
 
-  // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
-
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    * @return whether all N bytes were successfully granted.
    */
-  def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+  def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
    *
    * This extra method allows subclasses to differentiate behavior between acquiring storage
    * memory and acquiring unroll memory. For instance, the memory management model in Spark
    * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    *
    * @return whether all N bytes were successfully granted.
    */
-  def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+  def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
 
   /**
    * Try to acquire up to `numBytes` of execution memory for the current task and return the
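
For callers, the new contract is simply ask-and-check. A hypothetical caller of the trimmed-down API is sketched below; `StorageRequestSketch` and `tryReserve` are illustrative names, not code from this patch.

package org.apache.spark.memory

import org.apache.spark.storage.BlockId

// Hypothetical helper: reserve storage memory for a block before writing it
// into the memory store. Any evictions that happen inside acquireStorageMemory
// are reported to the task metrics internally; the caller only sees the Boolean.
private[spark] object StorageRequestSketch {
  def tryReserve(memoryManager: MemoryManager, blockId: BlockId, sizeInBytes: Long): Boolean = {
    // Returns false when the space cannot be granted even after eviction, in
    // which case a caller could fall back to disk or skip caching the block.
    memoryManager.acquireStorageMemory(blockId, sizeInBytes)
  }
}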
core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.memory
 
-import scala.collection.mutable
-
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.BlockId
 
 /**
  * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
@@ -53,24 +51,18 @@ private[spark] class StaticMemoryManager(
     (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
   }
 
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
     if (numBytes > maxStorageMemory) {
       // Fail fast if the block simply won't fit
       logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
         s"memory limit ($maxStorageMemory bytes)")
       false
     } else {
-      storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+      storageMemoryPool.acquireMemory(blockId, numBytes)
     }
   }
 
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
     val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
     val freeMemory = storageMemoryPool.memoryFree
     // When unrolling, we will use all of the existing free memory, and, if necessary,
@@ -80,7 +72,7 @@ private[spark] class StaticMemoryManager(
     val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
     // Keep it within the range 0 <= X <= maxNumBytesToFree
     val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
-    storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+    storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
   }
 
   private[memory]
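
The unroll-sizing arithmetic above is unchanged by the patch. A small worked example with made-up numbers, just to illustrate how `numBytesToFree` is derived:

object UnrollSizingExample extends App {
  // Illustrative values only (not from the patch).
  val maxUnrollMemory = 100L      // maxStorageMemory * spark.storage.unrollFraction
  val currentUnrollMemory = 30L   // bytes already used for unrolling other blocks
  val freeMemory = 20L            // bytes currently free in the storage pool
  val numBytes = 60L              // bytes requested for this unroll

  // Eviction may free at most the unroll headroom not already covered by free memory.
  val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // 50
  // Only evict the part of the request that free memory does not cover, capped as above.
  val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))    // 40

  // The pool is then asked to grant 60 bytes, evicting up to 40 bytes to do so.
  println(s"numBytesToFree = $numBytesToFree")
}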
core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -19,11 +19,8 @@ package org.apache.spark.memory
 
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, MemoryStore}
 
 /**
  * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -58,15 +55,11 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    * @return whether all N bytes were successfully granted.
    */
-  def acquireMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+  def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
     val numBytesToFree = math.max(0, numBytes - memoryFree)
-    acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+    acquireMemory(blockId, numBytes, numBytesToFree)
   }
 
   /**
@@ -80,19 +73,12 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
   def acquireMemory(
       blockId: BlockId,
       numBytesToAcquire: Long,
-      numBytesToFree: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+      numBytesToFree: Long): Boolean = lock.synchronized {
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
     if (numBytesToFree > 0) {
-      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
-      // Register evicted blocks, if any, with the active task metrics
-      Option(TaskContext.get()).foreach { tc =>
-        val metrics = tc.taskMetrics()
-        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
-      }
+      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
     }
     // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
     // back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -129,9 +115,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
-      val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
-      val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
+      val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
       // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
       decrementPoolSize(spaceFreedByEviction)
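
Since callers no longer receive the evicted block list, `evictBlocksToFreeSpace` has to report how much space it actually freed. A minimal sketch of that contract follows; the signature is assumed from the call sites in this diff, and `EvictionSink` and `ShrinkablePool` are hypothetical names used only for illustration.

import org.apache.spark.storage.BlockId

// Assumed shape of the eviction hook: it returns the number of bytes freed, so
// the pool can shrink by exactly that amount instead of summing memSize over an
// evictedBlocks buffer as the removed code did.
trait EvictionSink {
  def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long
}

class ShrinkablePool(memoryStore: EvictionSink) {
  private var poolSize: Long = 0L

  def decrementPoolSize(delta: Long): Unit = { poolSize -= delta }

  // Mirrors the shrink path above: evict, then shrink the pool by what was freed.
  def shrinkBy(remainingSpaceToFree: Long): Unit = {
    val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
    decrementPoolSize(spaceFreedByEviction)
  }
}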
core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.memory
 
-import scala.collection.mutable
-
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.BlockId
 
 /**
  * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
@@ -133,10 +131,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
       }
     }
 
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
     assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
     assert(numBytes >= 0)
     if (numBytes > maxStorageMemory) {
@@ -152,14 +147,11 @@ private[spark] class UnifiedMemoryManager private[memory] (
       onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
       storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
     }
-    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+    storageMemoryPool.acquireMemory(blockId, numBytes)
   }
 
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
-    acquireStorageMemory(blockId, numBytes, evictedBlocks)
+  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
+    acquireStorageMemory(blockId, numBytes)
   }
 }

