Skip to content

Commit b41be9e

Browse files
yangrong688yaooqinn
authored andcommitted
[KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
### _Why are the changes needed?_ Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2382 from yangrong688/KYUUBI-2020. Closes #2020 97eccc0 [yangrong688] fix spotless check efe0116 [yangrong688] add javadoc and add returns null if stmtHandle is null a9d04e6 [yangrong688] fix ac0dff2 [yangrong688] fix 3e93580 [yangrong688] skip test when java version beyond JAVA_1_8 2a777e2 [yangrong688] update getQueryId test case ec12d2a [yangrong688] [feat] complete TGetQueryId op, should refined test case later 2996579 [yangrong] [feat] init operation getQueryId, just structure, need to improve quickly Lead-authored-by: yangrong688 <yangrong.jxufe@gmail.com> Co-authored-by: yangrong <yangrong.jxufe@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 06da8cf commit b41be9e

File tree

16 files changed

+114
-14
lines changed

16 files changed

+114
-14
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
144144
foreignTable: String): Operation = {
145145
throw KyuubiSQLException.featureNotSupported()
146146
}
147+
148+
override def getQueryId(operation: Operation): String = {
149+
throw KyuubiSQLException.featureNotSupported()
150+
}
147151
}

externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.List
2222

2323
import scala.collection.JavaConverters._
2424

25+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
2526
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
2627
import org.apache.hive.service.cli.{RowSetFactory, TableSchema}
2728
import org.apache.hive.service.rpc.thrift.TRowSet
@@ -161,4 +162,10 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
161162

162163
rowSet.toTRowSet
163164
}
165+
166+
override def getQueryId(operation: Operation): String = {
167+
val hiveOperation = operation.asInstanceOf[HiveOperation]
168+
val internalHiveOperation = hiveOperation.internalHiveOperation
169+
internalHiveOperation.getParentSession.getHiveConf.getVar(ConfVars.HIVEQUERYID)
170+
}
164171
}

externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.kyuubi.engine.hive.operation
1919

20+
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
21+
2022
import org.apache.kyuubi.{HiveEngineTests, Utils}
2123
import org.apache.kyuubi.engine.hive.HiveSQLEngine
24+
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
2225

2326
class HiveOperationSuite extends HiveEngineTests {
2427

@@ -35,4 +38,15 @@ class HiveOperationSuite extends HiveEngineTests {
3538
override protected def jdbcUrl: String = {
3639
"jdbc:hive2://" + HiveSQLEngine.currentEngine.get.frontendServices.head.connectionUrl + "/;"
3740
}
41+
42+
test("test get query id") {
43+
assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
44+
withJdbcStatement("hive_engine_test") { statement =>
45+
statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")
46+
statement.execute("INSERT INTO hive_engine_test SELECT 1, '2'")
47+
statement.executeQuery("SELECT ID, VALUE FROM hive_engine_test")
48+
val kyuubiStatement = statement.asInstanceOf[KyuubiStatement]
49+
assert(kyuubiStatement.getQueryId != null)
50+
}
51+
}
3852
}

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,8 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
151151
foreignTable: String): Operation = {
152152
throw KyuubiSQLException.featureNotSupported()
153153
}
154+
155+
override def getQueryId(operation: Operation): String = {
156+
throw KyuubiSQLException.featureNotSupported()
157+
}
154158
}

externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,8 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
116116
foreignTable: String): Operation = {
117117
throw KyuubiSQLException.featureNotSupported()
118118
}
119+
120+
override def getQueryId(operation: Operation): String = {
121+
throw KyuubiSQLException.featureNotSupported()
122+
}
119123
}

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
8585
foreignCatalog: String,
8686
foreignSchema: String,
8787
foreignTable: String): Operation
88+
def getQueryId(operation: Operation): String
8889

8990
final def addOperation(operation: Operation): Operation = synchronized {
9091
handleToOperation.put(operation.getHandle, operation)

kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ abstract class AbstractBackendService(name: String)
150150
foreignTable)
151151
}
152152

153+
override def getQueryId(operationHandle: OperationHandle): String = {
154+
val operation = sessionManager.operationManager.getOperation(operationHandle)
155+
val queryId = sessionManager.operationManager.getQueryId(operation)
156+
queryId
157+
}
158+
153159
override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
154160
val operation = sessionManager.operationManager.getOperation(operationHandle)
155161
if (operation.shouldRunAsync) {

kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ trait BackendService {
8989
foreignCatalog: String,
9090
foreignSchema: String,
9191
foreignTable: String): OperationHandle
92+
def getQueryId(operationHandle: OperationHandle): String
9293

9394
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
9495
def cancelOperation(operationHandle: OperationHandle): Unit

kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,8 @@ abstract class TFrontendService(name: String)
529529
override def GetQueryId(req: TGetQueryIdReq): TGetQueryIdResp = {
530530
debug(req.toString)
531531
val resp = new TGetQueryIdResp
532+
val queryId = be.getQueryId(OperationHandle(req.getOperationHandle))
533+
resp.setQueryId(queryId)
532534
resp
533535
}
534536

kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,12 @@ abstract class AbstractSession(
200200
runOperation(operation)
201201
}
202202

203+
override def getQueryId(operationHandle: OperationHandle): String = {
204+
val operation = sessionManager.operationManager.getOperation(operationHandle)
205+
val queryId = sessionManager.operationManager.getQueryId(operation)
206+
queryId
207+
}
208+
203209
override def cancelOperation(operationHandle: OperationHandle): Unit = withAcquireRelease() {
204210
sessionManager.operationManager.cancelOperation(operationHandle)
205211
}

0 commit comments

Comments
 (0)