Skip to content

Commit 03c84f8

Browse files
link3280yanghua
authored andcommitted
[KYUUBI #1838] Clean up query results after query operations finish
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> Query results would be cached in memory, and we should clean it up when all rows are fetched. ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> This is a sub-task of KPIP-2 #1322. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1859 from link3280/feature/KYUUBI-1838. Closes #1838 3cef076 [Paul Lin] [KYUUBI #1838] Improve logging message syntax c6253de [Paul Lin] [KYUUBI #1838] Log cleanup exceptions e0b1866 [Paul Lin] [KYUUBI #1838] Clean up query results after query operations finish Authored-by: Paul Lin <paullin3280@gmail.com> Signed-off-by: yanghua <yanghua1127@gmail.com>
1 parent 73c84d4 commit 03c84f8

File tree

1 file changed

+43
-25
lines changed

1 file changed

+43
-25
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -119,33 +119,42 @@ class ExecuteStatement(
119119
}
120120

121121
private def runQueryOperation(operation: QueryOperation): Unit = {
122-
val resultDescriptor = executor.executeQuery(sessionId, operation)
123-
124-
val resultID = resultDescriptor.getResultId
125-
126-
val rows = new ArrayBuffer[Row]()
127-
var loop = true
128-
while (loop) {
129-
Thread.sleep(50) // slow the processing down
130-
131-
val result = executor.snapshotResult(sessionId, resultID, 2)
132-
result.getType match {
133-
case TypedResult.ResultType.PAYLOAD =>
134-
rows.clear()
135-
(1 to result.getPayload).foreach { page =>
136-
rows ++= executor.retrieveResultPage(resultID, page).asScala
137-
}
138-
case TypedResult.ResultType.EOS => loop = false
139-
case TypedResult.ResultType.EMPTY =>
122+
var resultId: String = null
123+
try {
124+
val resultDescriptor = executor.executeQuery(sessionId, operation)
125+
126+
resultId = resultDescriptor.getResultId
127+
128+
val rows = new ArrayBuffer[Row]()
129+
var loop = true
130+
131+
while (loop) {
132+
Thread.sleep(50) // slow the processing down
133+
134+
val result = executor.snapshotResult(sessionId, resultId, 2)
135+
result.getType match {
136+
case TypedResult.ResultType.PAYLOAD =>
137+
rows.clear()
138+
(1 to result.getPayload).foreach { page =>
139+
rows ++= executor.retrieveResultPage(resultId, page).asScala
140+
}
141+
case TypedResult.ResultType.EOS => loop = false
142+
case TypedResult.ResultType.EMPTY =>
143+
}
140144
}
141-
}
142145

143-
resultSet = ResultSet.builder
144-
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
145-
.columns(resultDescriptor.getResultSchema.getColumns)
146-
.data(rows.toArray[Row])
147-
.build
148-
setState(OperationState.FINISHED)
146+
resultSet = ResultSet.builder
147+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
148+
.columns(resultDescriptor.getResultSchema.getColumns)
149+
.data(rows.toArray[Row])
150+
.build
151+
setState(OperationState.FINISHED)
152+
153+
} finally {
154+
if (resultId != null) {
155+
cleanupQueryResult(resultId)
156+
}
157+
}
149158
}
150159

151160
private def runSetOperation(setOperation: SetOperation): Unit = {
@@ -213,6 +222,15 @@ class ExecuteStatement(
213222
setState(OperationState.FINISHED)
214223
}
215224

225+
private def cleanupQueryResult(resultId: String): Unit = {
226+
try {
227+
executor.cancelQuery(sessionId, resultId)
228+
} catch {
229+
case t: Throwable =>
230+
warn(s"Failed to clean result set $resultId in session $sessionId", t)
231+
}
232+
}
233+
216234
private def addTimeoutMonitor(): Unit = {
217235
if (queryTimeout > 0) {
218236
val timeoutExecutor =

0 commit comments

Comments
 (0)