From fc3f1d78e14a30dd2f71fc65ec59a2def5c1a0d4 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 5 Jul 2017 16:20:16 +0300 Subject: [PATCH 01/16] SPARK-6235__take1: introduce a failing test. --- .../apache/spark/storage/DiskStoreSuite.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 67fc084e8a13d..de4c2f9a6c432 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite { assert(diskStore.getSize(blockId) === 0L) } + test("blocks larger than 2gb") { + val conf = new SparkConf() + val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) + val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) + + val mb = 1024*1024 + val gb = 1024*mb + + val blockId = BlockId("rdd_1_2") + diskStore.put(blockId) { chan => + val arr = new Array[Byte](mb) + for{ + _ <- 0 until 2048 + }{ + val buf = ByteBuffer.wrap(arr) + while (buf.hasRemaining()) { + chan.write(buf) + } + } + } + + val blockData = diskStore.getBytes(blockId) + assert(blockData.size == 2 * gb) + } + test("block data encryption") { val testDir = Utils.createTempDir() val testData = new Array[Byte](128 * 1024) From 84687380026a6a3bcded27be517094d3f690c3bb Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 30 Jul 2017 23:06:05 +0300 Subject: [PATCH 02/16] SPARK-6235__add_failing_tests: add failing tests for block manager suite. --- .../spark/storage/BlockManagerSuite.scala | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 086adccea954c..e8b458d317044 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1415,6 +1415,92 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel){ + store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") + def mkBlobs() = { + val rng = new java.util.Random(42) + Iterator.fill(2 * 1024 + 1){ + val buff = new Array[Byte](1024 * 1024) + rng.nextBytes(buff) + buff + } + } + val res1 = store.getOrElseUpdate( + RDDBlockId(42, 0), + storageLevel, + implicitly[ClassTag[Array[Byte]]], + mkBlobs _ + ) + withClue(res1) { + assert(res1.isLeft) + assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall { + case (a, b) => + a != null && + b != null && + a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) + } + val getResult = store.get(RDDBlockId(42, 0)) + withClue(getResult) { + assert(getResult.isDefined) + assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall { + case (a, b) => + a != null && + b != null && + a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) + } + val getBlockRes = store.getBlockData(RDDBlockId(42, 0)) + withClue(getBlockRes){ + try { + assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024) + Utils.tryWithResource(getBlockRes.createInputStream()){ inpStrm => + val iter = store.serializerManager.dataDeserializeStream(RDDBlockId(42, 0), 
inpStrm)(implicitly[ClassTag[Array[Byte]]]) + assert(iter.zipAll(mkBlobs(), null, null).forall { + case (a, b) => + a != null && + b != null && + a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) + } + } finally { + getBlockRes.release() + } + } +/* + val res2 = store.getOrElseUpdate( + RDDBlockId(42, 0), + storageLevel, + implicitly[ClassTag[Array[Byte]]], + mkBlobs _ + ) + withClue(res2) { + assert(res2.isLeft) + assert(res2.left.get.data.zipAll(mkBlobs(), null, null).forall { + case (a, b) => + a != null && + b != null && + a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) + }*/ + } + + test("getOrElseUpdate > 2gb, storage level = disk only"){ + testGetOrElseUpdateForLargeBlock(StorageLevel.DISK_ONLY) + } + + test("getOrElseUpdate > 2gb, storage level = memory deserialized"){ + testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY) + } + + test("getOrElseUpdate > 2gb, storage level = off-heap"){ + testGetOrElseUpdateForLargeBlock(StorageLevel.OFF_HEAP) + } + + test("getOrElseUpdate > 2gb, storage level = memory serialized"){ + testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY_SER) + } } private object BlockManagerSuite { From 15804497a477b8f97c08adfad5f0519504dc82f2 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 1 Aug 2017 20:34:26 +0300 Subject: [PATCH 03/16] SPARK-6235__add_failing_tests: introduce a new BlockData implementation to represent a disk backed block data. --- .../org/apache/spark/storage/DiskStore.scala | 78 +++++++++++++------ 1 file changed, 56 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index c6656341fcd15..584f80d54e07c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -25,11 +25,9 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ListBuffer - import com.google.common.io.{ByteStreams, Closeables, Files} -import io.netty.channel.FileRegion +import io.netty.channel.{DefaultFileRegion, FileRegion} import io.netty.util.AbstractReferenceCounted - import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.ManagedBuffer @@ -108,25 +106,7 @@ private[spark] class DiskStore( new EncryptedBlockData(file, blockSize, conf, key) case _ => - val channel = new FileInputStream(file).getChannel() - if (blockSize < minMemoryMapBytes) { - // For small files, directly read rather than memory map. - Utils.tryWithSafeFinally { - val buf = ByteBuffer.allocate(blockSize.toInt) - JavaUtils.readFully(channel, buf) - buf.flip() - new ByteBufferBlockData(new ChunkedByteBuffer(buf), true) - } { - channel.close() - } - } else { - Utils.tryWithSafeFinally { - new ByteBufferBlockData( - new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)), true) - } { - channel.close() - } - } + new DiskBlockData(conf, file, blockSize) } } @@ -165,6 +145,60 @@ private[spark] class DiskStore( } +private class DiskBlockData( + conf: SparkConf, + file: File, + blockSize: Long) extends BlockData { + + private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + + override def toInputStream(): InputStream = new FileInputStream(file) + + /** + * Returns a Netty-friendly wrapper for the block's data. 
+ * + * Please see `ManagedBuffer.convertToNetty()` for more details. + */ + override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) + + override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { + Utils.tryWithResource(open()){ channel => + var remaining = blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { + val chunkSize = math.min(remaining, Int.MaxValue) + val chunk = allocator(chunkSize.toInt) + remaining -= chunkSize + JavaUtils.readFully(channel, chunk) + chunk.flip() + chunks += chunk + } + new ChunkedByteBuffer(chunks.toArray) + } + } + + override def toByteBuffer(): ByteBuffer = { + require( size < Int.MaxValue, s"can't create a byte buffer of size $blockSize since it exceeds Int.MaxValue ${Int.MaxValue}.") + Utils.tryWithResource(open()) { channel => + if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + val buf = ByteBuffer.allocate(blockSize.toInt) + JavaUtils.readFully(channel, buf) + buf.flip() + buf + } else { + channel.map(MapMode.READ_ONLY, 0, file.length) + } + } + } + + override def size: Long = blockSize + + override def dispose(): Unit = {} + + private def open() = new FileInputStream(file).getChannel +} + private class EncryptedBlockData( file: File, blockSize: Long, From c5028f50698c4fe48a06f5dd683dbee42f7e6b2b Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 5 Aug 2017 22:57:41 +0300 Subject: [PATCH 04/16] SPARK-6235__add_failing_tests: styling --- .../org/apache/spark/storage/DiskStore.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 584f80d54e07c..85b76807a520f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -25,9 +25,11 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ListBuffer + import com.google.common.io.{ByteStreams, Closeables, Files} import io.netty.channel.{DefaultFileRegion, FileRegion} import io.netty.util.AbstractReferenceCounted + import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.ManagedBuffer @@ -155,14 +157,14 @@ private class DiskBlockData( override def toInputStream(): InputStream = new FileInputStream(file) /** - * Returns a Netty-friendly wrapper for the block's data. - * - * Please see `ManagedBuffer.convertToNetty()` for more details. - */ + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNetty()` for more details. 
+ */ override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { - Utils.tryWithResource(open()){ channel => + Utils.tryWithResource(open()) { channel => var remaining = blockSize val chunks = new ListBuffer[ByteBuffer]() while (remaining > 0) { @@ -178,7 +180,9 @@ private class DiskBlockData( } override def toByteBuffer(): ByteBuffer = { - require( size < Int.MaxValue, s"can't create a byte buffer of size $blockSize since it exceeds Int.MaxValue ${Int.MaxValue}.") + require( size < Int.MaxValue + , s"can't create a byte buffer of size $blockSize" + + s" since it exceeds Int.MaxValue ${Int.MaxValue}.") Utils.tryWithResource(open()) { channel => if (blockSize < minMemoryMapBytes) { // For small files, directly read rather than memory map. From 908c7860688534d0bb77bcbebbd2e006a161fb74 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 5 Aug 2017 22:58:52 +0300 Subject: [PATCH 05/16] SPARK-6235__add_failing_tests: adapt DiskStoreSuite to the modifications in the tested class. --- .../org/apache/spark/storage/DiskStoreSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index de4c2f9a6c432..563ef288d64dc 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -50,18 +50,18 @@ class DiskStoreSuite extends SparkFunSuite { val diskStoreMapped = new DiskStore(conf.clone().set(confKey, "0"), diskBlockManager, securityManager) diskStoreMapped.putBytes(blockId, byteBuffer) - val mapped = diskStoreMapped.getBytes(blockId).asInstanceOf[ByteBufferBlockData].buffer + val mapped = diskStoreMapped.getBytes(blockId).toByteBuffer() assert(diskStoreMapped.remove(blockId)) val diskStoreNotMapped = new DiskStore(conf.clone().set(confKey, "1m"), diskBlockManager, securityManager) diskStoreNotMapped.putBytes(blockId, byteBuffer) - val notMapped = diskStoreNotMapped.getBytes(blockId).asInstanceOf[ByteBufferBlockData].buffer + val notMapped = diskStoreNotMapped.getBytes(blockId).toByteBuffer() // Not possible to do isInstanceOf due to visibility of HeapByteBuffer - assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")), + assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"), "Expected HeapByteBuffer for un-mapped read") - assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]), + assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read") def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = { @@ -70,8 +70,8 @@ class DiskStoreSuite extends SparkFunSuite { array } - assert(Arrays.equals(mapped.toArray, bytes)) - assert(Arrays.equals(notMapped.toArray, bytes)) + assert(Arrays.equals(new ChunkedByteBuffer(mapped).toArray, bytes)) + assert(Arrays.equals(new ChunkedByteBuffer(notMapped).toArray, bytes)) } test("block size tracking") { @@ -98,7 +98,7 @@ class DiskStoreSuite extends SparkFunSuite { val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) val mb = 1024*1024 - val gb = 1024*mb + val gb = 1024L*mb val blockId = BlockId("rdd_1_2") diskStore.put(blockId) { chan => From 67f4259ca16c3ca7c904c9ccc5de9acbc25d2271 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 5 Aug 2017 23:57:58 +0300 Subject: [PATCH 06/16] SPARK-6235__add_failing_tests: 
try to reduce actual memory footprint of the >2gb tests. --- .../spark/storage/BlockManagerSuite.scala | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e8b458d317044..f82d6ee584964 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1416,13 +1416,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } - def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel){ + def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) { store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") def mkBlobs() = { val rng = new java.util.Random(42) - Iterator.fill(2 * 1024 + 1){ - val buff = new Array[Byte](1024 * 1024) - rng.nextBytes(buff) + val buff = new Array[Byte](1024 * 1024) + rng.nextBytes(buff) + Iterator.fill(2 * 1024 + 1) { buff } } @@ -1452,11 +1452,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE }) } val getBlockRes = store.getBlockData(RDDBlockId(42, 0)) - withClue(getBlockRes){ + withClue(getBlockRes) { try { assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024) - Utils.tryWithResource(getBlockRes.createInputStream()){ inpStrm => - val iter = store.serializerManager.dataDeserializeStream(RDDBlockId(42, 0), inpStrm)(implicitly[ClassTag[Array[Byte]]]) + Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm => + val iter = store + .serializerManager + .dataDeserializeStream(RDDBlockId(42, 0) + , inpStrm)(implicitly[ClassTag[Array[Byte]]]) assert(iter.zipAll(mkBlobs(), null, null).forall { case (a, b) => a != null && @@ -1468,37 +1471,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE getBlockRes.release() } } -/* - val res2 = store.getOrElseUpdate( - RDDBlockId(42, 0), - storageLevel, - implicitly[ClassTag[Array[Byte]]], - mkBlobs _ - ) - withClue(res2) { - assert(res2.isLeft) - assert(res2.left.get.data.zipAll(mkBlobs(), null, null).forall { - case (a, b) => - a != null && - b != null && - a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq - }) - }*/ } - test("getOrElseUpdate > 2gb, storage level = disk only"){ + test("getOrElseUpdate > 2gb, storage level = disk only") { testGetOrElseUpdateForLargeBlock(StorageLevel.DISK_ONLY) } - test("getOrElseUpdate > 2gb, storage level = memory deserialized"){ + test("getOrElseUpdate > 2gb, storage level = memory deserialized") { testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY) } - test("getOrElseUpdate > 2gb, storage level = off-heap"){ + test("getOrElseUpdate > 2gb, storage level = off-heap") { testGetOrElseUpdateForLargeBlock(StorageLevel.OFF_HEAP) } - test("getOrElseUpdate > 2gb, storage level = memory serialized"){ + test("getOrElseUpdate > 2gb, storage level = memory serialized") { testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY_SER) } } From 8338b4ee2186f9b7eb5710857d88404009896d9c Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 6 Aug 2017 21:13:53 +0300 Subject: [PATCH 07/16] SPARK-3151: address styling issues. 
--- .../main/scala/org/apache/spark/storage/DiskStore.scala | 2 +- .../scala/org/apache/spark/storage/DiskStoreSuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 85b76807a520f..f265e43452582 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -200,7 +200,7 @@ private class DiskBlockData( override def dispose(): Unit = {} - private def open() = new FileInputStream(file).getChannel + private def open() = new FileInputStream(file).getChannel } private class EncryptedBlockData( diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 563ef288d64dc..cf56d75e1db74 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -97,15 +97,15 @@ class DiskStoreSuite extends SparkFunSuite { val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) - val mb = 1024*1024 - val gb = 1024L*mb + val mb = 1024 * 1024 + val gb = 1024L * mb val blockId = BlockId("rdd_1_2") diskStore.put(blockId) { chan => val arr = new Array[Byte](mb) - for{ + for { _ <- 0 until 2048 - }{ + } { val buf = ByteBuffer.wrap(arr) while (buf.hasRemaining()) { chan.write(buf) From 4a320e6345c73d976eecf01d85e29c2d3b9b1fc8 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 6 Aug 2017 21:51:18 +0300 Subject: [PATCH 08/16] SPARK-3151: address a comment about code readability. --- core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index f265e43452582..7cb4d8fdaadfe 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -180,7 +180,7 @@ private class DiskBlockData( } override def toByteBuffer(): ByteBuffer = { - require( size < Int.MaxValue + require( blockSize < Int.MaxValue , s"can't create a byte buffer of size $blockSize" + s" since it exceeds Int.MaxValue ${Int.MaxValue}.") Utils.tryWithResource(open()) { channel => From 5a5c3448f87b4bc2dddec26310e952dd2eadb98b Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 8 Aug 2017 16:46:14 +0300 Subject: [PATCH 09/16] SPARK-3151: address comments by @cloud-fan. 
--- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 +- .../test/scala/org/apache/spark/storage/DiskStoreSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f82d6ee584964..6e01415453685 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1416,7 +1416,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } - def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) { + def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") def mkBlobs() = { val rng = new java.util.Random(42) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index cf56d75e1db74..303498d8182e9 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -104,7 +104,7 @@ class DiskStoreSuite extends SparkFunSuite { diskStore.put(blockId) { chan => val arr = new Array[Byte](mb) for { - _ <- 0 until 2048 + _ <- 0 until 3072 } { val buf = ByteBuffer.wrap(arr) while (buf.hasRemaining()) { @@ -114,7 +114,7 @@ class DiskStoreSuite extends SparkFunSuite { } val blockData = diskStore.getBytes(blockId) - assert(blockData.size == 2 * gb) + assert(blockData.size == 3 * gb) } test("block data encryption") { From 6cbe8d07d001b04e67c77e191bef2bc6cce6b0ea Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 11 Aug 2017 10:31:07 +0300 Subject: [PATCH 10/16] SPARK-3151: increase max heap size for forked tests. 
---
 project/SparkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 89b0c7a3ab7b0..890bdcd412e4a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -790,7 +790,7 @@ object TestSettings {
     javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark"))
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
     javaOptions in Test += "-ea",
-    javaOptions in Test ++= "-Xmx3g -Xss4096k"
+    javaOptions in Test ++= "-Xmx6g -Xss4096k"
       .split(" ").toSeq,
     javaOptions += "-Xmx3g",
     // Exclude tags defined in a system property

From f6fb9e9fd08a676d0c634d407edfe5be289deaff Mon Sep 17 00:00:00 2001
From: Eyal Farago
Date: Tue, 15 Aug 2017 11:22:19 +0300
Subject: [PATCH 11/16] SPARK-3151: follow @vanzin's style comments

---
 .../src/main/scala/org/apache/spark/storage/DiskStore.scala | 6 +++---
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala  | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 7cb4d8fdaadfe..77a22bb6752e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -180,9 +180,9 @@ private class DiskBlockData(
   }
 
   override def toByteBuffer(): ByteBuffer = {
-    require( blockSize < Int.MaxValue
-      , s"can't create a byte buffer of size $blockSize"
-      + s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
+    require(blockSize < Int.MaxValue,
+      s"can't create a byte buffer of size $blockSize" +
+        s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
     Utils.tryWithResource(open()) { channel =>
       if (blockSize < minMemoryMapBytes) {
         // For small files, directly read rather than memory map.
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6e01415453685..e239abac2af34 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1438,7 +1438,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         case (a, b) =>
           a != null &&
             b != null &&
-            a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
+            a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq
       })
     }
     val getResult = store.get(RDDBlockId(42, 0))
@@ -1448,7 +1448,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         case (a, b) =>
          a != null &&
            b != null &&
-           a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
+           a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq
       })
     }
     val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
@@ -1464,7 +1464,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
          case (a, b) =>
            a != null &&
              b != null &&
-             a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
+             a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq
          })
        }
      } finally {

From 0e3cd8244c3ae11943b0fabb443a504e150c06ed Mon Sep 17 00:00:00 2001
From: Eyal Farago
Date: Tue, 15 Aug 2017 13:20:28 +0300
Subject: [PATCH 12/16] SPARK-3151: refactor DiskStore and DiskStoreSuite according to @vanzin's comments.
--- .../org/apache/spark/storage/DiskStore.scala | 16 ++++++++----- .../apache/spark/storage/DiskStoreSuite.scala | 24 +++++++++++++++---- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 77a22bb6752e4..8744a29c1883f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -47,6 +47,8 @@ private[spark] class DiskStore( securityManager: SecurityManager) extends Logging { private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", + s"${Int.MaxValue}b") private val blockSizes = new ConcurrentHashMap[String, Long]() def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name) @@ -108,7 +110,7 @@ private[spark] class DiskStore( new EncryptedBlockData(file, blockSize, conf, key) case _ => - new DiskBlockData(conf, file, blockSize) + new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize) } } @@ -148,12 +150,11 @@ private[spark] class DiskStore( } private class DiskBlockData( - conf: SparkConf, + minMemoryMapBytes : Long, + maxMemoryMapBytes : Long, file: File, blockSize: Long) extends BlockData { - private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") - override def toInputStream(): InputStream = new FileInputStream(file) /** @@ -168,7 +169,7 @@ private class DiskBlockData( var remaining = blockSize val chunks = new ListBuffer[ByteBuffer]() while (remaining > 0) { - val chunkSize = math.min(remaining, Int.MaxValue) + val chunkSize = math.min(remaining, maxMemoryMapBytes) val chunk = allocator(chunkSize.toInt) remaining -= chunkSize JavaUtils.readFully(channel, chunk) @@ -180,7 +181,10 @@ private class DiskBlockData( } override def toByteBuffer(): ByteBuffer = { - require(blockSize < Int.MaxValue, + // I chose to leave to original error message here + // since users are unfamiliar with the configureation key + // controling maxMemoryMapBytes for tests + require(blockSize < maxMemoryMapBytes, s"can't create a byte buffer of size $blockSize" + s" since it exceeds Int.MaxValue ${Int.MaxValue}.") Utils.tryWithResource(open()) { channel => diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 303498d8182e9..ce9e2c69c98b0 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -94,17 +94,16 @@ class DiskStoreSuite extends SparkFunSuite { test("blocks larger than 2gb") { val conf = new SparkConf() + .set("spark.storage.memoryMapLimitForTests", "10k" ) val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) - val mb = 1024 * 1024 - val gb = 1024L * mb val blockId = BlockId("rdd_1_2") diskStore.put(blockId) { chan => - val arr = new Array[Byte](mb) + val arr = new Array[Byte](1024) for { - _ <- 0 until 3072 + _ <- 0 until 20 } { val buf = ByteBuffer.wrap(arr) while (buf.hasRemaining()) { @@ -114,7 +113,22 @@ class DiskStoreSuite extends SparkFunSuite { } val blockData = diskStore.getBytes(blockId) - assert(blockData.size == 3 * gb) + assert(blockData.size == 20 * 1024) + + val chunkedByteBuffer = 
blockData.toChunkedByteBuffer(ByteBuffer.allocate) + val chunks = chunkedByteBuffer.chunks + assert(chunks.size === 2) + for( chunk <- chunks ) { + assert(chunk.limit === 10 * 1024) + } + + val e = intercept[IllegalArgumentException]{ + blockData.toByteBuffer() + } + + assert(e.getMessage == + s"requirement failed: can't create a byte buffer of size ${blockData.size}" + + s" since it exceeds Int.MaxValue ${Int.MaxValue}.") } test("block data encryption") { From a911c856509333216f16faad6371692a632bfe21 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 15 Aug 2017 21:57:02 +0300 Subject: [PATCH 13/16] Revert "SPARK-3151: increase max heap size for forked tests." This reverts commit 6cbe8d07d001b04e67c77e191bef2bc6cce6b0ea. --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 890bdcd412e4a..89b0c7a3ab7b0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -790,7 +790,7 @@ object TestSettings { javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark")) .map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test += "-ea", - javaOptions in Test ++= "-Xmx6g -Xss4096k" + javaOptions in Test ++= "-Xmx3g -Xss4096k" .split(" ").toSeq, javaOptions += "-Xmx3g", // Exclude tags defined in a system property From 8be899f93922ed3181fd67be0782bff8f47c6c37 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 15 Aug 2017 21:59:00 +0300 Subject: [PATCH 14/16] SPARK-3151: revert BlockManagerSuite changes. --- .../spark/storage/BlockManagerSuite.scala | 73 ------------------- 1 file changed, 73 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c2d48e341be6b..755a61a438a6a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1415,79 +1415,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } - - def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { - store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") - def mkBlobs() = { - val rng = new java.util.Random(42) - val buff = new Array[Byte](1024 * 1024) - rng.nextBytes(buff) - Iterator.fill(2 * 1024 + 1) { - buff - } - } - val res1 = store.getOrElseUpdate( - RDDBlockId(42, 0), - storageLevel, - implicitly[ClassTag[Array[Byte]]], - mkBlobs _ - ) - withClue(res1) { - assert(res1.isLeft) - assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall { - case (a, b) => - a != null && - b != null && - a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq - }) - } - val getResult = store.get(RDDBlockId(42, 0)) - withClue(getResult) { - assert(getResult.isDefined) - assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall { - case (a, b) => - a != null && - b != null && - a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq - }) - } - val getBlockRes = store.getBlockData(RDDBlockId(42, 0)) - withClue(getBlockRes) { - try { - assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024) - Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm => - val iter = store - .serializerManager - .dataDeserializeStream(RDDBlockId(42, 0) - , inpStrm)(implicitly[ClassTag[Array[Byte]]]) - assert(iter.zipAll(mkBlobs(), null, null).forall { - case (a, b) => - a != null && - 
b != null && - a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq - }) - } - } finally { - getBlockRes.release() - } - } - } - - test("getOrElseUpdate > 2gb, storage level = disk only") { - testGetOrElseUpdateForLargeBlock(StorageLevel.DISK_ONLY) - } - - test("getOrElseUpdate > 2gb, storage level = memory deserialized") { - testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY) - } - - test("getOrElseUpdate > 2gb, storage level = off-heap") { - testGetOrElseUpdateForLargeBlock(StorageLevel.OFF_HEAP) - } - - test("getOrElseUpdate > 2gb, storage level = memory serialized") { - testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY_SER) - } } private object BlockManagerSuite { From 732073c5c73d4c12cc1059314c25f1ae94fc4469 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 16 Aug 2017 08:02:08 +0300 Subject: [PATCH 15/16] SPARK-3151: address style comments by @vanzin --- .../test/scala/org/apache/spark/storage/DiskStoreSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index ce9e2c69c98b0..005e9745c2ebc 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -98,7 +98,6 @@ class DiskStoreSuite extends SparkFunSuite { val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) - val blockId = BlockId("rdd_1_2") diskStore.put(blockId) { chan => val arr = new Array[Byte](1024) @@ -118,7 +117,7 @@ class DiskStoreSuite extends SparkFunSuite { val chunkedByteBuffer = blockData.toChunkedByteBuffer(ByteBuffer.allocate) val chunks = chunkedByteBuffer.chunks assert(chunks.size === 2) - for( chunk <- chunks ) { + for (chunk <- chunks) { assert(chunk.limit === 10 * 1024) } @@ -126,7 +125,7 @@ class DiskStoreSuite extends SparkFunSuite { blockData.toByteBuffer() } - assert(e.getMessage == + assert(e.getMessage === s"requirement failed: can't create a byte buffer of size ${blockData.size}" + s" since it exceeds Int.MaxValue ${Int.MaxValue}.") } From d0c98a14ed3d9b4d70129c52af186c2c86f93f41 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 16 Aug 2017 21:49:43 +0300 Subject: [PATCH 16/16] SPARK-3151: address comments by @cloud-fan. 
--- .../scala/org/apache/spark/storage/DiskStore.scala | 11 ++++------- .../org/apache/spark/storage/DiskStoreSuite.scala | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 8744a29c1883f..95d70479ef017 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -48,7 +48,7 @@ private[spark] class DiskStore( private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", - s"${Int.MaxValue}b") + Int.MaxValue.toString) private val blockSizes = new ConcurrentHashMap[String, Long]() def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name) @@ -150,8 +150,8 @@ private[spark] class DiskStore( } private class DiskBlockData( - minMemoryMapBytes : Long, - maxMemoryMapBytes : Long, + minMemoryMapBytes: Long, + maxMemoryMapBytes: Long, file: File, blockSize: Long) extends BlockData { @@ -181,12 +181,9 @@ private class DiskBlockData( } override def toByteBuffer(): ByteBuffer = { - // I chose to leave to original error message here - // since users are unfamiliar with the configureation key - // controling maxMemoryMapBytes for tests require(blockSize < maxMemoryMapBytes, s"can't create a byte buffer of size $blockSize" + - s" since it exceeds Int.MaxValue ${Int.MaxValue}.") + s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.") Utils.tryWithResource(open()) { channel => if (blockSize < minMemoryMapBytes) { // For small files, directly read rather than memory map. diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 005e9745c2ebc..36977d8c554ad 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -127,7 +127,7 @@ class DiskStoreSuite extends SparkFunSuite { assert(e.getMessage === s"requirement failed: can't create a byte buffer of size ${blockData.size}" + - s" since it exceeds Int.MaxValue ${Int.MaxValue}.") + " since it exceeds 10.0 KB.") } test("block data encryption") {
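Note on the approach the series converges on: DiskStore.getBytes no longer materializes a block as a single (possibly memory-mapped) ByteBuffer; it returns a DiskBlockData whose toChunkedByteBuffer reads the file in chunks of at most spark.storage.memoryMapLimitForTests bytes (Int.MaxValue by default), so blocks larger than 2 GB can be exposed as a ChunkedByteBuffer. The snippet below is a minimal, self-contained sketch of that chunked-read loop, not the Spark code itself; ChunkedReadSketch, readChunked and maxChunkBytes are illustrative names that do not appear in the patches (the real DiskBlockData takes an allocator callback and uses JavaUtils.readFully).

import java.io.{EOFException, File, FileInputStream}
import java.nio.ByteBuffer

import scala.collection.mutable.ListBuffer

object ChunkedReadSketch {
  /** Read `file` into ByteBuffers of at most `maxChunkBytes` each. */
  def readChunked(file: File, maxChunkBytes: Long): Array[ByteBuffer] = {
    // A single ByteBuffer can hold at most Int.MaxValue bytes, so cap the chunk size there.
    val cap = math.min(maxChunkBytes, Int.MaxValue.toLong)
    val channel = new FileInputStream(file).getChannel
    try {
      var remaining = channel.size()
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, cap).toInt
        val chunk = ByteBuffer.allocate(chunkSize)
        // Fill the chunk completely; FileChannel.read may return short counts.
        while (chunk.hasRemaining) {
          if (channel.read(chunk) < 0) {
            throw new EOFException(s"unexpected end of file: $file")
          }
        }
        chunk.flip()
        chunks += chunk
        remaining -= chunkSize
      }
      chunks.toArray
    } finally {
      channel.close()
    }
  }
}

Capping each chunk at a configurable limit is also what lets the final version of the DiskStoreSuite test exercise the multi-chunk path cheaply: with spark.storage.memoryMapLimitForTests set to 10k, a 20 KB block comes back as two 10 KB chunks, instead of the test having to write a real multi-gigabyte file.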