From 98c8bac9c8ee9e960104afe408602a1d69d06e95 Mon Sep 17 00:00:00 2001 From: Tobias Kaymak Date: Thu, 23 Apr 2026 09:00:12 +0000 Subject: [PATCH 1/4] [runners-flink] Use index-based version comparison in flink_runner.gradle `previous_versions` was computed by lexicographic string comparison against `flink_major`: def previous_versions = all_versions.findAll { it < flink_major } This breaks for `flink_versions=1.17,1.18,1.19,1.20,2.0` (the current gradle.properties value) because lexicographic ordering disagrees with semantic ordering whenever a two-digit minor crosses a single-digit boundary. Concretely, today this resolves to: flink_major=1.17 -> [] (correct) flink_major=1.18 -> ["1.17", "1.20"] (wrong; should be ["1.17"]) flink_major=1.19 -> ["1.17", "1.18", "1.20"] (wrong; should be ["1.17", "1.18"]) flink_major=1.20 -> [] (wrong; should be ["1.17", "1.18", "1.19"]) flink_major=2.0 -> ["1.17", "1.18", "1.19", "1.20"](correct) The 1.18 and 1.19 builds therefore pick up `runners/flink/1.20/src/main/java/.../DoFnOperator.java` as a "previous" override, and the 1.20 build loses the 1.19 test overrides (MemoryStateBackendWrapper, StreamSources). Replace the lex compare with index lookup against `flink_versions`, exactly like spark_runner.gradle does (PR #38233). Also: - Trim whitespace from each entry (defensive against `flink_versions=1.17, 1.18, ...`). - Throw a clear GradleException if `flink_major` isn't listed in `flink_versions`, instead of silently producing an empty `previous_versions` list. - Update `use_override` to derive from the same index, mirroring the Spark version. --- runners/flink/flink_runner.gradle | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 2047e4d556c5..a7fa99d8dc96 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -32,9 +32,16 @@ def overrides(versions, type, base_path, group='java') { ["${base_path}/src/${type}/${group}"] + versions.collect { "${base_path}/${it}/src/${type}/${group}" } + ["./src/${type}/${group}"] } -def all_versions = flink_versions.split(",") - -def previous_versions = all_versions.findAll { it < flink_major } +def all_versions = flink_versions.split(",").collect { it.trim() } +// Determine version order by list position rather than string comparison so two-digit +// minors (e.g. "1.20") sort correctly relative to "1.18" / "1.19". +def flink_major_index = all_versions.indexOf(flink_major) +if (flink_major_index < 0) { + throw new GradleException( + "flink_major='${flink_major}' is not listed in flink_versions='${flink_versions}' " + + "(see root gradle.properties).") +} +def previous_versions = flink_major_index > 0 ? all_versions.subList(0, flink_major_index) : [] // Version specific code overrides. def main_source_overrides = overrides(previous_versions, "main", base_path) @@ -106,7 +113,7 @@ def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Co it.duplicatesStrategy DuplicatesStrategy.INCLUDE } -def use_override = (flink_major != all_versions.first()) +def use_override = (flink_major_index > 0) def sourceBase = "${project.projectDir}/../src" if (use_override) { From 89cf8ee6a30e2897d857426691fa9250f16eb7c5 Mon Sep 17 00:00:00 2001 From: Tobias Kaymak Date: Thu, 23 Apr 2026 12:12:27 +0000 Subject: [PATCH 2/4] Trigger Build From 9834e3d6bf5a65d7578c76f1e52b3c6a27f413d1 Mon Sep 17 00:00:00 2001 From: Tobias Kaymak Date: Thu, 23 Apr 2026 14:15:30 +0000 Subject: [PATCH 3/4] [runners-flink] Address Gemini review on flink_runner.gradle - Trim/stringify flink_major before indexOf so command-line / GString values match the trimmed all_versions entries. - Drop redundant ternary on previous_versions; subList(0, 0) is already empty when flink_major is the first listed version. --- runners/flink/flink_runner.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index a7fa99d8dc96..98e7d547b4d3 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -35,13 +35,13 @@ def overrides(versions, type, base_path, group='java') { def all_versions = flink_versions.split(",").collect { it.trim() } // Determine version order by list position rather than string comparison so two-digit // minors (e.g. "1.20") sort correctly relative to "1.18" / "1.19". -def flink_major_index = all_versions.indexOf(flink_major) +def flink_major_index = all_versions.indexOf(flink_major?.toString()?.trim()) if (flink_major_index < 0) { throw new GradleException( "flink_major='${flink_major}' is not listed in flink_versions='${flink_versions}' " + "(see root gradle.properties).") } -def previous_versions = flink_major_index > 0 ? all_versions.subList(0, flink_major_index) : [] +def previous_versions = all_versions.subList(0, flink_major_index) // Version specific code overrides. def main_source_overrides = overrides(previous_versions, "main", base_path) From 054c4e3b394098d02f56c6edb5c5523ae10bd74d Mon Sep 17 00:00:00 2001 From: Tobias Kaymak Date: Thu, 23 Apr 2026 19:27:10 +0000 Subject: [PATCH 4/4] Trigger Build