From e38113d5fbd58845db779cb4036cad7da0bd41ee Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Sun, 15 Jun 2014 15:55:29 +0530 Subject: [PATCH 1/2] Just a POC for having compression for every RDD. --- .../apache/spark/api/java/StorageLevels.java | 29 +++++---- .../scala/org/apache/spark/CacheManager.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 5 +- .../org/apache/spark/storage/BlockId.scala | 10 +-- .../apache/spark/storage/BlockManager.scala | 8 ++- .../apache/spark/storage/StorageLevel.scala | 64 +++++++++++++------ .../org/apache/spark/util/JsonProtocol.scala | 3 +- .../org/apache/spark/CacheManagerSuite.scala | 8 +-- .../apache/spark/ContextCleanerSuite.scala | 2 +- .../org/apache/spark/DistributedSuite.scala | 3 +- .../apache/spark/storage/BlockIdSuite.scala | 6 +- .../spark/storage/BlockManagerSuite.scala | 14 ++-- .../scala/org/apache/spark/ui/UISuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 5 +- 14 files changed, 96 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java index 840a1bd93bfbb..27617d32c6292 100644 --- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java +++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java @@ -23,18 +23,18 @@ * Expose some commonly useful storage level constants. 
*/ public class StorageLevels { - public static final StorageLevel NONE = create(false, false, false, false, 1); - public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1); - public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2); - public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1); - public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2); - public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1); - public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2); - public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1); - public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2); - public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1); - public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2); - public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1); + public static final StorageLevel NONE = create(false, false, false, false, false, 1); + public static final StorageLevel DISK_ONLY = create(true, false, false, false, false, 1); + public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, false, 2); + public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, false, 1); + public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, false, 2); + public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, false, 1); + public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, false, 2); + public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, false, 1); + public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, false, 2); + public static final StorageLevel 
MEMORY_AND_DISK_SER = create(true, true, false, false, false, 1); + public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, false, 2); + public static final StorageLevel OFF_HEAP = create(false, false, true, false, false, 1); /** * Create a new StorageLevel object. @@ -46,7 +46,7 @@ public class StorageLevels { @Deprecated public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) { - return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication); + return StorageLevel.apply(useDisk, useMemory, false, deserialized, false, replication); } /** @@ -62,7 +62,8 @@ public static StorageLevel create( boolean useMemory, boolean useOffHeap, boolean deserialized, + boolean compressed, int replication) { - return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication); + return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, compressed, replication); } } diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 315ed91f81df3..b2eca51167910 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { - val key = RDDBlockId(rdd.id, split.index) + val key = RDDBlockId(rdd.id, split.index, rdd.getStorageLevel.toInt) logDebug(s"Looking for partition $key") blockManager.get(key) match { case Some(values) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3c85b5a2ae776..d7eb48a0f6dbd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ 
-37,7 +37,7 @@ import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} +import org.apache.spark.storage._ import org.apache.spark.util.{SystemClock, Clock, Utils} /** @@ -172,7 +172,8 @@ class DAGScheduler( private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = { if (!cacheLocs.contains(rdd.id)) { - val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] + val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index, + rdd.getStorageLevel.toInt)).toArray[BlockId] val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) cacheLocs(rdd.id) = blockIds.map { id => locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 42ec181b00bb3..02bf3a6764da1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -49,8 +49,8 @@ sealed abstract class BlockId { } @DeveloperApi -case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { - def name = "rdd_" + rddId + "_" + splitIndex +case class RDDBlockId(rddId: Int, splitIndex: Int, storageLevel: Int) extends BlockId { + def name = "rdd_" + rddId + "_" + splitIndex + "_" + storageLevel } @DeveloperApi @@ -86,7 +86,7 @@ private[spark] case class TestBlockId(id: String) extends BlockId { @DeveloperApi object BlockId { - val RDD = "rdd_([0-9]+)_([0-9]+)".r + val RDD = "rdd_([0-9]+)_([0-9]+)_([0-9]+)".r val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r @@ -95,8 +95,8 @@ object 
BlockId { /** Converts a BlockId "name" String back into a BlockId. */ def apply(id: String) = id match { - case RDD(rddId, splitIndex) => - RDDBlockId(rddId.toInt, splitIndex.toInt) + case RDD(rddId, splitIndex, storageLevel) => + RDDBlockId(rddId.toInt, splitIndex.toInt, storageLevel.toInt) case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case BROADCAST(broadcastId, field) => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f52bc7075104b..1d5d30f62c47d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -301,7 +301,9 @@ private[spark] class BlockManager( val onDisk = level.useDisk && diskStore.contains(blockId) val deserialized = if (inMem) level.deserialized else false val replication = if (inMem || inTachyon || onDisk) level.replication else 1 - val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication) + val compressed = level.compressed && !deserialized + val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, compressed, + replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L @@ -759,7 +761,7 @@ private[spark] class BlockManager( @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val tLevel = StorageLevel( - level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, level.compressed, 1) if (cachedPeers == null) { cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } @@ -948,7 +950,7 @@ private[spark] class BlockManager( 
blockId match { case _: ShuffleBlockId => compressShuffle case _: BroadcastBlockId => compressBroadcast - case _: RDDBlockId => compressRdds + case r: RDDBlockId => compressRdds || StorageLevel(r.storageLevel, 1).compressed case _: TempBlockId => compressShuffleSpill case _ => false } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 1e35abaab5353..70209061515e7 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -38,21 +38,24 @@ class StorageLevel private( private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, + private var _compressed: Boolean, private var _replication: Int = 1) extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. private def this(flags: Int, replication: Int) { - this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) + this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, (flags & 16) != 0, + replication) } - def this() = this(false, true, false, false) // For deserialization + def this() = this(false, true, false, false, false) // For deserialization def useDisk = _useDisk def useMemory = _useMemory def useOffHeap = _useOffHeap def deserialized = _deserialized def replication = _replication + def compressed = _compressed assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes") @@ -63,8 +66,12 @@ class StorageLevel private( require(replication == 1, "Off-heap storage level does not support multiple replication") } + if(_compressed){ + require(!deserialized, "Compressed storage level does not support deserialized storage") + } + override def clone(): StorageLevel = { - new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication) + new 
StorageLevel(useDisk, useMemory, useOffHeap, deserialized, compressed, replication) } override def equals(other: Any): Boolean = other match { @@ -73,6 +80,7 @@ class StorageLevel private( s.useMemory == useMemory && s.useOffHeap == useOffHeap && s.deserialized == deserialized && + s.compressed == compressed && s.replication == replication case _ => false @@ -82,6 +90,9 @@ class StorageLevel private( def toInt: Int = { var ret = 0 + if (_compressed) { + ret |= 16 + } if (_useDisk) { ret |= 8 } @@ -104,6 +115,7 @@ class StorageLevel private( override def readExternal(in: ObjectInput) { val flags = in.readByte() + _compressed = (flags & 16) != 0 _useDisk = (flags & 8) != 0 _useMemory = (flags & 4) != 0 _useOffHeap = (flags & 2) != 0 @@ -115,7 +127,7 @@ class StorageLevel private( private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this) override def toString: String = { - s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)" + s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $compressed, $replication)" } override def hashCode(): Int = toInt * 41 + replication @@ -126,6 +138,7 @@ class StorageLevel private( result += (if (useMemory) "Memory " else "") result += (if (useOffHeap) "Tachyon " else "") result += (if (deserialized) "Deserialized " else "Serialized ") + result += (if (compressed) "Compressed " else "Uncompressed ") result += s"${replication}x Replicated" result } @@ -137,18 +150,23 @@ class StorageLevel private( * new storage levels. 
*/ object StorageLevel { - val NONE = new StorageLevel(false, false, false, false) - val DISK_ONLY = new StorageLevel(true, false, false, false) - val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) - val MEMORY_ONLY = new StorageLevel(false, true, false, true) - val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) - val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) - val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) - val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) - val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) - val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) - val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) - val OFF_HEAP = new StorageLevel(false, false, true, false) + val NONE = new StorageLevel(false, false, false, false, false) + val DISK_ONLY = new StorageLevel(true, false, false, false, false) + val DISK_ONLY_2 = new StorageLevel(true, false, false, false, false, 2) + val MEMORY_ONLY = new StorageLevel(false, true, false, true, false) + val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, false, 2) + val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, false) + val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, false, 2) + val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, false) + val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, false, 2) + val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, false) + val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, false, 2) + val MEMORY_ONLY_COMP = new StorageLevel(false, true, false, false, true) + val MEMORY_ONLY_COMP_2 = new StorageLevel(false, true, false, false, true, 2) + val MEMORY_AND_DISK_COMP = new StorageLevel(true, true, false, false, true, 1) + val MEMORY_AND_DISK_COMP_2 = new StorageLevel(true, true, false, false, true, 2) + + val OFF_HEAP = new 
StorageLevel(false, false, true, false, false) /** * :: DeveloperApi :: @@ -167,13 +185,17 @@ object StorageLevel { case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2 case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2 + case "MEMORY_AND_DISK_COMP" => MEMORY_AND_DISK_COMP + case "MEMORY_AND_DISK_COMP_2" => MEMORY_AND_DISK_COMP_2 + case "MEMORY_ONLY_COMP" => MEMORY_ONLY_COMP + case "MEMORY_ONLY_COMP_2" => MEMORY_ONLY_COMP_2 case "OFF_HEAP" => OFF_HEAP case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s") } /** * :: DeveloperApi :: - * Create a new StorageLevel object without setting useOffHeap. + * Create a new StorageLevel object. */ @DeveloperApi def apply( @@ -181,14 +203,15 @@ object StorageLevel { useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, + compressed: Boolean, replication: Int) = { getCachedStorageLevel( - new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)) + new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, compressed, replication)) } /** * :: DeveloperApi :: - * Create a new StorageLevel object. + * Create a new StorageLevel object without setting useOffHeap and compression level. 
*/ @DeveloperApi def apply( @@ -196,7 +219,8 @@ object StorageLevel { useMemory: Boolean, deserialized: Boolean, replication: Int = 1) = { - getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication)) + getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, + false, replication)) } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 09825087bb048..8832d18d9036d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -623,8 +623,9 @@ private[spark] object JsonProtocol { val useMemory = (json \ "Use Memory").extract[Boolean] val useTachyon = (json \ "Use Tachyon").extract[Boolean] val deserialized = (json \ "Deserialized").extract[Boolean] + val compressed = (json \ "Compressed").extractOpt[Boolean].getOrElse(false) val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication) + StorageLevel(useDisk, useMemory, useTachyon, deserialized, compressed, replication) } def blockStatusFromJson(json: JValue): BlockStatus = { diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 4f178db40f638..4309487295e2c 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -52,8 +52,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get uncached rdd") { expecting { - blockManager.get(RDDBlockId(0, 0)).andReturn(None) - blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, + blockManager.get(RDDBlockId(0, 0, 0)).andReturn(None) + blockManager.put(RDDBlockId(0, 0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true).andStubReturn(Seq[(BlockId, 
BlockStatus)]()) } @@ -66,7 +66,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get cached rdd") { expecting { - blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) + blockManager.get(RDDBlockId(0, 0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) } whenExecuting(blockManager) { @@ -79,7 +79,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get uncached local rdd") { expecting { // Local computation should not persist the resulting value, so don't expect a put(). - blockManager.get(RDDBlockId(0, 0)).andReturn(None) + blockManager.get(RDDBlockId(0, 0, 0)).andReturn(None) } whenExecuting(blockManager) { diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 13b415cccb647..022644dfa9ac1 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -393,7 +393,7 @@ class CleanerTester( private def getRDDBlocks(rddId: Int): Seq[BlockId] = { blockManager.master.getMatchingBlockIds( _ match { - case RDDBlockId(`rddId`, _) => true + case RDDBlockId(`rddId`, _, _) => true case _ => false }, askSlaves = true) } diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 41c294f727b3c..83b23eda56bff 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -199,7 +199,8 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter // Get all the locations of the first partition and try to fetch the partitions // from those locations. 
- val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray + val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index, + data.getStorageLevel.toInt)).toArray val blockId = blockIds(0) val blockManager = SparkEnv.get.blockManager blockManager.master.getLocations(blockId).foreach(id => { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index b647e8a6728ec..d73fe9a5685f7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -44,9 +44,9 @@ class BlockIdSuite extends FunSuite { } test("rdd") { - val id = RDDBlockId(1, 2) - assertSame(id, RDDBlockId(1, 2)) - assertDifferent(id, RDDBlockId(1, 1)) + val id = RDDBlockId(1, 2, 0) + assertSame(id, RDDBlockId(1, 2, 0)) + assertDifferent(id, RDDBlockId(1, 1, 0)) assert(id.name === "rdd_1_2") assert(id.asRDDId.get.rddId === 1) assert(id.asRDDId.get.splitIndex === 2) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d7dbe5164b7f6..06a93eb63ebf0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -57,7 +57,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // Implicitly convert strings to BlockIds for test clarity. 
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) - def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) + def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId, 0) before { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf, @@ -102,9 +102,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("StorageLevel object caching") { - val level1 = StorageLevel(false, false, false, false, 3) - val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1 - val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object + val level1 = StorageLevel(false, false, false, false, false, 3) + val level2 = StorageLevel(false, false, false, false, false, 3) // this should return the same object as level1 + val level3 = StorageLevel(false, false, false, false, false, 2) // this should return a different object assert(level2 === level1, "level2 is not same as level1") assert(level2.eq(level1), "level2 is not the same object as level1") assert(level3 != level1, "level3 is same as level1") @@ -952,15 +952,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1) assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3) - val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0)) + val blockIds = Seq(RDDBlockId(1, 0, 0), RDDBlockId(1, 1, 0), RDDBlockId(2, 0, 0)) blockIds.foreach { blockId => store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } val matchedBlockIds = store.master.getMatchingBlockIds(_ match { - case RDDBlockId(1, _) => true + case RDDBlockId(1, _, _) => true case _ => false }, askSlaves = true) - assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 
1))) + assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0, 0), RDDBlockId(1, 1, 0))) } test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 038746d2eda4b..27e0525592d77 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest import scala.io.Source import scala.language.postfixOps import scala.util.{Failure, Success, Try} +import scala.xml.Node import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler @@ -32,7 +33,6 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.LocalSparkContext._ -import scala.xml.Node class UISuite extends FunSuite { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 3031015256ec9..fb38a5b1065ae 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -110,7 +110,7 @@ class JsonProtocolSuite extends FunSuite { testTaskEndReason(UnknownReason) // BlockId - testBlockId(RDDBlockId(1, 2)) + testBlockId(RDDBlockId(1, 2, 0)) testBlockId(ShuffleBlockId(1, 2, 3)) testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here")) testBlockId(TaskResultBlockId(1L)) @@ -468,7 +468,8 @@ class JsonProtocolSuite extends FunSuite { t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => - (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i)) + (RDDBlockId(e % i, f % i, StorageLevel.MEMORY_AND_DISK_SER_2.toInt), + BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i)) }.toSeq) 
t } From 28df10bb0e73082e1214421c235b12c7b7527ed2 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 16 Jun 2014 12:41:12 +0530 Subject: [PATCH 2/2] Fixed test suites. --- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 1 + core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala | 2 +- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8832d18d9036d..fff89e84e729b 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -297,6 +297,7 @@ private[spark] object JsonProtocol { ("Use Memory" -> storageLevel.useMemory) ~ ("Use Tachyon" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ + ("Compressed" -> storageLevel.compressed) ~ ("Replication" -> storageLevel.replication) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index d73fe9a5685f7..7d515c235c31e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -47,7 +47,7 @@ class BlockIdSuite extends FunSuite { val id = RDDBlockId(1, 2, 0) assertSame(id, RDDBlockId(1, 2, 0)) assertDifferent(id, RDDBlockId(1, 1, 0)) - assert(id.name === "rdd_1_2") + assert(id.name === "rdd_1_2_0") assert(id.asRDDId.get.rddId === 1) assert(id.asRDDId.get.splitIndex === 2) assert(id.isRDD) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 06a93eb63ebf0..d913f52180428 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -796,7 +796,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val bytes = Array.fill[Byte](1000)(incr) val byteBuffer = ByteBuffer.wrap(bytes) - val blockId = BlockId("rdd_1_2") + val blockId = BlockId("rdd_1_2_0") // This sequence of mocks makes these tests fairly brittle. It would // be nice to refactor classes involved in disk storage in a way that