Skip to content

Commit

Permalink
Add regression test for storage eviction bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Dec 7, 2015
1 parent 012cb4b commit b519fe6
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 43 deletions.
Expand Up @@ -39,6 +39,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel
*/
private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach {

protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]

import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED

// Note: Mockito's verify mechanism does not provide a way to reset method call counts
Expand All @@ -55,6 +57,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft

override def beforeEach(): Unit = {
super.beforeEach()
evictedBlocks.clear()
ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
}

Expand All @@ -74,7 +77,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
}

/**
* Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
* Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] and simulates the part of that
* method which releases storage memory.
*
* This is a significant simplification of the real method, which actually drops existing
* blocks based on the size of each block. Instead, here we simply release as many bytes
* as needed to ensure the requested amount of free space. This allows us to set up the
* test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
* many other dependencies.
*
* Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertEnsureFreeSpaceCalled]].
*/
private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
new Answer[Boolean] {
Expand All @@ -85,44 +99,35 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
val numBytes = args(numBytesPos).asInstanceOf[Long]
val success = mockEnsureFreeSpace(mm, numBytes)
if (success) {
require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
"bad test: ensure free space variable was not reset")
// Record the number of bytes we freed this call
ensureFreeSpaceCalled.set(numBytes)

def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed

// Fail fast if the block simply won't fit
if (numBytes > mm.maxStorageMemory) {
return false
}

// No need to evict anything if there is already enough free space
if (freeMemory >= numBytes) {
return true
}

val spaceToRelease = numBytes - freeMemory
if (spaceToRelease <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
mm.releaseStorageMemory(spaceToRelease)
args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
true
} else {
// No blocks were evicted because eviction would not free enough space.
false
}
success
}
}
}

/**
* Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory.
*
* This is a significant simplification of the real method, which actually drops existing
* blocks based on the size of each block. Instead, here we simply release as many bytes
* as needed to ensure the requested amount of free space. This allows us to set up the
* test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
* many other dependencies.
*
* Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertEnsureFreeSpaceCalled]].
*/
private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized {
require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
"bad test: ensure free space variable was not reset")
// Record the number of bytes we freed this call
ensureFreeSpaceCalled.set(numBytes)
if (numBytes <= mm.maxStorageMemory) {
def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
val spaceToRelease = numBytes - freeMemory
if (spaceToRelease > 0) {
mm.releaseStorageMemory(spaceToRelease)
}
freeMemory >= numBytes
} else {
// We attempted to free more bytes than our max allowable memory
false
}
}

Expand Down
Expand Up @@ -17,16 +17,13 @@

package org.apache.spark.memory

import scala.collection.mutable.ArrayBuffer

import org.mockito.Mockito.when

import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
import org.apache.spark.storage.{MemoryStore, TestBlockId}

class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

/**
* Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
Expand Down
Expand Up @@ -17,16 +17,13 @@

package org.apache.spark.memory

import scala.collection.mutable.ArrayBuffer

import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
import org.apache.spark.storage.{MemoryStore, TestBlockId}

class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
private val dummyBlock = TestBlockId("--")
private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

private val storageFraction: Double = 0.5

Expand Down Expand Up @@ -84,14 +81,18 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 110L)
assert(evictedBlocks.isEmpty)
// Acquire more than the max, not granted
assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
assert(mm.storageMemoryUsed === 110L)
assert(evictedBlocks.isEmpty)
// Acquire up to the max, requests after this are still granted due to LRU eviction
assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 1000L)
assert(mm.storageMemoryUsed === 1000L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1000L)
Expand Down Expand Up @@ -120,17 +121,21 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assertEnsureFreeSpaceCalled(ms, 750L)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
assert(evictedBlocks.isEmpty)
// Execution needs to request 250 bytes to evict storage memory
assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
assertEnsureFreeSpaceNotCalled(ms)
assert(evictedBlocks.isEmpty)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 300L)
assertEnsureFreeSpaceCalled(ms, 50L)
assert(mm.executionMemoryUsed === 300L)
assert(evictedBlocks.nonEmpty)
mm.releaseAllStorageMemory()
evictedBlocks.clear()
require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
Expand All @@ -144,6 +149,23 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
assertEnsureFreeSpaceNotCalled(ms)
assert(evictedBlocks.isEmpty)
}

test("execution can evict storage blocks when storage memory is below max mem (SPARK-12155)") {
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 750L)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// Should now be able to require up to 500 bytes of memory
assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assert(evictedBlocks.nonEmpty)
}

test("storage does not evict execution") {
Expand Down

0 comments on commit b519fe6

Please sign in to comment.