Skip to content

Commit db0047d

Browse files
committed
[KYUUBI #3230] Flink SQL engine supports running across versions
### _Why are the changes needed?_ To make sure the Flink SQL engine built against flink-1.15 can run on flink-1.14. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #3230 from pan3793/flink. Closes #3230 5c3f859 [Cheng Pan] updated e776ee6 [Cheng Pan] review a5cfb0c [Cheng Pan] review 004c05d [Cheng Pan] nit 9c01955 [Cheng Pan] nit 7c3f553 [Cheng Pan] nit 8e1fb04 [Cheng Pan] nit 6adc7db [Cheng Pan] nit bc10739 [Cheng Pan] fix cf98111 [Cheng Pan] workflow a8895e6 [Cheng Pan] nit 8cab518 [Cheng Pan] Flink SQL engine supports run across versions Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 06d43cb commit db0047d

File tree

11 files changed

+1469
-53
lines changed

11 files changed

+1469
-53
lines changed

.github/workflows/master.yml

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ jobs:
5656
spark: '3.2'
5757
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz'
5858
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.IcebergTest'
59-
comment: 'verify-spark-3.1'
59+
comment: 'verify-on-spark-3.1-binary'
6060
- java: 8
6161
spark: '3.2'
6262
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.3.0 -Dspark.archive.name=spark-3.3.0-bin-hadoop3.tgz'
6363
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.IcebergTest'
64-
comment: 'verify-spark-3.3'
64+
comment: 'verify-on-spark-3.3-binary'
6565
env:
6666
SPARK_LOCAL_IP: localhost
6767
steps:
@@ -111,12 +111,11 @@ jobs:
111111
- '1.15'
112112
flink-archive: [ "" ]
113113
comment: [ "normal" ]
114-
# FIXME: Cross Flink versions verification is not supported yet
115-
# include:
116-
# - java: 8
117-
# flink: '1.15'
118-
# flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.14.5 -Dflink.archive.name=flink-1.14.5-bin-scala_2.12.tgz'
119-
# comment: 'verify-flink-1.14'
114+
include:
115+
- java: 8
116+
flink: '1.15'
117+
flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.14.5 -Dflink.archive.name=flink-1.14.5-bin-scala_2.12.tgz'
118+
comment: 'verify-on-flink-1.14-binary'
120119
steps:
121120
- uses: actions/checkout@v2
122121
- name: Tune Runner VM
@@ -128,11 +127,21 @@ jobs:
128127
java-version: ${{ matrix.java }}
129128
cache: 'maven'
130129
check-latest: false
131-
- name: Build and test Flink with maven w/o linters
132-
run: |
130+
- name: Build Flink with maven w/o linters
131+
run: |
132+
TEST_MODULES="externals/kyuubi-flink-sql-engine,integration-tests/kyuubi-flink-it"
133+
./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} -Pflink-${{ matrix.flink }} ${{ matrix.flink-archive }} -am clean install -DskipTests
134+
- name: Test Flink
135+
if: matrix.flink-archive == ''
136+
run: |
133137
TEST_MODULES="externals/kyuubi-flink-sql-engine,integration-tests/kyuubi-flink-it"
134-
./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} -Pflink-${{ matrix.flink }} ${{ matrix.flink-archive }} -am clean install -DskipTests
135138
./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} -Pflink-${{ matrix.flink }} ${{ matrix.flink-archive }} test
139+
- name: Cross-version test Flink
140+
if: matrix.flink-archive != ''
141+
run: |
142+
IT_FLINK=`echo "${{ matrix.flink-archive }}" | grep -E 'flink\-([0-9]+\.[0-9]+.[0-9]+)\-bin' -o | grep -E '[0-9]+\.[0-9]+' -o`
143+
IT_MODULE="integration-tests/kyuubi-flink-it"
144+
./build/mvn ${MVN_OPT} -pl ${IT_MODULE} -Pflink-${IT_FLINK} ${{ matrix.flink-archive }} test
136145
- name: Upload test logs
137146
if: failure()
138147
uses: actions/upload-artifact@v2

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/SemanticVersion.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ case class SemanticVersion(majorVersion: Int, minorVersion: Int) {
5656
val targetMinor = targetVersion.minorVersion
5757
callback(targetMajor, targetMinor, this.majorVersion, this.minorVersion)
5858
}
59+
60+
override def toString: String = s"$majorVersion.$minorVersion"
5961
}
6062

