
KAFKA-19389: Fix memory consumption for completed share fetch requests #19928


Merged

merged 3 commits into apache:trunk on Jun 10, 2025

Conversation

apoorvmittal10
Contributor

@apoorvmittal10 apoorvmittal10 commented Jun 8, 2025

For ShareFetch requests, the fetch happens through a DelayedShareFetch
operation. Operations that have already completed still hold a reference to
the data being sent in the response. Because each operation is watched under
multiple keys, i.e. DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey,
completing the operation via one watched key still leaves a reference to it
in the other watched key. This means the memory can only be freed once a purge
is triggered by DelayedOperationPurgatory, which removes the already-completed
operation from the remaining watch keys.
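
To make the retention concrete, here is a minimal, self-contained toy model (plain Java, not Kafka code; every name in it is illustrative): an operation watched under two keys stays strongly referenced through the second key after being completed through the first, until a purge sweeps the watcher lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (not Kafka code): shows why a completed operation that is
// watched under two keys stays on the heap until a purge sweeps all watcher lists.
public class WatchKeyRetentionDemo {

    static class Operation {
        final byte[] payload = new byte[1024 * 1024]; // stands in for the fetched response data
        boolean completed = false;
    }

    // One watcher list per key, loosely modelling a DelayedOperationPurgatory.
    static final Map<String, List<Operation>> watchers = new HashMap<>();

    static void watch(Operation op, String key) {
        watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
    }

    // Completing through one key clears only that key's list; the other key
    // still holds a strong reference to the operation and its payload.
    static void checkAndComplete(String key) {
        List<Operation> ops = watchers.getOrDefault(key, new ArrayList<>());
        ops.forEach(op -> op.completed = true);
        ops.clear();
    }

    // Only the purge drops already-completed operations from the remaining lists.
    static void purgeCompleted() {
        watchers.values().forEach(ops -> ops.removeIf(op -> op.completed));
    }

    public static void main(String[] args) {
        Operation op = new Operation();
        watch(op, "group-key");      // cf. DelayedShareFetchGroupKey
        watch(op, "partition-key");  // cf. DelayedShareFetchPartitionKey

        checkAndComplete("group-key");
        System.out.println("after completion via group-key, partition-key still holds "
                + watchers.get("partition-key").size() + " operation(s)");

        purgeCompleted();
        System.out.println("after purge, partition-key holds "
                + watchers.get("partition-key").size() + " operation(s)");
    }
}
```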

The purge is governed by the config
ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG,
so if its value is not smaller than the number of share fetch requests
whose retained responses can consume the broker's entire memory, the broker
can go out of memory. The problem can also be mitigated by using a lower
fetch max bytes per request, but that value is client-controlled and
therefore cannot be relied on to protect the broker.
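
As a rough illustration (my own numbers, not from the PR): between purges, up to about purge_interval * max_response_bytes of already-completed share fetch data can stay pinned on the heap; for instance, a purge interval of 1000 combined with ~1 MB responses could pin on the order of 1 GB, which is an entire default-sized broker heap.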

This PR triggers completion on both watched keys, so the DelayedShareFetch
operation is removed from both keys and the broker memory is freed as soon
as the share fetch response is sent.
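
A minimal sketch of the change, pieced together from the diff fragments quoted in the review below; the group-key call and its constructor arguments are an assumption, and the surrounding code in DelayedShareFetch may differ:

```java
// Sketch only: inside releasePartitionLocksAndAddToActionQueue(...), trigger
// completion on BOTH watch keys via the action queue, so the completed operation
// is dropped from each watcher list as soon as the response is sent.
replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
    // Existing behaviour: complete any pending share fetch watched under the group key.
    // (Constructor arguments here are assumed, not copied from the PR.)
    replicaManager.completeDelayedShareFetchRequest(
        new DelayedShareFetchGroupKey(groupId, topicIdPartition));
    // New in this PR: also complete under the partition key, so the memory is freed
    // without waiting for a DelayedOperationPurgatory purge.
    replicaManager.completeDelayedShareFetchRequest(
        new DelayedShareFetchPartitionKey(topicIdPartition));
}));
```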

Testing

Tested with LocalTieredStorage, using the default configurations mentioned in
the doc here (https://kafka.apache.org/documentation/#tiered_storage_config_ex).
Before the fix the broker goes OOM after reading some 8040 messages; after the
fix the consumption continues without any issue and the memory is released
instantaneously.

Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>

@github-actions github-actions bot added the triage (PRs from the community), core (Kafka Broker), KIP-932 (Queues for Kafka) and small (Small PRs) labels on Jun 8, 2025
@apoorvmittal10 apoorvmittal10 added the ci-approved label and removed the triage (PRs from the community) label on Jun 8, 2025
Contributor

@junrao junrao left a comment

@apoorvmittal10 : Thanks for the PR. A couple of comments.

// also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
// However, it's best to trigger the check on all the keys that are being watched which
// should free the memory for the completed operation.
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));
Contributor

Could you run some perf tests to make sure there is no degradation for share fetch requests not reading from remote storage? If there is degradation, maybe we could only trigger this if remote storage is involved.

Contributor Author

@apoorvmittal10 apoorvmittal10 Jun 9, 2025

I have added the test results here. Also monitored the memory consumption in jconsole, which looks stable. There is no degradation when not reading from remote storage.

Even without this fix, the issue does not occur when a producer runs in parallel with the share consumer, because produce also triggers the purgatory to check the watch keys per topic-partition.

@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
  // then we should check if there is a pending share fetch request for the topic-partition and complete it.
  // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
  // we directly call delayedShareFetchPurgatory.checkAndComplete
- replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+ replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
Contributor

Could we move the comment above to below this line?

Contributor Author

@apoorvmittal10 apoorvmittal10 Jun 9, 2025

Done, and updated the comment with a minor change: topic-partition => share-partition.

@apoorvmittal10
Contributor Author

Test results for performance:

Set up share groups to read from earliest:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name share-group-tier3
--alter --add-config 'share.auto.offset.reset=earliest'
Completed updating config for group share-group-tier3.

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name share-group-tier6
--alter --add-config 'share.auto.offset.reset=earliest'
Completed updating config for group share-group-tier6.

Local tiered storage - message size 1024 Bytes

bin/kafka-producer-perf-test.sh --topic tieredTopic1 --num-records 1200000 --record-size 1024
--throughput -1 --producer-props bootstrap.servers=localhost:9092

On trunk - OOM

bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic tieredTopic1
--messages 1000000 --timeout 300000 --group share-group-tier3

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RLMMConsumerTask"
[2025-06-09 19:14:15,035] ERROR [ExpirationReaper-1-Fetch]: Error due to (org.apache.kafka.server.purgatory.DelayedOperationPurgatory$ExpiredOperationReaper)
java.lang.OutOfMemoryError: Java heap space
[2025-06-09 19:14:15,035] ERROR Uncaught exception in thread 'quorum-controller-1-event-handler': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
[2025-06-09 19:14:15,035] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (org.apache.kafka.server.util.KafkaScheduler)
java.lang.OutOfMemoryError: Java heap space
[2025-06-09 19:14:15,036] INFO [ExpirationReaper-1-Fetch]: Stopped (org.apache.kafka.server.purgatory.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-06-09 19:14:15,035] ERROR [ExpirationReaper-1-ShareFetch]: Error due to (org.apache.kafka.server.purgatory.DelayedOperationPurgatory$ExpiredOperationReaper)
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.lang.invoke.DirectMethodHandle.allocateInstance(DirectMethodHandle.java:500) ~[?:?]
    at java.base/java.lang.invoke.DirectMethodHandle$Holder.newInvokeSpecial(DirectMethodHandle$Holder) ~[?:?]
    at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder) ~[?:?]
    at org.apache.kafka.server.util.timer.SystemTimer.advanceClock(SystemTimer.java:96) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.purgatory.DelayedOperationPurgatory.advanceClock(DelayedOperationPurgatory.java:387) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.purgatory.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperationPurgatory.java:417) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.1.0-SNAPSHOT.jar:?]

On fix branch - Successful Consumption

bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic tieredTopic1
--messages 1000000 --timeout 300000 --group share-group-tier6

start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-09 19:29:50,195] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-09 19:29:50:224, 2025-06-09 19:30:12:655, 977.0361, 43.5574, 44602.7819, 1000485, 22431

Local tiered storage - message size 10 Bytes

bin/kafka-producer-perf-test.sh --topic tieredTopic2 --num-records 1000000 --record-size 10
--throughput -1 --producer-props bootstrap.servers=localhost:9092

On trunk - OOM

bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic tieredTopic2
--messages 1000000 --timeout 300000 --group share-group-tier3

java.lang.OutOfMemoryError: Java heap space
[2025-06-09 19:21:54,077] ERROR [controller-1-to-controller-registration-channel-manager]: Stopped due to fatal error with exit code 1 (kafka.server.NodeToControllerRequestThread)
org.apache.kafka.common.internals.FatalExitError: null
    at org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:131) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
    at kafka.server.NodeToControllerRequestThread.doWork(NodeToControllerChannelManager.scala:322) ~[kafka_2.13-4.1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.1.0-SNAPSHOT.jar:?]
[2025-06-09 19:21:54,077] ERROR [broker-1-to-controller-heartbeat-channel-manager]: Stopped due to fatal error with exit code 1 (kafka.server.NodeToControllerRequestThread)
org.apache.kafka.common.internals.FatalExitError: null
    at org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:131) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
    at kafka.server.NodeToControllerRequestThread.doWork(NodeToControllerChannelManager.scala:322) ~[kafka_2.13-4.1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.1.0-SNAPSHOT.jar:?]

On fix branch - Successful Consumption

bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic tieredTopic2 --messages 1000000 --timeout 300000 --group share-group-tier6
start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-09 19:31:08,148] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-09 19:31:08:176, 2025-06-09 19:31:21:874, 9.5367, 0.6962, 73003.3582, 1000000, 13698

Local log fetch - message size 1024 Bytes

bin/kafka-producer-perf-test.sh --topic T2 --num-records 1200000 --record-size 1024
--throughput -1 --producer-props bootstrap.servers=localhost:9092

On trunk - 21347 ms consumption time

bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2
--messages 1000000 --timeout 300000 --group share-group-tier3

start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-09 19:26:40,870] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-09 19:26:40:902, 2025-06-09 19:27:02:249, 976.6699, 45.7521, 46850.1429, 1000110, 21347

On fix branch - 20254 ms consumption time

bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2
--messages 1000000 --timeout 300000 --group share-group-tier6

start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-09 19:34:04,444] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-09 19:34:04:474, 2025-06-09 19:34:24:728, 976.6699, 48.2211, 49378.3944, 1000110, 20254

@apoorvmittal10 apoorvmittal10 requested a review from junrao June 9, 2025 19:04
@apoorvmittal10
Contributor Author

@junrao Thanks for reviewing, I have addressed the comments.

@junrao
Contributor

junrao commented Jun 9, 2025

@apoorvmittal10 : Thanks for the experimental results. For Local log fetch, could you run multiple groups on the same topic? Could you also measure the CPU usage on the broker?

@apoorvmittal10
Contributor Author

apoorvmittal10 commented Jun 9, 2025

@apoorvmittal10 : Thanks for the experimental results. For Local log fetch, could you run multiple groups on the same topic? Could you also measure the CPU usage on the broker?

Please find the details below:

Message size 1024 bytes, 5 share groups, no parallel produce, read 2 million records each (~2 GB read by each group)

➜  kafka git:(KAFKA-19389) ✗ bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2 
--messages 2000000 --timeout 300000 --group share-group-local1
start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-10 00:05:56,619] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-10 00:05:56:646, 2025-06-10 00:08:04:118, 1953.3984, 15.3241, 15691.9167, 2000280, 127472
➜  kafka git:(KAFKA-19389) ✗ bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2
 --messages 2000000 --timeout 300000 --group share-group-local2
start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-10 00:05:57,676] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-10 00:05:57:705, 2025-06-10 00:08:07:343, 1953.3984, 15.0681, 15429.7351, 2000280, 129638
➜  kafka git:(KAFKA-19389) ✗ bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2 
--messages 2000000 --timeout 300000 --group share-group-local3
start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-10 00:05:58,849] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-10 00:05:58:877, 2025-06-10 00:08:09:053, 1953.3984, 15.0058, 15365.9661, 2000280, 130176
➜  kafka git:(KAFKA-19389) ✗ bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2 
--messages 2000000 --timeout 300000 --group share-group-local4
start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-10 00:06:00,081] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-10 00:06:00:109, 2025-06-10 00:08:09:869, 1953.3984, 15.0539, 15415.2281, 2000280, 129760
➜  kafka git:(KAFKA-19389) ✗ bin/kafka-share-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic T2
 --messages 2000000 --timeout 300000 --group share-group-local5
start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg, fetch.time.ms
[2025-06-10 00:06:01,475] WARN Share groups and KafkaShareConsumer are part of a preview feature introduced by KIP-932, and are not recommended for use in production. (org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator)
2025-06-10 00:06:01:503, 2025-06-10 00:08:10:307, 1953.3984, 15.1657, 15529.6419, 2000280, 128804

Memory Usage:

Screenshot 2025-06-10 at 00 10 53

@apoorvmittal10
Contributor Author

Total: 5 * 2 GB = 10 GB of data read by 5 share groups on a 1 GB Kafka server in ~130 secs.

@apoorvmittal10
Contributor Author

@junrao The importance of the fix is more evident when messages are larger. I produced messages of 102400 bytes, so a single share fetch should hold at most 500 (default) * 102400 bytes = ~50 MB of data. Ran with 5 share groups, with a timeout of 5 minutes.

The consumed bytes and records are similar, though the fix gives slightly better performance: ~8.7 GB read per group on trunk vs ~8.9 GB read per group with the fix in 5 minutes. The memory footprint is also far lower with the fix, as memory is reclaimed faster now.
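
As a rough back-of-the-envelope (my own estimate, not measured): 500 records * 102,400 bytes is about 51.2 MB (roughly 48.8 MiB) per share fetch response, so without the fix only about twenty completed-but-still-watched responses across the 5 groups are enough to exhaust a 1 GB heap.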

Without fix - trunk

Screenshot 2025-06-10 at 00 38 38

Screenshot 2025-06-10 at 00 38 46

With Fix

Screenshot 2025-06-10 at 00 49 27
Screenshot 2025-06-10 at 00 52 09

@apoorvmittal10
Contributor Author

@junrao CPU usage is around 2-3% in both cases. Read 1 million local records of 1024 bytes per share group, with 5 share groups running in parallel.

With trunk:

Screenshot 2025-06-10 at 13 30 28
Screenshot 2025-06-10 at 13 30 39

With fix:

Screenshot 2025-06-10 at 13 36 32

Screenshot 2025-06-10 at 13 36 11

@apoorvmittal10
Contributor Author

@junrao Please let me know if I need to test anything more.

Contributor

@junrao junrao left a comment

@apoorvmittal10 : Thanks for the perf results. LGTM

@apoorvmittal10 apoorvmittal10 merged commit 997abe4 into apache:trunk Jun 10, 2025
26 of 27 checks passed
@apoorvmittal10
Contributor Author

@mimaison This is a critical fix for 4.1, hence I am cherry-picking it to the 4.1 branch.

apoorvmittal10 added a commit that referenced this pull request Jun 10, 2025
KAFKA-19389: Fix memory consumption for completed share fetch requests (#19928)

@mimaison
Member

Ack, thanks
