From a73aa1910e971da866ac8195c3c6204c6f3ba895 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 22 Sep 2015 11:44:43 -0700 Subject: [PATCH] Refactor DiskBlockObjectWriter to not require BlockId. --- .../sort/BypassMergeSortShuffleWriter.java | 9 +++-- .../shuffle/IndexShuffleBlockResolver.scala | 3 +- .../apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/DiskBlockObjectWriter.scala | 7 ++-- .../unsafe/UnsafeShuffleWriterSuite.java | 1 - .../sort/UnsafeExternalSorterSuite.java | 1 - .../BypassMergeSortShuffleWriterSuite.scala | 1 - .../storage/DiskBlockObjectWriterSuite.scala | 33 +++++++++---------- 8 files changed, 27 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 0b8b604e18494..f5d80bbcf3557 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -151,7 +151,7 @@ public long[] writePartitionedFile( } finally { Closeables.close(in, copyThrewException); } - if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) { + if (!partitionWriters[i].fileSegment().file().delete()) { logger.error("Unable to delete file for partition {}", i); } } @@ -168,12 +168,11 @@ public long[] writePartitionedFile( public void stop() throws IOException { if (partitionWriters != null) { try { - final DiskBlockManager diskBlockManager = blockManager.diskBlockManager(); for (DiskBlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: - writer.revertPartialWritesAndClose(); - if (!diskBlockManager.getFile(writer.blockId()).delete()) { - logger.error("Error while deleting file for block {}", writer.blockId()); + File file = writer.revertPartialWritesAndClose(); + if (!file.delete()) { + logger.error("Error while deleting file {}", file.getAbsolutePath()); } } } finally { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index d0163d326dba7..e9af1d69f3f23 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -114,9 +114,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB } private[spark] object IndexShuffleBlockResolver { - // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter. + // No-op reduce ID used in interactions with disk store. // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort // shuffle outputs for several reduces are glommed into a single file. - // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId. 
val NOOP_REDUCE_ID = 0 } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index bca3942f8c555..47bd2ef8b2941 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -669,7 +669,7 @@ private[spark] class BlockManager( writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) val syncWrites = conf.getBoolean("spark.shuffle.sync", false) - new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream, + new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream, syncWrites, writeMetrics) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 49d9154f95a5b..80d426fadc65e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -34,7 +34,6 @@ import org.apache.spark.util.Utils * reopened again. */ private[spark] class DiskBlockObjectWriter( - val blockId: BlockId, file: File, serializerInstance: SerializerInstance, bufferSize: Int, @@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter( * Reverts writes that haven't been flushed yet. Callers should invoke this function * when there are runtime exceptions. This method will not throw, though it may be * unsuccessful in truncating written data. + * + * @return the file that this DiskBlockObjectWriter wrote to. */ - def revertPartialWritesAndClose() { + def revertPartialWritesAndClose(): File = { // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. 
try { @@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter( val truncateStream = new FileOutputStream(file, true) try { truncateStream.getChannel.truncate(initialPosition) + file } finally { truncateStream.close() } } catch { case e: Exception => logError("Uncaught exception while reverting partial writes to file " + file, e) + file } } diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index a266b0c36e0fa..d218344cd4520 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -129,7 +129,6 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( - (BlockId) args[0], (File) args[1], (SerializerInstance) args[2], (Integer) args[3], diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 445a37b83e98a..a5bbaa95fa456 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -127,7 +127,6 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( - (BlockId) args[0], (File) args[1], (SerializerInstance) args[2], (Integer) args[3], diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index cc7342f1ecd78..341f56df2dafc 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = { val args = invocation.getArguments new DiskBlockObjectWriter( - args(0).asInstanceOf[BlockId], args(1).asInstanceOf[File], args(2).asInstanceOf[SerializerInstance], args(3).asInstanceOf[Int], diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index 66af6e1a79740..7c19531c18802 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -20,7 +20,6 @@ import java.io.File import org.scalatest.BeforeAndAfterEach -import org.apache.spark.SparkConf import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.JavaSerializer @@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("verify write metrics") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + 
val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write @@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("verify write metrics on revert") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write @@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("Reopening a closed block writer") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.open() writer.close() @@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("commitAndClose() should be idempotent") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("revertPartialWritesAndClose() should be idempotent") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("fileSegment() can only be called after commitAndClose() has been called") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => 
os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("commitAndClose() without ever opening or writing") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.commitAndClose() assert(writer.fileSegment().length === 0) }
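
Note: below is a minimal usage sketch (not part of the applied diff) of the constructor after this
refactoring, mirroring the arguments exercised in DiskBlockObjectWriterSuite above. The file path
and the identity compression wrapper (os => os) are illustrative assumptions, and since
DiskBlockObjectWriter is private[spark], code like this is assumed to live under an
org.apache.spark package, as the test suite does.

    import java.io.File
    import org.apache.spark.SparkConf
    import org.apache.spark.executor.ShuffleWriteMetrics
    import org.apache.spark.serializer.JavaSerializer
    import org.apache.spark.storage.DiskBlockObjectWriter

    val file = new File("/tmp/example-shuffle-output")  // hypothetical path, for illustration only
    val writeMetrics = new ShuffleWriteMetrics()
    // The BlockId parameter is gone; the writer is now addressed purely by its File.
    val writer = new DiskBlockObjectWriter(
      file,
      new JavaSerializer(new SparkConf()).newInstance(),
      1024,      // buffer size
      os => os,  // no-op compression wrapper, as in the tests
      true,      // syncWrites
      writeMetrics)
    writer.write(Long.box(20), Long.box(30))
    // On the error path, revertPartialWritesAndClose() now returns the File it wrote to, so
    // callers such as BypassMergeSortShuffleWriter can delete it without a DiskBlockManager lookup.
    val writtenFile: File = writer.revertPartialWritesAndClose()
    if (!writtenFile.delete()) {
      // cleanup failed; log and continue
    }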