From 848320fc6b0209da42cd6b20aa379e5fa593cdbd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 14 Jan 2016 19:45:44 -0800 Subject: [PATCH 1/7] Delete all of the evictedBlocks stuff; will re-add status updates / task metrics later. --- .../scala/org/apache/spark/CacheManager.scala | 20 +-- .../apache/spark/memory/MemoryManager.scala | 18 +-- .../spark/memory/StaticMemoryManager.scala | 18 +-- .../spark/memory/StorageMemoryPool.scala | 30 +--- .../spark/memory/UnifiedMemoryManager.scala | 18 +-- .../apache/spark/storage/BlockManager.scala | 42 ++--- .../apache/spark/storage/MemoryStore.scala | 77 ++++----- .../spark/memory/MemoryManagerSuite.scala | 13 +- .../memory/StaticMemoryManagerSuite.scala | 24 +-- .../spark/memory/TestMemoryManager.scala | 10 +- .../memory/UnifiedMemoryManagerSuite.scala | 30 ++-- .../spark/storage/BlockManagerSuite.scala | 151 +++++++++--------- .../receiver/ReceivedBlockHandler.scala | 8 +- 13 files changed, 187 insertions(+), 272 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 36b536e89c3a4..d92d8b0eef8a0 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -18,7 +18,6 @@ package org.apache.spark import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -68,12 +67,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo(s"Partition $key not found, computing it") val computedValues = rdd.computeOrReadCheckpoint(partition, context) - // Otherwise, cache the values and keep track of any updates in block statuses - val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - val metrics = context.taskMetrics - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) + // Otherwise, cache the values + val cachedValues = putInBlockManager(key, computedValues, storageLevel) new InterruptibleIterator(context, cachedValues) } finally { @@ -135,7 +130,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { key: BlockId, values: Iterator[T], level: StorageLevel, - updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)], effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = { val putLevel = effectiveStorageLevel.getOrElse(level) @@ -144,8 +138,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * This RDD is not to be cached in memory, so we can just pass the computed values as an * iterator directly to the BlockManager rather than first fully unrolling it in memory. */ - updatedBlocks ++= - blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel) + blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel) blockManager.get(key) match { case Some(v) => v.data.asInstanceOf[Iterator[T]] case None => @@ -163,11 +156,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * single partition. Instead, we unroll the values cautiously, potentially aborting and * dropping the partition to disk if applicable. 
*/ - blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match { + blockManager.memoryStore.unrollSafely(key, values) match { case Left(arr) => // We have successfully unrolled the entire partition, so cache it in memory - updatedBlocks ++= - blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel) + blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel) arr.iterator.asInstanceOf[Iterator[T]] case Right(it) => // There is not enough space to cache this partition in memory @@ -176,7 +168,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logWarning(s"Persisting partition $key to disk instead.") val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false, putLevel.replication) - putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel)) + putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel)) } else { returnValues } diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 33f8b9f16c11b..b5adbd88a2c23 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -19,10 +19,8 @@ package org.apache.spark.memory import javax.annotation.concurrent.GuardedBy -import scala.collection.mutable - import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} +import org.apache.spark.storage.{BlockId, MemoryStore} import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.memory.MemoryAllocator @@ -67,17 +65,11 @@ private[spark] abstract class MemoryManager( storageMemoryPool.setMemoryStore(store) } - // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985) - /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. * @return whether all N bytes were successfully granted. */ - def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean + def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. @@ -85,14 +77,10 @@ private[spark] abstract class MemoryManager( * This extra method allows subclasses to differentiate behavior between acquiring storage * memory and acquiring unroll memory. For instance, the memory management model in Spark * 1.5 and before places a limit on the amount of space that can be freed from unrolling. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. * * @return whether all N bytes were successfully granted. 
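For reference, a minimal caller-side sketch of the simplified contract (the helper name and the `mm` parameter are illustrative, not part of this patch): eviction is now an internal side effect of `acquireStorageMemory`, so no evicted-blocks buffer is threaded through the call and the caller only learns whether all bytes were granted.

```scala
import org.apache.spark.memory.MemoryManager
import org.apache.spark.storage.BlockId

// Hypothetical helper (not part of this patch). Returns true iff all
// numBytes were granted, possibly after the store evicted other blocks.
def tryToCacheBlock(mm: MemoryManager, blockId: BlockId, numBytes: Long): Boolean = {
  mm.acquireStorageMemory(blockId, numBytes)
}
```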
*/ - def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean + def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean /** * Try to acquire up to `numBytes` of execution memory for the current task and return the diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 3554b558f2123..f9f8f820bc49c 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -17,10 +17,8 @@ package org.apache.spark.memory -import scala.collection.mutable - import org.apache.spark.SparkConf -import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.storage.BlockId /** * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. @@ -53,24 +51,18 @@ private[spark] class StaticMemoryManager( (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + s"memory limit ($maxStorageMemory bytes)") false } else { - storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes) } } - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory val freeMemory = storageMemoryPool.memoryFree // When unrolling, we will use all of the existing free memory, and, if necessary, @@ -80,7 +72,7 @@ private[spark] class StaticMemoryManager( val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // Keep it within the range 0 <= X <= maxNumBytesToFree val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) - storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) } private[memory] diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 4036484aada23..6a88966f60d23 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -19,11 +19,8 @@ package org.apache.spark.memory import javax.annotation.concurrent.GuardedBy -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.{Logging, TaskContext} -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} +import org.apache.spark.Logging +import org.apache.spark.storage.{BlockId, MemoryStore} /** * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage @@ -58,15 +55,11 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w /** * 
Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. * @return whether all N bytes were successfully granted. */ - def acquireMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { + def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized { val numBytesToFree = math.max(0, numBytes - memoryFree) - acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) + acquireMemory(blockId, numBytes, numBytesToFree) } /** @@ -80,19 +73,12 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w def acquireMemory( blockId: BlockId, numBytesToAcquire: Long, - numBytesToFree: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { + numBytesToFree: Long): Boolean = lock.synchronized { assert(numBytesToAcquire >= 0) assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) if (numBytesToFree > 0) { - memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks) - // Register evicted blocks, if any, with the active task metrics - Option(TaskContext.get()).foreach { tc => - val metrics = tc.taskMetrics() - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) - } + memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call // back into this StorageMemoryPool in order to free memory. Therefore, these variables @@ -129,9 +115,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: - val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks) - val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum + val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
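The amount by which the pool shrinks now comes directly from the store rather than from summing `memSize` over an evicted-blocks buffer. A condensed, single-threaded sketch of the shrink logic (toy helper; the `evict` parameter stands in for `memoryStore.evictBlocksToFreeSpace`):

```scala
// Toy model of shrinkPoolToFreeSpace: evict(n) may free fewer than n bytes,
// and the caller decrements the pool by exactly the bytes actually freed.
def bytesFreedByShrink(spaceToFree: Long, memoryFree: Long, evict: Long => Long): Long = {
  val freedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - freedByReleasingUnusedMemory
  val freedByEviction =
    if (remainingSpaceToFree > 0) evict(remainingSpaceToFree) else 0L
  freedByReleasingUnusedMemory + freedByEviction
}
```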
decrementPoolSize(spaceFreedByEviction) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 57a24ac140287..a3321e3f179f6 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -17,10 +17,8 @@ package org.apache.spark.memory -import scala.collection.mutable - import org.apache.spark.SparkConf -import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.storage.BlockId /** * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that @@ -133,10 +131,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( } } - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) assert(numBytes >= 0) if (numBytes > maxStorageMemory) { @@ -152,14 +147,11 @@ private[spark] class UnifiedMemoryManager private[memory] ( onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution) storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution) } - storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes) } - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - acquireStorageMemory(blockId, numBytes, evictedBlocks) + override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { + acquireStorageMemory(blockId, numBytes) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4479e6875a731..69693e6fb3eda 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -642,12 +642,16 @@ private[spark] class BlockManager( None } + /** + * @return true if the block was stored or false if the block was already stored or an + * error occurred. + */ def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(values != null, "Values is null") doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel) } @@ -671,28 +675,32 @@ private[spark] class BlockManager( /** * Put a new block of values to the block manager. - * Return a list of blocks updated as a result of this put. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ def putArray( blockId: BlockId, values: Array[Any], level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(values != null, "Values is null") doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel) } /** * Put a new block of serialized bytes to the block manager. 
- * Return a list of blocks updated as a result of this put. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ def putBytes( blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(bytes != null, "Bytes is null") doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel) } @@ -704,14 +712,16 @@ private[spark] class BlockManager( * The effective storage level refers to the level according to which the block will actually be * handled. This allows the caller to specify an alternate behavior of doPut while preserving * the original level specified by the user. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ private def doPut( blockId: BlockId, data: BlockValues, level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None) - : Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") @@ -719,9 +729,6 @@ private[spark] class BlockManager( require(level != null && level.isValid, "Effective StorageLevel is null or invalid") } - // Return value - val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - /* Remember the block's storage level so that we can correctly drop it to disk if it needs * to be dropped right after it got put into memory. Note, however, that other threads will * not be able to get() this block until we call markReady on its BlockInfo. */ @@ -732,7 +739,7 @@ private[spark] class BlockManager( if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { logWarning(s"Block $blockId already exists on this machine; not re-adding it") - return updatedBlocks + return false } // TODO: So the block info exists - but previous attempt to load it (?) failed. // What do we do now ? Retry on it ? @@ -816,11 +823,6 @@ private[spark] class BlockManager( case _ => } - // Keep track of which blocks are dropped from memory - if (putLevel.useMemory) { - result.droppedBlocks.foreach { updatedBlocks += _ } - } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { // Now that the block is in either the memory, externalBlockStore, or disk store, @@ -830,7 +832,6 @@ private[spark] class BlockManager( if (tellMaster) { reportBlockStatus(blockId, putBlockInfo, putBlockStatus) } - updatedBlocks += ((blockId, putBlockStatus)) } } finally { // If we failed in putting the block to memory/disk, notify other possible readers @@ -880,7 +881,7 @@ private[spark] class BlockManager( .format(blockId, Utils.getUsedTimeMs(startTimeMs))) } - updatedBlocks + true } /** @@ -1001,12 +1002,15 @@ private[spark] class BlockManager( /** * Write a block consisting of a single object. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. 
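With the `Boolean` results, callers check success directly instead of scanning a returned status list; the streaming `ReceivedBlockHandler` change later in this patch does exactly this. A hedged caller-side sketch (the helper name and its arguments are illustrative):

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkException
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}

// Illustrative caller: fail loudly if the put did not actually store the block.
def storeOrThrow(bm: BlockManager, blockId: BlockId, bytes: ByteBuffer): Unit = {
  val putSucceeded = bm.putBytes(blockId, bytes, StorageLevel.MEMORY_AND_DISK)
  if (!putSucceeded) {
    throw new SparkException(s"Could not store $blockId to block manager")
  }
}
```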
*/ def putSingle( blockId: BlockId, value: Any, level: StorageLevel, - tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true): Boolean = { putIterator(blockId, Iterator(value), level, tellMaster) } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index bdab8c2332fae..8180d38a89610 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -95,9 +95,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val values = blockManager.dataDeserialize(blockId, bytes) putIterator(blockId, values, level, returnValues = true) } else { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) - PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) + tryToPut(blockId, bytes, bytes.limit, deserialized = false) + PutResult(bytes.limit(), Right(bytes.duplicate())) } } @@ -110,8 +109,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = { // Work on a duplicate - since the original input might be used elsewhere. lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks) + val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false) val data = if (putSuccess) { assert(bytes.limit == size) @@ -119,7 +117,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { null } - PutResult(size, data, droppedBlocks) + PutResult(size, data) } override def putArray( @@ -127,15 +125,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks) - PutResult(sizeEstimate, Left(values.iterator), droppedBlocks) + tryToPut(blockId, values, sizeEstimate, deserialized = true) + PutResult(sizeEstimate, Left(values.iterator)) } else { val bytes = blockManager.dataSerialize(blockId, values.iterator) - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) - PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) + tryToPut(blockId, bytes, bytes.limit, deserialized = false) + PutResult(bytes.limit(), Right(bytes.duplicate())) } } @@ -165,22 +162,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo level: StorageLevel, returnValues: Boolean, allowPersistToDisk: Boolean): PutResult = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val unrolledValues = unrollSafely(blockId, values, droppedBlocks) + val unrolledValues = unrollSafely(blockId, values) unrolledValues match { case Left(arrayValues) => // Values are fully unrolled in memory, so store them as an array val res = putArray(blockId, arrayValues, level, returnValues) - droppedBlocks ++= res.droppedBlocks - PutResult(res.size, res.data, droppedBlocks) + PutResult(res.size, res.data) case 
Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) - PutResult(res.size, res.data, droppedBlocks) + PutResult(res.size, res.data) } else { - PutResult(0, Left(iteratorValues), droppedBlocks) + PutResult(0, Left(iteratorValues)) } } } @@ -246,11 +241,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). */ - def unrollSafely( - blockId: BlockId, - values: Iterator[Any], - droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) - : Either[Array[Any], Iterator[Any]] = { + def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = { // Number of elements unrolled so far var elementsUnrolled = 0 @@ -270,7 +261,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo var vector = new SizeTrackingVector[Any] // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks) + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + @@ -286,8 +277,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = reserveUnrollMemoryForThisTask( - blockId, amountToRequest, droppedBlocks) + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) // New threshold is currentSize * memoryGrowthFactor memoryThreshold += amountToRequest } @@ -337,9 +327,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, value: Any, size: Long, - deserialized: Boolean, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - tryToPut(blockId, () => value, size, deserialized, droppedBlocks) + deserialized: Boolean): Boolean = { + tryToPut(blockId, () => value, size, deserialized) } /** @@ -355,16 +344,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * blocks to free memory for one block, another thread may use up the freed space for * another block. * - * All blocks evicted in the process, if any, will be added to `droppedBlocks`. - * * @return whether put was successful. */ private def tryToPut( blockId: BlockId, value: () => Any, size: Long, - deserialized: Boolean, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + deserialized: Boolean): Boolean = { /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has @@ -380,7 +366,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // happen atomically. This relies on the assumption that all memory acquisitions are // synchronized on the same lock. 
releasePendingUnrollMemoryForThisTask() - val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) + val enoughMemory = memoryManager.acquireStorageMemory(blockId, size) if (enoughMemory) { // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(value(), size, deserialized) @@ -398,8 +384,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { Right(value().asInstanceOf[ByteBuffer].duplicate()) } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + blockManager.dropFromMemory(blockId, () => data) } enoughMemory } @@ -413,13 +398,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * * @param blockId the ID of the block we are freeing space for, if any * @param space the size of this block - * @param droppedBlocks a holder for blocks evicted in the process - * @return whether the requested free space is freed. + * @return the amount of memory (in bytes) freed by eviction */ - private[spark] def evictBlocksToFreeSpace( - blockId: Option[BlockId], - space: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = { assert(space > 0) memoryManager.synchronized { var freedMemory = 0L @@ -453,17 +434,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + blockManager.dropFromMemory(blockId, data) } } - true + freedMemory } else { blockId.foreach { id => logInfo(s"Will not store $id as it would require dropping another block " + "from the same RDD") } - false + 0L } } } @@ -481,12 +461,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * Reserve memory for unrolling the given block for this task. * @return whether the request is granted. 
*/ - def reserveUnrollMemoryForThisTask( - blockId: BlockId, - memory: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = { memoryManager.synchronized { - val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) + val success = memoryManager.acquireUnrollMemory(blockId, memory) if (success) { val taskAttemptId = currentTaskAttemptId() unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index f2924a6a5c052..046816d2ad321 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -70,8 +70,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft */ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = { val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) - when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())) - .thenAnswer(evictBlocksToFreeSpaceAnswer(mm)) + when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm)) mm.setMemoryStore(ms) ms } @@ -89,9 +88,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft * records the number of bytes this is called with. This variable is expected to be cleared * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]]. */ - private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = { - new Answer[Boolean] { - override def answer(invocation: InvocationOnMock): Boolean = { + private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = { + new Answer[Long] { + override def answer(invocation: InvocationOnMock): Long = { val args = invocation.getArguments val numBytesToFree = args(1).asInstanceOf[Long] assert(numBytesToFree > 0) @@ -111,10 +110,10 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft evictedBlocks.append( (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L))) } - true + numBytesToFree } else { // No blocks were evicted because eviction would not free enough space. 
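In the test suites the Mockito stub changes shape accordingly: `evictBlocksToFreeSpace` is answered with a byte count rather than a `Boolean`. A condensed sketch of the stubbing, simplified from `evictBlocksToFreeSpaceAnswer` (this version optimistically frees exactly what was requested, without the suite's bookkeeping):

```scala
import org.mockito.Matchers.{any, anyLong}
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

import org.apache.spark.storage.MemoryStore

val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(new Answer[Long] {
  override def answer(invocation: InvocationOnMock): Long = {
    // Pretend eviction always frees exactly the requested number of bytes.
    val args = invocation.getArguments
    args(1).asInstanceOf[Long]
  }
})
```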
- false + 0L } } } diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 68cf26fc3ed5d..eee78d396e147 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -81,22 +81,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val dummyBlock = TestBlockId("you can see the world you brought to live") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 10L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) - assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 100L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() @@ -107,12 +107,12 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired @@ -134,7 +134,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase - assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 50L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) @@ -152,21 +152,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val maxStorageMem = 1000L val dummyBlock = TestBlockId("lonely water") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireUnrollMemory(dummyBlock, 100L)) when(ms.currentUnrollMemory).thenReturn(100L) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 100L) mm.releaseUnrollMemory(40L) assert(mm.storageMemoryUsed === 60L) when(ms.currentUnrollMemory).thenReturn(60L) - assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks)) + 
assert(mm.acquireStorageMemory(dummyBlock, 800L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 860L) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes. // Requesting 240 more bytes of unroll memory will leave our total unroll memory at // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted. - assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks)) + assert(mm.acquireUnrollMemory(dummyBlock, 240L)) assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000 when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240 assert(mm.storageMemoryUsed === 1000L) @@ -174,7 +174,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { // We already have 300 bytes of unroll memory, so requesting 150 more will leave us // above the 400-byte limit. Since there is not enough free memory, this request will // fail even after evicting as much as we can (400 - 300 = 100 bytes). - assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks)) + assert(!mm.acquireUnrollMemory(dummyBlock, 150L)) assertEvictBlocksToFreeSpaceCalled(ms, 100L) assert(mm.storageMemoryUsed === 900L) // Release beyond what was acquired diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 4a1e49b45df40..e5cb9d3a99f0b 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -41,14 +41,8 @@ class TestMemoryManager(conf: SparkConf) grant } } - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true + override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = true + override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = true override def releaseStorageMemory(numBytes: Long): Unit = {} override private[memory] def releaseExecutionMemory( numBytes: Long, diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6cc48597d38f9..0c4359c3c2cd5 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -74,24 +74,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val maxMemory = 1000L val (mm, ms) = makeThings(maxMemory) assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 10L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) - assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 100L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to 
the max, requests after this are still granted due to LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, maxMemory)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() @@ -102,12 +102,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired @@ -120,7 +120,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region - assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 750L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) @@ -140,7 +140,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes require(mm.executionMemoryUsed === 300L) require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region - assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 400L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 400L) assert(mm.executionMemoryUsed === 300L) @@ -157,7 +157,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region size - assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 700L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 700L) @@ -182,11 +182,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.storageMemoryUsed === 0L) assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should not be able to evict execution - assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 100L)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, 250L)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) // Do not attempt to evict blocks, since evicting will not free enough memory: @@ -199,11 +199,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with 
PrivateMethodTes assert(mm.storageMemoryUsed === 0L) assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should still not be able to evict execution - assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 750L)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free - assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, 850L)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) // Do not attempt to evict blocks, since evicting will not free enough memory: @@ -243,7 +243,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L) // Fill up all of the remaining memory with storage. - assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 800L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 800) assert(mm.executionMemoryUsed === 200) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 62e6c4f7932df..272f2700cab3a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -862,71 +862,71 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(Arrays.equals(notMappedAsArray, bytes)) } - test("updated block statuses") { - store = makeBlockManager(12000) - val list = List.fill(2)(new Array[Byte](2000)) - val bigList = List.fill(8)(new Array[Byte](2000)) - - // 1 updated block (i.e. list1) - val updatedBlocks1 = - store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(updatedBlocks1.size === 1) - assert(updatedBlocks1.head._1 === TestBlockId("list1")) - assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY) - - // 1 updated block (i.e. 
list2) - val updatedBlocks2 = - store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - assert(updatedBlocks2.size === 1) - assert(updatedBlocks2.head._1 === TestBlockId("list2")) - assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY) - - // 2 updated blocks - list1 is kicked out of memory while list3 is added - val updatedBlocks3 = - store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(updatedBlocks3.size === 2) - updatedBlocks3.foreach { case (id, status) => - id match { - case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE) - case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY) - case _ => fail("Updated block is neither list1 nor list3") - } - } - assert(store.memoryStore.contains("list3"), "list3 was not in memory store") - - // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added - val updatedBlocks4 = - store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(updatedBlocks4.size === 2) - updatedBlocks4.foreach { case (id, status) => - id match { - case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY) - case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY) - case _ => fail("Updated block is neither list2 nor list4") - } - } - assert(store.diskStore.contains("list2"), "list2 was not in disk store") - assert(store.memoryStore.contains("list4"), "list4 was not in memory store") - - // No updated blocks - list5 is too big to fit in store and nothing is kicked out - val updatedBlocks5 = - store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(updatedBlocks5.size === 0) - - // memory store contains only list3 and list4 - assert(!store.memoryStore.contains("list1"), "list1 was in memory store") - assert(!store.memoryStore.contains("list2"), "list2 was in memory store") - assert(store.memoryStore.contains("list3"), "list3 was not in memory store") - assert(store.memoryStore.contains("list4"), "list4 was not in memory store") - assert(!store.memoryStore.contains("list5"), "list5 was in memory store") - - // disk store contains only list2 - assert(!store.diskStore.contains("list1"), "list1 was in disk store") - assert(store.diskStore.contains("list2"), "list2 was not in disk store") - assert(!store.diskStore.contains("list3"), "list3 was in disk store") - assert(!store.diskStore.contains("list4"), "list4 was in disk store") - assert(!store.diskStore.contains("list5"), "list5 was in disk store") - } +// test("updated block statuses") { +// store = makeBlockManager(12000) +// val list = List.fill(2)(new Array[Byte](2000)) +// val bigList = List.fill(8)(new Array[Byte](2000)) +// +// // 1 updated block (i.e. list1) +// val updatedBlocks1 = +// store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) +// assert(updatedBlocks1.size === 1) +// assert(updatedBlocks1.head._1 === TestBlockId("list1")) +// assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY) +// +// // 1 updated block (i.e. 
list2) +// val updatedBlocks2 = +// store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) +// assert(updatedBlocks2.size === 1) +// assert(updatedBlocks2.head._1 === TestBlockId("list2")) +// assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY) +// +// // 2 updated blocks - list1 is kicked out of memory while list3 is added +// val updatedBlocks3 = +// store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) +// assert(updatedBlocks3.size === 2) +// updatedBlocks3.foreach { case (id, status) => +// id match { +// case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE) +// case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY) +// case _ => fail("Updated block is neither list1 nor list3") +// } +// } +// assert(store.memoryStore.contains("list3"), "list3 was not in memory store") +// +// // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added +// val updatedBlocks4 = +// store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) +// assert(updatedBlocks4.size === 2) +// updatedBlocks4.foreach { case (id, status) => +// id match { +// case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY) +// case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY) +// case _ => fail("Updated block is neither list2 nor list4") +// } +// } +// assert(store.diskStore.contains("list2"), "list2 was not in disk store") +// assert(store.memoryStore.contains("list4"), "list4 was not in memory store") +// +// // No updated blocks - list5 is too big to fit in store and nothing is kicked out +// val updatedBlocks5 = +// store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) +// assert(updatedBlocks5.size === 0) +// +// // memory store contains only list3 and list4 +// assert(!store.memoryStore.contains("list1"), "list1 was in memory store") +// assert(!store.memoryStore.contains("list2"), "list2 was in memory store") +// assert(store.memoryStore.contains("list3"), "list3 was not in memory store") +// assert(store.memoryStore.contains("list4"), "list4 was not in memory store") +// assert(!store.memoryStore.contains("list5"), "list5 was in memory store") +// +// // disk store contains only list2 +// assert(!store.diskStore.contains("list1"), "list1 was in disk store") +// assert(store.diskStore.contains("list2"), "list2 was not in disk store") +// assert(!store.diskStore.contains("list3"), "list3 was in disk store") +// assert(!store.diskStore.contains("list4"), "list4 was in disk store") +// assert(!store.diskStore.contains("list5"), "list5 was in disk store") +// } test("query block statuses") { store = makeBlockManager(12000) @@ -1025,8 +1025,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { - memoryStore.reserveUnrollMemoryForThisTask( - TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)]) + memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory) } // Reserve @@ -1082,11 +1081,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) val memoryStore = store.memoryStore - val droppedBlocks = new ArrayBuffer[(BlockId, 
BlockStatus)] assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with all the space in the world. This should succeed and return an array. - var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) + var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) memoryStore.releasePendingUnrollMemoryForThisTask() @@ -1094,24 +1092,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) + unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(droppedBlocks.size === 1) - assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) - droppedBlocks.clear() + assert(memoryStore.contains("someBlock2")) + assert(!memoryStore.contains("someBlock1")) memoryStore.releasePendingUnrollMemoryForThisTask() // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks) + unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator - assert(droppedBlocks.size === 1) - assert(droppedBlocks.head._1 === TestBlockId("someBlock2")) - droppedBlocks.clear() + assert(!memoryStore.contains("someBlock2")) } test("safely unroll blocks through putIterator") { @@ -1258,7 +1253,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE }) assert(result.size === 13000) assert(result.data === null) - assert(result.droppedBlocks === Nil) } test("put a small ByteBuffer to MemoryStore") { @@ -1272,6 +1266,5 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE }) assert(result.size === 10000) assert(result.data === Right(bytes)) - assert(result.droppedBlocks === Nil) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index faa5aca1d8f7a..e22e320b17126 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -71,7 +71,7 @@ private[streaming] class BlockManagerBasedBlockHandler( var numRecords: Option[Long] = None - val putResult: Seq[(BlockId, BlockStatus)] = block match { + val putSucceeded: Boolean = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, @@ -88,7 +88,7 @@ private[streaming] class 
BlockManagerBasedBlockHandler( throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } @@ -184,9 +184,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { - val putResult = + val putSucceeded = blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } From ebf86815aeb4d584606343d3fa7431c5a708183c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 17 Jan 2016 23:05:42 -0800 Subject: [PATCH 2/7] Update block metrics when evicting from MemoryStore. --- .../apache/spark/storage/BlockManager.scala | 21 +++++++------------ .../apache/spark/storage/MemoryStore.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 8 +++---- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aec13198a7d96..70a798a968eba 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -980,23 +980,15 @@ private[spark] class BlockManager( putIterator(blockId, Iterator(value), level, tellMaster) } - def dropFromMemory( - blockId: BlockId, - data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { - dropFromMemory(blockId, () => data) - } - /** * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * store reaches its limit and needs to free up space. * * If `data` is not put on disk, it won't be created. - * - * Return the block status if the given block has been updated, else None. */ def dropFromMemory( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { + data: () => Either[Array[Any], ByteBuffer]): Unit = { logInfo(s"Dropping block $blockId from memory") val info = blockInfo.get(blockId) @@ -1009,10 +1001,10 @@ private[spark] class BlockManager( if (!info.waitForReady()) { // If we get here, the block write failed. logWarning(s"Block $blockId was marked as failure. 
Nothing to drop") - return None + return } else if (blockInfo.asScala.get(blockId).isEmpty) { logWarning(s"Block $blockId was already dropped.") - return None + return } var blockIsUpdated = false val level = info.level @@ -1048,11 +1040,14 @@ private[spark] class BlockManager( blockInfo.remove(blockId) } if (blockIsUpdated) { - return Some(status) + Option(TaskContext.get()).foreach { taskContext => + val metrics = taskContext.taskMetrics() + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status))) + } } } } - None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 8180d38a89610..76aaa782b9524 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -434,7 +434,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } - blockManager.dropFromMemory(blockId, data) + blockManager.dropFromMemory(blockId, () => data) } } freedMemory diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 93de77f620ea5..e54e4b217748a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -184,8 +184,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master - store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer]) - store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") assert(master.getLocations("a1").size === 0, "master did not remove a1") @@ -425,8 +425,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE t2.join() t3.join() - store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer]) - store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) store.waitForAsyncReregister() } } From bdd558005bd8500f1e07fbafeda19c2599761d31 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 17 Jan 2016 23:11:21 -0800 Subject: [PATCH 3/7] Add newly-cached block to updatedBlocks. 
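This patch and the previous one route block-status updates through the active task's metrics instead of returning them to the caller. The shared pattern, condensed into a sketch (the helper name is illustrative):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.storage.{BlockId, BlockStatus}

// TaskContext.get() is null outside of a running task (e.g. on the driver),
// so wrapping it in Option makes the metrics update a no-op there.
def recordUpdatedBlock(blockId: BlockId, status: BlockStatus): Unit = {
  Option(TaskContext.get()).foreach { taskContext =>
    val metrics = taskContext.taskMetrics()
    val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
  }
}
```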
From b818433ce11411861e211a5519be3d0361ad5947 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 17 Jan 2016 23:17:41 -0800
Subject: [PATCH 4/7] Fix return semantics of doPut().

---
 .../main/scala/org/apache/spark/storage/BlockManager.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 131ebaa2f1af6..e0a8e88df224a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -750,11 +750,12 @@ private[spark] class BlockManager(
       case _ => null
     }
 
+    var marked = false
+
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
-      var marked = false
       try {
         // returnValues - Whether to return the values put
         // blockStore - The type of storage to put these values into
@@ -853,7 +854,7 @@
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
-    true
+    marked
   }
 
   /**
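Returning `marked` instead of a hard-coded `true` means doPut, and therefore putBytes and putIterator, now reports whether the block was actually stored. Callers can fail fast on a false return, as the ReceivedBlockHandler change in PATCH 1/7 does. A sketch of that calling pattern, assuming the Boolean return type introduced by this series (storeOrFail itself is hypothetical):

import java.nio.ByteBuffer

import org.apache.spark.SparkException
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}

// Fail fast when a put does not succeed, instead of silently continuing.
def storeOrFail(
    blockManager: BlockManager,
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel): Unit = {
  val putSucceeded = blockManager.putBytes(blockId, bytes, level, tellMaster = true)
  if (!putSucceeded) {
    throw new SparkException(s"Could not store $blockId with storage level $level")
  }
}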
From cb98168a152c9e7b72cfd165c25072a7330073eb Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 17 Jan 2016 23:20:56 -0800
Subject: [PATCH 5/7] Fix MemoryManagerSuite tests.

---
 .../org/apache/spark/memory/MemoryManagerSuite.scala | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index e4cbedf034f93..d9764c7c10983 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -100,16 +100,8 @@
       if (numBytesToFree <= mm.storageMemoryUsed) {
         // We can evict enough blocks to fulfill the request for space
         mm.releaseStorageMemory(numBytesToFree)
-        args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
+        evictedBlocks.append(
           (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
-        // We need to add this call so that that the suite-level `evictedBlocks` is updated when
-        // execution evicts storage; in that case, args.last will not be equal to evictedBlocks
-        // because it will be a temporary buffer created inside of the MemoryManager rather than
-        // being passed in by the test code.
-        if (!(evictedBlocks eq args.last)) {
-          evictedBlocks.append(
-            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
-        }
         numBytesToFree
       } else {
         // No blocks were evicted because eviction would not free enough space.

From b745cab0119acc6b277baf6074dae03da30eae7d Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 17 Jan 2016 23:24:56 -0800
Subject: [PATCH 6/7] Fix CacheManagerSuite test.

---
 .../test/scala/org/apache/spark/CacheManagerSuite.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 30aa94c8a5971..3865c201bf893 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -85,7 +85,12 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
   test("verify task metrics updated correctly") {
     cacheManager = sc.env.cacheManager
     val context = TaskContext.empty()
-    cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
-    assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+    try {
+      TaskContext.setTaskContext(context)
+      cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
+      assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+    } finally {
+      TaskContext.unset()
+    }
   }
 }
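The fix installs a TaskContext around the test body so that metrics written via TaskContext.get() are visible to the assertions; PATCH 7/7 below repeats the same try/finally setup inside its getUpdatedBlocks helper. A hypothetical reusable test utility capturing that pattern (only withTaskContext is new; the TaskContext calls are the ones used in the diffs):

import org.apache.spark.TaskContext

// Install an empty TaskContext for the duration of `body`, then tear it down,
// matching the try/finally structure used in these test fixes.
def withTaskContext[T](body: => T): T = {
  val context = TaskContext.empty()
  try {
    TaskContext.setTaskContext(context)
    body
  } finally {
    TaskContext.unset()
  }
}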
From bd1c436fd163c3d4b4c2eda6bb5dfe061f3f469c Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 17 Jan 2016 23:29:53 -0800
Subject: [PATCH 7/7] Uncomment BlockManagerSuite test and get it to compile
 again.

---
 .../spark/storage/BlockManagerSuite.scala | 146 ++++++++++--------
 1 file changed, 81 insertions(+), 65 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e54e4b217748a..6e6cf6385f919 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -842,71 +842,87 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(Arrays.equals(notMappedAsArray, bytes))
   }
 
-//  test("updated block statuses") {
-//    store = makeBlockManager(12000)
-//    val list = List.fill(2)(new Array[Byte](2000))
-//    val bigList = List.fill(8)(new Array[Byte](2000))
-//
-//    // 1 updated block (i.e. list1)
-//    val updatedBlocks1 =
-//      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-//    assert(updatedBlocks1.size === 1)
-//    assert(updatedBlocks1.head._1 === TestBlockId("list1"))
-//    assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
-//
-//    // 1 updated block (i.e. list2)
-//    val updatedBlocks2 =
-//      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-//    assert(updatedBlocks2.size === 1)
-//    assert(updatedBlocks2.head._1 === TestBlockId("list2"))
-//    assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
-//
-//    // 2 updated blocks - list1 is kicked out of memory while list3 is added
-//    val updatedBlocks3 =
-//      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-//    assert(updatedBlocks3.size === 2)
-//    updatedBlocks3.foreach { case (id, status) =>
-//      id match {
-//        case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
-//        case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
-//        case _ => fail("Updated block is neither list1 nor list3")
-//      }
-//    }
-//    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
-//
-//    // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
-//    val updatedBlocks4 =
-//      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-//    assert(updatedBlocks4.size === 2)
-//    updatedBlocks4.foreach { case (id, status) =>
-//      id match {
-//        case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
-//        case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
-//        case _ => fail("Updated block is neither list2 nor list4")
-//      }
-//    }
-//    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
-//    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
-//
-//    // No updated blocks - list5 is too big to fit in store and nothing is kicked out
-//    val updatedBlocks5 =
-//      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-//    assert(updatedBlocks5.size === 0)
-//
-//    // memory store contains only list3 and list4
-//    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
-//    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
-//    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
-//    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
-//    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
-//
-//    // disk store contains only list2
-//    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
-//    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
-//    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
-//    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
-//    assert(!store.diskStore.contains("list5"), "list5 was in disk store")
-//  }
+  test("updated block statuses") {
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
+    val bigList = List.fill(8)(new Array[Byte](2000))
+
+    def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
+      val context = TaskContext.empty()
+      try {
+        TaskContext.setTaskContext(context)
+        task
+      } finally {
+        TaskContext.unset()
+      }
+      context.taskMetrics.updatedBlocks.getOrElse(Seq.empty)
+    }
+
+    // 1 updated block (i.e. list1)
+    val updatedBlocks1 = getUpdatedBlocks {
+      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
+    assert(updatedBlocks1.size === 1)
+    assert(updatedBlocks1.head._1 === TestBlockId("list1"))
+    assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
+
+    // 1 updated block (i.e. list2)
+    val updatedBlocks2 = getUpdatedBlocks {
+      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    }
+    assert(updatedBlocks2.size === 1)
+    assert(updatedBlocks2.head._1 === TestBlockId("list2"))
+    assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
+
+    // 2 updated blocks - list1 is kicked out of memory while list3 is added
+    val updatedBlocks3 = getUpdatedBlocks {
+      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
+    assert(updatedBlocks3.size === 2)
+    updatedBlocks3.foreach { case (id, status) =>
+      id match {
+        case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
+        case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
+        case _ => fail("Updated block is neither list1 nor list3")
+      }
+    }
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
+
+    // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
+    val updatedBlocks4 = getUpdatedBlocks {
+      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
+    assert(updatedBlocks4.size === 2)
+    updatedBlocks4.foreach { case (id, status) =>
+      id match {
+        case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
+        case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
+        case _ => fail("Updated block is neither list2 nor list4")
+      }
+    }
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
+
+    // No updated blocks - list5 is too big to fit in store and nothing is kicked out
+    val updatedBlocks5 = getUpdatedBlocks {
+      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
+    assert(updatedBlocks5.size === 0)
+
+    // memory store contains only list3 and list4
+    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
+    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
+    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
+
+    // disk store contains only list2
+    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
+    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
+    assert(!store.diskStore.contains("list5"), "list5 was in disk store")
+  }
 
   test("query block statuses") {
     store = makeBlockManager(12000)