Add task completion callback to avoid leak in limit after sort
JoshRosen committed Jul 29, 2015
1 parent ea250da commit 5d0b2d3
Showing 2 changed files with 15 additions and 19 deletions.
UnsafeExternalSorter.java
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.util.LinkedList;

+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public UnsafeExternalSorter(
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
     initializeForWriting();
+
+    // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+    // the end of the task. This is necessary to avoid memory leaks when the downstream operator
+    // does not fully consume the sorter's output (e.g. sort followed by limit).
+    taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+      @Override
+      public BoxedUnit apply() {
+        freeMemory();
+        return null;
+      }
+    });
   }

   // TODO: metrics tracking + integration with shuffle write metrics
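For context, addOnCompleteCallback is Spark's older function-based hook, which is why the Java code above has to construct an AbstractFunction0<BoxedUnit> by hand. A minimal Scala sketch of the same free-on-task-completion pattern, assuming it runs inside a task (so TaskContext.get() is non-null) and using the addTaskCompletionListener overload that takes a function; MySorter and freeMemory are hypothetical stand-ins for the sorter's cleanup, not the real API:

import org.apache.spark.TaskContext

// Hypothetical stand-in for a resource-owning operator like UnsafeExternalSorter.
class MySorter {
  def freeMemory(): Unit = {
    // Release memory pages, delete spill files, etc.
  }
}

def buildSorter(): MySorter = {
  val sorter = new MySorter
  // The listener fires when the task ends, whether or not downstream
  // operators (e.g. a limit) fully consumed the sorter's output iterator.
  TaskContext.get().addTaskCompletionListener { _ =>
    sorter.freeMemory()
  }
  sorter
}

The design point is that cleanup is keyed to task lifecycle rather than to iterator exhaustion, so a consumer that stops early cannot strand the sorter's memory.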
UnsafeExternalSortSuite.scala
@@ -36,10 +36,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
   }

-  ignore("sort followed by limit should not leak memory") {
-    // TODO: this test is going to fail until we implement a proper iterator interface
-    // with a close() method.
-    TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
+  test("sort followed by limit") {
     checkThatPlansAgree(
       (1 to 100).map(v => Tuple1(v)).toDF("a"),
       (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
@@ -48,21 +45,6 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     )
   }

-  test("sort followed by limit") {
-    TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
-    try {
-      checkThatPlansAgree(
-        (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
-        (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
-        sortAnswers = false
-      )
-    } finally {
-      TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
-
-    }
-  }
-
   test("sorting does not crash for large inputs") {
     val sortOrder = 'a.asc :: Nil
     val stringLength = 1024 * 1024 * 2
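The re-enabled test exercises exactly the leaking path: Limit(10, ...) pulls only ten rows, so the sorted iterator is never drained. A self-contained sketch (plain Scala, no Spark; all names hypothetical) of why cleanup keyed to iterator exhaustion never runs under a limit, while a completion-style callback does:

object LimitLeakDemo extends App {
  var freedByExhaustion = false

  // Iterator that frees its resources only once it is fully drained.
  val sorted = new Iterator[Int] {
    private val rows = (1 to 100).iterator
    def hasNext: Boolean = {
      val more = rows.hasNext
      if (!more) freedByExhaustion = true // unreachable under take(10)
      more
    }
    def next(): Int = rows.next()
  }

  val limited = sorted.take(10).toList // downstream "limit" stops early
  // A task-completion callback would run here regardless of how far
  // the consumer got:
  val freedByCallback = true

  println(s"rows=${limited.size} freedByExhaustion=$freedByExhaustion freedByCallback=$freedByCallback")
  // Prints: rows=10 freedByExhaustion=false freedByCallback=true
}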
