From 048cd5b5848c0f89213c897090035ec8cf6295df Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 15 Oct 2015 14:28:18 +0800 Subject: [PATCH 1/3] Fix a memory leak in SQLListener._stageIdToStageMetrics --- .../org/apache/spark/sql/execution/ui/SQLListener.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index d6472400a6a21..007b71ae3595b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -126,7 +126,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi val stageId = stageSubmitted.stageInfo.stageId val stageAttemptId = stageSubmitted.stageInfo.attemptId // Always override metrics for old stage attempt - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) + if (_stageIdToStageMetrics.contains(stageId)) { + _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) + } else { + // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart". + // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution. + // So we can ignore it. + } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { From 2054ec13a45bcfc6cb2803fb90ded1a99c1517b4 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 16 Oct 2015 10:26:48 +0800 Subject: [PATCH 2/3] Add a unit test for SPARK-11126 --- .../sql/execution/ui/SQLListenerSuite.scala | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 727cf3665a871..cee2170f4d165 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -309,7 +309,22 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { assert(executionUIData.failedJobs === Seq(0)) } - ignore("no memory leak") { + test("SPARK-11126: no memory leak when running non SQL jobs") { + val previousStageNumber = sqlContext.listener.stageIdToStageMetrics.size + sqlContext.sparkContext.parallelize(1 to 10).foreach(i => ()) + // listener should ignore the non SQL stage + assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber) + + sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ()) + // listener should save the SQL stage + assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1) + } + +} + +class SQLListenerMemoryLeakSuite extends SparkFunSuite { + + test("no memory leak") { val conf = new SparkConf() .setMaster("local") .setAppName("test") @@ -344,5 +359,4 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { sc.stop() } } - } From da33fb6e4e6728105e590821c720cb69cd5ad292 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 17 Oct 2015 12:08:38 +0800 Subject: [PATCH 3/3] Update the comment --- .../scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 007b71ae3595b..c752b2ce772c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -131,7 +131,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi } else { // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart". // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution. - // So we can ignore it. + // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126). } }