Skip to content

Commit ef8de37

Browse files
link3280yaooqinn
authored andcommitted
[KYUUBI #1778] Support Flink Set/Reset Operations
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ Set/Reset operations are crucial for tunning Flink SQL jobs and enabling/disabling features, but they're not executed as SQL statements in Flink, thus can't be supported by the current ExecuteStatement implementation. We should extend ExecuteStatement to support these operations. This is a sub-task of KPIP-2 #1322. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1822 from link3280/feature/KYUUBI-1778. Closes #1778 80b2de0 [Paul Lin] [KYUUBI #1825] Return key-value properties in two separate columns 786f3c5 [Paul Lin] [KYUUBI #1778] Support Flink reset operations 6513a31 [Paul Lin] [KYUUBI #1778] Support Flink set operations Authored-by: Paul Lin <paullin3280@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent b47e1cd commit ef8de37

File tree

3 files changed

+119
-2
lines changed

3 files changed

+119
-2
lines changed

externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,19 @@ public static ResultSet stringListToResultSet(List<String> strings, String colum
5555
.data(data.toArray(new Row[0]))
5656
.build();
5757
}
58+
59+
/**
60+
* Build a simple result with OK message. Returned when SQL commands are executed successfully.
61+
* Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its
62+
* cursor).
63+
*
64+
* @return A simple result with OK message.
65+
*/
66+
public static ResultSet successResultSet() {
67+
return ResultSet.builder()
68+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
69+
.columns(Column.physical("result", DataTypes.STRING()))
70+
.data(new Row[] {Row.of("OK")})
71+
.build();
72+
}
5873
}

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,22 @@
1717

1818
package org.apache.kyuubi.engine.flink.operation
1919

20+
import java.util
2021
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
2122

2223
import scala.collection.JavaConverters._
2324
import scala.collection.mutable.ArrayBuffer
2425

2526
import com.google.common.annotations.VisibleForTesting
26-
import org.apache.flink.table.api.ResultKind
27+
import org.apache.flink.table.api.{DataTypes, ResultKind}
28+
import org.apache.flink.table.catalog.Column
2729
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
2830
import org.apache.flink.table.operations.{Operation, QueryOperation}
31+
import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
2932
import org.apache.flink.types.Row
3033

3134
import org.apache.kyuubi.{KyuubiSQLException, Logging}
32-
import org.apache.kyuubi.engine.flink.result.ResultSet
35+
import org.apache.kyuubi.engine.flink.result.{OperationUtil, ResultSet}
3336
import org.apache.kyuubi.operation.{OperationState, OperationType}
3437
import org.apache.kyuubi.operation.log.OperationLog
3538
import org.apache.kyuubi.session.Session
@@ -100,6 +103,8 @@ class ExecuteStatement(
100103
val operation = executor.parseStatement(sessionId, statement)
101104
operation match {
102105
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
106+
case setOperation: SetOperation => runSetOperation(setOperation)
107+
case resetOperation: ResetOperation => runResetOperation(resetOperation)
103108
case operation: Operation => runOperation(operation)
104109
}
105110
} catch {
@@ -139,6 +144,64 @@ class ExecuteStatement(
139144
setState(OperationState.FINISHED)
140145
}
141146

147+
private def runSetOperation(setOperation: SetOperation): Unit = {
148+
if (setOperation.getKey.isPresent) {
149+
val key: String = setOperation.getKey.get.trim
150+
151+
if (setOperation.getValue.isPresent) {
152+
val newValue: String = setOperation.getValue.get.trim
153+
executor.setSessionProperty(sessionId, key, newValue)
154+
}
155+
156+
val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
157+
resultSet = ResultSet.builder
158+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
159+
.columns(
160+
Column.physical("key", DataTypes.STRING()),
161+
Column.physical("value", DataTypes.STRING()))
162+
.data(Array(Row.of(key, value)))
163+
.build
164+
} else {
165+
// show all properties if set without key
166+
val properties: util.Map[String, String] = executor.getSessionConfigMap(sessionId)
167+
168+
val entries = ArrayBuffer.empty[Row]
169+
properties.forEach((key, value) => entries.append(Row.of(key, value)))
170+
171+
if (entries.nonEmpty) {
172+
val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
173+
resultSet = ResultSet.builder
174+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
175+
.columns(
176+
Column.physical("key", DataTypes.STRING()),
177+
Column.physical("value", DataTypes.STRING()))
178+
.data(prettyEntries.toArray)
179+
.build
180+
} else {
181+
resultSet = ResultSet.builder
182+
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
183+
.columns(
184+
Column.physical("key", DataTypes.STRING()),
185+
Column.physical("value", DataTypes.STRING()))
186+
.data(Array[Row]())
187+
.build
188+
}
189+
}
190+
setState(OperationState.FINISHED)
191+
}
192+
193+
private def runResetOperation(resetOperation: ResetOperation): Unit = {
194+
if (resetOperation.getKey.isPresent) {
195+
// reset the given property
196+
executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
197+
} else {
198+
// reset all properties
199+
executor.resetSessionProperties(sessionId)
200+
}
201+
resultSet = OperationUtil.successResultSet()
202+
setState(OperationState.FINISHED)
203+
}
204+
142205
private def runOperation(operation: Operation): Unit = {
143206
val result = executor.executeOperation(sessionId, operation)
144207
result.await()

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,4 +685,43 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
685685
assert(resultSet.getLong(1) == -1L)
686686
})
687687
}
688+
689+
test("execute statement - set properties") {
690+
withMultipleConnectionJdbcStatement()({ statement =>
691+
val resultSet = statement.executeQuery("set table.dynamic-table-options.enabled = true")
692+
val metadata = resultSet.getMetaData
693+
assert(metadata.getColumnName(1) == "key")
694+
assert(metadata.getColumnName(2) == "value")
695+
assert(resultSet.next())
696+
assert(resultSet.getString(1) == "table.dynamic-table-options.enabled")
697+
assert(resultSet.getString(2) == "true")
698+
})
699+
}
700+
701+
test("execute statement - show properties") {
702+
withMultipleConnectionJdbcStatement()({ statement =>
703+
val resultSet = statement.executeQuery("set")
704+
val metadata = resultSet.getMetaData
705+
assert(metadata.getColumnName(1) == "key")
706+
assert(metadata.getColumnName(2) == "value")
707+
assert(resultSet.next())
708+
})
709+
}
710+
711+
test("execute statement - reset property") {
712+
withMultipleConnectionJdbcStatement()({ statement =>
713+
statement.executeQuery("set pipeline.jars = my.jar")
714+
statement.executeQuery("reset pipeline.jars")
715+
val resultSet = statement.executeQuery("set")
716+
// Flink does not support set key without value currently,
717+
// thus read all rows to find the desired one
718+
var success = false
719+
while (resultSet.next()) {
720+
if (resultSet.getString(1) == "pipeline.jars" && resultSet.getString(2) == "") {
721+
success = true
722+
}
723+
}
724+
assert(success)
725+
})
726+
}
688727
}

0 commit comments

Comments
 (0)