From a163c98913f8255dfc869e7368bf87bf7b2124db Mon Sep 17 00:00:00 2001 From: Rodrigo Meneses Date: Tue, 5 Dec 2023 08:54:05 -0800 Subject: [PATCH] Make Flink 1.18 to work --- .github/workflows/flink-ci.yml | 2 +- dev/stage-binaries.sh | 2 +- flink/build.gradle | 8 ++--- flink/v1.18/build.gradle | 36 +++++++++---------- .../org/apache/iceberg/flink/TestHelpers.java | 25 ++++++++++++- .../iceberg/flink/util/TestFlinkPackage.java | 2 +- gradle.properties | 4 +-- gradle/libs.versions.toml | 24 ++++++------- settings.gradle | 18 +++++----- 9 files changed, 72 insertions(+), 49 deletions(-) diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml index 30fe6c7723bc..702ae9bc898d 100644 --- a/.github/workflows/flink-ci.yml +++ b/.github/workflows/flink-ci.yml @@ -60,7 +60,7 @@ jobs: strategy: matrix: jvm: [8, 11] - flink: ['1.15', '1.16', '1.17'] + flink: ['1.16', '1.17', '1.18'] env: SPARK_LOCAL_IP: localhost steps: diff --git a/dev/stage-binaries.sh b/dev/stage-binaries.sh index 8b7b3d30aa26..5509a2ea3223 100755 --- a/dev/stage-binaries.sh +++ b/dev/stage-binaries.sh @@ -19,7 +19,7 @@ # SCALA_VERSION=2.12 -FLINK_VERSIONS=1.15,1.16,1.17 +FLINK_VERSIONS=1.16,1.17,1.18 SPARK_VERSIONS=3.2,3.3,3.4,3.5 HIVE_VERSIONS=2,3 diff --git a/flink/build.gradle b/flink/build.gradle index 33c2be904546..a33fc84e5727 100644 --- a/flink/build.gradle +++ b/flink/build.gradle @@ -19,10 +19,6 @@ def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",") -if (flinkVersions.contains("1.15")) { - apply from: file("$projectDir/v1.15/build.gradle") -} - if (flinkVersions.contains("1.16")) { apply from: file("$projectDir/v1.16/build.gradle") } @@ -30,3 +26,7 @@ if (flinkVersions.contains("1.16")) { if (flinkVersions.contains("1.17")) { apply from: file("$projectDir/v1.17/build.gradle") } + +if (flinkVersions.contains("1.18")) { + apply from: file("$projectDir/v1.18/build.gradle") +} diff --git a/flink/v1.18/build.gradle b/flink/v1.18/build.gradle index 2e081b760878..55578d3b117d 100644 --- a/flink/v1.18/build.gradle +++ b/flink/v1.18/build.gradle @@ -17,7 +17,7 @@ * under the License. */ -String flinkMajorVersion = '1.17' +String flinkMajorVersion = '1.18' String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { @@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') - compileOnly libs.flink117.avro + compileOnly libs.flink118.avro // for dropwizard histogram metrics implementation - compileOnly libs.flink117.metrics.dropwizard - compileOnly libs.flink117.streaming.java - compileOnly "${libs.flink117.streaming.java.get().module}:${libs.flink117.streaming.java.get().getVersion()}:tests" - compileOnly libs.flink117.table.api.java.bridge - compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink117.get()}" - compileOnly libs.flink117.connector.base - compileOnly libs.flink117.connector.files + compileOnly libs.flink118.metrics.dropwizard + compileOnly libs.flink118.streaming.java + compileOnly "${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests" + compileOnly libs.flink118.table.api.java.bridge + compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}" + compileOnly libs.flink118.connector.base + compileOnly libs.flink118.connector.files compileOnly libs.hadoop2.hdfs compileOnly libs.hadoop2.common @@ -65,13 +65,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { exclude group: 'org.apache.hive', module: 'hive-storage-api' } - testImplementation libs.flink117.connector.test.utils - testImplementation libs.flink117.core - testImplementation libs.flink117.runtime - testImplementation(libs.flink117.test.utilsjunit) { + testImplementation libs.flink118.connector.test.utils + testImplementation libs.flink118.core + testImplementation libs.flink118.runtime + testImplementation(libs.flink118.test.utilsjunit) { exclude group: 'junit' } - testImplementation(libs.flink117.test.utils) { + testImplementation(libs.flink118.test.utils) { exclude group: "org.apache.curator", module: 'curator-test' exclude group: 'junit' } @@ -164,7 +164,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") { } // for dropwizard histogram metrics implementation - implementation libs.flink117.metrics.dropwizard + implementation libs.flink118.metrics.dropwizard // for integration testing with the flink-runtime-jar // all of those dependencies are required because the integration test extends FlinkTestBase @@ -174,13 +174,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") { integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts") integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') - integrationImplementation(libs.flink117.test.utils) { + integrationImplementation(libs.flink118.test.utils) { exclude group: "org.apache.curator", module: 'curator-test' exclude group: 'junit' } - integrationImplementation libs.flink117.table.api.java.bridge - integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink117.get()}" + integrationImplementation libs.flink118.table.api.java.bridge + integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}" integrationImplementation libs.hadoop2.common integrationImplementation libs.hadoop2.hdfs diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 3e574b841c8d..7d3777935ca8 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -416,7 +416,7 @@ private static void assertAvroEquals( Collection expectedArrayData = (Collection) expected; ArrayData actualArrayData; try { - actualArrayData = (ArrayData) actual; + actualArrayData = convertToArray(actual); } catch (ClassCastException e) { actualArrayData = new GenericArrayData((Object[]) actual); } @@ -462,6 +462,29 @@ private static void assertAvroEquals( } } + private static GenericArrayData convertToArray(Object actual) { + Class arrayClass = actual.getClass(); + if (Object[].class.equals(arrayClass)) { + return new GenericArrayData((Object[]) actual); + } else if (int[].class.equals(arrayClass)) { + return new GenericArrayData((int[]) actual); + } else if (long[].class.equals(arrayClass)) { + return new GenericArrayData((long[]) actual); + } else if (float[].class.equals(arrayClass)) { + return new GenericArrayData((float[]) actual); + } else if (double[].class.equals(arrayClass)) { + return new GenericArrayData((double[]) actual); + } else if (short[].class.equals(arrayClass)) { + return new GenericArrayData((short[]) actual); + } else if (byte[].class.equals(arrayClass)) { + return new GenericArrayData((byte[]) actual); + } else if (boolean[].class.equals(arrayClass)) { + return new GenericArrayData((boolean[]) actual); + } else { + throw new IllegalArgumentException("Unsupported type " + arrayClass); + } + } + private static void assertArrayValues( Type type, LogicalType logicalType, Collection expectedArray, ArrayData actualArray) { List expectedElements = Lists.newArrayList(expectedArray); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index cf244f410288..a805d160d809 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -26,6 +26,6 @@ public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ @Test public void testVersion() { - Assert.assertEquals("1.17.1", FlinkPackage.version()); + Assert.assertEquals("1.18.0", FlinkPackage.version()); } } diff --git a/gradle.properties b/gradle.properties index 1bce26270354..84644da7c4ba 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,8 +16,8 @@ jmhOutputPath=build/reports/jmh/human-readable-output.txt jmhJsonOutputPath=build/reports/jmh/results.json jmhIncludeRegex=.* -systemProp.defaultFlinkVersions=1.17 -systemProp.knownFlinkVersions=1.15,1.16,1.17 +systemProp.defaultFlinkVersions=1.18 +systemProp.knownFlinkVersions=1.16,1.17,1.18 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 systemProp.defaultSparkVersions=3.5 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f5c391fa95b5..29d5610f3c32 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -36,9 +36,9 @@ delta-core = "2.2.0" esotericsoftware-kryo = "4.0.2" errorprone-annotations = "2.23.0" findbugs-jsr305 = "3.0.2" -flink115 = { strictly = "[1.15, 1.16[", prefer = "1.15.0"} # see rich version usage explanation above flink116 = { strictly = "[1.16, 1.17[", prefer = "1.16.2"} flink117 = { strictly = "[1.17, 1.18[", prefer = "1.17.1"} +flink118 = { strictly = "[1.18, 1.19[", prefer = "1.18.0"} google-libraries-bom = "26.27.0" guava = "32.1.3-jre" hadoop2 = "2.7.3" @@ -104,12 +104,6 @@ calcite-druid = { module = "org.apache.calcite:calcite-druid", version.ref = "ca delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" } errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" } findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" } -flink115-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink115" } -flink115-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink115" } -flink115-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink115" } -flink115-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink115" } -flink115-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink115" } -flink115-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink115" } flink116-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink116" } flink116-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink116" } flink116-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink116" } @@ -122,6 +116,12 @@ flink117-connector-files = { module = "org.apache.flink:flink-connector-files", flink117-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink117" } flink117-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink117" } flink117-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink117" } +flink118-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink118" } +flink118-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink118" } +flink118-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink118" } +flink118-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink118" } +flink118-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink118" } +flink118-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink118" } google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" } guava-guava = { module = "com.google.guava:guava", version.ref = "guava" } hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" } @@ -171,11 +171,6 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-cor awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } delta-core = { module = "io.delta:delta-core_2.12", version.ref = "delta-core" } esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" } -flink115-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink115" } -flink115-core = { module = "org.apache.flink:flink-core", version.ref = "flink115" } -flink115-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink115" } -flink115-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink115" } -flink115-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink115" } flink116-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink116" } flink116-core = { module = "org.apache.flink:flink-core", version.ref = "flink116" } flink116-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink116" } @@ -186,6 +181,11 @@ flink117-core = { module = "org.apache.flink:flink-core", version.ref = "flink11 flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink117" } flink117-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink117" } flink117-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink117" } +flink118-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink118" } +flink118-core = { module = "org.apache.flink:flink-core", version.ref = "flink118" } +flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink118" } +flink118-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink118" } +flink118-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink118" } guava-testlib = { module = "com.google.guava:guava-testlib", version.ref = "guava" } jackson-annotations = { module = "com.fasterxml.jackson.core:jackson-annotations", version.ref = "jackson-annotations" } jackson-dataformat-xml = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-xml", version.ref = "jackson-dataformat-xml" } diff --git a/settings.gradle b/settings.gradle index 9ab130413077..a487fe1ffdac 100644 --- a/settings.gradle +++ b/settings.gradle @@ -106,15 +106,6 @@ if (!flinkVersions.isEmpty()) { project(':flink').name = 'iceberg-flink' } -if (flinkVersions.contains("1.15")) { - include ":iceberg-flink:flink-1.15" - include ":iceberg-flink:flink-runtime-1.15" - project(":iceberg-flink:flink-1.15").projectDir = file('flink/v1.15/flink') - project(":iceberg-flink:flink-1.15").name = "iceberg-flink-1.15" - project(":iceberg-flink:flink-runtime-1.15").projectDir = file('flink/v1.15/flink-runtime') - project(":iceberg-flink:flink-runtime-1.15").name = "iceberg-flink-runtime-1.15" -} - if (flinkVersions.contains("1.16")) { include ":iceberg-flink:flink-1.16" include ":iceberg-flink:flink-runtime-1.16" @@ -133,6 +124,15 @@ if (flinkVersions.contains("1.17")) { project(":iceberg-flink:flink-runtime-1.17").name = "iceberg-flink-runtime-1.17" } +if (flinkVersions.contains("1.18")) { + include ":iceberg-flink:flink-1.18" + include ":iceberg-flink:flink-runtime-1.18" + project(":iceberg-flink:flink-1.18").projectDir = file('flink/v1.18/flink') + project(":iceberg-flink:flink-1.18").name = "iceberg-flink-1.18" + project(":iceberg-flink:flink-runtime-1.18").projectDir = file('flink/v1.18/flink-runtime') + project(":iceberg-flink:flink-runtime-1.18").name = "iceberg-flink-runtime-1.18" +} + if (sparkVersions.contains("3.2")) { include ":iceberg-spark:spark-3.2_${scalaVersion}" include ":iceberg-spark:spark-extensions-3.2_${scalaVersion}"