Skip to content

Comments

NIFI-15293 Checkpoint committed records in ConsumeKinesis#10600

Merged
exceptionfactory merged 1 commit intoapache:mainfrom
awelless:NIFI-15293_Checkpoint_committed_records_only_in_ConsumeKinesis
Dec 4, 2025
Merged

NIFI-15293 Checkpoint committed records in ConsumeKinesis#10600
exceptionfactory merged 1 commit intoapache:mainfrom
awelless:NIFI-15293_Checkpoint_committed_records_only_in_ConsumeKinesis

Conversation

@awelless
Copy link
Contributor

@awelless awelless commented Dec 4, 2025

Summary

NIFI-15293

The pull request addresses potential data loss caused by incorrect checkpointing logic. See the jira ticket for more details.

Also tested manually, with abrupt interruptions and shutdowns of the processor.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

void doCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException;
}

private record LastIgnoredCheckpoint(RecordProcessorCheckpointer checkpointer, String sequenceNumber, long subSequenceNumber) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the actual KCL implementation the same stateful RecordProcessorCheckpointer is passed to each processor's method, however the API doesn't call this out explicitly.
In case this changes in the future, it's better to keep a reference to the checkpointer which was passed with a batch of records.

@awelless awelless force-pushed the NIFI-15293_Checkpoint_committed_records_only_in_ConsumeKinesis branch from 906cfcc to 5cac719 Compare December 4, 2025 12:26
Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing this issue @awelless, the updated approach looks good. +1 merging

@exceptionfactory exceptionfactory merged commit 19db2ef into apache:main Dec 4, 2025
7 checks passed
mark-bathori pushed a commit to mark-bathori/nifi that referenced this pull request Feb 5, 2026
Signed-off-by: David Handermann <exceptionfactory@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants