Skip to content

Commit e0eeab0

Browse files
turboFeipan3793
authored andcommitted
[KYUUBI #1710] Support to specify OPERATION_LANGUAGE with TExecuteStatementReq confOverlay
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Now kyuubi supports SQL and SCALA language. Now the way to switch OPERATION_LANAUAGE: ``` // for SCALA spark.sql("set kyuubi.operation.language=SCALA") // for SQL set kyuubi.operation.language=SCALA ``` It is more user friendly that user can specify the OPERATION_LANGUAGE for each ExecuteStatement. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1710 from turboFei/conf_overlay. Closes #1710 385264d [Fei Wang] fix ut 58fbdbd [Fei Wang] fix npe 2718fb4 [Fei Wang] Support to get OPERATION_LANGUAGE from execute statement confOverlay Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 58dd4ca commit e0eeab0

File tree

17 files changed

+68
-13
lines changed

17 files changed

+68
-13
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
2727
override def newExecuteStatementOperation(
2828
session: Session,
2929
statement: String,
30+
confOverlay: Map[String, String],
3031
runAsync: Boolean,
3132
queryTimeout: Long): Operation = {
3233
val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
4848
override def newExecuteStatementOperation(
4949
session: Session,
5050
statement: String,
51+
confOverlay: Map[String, String],
5152
runAsync: Boolean,
5253
queryTimeout: Long): Operation = {
5354
val spark = session.asInstanceOf[SparkSessionImpl].spark
54-
val lang = spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)
55+
val lang = confOverlay.get(OPERATION_LANGUAGE.key)
56+
.getOrElse(spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault))
5557
val operation =
5658
OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
5759
case OperationLanguages.SQL =>

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
4646
def newExecuteStatementOperation(
4747
session: Session,
4848
statement: String,
49+
confOverlay: Map[String, String],
4950
runAsync: Boolean,
5051
queryTimeout: Long): Operation
5152
def newGetTypeInfoOperation(session: Session): Operation

kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,14 @@ abstract class AbstractBackendService(name: String)
5656
override def executeStatement(
5757
sessionHandle: SessionHandle,
5858
statement: String,
59+
confOverlay: Map[String, String],
5960
runAsync: Boolean,
6061
queryTimeout: Long): OperationHandle = {
61-
sessionManager.getSession(sessionHandle).executeStatement(statement, runAsync, queryTimeout)
62+
sessionManager.getSession(sessionHandle).executeStatement(
63+
statement,
64+
confOverlay,
65+
runAsync,
66+
queryTimeout)
6267
}
6368

6469
override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {

kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ trait BackendService {
4848
def executeStatement(
4949
sessionHandle: SessionHandle,
5050
statement: String,
51+
confOverlay: Map[String, String],
5152
runAsync: Boolean,
5253
queryTimeout: Long): OperationHandle
5354

kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,14 @@ abstract class ThriftBinaryFrontendService(name: String)
261261
val sessionHandle = SessionHandle(req.getSessionHandle)
262262
val statement = req.getStatement
263263
val runAsync = req.isRunAsync
264-
// val confOverlay = req.getConfOverlay
264+
val confOverlay = Option(req.getConfOverlay).getOrElse(Map.empty.asJava)
265265
val queryTimeout = req.getQueryTimeout
266-
val operationHandle = be.executeStatement(sessionHandle, statement, runAsync, queryTimeout)
266+
val operationHandle = be.executeStatement(
267+
sessionHandle,
268+
statement,
269+
confOverlay.asScala.toMap,
270+
runAsync,
271+
queryTimeout)
267272
resp.setOperationHandle(operationHandle.toTOperationHandle)
268273
resp.setStatus(OK_STATUS)
269274
} catch {

kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,11 @@ abstract class AbstractSession(
115115

116116
override def executeStatement(
117117
statement: String,
118+
confOverlay: Map[String, String],
118119
runAsync: Boolean,
119120
queryTimeout: Long): OperationHandle = withAcquireRelease() {
120121
val operation = sessionManager.operationManager
121-
.newExecuteStatementOperation(this, statement, runAsync, queryTimeout)
122+
.newExecuteStatementOperation(this, statement, confOverlay, runAsync, queryTimeout)
122123
runOperation(operation)
123124
}
124125

kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ trait Session {
4747

4848
def getInfo(infoType: TGetInfoType): TGetInfoValue
4949

50-
def executeStatement(statement: String, runAsync: Boolean, queryTimeout: Long): OperationHandle
50+
def executeStatement(
51+
statement: String,
52+
confOverlay: Map[String, String],
53+
runAsync: Boolean,
54+
queryTimeout: Long): OperationHandle
5155

5256
def getTableTypes: OperationHandle
5357
def getTypeInfo: OperationHandle

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class NoopOperationManager extends OperationManager("noop") {
3131
override def newExecuteStatementOperation(
3232
session: Session,
3333
statement: String,
34+
confOverlay: Map[String, String],
3435
runAsync: Boolean,
3536
queryTimeout: Long): Operation = {
3637
val operation =

kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,13 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
9393

9494
def executeStatement(
9595
statement: String,
96+
confOverlay: Map[String, String],
9697
shouldRunAsync: Boolean,
9798
queryTimeout: Long): TOperationHandle = {
9899
val req = new TExecuteStatementReq()
99100
req.setSessionHandle(_remoteSessionHandle)
100101
req.setStatement(statement)
102+
req.setConfOverlay(confOverlay.asJava)
101103
req.setRunAsync(shouldRunAsync)
102104
req.setQueryTimeout(queryTimeout)
103105
val resp = withLockAcquired(ExecuteStatement(req))

0 commit comments

Comments
 (0)