From 7e1bb1331d20881ab72a4ee0b55ebc8ab73cdf86 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 18 Apr 2015 22:12:44 +0800 Subject: [PATCH 1/6] Try to cache remotely received block if enabled by configuration. --- .../scala/org/apache/spark/CacheManager.scala | 13 +++++-- .../apache/spark/storage/BlockManager.scala | 7 ++++ .../org/apache/spark/CacheManagerSuite.scala | 38 +++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index a96d754744a05..43bd6be771ced 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -49,7 +49,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { .getInputMetricsForReadMethod(inputMetrics.readMethod) existingMetrics.incBytesRead(inputMetrics.bytesRead) - val iter = blockResult.data.asInstanceOf[Iterator[T]] + val buf = blockResult.data.toArray + if (rdd.sparkContext.getConf.getBoolean("spark.rdd.remoteblock.cache", false)) { + // If the block is retrieved remotely, try to cache it locally + if (!blockManager.containsBlockId(key)) { + blockManager.putArray(key, buf, storageLevel) + } + } + + val iter = buf.toIterator.asInstanceOf[Iterator[T]] new InterruptibleIterator[T](context, iter) { override def next(): T = { existingMetrics.incRecordsRead(1) @@ -73,7 +81,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { if (context.isRunningLocally) { return computedValues } - // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) @@ -115,7 +122,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } logInfo(s"Finished waiting for $id") - val values = 
blockManager.get(id) + val values = blockManager.getLocal(id) if (!values.isDefined) { /* The block is not guaranteed to exist even after the other thread has finished. * For instance, the block could be evicted after it was put, but before our get. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1aa0ef18de118..653bc57b5fcb9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -343,6 +343,13 @@ private[spark] class BlockManager( (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq } + /** + * Query if the given block is stored in the block manager. + */ + def containsBlockId(blockId: BlockId): Boolean = { + blockInfo.contains(blockId) + } + /** * Tell the master about the current storage status of a block. This will send a block update * message reflecting the current status, *not* the desired storage level in its block info. 
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 70529d9216591..be34ce79fc64e 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -24,6 +24,8 @@ import org.scalatest.mock.MockitoSugar import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.RDD import org.apache.spark.storage._ +import org.apache.spark.util.TimeStampedHashMap + // TODO: Test the CacheManager's thread-safety aspects class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter @@ -82,6 +84,42 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf assert(value.toList === List(5, 6, 7)) } + test("cache remotely block") { + val conf = new SparkConf(true) + conf.set("spark.rdd.remoteblock.cache", "true") + sc = new SparkContext("local", "test", conf) + rdd = new RDD[Int](sc, Nil) { + override def getPartitions: Array[Partition] = Array(split) + override val getDependencies = List[Dependency[_]]() + override def compute(split: Partition, context: TaskContext): Iterator[Int] = + Array(1, 2, 3, 4).iterator + } + + val blockInfo = new TimeStampedHashMap[BlockId, Boolean] + val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) + // mock remotely received block + when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result)) + when(blockManager.containsBlockId(RDDBlockId(0, 0))) + .thenReturn(false) + when(blockManager.putArray(RDDBlockId(0, 0), Array(5, 6, 7), StorageLevel.MEMORY_ONLY)) + .thenThrow(new RuntimeException("putArray")) + + assert(blockInfo.contains(RDDBlockId(0, 0)) === false) + + val context = new TaskContextImpl(0, 0, 0, 0) + try { + val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + } catch { + case e: RuntimeException => + if (e.getMessage().contains("putArray")) { + 
blockInfo.putIfAbsent(RDDBlockId(0, 0), true) + } + } + + // remotely block is cached now + assert(blockInfo.contains(RDDBlockId(0, 0)) === true) + } + test("get uncached local rdd") { // Local computation should not persist the resulting value, so don't expect a put(). when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None) From 098167f5fafbcd3dd2e1393a12df4537a3bade17 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 19 Apr 2015 00:57:09 +0800 Subject: [PATCH 2/6] Remove configuration and add new parameter to rdd.iterator. --- .../main/scala/org/apache/spark/CacheManager.scala | 7 +++---- .../org/apache/spark/api/python/PythonRDD.scala | 3 ++- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 13 ++++++++++--- .../scala/org/apache/spark/CacheManagerSuite.scala | 13 ++----------- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 43bd6be771ced..8d7c0aceaec39 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -37,7 +37,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { rdd: RDD[T], partition: Partition, context: TaskContext, - storageLevel: StorageLevel): Iterator[T] = { + storageLevel: StorageLevel, + cacheRemote: Boolean = false): Iterator[T] = { val key = RDDBlockId(rdd.id, partition.index) logDebug(s"Looking for partition $key") @@ -50,11 +51,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { existingMetrics.incBytesRead(inputMetrics.bytesRead) val buf = blockResult.data.toArray - if (rdd.sparkContext.getConf.getBoolean("spark.rdd.remoteblock.cache", false)) { // If the block is retrieved remotely, try to cache it locally - if (!blockManager.containsBlockId(key)) { + if (cacheRemote && !blockManager.containsBlockId(key)) { blockManager.putArray(key, buf, storageLevel) - 
} } val iter = buf.toIterator.asInstanceOf[Iterator[T]] diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 7409dc2d866f6..ca1700a541d78 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -242,7 +242,8 @@ private[spark] class PythonRDD( dataOut.writeInt(command.length) dataOut.write(command) // Data values - PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut) + PythonRDD.writeIteratorToStream( + firstParent.iterator(split, context, cacheRemote = false), dataOut) dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION) dataOut.writeInt(SpecialLengths.END_OF_STREAM) dataOut.flush() diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d80d94a588346..8b2d33e3b379d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -236,10 +236,17 @@ abstract class RDD[T: ClassTag]( * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. 
- */ - final def iterator(split: Partition, context: TaskContext): Iterator[T] = { + * @param split the partition of RDD + * @param context context of task + @param cacheRemote whether to cache remotely received block in block manager or not + * @return Iterator of data in RDD + */ + final def iterator( + split: Partition, + context: TaskContext, + cacheRemote: Boolean = false): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { - SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) + SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel, cacheRemote) } else { computeOrReadCheckpoint(split, context) } diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index be34ce79fc64e..fd309b58a0a3e 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -85,16 +85,6 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf } test("cache remotely block") { - val conf = new SparkConf(true) - conf.set("spark.rdd.remoteblock.cache", "true") - sc = new SparkContext("local", "test", conf) - rdd = new RDD[Int](sc, Nil) { - override def getPartitions: Array[Partition] = Array(split) - override val getDependencies = List[Dependency[_]]() - override def compute(split: Partition, context: TaskContext): Iterator[Int] = - Array(1, 2, 3, 4).iterator - } - val blockInfo = new TimeStampedHashMap[BlockId, Boolean] val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) // mock remotely received block @@ -108,7 +98,8 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf val context = new TaskContextImpl(0, 0, 0, 0) try { - val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + val computeValue = + cacheManager.getOrCompute(rdd, split, context, 
StorageLevel.MEMORY_ONLY, cacheRemote = true) } catch { case e: RuntimeException => if (e.getMessage().contains("putArray")) { From 6a01c77fcc7c6f4fe9f39a09c785e400b66d9d32 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 19 Apr 2015 01:34:48 +0800 Subject: [PATCH 3/6] Address binary incompatibilities and minor style in test. --- .../org/apache/spark/api/python/PythonRDD.scala | 3 +-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 13 +++++++++++-- .../scala/org/apache/spark/CacheManagerSuite.scala | 4 ++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index ca1700a541d78..7409dc2d866f6 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -242,8 +242,7 @@ private[spark] class PythonRDD( dataOut.writeInt(command.length) dataOut.write(command) // Data values - PythonRDD.writeIteratorToStream( - firstParent.iterator(split, context, cacheRemote = false), dataOut) + PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut) dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION) dataOut.writeInt(SpecialLengths.END_OF_STREAM) dataOut.flush() diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8b2d33e3b379d..1721400b52054 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -232,19 +232,28 @@ abstract class RDD[T: ClassTag]( } } + /** + * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. + * This should ''not'' be called by users directly, but is available for implementors of custom + * subclasses of RDD. 
+ */ + final def iterator(split: Partition, context: TaskContext): Iterator[T] = { + iterator(split, context, cacheRemote = false) + } + /** * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. * @param split the partition of RDD * @param context context of task - @param cacheRemote whether to cache remotely received block in block manager or not + * @param cacheRemote whether to cache remotely received block in block manager or not * @return Iterator of data in RDD */ final def iterator( split: Partition, context: TaskContext, - cacheRemote: Boolean = false): Iterator[T] = { + cacheRemote: Boolean): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel, cacheRemote) } else { diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index fd309b58a0a3e..4719a1d254e17 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -94,7 +94,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf when(blockManager.putArray(RDDBlockId(0, 0), Array(5, 6, 7), StorageLevel.MEMORY_ONLY)) .thenThrow(new RuntimeException("putArray")) - assert(blockInfo.contains(RDDBlockId(0, 0)) === false) + assert(!blockInfo.contains(RDDBlockId(0, 0))) val context = new TaskContextImpl(0, 0, 0, 0) try { @@ -108,7 +108,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf } // remotely block is cached now - assert(blockInfo.contains(RDDBlockId(0, 0)) === true) + assert(blockInfo.contains(RDDBlockId(0, 0))) } test("get uncached local rdd") { From ddc6b4b759256afe3fe35f7d680de32f7f69accd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 21 
May 2015 21:03:35 +0800 Subject: [PATCH 4/6] Adapt for updated TaskContextImpl. --- core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 6dc5094474fca..597f83d89821b 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -96,7 +96,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf assert(!blockInfo.contains(RDDBlockId(0, 0))) - val context = new TaskContextImpl(0, 0, 0, 0) + val context = new TaskContextImpl(0, 0, 0, 0, null) try { val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY, cacheRemote = true) From 695bb3b91820bb3e2af54468ece53cba85fe142a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 25 May 2015 18:06:21 +0800 Subject: [PATCH 5/6] Committed CartesianRDD. 
--- core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index c1d6971787572..f8b122dfb9cee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -72,8 +72,9 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = { val currSplit = split.asInstanceOf[CartesianPartition] - for (x <- rdd1.iterator(currSplit.s1, context); - y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) + val rdd2Items = rdd2.iterator(currSplit.s2, context, cacheRemote = true).toArray + for (x <- rdd1.iterator(currSplit.s1, context, cacheRemote = true); + y <- rdd2Items.toIterator) yield (x, y) } override def getDependencies: Seq[Dependency[_]] = List( From 4696534e513b5f0da32a30e96e0cbfe20d563120 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 May 2015 07:45:02 +0800 Subject: [PATCH 6/6] Call putInBlockManager which uses unrollSafely instead of putArray directly to avoid OOM. 
--- .../main/scala/org/apache/spark/CacheManager.scala | 8 ++++---- .../scala/org/apache/spark/rdd/CartesianRDD.scala | 3 +-- .../scala/org/apache/spark/CacheManagerSuite.scala | 13 +++++++++---- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index e4bc08517b6e3..5eb17d219598a 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -49,14 +49,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { .getInputMetricsForReadMethod(blockResult.readMethod) existingMetrics.incBytesRead(blockResult.bytes) - val buf = blockResult.data.toArray + var iter = blockResult.data.asInstanceOf[Iterator[T]] // If the block is retrieved remotely, try to cache it locally if (cacheRemote && !blockManager.containsBlockId(key)) { - blockManager.putArray(key, buf, storageLevel) + val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + iter = putInBlockManager(key, iter, storageLevel, updatedBlocks) } - val iter = buf.toIterator.asInstanceOf[Iterator[T]] - new InterruptibleIterator[T](context, iter) { + new InterruptibleIterator(context, iter) { override def next(): T = { existingMetrics.incRecordsRead(1) delegate.next() diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index f8b122dfb9cee..fbbfda5d87438 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -72,9 +72,8 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = { val currSplit = split.asInstanceOf[CartesianPartition] - val rdd2Items = rdd2.iterator(currSplit.s2, context, cacheRemote = true).toArray for (x <- rdd1.iterator(currSplit.s1, context, 
cacheRemote = true); - y <- rdd2Items.toIterator) yield (x, y) + y <- rdd2.iterator(currSplit.s2, context, cacheRemote = true)) yield (x, y) } override def getDependencies: Seq[Dependency[_]] = List( diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 597f83d89821b..857db3108a298 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark +import scala.collection.mutable.ArrayBuffer + import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.MockitoSugar @@ -85,14 +87,17 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf } test("cache remotely block") { + val memoryStore = mock[MemoryStore] val blockInfo = new TimeStampedHashMap[BlockId, Boolean] - val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) + val values = Array(5, 6, 7).iterator + val result = new BlockResult(values, DataReadMethod.Memory, 12) // mock remotely received block when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result)) when(blockManager.containsBlockId(RDDBlockId(0, 0))) .thenReturn(false) - when(blockManager.putArray(RDDBlockId(0, 0), Array(5, 6, 7), StorageLevel.MEMORY_ONLY)) - .thenThrow(new RuntimeException("putArray")) + when(blockManager.memoryStore).thenReturn(memoryStore) + when(memoryStore.unrollSafely(RDDBlockId(0, 0), values, + new ArrayBuffer[(BlockId, BlockStatus)])).thenThrow(new RuntimeException("putInBlockManager")) assert(!blockInfo.contains(RDDBlockId(0, 0))) @@ -102,7 +107,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY, cacheRemote = true) } catch { case e: RuntimeException => - if (e.getMessage().contains("putArray")) { + if 
(e.getMessage().contains("putInBlockManager")) { blockInfo.putIfAbsent(RDDBlockId(0, 0), true) } }