Skip to content

Commit

Permalink
Throw memory leak warning even in case of error; add warning about co…
Browse files Browse the repository at this point in the history
…de duplication
  • Loading branch information
JoshRosen committed Apr 29, 2015
1 parent 70a39e4 commit 50e9671
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,13 @@ private[spark] class Executor(

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
var succeeded: Boolean = false
val value = try {
val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
succeeded = true
value
task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
} finally {
// Release managed memory used by this task
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
// when changing this, make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (succeeded && freedMemory > 0) {
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
throw new SparkException(errMsg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,16 +654,16 @@ class DAGScheduler(
taskMemoryManager = taskMemoryManager,
runningLocally = true)
TaskContext.setTaskContext(taskContext)
var succeeded: Boolean = false
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
succeeded = true
job.listener.taskSucceeded(0, result)
} finally {
taskContext.markTaskCompleted()
TaskContext.unset()
// Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
// make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (succeeded && freedMemory > 0) {
if (freedMemory > 0) {
if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
} else {
Expand Down

0 comments on commit 50e9671

Please sign in to comment.