Make Flink 1.18 work
rodmeneses committed Dec 5, 2023
1 parent 4579d64 commit 890a7ae
Showing 9 changed files with 72 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink-ci.yml
@@ -60,7 +60,7 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
-flink: ['1.15', '1.16', '1.17']
+flink: ['1.16', '1.17', '1.18']
env:
SPARK_LOCAL_IP: localhost
steps:
2 changes: 1 addition & 1 deletion dev/stage-binaries.sh
@@ -19,7 +19,7 @@
#

SCALA_VERSION=2.12
-FLINK_VERSIONS=1.15,1.16,1.17
+FLINK_VERSIONS=1.16,1.17,1.18
SPARK_VERSIONS=3.2,3.3,3.4,3.5
HIVE_VERSIONS=2,3

8 changes: 4 additions & 4 deletions flink/build.gradle
@@ -19,14 +19,14 @@

def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")

if (flinkVersions.contains("1.15")) {
apply from: file("$projectDir/v1.15/build.gradle")
}

if (flinkVersions.contains("1.16")) {
apply from: file("$projectDir/v1.16/build.gradle")
}

if (flinkVersions.contains("1.17")) {
apply from: file("$projectDir/v1.17/build.gradle")
}

if (flinkVersions.contains("1.18")) {
apply from: file("$projectDir/v1.18/build.gradle")
}
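
The version list that drives these conditional `apply from:` blocks is resolved from JVM system properties: an explicit `flinkVersions` property wins, otherwise `defaultFlinkVersions` from gradle.properties (shown further down) applies. A minimal Java sketch of that resolution pattern — the property names mirror the Gradle snippet above, and the "1.18" fallback is illustrative only:

import java.util.Arrays;
import java.util.List;

public class FlinkVersionSelection {
  public static void main(String[] args) {
    // Passing -DflinkVersions=1.17,1.18 on the command line overrides the default list.
    String requested = System.getProperty("flinkVersions");
    String fallback = System.getProperty("defaultFlinkVersions", "1.18");
    List<String> flinkVersions =
        Arrays.asList((requested != null ? requested : fallback).split(","));
    // Each selected version pulls in its own build script, e.g. flink/v1.18/build.gradle.
    flinkVersions.forEach(v -> System.out.println("apply flink/v" + v + "/build.gradle"));
  }
}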
36 changes: 18 additions & 18 deletions flink/v1.18/build.gradle
@@ -17,7 +17,7 @@
* under the License.
*/

-String flinkMajorVersion = '1.17'
+String flinkMajorVersion = '1.18'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

-compileOnly libs.flink117.avro
+compileOnly libs.flink118.avro
// for dropwizard histogram metrics implementation
-compileOnly libs.flink117.metrics.dropwizard
-compileOnly libs.flink117.streaming.java
-compileOnly "${libs.flink117.streaming.java.get().module}:${libs.flink117.streaming.java.get().getVersion()}:tests"
-compileOnly libs.flink117.table.api.java.bridge
-compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink117.get()}"
-compileOnly libs.flink117.connector.base
-compileOnly libs.flink117.connector.files
+compileOnly libs.flink118.metrics.dropwizard
+compileOnly libs.flink118.streaming.java
+compileOnly "${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests"
+compileOnly libs.flink118.table.api.java.bridge
+compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
+compileOnly libs.flink118.connector.base
+compileOnly libs.flink118.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
@@ -65,13 +65,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}

-testImplementation libs.flink117.connector.test.utils
-testImplementation libs.flink117.core
-testImplementation libs.flink117.runtime
-testImplementation(libs.flink117.test.utilsjunit) {
+testImplementation libs.flink118.connector.test.utils
+testImplementation libs.flink118.core
+testImplementation libs.flink118.runtime
+testImplementation(libs.flink118.test.utilsjunit) {
exclude group: 'junit'
}
-testImplementation(libs.flink117.test.utils) {
+testImplementation(libs.flink118.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}
@@ -164,7 +164,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
}

// for dropwizard histogram metrics implementation
-implementation libs.flink117.metrics.dropwizard
+implementation libs.flink118.metrics.dropwizard

// for integration testing with the flink-runtime-jar
// all of those dependencies are required because the integration test extends FlinkTestBase
@@ -174,13 +174,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-integrationImplementation(libs.flink117.test.utils) {
+integrationImplementation(libs.flink118.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}

-integrationImplementation libs.flink117.table.api.java.bridge
-integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink117.get()}"
+integrationImplementation libs.flink118.table.api.java.bridge
+integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
@@ -416,7 +416,7 @@ private static void assertAvroEquals(
Collection<?> expectedArrayData = (Collection<?>) expected;
ArrayData actualArrayData;
try {
-actualArrayData = (ArrayData) actual;
+actualArrayData = convertToArray(actual);
} catch (ClassCastException e) {
actualArrayData = new GenericArrayData((Object[]) actual);
}
@@ -462,6 +462,29 @@ private static void assertAvroEquals(
}
}

+private static GenericArrayData convertToArray(Object actual) {
+  Class<?> arrayClass = actual.getClass();
+  if (Object[].class.equals(arrayClass)) {
+    return new GenericArrayData((Object[]) actual);
+  } else if (int[].class.equals(arrayClass)) {
+    return new GenericArrayData((int[]) actual);
+  } else if (long[].class.equals(arrayClass)) {
+    return new GenericArrayData((long[]) actual);
+  } else if (float[].class.equals(arrayClass)) {
+    return new GenericArrayData((float[]) actual);
+  } else if (double[].class.equals(arrayClass)) {
+    return new GenericArrayData((double[]) actual);
+  } else if (short[].class.equals(arrayClass)) {
+    return new GenericArrayData((short[]) actual);
+  } else if (byte[].class.equals(arrayClass)) {
+    return new GenericArrayData((byte[]) actual);
+  } else if (boolean[].class.equals(arrayClass)) {
+    return new GenericArrayData((boolean[]) actual);
+  } else {
+    throw new IllegalArgumentException("Unsupported type " + arrayClass);
+  }
+}

private static void assertArrayValues(
Type type, LogicalType logicalType, Collection<?> expectedArray, ArrayData actualArray) {
List<?> expectedElements = Lists.newArrayList(expectedArray);
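The new `convertToArray` helper is needed because `GenericArrayData` stores primitive arrays unboxed and exposes a dedicated constructor per primitive array type; a blanket `(Object[])` cast throws a ClassCastException when the reader hands back, say, an `int[]`. A standalone Java sketch of the distinction (illustration only, not part of the commit):

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;

public class PrimitiveArrayExample {
  public static void main(String[] args) {
    // A primitive array must go through its dedicated constructor;
    // casting int[] to Object[] would throw ClassCastException.
    ArrayData fromPrimitives = new GenericArrayData(new int[] {1, 2, 3});

    // A boxed array goes through the Object[] constructor.
    ArrayData fromBoxed = new GenericArrayData(new Integer[] {1, 2, 3});

    // Both views read back the same elements.
    System.out.println(fromPrimitives.getInt(0) == fromBoxed.getInt(0)); // true
  }
}
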
@@ -26,6 +26,6 @@ public class TestFlinkPackage
/** This unit test would need to be adjusted as new Flink version is supported. */
@Test
public void testVersion() {
Assert.assertEquals("1.17.1", FlinkPackage.version());
Assert.assertEquals("1.18.0", FlinkPackage.version());
}
}
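
The expected string tracks the exact Flink patch release on the classpath, which is why this assertion changes with every version bump. The commit does not show the body of `FlinkPackage`, but a common way to implement such a probe is to read the Implementation-Version attribute from the jar manifest of a well-known Flink class — a hedged Java sketch under that assumption:

import org.apache.flink.streaming.api.datastream.DataStream;

public class FlinkVersionProbe {
  // Reads the Flink version from the flink-streaming-java jar's manifest metadata,
  // assuming that jar carries an Implementation-Version attribute.
  static String flinkVersion() {
    Package pkg = DataStream.class.getPackage();
    String version = pkg != null ? pkg.getImplementationVersion() : null;
    return version != null ? version : "unknown";
  }

  public static void main(String[] args) {
    System.out.println(flinkVersion()); // e.g. "1.18.0" with Flink 1.18.0 on the classpath
  }
}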
4 changes: 2 additions & 2 deletions gradle.properties
@@ -16,8 +16,8 @@
jmhOutputPath=build/reports/jmh/human-readable-output.txt
jmhJsonOutputPath=build/reports/jmh/results.json
jmhIncludeRegex=.*
-systemProp.defaultFlinkVersions=1.17
-systemProp.knownFlinkVersions=1.15,1.16,1.17
+systemProp.defaultFlinkVersions=1.18
+systemProp.knownFlinkVersions=1.16,1.17,1.18
systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
systemProp.defaultSparkVersions=3.5
24 changes: 12 additions & 12 deletions gradle/libs.versions.toml
@@ -36,9 +36,9 @@ delta-core = "2.2.0"
esotericsoftware-kryo = "4.0.2"
errorprone-annotations = "2.23.0"
findbugs-jsr305 = "3.0.2"
-flink115 = { strictly = "[1.15, 1.16[", prefer = "1.15.0"} # see rich version usage explanation above
flink116 = { strictly = "[1.16, 1.17[", prefer = "1.16.2"}
flink117 = { strictly = "[1.17, 1.18[", prefer = "1.17.1"}
+flink118 = { strictly = "[1.18, 1.19[", prefer = "1.18.0"}
google-libraries-bom = "26.27.0"
guava = "32.1.3-jre"
hadoop2 = "2.7.3"
@@ -104,12 +104,6 @@ calcite-druid = { module = "org.apache.calcite:calcite-druid", version.ref = "ca
delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" }
errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" }
findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" }
-flink115-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink115" }
-flink115-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink115" }
-flink115-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink115" }
-flink115-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink115" }
-flink115-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink115" }
-flink115-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink115" }
flink116-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink116" }
flink116-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink116" }
flink116-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink116" }
@@ -122,6 +116,12 @@ flink117-connector-files = { module = "org.apache.flink:flink-connector-files",
flink117-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink117" }
flink117-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink117" }
flink117-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink117" }
+flink118-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink118" }
+flink118-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink118" }
+flink118-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink118" }
+flink118-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink118" }
+flink118-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink118" }
+flink118-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink118" }
google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" }
guava-guava = { module = "com.google.guava:guava", version.ref = "guava" }
hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" }
@@ -171,11 +171,6 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-cor
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
delta-core = { module = "io.delta:delta-core_2.12", version.ref = "delta-core" }
esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" }
-flink115-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink115" }
-flink115-core = { module = "org.apache.flink:flink-core", version.ref = "flink115" }
-flink115-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink115" }
-flink115-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink115" }
-flink115-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink115" }
flink116-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink116" }
flink116-core = { module = "org.apache.flink:flink-core", version.ref = "flink116" }
flink116-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink116" }
@@ -186,6 +181,11 @@ flink117-core = { module = "org.apache.flink:flink-core", version.ref = "flink11
flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink117" }
flink117-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink117" }
flink117-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink117" }
+flink118-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink118" }
+flink118-core = { module = "org.apache.flink:flink-core", version.ref = "flink118" }
+flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink118" }
+flink118-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink118" }
+flink118-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink118" }
guava-testlib = { module = "com.google.guava:guava-testlib", version.ref = "guava" }
jackson-annotations = { module = "com.fasterxml.jackson.core:jackson-annotations", version.ref = "jackson-annotations" }
jackson-dataformat-xml = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-xml", version.ref = "jackson-dataformat-xml" }
18 changes: 9 additions & 9 deletions settings.gradle
@@ -106,15 +106,6 @@ if (!flinkVersions.isEmpty()) {
project(':flink').name = 'iceberg-flink'
}

if (flinkVersions.contains("1.15")) {
include ":iceberg-flink:flink-1.15"
include ":iceberg-flink:flink-runtime-1.15"
project(":iceberg-flink:flink-1.15").projectDir = file('flink/v1.15/flink')
project(":iceberg-flink:flink-1.15").name = "iceberg-flink-1.15"
project(":iceberg-flink:flink-runtime-1.15").projectDir = file('flink/v1.15/flink-runtime')
project(":iceberg-flink:flink-runtime-1.15").name = "iceberg-flink-runtime-1.15"
}

if (flinkVersions.contains("1.16")) {
include ":iceberg-flink:flink-1.16"
include ":iceberg-flink:flink-runtime-1.16"
@@ -133,6 +124,15 @@ if (flinkVersions.contains("1.17")) {
project(":iceberg-flink:flink-runtime-1.17").name = "iceberg-flink-runtime-1.17"
}

if (flinkVersions.contains("1.18")) {
include ":iceberg-flink:flink-1.18"
include ":iceberg-flink:flink-runtime-1.18"
project(":iceberg-flink:flink-1.18").projectDir = file('flink/v1.18/flink')
project(":iceberg-flink:flink-1.18").name = "iceberg-flink-1.18"
project(":iceberg-flink:flink-runtime-1.18").projectDir = file('flink/v1.18/flink-runtime')
project(":iceberg-flink:flink-runtime-1.18").name = "iceberg-flink-runtime-1.18"
}

if (sparkVersions.contains("3.2")) {
include ":iceberg-spark:spark-3.2_${scalaVersion}"
include ":iceberg-spark:spark-extensions-3.2_${scalaVersion}"
