KAFKA-8788: Optimize client metadata handling with a large number of partitions (apache#7192)

Credit to @lbradstreet for profiling the producer with a large number of partitions.

Cache `topicMetadata`, `brokers` and `controller` in the `MetadataResponse`
the first time they are needed to avoid unnecessary recomputation. We were
previously computing `brokersMap` 4 times per partition in one code path that
was invoked from multiple places. This is a regression introduced via a42f16f
and first released in 2.3.0.
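
The diff below implements this with a lazily initialized `Holder` guarded by a
`volatile` field and double-checked locking. A minimal sketch of that pattern
(the `LazyIndex` class and its names are hypothetical, not Kafka code):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration: build an expensive derived index once, on first
// access, using the same volatile-field-plus-double-checked-locking shape as
// the Holder added to MetadataResponse below.
final class LazyIndex<K, T> {
    private final List<T> items;
    private final Function<T, K> keyFn;
    private volatile Map<K, T> index; // cached derived state

    LazyIndex(List<T> items, Function<T, K> keyFn) {
        this.items = items;
        this.keyFn = keyFn;
    }

    Map<K, T> index() {
        Map<K, T> result = index;
        if (result == null) {            // fast path: no locking once cached
            synchronized (this) {
                result = index;
                if (result == null) {    // re-check under the lock
                    result = items.stream()
                            .collect(Collectors.toMap(keyFn, Function.identity()));
                    index = result;      // volatile write publishes the cache
                }
            }
        }
        return result;
    }
}
```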

The `Cluster` constructor became significantly more allocation-heavy due to
2c44e77, first released in 2.2.0. Replaced the `merge` calls with more verbose
but more efficient code. Added a test to verify that the returned collections
are unmodifiable.
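
A hedged before/after sketch of the allocation difference (the `Partition`
type is hypothetical, and the combiner's behavior is inferred from the name
`Utils::concatListsUnmodifiable`; this is not the actual Kafka code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingSketch {
    static final class Partition {
        final String topic;
        Partition(String topic) { this.topic = topic; }
        String topic() { return topic; }
    }

    // Before: each merge copies everything accumulated so far into a fresh
    // unmodifiable list, so n partitions of one topic cost O(n^2) copies.
    static Map<String, List<Partition>> withMerge(List<Partition> partitions) {
        Map<String, List<Partition>> byTopic = new HashMap<>();
        for (Partition p : partitions)
            byTopic.merge(p.topic(), Collections.singletonList(p), (a, b) -> {
                List<Partition> merged = new ArrayList<>(a.size() + b.size());
                merged.addAll(a);
                merged.addAll(b);
                return Collections.unmodifiableList(merged);
            });
        return byTopic;
    }

    // After: append into mutable lists and wrap each one exactly once at the
    // end -- O(n) total, the shape of the rewritten constructor below.
    static Map<String, List<Partition>> withArrayLists(List<Partition> partitions) {
        Map<String, List<Partition>> byTopic = new HashMap<>();
        for (Partition p : partitions)
            byTopic.computeIfAbsent(p.topic(), t -> new ArrayList<>()).add(p);
        byTopic.replaceAll((t, list) -> Collections.unmodifiableList(list));
        return byTopic;
    }
}
```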

Add `topicAuthorizedOperations` and `clusterAuthorizedOperations` to
`MetadataResponse` and remove the `data()` method.
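
For reference, a small caller-side sketch of the new accessors (usage assumed
from the signatures in the diff below; the wrapper class is hypothetical):

```java
import java.util.Optional;
import org.apache.kafka.common.requests.MetadataResponse;

class AuthorizedOpsExample {
    // Both accessors return 32-bit bitfields; AUTHORIZED_OPERATIONS_OMITTED
    // (Integer.MIN_VALUE) means the broker omitted the field.
    static void printAuthorizedOps(MetadataResponse response, String topic) {
        Optional<Integer> topicOps = response.topicAuthorizedOperations(topic);
        topicOps.ifPresent(bits -> {
            if (bits != MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
                System.out.printf("topic %s ops: 0x%08x%n", topic, bits);
        });
        int clusterOps = response.clusterAuthorizedOperations();
        if (clusterOps != MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
            System.out.printf("cluster ops: 0x%08x%n", clusterOps);
    }
}
```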

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Lucas Bradstreet <lucasbradstreet@gmail.com>, Colin P. McCabe <cmccabe@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Justine Olshan <jolshan@confluent.io>
ijuma committed Aug 16, 2019
1 parent 67d965d commit ef1fdec
Showing 6 changed files with 203 additions and 81 deletions.
@@ -1526,7 +1526,7 @@ void handleResponse(AbstractResponse abstractResponse) {
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
validAclOperations(response.data().topics().find(topicName).topicAuthorizedOperations()));
validAclOperations(response.topicAuthorizedOperations(topicName).get()));
future.complete(topicDescription);
}
}
@@ -1585,7 +1585,7 @@ void handleResponse(AbstractResponse abstractResponse) {
controllerFuture.complete(controller(response));
clusterIdFuture.complete(response.clusterId());
authorizedOperationsFuture.complete(
validAclOperations(response.data().clusterAuthorizedOperations()));
validAclOperations(response.clusterAuthorizedOperations()));
}

private Node controller(MetadataResponse response) {
55 changes: 47 additions & 8 deletions clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.common;

import org.apache.kafka.common.utils.Utils;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -108,23 +106,64 @@ private Cluster(String clusterId,

// Index the nodes for quick lookup
Map<Integer, Node> tmpNodesById = new HashMap<>();
for (Node node : nodes)
Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>(nodes.size());
for (Node node : nodes) {
tmpNodesById.put(node.id(), node);
// Populate the map here to make it easy to add the partitions per node efficiently when iterating over
// the partitions
tmpPartitionsByNode.put(node.id(), new ArrayList<>());
}
this.nodesById = Collections.unmodifiableMap(tmpNodesById);

// index the partition infos by topic, topic+partition, and node
// note that this code is performance sensitive if there are a large number of partitions so we are careful
// to avoid unnecessary work
Map<TopicPartition, PartitionInfo> tmpPartitionsByTopicPartition = new HashMap<>(partitions.size());
Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>();
Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>();
for (PartitionInfo p : partitions) {
tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
tmpPartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
if (partitionsForTopic == null) {
partitionsForTopic = new ArrayList<>();
tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
}
partitionsForTopic.add(p);
if (p.leader() != null) {
tmpAvailablePartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
tmpPartitionsByNode.merge(p.leader().id(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
// The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
// in the metadata response
List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
partitionsForNode.add(p);
}
}

// Update the values of `tmpPartitionsByNode` to contain unmodifiable lists
for (Map.Entry<Integer, List<PartitionInfo>> entry : tmpPartitionsByNode.entrySet()) {
tmpPartitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
}

// Populate `tmpAvailablePartitionsByTopic` and update the values of `tmpPartitionsByTopic` to contain
// unmodifiable lists
Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>(tmpPartitionsByTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : tmpPartitionsByTopic.entrySet()) {
String topic = entry.getKey();
List<PartitionInfo> partitionsForTopic = Collections.unmodifiableList(entry.getValue());
tmpPartitionsByTopic.put(topic, partitionsForTopic);
// Optimise for the common case where all partitions are available
boolean foundUnavailablePartition = partitionsForTopic.stream().anyMatch(p -> p.leader() == null);
List<PartitionInfo> availablePartitionsForTopic;
if (foundUnavailablePartition) {
availablePartitionsForTopic = new ArrayList<>(partitionsForTopic.size());
for (PartitionInfo p : partitionsForTopic) {
if (p.leader() != null)
availablePartitionsForTopic.add(p);
}
availablePartitionsForTopic = Collections.unmodifiableList(availablePartitionsForTopic);
} else {
availablePartitionsForTopic = partitionsForTopic;
}
tmpAvailablePartitionsByTopic.put(topic, availablePartitionsForTopic);
}

this.partitionsByTopicPartition = Collections.unmodifiableMap(tmpPartitionsByTopicPartition);
this.partitionsByTopic = Collections.unmodifiableMap(tmpPartitionsByTopic);
this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
@@ -32,6 +32,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -57,17 +58,13 @@ public class MetadataResponse extends AbstractResponse {

public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;

private MetadataResponseData data;
private final MetadataResponseData data;
private volatile Holder holder;

public MetadataResponse(MetadataResponseData data) {
this.data = data;
}

private Map<Integer, Node> brokersMap() {
return data.brokers().stream().collect(
Collectors.toMap(MetadataResponseBroker::nodeId, b -> new Node(b.nodeId(), b.host(), b.port(), b.rack())));
}

public MetadataResponse(Struct struct, short version) {
this(new MetadataResponseData(struct, version));
}
@@ -77,28 +74,6 @@ protected Struct toStruct(short version) {
return data.toStruct(version);
}

public MetadataResponseData data() {
return data;
}

private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
List<Node> nodes = new ArrayList<>(brokerIds.size());
for (Integer brokerId : brokerIds)
if (brokers.containsKey(brokerId))
nodes.add(brokers.get(brokerId));
else
nodes.add(new Node(brokerId, "", -1));
return nodes;
}

private Node getControllerNode(int controllerId, Collection<Node> brokers) {
for (Node broker : brokers) {
if (broker.id() == controllerId)
return broker;
}
return null;
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
@@ -145,7 +120,6 @@ public Cluster cluster() {
Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
for (TopicMetadata metadata : topicMetadata()) {

if (metadata.error == Errors.NONE) {
if (metadata.isInternal)
internalTopics.add(metadata.topic);
@@ -154,13 +128,30 @@ }
}
}
}
return new Cluster(data.clusterId(), brokersMap().values(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
return new Cluster(data.clusterId(), brokers(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
}

/**
* Transform a topic and PartitionMetadata into PartitionInfo
* @return
* Returns a 32-bit bitfield to represent authorized operations for this topic.
*/
public Optional<Integer> topicAuthorizedOperations(String topicName) {
MetadataResponseTopic topic = data.topics().find(topicName);
if (topic == null)
return Optional.empty();
else
return Optional.of(topic.topicAuthorizedOperations());
}

/**
* Returns a 32-bit bitfield to represent authorized operations for this cluster.
*/
public int clusterAuthorizedOperations() {
return data.clusterAuthorizedOperations();
}

/**
* Transform a topic and PartitionMetadata into PartitionInfo.
*/
public static PartitionInfo partitionMetaToInfo(String topic, PartitionMetadata partitionMetadata) {
return new PartitionInfo(
@@ -172,51 +163,38 @@ public static PartitionInfo partitionMetaToInfo(String topic, PartitionMetadata
partitionMetadata.offlineReplicas().toArray(new Node[0]));
}

private Holder holder() {
if (holder == null) {
synchronized (data) {
if (holder == null)
holder = new Holder(data);
}
}
return holder;
}

/**
* Get all brokers returned in metadata response
* @return the brokers
*/
public Collection<Node> brokers() {
return new ArrayList<>(brokersMap().values());
return holder().brokers;
}

/**
* Get all topic metadata returned in the metadata response
* @return the topicMetadata
*/
public Collection<TopicMetadata> topicMetadata() {
List<TopicMetadata> topicMetadataList = new ArrayList<>();
for (MetadataResponseTopic topicMetadata : data.topics()) {
Errors topicError = Errors.forCode(topicMetadata.errorCode());
String topic = topicMetadata.name();
boolean isInternal = topicMetadata.isInternal();
List<PartitionMetadata> partitionMetadataList = new ArrayList<>();

for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
int partitionIndex = partitionMetadata.partitionIndex();
int leader = partitionMetadata.leaderId();
Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
Node leaderNode = leader == -1 ? null : brokersMap().get(leader);
List<Node> replicaNodes = convertToNodes(brokersMap(), partitionMetadata.replicaNodes());
List<Node> isrNodes = convertToNodes(brokersMap(), partitionMetadata.isrNodes());
List<Node> offlineNodes = convertToNodes(brokersMap(), partitionMetadata.offlineReplicas());
partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
replicaNodes, isrNodes, offlineNodes));
}

topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
topicMetadata.topicAuthorizedOperations()));
}
return topicMetadataList;
return holder().topicMetadata;
}

/**
* The controller node returned in metadata response
* @return the controller node or null if it doesn't exist
*/
public Node controller() {
return getControllerNode(data.controllerId(), brokers());
return holder().controller;
}

/**
@@ -381,18 +359,76 @@ public String toString() {
}
}

public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
private static class Holder {
private final Collection<Node> brokers;
private final Node controller;
private final Collection<TopicMetadata> topicMetadata;

Holder(MetadataResponseData data) {
this.brokers = Collections.unmodifiableCollection(createBrokers(data));
Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
this.topicMetadata = createTopicMetadata(data, brokerMap);
this.controller = brokerMap.get(data.controllerId());
}

private Collection<Node> createBrokers(MetadataResponseData data) {
return data.brokers().valuesList().stream().map(b ->
new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
}

private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
List<TopicMetadata> topicMetadataList = new ArrayList<>();
for (MetadataResponseTopic topicMetadata : data.topics()) {
Errors topicError = Errors.forCode(topicMetadata.errorCode());
String topic = topicMetadata.name();
boolean isInternal = topicMetadata.isInternal();
List<PartitionMetadata> partitionMetadataList = new ArrayList<>();

for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
int partitionIndex = partitionMetadata.partitionIndex();
int leader = partitionMetadata.leaderId();
Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
replicaNodes, isrNodes, offlineNodes));
}

topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
topicMetadata.topicAuthorizedOperations()));
}
return topicMetadataList;
}

private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
List<Node> nodes = new ArrayList<>(brokerIds.size());
for (Integer brokerId : brokerIds) {
Node node = brokers.get(brokerId);
if (node == null)
nodes.add(new Node(brokerId, "", -1));
else
nodes.add(node);
}
return nodes;
}

}

public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList,
int clusterAuthorizedOperations) {
MetadataResponseData responseData = new MetadataResponseData();
responseData.setThrottleTimeMs(throttleTimeMs);
brokers.forEach(broker -> {
brokers.forEach(broker ->
responseData.brokers().add(new MetadataResponseBroker()
.setNodeId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
.setRack(broker.rack()));
});
.setRack(broker.rack()))
);

responseData.setClusterId(clusterId);
responseData.setControllerId(controllerId);
@@ -421,13 +457,13 @@ public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> br
return new MetadataResponse(responseData);
}

public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList) {
return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}

public static MetadataResponse prepareResponse(List<Node> brokers, String clusterId, int controllerId,
public static MetadataResponse prepareResponse(Collection<Node> brokers, String clusterId, int controllerId,
List<TopicMetadata> topicMetadata) {
return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
}
@@ -1939,7 +1939,7 @@ public void testGetTopicMetadataOfflinePartitions() {
}
Node controller = originalResponse.controller();
MetadataResponse altered = MetadataResponse.prepareResponse(
(List<Node>) originalResponse.brokers(),
originalResponse.brokers(),
originalResponse.clusterId(),
controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID,
altTopics);
@@ -32,7 +32,8 @@ public class StickyPartitionCacheTest {
private final static Node[] NODES = new Node[] {
new Node(0, "localhost", 99),
new Node(1, "localhost", 100),
new Node(12, "localhost", 101)
new Node(2, "localhost", 101),
new Node(11, "localhost", 102)
};
final static String TOPIC_A = "topicA";
final static String TOPIC_B = "topicB";
@@ -46,7 +47,7 @@ public void testStickyPartitionCache() {
new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES)
);
Cluster testCluster = new Cluster("clusterId", asList(NODES), allPartitions,
Collections.<String>emptySet(), Collections.<String>emptySet());
Collections.emptySet(), Collections.emptySet());
StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

int partA = stickyPartitionCache.partition(TOPIC_A, testCluster);
@@ -81,8 +82,8 @@ public void unavailablePartitionsTest() {
new PartitionInfo(TOPIC_C, 0, null, NODES, NODES)
);

Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1]), allPartitions,
Collections.<String>emptySet(), Collections.<String>emptySet());
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());
StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

// Assure we never choose partition 1 because it is unavailable.
