Skip to content

Commit c84ea87

Browse files
wolfboyspan3793
authored and committed
[KYUUBI #2730] [WIP][KYUUBI #2238] Support Flink 1.15
### Why are the changes needed? Flink 1.15.0 is out, and Kyuubi needs to support it. For instance: 1. adds Flink 1.15 support; 2. adds `flink-1.14` and `flink-1.15` Maven profiles to support both Flink 1.14 and 1.15. For Flink 1.14, all functions and code are kept unchanged, with no impact. Flink 1.15 support is still a work in progress; at present, 32 test cases have passed and 5 have failed. You can review it and put forward suggestions for improvement, and I will continue to improve it. ### How to build with Flink 1.14 ``` mvn clean install -DskipTests -Pflink-1.14 ``` ### How to build with Flink 1.15 ``` mvn clean install -DskipTests -Pflink-1.15 ``` ### How to run test cases with Flink 1.14 Enable the `flink-1.14` Maven profile and run the test cases. ### How to run test cases with Flink 1.15 Enable the `flink-1.15` Maven profile and run the test cases. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2730 from wolfboys/master. Closes #2730 Closes #2238 ba9716f [benjobs] Improve check flink version 0616e74 [benjobs] [KYUUBI #2238] Support Flink 1.15 Authored-by: benjobs <benjobs@qq.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 81c48b0 commit c84ea87

File tree

6 files changed

+151
-31
lines changed

6 files changed

+151
-31
lines changed

externals/kyuubi-flink-sql-engine/pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,19 @@
6161

6262
<dependency>
6363
<groupId>org.apache.flink</groupId>
64-
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
64+
<artifactId>flink-streaming-java${flink.module.scala.suffix}</artifactId>
6565
<scope>provided</scope>
6666
</dependency>
6767

6868
<dependency>
6969
<groupId>org.apache.flink</groupId>
70-
<artifactId>flink-clients_${scala.binary.version}</artifactId>
70+
<artifactId>flink-clients${flink.module.scala.suffix}</artifactId>
7171
<scope>provided</scope>
7272
</dependency>
7373

7474
<dependency>
7575
<groupId>org.apache.flink</groupId>
76-
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
76+
<artifactId>flink-sql-client${flink.module.scala.suffix}</artifactId>
7777
<scope>provided</scope>
7878
</dependency>
7979

@@ -91,7 +91,7 @@
9191

9292
<dependency>
9393
<groupId>org.apache.flink</groupId>
94-
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
94+
<artifactId>flink-table-api-java-bridge${flink.module.scala.suffix}</artifactId>
9595
<scope>provided</scope>
9696
</dependency>
9797

@@ -103,7 +103,7 @@
103103

104104
<dependency>
105105
<groupId>org.apache.flink</groupId>
106-
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
106+
<artifactId>flink-table-runtime${flink.module.scala.suffix}</artifactId>
107107
<scope>provided</scope>
108108
</dependency>
109109

@@ -137,7 +137,7 @@
137137

138138
<dependency>
139139
<groupId>org.apache.flink</groupId>
140-
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
140+
<artifactId>flink-test-utils${flink.module.scala.suffix}</artifactId>
141141
<scope>test</scope>
142142
</dependency>
143143
</dependencies>

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.flink.table.client.cli.CliOptionsParser._
3131
import org.apache.flink.table.client.gateway.context.SessionContext
3232
import org.apache.flink.table.client.gateway.local.LocalExecutor
3333

34-
import org.apache.kyuubi.Logging
34+
import org.apache.kyuubi.{Logging, Utils}
3535

3636
object FlinkEngineUtils extends Logging {
3737

@@ -40,8 +40,13 @@ object FlinkEngineUtils extends Logging {
4040

4141
def checkFlinkVersion(): Unit = {
4242
val flinkVersion = EnvironmentInformation.getVersion
43-
if (!flinkVersion.startsWith("1.14")) {
44-
throw new RuntimeException("Only Flink-1.14.x is supported now!")
43+
Utils.majorMinorVersion(flinkVersion) match {
44+
case (1, 14 | 15) =>
45+
logger.info(s"The current Flink version is $flinkVersion")
46+
case _ =>
47+
throw new UnsupportedOperationException(
48+
s"The current Flink version is $flinkVersion, " +
49+
s"Only Flink 1.14.x and 1.15 are supported, not supported in other versions")
4550
}
4651
}
4752

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.kyuubi.engine.flink.operation
1919

20+
import java.time.LocalDate
21+
import java.util
2022
import java.util.concurrent.RejectedExecutionException
2123

2224
import scala.collection.JavaConverters._
@@ -26,15 +28,21 @@ import com.google.common.annotations.VisibleForTesting
2628
import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
2729
import org.apache.flink.table.api.ResultKind
2830
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
31+
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
32+
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
2933
import org.apache.flink.table.operations.{Operation, QueryOperation}
3034
import org.apache.flink.table.operations.command._
35+
import org.apache.flink.table.types.DataType
36+
import org.apache.flink.table.types.logical._
3137
import org.apache.flink.types.Row
3238

3339
import org.apache.kyuubi.{KyuubiSQLException, Logging}
3440
import org.apache.kyuubi.engine.flink.result.ResultSet
41+
import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
3542
import org.apache.kyuubi.operation.{OperationState, OperationType}
3643
import org.apache.kyuubi.operation.log.OperationLog
3744
import org.apache.kyuubi.session.Session
45+
import org.apache.kyuubi.util.RowSetUtils
3846

3947
class ExecuteStatement(
4048
session: Session,
@@ -127,6 +135,7 @@ class ExecuteStatement(
127135
var resultId: String = null
128136
try {
129137
val resultDescriptor = executor.executeQuery(sessionId, operation)
138+
val dataTypes = resultDescriptor.getResultSchema.getColumnDataTypes.asScala.toList
130139

131140
resultId = resultDescriptor.getResultId
132141

@@ -142,7 +151,19 @@ class ExecuteStatement(
142151
case TypedResult.ResultType.PAYLOAD =>
143152
(1 to result.getPayload).foreach { page =>
144153
if (rows.size < resultMaxRows) {
145-
rows ++= executor.retrieveResultPage(resultId, page).asScala
154+
// FLINK-24461 retrieveResultPage method changes the return type from Row to RowData
155+
val result = executor.retrieveResultPage(resultId, page).asScala.toList
156+
result.headOption match {
157+
case None =>
158+
case Some(r) =>
159+
// for flink 1.14
160+
if (r.getClass == classOf[Row]) {
161+
rows ++= result.asInstanceOf[List[Row]]
162+
} else {
163+
// for flink 1.15+
164+
rows ++= result.map(r => convertToRow(r.asInstanceOf[RowData], dataTypes))
165+
}
166+
}
146167
} else {
147168
loop = false
148169
}
@@ -178,4 +199,88 @@ class ExecuteStatement(
178199
warn(s"Failed to clean result set $resultId in session $sessionId", t)
179200
}
180201
}
202+
203+
private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
204+
val row = Row.withPositions(r.getRowKind, r.getArity)
205+
for (i <- 0 until r.getArity) {
206+
val dataType = dataTypes(i)
207+
dataType.getLogicalType match {
208+
case arrayType: ArrayType =>
209+
val arrayData = r.getArray(i)
210+
if (arrayData == null) {
211+
row.setField(i, null)
212+
}
213+
arrayData match {
214+
case d: GenericArrayData =>
215+
row.setField(i, d.toObjectArray)
216+
case d: BinaryArrayData =>
217+
row.setField(i, d.toObjectArray(arrayType.getElementType))
218+
case _ =>
219+
}
220+
case _: BinaryType =>
221+
row.setField(i, r.getBinary(i))
222+
case _: BigIntType =>
223+
row.setField(i, r.getLong(i))
224+
case _: BooleanType =>
225+
row.setField(i, r.getBoolean(i))
226+
case _: VarCharType | _: CharType =>
227+
row.setField(i, r.getString(i))
228+
case t: DecimalType =>
229+
row.setField(i, r.getDecimal(i, t.getPrecision, t.getScale).toBigDecimal)
230+
case _: DateType =>
231+
val date = RowSetUtils.formatLocalDate(LocalDate.ofEpochDay(r.getInt(i)))
232+
row.setField(i, date)
233+
case t: TimestampType =>
234+
val ts = RowSetUtils
235+
.formatLocalDateTime(r.getTimestamp(i, t.getPrecision)
236+
.toLocalDateTime)
237+
row.setField(i, ts)
238+
case _: TinyIntType =>
239+
row.setField(i, r.getByte(i))
240+
case _: SmallIntType =>
241+
row.setField(i, r.getShort(i))
242+
case _: IntType =>
243+
row.setField(i, r.getInt(i))
244+
case _: FloatType =>
245+
row.setField(i, r.getFloat(i))
246+
case mapType: MapType =>
247+
val mapData = r.getMap(i)
248+
if (mapData != null && mapData.size > 0) {
249+
val keyType = mapType.getKeyType
250+
val valueType = mapType.getValueType
251+
mapData match {
252+
case d: BinaryMapData =>
253+
val kvArray = toArray(keyType, valueType, d)
254+
val map: util.Map[Any, Any] = new util.HashMap[Any, Any]
255+
for (i <- 0 until kvArray._1.length) {
256+
val value: Any = kvArray._2(i)
257+
map.put(kvArray._1(i), value)
258+
}
259+
row.setField(i, map)
260+
case d: GenericMapData => // TODO
261+
}
262+
} else {
263+
row.setField(i, null)
264+
}
265+
case _: DoubleType =>
266+
row.setField(i, r.getDouble(i))
267+
case t: RowType =>
268+
val v = r.getRow(i, t.getFieldCount)
269+
row.setField(i, v)
270+
case t =>
271+
val hiveString = toHiveString((row.getField(i), t))
272+
row.setField(i, hiveString)
273+
}
274+
}
275+
row
276+
}
277+
278+
private[this] def toArray(
279+
keyType: LogicalType,
280+
valueType: LogicalType,
281+
arrayData: BinaryMapData): (Array[_], Array[_]) = {
282+
283+
arrayData.keyArray().toObjectArray(keyType) -> arrayData.valueArray().toObjectArray(valueType)
284+
}
285+
181286
}

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import java.nio.file.Files
2121
import java.sql.DatabaseMetaData
2222
import java.util.UUID
2323

24-
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
25-
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
2624
import org.apache.flink.table.types.logical.LogicalTypeRoot
2725
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
2826
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -349,16 +347,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
349347
withJdbcStatement() { statement =>
350348
val metaData = statement.getConnection.getMetaData
351349
var resultSet = metaData.getSchemas(null, null)
350+
val defaultCatalog = "default_catalog"
351+
val defaultDatabase = "default_database"
352352
while (resultSet.next()) {
353-
assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
354-
assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
353+
assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase)
354+
assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog)
355355
}
356356
resultSet = metaData.getSchemas(
357-
DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
358-
DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
357+
defaultCatalog.split("_").apply(0),
358+
defaultDatabase.split("_").apply(0))
359359
while (resultSet.next()) {
360-
assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
361-
assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
360+
assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase)
361+
assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog)
362362
}
363363
}
364364
}

integration-tests/kyuubi-flink-it/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777

7878
<dependency>
7979
<groupId>org.apache.flink</groupId>
80-
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
80+
<artifactId>flink-table-runtime${flink.module.scala.suffix}</artifactId>
8181
<scope>test</scope>
8282
</dependency>
8383

pom.xml

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
<delta.version>1.2.1</delta.version>
112112
<fb303.version>0.9.3</fb303.version>
113113
<flink.version>1.14.4</flink.version>
114+
<flink.module.scala.suffix>_${scala.binary.version}</flink.module.scala.suffix>
114115
<flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
115116
<flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
116117
<flink.archive.download.skip>false</flink.archive.download.skip>
@@ -1295,13 +1296,13 @@
12951296

12961297
<dependency>
12971298
<groupId>org.apache.flink</groupId>
1298-
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
1299+
<artifactId>flink-streaming-java${flink.module.scala.suffix}</artifactId>
12991300
<version>${flink.version}</version>
13001301
</dependency>
13011302

13021303
<dependency>
13031304
<groupId>org.apache.flink</groupId>
1304-
<artifactId>flink-clients_${scala.binary.version}</artifactId>
1305+
<artifactId>flink-clients${flink.module.scala.suffix}</artifactId>
13051306
<version>${flink.version}</version>
13061307
</dependency>
13071308

@@ -1319,7 +1320,7 @@
13191320

13201321
<dependency>
13211322
<groupId>org.apache.flink</groupId>
1322-
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
1323+
<artifactId>flink-table-api-java-bridge${flink.module.scala.suffix}</artifactId>
13231324
<version>${flink.version}</version>
13241325
</dependency>
13251326

@@ -1331,7 +1332,7 @@
13311332

13321333
<dependency>
13331334
<groupId>org.apache.flink</groupId>
1334-
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
1335+
<artifactId>flink-table-runtime${flink.module.scala.suffix}</artifactId>
13351336
<version>${flink.version}</version>
13361337
<scope>provided</scope>
13371338
</dependency>
@@ -1344,19 +1345,13 @@
13441345

13451346
<dependency>
13461347
<groupId>org.apache.flink</groupId>
1347-
<artifactId>flink-yarn_${scala.binary.version}</artifactId>
1348+
<artifactId>flink-sql-client${flink.module.scala.suffix}</artifactId>
13481349
<version>${flink.version}</version>
13491350
</dependency>
13501351

13511352
<dependency>
13521353
<groupId>org.apache.flink</groupId>
1353-
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
1354-
<version>${flink.version}</version>
1355-
</dependency>
1356-
1357-
<dependency>
1358-
<groupId>org.apache.flink</groupId>
1359-
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
1354+
<artifactId>flink-test-utils${flink.module.scala.suffix}</artifactId>
13601355
<version>${flink.version}</version>
13611356
<exclusions>
13621357
<exclusion>
@@ -2027,6 +2022,21 @@
20272022
</properties>
20282023
</profile>
20292024

2025+
<profile>
2026+
<id>flink-1.14</id>
2027+
<properties>
2028+
<flink.version>1.14.4</flink.version>
2029+
<flink.module.scala.suffix>_${scala.binary.version}</flink.module.scala.suffix>
2030+
</properties>
2031+
</profile>
2032+
<profile>
2033+
<id>flink-1.15</id>
2034+
<properties>
2035+
<flink.version>1.15.0</flink.version>
2036+
<flink.module.scala.suffix></flink.module.scala.suffix>
2037+
</properties>
2038+
</profile>
2039+
20302040
<profile>
20312041
<id>spark-provided</id>
20322042
<properties>

0 commit comments

Comments
 (0)