KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs (#11331)
With the changes for topic IDs, we have a different flow. When a broker receives a request, it uses a map to convert topic IDs to topic names. If a topic ID is not found in the map, we return a top-level error and close the session. This decision was motivated by the difficulty of storing “unresolved” partitions in the session. In earlier iterations we stored an “unresolved” partition object in the cache, but it was somewhat hard to reason about and required extra logic to try to resolve the topic ID on each incremental request and add it to the session. It also required extra logic to forget the topic when we wanted to remove it from the session (either by topic ID, if the topic name was never known, or by topic name, if it was eventually resolved).

One helpful simplifying factor is that we only allow one type of request per session: either every request uses topic IDs, or none does. That means we can rely on a session continuing to have the same kind of information. We don’t have to worry about converting topics known only by name to topic IDs for a response, and we won’t need to convert topics known only by ID to names for a response.

This PR introduces a change to store the “unresolved” partitions in the cached partition object. If a version 13+ request is sent with a topic ID that is unknown, a cached partition will be created with that fetch request data and a null topic name. On subsequent incremental requests, unresolved partitions may be resolved using the new IDs found in the metadata cache. When handling the request, getting all partitions will return a TopicIdPartition object that is used to handle the request and build the response. Since we can rely on only one type of request per session (with IDs or without), the cached partitions map will have different keys depending on which fetch request version is being used.
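
As a rough illustration of the idea, a cache entry can carry a nullable topic name next to the topic ID and be resolved later. This is a minimal Java sketch; the actual broker-side class is CachedPartition in core/src/main/scala/kafka/server/FetchSession.scala (Scala), and the names below are hypothetical:

```java
import java.util.Map;

import org.apache.kafka.common.Uuid;

// Hypothetical sketch of a session cache entry that tolerates an unknown
// topic name. Field and method names are illustrative, not the actual
// CachedPartition API.
class CachedPartitionSketch {
    private final Uuid topicId;
    private final int partition;
    private String topicName; // null while the topic ID cannot be resolved

    CachedPartitionSketch(Uuid topicId, int partition, String topicName) {
        this.topicId = topicId;
        this.partition = partition;
        this.topicName = topicName; // may be null for v13+ requests with unknown IDs
    }

    boolean isResolved() {
        return topicName != null;
    }

    // On a later incremental request, try to fill in the name from the
    // broker's current topic-ID-to-name view (e.g. the metadata cache).
    boolean maybeResolveName(Map<Uuid, String> topicNames) {
        if (topicName == null) {
            topicName = topicNames.get(topicId);
        }
        return topicName != null;
    }
}
```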

This PR involves changes both in FetchSessionHandler and FetchSession. Some major changes are outlined below.

1. FetchSessionHandler: Forgetting a topic and adding a new topic with the same name - We may have a case where there is a topic foo with ID 1 in the session. After a subsequent metadata update, we may have topic foo with ID 2. This means that topic foo has been deleted and recreated. When sending fetch requests version 13+, we will send a request to add foo with ID 2 to the session and remove foo with ID 1. Otherwise, we will fall back to the same behavior as versions 12 and below. A simplified sketch of the recreation check follows this paragraph.
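
The heart of that check is small. Below is a simplified standalone sketch of the condition; the full version lives in FetchSessionHandler.Builder#build and appears in the diff further down:

```java
import org.apache.kafka.common.Uuid;

// Simplified sketch: the same topic-partition shows up with two real
// (non-zero) topic IDs that differ, so the topic was deleted and recreated.
// The old ID goes in the "replaced" list to be forgotten; the new ID is added.
final class ReplaceCheckSketch {
    static boolean isReplaced(Uuid prevTopicId, Uuid nextTopicId) {
        return !prevTopicId.equals(nextTopicId)
            && !prevTopicId.equals(Uuid.ZERO_UUID)
            && !nextTopicId.equals(Uuid.ZERO_UUID);
    }
}
```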

2. FetchSession: Resolving in Incremental Sessions - Incremental sessions contain two distinct sets of partitions: partitions sent in the latest request (new, updated, or forgotten partitions) and partitions already in the session. If we want to resolve unknown topic IDs, we need to handle both cases.
    * Partitions in the request - These partitions either are new or update/forget previous partitions in the session. The new partitions are trivial: we either have a resolved partition or create a partition that is unresolved. For the other cases, we need to be a bit more careful.
        * For updated partitions we have a few cases – keep in mind, we may not programmatically know if a partition is an update (a sketch of how these cases can be resolved follows after this list):
            1. partition in session is resolved, update is resolved: trivial

            2. partition in session is unresolved, update is unresolved: in code, this is equivalent to the case above, so trivial as well

            3. partition in session is unresolved, update is resolved: this means the partition in the session does not have a name, but the metadata cache now contains the name – to fix this we can check if there exists a cached partition with the given ID and update it both with the partition update and with the topic name.

            4. partition in session is resolved, update is unresolved: this means the partition in the session has a name, but the update was unable to be resolved (i.e., the topic was deleted) – this is the odd case. We will look up the partition using the ID. We will find the old version with a name but will not replace the name. This will lead to an UNKNOWN_TOPIC_OR_PARTITION or INCONSISTENT_TOPIC_ID error, which will be handled with a metadata update. Likely a future request will forget the partition, and we will be able to do so by ID.

            5. the partition in the session and the update both have topic IDs, but they differ: only one topic ID should exist in the metadata at a time, so likely only one topic ID is in the fetch set and the other should be in the toForget list, which lets us remove that partition from the session. If for some reason we don't try to forget this partition, one of the partitions in the session will cause an INCONSISTENT_TOPIC_ID error and the metadata for this partition will be refreshed, which should result in the old ID being removed from the session. This should not happen if the FetchSessionHandler is correctly in sync.

        * For the forgotten partitions we have the same cases:
            1. partition in session is resolved, forgotten is resolved: trivial

            2. partition in session is unresolved, forgotten is unresolved: in code, this is equivalent to the case above, so trivial as well

            3. partition in session is unresolved, forgotten is resolved: this means the partition in the session does not have a name, but the metadata cache now contains the name – to fix this we can check if there exists a cached partition with the given ID and try to forget it before we check the resolved name case.

            4. partition in session is resolved, forgotten is unresolved: this means the partition in the session has a name, but the forgotten partition was unable to be resolved (i.e., the topic was deleted). We will look up the partition using the ID, find the old version with a name, and be able to delete it.

            5. the partition in the session and the forgotten partition both have topic IDs, but they differ: this is the same case as described above. If we somehow do not have the ID in the session, no partition will be removed. This should not happen unless the FetchSessionHandler is out of sync.

    * Partitions in the session - there may be some partitions already in the session that are unresolved. We can resolve them in forEachPartition using a method that checks if the partition is unresolved and tries to resolve it using a topic name map built from the request. The partition is resolved before the function using the cached partition is applied (see the second sketch below).
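
To make the update and forget cases above concrete, here is a hypothetical Java sketch of an ID-keyed session cache; it builds on the CachedPartitionSketch shown earlier, and the map layout and method names are assumptions rather than the broker's actual FetchSession internals:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;

// Hypothetical sketch of resolving the updated and forgotten partitions of an
// incremental fetch request against a v13+ session cache keyed by topic ID
// and partition.
class IncrementalResolutionSketch {
    private final Map<String, CachedPartitionSketch> cache = new HashMap<>();

    private static String key(Uuid topicId, int partition) {
        return topicId + "-" + partition;
    }

    // Updated partition: case 3 attaches a newly known name to an unresolved
    // session entry; case 4 (name known in session, update unresolved) keeps
    // the old name and lets the fetch surface UNKNOWN_TOPIC_OR_PARTITION or
    // INCONSISTENT_TOPIC_ID, to be repaired by a client metadata update.
    void updatePartition(Uuid topicId, int partition, String resolvedNameOrNull) {
        CachedPartitionSketch cached = cache.get(key(topicId, partition));
        if (cached != null && resolvedNameOrNull != null) {
            cached.maybeResolveName(Collections.singletonMap(topicId, resolvedNameOrNull));
        }
    }

    // Forgotten partition: because v13+ sessions are keyed by ID, removal
    // works uniformly whether or not either side ever resolved a name.
    void forgetPartition(Uuid topicId, int partition) {
        cache.remove(key(topicId, partition));
    }
}
```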
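And a sketch of the forEachPartition pass over partitions already in the session, again reusing CachedPartitionSketch; the signature and names are assumptions:

```java
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.Uuid;

// Hypothetical sketch: before applying the per-partition function, try to
// resolve any still-unresolved cached partition from a topic name map
// derived from the current request.
class SessionIterationSketch {
    static void forEachPartition(Iterable<CachedPartitionSketch> sessionPartitions,
                                 Map<Uuid, String> topicNamesFromRequest,
                                 Consumer<CachedPartitionSketch> fun) {
        for (CachedPartitionSketch cached : sessionPartitions) {
            if (!cached.isResolved())
                cached.maybeResolveName(topicNamesFromRequest); // resolve first
            fun.accept(cached); // then apply the function to the (maybe resolved) partition
        }
    }
}
```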

Reviewers: David Jacot <djacot@confluent.io>
jolshan committed Nov 15, 2021
1 parent e9db5a1 commit e8818e2
Showing 45 changed files with 2,854 additions and 1,515 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords).java"/>
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -17,6 +17,7 @@

package org.apache.kafka.clients;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
@@ -76,11 +77,6 @@ public FetchSessionHandler(LogContext logContext, int node) {
private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
new LinkedHashMap<>(0);

/**
* All of the topic ids mapped to topic names for topics which exist in the fetch request session.
*/
private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);

/**
* All of the topic names mapped to topic ids for topics which exist in the fetch request session.
*/
@@ -99,22 +95,18 @@ public static class FetchRequestData {
/**
* The partitions to send in the request's "forget" list.
*/
private final List<TopicPartition> toForget;

/**
* All of the partitions which exist in the fetch request session.
*/
private final Map<TopicPartition, PartitionData> sessionPartitions;
private final List<TopicIdPartition> toForget;

/**
* All of the topic IDs for topics which exist in the fetch request session.
* The partitions to send in the request's "forget" list if
* the version is >= 13.
*/
private final Map<String, Uuid> topicIds;
private final List<TopicIdPartition> toReplace;

/**
* All of the topic IDs for topics which exist in the fetch request session
* All of the partitions which exist in the fetch request session.
*/
private final Map<Uuid, String> topicNames;
private final Map<TopicPartition, PartitionData> sessionPartitions;

/**
* The metadata to use in this fetch request.
@@ -128,17 +120,15 @@ public static class FetchRequestData {
private final boolean canUseTopicIds;

FetchRequestData(Map<TopicPartition, PartitionData> toSend,
List<TopicPartition> toForget,
List<TopicIdPartition> toForget,
List<TopicIdPartition> toReplace,
Map<TopicPartition, PartitionData> sessionPartitions,
Map<String, Uuid> topicIds,
Map<Uuid, String> topicNames,
FetchMetadata metadata,
boolean canUseTopicIds) {
this.toSend = toSend;
this.toForget = toForget;
this.toReplace = toReplace;
this.sessionPartitions = sessionPartitions;
this.topicIds = topicIds;
this.topicNames = topicNames;
this.metadata = metadata;
this.canUseTopicIds = canUseTopicIds;
}
@@ -153,25 +143,24 @@ public Map<TopicPartition, PartitionData> toSend() {
/**
* Get a list of partitions to forget in this fetch request.
*/
public List<TopicPartition> toForget() {
public List<TopicIdPartition> toForget() {
return toForget;
}

/**
* Get a list of partitions to forget in this fetch request.
*/
public List<TopicIdPartition> toReplace() {
return toReplace;
}

/**
* Get the full set of partitions involved in this fetch request.
*/
public Map<TopicPartition, PartitionData> sessionPartitions() {
return sessionPartitions;
}

public Map<String, Uuid> topicIds() {
return topicIds;
}

public Map<Uuid, String> topicNames() {
return topicNames;
}

public FetchMetadata metadata() {
return metadata;
}
@@ -201,7 +190,14 @@ public String toString() {
}
bld.append("), toForget=(");
prefix = "";
for (TopicPartition partition : toForget) {
for (TopicIdPartition partition : toForget) {
bld.append(prefix);
bld.append(partition);
prefix = ", ";
}
bld.append("), toReplace=(");
prefix = "";
for (TopicIdPartition partition : toReplace) {
bld.append(prefix);
bld.append(partition);
prefix = ", ";
@@ -216,15 +212,6 @@ public String toString() {
}
}
}
bld.append("), topicIds=(");
String prefix = "";
for (Map.Entry<String, Uuid> entry : topicIds.entrySet()) {
bld.append(prefix);
bld.append(entry.getKey());
bld.append(": ");
bld.append(entry.getValue());
prefix = ", ";
}
if (canUseTopicIds) {
bld.append("), canUseTopicIds=True");
} else {
@@ -250,32 +237,32 @@ public class Builder {
* incremental fetch requests (see below).
*/
private LinkedHashMap<TopicPartition, PartitionData> next;
private Map<String, Uuid> topicIds;
private Map<Uuid, String> topicNames;
private final boolean copySessionPartitions;
private int partitionsWithoutTopicIds = 0;

Builder() {
this.next = new LinkedHashMap<>();
this.topicIds = new HashMap<>();
this.topicNames = new HashMap<>();
this.copySessionPartitions = true;
}

Builder(int initialSize, boolean copySessionPartitions) {
this.next = new LinkedHashMap<>(initialSize);
this.topicIds = new HashMap<>(initialSize);
this.topicNames = new HashMap<>();
this.copySessionPartitions = copySessionPartitions;
}

/**
* Mark that we want data from this partition in the upcoming fetch.
*/
public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
public void add(TopicPartition topicPartition, PartitionData data) {
next.put(topicPartition, data);
// topicIds should not change between adding partitions and building, so we can use putIfAbsent
if (!topicId.equals(Uuid.ZERO_UUID)) {
topicIds.putIfAbsent(topicPartition.topic(), topicId);
} else {
if (data.topicId.equals(Uuid.ZERO_UUID)) {
partitionsWithoutTopicIds++;
} else {
topicNames.putIfAbsent(data.topicId, topicPartition.topic());
}
}

@@ -285,52 +272,55 @@ public FetchRequestData build() {
if (nextMetadata.isFull()) {
if (log.isDebugEnabled()) {
log.debug("Built full fetch {} for node {} with {}.",
nextMetadata, node, partitionsToLogString(next.keySet()));
nextMetadata, node, topicPartitionsToLogString(next.keySet()));
}
sessionPartitions = next;
next = null;
// Only add topic IDs to the session if we are using topic IDs.
if (canUseTopicIds) {
sessionTopicIds = topicIds;
sessionTopicNames = new HashMap<>(topicIds.size());
topicIds.forEach((name, id) -> sessionTopicNames.put(id, name));
sessionTopicNames = topicNames;
} else {
sessionTopicIds = new HashMap<>();
sessionTopicNames = new HashMap<>();
sessionTopicNames = Collections.emptyMap();
}
topicIds = null;
Map<TopicPartition, PartitionData> toSend =
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
Map<String, Uuid> toSendTopicIds =
Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
Map<Uuid, String> toSendTopicNames =
Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
return new FetchRequestData(toSend, Collections.emptyList(), toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
return new FetchRequestData(toSend, Collections.emptyList(), Collections.emptyList(), toSend, nextMetadata, canUseTopicIds);
}

List<TopicPartition> added = new ArrayList<>();
List<TopicPartition> removed = new ArrayList<>();
List<TopicPartition> altered = new ArrayList<>();
List<TopicIdPartition> added = new ArrayList<>();
List<TopicIdPartition> removed = new ArrayList<>();
List<TopicIdPartition> altered = new ArrayList<>();
List<TopicIdPartition> replaced = new ArrayList<>();
for (Iterator<Entry<TopicPartition, PartitionData>> iter =
sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
Entry<TopicPartition, PartitionData> entry = iter.next();
TopicPartition topicPartition = entry.getKey();
PartitionData prevData = entry.getValue();
PartitionData nextData = next.remove(topicPartition);
if (nextData != null) {
if (!prevData.equals(nextData)) {
// We basically check if the new partition had the same topic ID. If not,
// we add it to the "replaced" set. If the request is version 13 or higher, the replaced
// partition will be forgotten. In any case, we will send the new partition in the request.
if (!prevData.topicId.equals(nextData.topicId)
&& !prevData.topicId.equals(Uuid.ZERO_UUID)
&& !nextData.topicId.equals(Uuid.ZERO_UUID)) {
// Re-add the replaced partition to the end of 'next'
next.put(topicPartition, nextData);
entry.setValue(nextData);
replaced.add(new TopicIdPartition(prevData.topicId, topicPartition));
} else if (!prevData.equals(nextData)) {
// Re-add the altered partition to the end of 'next'
next.put(topicPartition, nextData);
entry.setValue(nextData);
altered.add(topicPartition);
altered.add(new TopicIdPartition(nextData.topicId, topicPartition));
}
} else {
// Remove this partition from the session.
iter.remove();
// Indicate that we no longer want to listen to this partition.
removed.add(topicPartition);
removed.add(new TopicIdPartition(prevData.topicId, topicPartition));
// If we do not have this topic ID in the builder or the session, we can not use topic IDs.
if (canUseTopicIds && !topicIds.containsKey(topicPartition.topic()) && !sessionTopicIds.containsKey(topicPartition.topic()))
if (canUseTopicIds && prevData.topicId.equals(Uuid.ZERO_UUID))
canUseTopicIds = false;
}
}
@@ -346,38 +336,34 @@ public FetchRequestData build() {
break;
}
sessionPartitions.put(topicPartition, nextData);
added.add(topicPartition);
added.add(new TopicIdPartition(nextData.topicId, topicPartition));
}

// Add topic IDs to session if we can use them. If an ID is inconsistent, we will handle in the receiving broker.
// If we switched from using topic IDs to not using them (or vice versa), that error will also be handled in the receiving broker.
if (canUseTopicIds) {
for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) {
String topicName = topic.getKey();
Uuid addedId = topic.getValue();
sessionTopicIds.put(topicName, addedId);
sessionTopicNames.put(addedId, topicName);
}
sessionTopicNames = topicNames;
} else {
sessionTopicNames = Collections.emptyMap();
}

if (log.isDebugEnabled()) {
log.debug("Built incremental fetch {} for node {}. Added {}, altered {}, removed {} " +
"out of {}", nextMetadata, node, partitionsToLogString(added),
partitionsToLogString(altered), partitionsToLogString(removed),
partitionsToLogString(sessionPartitions.keySet()));
log.debug("Built incremental fetch {} for node {}. Added {}, altered {}, removed {}, " +
"replaced {} out of {}", nextMetadata, node, topicIdPartitionsToLogString(added),
topicIdPartitionsToLogString(altered), topicIdPartitionsToLogString(removed),
topicIdPartitionsToLogString(replaced), topicPartitionsToLogString(sessionPartitions.keySet()));
}
Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(next);
Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions
? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions))
: Collections.unmodifiableMap(sessionPartitions);
Map<String, Uuid> toSendTopicIds =
Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
Map<Uuid, String> toSendTopicNames =
Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
next = null;
topicIds = null;
return new FetchRequestData(toSend, Collections.unmodifiableList(removed), curSessionPartitions,
toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
return new FetchRequestData(toSend,
Collections.unmodifiableList(removed),
Collections.unmodifiableList(replaced),
curSessionPartitions,
nextMetadata,
canUseTopicIds);
}
}

Expand All @@ -397,7 +383,14 @@ public Builder newBuilder(int size, boolean copySessionPartitions) {
return new Builder(size, copySessionPartitions);
}

private String partitionsToLogString(Collection<TopicPartition> partitions) {
private String topicPartitionsToLogString(Collection<TopicPartition> partitions) {
if (!log.isTraceEnabled()) {
return String.format("%d partition(s)", partitions.size());
}
return "(" + Utils.join(partitions, ", ") + ")";
}

private String topicIdPartitionsToLogString(Collection<TopicIdPartition> partitions) {
if (!log.isTraceEnabled()) {
return String.format("%d partition(s)", partitions.size());
}