Skip to content

Commit

Permalink
fix file's leak
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Jul 1, 2015
1 parent 83040cf commit f4737bb
Showing 1 changed file with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,8 @@ private[spark] class ExternalSorter[K, V, C](

/**
* Spill the contents of the given in-memory iterator to a temporary file on disk.
* Return on-disk iterator over a temporary file.
*/
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)])
: Iterator[((Int, K), C)] = {
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): SpilledFile = {

val it = new WritablePartitionedIterator {
private[this] var cur = if (iterator.hasNext) iterator.next() else null
Expand All @@ -633,12 +631,7 @@ private[spark] class ExternalSorter[K, V, C](
def nextPartition(): Int = cur._1._1
}

val spillReader = new SpillReader(spillMemoryToDisk(it))

(0 until numPartitions).iterator.flatMap { p =>
val iterator = spillReader.readNextPartition()
iterator.map(cur => ((p, cur._1), cur._2))
}
spillMemoryToDisk(it)
}

/**
Expand All @@ -649,14 +642,21 @@ private[spark] class ExternalSorter[K, V, C](
extends Iterator[((Int, K), C)] {

var currentIter = memIter
var spillFile: Option[SpilledFile] = None

override def hasNext: Boolean = currentIter.hasNext

override def next(): ((Int, K), C) = currentIter.next()

private[spark] def spill() = {
if (hasNext) {
currentIter = spillMemoryToDisk(currentIter)
spillFile = Some(spillMemoryToDisk(currentIter))
val spillReader = new SpillReader(spillFile.get)

currentIter = (0 until numPartitions).iterator.flatMap { p =>
val iterator = spillReader.readNextPartition()
iterator.map(cur => ((p, cur._1), cur._2))
}
} else {
// in-memory iterator is already drained, release it by giving an empty iterator
currentIter = new Iterator[((Int, K), C)]{
Expand All @@ -666,6 +666,10 @@ private[spark] class ExternalSorter[K, V, C](
logInfo("nothing in memory inMemory, do nothing")
}
}

/**
 * Delete the on-disk file referenced by `spillFile`, if one has been recorded.
 * Does nothing when `spillFile` is empty. The result of the delete call is not checked.
 */
def cleanup(): Unit = {
  for (spilled <- spillFile) {
    spilled.file.delete()
  }
}
}

/**
Expand Down Expand Up @@ -792,6 +796,7 @@ private[spark] class ExternalSorter[K, V, C](
def stop(): Unit = {
  // Remove the file behind every recorded spill, then forget the spills themselves.
  for (spilled <- spills) {
    spilled.file.delete()
  }
  spills.clear()

  // Let the memory-or-disk iterator, if present, delete its own spill file as well.
  for (spillable <- memoryOrDiskIter) {
    spillable.cleanup()
  }
}

/**
Expand Down

0 comments on commit f4737bb

Please sign in to comment.