
KAFKA-20535: Improve async consumer CPU usage under low max.poll.records. #22199

Open

chickenchickenlove wants to merge 2 commits into apache:trunk from chickenchickenlove:KAFKA-20535

Conversation

@chickenchickenlove (Contributor) commented May 3, 2026

Description

KAFKA-20332 fixed a correctness issue in the async consumer where the application thread could collect buffered records before the background thread had checked for pending reconciliations. The fix added a wait on inflightPoll.reconciliationCheckFuture() in AsyncKafkaConsumer.collectFetch().

This restored the correctness guarantee, but it also increased CPU usage in low max.poll.records scenarios. With max.poll.records=5, profiling shows that the additional cost comes mainly from the application thread waiting on ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs) even when the consumer group member is not reconciling.

This patch avoids that unnecessary wait by tracking the consumer group member state in AsyncKafkaConsumer. AbstractMembershipManager now notifies MemberStateListener whenever the consumer member transitions to a new state. AsyncKafkaConsumer uses this signal to wait for the reconciliation check only while the member is in MemberState.RECONCILING.
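
Roughly, the notification side has this shape (a sketch only; the transition helper and listener-list names are illustrative, see the diff for the exact code):

```java
// Illustrative sketch, not the exact diff: AbstractMembershipManager broadcasts
// every member state transition to its registered MemberStateListeners.
private void transitionTo(MemberState nextState) {
    log.trace("Member state transition {} -> {}", state, nextState);
    this.state = nextState;
    // New in this patch: tell listeners (e.g. AsyncKafkaConsumer) about every
    // transition so the application thread knows when reconciliation starts/ends.
    stateUpdatesListeners.forEach(listener -> listener.onMemberStateUpdated(nextState));
}
```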

Test Condition

  • Broker
    • standalone
    • 12 vCPU, 32 GB RAM
  • Producer
    • bin/kafka-producer-perf-test.sh, run on the broker host
    • --throughput 50000
    • --record-size 100
  • Consumer (before, after, optimized)
    • Kubernetes environment
    • all consumers scheduled on the same worker node
  • Profiler
    • async-profiler
    • duration: 180 seconds
  • branch

Test Result

  1. Throughput of each consumer
     • Before - RATE: 49997.400259974005 records/sec, total=7050826
     • After - RATE: 50002.199780022 records/sec, total=7557854
     • Optimized - RATE: 50002.199780022 records/sec, total=7584198
     • All consumers sustained the same throughput.
  2. Average CPU usage from kubectl top pod, sampled every 30 seconds, 10 times

     | Revision  | Average CPU |
     |-----------|-------------|
     | Before    | 225.7m      |
     | After     | 325.7m      |
     | Optimized | 248.4m      |

     The optimized version reduced CPU usage by about 23.7% compared with after.

Flame Graph Summary

| Metric                            | Before | After  | Optimized |
|-----------------------------------|--------|--------|-----------|
| Samples                           | 2,402  | 3,160  | 2,542     |
| markReconciliationCheckComplete   | 0.00%  | 2.82%  | 0.51%     |
| setActiveTask                     | 0.00%  | 0.06%  | 0.00%     |
| pollTimeMs                        | 0.00%  | 0.00%  | 0.00%     |
| AsyncPollEvent                    | 0.37%  | 2.91%  | 0.87%     |
| processBackgroundEvents           | 2.37%  | 1.93%  | 2.28%     |
| Reaper                            | 0.17%  | 0.19%  | 0.35%     |
| parkNanos                         | 1.08%  | 4.05%  | 0.94%     |
| unpark                            | 0.37%  | 2.06%  | 0.63%     |
| AsyncKafkaConsumer.poll           | 38.68% | 38.32% | 38.20%    |
| AsyncKafkaConsumer.collectFetch   | 20.32% | 23.61% | 19.39%    |
| ApplicationEventProcessor.process | 2.66%  | 6.46%  | 2.83%     |
| ApplicationEventHandler.add       | 7.87%  | 7.37%  | 8.06%     |

After shows higher CPU usage, and the profile also shows increased time in parkNanos and unpark. This suggests that the additional wait on reconciliationCheckFuture introduced more application/background thread coordination overhead.

AsyncKafkaConsumer.collectFetch

| Metric                                   | Before | After  | Optimized |
|------------------------------------------|--------|--------|-----------|
| AsyncKafkaConsumer.collectFetch samples  | 488    | 746    | 493       |
| AsyncKafkaConsumer.collectFetch %        | 20.32% | 23.61% | 19.39%    |
| FetchCollector.collectFetch samples      | 482    | 512    | 477       |
| FetchCollector.collectFetch %            | 20.07% | 16.20% | 18.76%    |
| ConsumerUtils.getResult samples          | 0      | 194    | 0         |
| ConsumerUtils.getResult %                | 0.00%  | 6.14%  | 0.00%     |

In after, the application thread spends additional time in ConsumerUtils.getResult() while waiting for the reconciliation check future. This also increases related park/unpark activity and application event processing on the background thread. In the optimized version, this wait is skipped unless the member is actually in RECONCILING state.
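
On the consume side, the optimized path looks roughly like this (a sketch; memberInReconciliationState is the flag from the current diff, and the surrounding method structure is simplified):

```java
// Illustrative sketch of the optimized AsyncKafkaConsumer.collectFetch().
private Fetch<K, V> collectFetch() {
    // Steady state: no reconciliation pending, so skip the future wait entirely
    // and go straight to the fetch buffer.
    if (memberInReconciliationState) {
        ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
    }
    return fetchCollector.collectFetch(fetchBuffer);
}
```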

ConsumerNetworkThread.runOnce

| Metric                                                  | Before | After  | Optimized |
|---------------------------------------------------------|--------|--------|-----------|
| ConsumerNetworkThread.runOnce samples                   | 1,216  | 1,621  | 1,295     |
| ConsumerNetworkThread.runOnce %                         | 50.62% | 51.30% | 50.94%    |
| ConsumerNetworkThread.processApplicationEvents samples  | 283    | 442    | 263       |
| ConsumerNetworkThread.processApplicationEvents %        | 11.78% | 13.99% | 10.35%    |
| FetchRequestManager.poll samples                        | 169    | 194    | 158       |
| FetchRequestManager.poll %                              | 7.04%  | 6.13%  | 6.22%     |
| NetworkClientDelegate.poll samples                      | 600    | 755    | 684       |
| NetworkClientDelegate.poll %                            | 24.98% | 23.89% | 26.91%    |

The higher CPU usage in ConsumerNetworkThread appears to be a secondary effect of the application thread waiting on reconciliationCheckFuture more often. Each wait requires coordination between the application thread and the background thread: the app thread enqueues an AsyncPollEvent and waits for the reconciliation check to complete, while the background thread processes that event and completes the future. As a result, ConsumerNetworkThread.processApplicationEvents and related AsyncPollEvent processing show higher CPU usage in the after profile.
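
The cost of each such round trip can be seen in a standalone sketch (generic Java, not Kafka code): the waiting thread parks inside the future's get() until the other thread completes it, which is exactly the parkNanos/unpark pattern in the profile.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class FutureHandoffDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> reconciliationCheck = new CompletableFuture<>();

        // "Background thread": completes the future once it gets around to the
        // event, analogous to markReconciliationCheckComplete().
        Thread background = new Thread(() -> {
            try {
                Thread.sleep(5); // simulate run-loop / network poll work before the event
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            reconciliationCheck.complete(null); // unparks the waiting application thread
        });
        background.start();

        // "Application thread": parks inside get() until the background thread
        // completes the future -- one park/unpark round trip per poll.
        reconciliationCheck.get(100, TimeUnit.MILLISECONDS);
        background.join();
    }
}
```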

Reviewers: Lianet Magrans <lmagrans@confluent.io>

@github-actions bot added the triage (PRs from the community), consumer, clients, and small (Small PRs) labels on May 3, 2026
@chickenchickenlove (Contributor, Author):

Hi, @lianetm !
I identified the issue through profiling and first drafted the improvement approach I had in mind.
I have not updated the tests yet, but if this direction looks reasonable to you, I'll go ahead and update them as well.
When you get a chance, please take a look 🙇‍♂️

@lianetm (Member) left a comment:

Thanks @chickenchickenlove, high level the approach makes sense to me. Just some nits from a first pass.

And some brainstorming on the side, just to make sure there is nothing else that needs attention. Given that you have the repro in hand, can you get some visibility on what part of the background thread is "taking long" to process the inflightPoll that triggers reconciliation? E.g., app thread adds the inflightPoll to the queue and let's assume the member is RECONCILING. From what you describe in the PR, you're seeing that by the time the app thread checks inflightPoll.isReconciliationCheckComplete the background hasn't made it to the point where it sets it to true. So is there anything "delaying" the background process of the inflightPoll that we can improve? (why doesn't it make it to marking reconciliation check done before the app thread needs it? that reconciliation trigger does not block on anything, it just prepares requests if needed and sets some flags in the subscription state). We can address this separately if needed, but raising it here because it's the scenario that makes me wonder why

```java
 *
 * @param memberState The new consumer group member state.
 */
default void onConsumerMemberStateChange(MemberState memberState) {
```
@lianetm (Member):

should we name it onMemberStateUpdated to be consistent with the existing funcs?

@chickenchickenlove (Contributor, Author):

My original intention was to show that this callback is only needed for the ConsumerGroup. However, since the default method is a no-op, generalizing it as onMemberStateUpdated and letting the call site override it if needed seems like a better approach.
Thank you! 🙇‍♂️
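
For instance, the generalized callback could look like this (a sketch; the no-op default keeps other implementors unaffected):

```java
// Sketch of the generalized listener method on MemberStateListener; only
// interested call sites (e.g. AsyncKafkaConsumer) override it.
default void onMemberStateUpdated(MemberState memberState) {
    // no-op by default
}
```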

```java
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refCount = new AtomicInteger(0);

private volatile boolean memberInReconciliationState = false;
```
@lianetm (Member):

the reconciliation state means there is a pending reconciliation. Given here we're at the consumer level (not in the internal state machine impl), would it be clearer to name this around hasPendingReconciliation or similar?
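
e.g., as an illustrative sketch of the suggested rename (not the actual diff):

```java
// Consumer-level flag named after intent ("is a reconciliation pending?")
// rather than echoing the internal state machine's state name.
private volatile boolean hasPendingReconciliation = false;

@Override
public void onMemberStateUpdated(MemberState memberState) {
    // Read by collectFetch() to decide whether to wait on reconciliationCheckFuture.
    this.hasPendingReconciliation = (memberState == MemberState.RECONCILING);
}
```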

@lianetm removed the triage (PRs from the community) label on May 4, 2026
@chickenchickenlove (Contributor, Author) commented May 5, 2026

@lianetm
Thanks for taking the time to look at this! 🙇‍♂️

I don't think this necessarily means that the background thread is slow compared to the application thread.

With low max.poll.records, if there are enough records already available in the fetch buffer, the application thread can reach ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs) very quickly after enqueueing the AsyncPollEvent.

On the other hand, adding an AsyncPollEvent does not mean that the background thread processes it immediately. The background thread runs independently from the application thread, and it may be blocked in network poll or processing another part of its run loop before it gets to the application event queue. So even if processing the AsyncPollEvent itself is cheap, the application thread can still observe the reconciliation check future as incomplete when it reaches collectFetch().

The profiling results seem consistent with this interpretation:

  • In before and optimized, ConsumerUtils.getResult is not visible in the collectFetch path, and parkNanos / unpark are much lower.
  • In after, ConsumerUtils.getResult, parkNanos, unpark, AsyncPollEvent, and markReconciliationCheckComplete all increase.

So my interpretation is that the extra CPU usage is not from reconciliation work itself being expensive, but from repeatedly going through the wait/future-completion path: the application thread parks while waiting for reconciliationCheckFuture, and the background thread completes that future and wakes it up.

This patch does not reduce the number of AsyncPollEvents. It still enqueues AsyncPollEvent as before, so I would not expect it to significantly reduce the ConsumerNetworkThread run loop itself. This is also consistent with the profile, where ConsumerNetworkThread.runOnce stays around the same level across the three runs:

| Metric                        | Before | After  | Optimized |
|-------------------------------|--------|--------|-----------|
| ConsumerNetworkThread.runOnce | 50.62% | 51.30% | 50.94%    |

The main difference is that this patch avoids waiting on reconciliationCheckFuture in steady state. In other words, AsyncPollEvent is still enqueued and processed, but its reconciliation-check completion is no longer on the application thread's critical path unless the member is actually in RECONCILING state.

While we could consider a further optimization that enqueues AsyncPollEvent only when necessary, given the current profiling results I believe the resulting performance improvement would be minimal. 🤔

Also, to confirm these assumptions, I can add temporary instrumentation around AsyncPollEvent latency. For each sampled AsyncPollEvent, we can record when it is enqueued by the application thread, when the application thread starts waiting on reconciliationCheckFuture, when the background thread starts processing it, and when markReconciliationCheckComplete() is called.

This should show whether the application thread is reaching collectFetch() before the background thread has had a chance to process the AsyncPollEvent, or whether the delay is actually inside the background processing path.
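
A rough sketch of what that temporary instrumentation could look like (all names hypothetical; not part of this patch):

```java
// Hypothetical timing instrumentation attached to an AsyncPollEvent.
public class AsyncPollEventTimings {
    private final long enqueuedAtNs = System.nanoTime(); // set by the application thread
    private volatile long waitStartedAtNs;        // app thread starts waiting on the future
    private volatile long processingStartedAtNs;  // background thread picks up the event
    private volatile long checkCompletedAtNs;     // markReconciliationCheckComplete() called

    public void onWaitStarted()       { waitStartedAtNs = System.nanoTime(); }
    public void onProcessingStarted() { processingStartedAtNs = System.nanoTime(); }
    public void onCheckCompleted()    { checkCompletedAtNs = System.nanoTime(); }

    // A positive gap here means the app thread reached collectFetch() before the
    // background thread even started processing the event.
    public long enqueueToProcessingNs() { return processingStartedAtNs - enqueuedAtNs; }
}
```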

What do you think?
Please let me know your thoughts. 🙇‍♂️
