@kristoffSC
Contributor

What is the purpose of the change

Unchanged backport of PR_20979

Fixing FLINK-29509 by setting the correct subtaskId and numberOfSubtasks for CheckpointCommittableManagerImpl during recovery of committables for Sink V2 architecture.

Brief change log

  • Remove the constructor from CheckpointCommittableManagerImpl that was previously used only during deserialization and replace it with a new one that accepts Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers along with subtaskId and numberOfSubtasks.
  • Use the new CheckpointCommittableManagerImpl constructor in the CommittableCollectorSerializer deserialize method for CheckpointCommittableManagerImpl, passing subtaskId and numberOfSubtasks from the serializer instance.
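
The shape of this change can be sketched roughly as follows. This is a simplified illustration, not the actual Flink code: `SubtaskManager`, `CheckpointManager`, and `CollectorSerializer` are hypothetical stand-ins for SubtaskCommittableManager, CheckpointCommittableManagerImpl, and CommittableCollectorSerializer, and the method name `deserializeCheckpoint` is an assumption made for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class RecoverySketch {

    /** Hypothetical stand-in for SubtaskCommittableManager<CommT>. */
    static final class SubtaskManager<CommT> {
        final int subtaskId;

        SubtaskManager(int subtaskId) {
            this.subtaskId = subtaskId;
        }
    }

    /** Hypothetical stand-in for CheckpointCommittableManagerImpl<CommT>. */
    static final class CheckpointManager<CommT> {
        final Map<Integer, SubtaskManager<CommT>> subtasksCommittableManagers;
        final int subtaskId;
        final int numberOfSubtasks;

        // The constructor takes the recovered per-subtask managers together with
        // the identity of the subtask on which recovery happens.
        CheckpointManager(
                Map<Integer, SubtaskManager<CommT>> subtasksCommittableManagers,
                int subtaskId,
                int numberOfSubtasks) {
            this.subtasksCommittableManagers = subtasksCommittableManagers;
            this.subtaskId = subtaskId;
            this.numberOfSubtasks = numberOfSubtasks;
        }
    }

    /** Hypothetical stand-in for CommittableCollectorSerializer<CommT>. */
    static final class CollectorSerializer<CommT> {
        private final int subtaskId;
        private final int numberOfSubtasks;

        CollectorSerializer(int subtaskId, int numberOfSubtasks) {
            this.subtaskId = subtaskId;
            this.numberOfSubtasks = numberOfSubtasks;
        }

        // During deserialization, pass the serializer's own subtaskId and
        // numberOfSubtasks on to the recovered manager.
        CheckpointManager<CommT> deserializeCheckpoint(
                Map<Integer, SubtaskManager<CommT>> recovered) {
            return new CheckpointManager<>(recovered, subtaskId, numberOfSubtasks);
        }
    }

    public static void main(String[] args) {
        Map<Integer, SubtaskManager<String>> recovered = new HashMap<>();
        recovered.put(0, new SubtaskManager<>(0));

        // The serializer instance runs on subtask 2 out of 4.
        CollectorSerializer<String> serializer = new CollectorSerializer<>(2, 4);
        CheckpointManager<String> manager = serializer.deserializeCheckpoint(recovered);

        System.out.println(manager.subtaskId + " of " + manager.numberOfSubtasks); // prints "2 of 4"
    }
}
```

In this sketch the recovered manager reports the subtask it was restored on rather than a default value, which is the behavior the change log describes.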

Verifying this change

This change enhances the existing CommittableCollectorSerializerTest::testCommittableCollectorV2SerDe() test by verifying subtaskId and numberOfSubtasks on the deserialized CheckpointCommittableManagerImpl instance.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

…ointCommittableManagerImpl during deserialization.

When recovering the `CheckpointCommittableManager`, we ignored the subtaskId it was recovered on. This becomes a problem when a sink uses a post-commit topology, because multiple committer operators might forward committable summaries that all appear to come from the same subtaskId.
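
A minimal sketch of why a shared subtaskId can be a problem. It assumes, purely for illustration, that a downstream operator tracks incoming summaries in a map keyed by the sender's subtaskId; `Summary` and `trackedSenders` are hypothetical names and do not mirror Flink's actual classes or bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

public class SummaryCollisionSketch {

    /** Hypothetical summary: the sending subtask and the number of committables it announces. */
    static final class Summary {
        final int subtaskId;
        final int numberOfCommittables;

        Summary(int subtaskId, int numberOfCommittables) {
            this.subtaskId = subtaskId;
            this.numberOfCommittables = numberOfCommittables;
        }
    }

    /** Assumed downstream bookkeeping: one entry per sending subtaskId. */
    static int trackedSenders(Summary... summaries) {
        Map<Integer, Summary> bySubtask = new HashMap<>();
        for (Summary summary : summaries) {
            // A later summary with the same subtaskId replaces the earlier one.
            bySubtask.put(summary.subtaskId, summary);
        }
        return bySubtask.size();
    }

    public static void main(String[] args) {
        // Two recovered committers that both report the same subtaskId.
        System.out.println(trackedSenders(new Summary(0, 3), new Summary(0, 5))); // prints 1

        // Each committer reports the subtask it was actually recovered on.
        System.out.println(trackedSenders(new Summary(0, 3), new Summary(1, 5))); // prints 2
    }
}
```

Under that assumption, two senders with the same id are indistinguishable downstream, so one summary is lost from the bookkeeping.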

This commit fixes the issue by using the subtaskId already present in the CommittableCollectorSerializer when recreating CheckpointCommittableManagerImpl during recovery.

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
@flinkbot
Collaborator

flinkbot commented Oct 12, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@fapaul fapaul merged commit 6b48827 into apache:release-1.15 Oct 13, 2022