From d2fa5774174fbb1cacca44dc2867ad62f3b954fd Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 6 Apr 2026 17:18:47 -0400 Subject: [PATCH] Ensure LazyFlinkSourceSplitEnumerator handles split requests only after splits are initialized --- .../beam_PostCommit_Java_Jpms_Flink_Java11.json | 2 +- .../source/LazyFlinkSourceSplitEnumerator.java | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json b/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json index 9c289f9b3c75..bef52adc5ab5 100644 --- a/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json +++ b/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json @@ -1,3 +1,3 @@ { - "revision": 1 + "revision": 2 } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java index f5cd53c42ffa..94c14b2999b9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import javax.annotation.Nullable; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; @@ -53,7 +54,8 @@ public class LazyFlinkSourceSplitEnumerator private final PipelineOptions pipelineOptions; private final int numSplits; private final List> pendingSplits; - private boolean splitsInitialized; + private volatile boolean splitsInitialized; + private final CountDownLatch initializationLatch = new CountDownLatch(1); public 
LazyFlinkSourceSplitEnumerator( SplitEnumeratorContext> context, @@ -90,6 +92,8 @@ public void initializeSplits() { return pendingSplits; } catch (Exception e) { throw new RuntimeException(e); + } finally { + initializationLatch.countDown(); } }, (sourceSplits, error) -> { @@ -97,6 +101,7 @@ public void initializeSplits() { pendingSplits.addAll(sourceSplits); throw new RuntimeException("Failed to start source enumerator.", error); } + splitsInitialized = true; }); } @@ -113,6 +118,16 @@ public void handleSplitRequest(int subtask, @Nullable String hostname) { LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo); } + if (!splitsInitialized) { + try { + initializationLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for splits initialization", e); + return; + } + } + if (!pendingSplits.isEmpty()) { final FlinkSourceSplit split = pendingSplits.remove(pendingSplits.size() - 1); context.assignSplit(split, subtask);