Skip to content
Permalink
Browse files

[SPARK-25144][SQL][TEST][BRANCH-2.3] Free aggregate map when task ends

## What changes were proposed in this pull request?

[SPARK-25144](https://issues.apache.org/jira/browse/SPARK-25144) reports memory leaks on Apache Spark 2.0.2 ~ 2.3.2-RC5.

```scala
scala> case class Foo(bar: Option[String])
scala> val ds = List(Foo(Some("bar"))).toDS
scala> val result = ds.flatMap(_.bar).distinct
scala> result.rdd.isEmpty
18/08/19 23:01:54 WARN Executor: Managed memory leak detected; size = 8650752 bytes, TID = 125
res0: Boolean = false
```

This is a backport of cloud-fan's #21738, which is one of the 3 commits of SPARK-21743. In addition, I added a test case to prevent regressions in branch-2.3 and branch-2.2. Although SPARK-21743 was reverted due to a regression, this subpatch can go to branch-2.3 and branch-2.2. This will be merged as cloud-fan's commit.

## How was this patch tested?

Passes Jenkins with a newly added test case.

Closes #22150 from dongjoon-hyun/SPARK-25144.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
  • Loading branch information...
2 people authored and HyukjinKwon committed Aug 20, 2018
1 parent 032f6d9 commit ea01e362f7427c6f16445db95982923f19c07171
@@ -20,8 +20,8 @@
import java.io.IOException;

import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -82,27 +82,34 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
* @param emptyAggregationBuffer the default value for new keys (a "zero" of the agg. function)
* @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
* @param groupingKeySchema the schema of the grouping key, used for row conversion.
* @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures.
* @param taskContext the current task context.
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
* @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
*/
public UnsafeFixedWidthAggregationMap(
    InternalRow emptyAggregationBuffer,
    StructType aggregationBufferSchema,
    StructType groupingKeySchema,
    TaskMemoryManager taskMemoryManager,
    TaskContext taskContext,
    int initialCapacity,
    long pageSizeBytes) {
  this.aggregationBufferSchema = aggregationBufferSchema;
  this.currentAggregationBuffer = new UnsafeRow(aggregationBufferSchema.length());
  this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
  this.groupingKeySchema = groupingKeySchema;

  // Allocate the backing map exactly once, using the memory manager obtained from the task
  // context. A second allocation that is immediately overwritten would only create a map that
  // nothing references. The `taskMemoryManager` parameter is not read here; it is kept in the
  // signature so that existing call sites continue to compile.
  this.map = new BytesToBytesMap(
    taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);

  // Initialize the buffer for aggregation value
  final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);
  this.emptyAggregationBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();

  // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
  // the end of the task. This is necessary to avoid memory leaks when the downstream operator
  // does not fully consume the aggregation map's output (e.g. aggregate followed by limit).
  taskContext.addTaskCompletionListener(context -> {
    free();
  });
}

/**
@@ -324,7 +324,7 @@ case class HashAggregateExec(
initialBuffer,
bufferSchema,
groupingKeySchema,
TaskContext.get().taskMemoryManager(),
TaskContext.get(),
1024 * 16, // initial capacity
TaskContext.get().taskMemoryManager().pageSizeBytes
)
@@ -166,7 +166,7 @@ class TungstenAggregationIterator(
initialAggregationBuffer,
StructType.fromAttributes(aggregateFunctions.flatMap(_.aggBufferAttributes)),
StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
TaskContext.get().taskMemoryManager(),
TaskContext.get(),
1024 * 16, // initial capacity
TaskContext.get().taskMemoryManager().pageSizeBytes
)
@@ -2818,4 +2818,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
spark.sql(s"select * from spark_25084 distribute by ($distributeExprs)").count === count)
}
}

test("SPARK-25144 'distinct' causes memory leak") {
  // Build a one-row Dataset, unwrap the optional field with flatMap, de-duplicate it, and force
  // execution through the RDD API. The Boolean returned by `isEmpty` is intentionally unused.
  // NOTE(review): presumably the suite fails the task when managed memory leaks, which is what
  // makes this a regression test -- confirm against the shared test configuration.
  val distinctBars = List(Foo(Some("bar"))).toDS.flatMap(_.bar).distinct
  distinctBars.rdd.isEmpty
}
}

/**
 * Record with a single optional string field.
 *
 * The "SPARK-25144 'distinct' causes memory leak" test builds a Dataset of these and
 * flat-maps over `bar`.
 *
 * @param bar the optional string value carried by the record
 */
case class Foo(bar: Option[String])
@@ -23,6 +23,7 @@ import scala.collection.mutable
import scala.util.{Random, Try}
import scala.util.control.NonFatal

import org.mockito.Mockito._
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
@@ -54,6 +55,8 @@ class UnsafeFixedWidthAggregationMapSuite
private var memoryManager: TestMemoryManager = null
private var taskMemoryManager: TaskMemoryManager = null

private var taskContext: TaskContext = null

def testWithMemoryLeakDetection(name: String)(f: => Unit) {
def cleanup(): Unit = {
if (taskMemoryManager != null) {
@@ -67,6 +70,8 @@ class UnsafeFixedWidthAggregationMapSuite
val conf = new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false")
memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
taskContext = mock(classOf[TaskContext])
when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager)

TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
@@ -111,7 +116,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
1024, // initial capacity,
PAGE_SIZE_BYTES
)
@@ -124,7 +129,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
1024, // initial capacity
PAGE_SIZE_BYTES
)
@@ -151,7 +156,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
128, // initial capacity
PAGE_SIZE_BYTES
)
@@ -176,7 +181,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
128, // initial capacity
PAGE_SIZE_BYTES
)
@@ -223,7 +228,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
128, // initial capacity
PAGE_SIZE_BYTES
)
@@ -263,7 +268,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
StructType(Nil),
StructType(Nil),
taskMemoryManager,
taskContext,
128, // initial capacity
PAGE_SIZE_BYTES
)
@@ -307,7 +312,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
128, // initial capacity
pageSize
)
@@ -344,7 +349,7 @@ class UnsafeFixedWidthAggregationMapSuite
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
taskMemoryManager,
taskContext,
128, // initial capacity
pageSize
)

0 comments on commit ea01e36

Please sign in to comment.
You can’t perform that action at this time.