From 35015eac825c435246ce51b7b3a35d758d871b94 Mon Sep 17 00:00:00 2001 From: Chase Date: Wed, 29 Apr 2026 23:31:26 +0900 Subject: [PATCH 1/2] [FLINK-30514][connector-base] fix hybrid source reader initialization order --- .../base/source/hybrid/HybridSourceReader.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 18c049f7c053c..8c7ae24a8e4da 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -225,15 +225,9 @@ private void setCurrentReader(int index) { } catch (Exception e) { throw new RuntimeException("Failed tp create reader", e); } - reader.start(); currentSourceIndex = index; currentReader = reader; - availabilityFuture.complete(null); - LOG.debug( - "Reader started: subtask={} sourceIndex={} {}", - readerContext.getIndexOfSubtask(), - currentSourceIndex, - reader); + // add restored splits if (!restoredSplits.isEmpty()) { List splits = new ArrayList<>(restoredSplits.size()); @@ -247,6 +241,14 @@ private void setCurrentReader(int index) { } addSplits(splits); } + + reader.start(); + availabilityFuture.complete(null); + LOG.debug( + "Reader started: subtask={} sourceIndex={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + reader); } @Override From 2463ee9c09575b70d802b4fbed386209fe886be4 Mon Sep 17 00:00:00 2001 From: Chase Date: Wed, 29 Apr 2026 23:57:19 +0900 Subject: [PATCH 2/2] [FLINK-30514][connector-base] update unit tests for HybridSourceReader switch methods order --- .../source/hybrid/HybridSourceReader.java | 1 - .../source/hybrid/HybridSourceReaderTest.java | 50 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 8c7ae24a8e4da..e7ffb83c3f2f1 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -227,7 +227,6 @@ private void setCurrentReader(int index) { } currentSourceIndex = index; currentReader = reader; - // add restored splits if (!restoredSplits.isEmpty()) { List splits = new ArrayList<>(restoredSplits.size()); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 7cf1a63fa9b60..71b33d88cc48a 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -36,6 +36,7 @@ import org.apache.flink.mock.Whitebox; import org.junit.jupiter.api.Test; +import org.mockito.InOrder; import org.mockito.Mockito; import java.util.Collections; @@ -281,6 +282,55 @@ void testReaderRecovery() throws Exception { reader.close(); } + @Test + void testReaderRecoveryInitializationOrder() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED); + + // First pass: create a snapshot with an in-progress split + HybridSourceReader reader = new HybridSourceReader<>(readerContext); + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + SwitchedSources switchedSources = new SwitchedSources(); + switchedSources.put(0, source); + HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources); + reader.addSplits(Collections.singletonList(hybridSplit)); + List snapshot = reader.snapshotState(0); + reader.close(); + + // Recovery: capture the underlying reader as a spy to verify call order + readerContext.clearSentEvents(); + SourceReader[] spyHolder = new SourceReader[1]; + Source spySource = + new MockSource(null, 0) { + @Override + public SourceReader createReader( + SourceReaderContext ctx) { + SourceReader spy = + Mockito.spy(source.createReader(ctx)); + spyHolder[0] = spy; + return spy; + } + }; + + HybridSourceReader recoveredReader = new HybridSourceReader<>(readerContext); + recoveredReader.addSplits(snapshot); + recoveredReader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + + recoveredReader.handleSourceEvents(new SwitchSourceEvent(0, spySource, false)); + + // Verify addSplits was called before start on the underlying reader + InOrder inOrder = Mockito.inOrder(spyHolder[0]); + inOrder.verify(spyHolder[0]).addSplits(Mockito.anyList()); + inOrder.verify(spyHolder[0]).start(); + + recoveredReader.close(); + } + @Test void testDefaultMethodDelegation() throws Exception { TestingReaderContext readerContext = new TestingReaderContext();