Skip to content

Commit 9fb6277

Browse files
zhaomin1423ulysses-you
authored andcommitted
[KYUUBI #2402] [Improvement] addTimeoutMonitor for trino engine when it run query async
### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2404 from zhaomin1423/trino_async_addTimeoutMonitor. Closes #2402 5e08e67 [Min Zhao] [KYUUBI #2402] [Improvement] addTimeoutMonitor for trino engine when it run query async Authored-by: Min Zhao <zhaomin1423@163.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 26d52fa commit 9fb6277

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.engine.trino.operation
1919

20-
import java.util.concurrent.RejectedExecutionException
20+
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
2121

2222
import org.apache.kyuubi.KyuubiSQLException
2323
import org.apache.kyuubi.Logging
@@ -28,14 +28,18 @@ import org.apache.kyuubi.operation.OperationState
2828
import org.apache.kyuubi.operation.OperationType
2929
import org.apache.kyuubi.operation.log.OperationLog
3030
import org.apache.kyuubi.session.Session
31+
import org.apache.kyuubi.util.ThreadUtils
3132

3233
class ExecuteStatement(
3334
session: Session,
3435
override val statement: String,
3536
override val shouldRunAsync: Boolean,
37+
queryTimeout: Long,
3638
incrementalCollect: Boolean)
3739
extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
3840

41+
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
42+
3943
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
4044
override def getOperationLog: Option[OperationLog] = Option(operationLog)
4145

@@ -50,6 +54,7 @@ class ExecuteStatement(
5054
}
5155

5256
override protected def runInternal(): Unit = {
57+
addTimeoutMonitor()
5358
val trinoStatement = TrinoStatement(trinoContext, session.sessionManager.getConf, statement)
5459
trino = trinoStatement.getTrinoClient
5560
if (shouldRunAsync) {
@@ -93,6 +98,18 @@ class ExecuteStatement(
9398
setState(OperationState.FINISHED)
9499
} catch {
95100
onError(cancel = true)
101+
} finally {
102+
statementTimeoutCleaner.foreach(_.shutdown())
103+
}
104+
}
105+
106+
private def addTimeoutMonitor(): Unit = {
107+
if (queryTimeout > 0) {
108+
val timeoutExecutor =
109+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
110+
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
111+
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
112+
statementTimeoutCleaner = Some(timeoutExecutor)
96113
}
97114
}
98115
}

externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
3535
runAsync: Boolean,
3636
queryTimeout: Long): Operation = {
3737
val incrementalCollect = session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT)
38-
val operation = new ExecuteStatement(session, statement, runAsync, incrementalCollect)
38+
val operation =
39+
new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
3940
addOperation(operation)
4041
}
4142

0 commit comments

Comments
 (0)