diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala index 6c2d3b1e09d..ec2b0701445 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala @@ -81,6 +81,19 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging { def getTRowSetGenerator(): JdbcTRowSetGenerator def getSchemaHelper(): SchemaHelper + + def cancelStatement(jdbcStatement: Statement): Unit = { + if (jdbcStatement != null) { + jdbcStatement.cancel() + jdbcStatement.close() + } + } + + def closeStatement(jdbcStatement: Statement): Unit = { + if (jdbcStatement != null) { + jdbcStatement.close() + } + } } object JdbcDialects extends Logging { diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala index d75c7f408cf..3d611448f20 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala @@ -120,12 +120,31 @@ class ExecuteStatement( super.validateFetchOrientation(order) } + override def cancel(): Unit = withLockRequired { + if (!isTerminalState(state)) { + setState(OperationState.CANCELED) + // TODO: If `shouldRunAsync` is true, the statement is initialized lazily. + // When a SQL is submitted and immediately canceled, `jdbcStatement` may still be null, + // which can lead to the cancellation not taking effect. + if (jdbcStatement != null) { + dialect.cancelStatement(jdbcStatement) + jdbcStatement = null + } else { + warn(s"Ignore cancel operation $statementId due to jdbcStatement is null.") + } + } + } + override def cleanup(targetState: OperationState): Unit = withLockRequired { try { super.cleanup(targetState) } finally { - if (jdbcStatement != null && !jdbcStatement.isClosed) { - jdbcStatement.close() + if (jdbcStatement != null) { + if (targetState == OperationState.CANCELED) { + dialect.cancelStatement(jdbcStatement) + } else { + dialect.closeStatement(jdbcStatement) + } jdbcStatement = null } } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala index 555725944c0..450d79b0346 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala @@ -62,10 +62,6 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session resp } - override def cancel(): Unit = { - cleanup(OperationState.CANCELED) - } - override def close(): Unit = { cleanup(OperationState.CLOSED) } diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala index b8264c06992..0b898483056 100644 --- a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala @@ -16,10 +16,14 @@ */ package org.apache.kyuubi.engine.jdbc.mysql +import org.scalatest.concurrent.TimeLimits.failAfter +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider import org.apache.kyuubi.operation.HiveJDBCTestHelper import org.apache.kyuubi.shaded.hive.service.rpc.thrift._ +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE, RUNNING_STATE} class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelper { @@ -75,4 +79,25 @@ class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelp assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) } } + + test("MySQL - JDBC ExecuteStatement cancel operation should kill SQL statement") { + failAfter(20.seconds) { + withSessionHandle { (client, handle) => + val executeReq = new TExecuteStatementReq() + executeReq.setSessionHandle(handle) + // The SQL will sleep 120s + executeReq.setStatement("SELECT sleep(120)") + executeReq.setRunAsync(true) + val executeResp = client.ExecuteStatement(executeReq) + assert(executeResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + + val operationHandle = executeResp.getOperationHandle + waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE)) + + val cancelResp = client.CancelOperation(new TCancelOperationReq(operationHandle)) + assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE)) + } + } + } } diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala index acbc028f89d..49cc52b41a9 100644 --- a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala @@ -16,10 +16,15 @@ */ package org.apache.kyuubi.engine.jdbc.starrocks +import scala.concurrent.duration.DurationInt + +import org.scalatest.concurrent.TimeLimits.failAfter + import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider import org.apache.kyuubi.operation.HiveJDBCTestHelper import org.apache.kyuubi.shaded.hive.service.rpc.thrift._ +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE, RUNNING_STATE} class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with HiveJDBCTestHelper { @@ -75,4 +80,25 @@ class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with Hiv assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) } } + + test("StarRocks - JDBC ExecuteStatement cancel operation should kill SQL statement") { + failAfter(20.seconds) { + withSessionHandle { (client, handle) => + val executeReq = new TExecuteStatementReq() + executeReq.setSessionHandle(handle) + // The SQL will sleep 120s + executeReq.setStatement("SELECT sleep(120)") + executeReq.setRunAsync(true) + val executeResp = client.ExecuteStatement(executeReq) + assert(executeResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + + val operationHandle = executeResp.getOperationHandle + waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE)) + + val cancelResp = client.CancelOperation(new TCancelOperationReq(operationHandle)) + assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE)) + } + } + } } diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala index 9c229a636cb..8aa90e7861f 100644 --- a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala @@ -26,7 +26,7 @@ import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer trait WithStarRocksContainer extends WithJdbcServerContainer { - private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.1.6" + private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.3.13" private val STARROCKS_FE_MYSQL_PORT = 9030 private val STARROCKS_FE_HTTP_PORT = 8030 @@ -47,7 +47,7 @@ trait WithStarRocksContainer extends WithJdbcServerContainer { .withStrategy(Wait.forListeningPorts(ports: _*)) .withStrategy(forLogMessage(".*broker service already added into FE service.*", 1)) .withStrategy( - forLogMessage(".*Enjoy the journal to StarRocks blazing-fast lake-house engine.*", 1))) + forLogMessage(".*Enjoy the journey to StarRocks blazing-fast lake-house engine.*", 1))) protected def feJdbcUrl: String = withContainers { container => val queryServerHost: String = container.host diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala index 02cb9a00307..8d89cd9784d 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala @@ -124,11 +124,27 @@ trait HiveJDBCTestHelper extends JDBCTestHelper { } def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = { - val req = new TGetOperationStatusReq(op) - var state = client.GetOperationStatus(req).getOperationState - eventually(timeout(90.seconds), interval(100.milliseconds)) { - state = client.GetOperationStatus(req).getOperationState - assert(!Set(INITIALIZED_STATE, PENDING_STATE, RUNNING_STATE).contains(state)) + waitForOperationStatusIn( + client, + op, + Set( + FINISHED_STATE, + CANCELED_STATE, + CLOSED_STATE, + ERROR_STATE, + UKNOWN_STATE, + TIMEDOUT_STATE), + timeoutMs = 90000) + } + + def waitForOperationStatusIn( + client: Iface, + op: TOperationHandle, + status: Set[TOperationState], + timeoutMs: Int = 5000): Unit = { + eventually(timeout(timeoutMs.milliseconds), interval(100.milliseconds)) { + val state = client.GetOperationStatus(new TGetOperationStatusReq(op)).getOperationState + assert(status.contains(state)) } }