Skip to content

Commit

Permalink
[KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperat…
Browse files Browse the repository at this point in the history
…ion because it was used in multiple engines

### _Why are the changes needed?_

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2466 from zhaomin1423/timeoutMonitor.

Closes #2403

3266a02 [Min Zhao] modify statementTimeoutCleaner access modifier
db73a42 [Min Zhao] add shutdownTimeoutMonitor
4f1d0e1 [Min Zhao] clear unused import
acda7e6 [Min Zhao] handle cleanup
eca266a [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation because it was used in multiple engines
f525605 [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation because it was used in multiple engines
8e7820b [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation because it was used in multiple engines
9b19704 [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation because it was used in multiple engines

Authored-by: Min Zhao <zhaomin1423@163.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
zhaomin1423 authored and ulysses-you committed May 5, 2022
1 parent dcc3ccf commit 9e263d7
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 78 deletions.
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.engine.flink.operation

import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand All @@ -35,7 +35,6 @@ import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

class ExecuteStatement(
session: Session,
Expand All @@ -48,8 +47,6 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)

private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

override def getOperationLog: Option[OperationLog] = Option(operationLog)

@VisibleForTesting
Expand All @@ -70,7 +67,7 @@ class ExecuteStatement(
}

override protected def runInternal(): Unit = {
addTimeoutMonitor()
addTimeoutMonitor(queryTimeout)
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
Expand Down Expand Up @@ -122,7 +119,7 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
statementTimeoutCleaner.foreach(_.shutdown())
shutdownTimeoutMonitor()
}
}

Expand Down Expand Up @@ -181,14 +178,4 @@ class ExecuteStatement(
warn(s"Failed to clean result set $resultId in session $sessionId", t)
}
}

private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
}
}
Expand Up @@ -31,7 +31,6 @@ import org.apache.kyuubi.engine.flink.schema.RowSet
import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
Expand Down Expand Up @@ -111,13 +110,6 @@ abstract class FlinkOperation(

override def shouldRunAsync: Boolean = false

protected def cleanup(targetState: OperationState): Unit = state.synchronized {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
}
}

protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
Expand Down
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._

Expand All @@ -34,7 +34,6 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, O
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

class ExecuteStatement(
session: Session,
Expand All @@ -44,8 +43,6 @@ class ExecuteStatement(
incrementalCollect: Boolean)
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {

private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)

Expand Down Expand Up @@ -99,12 +96,12 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
statementTimeoutCleaner.foreach(_.shutdown())
shutdownTimeoutMonitor()
}
}

override protected def runInternal(): Unit = {
addTimeoutMonitor()
addTimeoutMonitor(queryTimeout)
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
Expand All @@ -130,22 +127,6 @@ class ExecuteStatement(
}
}

private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
timeoutExecutor.schedule(
new Runnable {
override def run(): Unit = {
cleanup(OperationState.TIMEOUT)
}
},
queryTimeout,
TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
}

override def cleanup(targetState: OperationState): Unit = {
spark.sparkContext.removeSparkListener(operationListener)
super.cleanup(targetState)
Expand Down
Expand Up @@ -59,7 +59,7 @@ abstract class SparkOperation(opType: OperationType, session: Session)
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)

protected def cleanup(targetState: OperationState): Unit = state.synchronized {
override def cleanup(targetState: OperationState): Unit = state.synchronized {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
Expand Down
Expand Up @@ -17,20 +17,15 @@

package org.apache.kyuubi.engine.trino.operation

import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.RejectedExecutionException

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Logging
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.trino.TrinoStatement
import org.apache.kyuubi.engine.trino.event.TrinoOperationEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.ArrayFetchIterator
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

class ExecuteStatement(
session: Session,
Expand All @@ -40,8 +35,6 @@ class ExecuteStatement(
incrementalCollect: Boolean)
extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {

private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)

Expand All @@ -56,7 +49,7 @@ class ExecuteStatement(
}

override protected def runInternal(): Unit = {
addTimeoutMonitor()
addTimeoutMonitor(queryTimeout)
val trinoStatement = TrinoStatement(trinoContext, session.sessionManager.getConf, statement)
trino = trinoStatement.getTrinoClient
if (shouldRunAsync) {
Expand Down Expand Up @@ -101,17 +94,7 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
statementTimeoutCleaner.foreach(_.shutdown())
}
}

private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
shutdownTimeoutMonitor()
}
}

Expand Down
Expand Up @@ -37,7 +37,6 @@ import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.FetchOrientation.FETCH_PRIOR
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
Expand Down Expand Up @@ -88,13 +87,6 @@ abstract class TrinoOperation(opType: OperationType, session: Session)
cleanup(OperationState.CANCELED)
}

protected def cleanup(targetState: OperationState): Unit = state.synchronized {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
}
}

override def close(): Unit = {
cleanup(OperationState.CLOSED)
try {
Expand Down
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import java.util.concurrent.Future
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}

import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
Expand All @@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.OperationState._
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

abstract class AbstractOperation(opType: OperationType, session: Session)
extends Operation with Logging {
Expand All @@ -41,6 +42,29 @@ abstract class AbstractOperation(opType: OperationType, session: Session)

final private[kyuubi] val statementId = handle.identifier.toString

private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

protected def cleanup(targetState: OperationState): Unit = state.synchronized {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
}
}

protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
}

protected def shutdownTimeoutMonitor(): Unit = {
statementTimeoutCleaner.foreach(_.shutdown())
}

override def getOperationLog: Option[OperationLog] = None

@volatile protected var state: OperationState = INITIALIZED
Expand Down

0 comments on commit 9e263d7

Please sign in to comment.