Conversation

@StephanEwen (Contributor) commented Nov 29, 2020

What is the purpose of the change

This fixes the split duplication problem in the new source framework, as initially observed on the File Source.
The issues are independent of the file source and affect all sources implemented against the new framework.

This PR combines the changes from

We need to squash some commits during merging.

Verifying this change

This PR adds significant test coverage by extending the FileTextLinesITCase with TaskManager and JobManager failures.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive):
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit eb435ff (Sun Nov 29 22:43:30 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Nov 29, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@StephanEwen (Contributor, Author)

I pushed an updated version, based on the latest master and with some commit squashing.

@tillrohrmann (Contributor) left a comment

Thanks for creating this PR @StephanEwen. The changes look good to me and, as far as I can tell, should solve the problem with late checkpoint-completion messages. Using the restored checkpoint id to decide which splits need to be added back to the SplitEnumerator looks like a very good solution.

One thing I noticed is that the SplitAssignmentTracker assumes that once it sees an onCompleteCheckpoint(checkpointId) message for checkpointId, it will never be called with a lower checkpoint id than checkpointId (via getAndRemoveUncheckpointedAssignment). That's why it can drop the split assignments which have been persisted with checkpoint checkpointId.

Looking at the CheckpointCoordinator I am not entirely sure whether this assumption currently holds in all cases. For example, if the last successful checkpoint was a savepoint and we set CheckpointConfig.setPreferCheckpointForRecovery(true), then this would lead to lost splits. The same applies if the DefaultCompletedCheckpointStore.recover method converges on a set of recovered checkpoints without the latest one.

The latter case is probably quite academic (but still possible if there is another Dispatcher holding a lock on a checkpoint which should have been removed), but the former case can lead to data loss if users configure their jobs with the mentioned checkpoint configuration. Granted, this option can also lead to duplicate outputs even when using exactly-once sinks, which is essentially the same problem. Long story short, I think there are good reasons to consider removing this config option. The PR which added the behaviour is #8410.
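For illustration, a minimal self-contained sketch of the bookkeeping described above (names follow the discussion and are simplified; this is not the actual Flink SplitAssignmentTracker):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch: split assignments are grouped by the checkpoint that covers them,
// so that on reader failure only the splits handed out after the restored checkpoint
// are returned to the enumerator.
class AssignmentTrackerSketch<SplitT> {

    // checkpointId -> (subtaskId -> splits assigned before that checkpoint and covered by its state)
    private final TreeMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId =
            new TreeMap<>();

    // Once a checkpoint completes, the assignments it covers are part of the readers'
    // checkpointed state, so the tracker forgets them. This is the step that relies on
    // never restoring from an older checkpoint afterwards.
    void onCompleteCheckpoint(long checkpointId) {
        assignmentsByCheckpointId.headMap(checkpointId, true).clear();
    }

    // On reader failure, return (and forget) the splits assigned to the subtask after
    // the checkpoint the job restores from; the enumerator will hand them out again.
    List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
        final List<SplitT> splits = new ArrayList<>();
        for (Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> entry :
                assignmentsByCheckpointId.entrySet()) {
            if (entry.getKey() > restoredCheckpointId) {
                final LinkedHashSet<SplitT> forSubtask = entry.getValue().remove(subtaskId);
                if (forSubtask != null) {
                    splits.addAll(forSubtask);
                }
            }
        }
        return splits;
    }
}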

* Get the split to put back. This only happens when a source reader subtask has failed.
*
* @param failedSubtaskId the failed subtask id.
* @param subtaskId the failed subtask id.
Contributor:

missing @param restoredCheckpointId
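A possible way to address this, with illustrative wording (not taken from the PR):

* Gets the splits to put back. This only happens when a source reader subtask has failed.
*
* @param subtaskId the failed subtask id.
* @param restoredCheckpointId the id of the checkpoint that the failed subtask restores from.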


private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction) throws Exception {
highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
Thread.sleep(50);
Contributor:

Why are the sleeps needed?

Contributor Author:

Not sure if they are strictly needed; this is from @azagrebin's PR and I thought they didn't hurt much.
We can try to remove them.
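If they turn out to be removable, one alternative (a hypothetical plain-Java sketch, not from this PR) is to wait for an explicit condition instead of a fixed sleep:

import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper: poll until a condition holds, failing after a timeout,
// instead of sleeping a fixed amount of time.
final class WaitUtil {
    static void waitUntil(BooleanSupplier condition, Duration timeout) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            Thread.sleep(10);
        }
    }
}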

Contributor Author:

@azagrebin Do you have any concerns about removing the sleeps here?

Comment on lines +320 to +322
Thread.sleep(50);
afterFailAction.run();
Thread.sleep(50);
Contributor:

Same here for the sleeps?

@StephanEwen (Contributor, Author)

@tillrohrmann That is a very good observation about the checkpoint-versus-savepoint issue.

I think what we need to do is the following: whenever we go back to something other than the latest checkpoint (which would be the case if the latest is a savepoint and we go back to a checkpoint), we need to trigger a global failover, so that all tasks go back together, rather than only some of them going back (regional failover) while others stay ahead.

In that case we should be consistent here as well, because then all tasks and the coordinator reset consistently to the same state of the earlier checkpoint, where the split is still on the coordinator. The problem only arises if the tasks go back to an earlier state (split not yet assigned) but the coordinator does not go back (and retains the assumption that the split was assigned).
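Expressed as a predicate, the rule sketched above would be something like the following (illustrative only, not actual Flink code):

// If recovery does not go back to the latest completed checkpoint (for example, the
// latest snapshot is a savepoint but 'prefer checkpoint for recovery' is set), the
// restore must be global: the coordinator and all tasks roll back to the same
// checkpoint, instead of only some tasks going back under regional failover.
final class RestoreConsistency {
    static boolean requiresGlobalFailover(long restoredCheckpointId, long latestCompletedCheckpointId) {
        return restoredCheckpointId < latestCompletedCheckpointId;
    }
}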

@becketqin (Contributor) left a comment

The fix looks good to me for the original issue that caused duplicate split assignments. One nit comment; please feel free to either take it or ignore it.
I checked the code after Till brought up the potential issue with CheckpointConfig.setPreferCheckpointForRecovery(true). It is a very good catch. I agree we should fix that, though probably in a separate patch.

public List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
final ArrayList<SplitT> splits = new ArrayList<>();

for (final Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> entry : assignmentsByCheckpointId.entrySet()) {
Contributor:

nit: the following code might be slightly simpler.

assignmentsByCheckpointId.tailMap(restoredCheckpointId).values().forEach(...)

Contributor Author:

I actually had that code initially, but tailMap()'s lower bound key is inclusive (anything >= checkpointId) and we need a tail map where the key is exclusive. Compensating for that makes the code in the end more complex than this version here.
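For context, the inclusive vs. exclusive bound illustrated with a plain JDK TreeMap (not the PR code; whether the exclusive NavigableMap variant fits the field types used here is a separate question):

import java.util.NavigableMap;
import java.util.TreeMap;

public class TailMapBounds {
    public static void main(String[] args) {
        final NavigableMap<Long, String> byCheckpoint = new TreeMap<>();
        byCheckpoint.put(3L, "assignments covered by checkpoint 3");
        byCheckpoint.put(4L, "assignments covered by checkpoint 4");
        byCheckpoint.put(5L, "assignments covered by checkpoint 5");

        final long restoredCheckpointId = 3L;

        // Inclusive bound: [3, 4, 5] -- would also return the splits already persisted
        // in the restored checkpoint, which must not be handed out again.
        System.out.println(byCheckpoint.tailMap(restoredCheckpointId).keySet());

        // Exclusive bound: [4, 5] -- only the assignments made after the restored
        // checkpoint, which is what needs to go back to the enumerator.
        System.out.println(byCheckpoint.tailMap(restoredCheckpointId, false).keySet());
    }
}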

@StephanEwen (Contributor, Author)

Manually merged in 2049639
