From 52dfaf5b4066ccb7edcf3c5519a83d044e05bc6a Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 11 May 2015 15:10:00 +0800 Subject: [PATCH] Close files correctly when iterator is finished --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9985fedc35141..87ba4f84a9ceb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -26,7 +26,7 @@ import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{CompletionIterator, ThreadUtils} import org.apache.spark.{Logging, SparkConf} /** @@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog( logFilesToRead.iterator.map { file => logDebug(s"Creating log reader with $file") - new FileBasedWriteAheadLogReader(file, hadoopConf) + val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) + CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _) } flatMap { x => x } }