Skip to content

Commit 34ef880

Browse files
df-Liucxzl25
authored andcommitted
[KYUUBI #3560] Flink SQL engine supports run DDL across versions
### _Why are the changes needed?_ Followup #3230 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3560 from df-Liu/flink_ddl. Closes #3560 0dbdfb3 [df_liu] flink ddl Authored-by: df_liu <df_liu@trip.com> Signed-off-by: Shaoyun Chen <csy@apache.org> (cherry picked from commit d06c656) Signed-off-by: Shaoyun Chen <csy@apache.org>
1 parent 743e707 commit 34ef880

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
2424
import scala.collection.mutable.ArrayBuffer
2525

2626
import org.apache.flink.api.common.JobID
27-
import org.apache.flink.table.api.ResultKind
27+
import org.apache.flink.table.api.{ResultKind, TableResult}
2828
import org.apache.flink.table.client.gateway.TypedResult
2929
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
3030
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
@@ -154,7 +154,12 @@ class ExecuteStatement(
154154
}
155155

156156
private def runOperation(operation: Operation): Unit = {
157-
val result = executor.executeOperation(sessionId, operation)
157+
// FLINK-24461 executeOperation method changes the return type
158+
// from TableResult to TableResultInternal
159+
val executeOperation = DynMethods.builder("executeOperation")
160+
.impl(executor.getClass, classOf[String], classOf[Operation])
161+
.build(executor)
162+
val result = executeOperation.invoke[TableResult](sessionId, operation)
158163
jobId = result.getJobClient.asScala.map(_.getJobID)
159164
result.await()
160165
resultSet = ResultSet.fromTableResult(result)

integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
4848
}
4949
}
5050

51+
test("execute statement - create/alter/drop table") {
52+
withJdbcStatement()({ statement =>
53+
statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
54+
assert(statement.execute("alter table tbl_a rename to tbl_b"))
55+
assert(statement.execute("drop table tbl_b"))
56+
})
57+
}
58+
5159
test("execute statement - select column name with dots") {
5260
withJdbcStatement() { statement =>
5361
val resultSet = statement.executeQuery("select 'tmp.hello'")

0 commit comments

Comments
 (0)