From 0ff20467d6874a7e39b6b89cc9a4212602d76ebf Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Mon, 17 Apr 2017 23:15:32 +0800
Subject: [PATCH 01/14] Optimize RDD.cartesian

---
 .../org/apache/spark/rdd/CartesianRDD.scala | 61 ++++++++++++++++++-
 .../apache/spark/storage/BlockManager.scala | 19 +++++-
 2 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 57108dcedcf0c..3693f236f847c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -19,10 +19,11 @@ package org.apache.spark.rdd

 import java.io.{IOException, ObjectOutputStream}

-import scala.reflect.ClassTag
+import scala.reflect._

 import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener, Utils}

 private[spark] class CartesianPartition(
@@ -73,7 +74,61 @@ class CartesianRDD[T: ClassTag, U: ClassTag](

 override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
 val currSplit = split.asInstanceOf[CartesianPartition]
 for (x <- rdd1.iterator(currSplit.s1, context);
- y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+ y <- getOrCache(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK))
+ yield (x, y)
+ }
+
+ private def getOrCache(
+ rdd: RDD[U],
+ partition: Partition,
+ context: TaskContext,
+ level: StorageLevel): Iterator[U] = {
+ if (rdd.getStorageLevel != StorageLevel.NONE) {
+ rdd.iterator(partition, context)
+ } else {
+ val blockId = RDDBlockId(rdd.id, partition.index)
+ var readCachedBlock = true
+ // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
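+ // What follows is the usual get-or-compute handshake with the block manager:
+ // getOrElseUpdate() either returns an already-stored copy of the block (Left) or
+ // runs the closure to compute the partition and caches the result at `level` (Right).
+ // readCachedBlock stays true only when a cached copy was served, so the task
+ // listeners below can tell whether this task created the temporary block and
+ // therefore must remove it on completion or failure.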
+ val iterator = SparkEnv.get.blockManager.getOrElseUpdate(blockId, level, classTag[U], () => { + readCachedBlock = false + rdd.iterator(partition, context) + }, true) match { + case Left(blockResult) => + if (readCachedBlock) { + val existingMetrics = context.taskMetrics().inputMetrics + existingMetrics.incBytesRead(blockResult.bytes) + new InterruptibleIterator[U](context, blockResult.data.asInstanceOf[Iterator[U]]) { + override def next(): U = { + existingMetrics.incRecordsRead(1) + delegate.next() + } + } + } else { + new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[U]]) + } + case Right(iter) => + new InterruptibleIterator(context, iter.asInstanceOf[Iterator[U]]) + } + + context.addTaskCompletionListener(new TaskCompletionListener{ + override def onTaskCompletion(context: TaskContext): Unit = { + if (!readCachedBlock) { + SparkEnv.get.blockManager.removeBlock(blockId, false) + } + } + }) + + context.addTaskFailureListener(new TaskFailureListener{ + override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { + if (!readCachedBlock) { + SparkEnv.get.blockManager.removeBlock(blockId, false) + } + } + }) + + iterator + } + } override def getDependencies: Seq[Dependency[_]] = List( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 63acba65d3c5b..920efe1f9e78b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -678,6 +678,10 @@ private[spark] class BlockManager( None } + def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = { + getAndCache(blockId, false, StorageLevel.NONE) + } + /** * Get a block from the block manager (either local or remote). * @@ -685,13 +689,21 @@ private[spark] class BlockManager( * any locks if the block was fetched from a remote block manager. The read lock will * automatically be freed once the result's `data` iterator is fully consumed. */ - def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = { + def getAndCache[T: ClassTag]( + blockId: BlockId, + cacheRemote: Boolean, + storageLevel: StorageLevel): Option[BlockResult] = { val local = getLocalValues(blockId) if (local.isDefined) { logInfo(s"Found block $blockId locally") return local } val remote = getRemoteValues[T](blockId) + if (cacheRemote) { + remote.foreach{ blockResult => + putIterator(blockId, blockResult.data, storageLevel) + } + } if (remote.isDefined) { logInfo(s"Found block $blockId remotely") return remote @@ -740,10 +752,11 @@ private[spark] class BlockManager( blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], - makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { + makeIterator: () => Iterator[T], + cacheRemote: Boolean = false): Either[BlockResult, Iterator[T]] = { // Attempt to read the block from local or remote storage. If it's present, then we don't need // to go through the local-get-or-put path. 
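 // (getAndCache() first looks for the block locally, then on remote executors; only
 // when both lookups miss does getOrElseUpdate fall through to compute the block via
 // makeIterator and store it.)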
- get[T](blockId)(classTag) match { + getAndCache[T](blockId, cacheRemote, level)(classTag) match { case Some(block) => return Left(block) case _ => From 14d0e951b2ee3bc6a0ec8f227d417572e0371c6b Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Tue, 25 Apr 2017 13:54:07 +0800 Subject: [PATCH 02/14] cache the block fetched from remote --- .../org/apache/spark/rdd/CartesianRDD.scala | 75 +++++++++---------- .../apache/spark/storage/BlockManager.scala | 6 +- 2 files changed, 38 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 3693f236f847c..584d576202529 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -74,61 +74,56 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = { val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); - y <- getOrCache(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK)) + y <- getOrCacheBlock(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK)) yield (x, y) } - private def getOrCache( + private def getOrCacheBlock( rdd: RDD[U], partition: Partition, context: TaskContext, level: StorageLevel): Iterator[U] = { - if (rdd.getStorageLevel != StorageLevel.NONE) { + val blockId = RDDBlockId(rdd.id, partition.index) + var readCachedBlock = true + // This method is called on executors, so we need call SparkEnv.get instead of sc.env. + val iterator = SparkEnv.get.blockManager.getOrElseUpdate(blockId, level, classTag[U], () => { + readCachedBlock = false rdd.iterator(partition, context) - } else { - val blockId = RDDBlockId(rdd.id, partition.index) - var readCachedBlock = true - // This method is called on executors, so we need call SparkEnv.get instead of sc.env. 
- val iterator = SparkEnv.get.blockManager.getOrElseUpdate(blockId, level, classTag[U], () => { - readCachedBlock = false - rdd.iterator(partition, context) - }, true) match { - case Left(blockResult) => - if (readCachedBlock) { - val existingMetrics = context.taskMetrics().inputMetrics - existingMetrics.incBytesRead(blockResult.bytes) - new InterruptibleIterator[U](context, blockResult.data.asInstanceOf[Iterator[U]]) { - override def next(): U = { - existingMetrics.incRecordsRead(1) - delegate.next() - } + }, true) match { + case Left(blockResult) => + if (readCachedBlock) { + val existingMetrics = context.taskMetrics().inputMetrics + existingMetrics.incBytesRead(blockResult.bytes) + new InterruptibleIterator[U](context, blockResult.data.asInstanceOf[Iterator[U]]) { + override def next(): U = { + existingMetrics.incRecordsRead(1) + delegate.next() } - } else { - new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[U]]) - } - case Right(iter) => - new InterruptibleIterator(context, iter.asInstanceOf[Iterator[U]]) - } - - context.addTaskCompletionListener(new TaskCompletionListener{ - override def onTaskCompletion(context: TaskContext): Unit = { - if (!readCachedBlock) { - SparkEnv.get.blockManager.removeBlock(blockId, false) } + } else { + new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[U]]) } - }) + case Right(iter) => + new InterruptibleIterator(context, iter.asInstanceOf[Iterator[U]]) + } - context.addTaskFailureListener(new TaskFailureListener{ - override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { - if (!readCachedBlock) { - SparkEnv.get.blockManager.removeBlock(blockId, false) - } + context.addTaskCompletionListener(new TaskCompletionListener{ + override def onTaskCompletion(context: TaskContext): Unit = { + if (!readCachedBlock) { + SparkEnv.get.blockManager.removeBlock(blockId, false) } - }) + } + }) - iterator - } + context.addTaskFailureListener(new TaskFailureListener{ + override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { + if (!readCachedBlock) { + SparkEnv.get.blockManager.removeBlock(blockId, false) + } + } + }) + iterator } override def getDependencies: Seq[Dependency[_]] = List( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 920efe1f9e78b..9d5edaf50ac24 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -679,7 +679,7 @@ private[spark] class BlockManager( } def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = { - getAndCache(blockId, false, StorageLevel.NONE) + getOrCacheRemote(blockId, false, StorageLevel.NONE) } /** @@ -689,7 +689,7 @@ private[spark] class BlockManager( * any locks if the block was fetched from a remote block manager. The read lock will * automatically be freed once the result's `data` iterator is fully consumed. */ - def getAndCache[T: ClassTag]( + def getOrCacheRemote[T: ClassTag]( blockId: BlockId, cacheRemote: Boolean, storageLevel: StorageLevel): Option[BlockResult] = { @@ -756,7 +756,7 @@ private[spark] class BlockManager( cacheRemote: Boolean = false): Either[BlockResult, Iterator[T]] = { // Attempt to read the block from local or remote storage. If it's present, then we don't need // to go through the local-get-or-put path. 
- getAndCache[T](blockId, cacheRemote, level)(classTag) match { + getOrCacheRemote[T](blockId, cacheRemote, level)(classTag) match { case Some(block) => return Left(block) case _ => From 5b2318472a0b0f051d350c024fc4288148c172be Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 26 Apr 2017 14:08:04 +0800 Subject: [PATCH 03/14] add some log --- .../apache/spark/storage/BlockManager.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9d5edaf50ac24..2768ab4b45b34 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -699,15 +699,17 @@ private[spark] class BlockManager( return local } val remote = getRemoteValues[T](blockId) - if (cacheRemote) { - remote.foreach{ blockResult => - putIterator(blockId, blockResult.data, storageLevel) - } - } - if (remote.isDefined) { - logInfo(s"Found block $blockId remotely") - return remote + remote match { + case Some(blockResult) => + logInfo(s"Found block $blockId remotely") + if (cacheRemote) { + putIterator(blockId, blockResult.data, storageLevel) + logInfo(s"Cache bock $blockId fetched from remotely") + } + return remote + case None => _ } + None } From 96708b3933c38600d60bd21d656f915ee62944e3 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 26 Apr 2017 14:12:37 +0800 Subject: [PATCH 04/14] fix the compile error --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2768ab4b45b34..f30e565699a00 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -707,7 +707,7 @@ private[spark] class BlockManager( logInfo(s"Cache bock $blockId fetched from remotely") } return remote - case None => _ + case None => } None From 0cc4d655fe25701683c6cebecb485c065c9f8dba Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 26 Apr 2017 14:54:51 +0800 Subject: [PATCH 05/14] Add log --- .../org/apache/spark/rdd/CartesianRDD.scala | 31 ++++++++++--------- .../apache/spark/storage/BlockManager.scala | 1 + 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 584d576202529..bd9f475519025 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -83,6 +83,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( partition: Partition, context: TaskContext, level: StorageLevel): Iterator[U] = { + logInfo("--------------> in getOrCacheBlock") val blockId = RDDBlockId(rdd.id, partition.index) var readCachedBlock = true // This method is called on executors, so we need call SparkEnv.get instead of sc.env. 
@@ -107,21 +108,21 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( new InterruptibleIterator(context, iter.asInstanceOf[Iterator[U]]) } - context.addTaskCompletionListener(new TaskCompletionListener{ - override def onTaskCompletion(context: TaskContext): Unit = { - if (!readCachedBlock) { - SparkEnv.get.blockManager.removeBlock(blockId, false) - } - } - }) - - context.addTaskFailureListener(new TaskFailureListener{ - override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { - if (!readCachedBlock) { - SparkEnv.get.blockManager.removeBlock(blockId, false) - } - } - }) +// context.addTaskCompletionListener(new TaskCompletionListener{ +// override def onTaskCompletion(context: TaskContext): Unit = { +// if (!readCachedBlock) { +// SparkEnv.get.blockManager.removeBlock(blockId, false) +// } +// } +// }) +// +// context.addTaskFailureListener(new TaskFailureListener{ +// override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { +// if (!readCachedBlock) { +// SparkEnv.get.blockManager.removeBlock(blockId, false) +// } +// } +// }) iterator } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f30e565699a00..7c81d905512c6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -756,6 +756,7 @@ private[spark] class BlockManager( classTag: ClassTag[T], makeIterator: () => Iterator[T], cacheRemote: Boolean = false): Either[BlockResult, Iterator[T]] = { + logInfo("--------------> In getOrElseUpdate") // Attempt to read the block from local or remote storage. If it's present, then we don't need // to go through the local-get-or-put path. 
getOrCacheRemote[T](blockId, cacheRemote, level)(classTag) match { From 74d8a5e70a51bdf4fd9c4e730c392b0c9e06b668 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 26 Apr 2017 15:14:45 +0800 Subject: [PATCH 06/14] Add log --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 7c81d905512c6..edd0d826de1a2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -702,7 +702,9 @@ private[spark] class BlockManager( remote match { case Some(blockResult) => logInfo(s"Found block $blockId remotely") + logInfo("------------------->" + cacheRemote) if (cacheRemote) { + logInfo("-------------------> in cacheRemote ") putIterator(blockId, blockResult.data, storageLevel) logInfo(s"Cache bock $blockId fetched from remotely") } From 359469943e3c2bfd25fb2f67c56dc78ce1039e3e Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 26 Apr 2017 15:56:40 +0800 Subject: [PATCH 07/14] add log --- core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 3 +++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 1 + .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 ++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index bd9f475519025..067d850d2f6db 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -54,6 +54,8 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( extends RDD[(T, U)](sc, Nil) with Serializable { + logInfo("-------------> In Cartesian RDD initialize") + val numPartitionsInRdd2 = rdd2.partitions.length override def getPartitions: Array[Partition] = { @@ -72,6 +74,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( } override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = { + logInfo("-------------------> In Cartesian compute method") val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); y <- getOrCacheBlock(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK)) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e524675332d1b..3220967a77d5c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -672,6 +672,7 @@ abstract class RDD[T: ClassTag]( * elements (a, b) where a is in `this` and b is in `other`. 
 */
 def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
+ logInfo("---------------> In cartesian method")
 new CartesianRDD(sc, this, other)
 }

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index edd0d826de1a2..6f258e944fbcd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -679,6 +679,7 @@ private[spark] class BlockManager(
 }

 def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+ logInfo("-------------> get")
 getOrCacheRemote(blockId, false, StorageLevel.NONE)
 }

@@ -758,7 +759,7 @@ private[spark] class BlockManager(
 classTag: ClassTag[T],
 makeIterator: () => Iterator[T],
 cacheRemote: Boolean = false): Either[BlockResult, Iterator[T]] = {
- logInfo("--------------> In getOrElseUpdate")
+ logInfo("--------------> In getOrElseUpdate, cacheRemote: " + cacheRemote)
 // Attempt to read the block from local or remote storage. If it's present, then we don't need
 // to go through the local-get-or-put path.

From 9bc7a77486c4a42369dbf3c5f5362a2fd7a5daa5 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Fri, 28 Apr 2017 10:54:23 +0800
Subject: [PATCH 08/14] fix the dead lock error

---
 core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 5 +++--
 .../main/scala/org/apache/spark/storage/BlockManager.scala  | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 067d850d2f6db..e91e63f0eaab7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -86,13 +86,14 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 partition: Partition,
 context: TaskContext,
 level: StorageLevel): Iterator[U] = {
- logInfo("--------------> in getOrCacheBlock")
+ logInfo(s"--------------> in getOrCacheBlock(${rdd.id}, ${partition.index})")
 val blockId = RDDBlockId(rdd.id, partition.index)
 var readCachedBlock = true
 // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
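 // Likely rationale for the change below (the commit message only says "fix the dead
 // lock error"): if rdd2 is itself persisted, rdd.iterator() re-enters
 // getOrElseUpdate() for this very same block id while the current thread already
 // holds the block's write lock, so the task deadlocks on itself;
 // computeOrReadCheckpoint() computes the partition without touching the block
 // manager again.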
 val iterator = SparkEnv.get.blockManager.getOrElseUpdate(blockId, level, classTag[U], () => {
+
 readCachedBlock = false
- rdd.iterator(partition, context)
+ rdd.computeOrReadCheckpoint(partition, context)
 }, true) match {

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6f258e944fbcd..3e0391728d93f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -703,9 +703,9 @@ private[spark] class BlockManager(
 remote match {
 case Some(blockResult) =>
 logInfo(s"Found block $blockId remotely")
- logInfo("------------------->" + cacheRemote)
+ logInfo(s"-------------------> (${blockId.name})")
 if (cacheRemote) {
- logInfo("-------------------> in cacheRemote ")
+ logInfo(s"-------------------> in cacheRemote (${blockId.name})")
 putIterator(blockId, blockResult.data, storageLevel)
 logInfo(s"Cache bock $blockId fetched from remotely")
 }

From 80d359677984445e216000bfd6b84b7465013c80 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Tue, 2 May 2017 23:15:58 +0800
Subject: [PATCH 09/14] remove logs, and use CompletionIterator to remove
 cached block

---
 .../org/apache/spark/rdd/CartesianRDD.scala | 34 ++++++--------
 .../apache/spark/storage/BlockManager.scala | 30 +++++++++-------
 2 files changed, 28 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index e91e63f0eaab7..33713bca84906 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,8 +22,8 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect._

 import org.apache.spark._
-import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener, Utils}
+import org.apache.spark.storage.{BlockId, RDDBlockId, StorageLevel}
+import org.apache.spark.util.{CompletionIterator, Utils}

 private[spark] class CartesianPartition(
@@ -54,8 +54,6 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 extends RDD[(T, U)](sc, Nil)
 with Serializable {

- logInfo("-------------> In Cartesian RDD initialize")
-
 val numPartitionsInRdd2 = rdd2.partitions.length

 override def getPartitions: Array[Partition] = {
@@ -74,7 +72,6 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 }

 override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
- logInfo("-------------------> In Cartesian compute method")
 val currSplit = split.asInstanceOf[CartesianPartition]
 for (x <- rdd1.iterator(currSplit.s1, context);
 y <- getOrCacheBlock(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK))
@@ -86,12 +83,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 partition: Partition,
 context: TaskContext,
 level: StorageLevel): Iterator[U] = {
- logInfo(s"--------------> in getOrCacheBlock(${rdd.id}, ${partition.index})")
 val blockId = RDDBlockId(rdd.id, partition.index)
 var readCachedBlock = true
 // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
 val iterator = SparkEnv.get.blockManager.getOrElseUpdate(blockId, level, classTag[U], () => {
-
 readCachedBlock = false
 rdd.computeOrReadCheckpoint(partition, context)
 }, true) match {
 case Left(blockResult) =>
 if (readCachedBlock) {
 val existingMetrics = context.taskMetrics().inputMetrics
 existingMetrics.incBytesRead(blockResult.bytes)
 new InterruptibleIterator[U](context, blockResult.data.asInstanceOf[Iterator[U]]) {
 override def next(): U = {
 existingMetrics.incRecordsRead(1)
 delegate.next()
 }
 }
 } else {
 new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[U]])
 }
 case Right(iter) =>
 new InterruptibleIterator(context, iter.asInstanceOf[Iterator[U]])
 }

-// context.addTaskCompletionListener(new TaskCompletionListener{
-// override def onTaskCompletion(context: TaskContext): Unit = {
-// if (!readCachedBlock) {
-// SparkEnv.get.blockManager.removeBlock(blockId, false)
-// }
-// }
-// })
-//
-// context.addTaskFailureListener(new TaskFailureListener{
-// override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
-// if (!readCachedBlock) {
-// SparkEnv.get.blockManager.removeBlock(blockId, false)
-// }
-// }
-// })
+ def removeBlock(blockId: BlockId,
+ readCachedBlock: Boolean): Unit = {
+ if (!readCachedBlock) {
+ SparkEnv.get.blockManager.removeBlock(blockId, true, false)
+ }
+ }

- iterator
+ CompletionIterator[U, Iterator[U]](iterator, removeBlock(blockId, readCachedBlock))
 }

 override def getDependencies: Seq[Dependency[_]] = List(

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3e0391728d93f..1449f28b7a047 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -679,21 +679,24 @@ private[spark] class BlockManager(
 }

 def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
- getOrCacheRemote(blockId, false, StorageLevel.NONE)
+ getOrCacheRemote(blockId, false, Some(StorageLevel.NONE))
 }

 /**
- * Get a block from the block manager (either local or remote).
+ * Get a block from the block manager (either local or remote), optionally caching a block
+ * fetched from a remote block manager locally.
 *
 * This acquires a read lock on the block if the block was stored locally and does not acquire
 * any locks if the block was fetched from a remote block manager. The read lock will
 * automatically be freed once the result's `data` iterator is fully consumed.
+ * @param blockId the block to fetch.
+ * @param cacheRemote whether to cache a block that was fetched from a remote block manager.
+ * @param storageLevel the storage level to cache at; required when `cacheRemote` is enabled.
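+ * @return the block's data as a [[BlockResult]] if it was found locally or remotely,
+ *         otherwise `None`.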
*/ def getOrCacheRemote[T: ClassTag]( blockId: BlockId, - cacheRemote: Boolean, - storageLevel: StorageLevel): Option[BlockResult] = { + cacheRemote: Boolean = false, + storageLevel: Option[StorageLevel] = None): Option[BlockResult] = { val local = getLocalValues(blockId) if (local.isDefined) { logInfo(s"Found block $blockId locally") @@ -703,10 +706,10 @@ private[spark] class BlockManager( remote match { case Some(blockResult) => logInfo(s"Found block $blockId remotely") - logInfo(s"-------------------> (${blockId.name})") if (cacheRemote) { - logInfo(s"-------------------> in cacheRemote (${blockId.name})") - putIterator(blockId, blockResult.data, storageLevel) + assert(storageLevel.isDefined && storageLevel.get.isValid, + "The storage level is invalid.") + putIterator(blockId, blockResult.data, storageLevel.get) logInfo(s"Cache bock $blockId fetched from remotely") } return remote @@ -759,10 +762,9 @@ private[spark] class BlockManager( classTag: ClassTag[T], makeIterator: () => Iterator[T], cacheRemote: Boolean = false): Either[BlockResult, Iterator[T]] = { - logInfo("--------------> In getOrElseUpdate, cacheRemote: " + cacheRemote) // Attempt to read the block from local or remote storage. If it's present, then we don't need // to go through the local-get-or-put path. - getOrCacheRemote[T](blockId, cacheRemote, level)(classTag) match { + getOrCacheRemote[T](blockId, cacheRemote, Some(level))(classTag) match { case Some(block) => return Left(block) case _ => @@ -1446,10 +1448,14 @@ private[spark] class BlockManager( /** * Remove a block from both memory and disk. + * @param blocking if true (default), this call will block until the lock is acquired. If false, + * this call will return immediately if the lock acquisition fails. */ - def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { + def removeBlock(blockId: BlockId, + tellMaster: Boolean = true, + blocking: Boolean = true): Unit = { logDebug(s"Removing block $blockId") - blockInfoManager.lockForWriting(blockId) match { + blockInfoManager.lockForWriting(blockId, blocking) match { case None => // The block has already been removed; do nothing. 
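 // Note: when blocking is false, lockForWriting() also returns None while another task
 // holds the lock, so None no longer guarantees that the block is gone, and the warning
 // below can be spurious for a block that still exists.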
logWarning(s"Asked to remove block $blockId, which does not exist") From f5eae9b5ada704c0354a95092ef3c166a75cb688 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 3 May 2017 21:25:46 +0800 Subject: [PATCH 10/14] remove the blocking parameter from 'removeBlock' --- .../main/scala/org/apache/spark/rdd/CartesianRDD.scala | 2 +- .../scala/org/apache/spark/storage/BlockManager.scala | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 33713bca84906..3fcf2c091dccd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -110,7 +110,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( def removeBlock(blockId: BlockId, readCachedBlock: Boolean): Unit = { if (!readCachedBlock) { - SparkEnv.get.blockManager.removeBlock(blockId, true, false) + SparkEnv.get.blockManager.removeBlock(blockId, true) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1449f28b7a047..1ee345cec28d5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1448,14 +1448,10 @@ private[spark] class BlockManager( /** * Remove a block from both memory and disk. - * @param blocking if true (default), this call will block until the lock is acquired. If false, - * this call will return immediately if the lock acquisition fails. */ - def removeBlock(blockId: BlockId, - tellMaster: Boolean = true, - blocking: Boolean = true): Unit = { + def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { logDebug(s"Removing block $blockId") - blockInfoManager.lockForWriting(blockId, blocking) match { + blockInfoManager.lockForWriting(blockId) match { case None => // The block has already been removed; do nothing. 
logWarning(s"Asked to remove block $blockId, which does not exist") From cc3c733ff98863401d19ceae411198be3da06eeb Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 4 May 2017 00:15:12 +0800 Subject: [PATCH 11/14] remove block without blocking --- .../org/apache/spark/rdd/CartesianRDD.scala | 5 ++- .../apache/spark/storage/BlockManager.scala | 37 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 3fcf2c091dccd..af8c5765ad575 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -109,8 +109,9 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( def removeBlock(blockId: BlockId, readCachedBlock: Boolean): Unit = { - if (!readCachedBlock) { - SparkEnv.get.blockManager.removeBlock(blockId, true) + val blockManager = SparkEnv.get.blockManager + if (!readCachedBlock || blockManager.isRemovable(blockId)) { + blockManager.removeOrMarkAsRemovable(blockId, true) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1ee345cec28d5..a263f6efbaff7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io._ import java.nio.ByteBuffer import java.nio.channels.Channels +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.collection.mutable.HashMap @@ -202,6 +203,9 @@ private[spark] class BlockManager( private var blockReplicationPolicy: BlockReplicationPolicy = _ + // Record the removable block. + private lazy val removableBlocks = ConcurrentHashMap.newKeySet[BlockId]() + /** * Initializes the BlockManager with the given appId. This is not performed in the constructor as * the appId may not be known at BlockManager instantiation time (in particular for the driver, @@ -1478,6 +1482,39 @@ private[spark] class BlockManager( } } + /** + * Whether the block is removable. + */ + def isRemovable(blockId: BlockId): Boolean = { + removableBlocks.contains(blockId) + } + + /** + * Try to remove the block without blocking. Mark it as removable if it is use. + */ + def removeOrMarkAsRemovable(blockId: BlockId, tellMaster: Boolean = true): Unit = { + blockInfoManager.lockForWriting(blockId, false) match { + case None => + blockInfoManager.synchronized { + blockInfoManager.get(blockId) match { + case None => + // The block has already been removed; do nothing. + logWarning(s"Asked to remove block $blockId, which does not exist") + removableBlocks.remove(blockId) + case Some(_) => + // The block is in use, mark it as removable. 
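+ // A later non-blocking removal attempt (for example another task's completion
+ // callback calling removeOrMarkAsRemovable again) will find the id in
+ // removableBlocks and delete the block once the write lock can finally be taken.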
 logDebug(s"Marking block $blockId as removable")
+ removableBlocks.add(blockId)
+ }
+ }
+ case Some(info) =>
+ logDebug(s"Removing block $blockId")
+ removableBlocks.remove(blockId)
+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+ }
+ }
+
 private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
 Option(TaskContext.get()).foreach { c =>
 c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)

From 3c61141b9fc4b9cf4cfcdf6c2e9f45c5a97ff754 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 3 May 2017 22:05:53 +0800
Subject: [PATCH 12/14] Add some comments

---
 .../main/scala/org/apache/spark/rdd/CartesianRDD.scala | 8 ++++++++
 core/src/main/scala/org/apache/spark/rdd/RDD.scala     | 1 -
 .../scala/org/apache/spark/storage/BlockManager.scala  | 5 ++++-
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index af8c5765ad575..6c91944516115 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -78,6 +78,14 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 yield (x, y)
 }

+ /**
+ * Try to get the block from the local block manager; if it is not present locally, fetch it
+ * from a remote block manager and cache it locally.
+ *
+ * Because the block may be used by other tasks in the same executor, when the task completes
+ * we try to remove the block in a non-blocking manner; if the block is still in use, it is
+ * marked as removable instead.
+ */
 private def getOrCacheBlock(
 rdd: RDD[U],
 partition: Partition,

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3220967a77d5c..e524675332d1b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -672,7 +672,6 @@ abstract class RDD[T: ClassTag](
 * elements (a, b) where a is in `this` and b is in `other`.
 */
 def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
- logInfo("---------------> In cartesian method")
 new CartesianRDD(sc, this, other)
 }

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a263f6efbaff7..56a8676aa5039 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1490,11 +1490,13 @@ private[spark] class BlockManager(
 }

 /**
- * Try to remove the block without blocking. Mark it as removable if it is use.
+ * Try to remove the block without blocking. Mark it as removable if it is in use.
 */
 def removeOrMarkAsRemovable(blockId: BlockId, tellMaster: Boolean = true): Unit = {
+ // Try to lock for writing without blocking.
 blockInfoManager.lockForWriting(blockId, false) match {
 case None =>
+ // Because we lock in a non-blocking manner, the block may not exist or may be in use by other tasks.
 blockInfoManager.synchronized {

 // Closing should be idempotent, but maybe not for the NioBlockTransferService.
 shuffleClient.close()
 }
+ removableBlocks.clear()
 diskBlockManager.stop()
 rpcEnv.stop(slaveEndpoint)
 blockInfoManager.clear()

From 85cee23172fe280c8609e12ee30bfcc21f025355 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Fri, 5 May 2017 15:50:27 +0800
Subject: [PATCH 13/14] remove the block only when the task completes

---
 .../org/apache/spark/rdd/CartesianRDD.scala | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 6c91944516115..7eabbf2a13173 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -73,9 +73,12 @@ class CartesianRDD[T: ClassTag, U: ClassTag](

 override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
 val currSplit = split.asInstanceOf[CartesianPartition]
- for (x <- rdd1.iterator(currSplit.s1, context);
- y <- getOrCacheBlock(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK))
- yield (x, y)
+ val (iter2, readCachedBlock) =
+ getOrCacheBlock(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK)
+ val resultIter = for (x <- rdd1.iterator(currSplit.s1, context); y <- iter2) yield (x, y)
+
+ CompletionIterator[(T, U), Iterator[(T, U)]](resultIter,
+ removeBlock(RDDBlockId(rdd2.id, currSplit.s2.index), readCachedBlock))
 }

 /**
 * Try to get the block from the local block manager; if it is not present locally, fetch it
 * from a remote block manager and cache it locally.
 *
 * Because the block may be used by other tasks in the same executor, when the task completes
 * we try to remove the block in a non-blocking manner; if the block is still in use, it is
 * marked as removable instead.
 */
 private def getOrCacheBlock(
 rdd: RDD[U],
 partition: Partition,
 context: TaskContext,
- level: StorageLevel): Iterator[U] = {
+ level: StorageLevel): (Iterator[U], Boolean) = {
 val blockId = RDDBlockId(rdd.id, partition.index)
 var readCachedBlock = true
 // This method is called on executors, so we need call SparkEnv.get instead of sc.env.

@@ -115,15 +118,15 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
 new InterruptibleIterator(context, iter.asInstanceOf[Iterator[U]])
 }

- def removeBlock(blockId: BlockId,
- readCachedBlock: Boolean): Unit = {
- val blockManager = SparkEnv.get.blockManager
- if (!readCachedBlock || blockManager.isRemovable(blockId)) {
- blockManager.removeOrMarkAsRemovable(blockId, true)
- }
- }
+ (iterator, readCachedBlock)
+ }

- CompletionIterator[U, Iterator[U]](iterator, removeBlock(blockId, readCachedBlock))
+ private def removeBlock(blockId: BlockId,
+ readCachedBlock: Boolean): Unit = {
+ val blockManager = SparkEnv.get.blockManager
+ if (!readCachedBlock || blockManager.isRemovable(blockId)) {
+ blockManager.removeOrMarkAsRemovable(blockId, true)
+ }
 }

From ceb547a37ab4a5880e72e3e7a0d00752ab37350b Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Sun, 7 May 2017 23:13:36 +0800
Subject: [PATCH 14/14] add test cases and some comments

---
 .../apache/spark/storage/BlockManager.scala   | 15 +++-
 .../spark/storage/BlockManagerSuite.scala     | 84 ++++++++++++++++++-
 2 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 56a8676aa5039..b71a5823404e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -682,6 +682,13 @@ private[spark] class BlockManager(
 None
 }

+ /**
+ * Get a block from the block manager (either local or remote).
+ *
+ * This acquires a read lock on the block if the block was stored locally and does not acquire
+ * any locks if the block was fetched from a remote block manager. The read lock will
+ * automatically be freed once the result's `data` iterator is fully consumed.
+ */
 def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
 getOrCacheRemote(blockId, false, Some(StorageLevel.NONE))
 }

 /**
@@ -713,8 +720,12 @@ private[spark] class BlockManager(
 if (cacheRemote) {
 assert(storageLevel.isDefined && storageLevel.get.isValid,
 "The storage level is invalid.")
- putIterator(blockId, blockResult.data, storageLevel.get)
- logInfo(s"Cache bock $blockId fetched from remotely")
+ val putResult = putIterator(blockId, blockResult.data, storageLevel.get) match {
+ case true => "success"
+ case false => "fail"
+ }
+
+ logInfo(s"Cached block $blockId fetched from remote: $putResult")
 }
 return remote
 case None =>

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a8b9604899838..ee82afb80bc47 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage

 import java.nio.ByteBuffer
+import java.util.Properties

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -25,13 +26,11 @@
 import scala.concurrent.Future
 import scala.language.implicitConversions
 import scala.language.postfixOps
 import scala.reflect.ClassTag
-
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
-
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod
@@ -101,6 +100,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 blockManager
 }

+ private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
+ try {
+ TaskContext.setTaskContext(
+ new TaskContextImpl(0, 0, taskAttemptId, 0, null, new Properties, null))
+ block
+ } finally {
+ TaskContext.unset()
+ }
+ }
+
 override def beforeEach(): Unit = {
 super.beforeEach()
 // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
@@ -1255,6 +1264,77 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 assert(master.getLocations("item").isEmpty)
 }

+ test("cache block fetched remotely") {
+ store = makeBlockManager(8000, "executor1")
+ store2 = makeBlockManager(8000, "executor2")
+ val arr = new Array[Byte](4000)
+ store.putSingle("block1", arr, StorageLevel.MEMORY_AND_DISK, true)
+ assert(store.getSingleAndReleaseLock("block1").isDefined, "block1 was not in store")
+
+ // by default, do not cache the block fetched from remote
+ store2.getOrCacheRemote("block1")
+ assert(!store2.getLocalAndReleaseLock("block1").isDefined,
+ "block1 should not be cached by store2")
+
+ // cache the block fetched from remote
+ store2.getOrCacheRemote("block1", true, Some(StorageLevel.MEMORY_AND_DISK))
+ assert(store2.getLocalAndReleaseLock("block1").isDefined,
+ "block1 should be cached by store2")
+ assert(master.getLocations("block1").size == 2,
+ "master did not report 2 locations for block1")
+ }
+
+ test("remote block with blocking") {
+ store = makeBlockManager(8000, "executor1")
+ val arr = new Array[Byte](4000)
+ store.putSingle("block", arr,
 StorageLevel.MEMORY_AND_DISK, true)
+ withTaskId(0) {
+ store.get("block")
+ }
+ val future = Future {
+ withTaskId(1) {
+ store.removeBlock("block")
+ master.getLocations("block").isEmpty
+ }
+ }
+ Thread.sleep(300)
+ assert(store.getStatus("block").isDefined, "block was not in store")
+ withTaskId(0) {
+ store.releaseLock("block")
+ }
+ assert(ThreadUtils.awaitResult(future, 1.seconds))
+ }
+
+ test("remote block without blocking") {
+ store = makeBlockManager(8000, "executor1")
+ val arr = new Array[Byte](4000)
+ store.putSingle("block", arr, StorageLevel.MEMORY_AND_DISK, true)
+ withTaskId(0) {
+ // lock the block with read lock
+ store.get("block")
+ }
+ val future = Future {
+ withTaskId(1) {
+ store.removeOrMarkAsRemovable("block")
+ store.isRemovable("block")
+ }
+ }
+ Thread.sleep(300)
+ assert(store.getStatus("block").isDefined, "block should not be removed")
+ assert(ThreadUtils.awaitResult(future, 1.seconds), "block should be marked as removable")
+ withTaskId(0) {
+ store.releaseLock("block")
+ }
+ val future1 = Future {
+ withTaskId(1) {
+ store.removeOrMarkAsRemovable("block")
+ !store.isRemovable("block")
+ }
+ }
+ assert(ThreadUtils.awaitResult(future1, 1.seconds), "block should not be marked as removable")
+ assert(master.getLocations("block").isEmpty, "block should be removed")
+ }
+
 class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
 var numCalls = 0