Skip to content

Conversation

@kristoffSC
Copy link
Contributor

What is the purpose of the change

Unchanged backport of #21101

Fixing FLINK-29627 where recovery more than one Committable causesed IllegalStateException and prevents cluster to start.

Implemented fix is based on merging committables from same subtaskId during recovery for Sink V2 architecture.

Brief change log

  • Enhance CheckpointSimpleVersionedSerializer::deserialize by calling SubtaskCommittableManager::merge for committables for same subtaskId.

Verifying this change

  • Add new test Committablecollectorserializertest::testCommittablesForSameSubtaskIdV2SerDe to verify deserialization of multiple committables for the same subtaskId.
  • Enhanced CommitterOperatorTest::testStateRestore to include committables for same subtaskId.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

… more than 1 committable.

Recovery more than one Committable causes `IllegalStateException` and prevents cluster to start.

When we recover the `CheckpointCommittableManager` we deserialize SubtaskCommittableManager instances from recovery state, and we put them into `Map<Integer, SubtaskCommittableManager<CommT>>`. The key of this map is subtaskId of the recovered manager. However, this will fail if we have to recover more than one committable.

What was implemented as a fix is to call `SubtaskCommittableManager::merge` if we already deserialize manager for this subtaskId.

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
@flinkbot
Copy link
Collaborator

flinkbot commented Oct 19, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@kristoffSC kristoffSC changed the title [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable. [Flink 29627][streaming] Fix duplicate key exception during recovery more than 1 committable. Oct 19, 2022
@kristoffSC
Copy link
Contributor Author

@flinkbot run azure

@fapaul fapaul merged commit 47bcb5a into apache:release-1.15 Oct 20, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants