Skip to content

Commit 47b0fec

Browse files
SteNicholas authored and yanghua committed
[KYUUBI #1867] Support PlanOnlyStatement operation like Spark SQL engine
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ Support `PlanOnlyStatement` operation like Spark SQL engine. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1891 from SteNicholas/KYUUBI-1867. Closes #1867 beacaf7 [SteNicholas] [KYUUBI #1867] Bugfix FlinkSQLSessionManager should open Flink session f379707 [SteNicholas] [KYUUBI #1867] Support PlanOnlyStatement operation like Spark SQL engine Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: yanghua <yanghua1127@gmail.com>
1 parent fd7a729 commit 47b0fec

File tree

10 files changed

+287
-76
lines changed

10 files changed

+287
-76
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 6 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,14 @@
1717

1818
package org.apache.kyuubi.engine.flink.operation
1919

20-
import java.util
2120
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
2221

2322
import scala.collection.JavaConverters._
2423
import scala.collection.mutable.ArrayBuffer
2524

2625
import com.google.common.annotations.VisibleForTesting
2726
import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
28-
import org.apache.flink.table.api.{DataTypes, ResultKind}
29-
import org.apache.flink.table.catalog.Column
27+
import org.apache.flink.table.api.ResultKind
3028
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
3129
import org.apache.flink.table.operations.{Operation, QueryOperation}
3230
import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
@@ -107,10 +105,13 @@ class ExecuteStatement(
107105
val operation = executor.parseStatement(sessionId, statement)
108106
operation match {
109107
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
110-
case setOperation: SetOperation => runSetOperation(setOperation)
111-
case resetOperation: ResetOperation => runResetOperation(resetOperation)
108+
case setOperation: SetOperation =>
109+
resultSet = ResultSetUtil.runSetOperation(setOperation, executor, sessionId)
110+
case resetOperation: ResetOperation =>
111+
resultSet = ResultSetUtil.runResetOperation(resetOperation, executor, sessionId)
112112
case operation: Operation => runOperation(operation)
113113
}
114+
setState(OperationState.FINISHED)
114115
} catch {
115116
onError(cancel = true)
116117
} finally {
@@ -148,78 +149,17 @@ class ExecuteStatement(
148149
.columns(resultDescriptor.getResultSchema.getColumns)
149150
.data(rows.toArray[Row])
150151
.build
151-
setState(OperationState.FINISHED)
152-
153152
} finally {
154153
if (resultId != null) {
155154
cleanupQueryResult(resultId)
156155
}
157156
}
158157
}
159158

160-
private def runSetOperation(setOperation: SetOperation): Unit = {
161-
if (setOperation.getKey.isPresent) {
162-
val key: String = setOperation.getKey.get.trim
163-
164-
if (setOperation.getValue.isPresent) {
165-
val newValue: String = setOperation.getValue.get.trim
166-
executor.setSessionProperty(sessionId, key, newValue)
167-
}
168-
169-
val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
170-
resultSet = ResultSet.builder
171-
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
172-
.columns(
173-
Column.physical("key", DataTypes.STRING()),
174-
Column.physical("value", DataTypes.STRING()))
175-
.data(Array(Row.of(key, value)))
176-
.build
177-
} else {
178-
// show all properties if set without key
179-
val properties: util.Map[String, String] = executor.getSessionConfigMap(sessionId)
180-
181-
val entries = ArrayBuffer.empty[Row]
182-
properties.forEach((key, value) => entries.append(Row.of(key, value)))
183-
184-
if (entries.nonEmpty) {
185-
val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
186-
resultSet = ResultSet.builder
187-
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
188-
.columns(
189-
Column.physical("key", DataTypes.STRING()),
190-
Column.physical("value", DataTypes.STRING()))
191-
.data(prettyEntries.toArray)
192-
.build
193-
} else {
194-
resultSet = ResultSet.builder
195-
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
196-
.columns(
197-
Column.physical("key", DataTypes.STRING()),
198-
Column.physical("value", DataTypes.STRING()))
199-
.data(Array[Row]())
200-
.build
201-
}
202-
}
203-
setState(OperationState.FINISHED)
204-
}
205-
206-
private def runResetOperation(resetOperation: ResetOperation): Unit = {
207-
if (resetOperation.getKey.isPresent) {
208-
// reset the given property
209-
executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
210-
} else {
211-
// reset all properties
212-
executor.resetSessionProperties(sessionId)
213-
}
214-
resultSet = ResultSetUtil.successResultSet
215-
setState(OperationState.FINISHED)
216-
}
217-
218159
private def runOperation(operation: Operation): Unit = {
219160
val result = executor.executeOperation(sessionId, operation)
220161
result.await()
221162
resultSet = ResultSet.fromTableResult(result)
222-
setState(OperationState.FINISHED)
223163
}
224164

225165
private def cleanupQueryResult(resultId: String): Unit = {

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,37 @@
1818
package org.apache.kyuubi.engine.flink.operation
1919

2020
import java.util
21+
import java.util.Locale
2122

22-
import scala.collection.JavaConverters.asScalaBufferConverter
23+
import scala.collection.JavaConverters._
2324

25+
import org.apache.kyuubi.config.KyuubiConf._
26+
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
2427
import org.apache.kyuubi.engine.flink.result.Constants
28+
import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
2529
import org.apache.kyuubi.operation.{Operation, OperationManager}
2630
import org.apache.kyuubi.session.Session
2731

2832
class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManager") {
2933

34+
private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
35+
3036
override def newExecuteStatementOperation(
3137
session: Session,
3238
statement: String,
3339
confOverlay: Map[String, String],
3440
runAsync: Boolean,
3541
queryTimeout: Long): Operation = {
36-
val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
42+
val flinkSession = session.asInstanceOf[FlinkSessionImpl]
43+
val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
44+
OPERATION_PLAN_ONLY.key,
45+
operationModeDefault)
46+
val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
47+
case NONE =>
48+
new ExecuteStatement(session, statement, runAsync, queryTimeout)
49+
case mode =>
50+
new PlanOnlyStatement(session, statement, mode)
51+
}
3752
addOperation(op)
3853
}
3954

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.flink.operation
19+
20+
import org.apache.flink.table.api.TableEnvironment
21+
import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
22+
23+
import org.apache.kyuubi.KyuubiSQLException
24+
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
25+
import org.apache.kyuubi.engine.flink.result.ResultSetUtil
26+
import org.apache.kyuubi.operation.OperationType
27+
import org.apache.kyuubi.operation.log.OperationLog
28+
import org.apache.kyuubi.session.Session
29+
30+
/**
31+
* Perform the statement parsing, analyzing or optimizing only without executing it
32+
*/
33+
class PlanOnlyStatement(
34+
session: Session,
35+
override val statement: String,
36+
mode: OperationMode)
37+
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
38+
39+
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
40+
override def getOperationLog: Option[OperationLog] = Option(operationLog)
41+
42+
override protected def runInternal(): Unit = {
43+
try {
44+
val operation = executor.parseStatement(sessionId, statement)
45+
operation match {
46+
case setOperation: SetOperation =>
47+
resultSet = ResultSetUtil.runSetOperation(setOperation, executor, sessionId)
48+
case resetOperation: ResetOperation =>
49+
resultSet = ResultSetUtil.runResetOperation(resetOperation, executor, sessionId)
50+
case _ => explainOperation(statement)
51+
}
52+
} catch {
53+
onError()
54+
}
55+
}
56+
57+
private def explainOperation(statement: String): Unit = {
58+
val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
59+
mode match {
60+
case PARSE =>
61+
val sqlPlan = tableEnv.explainSql(statement)
62+
resultSet =
63+
ResultSetUtil.stringListToResultSet(
64+
List(sqlPlan.split(System.lineSeparator()).apply(1)),
65+
"plan")
66+
case _ =>
67+
throw KyuubiSQLException(
68+
s"""
69+
|The operation mode ${mode.toString} doesn't support in Flink SQL engine.
70+
|Flink only supports the AST and the execution plan of the sql statement.
71+
|Flink engine will support EXECUTION operation plan mode in future.
72+
|""".stripMargin)
73+
}
74+
}
75+
}

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,15 @@
1717

1818
package org.apache.kyuubi.engine.flink.result;
1919

20+
import java.util
21+
22+
import scala.collection.mutable.ArrayBuffer
23+
2024
import org.apache.flink.table.api.DataTypes
2125
import org.apache.flink.table.api.ResultKind
2226
import org.apache.flink.table.catalog.Column
27+
import org.apache.flink.table.client.gateway.Executor
28+
import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
2329
import org.apache.flink.types.Row
2430

2531
/** Utility object for building ResultSet. */
@@ -54,4 +60,82 @@ object ResultSetUtil {
5460
.columns(Column.physical("result", DataTypes.STRING))
5561
.data(Array[Row](Row.of("OK")))
5662
.build
63+
64+
/**
65+
* Runs a SetOperation with executor. Returns when SetOperation is executed successfully.
66+
*
67+
* @param setOperation Set operation.
68+
* @param executor A gateway for communicating with Flink and other external systems.
69+
* @param sessionId Id of the session.
70+
* @return A ResultSet of SetOperation execution.
71+
*/
72+
def runSetOperation(
73+
setOperation: SetOperation,
74+
executor: Executor,
75+
sessionId: String): ResultSet = {
76+
if (setOperation.getKey.isPresent) {
77+
val key: String = setOperation.getKey.get.trim
78+
79+
if (setOperation.getValue.isPresent) {
80+
val newValue: String = setOperation.getValue.get.trim
81+
executor.setSessionProperty(sessionId, key, newValue)
82+
}
83+
84+
val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
85+
ResultSet.builder
86+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
87+
.columns(
88+
Column.physical("key", DataTypes.STRING()),
89+
Column.physical("value", DataTypes.STRING()))
90+
.data(Array(Row.of(key, value)))
91+
.build
92+
} else {
93+
// show all properties if set without key
94+
val properties: util.Map[String, String] = executor.getSessionConfigMap(sessionId)
95+
96+
val entries = ArrayBuffer.empty[Row]
97+
properties.forEach((key, value) => entries.append(Row.of(key, value)))
98+
99+
if (entries.nonEmpty) {
100+
val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
101+
ResultSet.builder
102+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
103+
.columns(
104+
Column.physical("key", DataTypes.STRING()),
105+
Column.physical("value", DataTypes.STRING()))
106+
.data(prettyEntries.toArray)
107+
.build
108+
} else {
109+
ResultSet.builder
110+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
111+
.columns(
112+
Column.physical("key", DataTypes.STRING()),
113+
Column.physical("value", DataTypes.STRING()))
114+
.data(Array[Row]())
115+
.build
116+
}
117+
}
118+
}
119+
120+
/**
121+
* Runs a ResetOperation with executor. Returns when ResetOperation is executed successfully.
122+
*
123+
* @param resetOperation Reset operation.
124+
* @param executor A gateway for communicating with Flink and other external systems.
125+
* @param sessionId Id of the session.
126+
* @return A ResultSet of ResetOperation execution.
127+
*/
128+
def runResetOperation(
129+
resetOperation: ResetOperation,
130+
executor: Executor,
131+
sessionId: String): ResultSet = {
132+
if (resetOperation.getKey.isPresent) {
133+
// reset the given property
134+
executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
135+
} else {
136+
// reset all properties
137+
executor.resetSessionProperties(sessionId)
138+
}
139+
successResultSet
140+
}
57141
}

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
2121
import org.apache.flink.table.client.gateway.local.LocalExecutor
2222
import org.apache.hive.service.rpc.thrift.TProtocolVersion
2323

24+
import org.apache.kyuubi.KyuubiSQLException
2425
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
2526
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
2627
import org.apache.kyuubi.session.{SessionHandle, SessionManager}
@@ -61,8 +62,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
6162
sessionHandle,
6263
sessionContext)
6364

64-
setSession(sessionHandle, sessionImpl)
65-
sessionHandle
65+
try {
66+
sessionImpl.open()
67+
setSession(sessionHandle, sessionImpl)
68+
info(s"$user's session with $sessionHandle is opened, current opening sessions" +
69+
s" $getOpenSessionCount")
70+
sessionHandle
71+
} catch {
72+
case e: Exception =>
73+
sessionImpl.close()
74+
throw KyuubiSQLException(e)
75+
}
6676
}
6777

6878
override def closeSession(sessionHandle: SessionHandle): Unit = {

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.engine.flink.session
1919

20-
import org.apache.flink.table.client.gateway.Executor
20+
import org.apache.flink.table.client.gateway.{Executor, SqlExecutionException}
2121
import org.apache.flink.table.client.gateway.context.SessionContext
2222
import org.apache.hive.service.rpc.thrift.TProtocolVersion
2323

@@ -37,4 +37,19 @@ class FlinkSessionImpl(
3737
def executor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
3838

3939
def sessionId: String = handle.identifier.toString
40+
41+
private def setModifiableConfig(key: String, value: String): Unit = {
42+
try {
43+
sessionContext.set(key, value)
44+
} catch {
45+
case e: SqlExecutionException => warn(e.getMessage)
46+
}
47+
}
48+
49+
override def open(): Unit = {
50+
normalizedConf.foreach {
51+
case (key, value) => setModifiableConfig(key, value)
52+
}
53+
super.open()
54+
}
4055
}

0 commit comments

Comments
 (0)