Skip to content

Commit

Permalink
[FLINK-20079][task] Initialize operator chain before upstream partiti…
Browse files Browse the repository at this point in the history
…on request
  • Loading branch information
pnowojski authored and AHeise committed Nov 11, 2020
1 parent 0609918 commit bf16a7d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
Expand Up @@ -499,32 +499,29 @@ protected void beforeInvoke() throws Exception {
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
actionExecutor.runThrowing(() -> {
// both the following operations are protected by the lock
// so that we avoid race conditions in the case that initializeState()
// registers a timer, that fires before the open() is called.
readRecoveredChannelState(); // WARN: should be done before operatorChain.initializeStateAndOpenOperators (see FLINK-19907)

SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
reader.readOutputData(getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());

operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
});

isRunning = true;
}
channelIOExecutor.execute(() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
}
});

private void readRecoveredChannelState() throws IOException, InterruptedException {
SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
reader.readOutputData(getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());
channelIOExecutor.execute(() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
for (InputGate inputGate : getEnvironment().getAllInputGates()) {
inputGate
.getStateConsumedFuture()
.thenRun(() -> mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request partitions"));
}

});
for (InputGate inputGate : getEnvironment().getAllInputGates()) {
inputGate
.getStateConsumedFuture()
.thenRun(() -> mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request partitions"));
}

isRunning = true;
}

@Override
Expand Down
Expand Up @@ -80,6 +80,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -211,7 +212,7 @@ private LocalStreamEnvironment createEnv(int parallelism, int slotsPerTaskManage

final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
env.enableCheckpointing(100);
env.getCheckpointConfig().setAlignmentTimeout(0);
env.getCheckpointConfig().setAlignmentTimeout(1);
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(EXPECTED_FAILURES, Time.milliseconds(100)));
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
Expand Down Expand Up @@ -462,6 +463,7 @@ private static class VerifyingSink extends RichSinkFunction<Long> implements Che
private ListState<State> stateList;
private State state;
private final long minCheckpoints;
private Random random = new Random();

private VerifyingSink(long minCheckpoints) {
this.minCheckpoints = minCheckpoints;
Expand All @@ -470,6 +472,7 @@ private VerifyingSink(long minCheckpoints) {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
getRuntimeContext().addAccumulator(NUM_OUTPUTS, numOutputCounter);
getRuntimeContext().addAccumulator(NUM_OUT_OF_ORDER, outOfOrderCounter);
getRuntimeContext().addAccumulator(NUM_DUPLICATES, duplicatesCounter);
Expand Down Expand Up @@ -531,8 +534,10 @@ public void invoke(Long value, Context context) throws Exception {
state.numOutput++;

if (state.completedCheckpoints < minCheckpoints) {
// induce heavy backpressure until enough checkpoints have been written
Thread.sleep(0, 100_000);
// induce backpressure until enough checkpoints have been written
if (random.nextInt(1000) == 42) {
Thread.sleep(1);
}
}
// after all checkpoints have been completed, the remaining data should be flushed out fairly quickly
}
Expand Down

0 comments on commit bf16a7d

Please sign in to comment.