Skip to content

Commit b792b96

Browse files
link3280 authored and pan3793 committed
[KYUUBI #3549] Support query id in Flink engine
Flink engine now doesn't support query id. This PR fixes the problem. - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3550 from link3280/KYUUBI-3549. Closes #3549 4f583cb [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b8808d7 [Paul Lin] [KYUUBI #3549] Simplify code a68bc59 [Paul Lin] [KYUUBI #3549] Fix typo in the comments e6217db [Paul Lin] [KYUUBI #3549] Support query id in Flink engine Authored-by: Paul Lin <paullin3280@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 2a97616 commit b792b96

File tree

3 files changed

+27
-2
lines changed

3 files changed

+27
-2
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.util
2323
import scala.collection.JavaConverters._
2424
import scala.collection.mutable.ArrayBuffer
2525

26+
import org.apache.flink.api.common.JobID
2627
import org.apache.flink.table.api.ResultKind
2728
import org.apache.flink.table.client.gateway.TypedResult
2829
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
@@ -54,6 +55,8 @@ class ExecuteStatement(
5455
private val operationLog: OperationLog =
5556
OperationLog.createOperationLog(session, getHandle)
5657

58+
var jobId: Option[JobID] = None
59+
5760
override def getOperationLog: Option[OperationLog] = Option(operationLog)
5861

5962
override protected def beforeRun(): Unit = {
@@ -152,6 +155,7 @@ class ExecuteStatement(
152155

153156
private def runOperation(operation: Operation): Unit = {
154157
val result = executor.executeOperation(sessionId, operation)
158+
jobId = result.getJobClient.asScala.map(_.getJobID)
155159
result.await()
156160
resultSet = ResultSet.fromTableResult(result)
157161
}

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
180180
}
181181

182182
override def getQueryId(operation: Operation): String = {
183-
throw KyuubiSQLException.featureNotSupported()
183+
// return empty string instead of null if there's no query id
184+
// otherwise there would be TTransportException
185+
operation match {
186+
case exec: ExecuteStatement => exec.jobId.map(_.toHexString).getOrElse("")
187+
case _: PlanOnlyStatement => ""
188+
case _ =>
189+
throw new IllegalStateException(s"Unsupported Flink operation class $classOf[operation].")
190+
}
184191
}
185192
}

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ package org.apache.kyuubi.engine.flink.operation
1919

2020
import java.nio.file.Files
2121
import java.sql.DatabaseMetaData
22-
import java.util.UUID
22+
import java.util.{Properties, UUID}
2323

2424
import scala.collection.JavaConverters._
2525

26+
import org.apache.flink.api.common.JobID
2627
import org.apache.flink.table.types.logical.LogicalTypeRoot
2728
import org.apache.hive.service.rpc.thrift._
2829
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
3435
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
3536
import org.apache.kyuubi.engine.flink.result.Constants
3637
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
38+
import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
3739
import org.apache.kyuubi.operation.HiveJDBCTestHelper
3840
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
3941
import org.apache.kyuubi.service.ServiceState._
@@ -974,4 +976,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
974976
assertDefaultDatabase(client, "default_database", true)
975977
}
976978
}
979+
980+
test("get query id") {
981+
val conn = new KyuubiConnection(jdbcUrl, new Properties())
982+
val stmt = conn.createStatement()
983+
stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
984+
assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
985+
stmt.executeQuery("insert into tbl_a values (1)")
986+
val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
987+
assert(queryId !== null)
988+
// parse the string to check if it's valid Flink job id
989+
assert(JobID.fromHexString(queryId) !== null)
990+
}
977991
}

0 commit comments

Comments
 (0)