KAFKA-16226 Reduce synchronization between producer threads #15323
Conversation
Force-pushed from 1344252 to ca32a8f.
```diff
 * @return The delay for next check
 */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,
```
In some ways, this is a step backwards. We have been trying to reduce the reliance on `Cluster` internally because it is public. With a lot of internal usage, we end up making changes to the API which are only needed for the internal implementation (as we are doing in this PR). Have you considered alternatives? Perhaps we could expose something like `Cluster`, but with a reduced scope?
Thanks for pointing it out. As it turns out, I don't need to extend the public API of `Cluster` in order to get the epoch. So internal usage doesn't change `Cluster`'s API anymore.

> We have been trying to reduce the reliance on Cluster internally because it is public.

This could be achieved by creating a forwarding "internal" class `ClusterView` that uses `Cluster` by composition, offering the same API. Client code can then be refactored to use `ClusterView`. That way, future extensions of `Cluster`'s public API for internal use-cases could be prevented by making them in `ClusterView` instead.

But this is going to be a sizeable refactor; how about keeping it separate from this PR? The intention of this PR is to fix the perf bug and cherry-pick it to other branches.
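The forwarding-class idea can be sketched roughly like this (a hypothetical illustration: `ClusterView` and `leaderEpochFor` are just the names discussed above, and a plain map stands in for the real `org.apache.kafka.common.Cluster`, so this is not Kafka's actual API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the forwarding "internal" class discussed above:
// it wraps the public Cluster by composition, forwards the calls clients
// need, and hosts internal-only extensions so Cluster's public API stays fixed.
class ClusterView {
    // In the real client this would be a Cluster field; a plain map stands
    // in for it here to keep the sketch self-contained.
    private final Map<String, String> leaderByPartition;
    private final Map<String, Integer> leaderEpochs; // internal-only state

    ClusterView(Map<String, String> leaderByPartition, Map<String, Integer> leaderEpochs) {
        this.leaderByPartition = new HashMap<>(leaderByPartition);
        this.leaderEpochs = new HashMap<>(leaderEpochs);
    }

    // Forwarded from the wrapped "Cluster": same API, same semantics.
    Optional<String> leaderFor(String partition) {
        return Optional.ofNullable(leaderByPartition.get(partition));
    }

    // Internal-only extension that never touches Cluster's public API.
    Optional<Integer> leaderEpochFor(String partition) {
        return Optional.ofNullable(leaderEpochs.get(partition));
    }
}
```

With this shape, a new internal need (like the epoch lookup) lands on the wrapper rather than on the public class.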
Makes sense. We'd probably have to do it the other way around though, I guess? The client's dependence on `Cluster` cannot be easily changed, but we can move the internal implementation anywhere we want.
I was looking into your idea a little bit. There might be a simple enough variation that wouldn't require significant changes. What do you think about this? https://github.com/apache/kafka/compare/trunk...hachikuji:kafka:internal-cluster-view?expand=1
@hachikuji Thanks for the draft PR. I have introduced `InternalCluster` as a wrapper around the public `Cluster`, and extended `InternalCluster` with `leaderEpochFor`, which is only for the client's internal usage.

@hachikuji https://ge.apache.org/s/fr7yermmdioac/tests/overview?outcome=FAILED
```diff
     new HashSet<>(Arrays.asList(node2)), 999999 /* maxSize */, time.milliseconds());
     assertTrue(batches.get(node2.id()).isEmpty());
 }

+@Test
+public void testDrainOnANodeWhenItCeasesToBeALeader() throws InterruptedException {
```
This is no longer needed, as `drainBatchesForOneNode` uses `InternalCluster` now vs `Metadata` earlier. Since `Metadata` is mutable, it can happen that a node is a partition leader but then leadership moves to another node. This is not possible with `InternalCluster`, as it is immutable.

The latest Jenkins failure is due to compilation errors with Scala 2.12 introduced here. UPD:
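The benefit of an immutable snapshot in the drain path can be illustrated with a small sketch (hypothetical names: `Snapshot` stands in for `InternalCluster`, and a map stands in for real partition metadata). A drain pass that captures one snapshot up front keeps seeing the same leader even if the current metadata is swapped concurrently:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch, not Kafka's actual classes: once a thread captures an
// immutable snapshot, leadership cannot change out from under it mid-drain.
class SnapshotDemo {
    // Immutable snapshot: the record's field is never reassigned and the
    // Map.of-style map passed in is never mutated.
    record Snapshot(Map<String, String> leaderByPartition) {
        String leaderFor(String p) { return leaderByPartition.get(p); }
    }

    static final AtomicReference<Snapshot> current = new AtomicReference<>();

    static boolean drainSeesStableLeader(String partition) {
        Snapshot snapshot = current.get();               // capture once, up front
        String leaderAtStart = snapshot.leaderFor(partition);
        // Simulate another thread replacing the metadata mid-drain:
        current.set(new Snapshot(Map.of(partition, "broker-2")));
        // The captured snapshot still reports the original leader.
        return leaderAtStart.equals(snapshot.leaderFor(partition));
    }
}
```

A mutable `Metadata` read twice in the same pass could return different leaders, which is exactly the hazard the test above was guarding against.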
```diff
@@ -52,7 +52,7 @@ public class MetadataCache {
     private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
     private final Map<String, Uuid> topicIds;
     private final Map<Uuid, String> topicNames;
-    private Cluster clusterInstance;
+    private InternalCluster clusterInstance;
```
The javadoc for `MetadataCache` describes it as mutable, but as far as I can tell, we do not actually modify any of the collections. We always build new instances instead of updating an existing one. That makes me wonder if we can change the javadoc and use `MetadataCache` as the immutable snapshot of metadata state. Then we could drop `InternalCluster` in favor of `MetadataCache`. Would that work?
@hachikuji I had thought about `MetadataCache`. It has one accessor, `partitionMetadata()`, that returns mutable `PartitionMetadata` and does not make defensive copies. All the other accessors return immutable objects or make defensive copies.

Is it OK for `partitionMetadata()` to make defensive copies? That could drive memory usage up.
Discussed offline. It does not look like `PartitionMetadata` should be treated as mutable. It comes directly from the Metadata response, and I can't think of a reason the client would update any of the replica sets directly. We should confirm, though.
> We should confirm though.

I don't see it being used mutably in the code. Historically, it was made mutable to support deletion/updates within the cache, but that deletion/update code has since been removed. As far as I can see, the semantics are read-only. So I have treated `MetadataCache` as an immutable cache, made its internal data structures unmodifiable, and updated the javadoc.

All clients tests pass locally; hopefully the Jenkins signal is green too 🤞
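The "make internal data structures unmodifiable" approach can be sketched as follows (illustrative only: `ImmutableCache` is a stand-in for `MetadataCache`, not the actual Kafka code). The maps are wrapped once at construction, so accessors can share them without per-call defensive copies, addressing the memory concern raised above:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: one copy at construction time, then an unmodifiable
// view shared by every accessor call. Callers cannot mutate the cache, and
// no per-call defensive copies are needed.
class ImmutableCache {
    private final Map<String, Integer> byPartition;

    ImmutableCache(Map<String, Integer> byPartition) {
        // Copy once, wrap once; each accessor call is then O(1) with no allocation.
        this.byPartition = Collections.unmodifiableMap(new HashMap<>(byPartition));
    }

    Map<String, Integer> partitionMetadata() {
        // Safe to share: mutation attempts throw UnsupportedOperationException.
        return byPartition;
    }
}
```

This is the usual trade-off behind treating a cache as immutable: pay one copy when the snapshot is built, rather than a copy on every read.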
ProducerBatch.maybeUpdateLeaderEpoch should only update leader-epoch if the new one is greater
Remove no longer needed changes to public classes
Force-pushed from f101313 to a75ab88.
```java
/**
 * Get the current metadata cache.
 */
public synchronized MetadataCache fetchCache() {
```
Perhaps we could make `cache` volatile and avoid the synchronization?
@hachikuji Interesting.

`Metadata.update()` requires mutual exclusion while updating `cache` and the other internal data structures of `Metadata`. So it makes sense to keep the synchronization, what do you think?

Moreover, `fetchCache` is called once in `Sender::sendProducerData`, so it's not a bottleneck on the hot path.
It makes sense to require exclusive access when building the cache, but here we're just accessing the built value. So I don't think the synchronization is necessary.
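The volatile-publication idea can be sketched like this (a hypothetical `Holder` class, with a `String` standing in for the immutable `MetadataCache`): writers still serialize updates among themselves, but readers pick up the latest published snapshot with a lock-free volatile read.

```java
// Illustrative sketch of the suggestion above: exclusive access while
// building/replacing state, no lock for merely reading the built value.
class Holder {
    // Volatile gives readers visibility of the latest published reference
    // without acquiring the monitor.
    private volatile String cache = "v0"; // stands in for an immutable MetadataCache

    // Writers still synchronize with each other while rebuilding state...
    synchronized void update(String next) {
        cache = next; // ...and publish the finished snapshot via a volatile write
    }

    // Readers need no lock: a volatile read sees the latest published snapshot.
    String fetchCache() {
        return cache;
    }
}
```

This works because the cache reference is swapped atomically and the object behind it never mutates; readers either see the old snapshot or the new one, never a half-built state.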
LGTM. One minor suggestion.
Went through the test failures across all jdk/scala combos; they are unrelated and have been failing before this PR as well. The test failure belongs to test
@hachikuji I believe this is good to be merged, what do you think?
@hachikuji @msn-tldr #15376 looks related.
…15323) As this [JIRA](https://issues.apache.org/jira/browse/KAFKA-16226) explains, there is increased synchronization between the application thread and the background thread, as the background thread started to call the synchronized method Metadata.currentLeader() in the [original PR](apache#14384). So this PR does the following changes: 1. Changes the background thread, i.e. RecordAccumulator's partitionReady() and drainBatchesForOneNode(), to not use `Metadata.currentLeader()`. Instead rely on `MetadataCache`, which is immutable, so access to it is unsynchronized. 2. Repurposes `MetadataCache` as an immutable snapshot of Metadata. This is a wrapper around the public `Cluster`. `MetadataCache`'s API/functionality should be extended for internal client usage vs the public `Cluster`. For example, this PR adds `MetadataCache.leaderEpochFor()`. 3. Renames `MetadataCache` to `MetadataSnapshot` to make it explicit that it is immutable. **Note: both `Cluster` and `MetadataCache` are not synchronized, hence this reduces synchronization on the hot path for high partition counts.** Reviewers: Jason Gustafson <jason@confluent.io>
@hachikuji thanks for merging.
@ijuma thanks for flagging #15376. @hachikuji Looks like this was going to add a test that tested the concurrent update of
Cherry-picked as #15493 and #15498.
Variable metadataMock was removed by apache#15323 after the CI build of apache#15320 was run and before apache#15320 was merged. Reviewers: Luke Chen <showuon@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
As this [JIRA](https://issues.apache.org/jira/browse/KAFKA-16226) explains, there is increased synchronization between the application thread and the background thread, as the background thread started to call the synchronized method Metadata.currentLeader() in the original PR (#14384). So this PR does the following changes:

1. Changes the background thread, i.e. RecordAccumulator's partitionReady() and drainBatchesForOneNode(), to not use `Metadata.currentLeader()`. Instead rely on `MetadataCache`, which is immutable, so access to it is unsynchronized.
2. Repurposes `MetadataCache` as an immutable snapshot of Metadata. This is a wrapper around the public `Cluster`. `MetadataCache`'s API/functionality should be extended for internal client usage vs the public `Cluster`. For example, this PR adds `MetadataCache.leaderEpochFor()`.
3. Renames `MetadataCache` to `MetadataSnapshot` to make it explicit that it is immutable.

**Note: both `Cluster` and `MetadataCache` are not synchronized, hence this reduces synchronization on the hot path for high partition counts.**