6163
object SemanticVersion {

extensions/spark/kyuubi-spark-connector-common/src/main/scala/org/apache/kyuubi/spark/connector/common/SemanticVersion.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ case class SemanticVersion(majorVersion: Int, minorVersion: Int) {
5656
val targetMinor = targetVersion.minorVersion
5757
callback(targetMajor, targetMinor, this.majorVersion, this.minorVersion)
5858
}
59+
60+
override def toString: String = s"$majorVersion.$minorVersion"
5961
}
6062

6163
object SemanticVersion {

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,29 @@ object FlinkEngineUtils extends Logging {
3939
val MODE_EMBEDDED = "embedded"
4040
val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options);
4141

42+
val SUPPORTED_FLINK_VERSIONS: Array[SemanticVersion] =
43+
Array("1.14", "1.15").map(SemanticVersion.apply)
44+
4245
def checkFlinkVersion(): Unit = {
4346
val flinkVersion = EnvironmentInformation.getVersion
44-
SemanticVersion(flinkVersion) match {
45-
case SemanticVersion(1, 14 | 15) =>
46-
logger.info(s"The current Flink version is $flinkVersion")
47-
case _ =>
48-
throw new UnsupportedOperationException(
49-
s"The current Flink version is $flinkVersion, " +
50-
s"Only Flink 1.14.x and 1.15 are supported, not supported in other versions")
47+
if (SUPPORTED_FLINK_VERSIONS.contains(SemanticVersion(flinkVersion))) {
48+
info(s"The current Flink version is $flinkVersion")
49+
} else {
50+
throw new UnsupportedOperationException(
51+
s"You are using unsupported Flink version $flinkVersion, " +
52+
s"only Flink ${SUPPORTED_FLINK_VERSIONS.mkString(", ")} are supported now.")
5153
}
5254
}
5355

56+
def isFlinkVersionAtMost(targetVersionString: String): Boolean =
57+
SemanticVersion(EnvironmentInformation.getVersion).isVersionAtMost(targetVersionString)
58+
59+
def isFlinkVersionAtLeast(targetVersionString: String): Boolean =
60+
SemanticVersion(EnvironmentInformation.getVersion).isVersionAtLeast(targetVersionString)
61+
62+
def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
63+
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
64+
5465
def parseCliOptions(args: Array[String]): CliOptions = {
5566
val (mode, modeArgs) =
5667
if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ import org.apache.flink.table.types.logical._
3434
import org.apache.flink.types.Row
3535

3636
import org.apache.kyuubi.Logging
37+
import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
3738
import org.apache.kyuubi.engine.flink.result.ResultSet
3839
import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
3940
import org.apache.kyuubi.operation.OperationState
4041
import org.apache.kyuubi.operation.log.OperationLog
42+
import org.apache.kyuubi.reflection.DynMethods
4143
import org.apache.kyuubi.session.Session
4244
import org.apache.kyuubi.util.RowSetUtils
4345

@@ -116,17 +118,16 @@ class ExecuteStatement(
116118
(1 to result.getPayload).foreach { page =>
117119
if (rows.size < resultMaxRows) {
118120
// FLINK-24461 retrieveResultPage method changes the return type from Row to RowData
119-
val result = executor.retrieveResultPage(resultId, page).asScala.toList
120-
result.headOption match {
121-
case None =>
122-
case Some(r) =>
123-
// for flink 1.14
124-
if (r.getClass == classOf[Row]) {
125-
rows ++= result.asInstanceOf[List[Row]]
126-
} else {
127-
// for flink 1.15+
128-
rows ++= result.map(r => convertToRow(r.asInstanceOf[RowData], dataTypes))
129-
}
121+
val retrieveResultPage = DynMethods.builder("retrieveResultPage")
122+
.impl(executor.getClass, classOf[String], classOf[Int])
123+
.build(executor)
124+
val _page = Integer.valueOf(page)
125+
if (isFlinkVersionEqualTo("1.14")) {
126+
val result = retrieveResultPage.invoke[util.List[Row]](resultId, _page)
127+
rows ++= result.asScala
128+
} else if (isFlinkVersionAtLeast("1.15")) {
129+
val result = retrieveResultPage.invoke[util.List[RowData]](resultId, _page)
130+
rows ++= result.asScala.map(r => convertToRow(r, dataTypes))
130131
}
131132
} else {
132133
loop = false
@@ -216,7 +217,7 @@ class ExecuteStatement(
216217
case d: BinaryMapData =>
217218
val kvArray = toArray(keyType, valueType, d)
218219
val map: util.Map[Any, Any] = new util.HashMap[Any, Any]
219-
for (i <- 0 until kvArray._1.length) {
220+
for (i <- kvArray._1.indices) {
220221
val value: Any = kvArray._2(i)
221222
map.put(kvArray._1(i), value)
222223
}
@@ -229,11 +230,13 @@ class ExecuteStatement(
229230
case _: DoubleType =>
230231
row.setField(i, r.getDouble(i))
231232
case t: RowType =>
232-
val clazz = Class.forName("org.apache.flink.table.types.DataType")
233-
val fieldDataTypes = clazz.getDeclaredMethod("getFieldDataTypes", classOf[DataType])
234-
.invoke(null, dataType).asInstanceOf[java.util.List[DataType]]
233+
val fieldDataTypes = DynMethods.builder("getFieldDataTypes")
234+
.impl(classOf[DataType], classOf[DataType])
235+
.buildStatic
236+
.invoke[util.List[DataType]](dataType)
237+
.asScala.toList
235238
val internalRowData = r.getRow(i, t.getFieldCount)
236-
val internalRow = convertToRow(internalRowData, fieldDataTypes.asScala.toList)
239+
val internalRow = convertToRow(internalRowData, fieldDataTypes)
237240
row.setField(i, internalRow)
238241
case t =>
239242
val hiveString = toHiveString((row.getField(i), t))

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@ import java.util.UUID
2323

2424
import scala.collection.JavaConverters._
2525

26-
import org.apache.flink.runtime.util.EnvironmentInformation
2726
import org.apache.flink.table.types.logical.LogicalTypeRoot
2827
import org.apache.hive.service.rpc.thrift._
2928
import org.scalatest.concurrent.PatienceConfiguration.Timeout
3029
import org.scalatest.time.SpanSugar._
3130

3231
import org.apache.kyuubi.config.KyuubiConf._
3332
import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
34-
import org.apache.kyuubi.engine.SemanticVersion
33+
import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
3534
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
3635
import org.apache.kyuubi.engine.flink.result.Constants
3736
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
@@ -46,8 +45,6 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
4645
override protected def jdbcUrl: String =
4746
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
4847

49-
val runtimeVersion = SemanticVersion(EnvironmentInformation.getVersion)
50-
5148
ignore("release session if shared level is CONNECTION") {
5249
logger.info(s"jdbc url is $jdbcUrl")
5350
assert(engine.getServiceState == STARTED)
@@ -579,10 +576,13 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
579576
val metaData = resultSet.getMetaData
580577
assert(metaData.getColumnType(1) === java.sql.Types.ARRAY)
581578
assert(resultSet.next())
582-
runtimeVersion.minorVersion match {
583-
case 14 =>
584-
assert(resultSet.getObject(1).toString == """["v1","v2","v3"]""")
585-
case _ => assert(resultSet.getObject(1).toString == "[v1,v2,v3]")
579+
if (isFlinkVersionEqualTo("1.14")) {
580+
val expected = """["v1","v2","v3"]"""
581+
assert(resultSet.getObject(1).toString == expected)
582+
}
583+
if (isFlinkVersionAtLeast("1.15")) {
584+
val expected = "[v1,v2,v3]"
585+
assert(resultSet.getObject(1).toString == expected)
586586
}
587587
}
588588
}
@@ -603,12 +603,13 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
603603
val resultSet =
604604
statement.executeQuery("select (1, '2', true)")
605605
assert(resultSet.next())
606-
runtimeVersion.minorVersion match {
607-
case 14 => assert(resultSet.getString(
608-
1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:\"2\",BOOLEAN NOT NULL:true}")
609-
case _ =>
610-
assert(
611-
resultSet.getString(1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:2,BOOLEAN NOT NULL:true}")
606+
if (isFlinkVersionEqualTo("1.14")) {
607+
val expected = """{INT NOT NULL:1,CHAR(1) NOT NULL:"2",BOOLEAN NOT NULL:true}"""
608+
assert(resultSet.getString(1) == expected)
609+
}
610+
if (isFlinkVersionAtLeast("1.15")) {
611+
val expected = """{INT NOT NULL:1,CHAR(1) NOT NULL:2,BOOLEAN NOT NULL:true}"""
612+
assert(resultSet.getString(1) == expected)
612613
}
613614
val metaData = resultSet.getMetaData
614615
assert(metaData.getColumnType(1) === java.sql.Types.STRUCT)
@@ -619,11 +620,12 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
619620
withJdbcStatement() { statement =>
620621
val resultSet = statement.executeQuery("select encode('kyuubi', 'UTF-8')")
621622
assert(resultSet.next())
622-
runtimeVersion.minorVersion match {
623-
case 14 => assert(resultSet.getString(1) == "kyuubi")
624-
case _ =>
625-
// TODO: validate table results after FLINK-28882 is resolved
626-
assert(resultSet.getString(1) == "k")
623+
if (isFlinkVersionEqualTo("1.14")) {
624+
assert(resultSet.getString(1) == "kyuubi")
625+
}
626+
if (isFlinkVersionAtLeast("1.15")) {
627+
// TODO: validate table results after FLINK-28882 is resolved
628+
assert(resultSet.getString(1) == "k")
627629
}
628630
val metaData = resultSet.getMetaData
629631
assert(metaData.getColumnType(1) === java.sql.Types.BINARY)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.kyuubi.reflection;
21+
22+
import java.util.LinkedHashSet;
23+
import java.util.Set;
24+
25+
/** Copied from iceberg-common */
public class DynClasses {

  private DynClasses() {}

  /**
   * Entry point: obtains a new {@link Builder} for resolving a class from one or
   * more candidate fully qualified names.
   *
   * @return a fresh Builder
   */
  public static Builder builder() {
    return new Builder();
  }

  /** Fluent builder that resolves the first loadable class among candidate names. */
  public static class Builder {
    // Candidate names in insertion order, retained for error reporting.
    private final Set<String> classNames = new LinkedHashSet<>();
    private ClassLoader loader = Thread.currentThread().getContextClassLoader();
    private Class<?> foundClass = null;
    private boolean nullOk = false;

    private Builder() {}

    /**
     * Sets the {@link ClassLoader} used to look up classes by name.
     *
     * <p>Defaults to the current thread's context ClassLoader when not set.
     *
     * @param newLoader a ClassLoader
     * @return this Builder for method chaining
     */
    public Builder loader(ClassLoader newLoader) {
      loader = newLoader;
      return this;
    }

    /**
     * Tries to load an implementation by its fully qualified name.
     *
     * <p>Only the first successfully loaded candidate is retained; once a class
     * has been found, later calls only record the name for error reporting.
     *
     * @param className name of a class
     * @return this Builder for method chaining
     */
    public Builder impl(String className) {
      classNames.add(className);
      if (foundClass == null) {
        try {
          foundClass = Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
          // candidate not on the classpath; keep trying the remaining names
        }
      }
      return this;
    }

    /**
     * Makes the terminal build methods return null instead of throwing when no
     * candidate class could be loaded.
     *
     * @return this Builder for method chaining
     */
    public Builder orNull() {
      nullOk = true;
      return this;
    }

    /**
     * Returns the first implementation found, or throws a checked exception.
     *
     * @param <S> Java superclass
     * @return a {@link Class} for the first implementation found
     * @throws ClassNotFoundException if no implementation was found and {@link #orNull()} was not set
     */
    @SuppressWarnings("unchecked")
    public <S> Class<? extends S> buildChecked() throws ClassNotFoundException {
      if (foundClass == null && !nullOk) {
        throw new ClassNotFoundException(
            "Cannot find class; alternatives: " + String.join(", ", classNames));
      }
      return (Class<? extends S>) foundClass;
    }

    /**
     * Returns the first implementation found, or throws an unchecked exception.
     *
     * @param <S> Java superclass
     * @return a {@link Class} for the first implementation found
     * @throws RuntimeException if no implementation was found and {@link #orNull()} was not set
     */
    @SuppressWarnings("unchecked")
    public <S> Class<? extends S> build() {
      if (foundClass == null && !nullOk) {
        throw new RuntimeException(
            "Cannot find class; alternatives: " + String.join(", ", classNames));
      }
      return (Class<? extends S>) foundClass;
    }
  }
}

0 commit comments

Comments
 (0)