From 6339db4a830580817e5020c2830a278b297e856e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 13:37:25 -0800 Subject: [PATCH 01/15] Refactor synchronous block replication code to call doGetLocal(): The goal behind this change is to rely on doGetLocal() for returning bytes rather than having to perform the serialization in the replication code. --- .../apache/spark/storage/BlockManager.scala | 232 ++++++++---------- 1 file changed, 107 insertions(+), 125 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b59191b2917c9..2da352c43c904 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -432,98 +432,105 @@ private[spark] class BlockManager( logDebug(s"Block $blockId was not found") None case Some(info) => - val level = info.level - logDebug(s"Level for block $blockId is $level") - - // Look for the block in memory - if (level.useMemory) { - logDebug(s"Getting block $blockId from memory") - val result = if (asBlockResult) { - memoryStore.getValues(blockId).map { iter => - val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) - new BlockResult(ci, DataReadMethod.Memory, info.size) - } - } else { - memoryStore.getBytes(blockId) - } - result match { - case Some(values) => - return result - case None => - logDebug(s"Block $blockId not found in memory") - } + doGetLocal(blockId, info, asBlockResult) + } + } + + private def doGetLocal( + blockId: BlockId, + info: BlockInfo, + asBlockResult: Boolean): Option[Any] = { + val level = info.level + logDebug(s"Level for block $blockId is $level") + + // Look for the block in memory + if (level.useMemory) { + logDebug(s"Getting block $blockId from memory") + val result = if (asBlockResult) { + memoryStore.getValues(blockId).map { iter => + val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) + new BlockResult(ci, DataReadMethod.Memory, info.size) } + } else { + memoryStore.getBytes(blockId) + } + result match { + case Some(values) => + return result + case None => + logDebug(s"Block $blockId not found in memory") + } + } - // Look for block on disk, potentially storing it back in memory if required - if (level.useDisk) { - logDebug(s"Getting block $blockId from disk") - val bytes: ByteBuffer = diskStore.getBytes(blockId) match { - case Some(b) => b - case None => - releaseLock(blockId) - throw new BlockException( - blockId, s"Block $blockId not found on disk, though it should be") - } - assert(0 == bytes.position()) - - if (!level.useMemory) { - // If the block shouldn't be stored in memory, we can just return it - if (asBlockResult) { - val iter = CompletionIterator[Any, Iterator[Any]]( - dataDeserialize(blockId, bytes), releaseLock(blockId)) - return Some(new BlockResult(iter, DataReadMethod.Disk, info.size)) - } else { - return Some(bytes) - } - } else { - // Otherwise, we also have to store something in the memory store - if (!level.deserialized || !asBlockResult) { - /* We'll store the bytes in memory if the block's storage level includes - * "memory serialized", or if it should be cached as objects in memory - * but we only requested its serialized bytes. */ - memoryStore.putBytes(blockId, bytes.limit, () => { - // https://issues.apache.org/jira/browse/SPARK-6076 - // If the file size is bigger than the free memory, OOM will happen. 
So if we cannot - // put it into MemoryStore, copyForMemory should not be created. That's why this - // action is put into a `() => ByteBuffer` and created lazily. - val copyForMemory = ByteBuffer.allocate(bytes.limit) - copyForMemory.put(bytes) - }) - bytes.rewind() - } - if (!asBlockResult) { - return Some(bytes) - } else { - val values = dataDeserialize(blockId, bytes) - if (level.deserialized) { - // Cache the values before returning them - val putResult = memoryStore.putIterator( - blockId, values, level, returnValues = true, allowPersistToDisk = false) - // The put may or may not have succeeded, depending on whether there was enough - // space to unroll the block. Either way, the put here should return an iterator. - putResult.data match { - case Left(it) => - val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId)) - return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) - case _ => - // This only happens if we dropped the values back to disk (which is never) - throw new SparkException("Memory store did not return an iterator!") - } - } else { - val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId)) + // Look for block on disk, potentially storing it back in memory if required + if (level.useDisk) { + logDebug(s"Getting block $blockId from disk") + val bytes: ByteBuffer = diskStore.getBytes(blockId) match { + case Some(b) => b + case None => + releaseLock(blockId) + throw new BlockException( + blockId, s"Block $blockId not found on disk, though it should be") + } + assert(0 == bytes.position()) + + if (!level.useMemory) { + // If the block shouldn't be stored in memory, we can just return it + if (asBlockResult) { + val iter = CompletionIterator[Any, Iterator[Any]]( + dataDeserialize(blockId, bytes), releaseLock(blockId)) + return Some(new BlockResult(iter, DataReadMethod.Disk, info.size)) + } else { + return Some(bytes) + } + } else { + // Otherwise, we also have to store something in the memory store + if (!level.deserialized || !asBlockResult) { + /* We'll store the bytes in memory if the block's storage level includes + * "memory serialized", or if it should be cached as objects in memory + * but we only requested its serialized bytes. */ + memoryStore.putBytes(blockId, bytes.limit, () => { + // https://issues.apache.org/jira/browse/SPARK-6076 + // If the file size is bigger than the free memory, OOM will happen. So if we cannot + // put it into MemoryStore, copyForMemory should not be created. That's why this + // action is put into a `() => ByteBuffer` and created lazily. + val copyForMemory = ByteBuffer.allocate(bytes.limit) + copyForMemory.put(bytes) + }) + bytes.rewind() + } + if (!asBlockResult) { + return Some(bytes) + } else { + val values = dataDeserialize(blockId, bytes) + if (level.deserialized) { + // Cache the values before returning them + val putResult = memoryStore.putIterator( + blockId, values, level, returnValues = true, allowPersistToDisk = false) + // The put may or may not have succeeded, depending on whether there was enough + // space to unroll the block. Either way, the put here should return an iterator. 
+ putResult.data match { + case Left(it) => + val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId)) return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) - } + case _ => + // This only happens if we dropped the values back to disk (which is never) + throw new SparkException("Memory store did not return an iterator!") } + } else { + val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId)) + return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } - } else { - // This branch represents a case where the BlockInfoManager contained an entry for - // the block but the block could not be found in any of the block stores. This case - // should never occur, but for completeness's sake we address it here. - logError( - s"Block $blockId is supposedly stored locally but was not found in any block store") - releaseLock(blockId) - None } + } + } else { + // This branch represents a case where the BlockInfoManager contained an entry for + // the block but the block could not be found in any of the block stores. This case + // should never occur, but for completeness's sake we address it here. + logError( + s"Block $blockId is supposedly stored locally but was not found in any block store") + releaseLock(blockId) + None } } @@ -771,15 +778,6 @@ private[spark] class BlockManager( val startTimeMs = System.currentTimeMillis - /* If we're storing values and we need to replicate the data, we'll want access to the values, - * but because our put will read the whole iterator, there will be no values left. For the - * case where the put serializes data, we'll remember the bytes, above; but for the case where - * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */ - var valuesAfterPut: Iterator[Any] = null - - // Ditto for the bytes after the put - var bytesAfterPut: ByteBuffer = null - // Size of the block in bytes var size = 0L @@ -834,11 +832,6 @@ private[spark] class BlockManager( blockStore.putBytes(blockId, bytes, putLevel) } size = result.size - result.data match { - case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator - case Right (newBytes) => bytesAfterPut = newBytes - case _ => - } val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid @@ -868,34 +861,23 @@ private[spark] class BlockManager( } logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) - // Either we're storing bytes and we asynchronously started replication, or we're storing - // values and need to serialize and replicate them now: - if (putLevel.replication > 1) { - data match { - case ByteBufferValues(bytes) => - if (replicationFuture != null) { - Await.ready(replicationFuture, Duration.Inf) - } - case _ => - if (blockWasSuccessfullyStored) { - val remoteStartTime = System.currentTimeMillis - // Serialize the block if not already done - if (bytesAfterPut == null) { - if (valuesAfterPut == null) { - throw new SparkException( - "Underlying put returned neither an Iterator nor bytes! 
This shouldn't happen.") - } - bytesAfterPut = dataSerialize(blockId, valuesAfterPut) - } - replicate(blockId, bytesAfterPut, putLevel) - logDebug("Put block %s remotely took %s" - .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) - } + if (replicationFuture != null) { + // Wait for asynchronous replication to finish + Await.ready(replicationFuture, Duration.Inf) + } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) { + val remoteStartTime = System.currentTimeMillis + val bytesToReplicate = doGetLocal(blockId, putBlockInfo, asBlockResult = false).getOrElse { + throw new SparkException(s"Block $blockId was not found even though it was just stored") + }.asInstanceOf[ByteBuffer] + try { + replicate(blockId, bytesToReplicate, putLevel) + } finally { + BlockManager.dispose(bytesToReplicate) } + logDebug("Put block %s remotely took %s" + .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) } - BlockManager.dispose(bytesAfterPut) - if (putLevel.replication > 1) { logDebug("Putting block %s with replication took %s" .format(blockId, Utils.getUsedTimeMs(startTimeMs))) From 4d940fb23fae42d79028d13563d8ed923c34599d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 13:41:00 -0800 Subject: [PATCH 02/15] Don't request returnValues when putting values in doPut(). --- .../scala/org/apache/spark/storage/BlockManager.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2da352c43c904..e788a66ad19a1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -806,16 +806,13 @@ private[spark] class BlockManager( .format(blockId, Utils.getUsedTimeMs(startTimeMs))) try { - // returnValues - Whether to return the values put - // blockStore - The type of storage to put these values into - val (returnValues, blockStore: BlockStore) = { + val blockStore: BlockStore = { if (putLevel.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. 
- (true, memoryStore) + memoryStore } else if (putLevel.useDisk) { - // Don't get back the bytes from put unless we replicate them - (putLevel.replication > 1, diskStore) + diskStore } else { assert(putLevel == StorageLevel.NONE) throw new BlockException( @@ -826,7 +823,7 @@ private[spark] class BlockManager( // Actually put the values result = data match { case IteratorValues(iterator) => - blockStore.putIterator(blockId, iterator(), putLevel, returnValues) + blockStore.putIterator(blockId, iterator(), putLevel, returnValues = false) case ByteBufferValues(bytes) => bytes.rewind() blockStore.putBytes(blockId, bytes, putLevel) From 2587546579983f7511816eeea416c63a0d72eaac Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 13:57:03 -0800 Subject: [PATCH 03/15] Remove unused PutResult.droppedBlocks field --- core/src/main/scala/org/apache/spark/storage/PutResult.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala index f0eac7594ecf6..e09689068eea4 100644 --- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala +++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala @@ -24,9 +24,7 @@ import java.nio.ByteBuffer * (1) The estimated size of the put, * (2) The values put if the caller asked for them to be returned (e.g. for chaining * replication), and - * (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore. */ private[spark] case class PutResult( size: Long, - data: Either[Iterator[_], ByteBuffer], - droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) + data: Either[Iterator[_], ByteBuffer]) From 34ba61cd777190f755be9dff803873abfbba41fa Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 14:11:44 -0800 Subject: [PATCH 04/15] Clarify that putBytes() never has returnValues == true --- .../apache/spark/storage/BlockManager.scala | 14 ++++----- .../org/apache/spark/storage/DiskStore.scala | 12 ++------ .../apache/spark/storage/MemoryStore.scala | 30 +++++++++---------- .../org/apache/spark/storage/PutResult.scala | 10 ++----- .../spark/storage/BlockManagerSuite.scala | 11 ++++--- 5 files changed, 31 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e788a66ad19a1..e13760e75d111 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -509,13 +509,11 @@ private[spark] class BlockManager( blockId, values, level, returnValues = true, allowPersistToDisk = false) // The put may or may not have succeeded, depending on whether there was enough // space to unroll the block. Either way, the put here should return an iterator. 
- putResult.data match { - case Left(it) => - val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId)) - return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) - case _ => - // This only happens if we dropped the values back to disk (which is never) - throw new SparkException("Memory store did not return an iterator!") + if (putResult.data == null) { + throw new SparkException("Memory store did not return an iterator!") + } else { + val ci = CompletionIterator[Any, Iterator[Any]](putResult.data, releaseLock(blockId)) + return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } } else { val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId)) @@ -680,7 +678,7 @@ private[spark] class BlockManager( // The put failed, likely because the data was too large to fit in memory and could not be // dropped to disk. Therefore, we need to pass the input iterator back to the caller so // that they can decide what to do with the values (e.g. process them without caching). - Right(failedPutResult.data.left.get) + Right(failedPutResult.data) } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index db12a4a1b999a..d407a47c4df3a 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -54,7 +54,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime)) - PutResult(bytes.limit(), Right(bytes.duplicate())) + PutResult(bytes.limit(), null) } override def putIterator( @@ -62,7 +62,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - + require(!returnValues, "returnValues should always be false for DiskStore") logDebug(s"Attempting to write values for block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) @@ -90,13 +90,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(length), timeTaken)) - if (returnValues) { - // Return a byte buffer for the contents of the file - val buffer = getBytes(blockId).get - PutResult(length, Right(buffer)) - } else { - PutResult(length, null) - } + PutResult(length, null) } private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 317d73abba4c6..71a42faacd37f 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -93,10 +93,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo bytes.rewind() if (level.deserialized) { val values = blockManager.dataDeserialize(blockId, bytes) - putIterator(blockId, values, level, returnValues = true) + putIterator(blockId, values, level, returnValues = false) } else { tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), Right(bytes.duplicate())) + PutResult(bytes.limit(), null) } } @@ -110,14 +110,10 @@ 
private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Work on a duplicate - since the original input might be used elsewhere. lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false) - val data = - if (putSuccess) { - assert(bytes.limit == size) - Right(bytes.duplicate()) - } else { - null - } - PutResult(size, data) + if (putSuccess) { + assert(bytes.limit == size) + } + PutResult(size, null) } override def putIterator( @@ -144,7 +140,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, values: Iterator[Any], level: StorageLevel, - returnValues: Boolean, + returnValues: Boolean, // TODO(josh): Remove me! allowPersistToDisk: Boolean): PutResult = { val unrolledValues = unrollSafely(blockId, values) unrolledValues match { @@ -154,11 +150,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true) - PutResult(sizeEstimate, Left(arrayValues.iterator)) + PutResult(sizeEstimate, arrayValues.iterator) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), Right(bytes.duplicate())) + PutResult(bytes.limit(), null) } } PutResult(res.size, res.data) @@ -166,10 +162,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") - val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) - PutResult(res.size, res.data) + val putResult = + blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues = false) + val data = blockManager.diskStore.getValues(blockId).get + PutResult(putResult.size, data) } else { - PutResult(0, Left(iteratorValues)) + PutResult(0, iteratorValues) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala index e09689068eea4..d4d72a11480da 100644 --- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala +++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala @@ -17,14 +17,10 @@ package org.apache.spark.storage -import java.nio.ByteBuffer - /** * Result of adding a block into a BlockStore. This case class contains a few things: - * (1) The estimated size of the put, + * (1) The estimated size of the put, and * (2) The values put if the caller asked for them to be returned (e.g. 
for chaining - * replication), and + * replication) */ -private[spark] case class PutResult( - size: Long, - data: Either[Iterator[_], ByteBuffer]) +private[spark] case class PutResult(size: Long, data: Iterator[_]) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 89b427049b548..76926496f3aad 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1162,8 +1162,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.contains("b2")) assert(result1.size > 0) // unroll was successful assert(result2.size > 0) - assert(result1.data.isLeft) // unroll did not drop this block to disk - assert(result2.data.isLeft) + assert(result1.data != null) // unroll did not drop this block to disk + assert(result2.data != null) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Re-put these two blocks so block manager knows about them too. Otherwise, block manager @@ -1176,7 +1176,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed but kick out b1 in the process. val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) assert(result3.size > 0) - assert(result3.data.isLeft) + assert(result3.data != null) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1187,7 +1187,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll huge block with not enough space. This should fail and kick out b2 in the process. val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) assert(result4.size === 0) // unroll was unsuccessful - assert(result4.data.isLeft) + assert(result4.data != null) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1231,7 +1231,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // b3, while disk store should contain b1, b2 and b4. val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true) assert(result4.size > 0) - assert(result4.data.isRight) // unroll returned bytes from disk + assert(result4.data != null) // unroll returned bytes from disk assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1303,7 +1303,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE bytes }) assert(result.size === 10000) - assert(result.data === Right(bytes)) } test("read-locked blocks cannot be evicted from the MemoryStore") { From ca2d23655e5a27dc95d14609a03900de2afecd81 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 14:16:23 -0800 Subject: [PATCH 05/15] Don't call putIterator() with returnValues == true in tests. 
--- .../org/apache/spark/storage/BlockStore.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 30 ++++++++----------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index d3af50d974232..9ff92fecf0258 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -39,7 +39,7 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends blockId: BlockId, values: Iterator[Any], level: StorageLevel, - returnValues: Boolean): PutResult + returnValues: Boolean = false): PutResult /** * Return the size of a block in bytes. diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 76926496f3aad..7c622d7f453b1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1156,14 +1156,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with plenty of space. This should succeed and cache both blocks. - val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) - val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true) + val result1 = memoryStore.putIterator("b1", smallIterator, memOnly) + val result2 = memoryStore.putIterator("b2", smallIterator, memOnly) assert(memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(result1.size > 0) // unroll was successful assert(result2.size > 0) - assert(result1.data != null) // unroll did not drop this block to disk - assert(result2.data != null) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Re-put these two blocks so block manager knows about them too. Otherwise, block manager @@ -1174,9 +1172,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putIterator("b2", smallIterator, memOnly) // Unroll with not enough space. This should succeed but kick out b1 in the process. - val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) + val result3 = memoryStore.putIterator("b3", smallIterator, memOnly) assert(result3.size > 0) - assert(result3.data != null) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1185,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putIterator("b3", smallIterator, memOnly) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. - val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) + val result4 = memoryStore.putIterator("b4", bigIterator, memOnly) assert(result4.size === 0) // unroll was unsuccessful assert(result4.data != null) assert(!memoryStore.contains("b1")) @@ -1214,7 +1211,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed but kick out b1 in the process. 
// Memory store should contain b2 and b3, while disk store should contain only b1 - val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true) + val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk) assert(result3.size > 0) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1229,9 +1226,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll huge block with not enough space. This should fail and drop the new block to disk // directly in addition to kicking out b2 in the process. Memory store should contain only // b3, while disk store should contain b1, b2 and b4. - val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true) + val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk) assert(result4.size > 0) - assert(result4.data != null) // unroll returned bytes from disk assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1252,28 +1248,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // All unroll memory used is released because unrollSafely returned an array - memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b1", smallIterator, memOnly) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b2", smallIterator, memOnly) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll memory is not released because unrollSafely returned an iterator // that still depends on the underlying vector used in the process - memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b3", smallIterator, memOnly) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b4", smallIterator, memOnly) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b5", smallIterator, memOnly) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b6", smallIterator, memOnly) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true) + memoryStore.putIterator("b7", smallIterator, memOnly) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) From 68b5e7ceba2b4dc41cc856f7adda72e66ab128ed Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 14:19:25 -0800 Subject: [PATCH 06/15] Remove returnValues from the BlockStore interface. 
--- .../org/apache/spark/storage/BlockManager.scala | 6 +++--- .../scala/org/apache/spark/storage/BlockStore.scala | 13 +------------ .../scala/org/apache/spark/storage/DiskStore.scala | 4 +--- .../org/apache/spark/storage/MemoryStore.scala | 10 ++++------ 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e13760e75d111..4acbd4616cbea 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -506,7 +506,7 @@ private[spark] class BlockManager( if (level.deserialized) { // Cache the values before returning them val putResult = memoryStore.putIterator( - blockId, values, level, returnValues = true, allowPersistToDisk = false) + blockId, values, level, allowPersistToDisk = false) // The put may or may not have succeeded, depending on whether there was enough // space to unroll the block. Either way, the put here should return an iterator. if (putResult.data == null) { @@ -821,7 +821,7 @@ private[spark] class BlockManager( // Actually put the values result = data match { case IteratorValues(iterator) => - blockStore.putIterator(blockId, iterator(), putLevel, returnValues = false) + blockStore.putIterator(blockId, iterator(), putLevel) case ByteBufferValues(bytes) => bytes.rewind() blockStore.putBytes(blockId, bytes, putLevel) @@ -1041,7 +1041,7 @@ private[spark] class BlockManager( logInfo(s"Writing block $blockId to disk") data() match { case Left(elements) => - diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false) + diskStore.putIterator(blockId, elements.toIterator, level) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index 9ff92fecf0258..2ec8934903c29 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -28,18 +28,7 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult - /** - * Put in a block and, possibly, also return its content as either bytes or another Iterator. - * This is used to efficiently write the values to multiple locations (e.g. for replication). - * - * @return a PutResult that contains the size of the data, as well as the values put if - * returnValues is true (if not, the result's data field can be null) - */ - def putIterator( - blockId: BlockId, - values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean = false): PutResult + def putIterator(blockId: BlockId, values: Iterator[Any], level: StorageLevel): PutResult /** * Return the size of a block in bytes. 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index d407a47c4df3a..c2ac5507e9f42 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -60,9 +60,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc override def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - require(!returnValues, "returnValues should always be false for DiskStore") + level: StorageLevel): PutResult = { logDebug(s"Attempting to write values for block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 71a42faacd37f..b56e606e7e15e 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -93,7 +93,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo bytes.rewind() if (level.deserialized) { val values = blockManager.dataDeserialize(blockId, bytes) - putIterator(blockId, values, level, returnValues = false) + putIterator(blockId, values, level) } else { tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) PutResult(bytes.limit(), null) @@ -119,9 +119,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo override def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIterator(blockId, values, level, returnValues, allowPersistToDisk = true) + level: StorageLevel): PutResult = { + putIterator(blockId, values, level, allowPersistToDisk = true) } /** @@ -140,7 +139,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, values: Iterator[Any], level: StorageLevel, - returnValues: Boolean, // TODO(josh): Remove me! allowPersistToDisk: Boolean): PutResult = { val unrolledValues = unrollSafely(blockId, values) unrolledValues match { @@ -163,7 +161,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") val putResult = - blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues = false) + blockManager.diskStore.putIterator(blockId, iteratorValues, level) val data = blockManager.diskStore.getValues(blockId).get PutResult(putResult.size, data) } else { From ae66ccc8ae55f0171e301ddf03bb15d0b3fcb9a6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 14:46:43 -0800 Subject: [PATCH 07/15] Change putIterator return type to be an Either. 
--- .../apache/spark/storage/BlockManager.scala | 40 ++++++++++--------- .../org/apache/spark/storage/BlockStore.scala | 11 ++++- .../org/apache/spark/storage/DiskStore.scala | 4 +- .../apache/spark/storage/MemoryStore.scala | 20 +++++----- .../spark/storage/BlockManagerSuite.scala | 13 +++--- 5 files changed, 50 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4acbd4616cbea..8ea9a26ddc9a7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -503,22 +503,23 @@ private[spark] class BlockManager( return Some(bytes) } else { val values = dataDeserialize(blockId, bytes) - if (level.deserialized) { - // Cache the values before returning them - val putResult = memoryStore.putIterator( - blockId, values, level, allowPersistToDisk = false) - // The put may or may not have succeeded, depending on whether there was enough - // space to unroll the block. Either way, the put here should return an iterator. - if (putResult.data == null) { - throw new SparkException("Memory store did not return an iterator!") + val valuesToReturn: Iterator[Any] = { + if (level.deserialized) { + // Cache the values before returning them + memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match { + case Left(iter) => + // The memory store put() failed, so it returned the iterator back to us: + iter + case Right(_) => + // The put() succeeded, so we can read the values back: + memoryStore.getValues(blockId).get + } } else { - val ci = CompletionIterator[Any, Iterator[Any]](putResult.data, releaseLock(blockId)) - return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) + values } - } else { - val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId)) - return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } + val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId)) + return Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } } } else { @@ -797,7 +798,6 @@ private[spark] class BlockManager( } var blockWasSuccessfullyStored = false - var result: PutResult = null putBlockInfo.synchronized { logTrace("Put for block %s took %s to get into synchronized block" @@ -819,14 +819,16 @@ private[spark] class BlockManager( } // Actually put the values - result = data match { + size = data match { case IteratorValues(iterator) => - blockStore.putIterator(blockId, iterator(), putLevel) + blockStore.putIterator(blockId, iterator(), putLevel) match { + case Right(s) => s + case Left(_) => 0 + } case ByteBufferValues(bytes) => bytes.rewind() - blockStore.putBytes(blockId, bytes, putLevel) + blockStore.putBytes(blockId, bytes, putLevel).size } - size = result.size val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid @@ -884,7 +886,7 @@ private[spark] class BlockManager( if (blockWasSuccessfullyStored) { None } else { - Some(result) + Some(PutResult(size, null)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index 2ec8934903c29..1a34fb03ed789 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -28,7 +28,16 @@ private[spark] abstract class BlockStore(val 
blockManager: BlockManager) extends def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult - def putIterator(blockId: BlockId, values: Iterator[Any], level: StorageLevel): PutResult + /** + * Attempt to store an iterator of values. + * + * @return an iterator of values (in case the put failed), or the estimated size of the stored + * values if the put succeeded. + */ + def putIterator( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel): Either[Iterator[Any], Long] /** * Return the size of a block in bytes. diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index c2ac5507e9f42..543a00d64d3a7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -60,7 +60,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc override def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel): PutResult = { + level: StorageLevel): Right[Iterator[Any], Long] = { logDebug(s"Attempting to write values for block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) @@ -88,7 +88,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(length), timeTaken)) - PutResult(length, null) + Right(length) } private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index b56e606e7e15e..0e2203e57d324 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -93,7 +93,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo bytes.rewind() if (level.deserialized) { val values = blockManager.dataDeserialize(blockId, bytes) - putIterator(blockId, values, level) + putIterator(blockId, values, level) match { + case Right(size) => + PutResult(size, null) + case Left(_) => + PutResult(0, null) + } } else { tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) PutResult(bytes.limit(), null) @@ -119,7 +124,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo override def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel): PutResult = { + level: StorageLevel): Either[Iterator[Any], Long] = { putIterator(blockId, values, level, allowPersistToDisk = true) } @@ -139,7 +144,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, values: Iterator[Any], level: StorageLevel, - allowPersistToDisk: Boolean): PutResult = { + allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = { val unrolledValues = unrollSafely(blockId, values) unrolledValues match { case Left(arrayValues) => @@ -155,17 +160,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo PutResult(bytes.limit(), null) } } - PutResult(res.size, res.data) + Right(res.size) case Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") - val putResult = - 
blockManager.diskStore.putIterator(blockId, iteratorValues, level) - val data = blockManager.diskStore.getValues(blockId).get - PutResult(putResult.size, data) + blockManager.diskStore.putIterator(blockId, iteratorValues, level) } else { - PutResult(0, iteratorValues) + Left(iteratorValues) } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 7c622d7f453b1..64ea10049f321 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1160,8 +1160,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val result2 = memoryStore.putIterator("b2", smallIterator, memOnly) assert(memoryStore.contains("b1")) assert(memoryStore.contains("b2")) - assert(result1.size > 0) // unroll was successful - assert(result2.size > 0) + assert(result1.isRight) // unroll was successful + assert(result2.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Re-put these two blocks so block manager knows about them too. Otherwise, block manager @@ -1173,7 +1173,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed but kick out b1 in the process. val result3 = memoryStore.putIterator("b3", smallIterator, memOnly) - assert(result3.size > 0) + assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1183,8 +1183,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll huge block with not enough space. This should fail and kick out b2 in the process. val result4 = memoryStore.putIterator("b4", bigIterator, memOnly) - assert(result4.size === 0) // unroll was unsuccessful - assert(result4.data != null) + assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1212,7 +1211,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk) - assert(result3.size > 0) + assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) @@ -1227,7 +1226,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // directly in addition to kicking out b2 in the process. Memory store should contain only // b3, while disk store should contain b1, b2 and b4. val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk) - assert(result4.size > 0) + assert(result4.isRight) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) assert(memoryStore.contains("b3")) From 5c9259459c24038d94a763436d389619ab48d0c3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Mar 2016 14:55:06 -0800 Subject: [PATCH 08/15] Remove last accesses of PutResult.data. 
---
 .../org/apache/spark/storage/BlockManager.scala | 17 ++++++++++-------
 .../spark/storage/BlockManagerSuite.scala | 1 -
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8ea9a26ddc9a7..efbf4bc77aaa1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -679,7 +679,7 @@ private[spark] class BlockManager(
 // The put failed, likely because the data was too large to fit in memory and could not be
 // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
 // that they can decide what to do with the values (e.g. process them without caching).
- Right(failedPutResult.data)
+ Right(failedPutResult.get)
 }
 }

@@ -740,9 +740,9 @@ private[spark] class BlockManager(
 * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
 * block already exists). If false, this method will hold no locks when it
 * returns.
- * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
- * or None if the block already existed or was successfully stored (fully consuming
- * the input data / input iterator).
+ * @return `Some(Option[Iterator[Any]])` if the block did not exist and could not be successfully
+ * cached, or None if the block already existed or was successfully stored (fully
+ * consuming the input data / input iterator).
 */
 private def doPut(
 blockId: BlockId,
@@ -750,7 +750,7 @@ private[spark] class BlockManager(
 level: StorageLevel,
 tellMaster: Boolean = true,
 effectiveStorageLevel: Option[StorageLevel] = None,
- keepReadLock: Boolean = false): Option[PutResult] = {
+ keepReadLock: Boolean = false): Option[Option[Iterator[Any]]] = {

 require(blockId != null, "BlockId is null")
 require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -798,6 +798,7 @@ private[spark] class BlockManager(
 }

 var blockWasSuccessfullyStored = false
+ var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None

 putBlockInfo.synchronized {
 logTrace("Put for block %s took %s to get into synchronized block"
@@ -823,7 +824,9 @@ private[spark] class BlockManager(
 case IteratorValues(iterator) =>
 blockStore.putIterator(blockId, iterator(), putLevel) match {
 case Right(s) => s
- case Left(_) => 0
+ case Left(iter) =>
+ iteratorFromFailedMemoryStorePut = Some(iter)
+ 0
 }
 case ByteBufferValues(bytes) =>
 bytes.rewind()
@@ -886,7 +889,7 @@ private[spark] class BlockManager(
 if (blockWasSuccessfullyStored) {
 None
 } else {
- Some(PutResult(size, null))
+ Some(iteratorFromFailedMemoryStorePut)
 }
 }

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 64ea10049f321..8c8bd3a147d84 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1285,7 +1285,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
 })
 assert(result.size === 13000)
- assert(result.data === null)
 }

 test("put a small ByteBuffer to MemoryStore") {

From 2a2d50d4e2eb84c071c4c667b2ae2b4e391ced63 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 3 Mar 2016 14:57:32 -0800
Subject: [PATCH 09/15] Remove
the PutResult.data field. --- .../scala/org/apache/spark/storage/DiskStore.scala | 2 +- .../scala/org/apache/spark/storage/MemoryStore.scala | 12 ++++++------ .../scala/org/apache/spark/storage/PutResult.scala | 8 +++----- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 543a00d64d3a7..23c33a87130cf 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -54,7 +54,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime)) - PutResult(bytes.limit(), null) + PutResult(bytes.limit()) } override def putIterator( diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 0e2203e57d324..34a6a8c93e7a1 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -95,13 +95,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val values = blockManager.dataDeserialize(blockId, bytes) putIterator(blockId, values, level) match { case Right(size) => - PutResult(size, null) + PutResult(size) case Left(_) => - PutResult(0, null) + PutResult(0) } } else { tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), null) + PutResult(bytes.limit()) } } @@ -118,7 +118,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo if (putSuccess) { assert(bytes.limit == size) } - PutResult(size, null) + PutResult(size) } override def putIterator( @@ -153,11 +153,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true) - PutResult(sizeEstimate, arrayValues.iterator) + PutResult(sizeEstimate) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), null) + PutResult(bytes.limit()) } } Right(res.size) diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala index d4d72a11480da..c0d03355a190b 100644 --- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala +++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala @@ -18,9 +18,7 @@ package org.apache.spark.storage /** - * Result of adding a block into a BlockStore. This case class contains a few things: - * (1) The estimated size of the put, and - * (2) The values put if the caller asked for them to be returned (e.g. for chaining - * replication) + * Result of adding a block into a BlockStore. 
+ * This case class contains the estimated size of the put.
 */
-private[spark] case class PutResult(size: Long, data: Iterator[_])
+private[spark] case class PutResult(size: Long)

From b38ee81ea5048be5ee213f76f4d4687e27a3250a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 3 Mar 2016 15:12:14 -0800
Subject: [PATCH 10/15] Access DiskStore and MemoryStore directly instead of through BlockStore interface in doPut().

---
 .../apache/spark/storage/BlockManager.scala | 56 ++++++++++---------
 1 file changed, 31 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index efbf4bc77aaa1..5f89a9864c9c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -805,32 +805,38 @@ private[spark] class BlockManager(
 .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

 try {
- val blockStore: BlockStore = {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- memoryStore
- } else if (putLevel.useDisk) {
- diskStore
- } else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying storage level!")
+ if (putLevel.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ data match {
+ case IteratorValues(iterator) =>
+ memoryStore.putIterator(blockId, iterator(), putLevel) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ iteratorFromFailedMemoryStorePut = Some(iter)
+ }
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ size = bytes.limit()
+ memoryStore.putBytes(blockId, bytes, putLevel)
 }
- }
-
- // Actually put the values
- size = data match {
- case IteratorValues(iterator) =>
- blockStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) => s
- case Left(iter) =>
- iteratorFromFailedMemoryStorePut = Some(iter)
- 0
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- blockStore.putBytes(blockId, bytes, putLevel).size
+ } else if (putLevel.useDisk) {
+ data match {
+ case IteratorValues(iterator) =>
+ diskStore.putIterator(blockId, iterator(), putLevel) match {
+ case Right(s) =>
+ size = s
+ }
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ size = bytes.limit()
+ diskStore.putBytes(blockId, bytes, putLevel)
+ }
+ } else {
+ assert(putLevel == StorageLevel.NONE)
+ throw new BlockException(
+ blockId, s"Attempted to put block $blockId without specifying storage level!")
 }

 val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)

From 83a2dfb78a6d0058118d284dcf73361fadb39477 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 3 Mar 2016 15:14:29 -0800
Subject: [PATCH 11/15] Remove unused DiskStore.getBytes() overload.
---
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 23c33a87130cf..e0af410f3ff80 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -119,10 +119,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(file, 0, file.length)
   }
 
-  def getBytes(segment: FileSegment): Option[ByteBuffer] = {
-    getBytes(segment.file, segment.offset, segment.length)
-  }
-
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }

From e24aea1153e182c2ff1bb498246b86d85f052557 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 3 Mar 2016 15:29:59 -0800
Subject: [PATCH 12/15] Change putBytes' return type to Unit, since it's never
 checked. Eventually it would make sense to give this a Boolean return type,
 but I'd like to do this later since there's actually a tricky semantics issue
 when considering what the return type is when saving to the MemoryStore.

---
 .../scala/org/apache/spark/storage/BlockStore.scala | 2 +-
 .../scala/org/apache/spark/storage/DiskStore.scala | 3 +--
 .../org/apache/spark/storage/MemoryStore.scala | 13 +++----------
 .../apache/spark/storage/BlockManagerSuite.scala | 7 +++----
 4 files changed, 8 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 1a34fb03ed789..b069918b16106 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
  */
 private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
 
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
 
   /**
    * Attempt to store an iterator of values.
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e0af410f3ff80..e35aa1b0684da 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -36,7 +36,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     diskManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // So that we do not modify the input offsets !
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()
@@ -54,7 +54,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
-    PutResult(bytes.limit())
   }
 
   override def putIterator(
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 34a6a8c93e7a1..50585b0ecd9f0 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -87,21 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level) match {
-        case Right(size) =>
-          PutResult(size)
-        case Left(_) =>
-          PutResult(0)
-      }
+      putIterator(blockId, values, level)
     } else {
       tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-      PutResult(bytes.limit())
     }
   }
 
@@ -111,14 +105,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    *
    * The caller should guarantee that `size` is correct.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
     if (putSuccess) {
       assert(bytes.limit == size)
     }
-    PutResult(size)
   }
 
   override def putIterator(
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 8c8bd3a147d84..cfcbf1745d1b1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1281,10 +1281,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockId = BlockId("rdd_3_10")
     store.blockInfoManager.lockNewBlockForWriting(
       blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
-    val result = memoryStore.putBytes(blockId, 13000, () => {
+    memoryStore.putBytes(blockId, 13000, () => {
      fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
    })
-    assert(result.size === 13000)
   }
 
   test("put a small ByteBuffer to MemoryStore") {
@@ -1292,11 +1291,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
     var bytes: ByteBuffer = null
-    val result = memoryStore.putBytes(blockId, 10000, () => {
+    memoryStore.putBytes(blockId, 10000, () => {
      bytes = ByteBuffer.allocate(10000)
      bytes
    })
-    assert(result.size === 10000)
+    assert(memoryStore.getSize(blockId) === 10000)
   }
 
   test("read-locked blocks cannot be evicted from the MemoryStore") {

From 6381b00a94c7bf4ea0693fc4ae6868ef0f866dc4 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 3 Mar 2016 15:31:50 -0800
Subject: [PATCH 13/15] Remove PutResult.
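With PutResult gone, callers learn the outcome of a memory-store put from values the store already exposes: putIterator still returns Right(size) on success and Left(iterator) when the block cannot be unrolled, and putBytes (made Unit in the previous patch) leaves the stored size to be queried afterwards. A caller-side sketch, not code from this series; memoryStore, blockId, values and bytes stand in for whatever the caller has in scope:

    memoryStore.putIterator(blockId, values, StorageLevel.MEMORY_ONLY) match {
      case Right(size) =>
        // Stored: `size` is the block's in-memory footprint.
      case Left(iter) =>
        // Did not fit: consume `iter` directly instead of caching it.
    }

    // putBytes now returns Unit, so a caller that wants the stored size asks
    // the store afterwards, as the updated BlockManagerSuite test does.
    memoryStore.putBytes(blockId, bytes, StorageLevel.MEMORY_ONLY)
    val storedSize = memoryStore.getSize(blockId)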
---
 .../apache/spark/storage/MemoryStore.scala | 8 +++----
 .../org/apache/spark/storage/PutResult.scala | 24 -------------------
 2 files changed, 4 insertions(+), 28 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/storage/PutResult.scala

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 50585b0ecd9f0..12b70d1807994 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -142,18 +142,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = {
+        val size = {
           if (level.deserialized) {
             val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
             tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
-            PutResult(sizeEstimate)
+            sizeEstimate
           } else {
             val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
             tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            PutResult(bytes.limit())
+            bytes.limit()
           }
         }
-        Right(res.size)
+        Right(size)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
         if (level.useDisk && allowPersistToDisk) {
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
deleted file mode 100644
index c0d03355a190b..0000000000000
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-/**
- * Result of adding a block into a BlockStore.
- * This case class contains the estimated size of the put.
- */
-private[spark] case class PutResult(size: Long)

From 3d8f47931915e39feea45f4be30280bb7b472a4b Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 4 Mar 2016 20:25:45 -0800
Subject: [PATCH 14/15] Address review comments.
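Among the changes here, doPut's Option[Option[Iterator[Any]]] result is replaced by the small sealed DoPutResult hierarchy introduced below, so callers can name the outcome they are handling instead of decoding nested Options. A condensed sketch of how the iterator-caching path in this patch consumes the new type (the empty case bodies stand in for the caller's real handling):

    doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
      case DoPutSucceeded =>
        // The block was already present or was stored; with keepReadLock = true
        // the caller now holds a read lock on it.
      case DoPutIteratorFailed(iter) =>
        // The values did not fit in memory and could not be dropped to disk;
        // `iter` lets the caller process them without caching.
      case DoPutBytesFailed =>
        // Only possible when storing bytes, so it is unexpected on this path.
        throw new SparkException("doPut returned an invalid failure response")
    }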
---
 .../apache/spark/storage/BlockManager.scala | 56 ++++++++++++++-----
 1 file changed, 41 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 5f89a9864c9c1..f8563ec44fe90 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -52,6 +52,12 @@ private[spark] class BlockResult(
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
+// Class for representing return value of doPut()
+private sealed trait DoPutResult
+private case object DoPutSucceeded extends DoPutResult
+private case object DoPutBytesFailed extends DoPutResult
+private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
+
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -436,6 +442,10 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get a local block from the block manager.
+   * Assumes that the caller holds a read lock on the block.
+   */
   private def doGetLocal(
       blockId: BlockId,
       info: BlockInfo,
@@ -665,7 +675,7 @@ private[spark] class BlockManager(
       makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
     // Initially we hold no locks on this block.
     doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
-      case None =>
+      case DoPutSucceeded =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
         val blockResult = get(blockId).getOrElse {
@@ -675,11 +685,13 @@ private[spark] class BlockManager(
           throw new SparkException(s"get() failed for block $blockId even though we held a lock")
         }
         Left(blockResult)
-      case Some(failedPutResult) =>
+      case DoPutIteratorFailed(iter) =>
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
-        Right(failedPutResult.get)
+        Right(iter)
+      case DoPutBytesFailed =>
+        throw new SparkException("doPut returned an invalid failure response")
     }
   }
 
@@ -693,7 +705,13 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
+    val result = doPut(
+      blockId,
+      IteratorValues(() => values),
+      level,
+      tellMaster,
+      effectiveStorageLevel)
+    result == DoPutSucceeded
   }
 
   /**
@@ -725,7 +743,8 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
+    val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+    result == DoPutSucceeded
   }
 
   /**
@@ -740,9 +759,9 @@ private[spark] class BlockManager(
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
-   * @return `Some(Option[Iterator[Any])` if the block did not exist and could not be successfully
-   *         cached, or None if the block already existed or was successfully stored (fully
-   *         consuming the input data / input iterator).
+   * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
+   *         [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
+   *         [[DoPutIteratorFailed]] if the put succeeded and we were storing an iterator.
    */
   private def doPut(
       blockId: BlockId,
@@ -750,7 +769,7 @@ private[spark] class BlockManager(
       level: StorageLevel,
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None,
-      keepReadLock: Boolean = false): Option[Option[Iterator[Any]]] = {
+      keepReadLock: Boolean = false): DoPutResult = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -771,7 +790,7 @@ private[spark] class BlockManager(
           // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
           releaseLock(blockId)
         }
-        return None
+        return DoPutSucceeded
       }
     }
 
@@ -827,6 +846,7 @@ private[spark] class BlockManager(
             diskStore.putIterator(blockId, iterator(), putLevel) match {
               case Right(s) =>
                 size = s
+                // putIterator() will never return Left (see its return type).
             }
           case ByteBufferValues(bytes) =>
             bytes.rewind()
@@ -872,9 +892,13 @@ private[spark] class BlockManager(
         Await.ready(replicationFuture, Duration.Inf)
       } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
         val remoteStartTime = System.currentTimeMillis
-        val bytesToReplicate = doGetLocal(blockId, putBlockInfo, asBlockResult = false).getOrElse {
-          throw new SparkException(s"Block $blockId was not found even though it was just stored")
-        }.asInstanceOf[ByteBuffer]
+        val bytesToReplicate: ByteBuffer = {
+          doGetLocal(blockId, putBlockInfo, asBlockResult = false)
+            .map(_.asInstanceOf[ByteBuffer])
+            .getOrElse {
+              throw new SparkException(s"Block $blockId was not found even though it was just stored")
+            }
+        }
         try {
           replicate(blockId, bytesToReplicate, putLevel)
         } finally {
@@ -893,9 +917,11 @@ private[spark] class BlockManager(
     }
 
     if (blockWasSuccessfullyStored) {
-      None
+      DoPutSucceeded
+    } else if (iteratorFromFailedMemoryStorePut.isDefined) {
+      DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
     } else {
-      Some(iteratorFromFailedMemoryStorePut)
+      DoPutBytesFailed
     }
   }

From 910f9977e5537410da994a0e93096dd7e1e8dbfe Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 4 Mar 2016 20:27:47 -0800
Subject: [PATCH 15/15] Comment typo.

---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f8563ec44fe90..dcf359e3c29fe 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -761,7 +761,7 @@ private[spark] class BlockManager(
    *                     returns.
    * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
    *         [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
-   *         [[DoPutIteratorFailed]] if the put succeeded and we were storing an iterator.
+   *         [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.