Skip to content

NIFI-15319 Decrease memory counter after buffer shutdown in ConsumeKinesis#10633

Closed
awelless wants to merge 1 commit intoapache:mainfrom
awelless:NIFI-15319_Free_memory_for_shutdown_in_ConsumeKinesis
Closed

NIFI-15319 Decrease memory counter after buffer shutdown in ConsumeKinesis#10633
awelless wants to merge 1 commit intoapache:mainfrom
awelless:NIFI-15319_Free_memory_for_shutdown_in_ConsumeKinesis

Conversation

@awelless
Copy link
Contributor

@awelless awelless commented Dec 11, 2025

Summary

NIFI-15319

Memory counter is decreased when shutdownShardConsumption is called. As tested, this method is called not only during application shutdown, but also as a part of graceful lease handoff for a single shard.

Added more information in the debug logs.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

} else {
logger.debug("Shutting down the buffer {}. Checkpointing last consumed record", bufferId);
final Collection<RecordBatch> invalidatedBatches = buffer.shutdownBuffer(checkpointer);
memoryTracker.freeMemory(invalidatedBatches, bufferId);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This absent line was causing the memory tracker to think the buffer is full, even when no data was there.

logger.debug("Reserved {} bytes for {} records. Total consumed memory: {} bytes",
consumedBytes, recordBatch.size(), newConsumedBytes);
logger.debug("Reserved {} bytes for {} records for buffer {}. Total consumed memory: {} bytes",
consumedBytes, recordBatch.size(), bufferId, newConsumedBytes);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging bufferId in reserveMemory and freeMemory allows us to precisely track memory operations for each buffer in the logs.

private Collection<RecordBatch> drainInvalidatedBatches() {
if (!invalidated.get()) {
throw new IllegalStateException("Can't drain invalidated batches for valid shard buffer: " + bufferId);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a safety check to make sure we perform this destructive action for invalidated buffers only.

} catch (final ShutdownException e) {
logger.warn("Failed to checkpoint records due to shutdown. Ignoring checkpoint", e);
return;
} catch (final RuntimeException e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's essential to ensure no exception is thrown from the buffer methods, so that freeMemory is always called.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing this issue and additional logs, the changes look good. +1 merging

mark-bathori pushed a commit to mark-bathori/nifi that referenced this pull request Feb 5, 2026
…nesis

This closes apache#10633

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants