Skip to content

NIFI-14752 Add Max Uncommitted Size to ConsumeKafka#10093

Closed
exceptionfactory wants to merge 1 commit intoapache:mainfrom
exceptionfactory:NIFI-14752
Closed

NIFI-14752 Add Max Uncommitted Size to ConsumeKafka#10093
exceptionfactory wants to merge 1 commit intoapache:mainfrom
exceptionfactory:NIFI-14752

Conversation

@exceptionfactory
Copy link
Contributor

Summary

NIFI-14752 Adds a Max Uncommitted Size optional property to the ConsumeKafka Processor.

The Max Uncommitted Size property provides an optional soft limit on the total size of records consumed during an invocation of ConsumeKafka. When configured with a value, Max Uncommitted Size breaks the polling loop inside ConsumeKafka.onTrigger() before reaching the Max Uncommitted Time.

The Max Uncommitted Time property behavior remains unchanged, but in flows with high volume, configuring the Max Uncommitted Size enables ConsumeKafka to transfer FlowFiles before waiting for the total time configured.

Changes include a new unit test with a smaller Kafka max.poll.records and a larger Max Uncommitted Time with corresponding JUnit test timeout. The new test method with Max Uncommitted Size configured verifies that ConsumeKafka completes before waiting for the maximum amount of time to elapse.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

- Updated Offset Tracker to calculate Total Record Size based on Kafka Consumer Record key and value length
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant