
KAFKA-8788: Optimize client metadata handling with a large number of partitions #7192

Merged
ijuma merged 11 commits into apache:trunk from ijuma:compute-brokers-map-once-topic-metadata on Aug 15, 2019

Conversation

@ijuma (Contributor) commented Aug 11, 2019:

Credit to @lbradstreet for profiling the producer with a large number of partitions.

Cache `topicMetadata`, `brokers` and `controller` in the `MetadataResponse`
the first time they're needed to avoid unnecessary recomputation. We were previously
computing `brokersMap` 4 times per partition in one code path that was invoked from
multiple places. This is a regression introduced via a42f16f and first released
in 2.3.0.
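For illustration, the caching amounts to memoizing a derived view of the response data. A minimal sketch of that pattern follows; class and method names (`CachedMetadataView`, `brokersById`) are illustrative stand-ins, not the actual `MetadataResponse` code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: compute a derived view once, then reuse it.
// A List<String> stands in for the generated response data.
final class CachedMetadataView {
    private final List<String> rawBrokers;     // stand-in for the generated response fields
    private Map<Integer, String> brokersById;  // built lazily on first access, then cached

    CachedMetadataView(List<String> rawBrokers) {
        this.rawBrokers = rawBrokers;
    }

    // The first call pays the cost of building the map; subsequent calls return the
    // cached, unmodifiable copy instead of recomputing it per caller (or, as in the
    // regression described above, several times per partition).
    synchronized Map<Integer, String> brokersById() {
        if (brokersById == null) {
            Map<Integer, String> byId = new HashMap<>(rawBrokers.size());
            for (int i = 0; i < rawBrokers.size(); i++)
                byId.put(i, rawBrokers.get(i));
            brokersById = Collections.unmodifiableMap(byId);
        }
        return brokersById;
    }
}
```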

The `Cluster` constructor became significantly more allocation-heavy due to
2c44e77, first released in 2.2.0. Replaced `merge` calls with more verbose,
but more efficient code. Added a test to verify that the returned collections are
unmodifiable.
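Roughly, the change replaces per-insertion list concatenation with per-key mutable lists that are wrapped once at the end. The sketch below illustrates the two styles under simplified assumptions (plain `String` pairs stand in for `PartitionInfo`, and `Utils.concatListsUnmodifiable` is approximated inline); it is not the actual `Cluster` code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch contrasting the two grouping styles discussed above.
final class GroupingSketch {

    // Allocation-heavy style: every insertion may build a brand-new concatenated list.
    static Map<String, List<String>> withMerge(List<String[]> partitions) {
        Map<String, List<String>> byTopic = new HashMap<>();
        for (String[] p : partitions) {
            byTopic.merge(p[0], Collections.singletonList(p[1]), (a, b) -> {
                List<String> merged = new ArrayList<>(a.size() + b.size());
                merged.addAll(a);
                merged.addAll(b);
                return Collections.unmodifiableList(merged);
            });
        }
        return byTopic;
    }

    // More verbose but cheaper style: append to one mutable list per key,
    // then wrap each list exactly once at the end.
    static Map<String, List<String>> withComputeIfAbsent(List<String[]> partitions) {
        Map<String, List<String>> byTopic = new HashMap<>();
        for (String[] p : partitions)
            byTopic.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        for (Map.Entry<String, List<String>> e : byTopic.entrySet())
            e.setValue(Collections.unmodifiableList(e.getValue()));
        return byTopic;
    }
}
```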

Add `topicAuthorizedOperations` and `clusterAuthorizedOperations` to
`MetadataResponse` and remove the `data()` method.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

ijuma requested review from omkreddy and rajinisivaram on Aug 11, 2019

ijuma changed the title from "MINOR: Compute `brokersMap` once in `MetadataResponse.topicMetadata()`" to "MINOR: Optimize client metadata code when there are a large number of partitions" on Aug 11, 2019

ijuma changed the title from "MINOR: Optimize client metadata code when there are a large number of partitions" to "MINOR: Optimize client metadata handling with a large number of partitions" on Aug 11, 2019

clients/src/main/java/org/apache/kafka/common/Cluster.java Outdated
}
availablePartitionsForTopic = Collections.unmodifiableList(availablePartitionsForTopic);
}
else {

@lbradstreet (Contributor) commented Aug 11, 2019:

nit: `else` should be on the previous line

ijuma force-pushed the ijuma:compute-brokers-map-once-topic-metadata branch on Aug 11, 2019

ijuma changed the title from "MINOR: Optimize client metadata handling with a large number of partitions" to "KAFKA-8788: Optimize client metadata handling with a large number of partitions" on Aug 11, 2019

clients/src/main/java/org/apache/kafka/common/Cluster.java Outdated
if (p.leader() != null) {
tmpAvailablePartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
tmpPartitionsByNode.merge(p.leader().id(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
List<PartitionInfo> partitionsForNode = Utils.notNull(tmpPartitionsByNode.get(p.leader().id()));

@stanislavkozlovski (Contributor) commented Aug 11, 2019:

If a broker is down, will it be present in `tmpPartitionsByNode`, and if not, is it possible that a partition may still point to that nonexistent broker as its leader?

@ijuma (Author, Contributor) commented Aug 11, 2019:

The idea here is that the `p.leader() != null` check would not succeed if the broker is down, and hence this code would not run. The `Cluster` code had this assumption before 2.2.0:

List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));

It looks like a new unit test is not fulfilling this assumption. Will fix.

@@ -108,23 +108,57 @@ private Cluster(String clusterId,

// Index the nodes for quick lookup
Map<Integer, Node> tmpNodesById = new HashMap<>();
for (Node node : nodes)
Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>(nodes.size());

@stanislavkozlovski (Contributor) commented Aug 11, 2019:

`tmpPartitionsByNode` has modifiable lists as values, and by extension `this.partitionsByNode` will as well. Is that a concern? We can access it via a public interface, so I assume we should keep the immutability.

@ijuma (Author, Contributor) commented Aug 11, 2019:

Yeah, it's a bit annoying that we have to do an iteration just to do this. Pushed a fix.

@ijuma (Author, Contributor) commented Aug 11, 2019:

I also added a test to verify this, since the logic is a bit complex.
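As a rough illustration of the kind of check referred to here (a sketch only, not the actual test added in this PR), such a test can assert that the collections `Cluster` hands back reject mutation:

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Test;

// Sketch of an unmodifiability check: build a small Cluster and verify that a
// returned collection throws when a caller tries to modify it.
public class ClusterUnmodifiabilitySketchTest {

    @Test(expected = UnsupportedOperationException.class)
    public void partitionsForTopicShouldBeUnmodifiable() {
        Node node = new Node(0, "localhost", 9092);
        PartitionInfo partition = new PartitionInfo("topic", 0, node, new Node[]{node}, new Node[]{node});
        Cluster cluster = new Cluster("clusterId", Arrays.asList(node), Arrays.asList(partition),
                Collections.emptySet(), Collections.emptySet());

        // If the returned list is an unmodifiable view, this add() must throw.
        cluster.partitionsForTopic("topic").add(partition);
    }
}
```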

Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1]), allPartitions,
Collections.<String>emptySet(), Collections.<String>emptySet());
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());

@ijuma (Author, Contributor) commented Aug 11, 2019:

@jolshan @cmccabe Please review this change to the `StickyPartitionCacheTest`.

@jolshan (Contributor) commented Aug 14, 2019:

It seems that those extra lines are in `DefaultPartitionerTest` as well (that is where I got the line from). Also, this is more of a question for my own understanding: what is the fourth node for?

ijuma force-pushed the ijuma:compute-brokers-map-once-topic-metadata branch on Aug 11, 2019


public class ClusterTest {

private final static Node[] NODES = new Node[] {

@cmccabe (Contributor) commented Aug 12, 2019:

Maybe it's just me, but it seems surprising for `NODES[3]` to have an ID of 11. Is there a reason for not just assigning them sequential IDs?

@ijuma (Author, Contributor) commented Aug 13, 2019:

I just copied that from the `StickyPartitionCacheTest` that @jolshan and you wrote. :) I thought that had been done that way to highlight the offline node.

@jolshan (Contributor) commented Aug 14, 2019:

This may have just been a simple mistake on my part. It is in `DefaultPartitionerTest` as well.
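For readers following along, the fixture under discussion has roughly this shape; the snippet below is a hypothetical reconstruction (hostnames and ports invented), not the actual `ClusterTest`/`StickyPartitionCacheTest` source:

```java
import org.apache.kafka.common.Node;

// Hypothetical sketch of the NODES fixture: three brokers with sequential ids plus a
// fourth entry whose id (11) is non-sequential, which is what the comments above discuss.
final class NodesFixtureSketch {
    static final Node[] NODES = new Node[] {
        new Node(0, "localhost", 99),
        new Node(1, "localhost", 100),
        new Node(2, "localhost", 101),
        new Node(11, "localhost", 102)
    };
}
```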

@@ -57,15 +58,22 @@

public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;

private MetadataResponseData data;
private final MetadataResponseData data;

@cmccabe (Contributor) commented Aug 12, 2019:

If we're going to go down the route of wrapping everything, then we should hide the `MetadataResponseData` field so that it can't be modified to be different from the wrapped fields. So we shouldn't have `data()` as a public method (it should be private, or absent).

@ijuma (Author, Contributor) commented Aug 13, 2019:

This is used in a couple of places in `KafkaAdminClient` currently.

@ijuma (Author, Contributor) commented Aug 14, 2019:

I replaced this with two methods that return an int, so that callers cannot mutate the underlying data.

@cmccabe (Contributor) commented Aug 12, 2019:

Nice catch, @lbradstreet. And thanks for creating the PR, @ijuma.

In the long term, I don't think we need MetadataResponse#TopicMetadata any more.
It's just a wrapper class for the generated code. If we keep it around, we will always have to convert to and from the wrapper classes during serialization. It's also annoying to keep them in sync.

I think the better fix of just using the generated classes is actually not that much harder. If you leave out test code, there are not that many places that use MetadataResponse#TopicMetadata: KafkaApis, the MetadataCache, the Metadata class, and KafkaAdminClient#listTopics.

@cmccabe (Contributor) commented Aug 12, 2019:

BTW, Cluster doesn't expose TopicMetadata. To my knowledge, it's not part of our public API.

@omkreddy (Contributor) left a comment:

@ijuma Thanks for the PR. LGTM.
@lbradstreet Thanks for catching this.

@ijuma (Author, Contributor) commented Aug 13, 2019:

> I think the better fix of just using the generated classes is actually not that much harder. If you leave out test code, there are not that many places that use MetadataResponse#TopicMetadata: KafkaApis, the MetadataCache, the Metadata class, and KafkaAdminClient#listTopics.

@cmccabe Unfortunately, the generated API is mutable and harder to use/read (no support for Error, using magic values instead of Optional, etc.). This may be a good path longer term, as you said, but I don't think we should make that change in the 2.3 branch, as there is a non-trivial risk of regressions.

I think we should merge this as it is to trunk and 2.3. And make further changes in trunk, if needed.

@cmccabe (Contributor) commented Aug 13, 2019:

OK. I think it's reasonable to merge this and continue working on a better solution in trunk. Let's not expose mutable fields that could diverge from the copied data, though.

ijuma force-pushed the ijuma:compute-brokers-map-once-topic-metadata branch to 8b4153a on Aug 14, 2019

@ijuma (Author, Contributor) commented Aug 14, 2019:

Can you please check the latest update, @omkreddy?

@omkreddy (Contributor) commented Aug 14, 2019:

@ijuma Latest changes LGTM. Thanks!

@ijuma (Author, Contributor) commented Aug 14, 2019:

Two jobs passed; one failed with the flaky org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.

Document topicAuthorizedOperations and clusterAuthorizedOperations. And return `Optional<Integer>` from the former
@ijuma (Author, Contributor) commented Aug 14, 2019:

I pushed a small change to document the two new methods in MetadataResponse and to return Optional<Integer> from topicAuthorizedOperations. @omkreddy, still good?
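A sketch of the accessor shape described here, under assumed names (`AuthorizedOperationsSketch` and its fields are illustrative, not the exact `MetadataResponse` code): the `AUTHORIZED_OPERATIONS_OMITTED` sentinel is translated into an empty `Optional`, and only plain values are returned, so callers can neither compare against the magic value nor mutate the underlying data through these methods.

```java
import java.util.Optional;

// Illustrative sketch of sentinel-to-Optional accessors.
final class AuthorizedOperationsSketch {

    static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;

    private final int rawTopicAuthorizedOperations;   // as carried in the response data
    private final int rawClusterAuthorizedOperations;

    AuthorizedOperationsSketch(int topicOps, int clusterOps) {
        this.rawTopicAuthorizedOperations = topicOps;
        this.rawClusterAuthorizedOperations = clusterOps;
    }

    // Empty when the broker omitted the field (not requested or older versions).
    Optional<Integer> topicAuthorizedOperations() {
        return rawTopicAuthorizedOperations == AUTHORIZED_OPERATIONS_OMITTED
                ? Optional.empty()
                : Optional.of(rawTopicAuthorizedOperations);
    }

    // Returned as a plain int; callers get a copy of the value, not a handle on mutable data.
    int clusterAuthorizedOperations() {
        return rawClusterAuthorizedOperations;
    }
}
```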

@ijuma (Author, Contributor) commented Aug 14, 2019:

retest this please

@ijuma (Author, Contributor) commented Aug 15, 2019:

One job passed, two failed with a single unrelated flaky test each:

kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

Merging to master and 2.3.

ijuma merged commit ebf78c0 into apache:trunk on Aug 15, 2019

1 of 3 checks passed:

  • JDK 11 and Scala 2.13: FAILURE (11817 tests run, 77 skipped, 1 failed)
  • JDK 8 and Scala 2.11: FAILURE (11817 tests run, 77 skipped, 1 failed)
  • JDK 11 and Scala 2.12: SUCCESS (11817 tests run, 77 skipped, 0 failed)

ijuma deleted the ijuma:compute-brokers-map-once-topic-metadata branch on Aug 15, 2019

ijuma added a commit that referenced this pull request Aug 15, 2019
KAFKA-8788: Optimize client metadata handling with a large number of partitions (#7192)

Credit to @lbradstreet for profiling the producer with a large number of partitions.

Cache `topicMetadata`, `brokers` and `controller` in the `MetadataResponse`
the first time they're needed to avoid unnecessary recomputation. We were previously
computing `brokersMap` 4 times per partition in one code path that was invoked from
multiple places. This is a regression introduced via a42f16f and first released
in 2.3.0.

The `Cluster` constructor became significantly more allocation-heavy due to
2c44e77, first released in 2.2.0. Replaced `merge` calls with more verbose,
but more efficient code. Added a test to verify that the returned collections are
unmodifiable.

Add `topicAuthorizedOperations` and `clusterAuthorizedOperations` to
`MetadataResponse` and remove the `data()` method.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Lucas Bradstreet <lucasbradstreet@gmail.com>, Colin P. McCabe <cmccabe@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Justine Olshan <jolshan@confluent.io>
ijuma added a commit to confluentinc/kafka that referenced this pull request Aug 16, 2019

KAFKA-8788: Optimize client metadata handling with a large number of partitions (apache#7192)
ableegoldman added a commit to confluentinc/kafka that referenced this pull request Aug 20, 2019

KAFKA-8788: Optimize client metadata handling with a large number of partitions (apache#7192)
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 22, 2019

[LI-CHERRY-PICK] [4d0cc43] KAFKA-8788: Optimize client metadata handling with a large number of partitions (apache#7192)
(cherry picked from commit 4d0cc43)