
Conversation

@AHeise
Contributor

@AHeise AHeise commented Mar 17, 2022

What is the purpose of the change

Externally induced sources are currently ill-defined and have a lot of unnecessary limitations. This PR addresses that by explicitly holding back checkpoint barriers until the external source induces the checkpoint.

While this approach seemingly limits the way externally induced sources work (they can't trigger a checkpoint on their own anymore), it actually makes the only plausible way explicit. Sources simply can't trigger a checkpoint on their own: the checkpoint coordinator needs to track it, and setups with multiple sources need the coordinator for checkpointing to work at all (or else they deadlock).
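To make the new contract concrete, below is a minimal, self-contained sketch (hypothetical class and method names, not the actual Flink classes): the coordinator still triggers every checkpoint over RPC, and the source merely holds the barrier back until the external system induces it.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical sketch of the pattern this PR enforces: the trigger RPC from the
 * checkpoint coordinator is parked, and the barrier is only emitted once the
 * external system induces the checkpoint.
 */
class DelayedBarrierSource {
    // Triggers received from the coordinator but not yet induced externally.
    private final ConcurrentLinkedQueue<Long> pendingTriggers = new ConcurrentLinkedQueue<>();

    /** Called when the coordinator's trigger RPC arrives; the barrier is NOT emitted yet. */
    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        pendingTriggers.add(checkpointId);
        return CompletableFuture.completedFuture(true);
    }

    /** Called when the external system induces the checkpoint; the stored trigger is replayed. */
    void onExternallyInducedCheckpoint(long checkpointId) {
        if (pendingTriggers.remove(checkpointId)) {
            emitBarrierDownstream(checkpointId);
        }
        // If the RPC has not arrived yet, a real implementation would remember the
        // induced id and replay the barrier as soon as the trigger shows up.
    }

    private void emitBarrierDownstream(long checkpointId) {
        System.out.println("emitting barrier for checkpoint " + checkpointId);
    }
}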

Brief change log

  • Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.
  • Clarify the contract.
  • Migrate related tests to JUnit5 and AssertJ.

Verifying this change

Expanded the original unit test to check that the barrier is correctly relayed.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Mar 17, 2022

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@AHeise AHeise force-pushed the FLINK-25256 branch 2 times, most recently from 8ded29b to 99ed2d6 on March 17, 2022 22:22
Comment on lines 204 to 217
// note that at this point, we should probably not emit more data such that data is
// properly aligned
// however, unless we receive a reliable checkpoint abort RPC, this may deadlock
Contributor Author


We should probably discuss if this is the best choice.

Comment on lines 133 to 134
// cleanup any old checkpoint that was cancelled before trigger
triggeredCheckpoints.headSet(checkpointMetaData.getCheckpointId()).clear();
Contributor Author


The cleanup here (and the one in #trigger) doesn't work well with concurrent checkpoints. Do we have a way to determine the maximum number of concurrent checkpoints, or can we actually rely on abortCheckpoint?
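For reference, a tiny self-contained example (not the PR's code) of what headSet(...).clear() does on a TreeSet of checkpoint ids: it drops every id strictly smaller than the one being triggered, including ids of concurrent checkpoints that may still be in flight.

import java.util.TreeSet;

public class HeadSetCleanupDemo {
    public static void main(String[] args) {
        TreeSet<Long> triggeredCheckpoints = new TreeSet<>();
        triggeredCheckpoints.add(10L); // concurrent checkpoint, possibly still running
        triggeredCheckpoints.add(11L); // concurrent checkpoint, possibly still running
        triggeredCheckpoints.add(12L); // the checkpoint being triggered now

        // headSet(12L) is a live view of all ids strictly smaller than 12;
        // clearing it also removes 10 and 11 from the backing set.
        triggeredCheckpoints.headSet(12L).clear();

        System.out.println(triggeredCheckpoints); // prints [12]
    }
}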

@AHeise AHeise marked this pull request as ready for review March 18, 2022 14:46
new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions));
triggerFuture.complete(isRunning());
} else {
// not externally induced or trigger already received (rare case)
Contributor


I guess the comment is wrong now? It is only "trigger already received (rare case)", right?

Contributor

@dawidwys dawidwys left a comment


How hard would it be to add a test for the blocking/unblocking of the externally induced source?

super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
/** Remove temporary data about a canceled checkpoint. */
private void cleanupCheckpoint(long checkpointId) {
assert (mailboxProcessor.isMailboxThread());
Contributor


Shouldn't we potentially unblock the input here? If the only pending checkpoint was aborted/declined/cancelled?

Contributor Author


You are absolutely right.
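A minimal sketch of the idea, with invented field and method names (pendingCheckpoints and isInputBlocked are stand-ins, not the real bookkeeping): cleanup removes the checkpoint's temporary data and resumes input once nothing is pending anymore.

import java.util.TreeSet;

/** Sketch only: hold the barrier while checkpoints are pending, unblock when none are left. */
class CheckpointBookkeepingSketch {
    private final TreeSet<Long> pendingCheckpoints = new TreeSet<>();
    private boolean inputBlocked;

    void holdBarrier(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        inputBlocked = true;
    }

    /** Remove temporary data about a canceled checkpoint and unblock input if it was the only one. */
    void cleanupCheckpoint(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
        if (pendingCheckpoints.isEmpty()) {
            inputBlocked = false; // nothing left to align for, so resume consuming input
        }
    }

    boolean isInputBlocked() {
        return inputBlocked;
    }
}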

@AHeise AHeise force-pushed the FLINK-25256 branch 2 times, most recently from d9081db to f993e8f on March 21, 2022 14:05
@AHeise
Contributor Author

AHeise commented Mar 21, 2022

How hard would it be to add a test for the blocking/unblocking of the externally induced source?

I have added assertions to the main test method that cover that. Please check whether you think I should add additional test cases.
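For illustration, the assertions roughly take this shape in JUnit 5 + AssertJ; the StubSource below is invented for the sketch and is not the real test harness.

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayDeque;
import java.util.Queue;
import org.junit.jupiter.api.Test;

class ExternallyInducedBlockingSketchTest {

    /** Tiny stand-in for the source task, invented for this sketch. */
    static class StubSource {
        final Queue<Long> heldTriggers = new ArrayDeque<>();
        final Queue<Long> emittedBarriers = new ArrayDeque<>();

        void triggerCheckpoint(long id) { heldTriggers.add(id); }   // coordinator RPC arrives
        void induceCheckpoint(long id) {                            // external system induces it
            if (heldTriggers.remove(id)) {
                emittedBarriers.add(id);
            }
        }
        boolean isBlocked() { return !heldTriggers.isEmpty(); }
    }

    @Test
    void barrierIsHeldBackUntilExternallyInduced() {
        StubSource source = new StubSource();

        source.triggerCheckpoint(42L);                            // RPC first
        assertThat(source.emittedBarriers).isEmpty();             // barrier must be held back
        assertThat(source.isBlocked()).isTrue();                  // source stays blocked

        source.induceCheckpoint(42L);                             // external system induces it
        assertThat(source.emittedBarriers).containsExactly(42L);  // barrier is replayed now
        assertThat(source.isBlocked()).isFalse();                 // and the source is unblocked
    }
}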

Contributor

@dawidwys dawidwys left a comment


Looks fine to me now.

@AHeise
Contributor Author

AHeise commented Mar 23, 2022

This PR has been verified by the flink-pravega maintainers to work on their tests for checkpoints (savepoint test pending).

Contributor

@crazyzhou crazyzhou left a comment


I have tested a basic Pravega reader application savepointing locally with a RocksDB state backend, with a simple app job graph:
[job graph screenshot]

After this fix, the app can successfully do stop-with-savepoint, whereas it failed before.

It can recover nicely from the savepoint:
[recovery screenshot]

The application only has a _metadata file in each checkpoint and savepoint so far, so I am still trying some more complicated cases to see whether I can reproduce the issue.

@dawidwys
Contributor

On the matter of writing into separate files instead of keeping data inside the metadata, you might want to have a look at: state.storage.fs.memory-threshold
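That option controls the threshold below which state is written inline into the checkpoint's _metadata file instead of into separate state files. A minimal sketch of setting it programmatically, assuming the standard Flink Configuration API (the value below is just an example):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryThresholdExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // State chunks smaller than this threshold are embedded into the
        // checkpoint/savepoint _metadata file instead of separate state files.
        conf.setString("state.storage.fs.memory-threshold", "20 kb");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job with env as usual.
    }
}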

Contributor

@crazyzhou crazyzhou left a comment


I have tested an app with a larger state; here is the savepoint structure:

:~/flink-savepoints/savepoint-9a7e45-0215c1d6f545$ ls
9c355659-2475-4402-a5e7-3450c70394b4  _metadata

The application with the Pravega source can both cancel and stop with a savepoint nicely, and can successfully recover from it.

Arvid Heise added 3 commits March 29, 2022 10:10
…eceived over RPC instead of inventing them out of thin air.

This change preserves the CheckpointOptions and properly integrates user-triggered snapshots and workflows with more than one source.
The externally induced source now merely delays the barrier instead of being able to insert one at a whim, which would never work in the aforementioned setups.
@AHeise AHeise merged commit a4d194e into apache:master Mar 29, 2022