From c7a739e5575098736247f1c42d07fb5fa110e22b Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Wed, 4 Mar 2015 00:09:42 +0530
Subject: [PATCH] Added BufferedInputStream at required places

---
 .../spark/deploy/master/FileSystemPersistenceEngine.scala | 4 ++--
 .../apache/spark/shuffle/IndexShuffleBlockManager.scala   | 2 +-
 .../org/apache/spark/util/collection/ExternalSorter.scala | 8 ++++----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 36a2e2c6a6349..5e30460eb025e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -58,7 +58,7 @@ private[spark] class FileSystemPersistenceEngine(
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
-    val out = new FileOutputStream(file)
+    val out = new BufferedOutputStream(new FileOutputStream(file))
     try {
       out.write(serialized)
     } finally {
@@ -68,7 +68,7 @@ private[spark] class FileSystemPersistenceEngine(
 
   private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
-    val dis = new DataInputStream(new FileInputStream(file))
+    val dis = new DataInputStream(new BufferedInputStream(new FileInputStream(file)))
     try {
       dis.readFully(fileData)
     } finally {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index b292587d37028..2ec953649f3f6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -106,7 +106,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(new FileInputStream(indexFile))
+    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d69f2d9048055..18e04fd51bf53 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -721,13 +721,13 @@ private[spark] class ExternalSorter[K, V, C](
       // this simple we spill out the current in-memory collection so that everything is in files.
       spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
       partitionWriters.foreach(_.commitAndClose())
-      var out: FileOutputStream = null
-      var in: FileInputStream = null
+      var out: OutputStream = null
+      var in: InputStream = null
       val writeStartTime = System.nanoTime
       try {
-        out = new FileOutputStream(outputFile, true)
+        out = new BufferedOutputStream(new FileOutputStream(outputFile, true))
         for (i <- 0 until numPartitions) {
-          in = new FileInputStream(partitionWriters(i).fileSegment().file)
+          in = new BufferedInputStream(new FileInputStream(partitionWriters(i).fileSegment().file))
           val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
           in.close()
           in = null
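
Note (not part of the patch): every hunk above applies the same pattern -- wrap a raw FileInputStream or FileOutputStream in a BufferedInputStream or BufferedOutputStream so that many small reads and writes are batched into fewer, larger system calls. A minimal, self-contained Scala sketch of that pattern follows; the object name, file arguments, and 8 KB buffer size are illustrative assumptions, not values taken from the patch.

import java.io._

object BufferedCopySketch {
  // Copies src to dst through buffered wrappers, mirroring the pattern the patch applies.
  def copy(src: File, dst: File): Unit = {
    val in: InputStream = new BufferedInputStream(new FileInputStream(src))
    val out: OutputStream = new BufferedOutputStream(new FileOutputStream(dst))
    try {
      val buf = new Array[Byte](8192) // illustrative buffer size
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally {
      // Closing the buffered wrappers flushes pending bytes and closes the underlying file streams.
      in.close()
      out.close()
    }
  }
}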