KAFKA-19389: Fix memory consumption for completed share fetch requests #19928
Conversation
@apoorvmittal10 : Thanks for the PR. A couple of comments.
// also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
// However, it's best to trigger the check on all the keys that are being watched which
// should free the memory for the completed operation.
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));
Could you run some perf tests to make sure there is no degradation for share fetch requests not reading from remote storage? If there is degradation, maybe we could only trigger this if remote storage is involved.
I have added the test results below. I also monitored the memory consumption in jconsole, which looks stable. There is no degradation when not reading from remote storage.
Note that even without this fix, the issue does not occur if a producer runs in parallel with the share consumer, because produce requests also trigger the purgatory to check the watch keys for each topic-partition.
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
     // then we should check if there is a pending share fetch request for the topic-partition and complete it.
     // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
     // we directly call delayedShareFetchPurgatory.checkAndComplete
-    replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+    replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
Could we move the comment above to below this line?
Done, and updated the comment with a minor change: topic-partition => share-partition.
Test results for performance (share groups set up to read from earliest):
- Local tiered storage, message size 1024 bytes:
  - On trunk: OOM
  - On fix branch: successful consumption
- Local tiered storage, message size 10 bytes:
  - On trunk: OOM
  - On fix branch: successful consumption
- Local log fetch, message size 1024 bytes:
  - On trunk: 21347 ms consumption time
  - On fix branch: 20254 ms consumption time
@junrao Thanks for reviewing; I have addressed the comments.
@apoorvmittal10 : Thanks for the experimental results. For local log fetch, could you run multiple groups on the same topic? Could you also measure the CPU usage on the broker?
Please find the details below: message size 1024 bytes, 5 share groups, no parallel produce, 2 million records read by each group (2 GB read per group).
Memory usage: (screenshot omitted)
In total, 5 * 2 GB = 10 GB of data was read by the 5 share groups on a 1 GB Kafka server in ~130 secs.
@junrao The importance of the fix is more evident when messages are larger. I produced messages of 102400 bytes, so a single share fetch can hold up to 500 (default) * 102400 = 50 MB of data. I ran with 5 share groups and a timeout of 5 minutes. The consumed bytes and records are similar, though performance is slightly better with the fix: ~8.7 GB read per group without the fix vs ~8.9 GB per group with the fix in 5 minutes. The memory footprint is also far lower with the fix, as memory is now reclaimed faster. (Screenshots omitted: without fix - trunk; with fix.)
@junrao CPU usage is around 2-3% in both cases. Read 1 million local records of 1024 bytes per share group, with 5 share groups in parallel. (Screenshots omitted: with trunk; with fix.)
@junrao Please let me know if I need to test a bit more.
@apoorvmittal10 : Thanks for the perf results. LGTM
@mimaison This is a critical fix for 4.1, hence I am cherry-picking it to the 4.1 branch.
Ack, thanks
For ShareFetch requests, the fetch happens through a DelayedShareFetch
operation. An operation that has already completed still holds a reference
to the data being sent as the response. The operation is watched over
multiple keys, i.e. DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey,
so when it is completed via one watched key, the reference to the operation
remains under the other watched key. This means the memory can only be freed
once the purge operation is triggered by DelayedOperationPurgatory, which
removes the already-completed operation from the remaining keys.
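To make the retention issue concrete, here is a minimal, self-contained toy model in Java. The class and method names (ToyDelayedOp, watch, checkAndComplete) are hypothetical stand-ins, not Kafka's actual DelayedOperationPurgatory API; the sketch only illustrates why an operation completed via one watch key stays reachable through the other key's watcher list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: hypothetical names, not Kafka's DelayedOperationPurgatory API. */
public class WatchKeyRetentionDemo {

    static class ToyDelayedOp {
        // Stands in for the buffered share-fetch response data held by the operation.
        final byte[] responseData = new byte[1024];
        boolean completed = false;
    }

    // Each watch key keeps strong references to the operations watching it.
    static final Map<String, List<ToyDelayedOp>> watchers = new HashMap<>();

    static void watch(ToyDelayedOp op, String... keys) {
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
        }
    }

    // Mirrors the effect of a checkAndComplete(key) call: completes and unlinks
    // the operations watching ONE key only.
    static void checkAndComplete(String key) {
        for (ToyDelayedOp op : watchers.getOrDefault(key, List.of())) {
            op.completed = true;
        }
        watchers.remove(key);
    }

    public static void main(String[] args) {
        ToyDelayedOp op = new ToyDelayedOp();
        watch(op, "groupKey", "partitionKey"); // watched under two keys, as with share fetch

        checkAndComplete("groupKey"); // the operation completes and its response is sent

        // The completed operation is still strongly referenced from the other key's
        // watcher list, so its response buffer cannot be garbage collected until a
        // purge pass (or an explicit check on "partitionKey") removes it there as well.
        System.out.println("Still referenced under partitionKey: "
                + watchers.get("partitionKey").contains(op));
    }
}
```

Until something checks or purges the second key, the large response buffer held by the completed operation cannot be garbage collected, which is exactly the build-up described above.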
The purge operation depends on the config
ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG:
if its value is not smaller than the number of share fetch requests it
takes to consume the broker's entire memory, the broker can go out of
memory. This can also be avoided by using a lower fetch max bytes for the
request, but that value is client dependent and cannot be relied upon to
protect the broker.
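For comparison, the purge-interval mitigation mentioned above would look roughly like the sketch below. The property name is only assumed from the SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG constant and the value is illustrative; verify both against your broker version.

```java
import java.util.Properties;

public class ShareFetchPurgeIntervalExample {
    public static void main(String[] args) {
        // Assumption: the property name is derived from
        // ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG and the
        // value 100 is illustrative only. Lowering it bounds how long completed operations
        // linger under stale watch keys, whereas the fix in this PR removes them
        // immediately after the response is sent.
        Properties brokerProps = new Properties();
        brokerProps.setProperty("share.fetch.purgatory.purge.interval.requests", "100");
        System.out.println(brokerProps);
    }
}
```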
This PR triggers the completion on both watched keys, so the
DelayedShareFetch operation is removed from both keys, which frees the
broker memory as soon as the share fetch response is sent.
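Based on the quoted diff and the completeDelayedShareFetchRequest call shown earlier in the review, the shape of the change is roughly the fragment below. This is a sketch, not the exact committed code; in particular, the groupId variable and the DelayedShareFetchGroupKey constructor arguments are assumptions.

```java
// Sketch of the lambda queued by releasePartitionLocksAndAddToActionQueue after the fix.
// Deferring via the action queue avoids re-entering delayedShareFetchPurgatory.checkAndComplete
// on the current call stack. Completing on BOTH watch keys unlinks the finished operation
// from each watcher list, so its response buffers become eligible for GC right away instead
// of waiting for the purgatory's purge interval.
replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
    replicaManager.completeDelayedShareFetchRequest(
        new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()));
    replicaManager.completeDelayedShareFetchRequest(
        new DelayedShareFetchPartitionKey(topicIdPartition));
}));
```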
Testing
Tested with LocalTieredStorage, where the broker goes OOM after reading
some 8040 messages before the fix, with the default configurations as
mentioned in the doc here:
https://kafka.apache.org/documentation/#tiered_storage_config_ex.
After the fix, consumption continues without any issue and the memory is
released instantaneously.
Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>