From 302eaa52fc98c79661cb2b17d9e95392a98b18f0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 13 Jan 2016 17:02:29 -0800 Subject: [PATCH 1/4] Remove putArray() and duplicated unroll calls. --- .../scala/org/apache/spark/CacheManager.scala | 75 +++---------------- .../apache/spark/storage/BlockManager.scala | 24 +----- .../org/apache/spark/storage/BlockStore.scala | 8 -- .../org/apache/spark/storage/DiskStore.scala | 9 --- .../spark/storage/ExternalBlockStore.scala | 15 ---- .../apache/spark/storage/MemoryStore.scala | 4 +- 6 files changed, 16 insertions(+), 119 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 4d20c7369376..f4e504dc6db6 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -75,7 +75,17 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) + val cachedValues = { + updatedBlocks ++= + blockManager.putIterator(key, computedValues, storageLevel, tellMaster = true) + blockManager.get(key) match { + case Some(v) => v.data.asInstanceOf[Iterator[T]] + case None => + val msg = s"Block manager failed to return cached value for $key!" + logInfo(msg) + throw new BlockException(key, msg) + } + } val metrics = context.taskMetrics val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) @@ -126,67 +136,4 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } } - - /** - * Cache the values of a partition, keeping track of any updates in the storage statuses of - * other blocks along the way. - * - * The effective storage level refers to the level that actually specifies BlockManager put - * behavior, not the level originally specified by the user. This is mainly for forcing a - * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition, - * while preserving the the original semantics of the RDD as specified by the application. - */ - private def putInBlockManager[T]( - key: BlockId, - values: Iterator[T], - level: StorageLevel, - updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)], - effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = { - - val putLevel = effectiveStorageLevel.getOrElse(level) - if (!putLevel.useMemory) { - /* - * This RDD is not to be cached in memory, so we can just pass the computed values as an - * iterator directly to the BlockManager rather than first fully unrolling it in memory. - */ - updatedBlocks ++= - blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel) - blockManager.get(key) match { - case Some(v) => v.data.asInstanceOf[Iterator[T]] - case None => - logInfo(s"Failure to store $key") - throw new BlockException(key, s"Block manager failed to return cached value for $key!") - } - } else { - /* - * This RDD is to be cached in memory. In this case we cannot pass the computed values - * to the BlockManager as an iterator and expect to read it back later. This is because - * we may end up dropping a partition from memory store before getting it back. 
- * - * In addition, we must be careful to not unroll the entire partition in memory at once. - * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this - * single partition. Instead, we unroll the values cautiously, potentially aborting and - * dropping the partition to disk if applicable. - */ - blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match { - case Left(arr) => - // We have successfully unrolled the entire partition, so cache it in memory - updatedBlocks ++= - blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel) - arr.iterator.asInstanceOf[Iterator[T]] - case Right(it) => - // There is not enough space to cache this partition in memory - val returnValues = it.asInstanceOf[Iterator[T]] - if (putLevel.useDisk) { - logWarning(s"Persisting partition $key to disk instead.") - val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false, - useOffHeap = false, deserialized = false, putLevel.replication) - putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel)) - } else { - returnValues - } - } - } - } - } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4479e6875a73..58d67e36967b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -47,7 +47,6 @@ import org.apache.spark.util._ private[spark] sealed trait BlockValues private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues -private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( @@ -646,10 +645,9 @@ private[spark] class BlockManager( blockId: BlockId, values: Iterator[Any], level: StorageLevel, - tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { require(values != null, "Values is null") - doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel) + doPut(blockId, IteratorValues(values), level, tellMaster) } /** @@ -669,20 +667,6 @@ private[spark] class BlockManager( syncWrites, writeMetrics, blockId) } - /** - * Put a new block of values to the block manager. - * Return a list of blocks updated as a result of this put. - */ - def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { - require(values != null, "Values is null") - doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel) - } - /** * Put a new block of serialized bytes to the block manager. * Return a list of blocks updated as a result of this put. 
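
The new getOrCompute body above reduces to a put-then-read-back round trip: hand the computed values to the block manager as an iterator, let the store decide how to materialize them, and re-fetch the block, failing loudly if nothing readable was left behind. A minimal, self-contained sketch of that shape, using hypothetical names rather than Spark's real BlockManager API:

```scala
import scala.collection.mutable

object PutThenGetSketch {
  // Toy stand-in for a block store: blocks are kept fully materialized.
  private val store = mutable.Map.empty[String, Vector[Any]]

  def putIterator(key: String, values: Iterator[Any]): Unit =
    store(key) = values.toVector   // the store, not the caller, consumes the iterator

  def get[T](key: String): Option[Iterator[T]] =
    store.get(key).map(_.iterator.asInstanceOf[Iterator[T]])

  // Mirrors the new CacheManager flow: put, then read back, or fail with a clear error.
  def cacheAndRead[T](key: String, computed: Iterator[T]): Iterator[T] = {
    putIterator(key, computed)
    get[T](key).getOrElse(
      throw new IllegalStateException(s"Block store failed to return cached value for $key"))
  }

  def main(args: Array[String]): Unit =
    println(cacheAndRead("rdd_0_0", Iterator(1, 2, 3)).toList)   // List(1, 2, 3)
}
```

The unroll-or-spill decision is no longer made at this call site; as the MemoryStore.putIterator changes later in the series show, the store itself drops to disk when a block cannot be unrolled and the storage level allows it.
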
@@ -803,8 +787,6 @@ private[spark] class BlockManager( val result = data match { case IteratorValues(iterator) => blockStore.putIterator(blockId, iterator, putLevel, returnValues) - case ArrayValues(array) => - blockStore.putArray(blockId, array, putLevel, returnValues) case ByteBufferValues(bytes) => bytes.rewind() blockStore.putBytes(blockId, bytes, putLevel) @@ -1052,7 +1034,7 @@ private[spark] class BlockManager( logInfo(s"Writing block $blockId to disk") data() match { case Left(elements) => - diskStore.putArray(blockId, elements, level, returnValues = false) + diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index 69985c9759e2..131c1df7bc36 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -19,8 +19,6 @@ package org.apache.spark.storage import java.nio.ByteBuffer -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.Logging /** @@ -43,12 +41,6 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends level: StorageLevel, returnValues: Boolean): PutResult - def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult - /** * Return the size of a block in bytes. */ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 1f3f193f2ffa..db12a4a1b999 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode import org.apache.spark.Logging -import org.apache.spark.serializer.Serializer import org.apache.spark.util.Utils /** @@ -58,14 +57,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc PutResult(bytes.limit(), Right(bytes.duplicate())) } - override def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIterator(blockId, values.toIterator, level, returnValues) - } - override def putIterator( blockId: BlockId, values: Iterator[Any], diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala index 94883a54a74e..6ba0c62a2a40 100644 --- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala @@ -51,26 +51,11 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: putIntoExternalBlockStore(blockId, bytes, returnValues = true) } - override def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIntoExternalBlockStore(blockId, values.toIterator, returnValues) - } - override def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - putIntoExternalBlockStore(blockId, values, returnValues) - } - - private def putIntoExternalBlockStore( - blockId: BlockId, - values: Iterator[_], - returnValues: Boolean): PutResult = { logTrace(s"Attempting to put block $blockId into 
ExternalBlockStore") // we should never hit here if externalBlockManager is None. Handle it anyway for safety. try { diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index bdab8c2332fa..c5b1dea68167 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -122,7 +122,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo PutResult(size, data, droppedBlocks) } - override def putArray( + private def putArray( blockId: BlockId, values: Array[Any], level: StorageLevel, @@ -246,7 +246,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). */ - def unrollSafely( + private[storage] def unrollSafely( blockId: BlockId, values: Iterator[Any], droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) From fdad4126cd0691bb70f224b7a55dc93e471dc0b9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 13 Jan 2016 17:07:02 -0800 Subject: [PATCH 2/4] Inline MemoryStore.putArray() at callsite. --- .../apache/spark/storage/MemoryStore.scala | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index c5b1dea68167..7f07924d0a3c 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -122,23 +122,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo PutResult(size, data, droppedBlocks) } - private def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - if (level.deserialized) { - val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks) - PutResult(sizeEstimate, Left(values.iterator), droppedBlocks) - } else { - val bytes = blockManager.dataSerialize(blockId, values.iterator) - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) - PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) - } - } - override def putIterator( blockId: BlockId, values: Iterator[Any], @@ -170,7 +153,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo unrolledValues match { case Left(arrayValues) => // Values are fully unrolled in memory, so store them as an array - val res = putArray(blockId, arrayValues, level, returnValues) + val res = { + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + if (level.deserialized) { + val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) + tryToPut(blockId, arrayValues, sizeEstimate, deserialized = true, droppedBlocks) + PutResult(sizeEstimate, Left(arrayValues.iterator), droppedBlocks) + } else { + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) + tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) + PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) + } + } droppedBlocks ++= res.droppedBlocks PutResult(res.size, res.data, 
droppedBlocks) case Right(iteratorValues) => From 5accde7d27cba46bd274b92db62981398b02608c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 13 Jan 2016 17:08:58 -0800 Subject: [PATCH 3/4] Cleanup post-inlining. --- .../apache/spark/storage/MemoryStore.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7f07924d0a3c..7418120090d3 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -153,20 +153,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo unrolledValues match { case Left(arrayValues) => // Values are fully unrolled in memory, so store them as an array - val res = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - if (level.deserialized) { - val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - tryToPut(blockId, arrayValues, sizeEstimate, deserialized = true, droppedBlocks) - PutResult(sizeEstimate, Left(arrayValues.iterator), droppedBlocks) - } else { - val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) - PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) - } + if (level.deserialized) { + val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) + tryToPut(blockId, arrayValues, sizeEstimate, deserialized = true, droppedBlocks) + PutResult(sizeEstimate, Left(arrayValues.iterator), droppedBlocks) + } else { + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) + tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) + PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) } - droppedBlocks ++= res.droppedBlocks - PutResult(res.size, res.data, droppedBlocks) case Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { From 80d375a8962cb79a3a6da1f176f82c709e6a493c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 13 Jan 2016 17:10:31 -0800 Subject: [PATCH 4/4] Remove one of the tryToPut() overloads. 
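
The final patch below removes the tryToPut() overload that accepted a strict value, so call sites now wrap the value in a () => Any thunk themselves (for example () => bytes). A small self-contained sketch of that thunk-taking shape, with hypothetical names: when the caller already holds the value, the wrapper is just a cheap adapter, while an expensive value only has to be built on the success path.

```scala
object ThunkPutSketch {
  private var freeBytes = 100L

  // Simplified stand-in for a thunk-taking tryToPut: in this sketch the value
  // is only forced after the space check succeeds.
  def tryToPut(blockId: String, value: () => Any, size: Long): Boolean =
    if (size <= freeBytes) {
      freeBytes -= size
      println(s"stored $blockId -> ${value()} ($size bytes)")
      true
    } else {
      println(s"no room for $blockId ($size bytes requested, $freeBytes free)")
      false
    }

  def main(args: Array[String]): Unit = {
    val payload = Vector(1, 2, 3)
    tryToPut("rdd_0_0", () => payload, size = 60)                    // value already in hand: cheap wrap
    tryToPut("rdd_0_1", () => sys.error("never built"), size = 60)   // rejected: thunk is never forced
  }
}
```
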
--- .../org/apache/spark/storage/MemoryStore.scala | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7418120090d3..114b387e6655 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -96,7 +96,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo putIterator(blockId, values, level, returnValues = true) } else { val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) + tryToPut(blockId, () => bytes, bytes.limit, deserialized = false, droppedBlocks) PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) } } @@ -155,11 +155,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Values are fully unrolled in memory, so store them as an array if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - tryToPut(blockId, arrayValues, sizeEstimate, deserialized = true, droppedBlocks) + tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true, droppedBlocks) PutResult(sizeEstimate, Left(arrayValues.iterator), droppedBlocks) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) + tryToPut(blockId, () => bytes, bytes.limit, deserialized = false, droppedBlocks) PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) } case Right(iteratorValues) => @@ -322,15 +322,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId.asRDDId.map(_.rddId) } - private def tryToPut( - blockId: BlockId, - value: Any, - size: Long, - deserialized: Boolean, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - tryToPut(blockId, () => value, size, deserialized, droppedBlocks) - } - /** * Try to put in a set of values, if we can free up enough space. The value should either be * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
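
To illustrate the Array-or-ByteBuffer contract referred to in the comment above, here is a self-contained sketch of the branch that patches 2 and 3 inline into putIterator: a fully unrolled partition is kept either as the deserialized array, whose size is only estimated, or as serialized bytes, whose size is exact. The serializer and size estimator below are hypothetical stand-ins so the example runs on its own.

```scala
import java.nio.ByteBuffer

object UnrolledPutSketch {
  final case class PutResult(size: Long, data: Either[Iterator[Any], ByteBuffer])

  // Hypothetical stand-ins for dataSerialize and SizeEstimator.estimate.
  private def serialize(values: Iterator[Any]): ByteBuffer =
    ByteBuffer.wrap(values.mkString(",").getBytes("UTF-8"))
  private def estimateSize(values: Array[Any]): Long = 16L + 8L * values.length

  def putUnrolled(arrayValues: Array[Any], deserialized: Boolean): PutResult =
    if (deserialized) {
      val sizeEstimate = estimateSize(arrayValues)          // estimated object size
      PutResult(sizeEstimate, Left(arrayValues.iterator))   // keep the objects as-is
    } else {
      val bytes = serialize(arrayValues.iterator)           // exact serialized size
      PutResult(bytes.limit().toLong, Right(bytes.duplicate()))
    }

  def main(args: Array[String]): Unit = {
    println(putUnrolled(Array[Any](1, 2, 3), deserialized = true).size)    // 40
    println(putUnrolled(Array[Any](1, 2, 3), deserialized = false).size)   // 5
  }
}
```
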