Skip to content

Commit

Permalink
Add MemoryStore.freeSpaceForExecution() method, which forces blocks t…
Browse files Browse the repository at this point in the history
…o be dropped.

Previously, ensureFreeSpace() might end up not dropping blocks if the total
storage memory pool usage was less than the maximum possible storage pool usage.
  • Loading branch information
JoshRosen committed Dec 7, 2015
1 parent b519fe6 commit 7c68ca0
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 57 deletions.
Expand Up @@ -128,9 +128,11 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
} else {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
memoryStore.freeSpaceForExecution(
spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
_memoryUsed -= spaceFreedByEviction
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
}
Expand Down
56 changes: 40 additions & 16 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Expand Up @@ -406,16 +406,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}

/**
* Try to free up a given amount of space by evicting existing blocks.
* Try to free up the given amount of storage memory for use by execution by evicting blocks.
*
* @param space the amount of memory to free, in bytes
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private[spark] def ensureFreeSpace(
private[spark] def freeSpaceForExecution(
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
ensureFreeSpace(None, space, droppedBlocks)
evictBlocksToFreeSpace(None, space, droppedBlocks)
}

/**
Expand Down Expand Up @@ -449,42 +449,66 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
memoryManager.synchronized {
val freeMemory = maxMemory - memoryUsed
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L

logInfo(s"Ensuring $space bytes of free space " +
blockId.map { id => s"for block $id" }.getOrElse("") +
s"(free: $freeMemory, max: $maxMemory)")

// Fail fast if the block simply won't fit
if (space > maxMemory) {
// Fail fast if the block simply won't fit
logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") +
s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)")
return false
}

// No need to evict anything if there is already enough free space
if (freeMemory >= space) {
return true
false
} else if (freeMemory >= space) {
// No need to evict anything if there is already enough free space
true
} else {
// Evict blocks as necessary
evictBlocksToFreeSpace(blockId, freeMemory - space, droppedBlocks)
}
}
}

/**
* Try to evict blocks to free up a given amount of space to store a particular block.
* Can fail if either the block is bigger than our memory or it would require replacing
* another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
* RDDs that don't fit into memory that we want to avoid).
*
* Compared to [[ensureFreeSpace()]], this method will drop blocks without first checking whether
* there is free storage memory which could be used to store a new block; as a result, this
* method should be used when evicting stroage blocks in order to reclaim memory for use by
* execution.
*
* @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freeMemory + selectedMemory < space && iterator.hasNext) {
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
selectedMemory += pair.getValue.size
freedMemory += pair.getValue.size
}
}
}

if (freeMemory + selectedMemory >= space) {
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
Expand Down
126 changes: 88 additions & 38 deletions core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
Expand Up @@ -24,7 +24,7 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.mockito.Matchers.{any, anyLong}
import org.mockito.Mockito.{mock, when}
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach
Expand All @@ -41,67 +41,67 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft

protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]

import MemoryManagerSuite.DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED
import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED

// Note: Mockito's verify mechanism does not provide a way to reset method call counts
// without also resetting stubbed methods. Since our test code relies on the latter,
// we need to use our own variable to track invocations of `ensureFreeSpace`.
// we need to use our own variable to track invocations of `freeSpaceForUseByExecution` and
// `ensureFreeSpace`.

/**
* The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
* The amount of free space requested in the last call to [[MemoryStore.freeSpaceForExecution]].
*
* This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
* code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
* This set whenever [[MemoryStore.freeSpaceForExecution]] is called, and cleared when the test
* code makes explicit assertions on this variable through
* [[assertFreeSpaceForExecutionCalled]].
*/
private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
private val freeSpaceForExecutionCalled = new AtomicLong(0)

private val ensureFreeSpaceCalled = new AtomicLong(0)

override def beforeEach(): Unit = {
super.beforeEach()
evictedBlocks.clear()
freeSpaceForExecutionCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED)
ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
}

/**
* Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
* Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] and
* [[MemoryStore.freeSpaceForExecution]] methods are stubbed.
*
* This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
* is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
* dependencies.
* This allows our test code to release storage memory when these methods are called
* without relying on [[org.apache.spark.storage.BlockManager]] and all of its dependencies.
*/
protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore])
when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
when(ms.freeSpaceForExecution(anyLong(), any())).thenAnswer(freeSpaceForExecutionAnswer(mm))
when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm))
mm.setMemoryStore(ms)
ms
}

