Skip to content

Commit

Permalink
[FLINK-35522][runtime] Fix the issue that the source task may get stuck in speculative execution mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
SinBex authored and zhuzhurk committed Jun 7, 2024
1 parent 0c4f641 commit 8ae7986
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,14 @@ private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attem
assignmentTracker.uncheckpointedAssignments().get(subtaskIndex);

if (cachedSplits != null) {
if (supportsConcurrentExecutionAttempts) {
assignSplitsToAttempt(subtaskIndex, attemptNumber, new ArrayList<>(cachedSplits));
if (hasNoMoreSplits(subtaskIndex)) {
signalNoMoreSplitsToAttempt(subtaskIndex, attemptNumber);
}
} else {
if (!supportsConcurrentExecutionAttempts) {
throw new IllegalStateException("No cached split is expected.");
}
assignSplitsToAttempt(subtaskIndex, attemptNumber, new ArrayList<>(cachedSplits));
}

if (supportsConcurrentExecutionAttempts && hasNoMoreSplits(subtaskIndex)) {
signalNoMoreSplitsToAttempt(subtaskIndex, attemptNumber);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.api.java.tuple.Tuple3;
Expand Down Expand Up @@ -71,6 +74,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -174,6 +178,13 @@ void testSpeculativeExecutionOfNewSource() throws Exception {
checkResults();
}

/**
 * Runs the job configured by {@code setupJobWithSlowNewSource(env, true)}, i.e. the new-source
 * job with the forced-failure flag enabled, then calls {@code waitUntilJobArchived()} and
 * {@code checkResults()}.
 */
@Test
void testSpeculativeExecutionOfNewSourceWithFailure() throws Exception {
// Passing true selects the forced-failure variant of the source setup.
executeJob(env -> setupJobWithSlowNewSource(env, true));
waitUntilJobArchived();
checkResults();
}

@Test
public void testSpeculativeSlowSink() throws Exception {
executeJob(this::setupSpeculativeSlowSink);
Expand Down Expand Up @@ -333,9 +344,14 @@ private void setupJobWithSlowInputFormatSource(StreamExecutionEnvironment env) {
}

/**
 * Convenience overload: sets up the new-source job with the forced-failure flag disabled.
 *
 * @param env the execution environment the job is built on
 */
private void setupJobWithSlowNewSource(StreamExecutionEnvironment env) {
// Delegates to the two-argument overload with forceFailureFlag = false.
setupJobWithSlowNewSource(env, false);
}

private void setupJobWithSlowNewSource(
StreamExecutionEnvironment env, boolean forceFailureFlag) {
final DataStream<Long> source =
env.fromSource(
new TestingNumberSequenceSource(),
new TestingNumberSequenceSource(forceFailureFlag),
WatermarkStrategy.noWatermarks(),
"source");
addSink(source);
Expand Down Expand Up @@ -480,15 +496,40 @@ public void open(GenericInputSplit split) throws IOException {
}

private static class TestingNumberSequenceSource extends NumberSequenceSource {
private TestingNumberSequenceSource() {

// Selects the forced-failure variant of this source: when true, the constructor sets
// forceFailureCounter to 1 and createEnumerator() passes 1 instead of the current parallelism
// to splitNumberRange().
private final boolean forceFailureFlag;
// While forceFailureCounter > 0, TestingIteratorSourceReader#close() decrements it and throws,
// so the source task fails on reader close until the counter reaches 0.
// The field is static, so it is shared by all source and reader instances in the same JVM.
public static AtomicInteger forceFailureCounter = new AtomicInteger(0);

/**
 * Creates the testing source, passing {@code 0} and {@code NUMBERS_TO_PRODUCE - 1} to the
 * {@code NumberSequenceSource} constructor.
 *
 * @param forceFailureFlag when {@code true}, sets the shared {@code forceFailureCounter} to 1
 *     so that one reader close will throw; when {@code false}, the counter is left untouched
 */
private TestingNumberSequenceSource(boolean forceFailureFlag) {
    super(0, NUMBERS_TO_PRODUCE - 1);
    this.forceFailureFlag = forceFailureFlag;
    if (forceFailureFlag) {
        // Update the existing counter in place rather than replacing the static reference.
        // The field is not volatile, so reassigning it from the constructor gives reader
        // threads no visibility guarantee and strands anyone holding the old instance;
        // AtomicInteger#set is a volatile write on the one shared object.
        forceFailureCounter.set(1);
    }
}

/**
 * Creates the reader for this source: a {@code TestingIteratorSourceReader} constructed with
 * the given reader context.
 *
 * @param readerContext the context handed to the new reader
 * @return the newly created testing reader
 */
@Override
public SourceReader<Long, NumberSequenceSplit> createReader(
SourceReaderContext readerContext) {
// NOTE(review): TestingIteratorSourceReader is declared with type parameters but is
// instantiated here as a raw type; consider the diamond operator once inference has been
// confirmed against its full declaration.
return new TestingIteratorSourceReader(readerContext);
}

/**
 * Creates an {@code IteratorSourceEnumerator} over the splits returned by
 * {@code splitNumberRange(0, NUMBERS_TO_PRODUCE - 1, n)}.
 *
 * <p>{@code n} is the enumerator context's current parallelism, unless the forced-failure
 * flag is set, in which case it is 1.
 *
 * @param enumContext the enumerator context; also supplies the current parallelism
 * @return the enumerator holding the computed splits
 */
@Override
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
        createEnumerator(final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
    // Read unconditionally so the context is queried in both modes.
    final int parallelism = enumContext.currentParallelism();

    // In forced-failure mode, simulate the case where there are fewer splits than parallel
    // subtasks, in order to verify unassigned source tasks and failover.
    final int requestedSplits = forceFailureFlag ? 1 : parallelism;

    return new IteratorSourceEnumerator<>(
            enumContext, splitNumberRange(0, NUMBERS_TO_PRODUCE - 1, requestedSplits));
}
}

private static class TestingIteratorSourceReader<
Expand All @@ -504,6 +545,14 @@ public InputStatus pollNext(ReaderOutput<E> output) {
maybeSleep();
return super.pollNext(output);
}

/**
 * Closes the reader, throwing a forced failure while
 * {@code TestingNumberSequenceSource.forceFailureCounter} is positive.
 *
 * <p>Each forced failure consumes exactly one unit of the counter. Once the counter has
 * reached 0 this method returns normally.
 *
 * @throws RuntimeException if a pending forced failure was consumed by this call
 */
@Override
public void close() throws Exception {
    // Claim one pending failure atomically. A separate get() followed by decrementAndGet()
    // is a check-then-act race: readers closing concurrently could all observe a positive
    // value, so more of them would throw than intended and the counter could go negative.
    final int pendingBeforeClose =
            TestingNumberSequenceSource.forceFailureCounter.getAndUpdate(
                    pending -> pending > 0 ? pending - 1 : pending);
    if (pendingBeforeClose > 0) {
        throw new RuntimeException("Forced failure for testing");
    }
    // NOTE(review): super.close() is not invoked on either path, as in the original;
    // confirm the parent reader holds nothing that needs releasing.
}
}

private static class NumberCounterSink extends RichSinkFunction<Long> {
Expand Down

0 comments on commit 8ae7986

Please sign in to comment.