Skip to content

Commit

Permalink
[BEAM-8566] Do not swallow execution errors during checkpointing
Browse files Browse the repository at this point in the history
If a bundle fails to finalize before a checkpoint is created, the failure may be
swallowed and treated as a mere checkpointing error. This breaks the execution
flow and the exactly-once guarantees.
  • Loading branch information
mxm committed Nov 6, 2019
1 parent 9ad416e commit 8af8617
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
Expand Up @@ -763,12 +763,19 @@ public final void snapshotState(StateSnapshotContext context) throws Exception {

// We can't output here anymore because the checkpoint barrier has already been
// sent downstream. This is going to change with 1.6/1.7's prepareSnapshotBarrier.
outputManager.openBuffer();
// Ensure that no new bundle gets started as part of finishing a bundle
while (bundleStarted.get()) {
invokeFinishBundle();
try {
outputManager.openBuffer();
// Ensure that no new bundle gets started as part of finishing a bundle
while (bundleStarted.get()) {
invokeFinishBundle();
}
outputManager.closeBuffer();
} catch (Exception e) {
// Any regular exception during checkpointing will be tolerated by Flink because those
// typically do not affect the execution flow. We need to fail hard here because errors
// in bundle execution are application errors which are not related to checkpointing.
throw new Error("Checkpointing failed because bundle failed to finalize.", e);
}
outputManager.closeBuffer();

super.snapshotState(context);
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
Expand Down Expand Up @@ -1722,6 +1723,63 @@ public void processElement(ProcessContext context) {
Collections.emptyMap());
}

/**
 * Checks that snapshotting the operator throws an {@link Error} when the DoFn's
 * {@code @FinishBundle} method throws while a bundle is open. A regular exception would only
 * cause the checkpoint to fail.
 */
@Test
public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws Exception {
  FlinkPipelineOptions pipelineOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
  pipelineOptions.setCheckpointingInterval(1L);
  pipelineOptions.setMaxBundleSize(10L);

  TupleTag<String> mainOutput = new TupleTag<>("main-output");

  // Input elements carry only a value, encoded as UTF-8 strings.
  WindowedValue.ValueOnlyWindowedValueCoder<String> inputCoder =
      WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

  DoFnOperator.MultiOutputOutputManagerFactory<String> managerFactory =
      new DoFnOperator.MultiOutputOutputManagerFactory(
          mainOutput,
          WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));

  // The DoFn under test always fails when its bundle is finished.
  @SuppressWarnings("unchecked")
  DoFnOperator operator =
      new DoFnOperator<>(
          new IdentityDoFn() {
            @FinishBundle
            public void finishBundle() {
              throw new RuntimeException("something went wrong here");
            }
          },
          "stepName",
          inputCoder,
          null,
          Collections.emptyMap(),
          mainOutput,
          Collections.emptyList(),
          managerFactory,
          WindowingStrategy.globalDefault(),
          new HashMap<>(), /* side-input mapping */
          Collections.emptyList(), /* side inputs */
          pipelineOptions,
          null,
          null,
          DoFnSchemaInformation.create(),
          Collections.emptyMap());

  @SuppressWarnings("unchecked")
  OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> harness =
      new OneInputStreamOperatorTestHarness<>(operator);

  harness.open();

  // Processing a single element starts a bundle.
  StreamRecord<WindowedValue<String>> firstElement =
      new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element"));
  harness.processElement(firstElement);

  // The snapshot must surface an Error, not a regular Exception:
  // a regular exception would just cause the checkpoint to fail.
  assertThrows(Error.class, () -> harness.snapshot(0, 0));
}

/**
* Ensures Jackson cache is cleaned to get rid of any references to the Flink Classloader. See
* https://jira.apache.org/jira/browse/BEAM-6460
Expand Down

0 comments on commit 8af8617

Please sign in to comment.