Skip to content

Commit aba785f

Browse files
committed
[KYUUBI #2922] Clean up SparkConsoleProgressBar when SQL execution fails
### _Why are the changes needed?_ Currently, `SparkConsoleProgressBar` is initialized after constructing `SQLOperationListener`. Sometimes the user's SQL may fail due to a syntax error; in that case, because no `SparkListenerSQLExecutionEnd` event is fired, `SparkConsoleProgressBar` is never cleaned up, resulting in a memory leak. This PR makes the following changes: (1) adds a `cleanup` method; (2) makes some objects `lazy` to avoid constructing them when the SQL syntax is wrong. Closes #2922. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run tests](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2923 from cxzl25/KYUUBI-2922. Closes #2922 42dbdf2 [sychen] fix leak Authored-by: sychen <sychen@ctrip.com> Signed-off-by: Shaoyun Chen <csy@apache.org>
1 parent 825c70d commit aba785f

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ class ExecuteStatement(
144144
}
145145

146146
override def cleanup(targetState: OperationState): Unit = {
147-
operationListener.foreach(spark.sparkContext.removeSparkListener(_))
147+
operationListener.foreach(_.cleanup())
148148
super.cleanup(targetState)
149149
}
150150

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ class SQLOperationListener(
4444
spark: SparkSession) extends StatsReportListener with Logging {
4545

4646
private val operationId: String = operation.getHandle.identifier.toString
47-
private val activeJobs = new java.util.HashSet[Int]()
48-
private val activeStages = new java.util.HashSet[Int]()
47+
private lazy val activeJobs = new java.util.HashSet[Int]()
48+
private lazy val activeStages = new java.util.HashSet[Int]()
4949
private var executionId: Option[Long] = None
50-
private val liveStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
50+
private lazy val liveStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
5151

5252
private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
53-
private val consoleProgressBar =
53+
private lazy val consoleProgressBar =
5454
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
5555
Some(new SparkConsoleProgressBar(
5656
operation,
@@ -87,6 +87,7 @@ class SQLOperationListener(
8787
if (executionId.isEmpty) {
8888
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
8989
.map(_.toLong)
90+
consoleProgressBar
9091
operation match {
9192
case executeStatement: ExecuteStatement =>
9293
executeStatement.setCompiledStateIfNeeded()
@@ -163,9 +164,15 @@ class SQLOperationListener(
163164
event match {
164165
case sqlExecutionEnd: SparkListenerSQLExecutionEnd
165166
if executionId.contains(sqlExecutionEnd.executionId) =>
166-
spark.sparkContext.removeSparkListener(this)
167-
consoleProgressBar.foreach(_.finish())
167+
cleanup()
168168
case _ =>
169169
}
170170
}
171+
172+
def cleanup(): Unit = {
173+
spark.sparkContext.removeSparkListener(this)
174+
if (executionId.isDefined) {
175+
consoleProgressBar.foreach(_.finish())
176+
}
177+
}
171178
}

0 commit comments

Comments (0)