From f4737bbafc495f56fef2246f41c964c7295a7eb2 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 22:56:46 +0800 Subject: [PATCH] fix file's leak --- .../util/collection/ExternalSorter.scala | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 6856f8619aa11..be0aeb5b22b58 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -615,10 +615,8 @@ private[spark] class ExternalSorter[K, V, C]( /** * Spill in-memory inMemory to a temporary file on disk. - * Return on-disk iterator over a temporary file. */ - private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]) - : Iterator[((Int, K), C)] = { + private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): SpilledFile = { val it = new WritablePartitionedIterator { private[this] var cur = if (iterator.hasNext) iterator.next() else null @@ -633,12 +631,7 @@ private[spark] class ExternalSorter[K, V, C]( def nextPartition(): Int = cur._1._1 } - val spillReader = new SpillReader(spillMemoryToDisk(it)) - - (0 until numPartitions).iterator.flatMap { p => - val iterator = spillReader.readNextPartition() - iterator.map(cur => ((p, cur._1), cur._2)) - } + spillMemoryToDisk(it) } /** @@ -649,6 +642,7 @@ private[spark] class ExternalSorter[K, V, C]( extends Iterator[((Int, K), C)] { var currentIter = memIter + var spillFile: Option[SpilledFile] = None override def hasNext: Boolean = currentIter.hasNext @@ -656,7 +650,13 @@ private[spark] class ExternalSorter[K, V, C]( private[spark] def spill() = { if (hasNext) { - currentIter = spillMemoryToDisk(currentIter) + spillFile = Some(spillMemoryToDisk(currentIter)) + val spillReader = new SpillReader(spillFile.get) + + currentIter = (0 until numPartitions).iterator.flatMap { p => + val iterator = spillReader.readNextPartition() + iterator.map(cur => ((p, cur._1), cur._2)) + } } else { // in-memory iterator is already drained, release it by giving an empty iterator currentIter = new Iterator[((Int, K), C)]{ @@ -666,6 +666,10 @@ private[spark] class ExternalSorter[K, V, C]( logInfo("nothing in memory inMemory, do nothing") } } + + def cleanup(): Unit = { + spillFile.foreach(_.file.delete()) + } } /** @@ -792,6 +796,7 @@ private[spark] class ExternalSorter[K, V, C]( def stop(): Unit = { spills.foreach(s => s.file.delete()) spills.clear() + memoryOrDiskIter.foreach(_.cleanup()) } /**