/**
* Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] and simulates the part of that
* method which releases storage memory.
*
* This is a significant simplification of the real method, which actually drops existing
* blocks based on the size of each block. Instead, here we simply release as many bytes
* as needed to ensure the requested amount of free space. This allows us to set up the
* test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
* many other dependencies.
*
* Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertEnsureFreeSpaceCalled]].
*/
private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
* Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory.
*
* This is a significant simplification of the real method, which actually drops existing
* blocks based on the size of each block. Instead, here we simply release as many bytes
* as needed to ensure the requested amount of free space. This allows us to set up the
* test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
* many other dependencies.
*
* Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertEnsureFreeSpaceCalled]].
*/
private def ensureFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = {
new Answer[Boolean] {
override def answer(invocation: InvocationOnMock): Boolean = {
val args = invocation.getArguments
require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
s"in ensureFreeSpace, found ${args.size}")
require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
val numBytes = args(numBytesPos).asInstanceOf[Long]
val numBytes = args(1).asInstanceOf[Long]
require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
"bad test: ensure free space variable was not reset")
// Record the number of bytes we freed this call
"bad test: ensureFreeSpace() variable was not reset")
ensureFreeSpaceCalled.set(numBytes)

def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
Expand Down Expand Up @@ -132,20 +132,69 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
}

/**
* Simulate the part of [[MemoryStore.freeSpaceForExecution]] that releases storage memory.
*
* This is a significant simplification of the real method, which actually drops existing
* blocks based on the size of each block. Instead, here we simply release as many bytes
* as needed to ensure the requested amount of free space. This allows us to set up the
* test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
* many other dependencies.
*
* Every call to this method will set a global variable, [[freeSpaceForExecutionCalled]], that
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertFreeSpaceForExecutionCalled]].
*/
private def freeSpaceForExecutionAnswer(mm: MemoryManager): Answer[Boolean] = {
new Answer[Boolean] {
override def answer(invocation: InvocationOnMock): Boolean = {
val args = invocation.getArguments
val numBytes = args(0).asInstanceOf[Long]
require(freeSpaceForExecutionCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED,
"bad test: freeSpaceForExecution() variable was not reset")
freeSpaceForExecutionCalled.set(numBytes)
assert(numBytes <= mm.storageMemoryUsed)
mm.releaseStorageMemory(numBytes)
args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
evictedBlocks.append(
(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
true
}
}
}

/**
* Assert that [[MemoryStore.freeSpaceForExecution]] is called with the given parameters.
*/
protected def assertFreeSpaceForExecutionCalled(ms: MemoryStore, numBytes: Long): Unit = {
assert(freeSpaceForExecutionCalled.get() === numBytes,
s"expected freeSpaceForExecution() to be called with $numBytes")
freeSpaceForExecutionCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED)
}

/**
* Assert that [[MemoryStore.freeSpaceForExecution]] is NOT called.
*/
protected def assertFreeSpaceForExecutionNotCalled[T](ms: MemoryStore): Unit = {
assert(freeSpaceForExecutionCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED,
"freeSpaceForExecution() should not have been called!")
}

/**
* Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
*/
protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
assert(ensureFreeSpaceCalled.get() === numBytes,
s"expected ensure free space to be called with $numBytes")
ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
s"expected ensureFreeSpace() to be called with $numBytes")
ensureFreeSpaceCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED)
}

/**
* Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
*/
protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
"ensure free space should not have been called!")
assert(ensureFreeSpaceCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED,
"ensureFreeSpace() should not have been called!")
}

/**
Expand Down Expand Up @@ -302,5 +351,6 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
}

private object MemoryManagerSuite {
private val DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED = -1L
private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
}
Expand Up @@ -131,7 +131,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
// Execution wants 200 bytes but only 150 are free, so storage is evicted
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 300L)
assertEnsureFreeSpaceCalled(ms, 50L)
assertEnsureFreeSpaceNotCalled(ms)
assertFreeSpaceForExecutionCalled(ms, 50L)
assert(mm.executionMemoryUsed === 300L)
assert(evictedBlocks.nonEmpty)
mm.releaseAllStorageMemory()
Expand Down

0 comments on commit 7c68ca0

Please sign in to comment.