[SPARK-5423] [CORE] Register a TaskCompletionListener to make sure release all resources

Make `DiskMapIterator.cleanup` idempotent and register a TaskCompletionListener to make sure `cleanup` is called even if the iterator is not fully drained.
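For illustration, here is a minimal sketch (not the actual patch) of the pattern this commit applies: cleanup is written so it can safely run more than once, and a task completion listener guarantees it runs even when the iterator is abandoned early. `LeakFreeIterator` and `tempFile` are made-up names; only `TaskContext.get()` and the function overload of `addTaskCompletionListener` are taken from Spark's public API.

```scala
import java.io.{File, FileInputStream}

import org.apache.spark.TaskContext

// Hypothetical iterator that owns an open stream and a temp file on disk.
class LeakFreeIterator(tempFile: File) extends Iterator[Int] {
  private var stream: FileInputStream = new FileInputStream(tempFile)

  // Idempotent cleanup: each step checks its own state, so it is safe to call
  // both when the iterator is drained and again from the completion listener.
  private def cleanup(): Unit = {
    if (stream != null) {
      stream.close()
      stream = null
    }
    if (tempFile.exists()) {
      tempFile.delete()
    }
  }

  // TaskContext.get() returns null when no task is running (e.g. in local
  // unit tests), so the registration is guarded, as in the patch.
  private val context = TaskContext.get()
  if (context != null) {
    context.addTaskCompletionListener(_ => cleanup())
  }

  override def hasNext: Boolean = {
    val more = stream != null && stream.available() > 0
    if (!more) cleanup() // release resources eagerly once the data is exhausted
    more
  }

  override def next(): Int = stream.read()
}
```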

Author: zsxwing <zsxwing@gmail.com>

Closes #7529 from zsxwing/SPARK-5423 and squashes the following commits:

3e3c413 [zsxwing] Remove TODO
9556c78 [zsxwing] Fix NullPointerException for tests
3d574d9 [zsxwing] Register a TaskCompletionListener to make sure release all resources
zsxwing authored and Andrew Or committed Jul 21, 2015
1 parent 4f7f1ee commit d45355e
Showing 1 changed file with 19 additions and 6 deletions.
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
@@ -470,14 +470,27 @@ class ExternalAppendOnlyMap[K, V, C](
       item
     }
 
-    // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
       batchIndex = batchOffsets.length // Prevent reading any other batch
       val ds = deserializeStream
-      deserializeStream = null
-      fileStream = null
-      ds.close()
-      file.delete()
+      if (ds != null) {
+        ds.close()
+        deserializeStream = null
+      }
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
+      }
+      if (file.exists()) {
+        file.delete()
+      }
     }
+
+    val context = TaskContext.get()
+    // context is null in some tests of ExternalAppendOnlyMapSuite because these tests don't run in
+    // a TaskContext.
+    if (context != null) {
+      context.addTaskCompletionListener(context => cleanup())
+    }
   }

