From 8e7f0993f64901da404b2043f950889c50dcf476 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 16:44:53 -0800 Subject: [PATCH 1/2] Add test for eviction bug w/ multiple active tasks. --- .../memory/UnifiedMemoryManagerSuite.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 71221deeb4c28..76d168c8012f9 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -230,4 +230,31 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(exception.getMessage.contains("larger heap size")) } + test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") { + val conf = new SparkConf() + .set("spark.memory.fraction", "1") + .set("spark.memory.storageFraction", "0") + .set("spark.testing.memory", "1000") + val mm = UnifiedMemoryManager(conf, numCores = 2) + val ms = makeMemoryStore(mm) + assert(mm.maxMemory === 1000) + // Have two tasks each acquire some execution memory so that the memory pool registers that + // there are two active tasks: + assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L) + // Fill up all of the remaining memory with storage. + for (_ <- 1 to 8) { + assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 100L) + } + assert(mm.storageMemoryUsed === 800) + assert(mm.executionMemoryUsed === 200) + assert(evictedBlocks.isEmpty) + // A task should still be able to allocate 100 bytes execution memory by evicting blocks + assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) + assertFreeSpaceForExecutionCalled(ms, 100L) + assert(mm.executionMemoryUsed === 300) + assert(mm.storageMemoryUsed === 700) + assert(evictedBlocks.nonEmpty) + } } From f1b1aac1a9374f618244bb4a287c8b873eff09f5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 9 Dec 2015 15:05:33 -0800 Subject: [PATCH 2/2] Fix regression test so it builds --- .../org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 76d168c8012f9..8d79905335f9d 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -245,14 +245,14 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes // Fill up all of the remaining memory with storage. for (_ <- 1 to 8) { assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) } assert(mm.storageMemoryUsed === 800) assert(mm.executionMemoryUsed === 200) assert(evictedBlocks.isEmpty) // A task should still be able to allocate 100 bytes execution memory by evicting blocks assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) - assertFreeSpaceForExecutionCalled(ms, 100L) + assertEvictBlocksToFreeSpaceCalled(ms, 100L) assert(mm.executionMemoryUsed === 300) assert(mm.storageMemoryUsed === 700) assert(evictedBlocks.nonEmpty)