
KAFKA-14414: Remove unnecessary usage of ObjectSerializationCache #12890

Merged
merged 3 commits on Nov 28, 2022

Conversation

divijvaidya
Contributor

@divijvaidya divijvaidya commented Nov 22, 2022

Motivation

We create an instance of ObjectSerializationCache at https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L113 that never gets used. We always "add" to the cache but never retrieve from it (as is evident from the fact that we don't store a reference to the cache anywhere).

Adding entries to the cache is expensive because it relies on System.identityHashCode(Object), as demonstrated by the following flame graph of producer requests against an Apache Kafka 3.3.1 plaintext broker.

Screenshot 2022-11-24 at 13 57 31

The above is a flame graph: everything below the yellow line is a caller of the function, and the horizontal axis shows the share of CPU consumed through each caller. As you can see, KafkaApis.handleProduceRequest is a major contributor to the CPU usage.
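The write-only pattern described above can be sketched as follows. This is a minimal, hypothetical illustration (the class and method names are made up; it is NOT Kafka's actual ObjectSerializationCache): a cache that is created per call, populated, and then discarded, so every identity-keyed put() is pure overhead.

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class WriteOnlyCacheSketch {
    // Stand-in for a serialization cache: maps object identity -> computed size.
    public static final class SizeCache {
        private final Map<Object, Integer> sizes = new IdentityHashMap<>();
        // IdentityHashMap keys on System.identityHashCode(key) under the hood.
        public void put(Object key, int size) { sizes.put(key, size); }
        public Integer get(Object key) { return sizes.get(key); }
    }

    public static int computeSize(Object payload, SizeCache cache) {
        int size = payload.toString().length(); // pretend this is an expensive size computation
        cache.put(payload, size);               // the result is cached here ...
        return size;
    }

    public static void main(String[] args) {
        // ... but a fresh cache per call means the cached value is never read back.
        int size = computeSize("hello", new SizeCache());
        System.out.println(size); // 5
    }
}
```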

Change

Currently, the header of a request is parsed by the processor thread before the request is added to RequestChannel. With this change, we cache the computed size in the RequestHeader object itself at the time the ByteBuffer is parsed into a RequestHeader object, and re-use the cached size when it is needed in RequestChannel.
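As a rough sketch of this approach, with simplified fields and hypothetical names (this is NOT the actual RequestHeader implementation): the parser records how many bytes it consumed, so the size never has to be recomputed on the hot path.

```java
import java.nio.ByteBuffer;

public class HeaderSketch {
    private static final int SIZE_NOT_INITIALIZED = -1;

    private final short apiKey;
    private final short apiVersion;
    private int size = SIZE_NOT_INITIALIZED;

    public HeaderSketch(short apiKey, short apiVersion) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
    }

    // Parsing already knows how many bytes the header occupied, so it records
    // the size directly instead of leaving it to be recomputed downstream.
    public static HeaderSketch parse(ByteBuffer buffer) {
        int start = buffer.position();
        HeaderSketch header = new HeaderSketch(buffer.getShort(), buffer.getShort());
        header.size = buffer.position() - start;
        return header;
    }

    // Cheap accessor for the hot path; only falls back to a full size
    // computation if the header was constructed without parsing.
    public int size() {
        if (size == SIZE_NOT_INITIALIZED)
            size = Short.BYTES + Short.BYTES; // stand-in for a real serialization-size pass
        return size;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(4).putShort((short) 0).putShort((short) 3);
        buf.flip();
        System.out.println(HeaderSketch.parse(buf).size()); // 4
    }
}
```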

After the change

Note that the CPU utilization by the above hotspot has been eliminated.
Screenshot 2022-11-24 at 13 59 10

Note how the CPU utilization by KafkaApis.handleProduceRequest has been eliminated, since we no longer use ObjectSerializationCache in that code path.

@divijvaidya
Contributor Author

@clolov please review when you get a chance.

@divijvaidya
Contributor Author

@mimaison please take a look when you get a chance!

```diff
@@ -75,7 +77,17 @@ public void write(ByteBuffer buffer, ObjectSerializationCache serializationCache
 }

 public int size(ObjectSerializationCache serializationCache) {
-    return data.size(serializationCache, headerVersion);
+    if (this.size == SIZE_NOT_INITIALIZED) {
```
Member
I think this method does not need to be public anymore, callers from other packages use the new overload.
Also on the golden path size() -> size(ObjectSerializationCache serializationCache) we do the equality check twice. If we inline it, does it make a difference?

Copy link
Contributor Author

  1. I changed the access modifier of this method to package-private (default). It is used by tests, so I am not changing it to private for now.

  2. I have made similar changes to ResponseHeader as well, to prevent incorrect future usage of its size() method that would cause similar performance problems.

  3. Added JavaDocs, primarily meant for contributors, to ensure that they use the correct intended method.

  4. In this refactor, the size(ObjectSerializationCache) method calculates the size every time (instead of using the cached value). This removes the double equality check.

  5. Also added some unit tests.

```diff
@@ -110,7 +110,7 @@ object RequestChannel extends Logging {

 def sizeOfBodyInBytes: Int = bodyAndSize.size

-def sizeInBytes: Int = header.size(new ObjectSerializationCache) + sizeOfBodyInBytes
+def sizeInBytes: Int = header.size + sizeOfBodyInBytes
```
Copy link
Contributor Author
Note for reviewers:
This is the main performance-relevant change in this PR: we use the cached size instead of calculating it again.

```diff
 }

 @Override
 public int hashCode() {
-    return this.data.hashCode();
+    return Objects.hash(data, headerVersion);
```
Copy link
Contributor Author
Note for reviewers

Piggybacking this minor change where we now include headerVersion in equality comparison for two RequestHeader objects.
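An illustrative sketch of the resulting hashCode/equals contract, using a hypothetical class (a plain String stands in for the real RequestHeaderData): both fields participate in both methods, so two headers with the same data but different header versions are no longer treated as equal.

```java
import java.util.Objects;

public class HeaderIdentity {
    private final String data;
    private final short headerVersion;

    public HeaderIdentity(String data, short headerVersion) {
        this.data = data;
        this.headerVersion = headerVersion;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof HeaderIdentity)) return false;
        HeaderIdentity other = (HeaderIdentity) o;
        return headerVersion == other.headerVersion && Objects.equals(data, other.data);
    }

    @Override
    public int hashCode() {
        // Objects.hash combines both fields, keeping hashCode consistent with equals.
        return Objects.hash(data, headerVersion);
    }
}
```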

@divijvaidya
Contributor Author

Test failures are unrelated.

Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.internals.CooperativeConsumerCoordinatorTest.testOutdatedCoordinatorAssignment()

Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft

Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft

Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology

Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

Member

@mimaison mimaison left a comment

LGTM, thanks for the PR

@ijuma
Contributor

ijuma commented Dec 15, 2022

Out of curiosity, what profiler was used to compute this? Also, did we see an actual improvement (e.g. was CPU usage or latency lower after the change)? I ask because profilers are known to have safepoint bias and can incorrectly attribute cost when it comes to certain method types.

@ijuma
Contributor

ijuma commented Dec 15, 2022

More details on identity hash code here (it's pretty fast): https://shipilev.net/jvm/anatomy-quarks/26-identity-hash-code/
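For context, System.identityHashCode ignores a class's own hashCode override and is based on object identity instead, which is what identity-keyed caches rely on. A small self-contained demonstration:

```java
public class IdentityHashDemo {
    public static void main(String[] args) {
        String a = new String("key");
        String b = new String("key");
        // Equal values share the same value-based hashCode ...
        System.out.println(a.hashCode() == b.hashCode()); // true
        // ... but they are distinct objects, so their identity hashes are
        // computed independently (and will almost always differ).
        System.out.println(System.identityHashCode(a) == System.identityHashCode(b));
    }
}
```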

@divijvaidya
Contributor Author

divijvaidya commented Dec 15, 2022

Hey @ijuma - thank you for your question.

  1. I used Amazon CodeGuru Profiler earlier. I ran the test again with async-profiler, which should not suffer from safepoint bias. The image below shows the usage of the function: it demonstrates that the hashCode evaluation is not the major component, but calculating the size takes quite some time. After this change, the flame graph does not contain this stack frame at all (since we don't parse the header again). The flame graph is in HTML format so I can't add it here, but I'd be happy to share it offline if you are interested. I am running this with Corretto JDK 11 and ./profiler.sh -d 500 -i 40ms -e cycles as the profiler configuration.

Screenshot 2022-12-15 at 12 16 04

  2. Irrespective of the CPU usage optimisation this change brings, I think it is a good change because it avoids parsing the request header twice.

  3. The actual CPU standard deviation is more than 2-3% over the duration of the test, so I cannot reliably say that I observed lower CPU usage after this change. Latency measurement is also not reliable because the bottleneck is thread contention on the partition lock in UnifiedLog, so this change won't affect latency.

@ijuma
Contributor

ijuma commented Dec 15, 2022

Was this profile taken after the broker was running for long enough for the JIT compilation to have completed? The profile seems to show some things that look a bit odd.

That said, I think the change is fine overall. It makes sense to avoid the map allocation (including underlying array) and the overhead of mutating it (including array resizes required for it). As you said, it also makes sense to avoid parsing the request header twice.

My questions were mostly so that we understand what we are trying to achieve and we have the right understanding of the underlying reason. This helps ensure future changes are not based on incorrect conclusions.

Thanks for the improvement!

@divijvaidya
Contributor Author

@ijuma yes, there was a warm-up workload prior to profiling. The JVM had probably been alive for ~7-8 min. before the profile capture started. What are the fishy things that you notice here? I can try again on a long-running server if you like.

I am going to use a similar profiler as motivation for some future changes I have lined up (I am currently writing a JMH benchmark for my other ArrayBuffer vs. ListBuffer PR), and I want to ensure we are on the same page wrt its effectiveness. Hence, let's resolve this. What can I change in my setup to help us understand the flame graph better? I would be happy to jump on a call to explain my setup if that makes things faster, or we can use the public Slack channel (ASF workspace, #kafka channel) to communicate faster on this.

@ijuma
Contributor

ijuma commented Dec 15, 2022

@divijvaidya Thanks. Can you please attach the html file? I think it's possible to do it here, but if not then the JIRA would be helpful. I can take a closer look on the bits that seemed a bit suspicious.

guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…ache#12890)


Reviewers: Mickael Maison <mickael.maison@gmail.com>
