From 8ae7986e1dca80a686b678feb4ea3bdfff4a19bb Mon Sep 17 00:00:00 2001
From: sunxia
Date: Wed, 5 Jun 2024 19:17:40 +0800
Subject: [PATCH] [FLINK-35522][runtime] Fix the issue that the source task may get stuck in speculative execution mode.

---
 .../coordinator/SourceCoordinatorContext.java | 12 ++---
 .../SpeculativeSchedulerITCase.java           | 53 ++++++++++++++++++-
 2 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index fdcb40bde0c79..8e1660c365872 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -606,14 +606,14 @@ private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attem
                 assignmentTracker.uncheckpointedAssignments().get(subtaskIndex);
 
         if (cachedSplits != null) {
-            if (supportsConcurrentExecutionAttempts) {
-                assignSplitsToAttempt(subtaskIndex, attemptNumber, new ArrayList<>(cachedSplits));
-                if (hasNoMoreSplits(subtaskIndex)) {
-                    signalNoMoreSplitsToAttempt(subtaskIndex, attemptNumber);
-                }
-            } else {
+            if (!supportsConcurrentExecutionAttempts) {
                 throw new IllegalStateException("No cached split is expected.");
             }
+            assignSplitsToAttempt(subtaskIndex, attemptNumber, new ArrayList<>(cachedSplits));
+        }
+
+        if (supportsConcurrentExecutionAttempts && hasNoMoreSplits(subtaskIndex)) {
+            signalNoMoreSplitsToAttempt(subtaskIndex, attemptNumber);
+        }
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index fd493575bfc9e..ae2d5207106e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -34,7 +34,10 @@
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -71,6 +74,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -174,6 +178,13 @@ void testSpeculativeExecutionOfNewSource() throws Exception {
         checkResults();
     }
 
+    @Test
+    void testSpeculativeExecutionOfNewSourceWithFailure() throws Exception {
+        executeJob(env -> setupJobWithSlowNewSource(env, true));
+        waitUntilJobArchived();
+        checkResults();
+    }
+
     @Test
     public void testSpeculativeSlowSink() throws Exception {
         executeJob(this::setupSpeculativeSlowSink);
@@ -333,9 +344,14 @@ private void setupJobWithSlowInputFormatSource(StreamExecutionEnvironment env) {
     }
 
     private void setupJobWithSlowNewSource(StreamExecutionEnvironment env) {
+        setupJobWithSlowNewSource(env, false);
+    }
+
+    private void setupJobWithSlowNewSource(
+            StreamExecutionEnvironment env, boolean forceFailureFlag) {
         final DataStream<Long> source =
                 env.fromSource(
-                        new TestingNumberSequenceSource(),
+                        new TestingNumberSequenceSource(forceFailureFlag),
                         WatermarkStrategy.noWatermarks(),
                         "source");
         addSink(source);
     }
@@ -480,8 +496,18 @@ public void open(GenericInputSplit split) throws IOException {
     }
 
     private static class TestingNumberSequenceSource extends NumberSequenceSource {
-        private TestingNumberSequenceSource() {
+
+        private final boolean forceFailureFlag;
+        // When forceFailureCounter > 0, the source task will throw an exception on reader close
+        // until forceFailureCounter reaches 0.
+        public static AtomicInteger forceFailureCounter = new AtomicInteger(0);
+
+        private TestingNumberSequenceSource(boolean forceFailureFlag) {
             super(0, NUMBERS_TO_PRODUCE - 1);
+            this.forceFailureFlag = forceFailureFlag;
+            if (forceFailureFlag) {
+                forceFailureCounter = new AtomicInteger(1);
+            }
         }
 
         @Override
@@ -489,6 +515,21 @@ public SourceReader<Long, NumberSequenceSplit> createReader(
                 SourceReaderContext readerContext) {
             return new TestingIteratorSourceReader(readerContext);
         }
+
+        @Override
+        public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
+                createEnumerator(final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+
+            int splitSize = enumContext.currentParallelism();
+            // Simulate the case where there are fewer splits than the parallelism, to verify
+            // unassigned source tasks and failover.
+            if (forceFailureFlag) {
+                splitSize = 1;
+            }
+            final List<NumberSequenceSplit> splits =
+                    splitNumberRange(0, NUMBERS_TO_PRODUCE - 1, splitSize);
+            return new IteratorSourceEnumerator<>(enumContext, splits);
+        }
     }
 
     private static class TestingIteratorSourceReader<
@@ -504,6 +545,14 @@ public InputStatus pollNext(ReaderOutput<E> output) {
             maybeSleep();
             return super.pollNext(output);
         }
+
+        @Override
+        public void close() throws Exception {
+            if (TestingNumberSequenceSource.forceFailureCounter.get() > 0) {
+                TestingNumberSequenceSource.forceFailureCounter.decrementAndGet();
+                throw new RuntimeException("Forced failure for testing");
+            }
+        }
     }
 
     private static class NumberCounterSink extends RichSinkFunction<Long> {
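
For illustration only (not part of the patch): the sketch below isolates the control-flow change made in sendCachedSplitsToNewlyRegisteredReader. All identifiers in it (DemoCoordinator, cachedSplits, noMoreSplits, and the println calls standing in for assignSplitsToAttempt and signalNoMoreSplitsToAttempt) are hypothetical stand-ins, not Flink APIs.

// Minimal, self-contained sketch of the before/after control flow in the patched method.
// Before the fix, the "no more splits" signal was nested inside the cached-splits branch,
// so a newly registered attempt with no cached splits never received it and could get stuck.
import java.util.ArrayList;
import java.util.List;

public class DemoCoordinator {
    private final boolean supportsConcurrentExecutionAttempts = true;
    private final List<String> cachedSplits; // null when nothing is cached for the subtask
    private final boolean noMoreSplits = true;

    DemoCoordinator(List<String> cachedSplits) {
        this.cachedSplits = cachedSplits;
    }

    // Pre-fix shape: the signal is only reachable when cachedSplits is non-null.
    void registerAttemptBeforeFix() {
        if (cachedSplits != null) {
            if (supportsConcurrentExecutionAttempts) {
                System.out.println("assign " + new ArrayList<>(cachedSplits));
                if (noMoreSplits) {
                    System.out.println("signal no-more-splits");
                }
            } else {
                throw new IllegalStateException("No cached split is expected.");
            }
        }
    }

    // Post-fix shape: the signal is sent whenever the subtask has no more splits,
    // regardless of whether any cached splits exist for the new attempt.
    void registerAttemptAfterFix() {
        if (cachedSplits != null) {
            if (!supportsConcurrentExecutionAttempts) {
                throw new IllegalStateException("No cached split is expected.");
            }
            System.out.println("assign " + new ArrayList<>(cachedSplits));
        }
        if (supportsConcurrentExecutionAttempts && noMoreSplits) {
            System.out.println("signal no-more-splits");
        }
    }

    public static void main(String[] args) {
        DemoCoordinator noCache = new DemoCoordinator(null);
        noCache.registerAttemptBeforeFix(); // prints nothing: the attempt would wait forever
        noCache.registerAttemptAfterFix();  // prints "signal no-more-splits"
    }
}

Running main with a null cachedSplits prints nothing on the pre-fix path and prints the signal on the post-fix path; that is the situation the new testSpeculativeExecutionOfNewSourceWithFailure case provokes by creating fewer splits than the parallelism.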