clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -410,6 +410,8 @@ public class KafkaAdminClient extends AdminClient {
private final Map<TopicPartition, Integer> partitionLeaderCache;
private final AdminFetchMetricsManager adminFetchMetricsManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
private final Map<String, Uuid> topicIdsByNames = new HashMap<>();
private final Map<Uuid, String> topicNameById = new HashMap<>();

/**
* The telemetry requests client instance id.
@@ -4263,7 +4265,8 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(), partitionLeaderCache);
Map<TopicPartition, Long> offsetQueriesByPartition = topicPartitionOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> getOffsetFromSpec(e.getValue())));
ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs);
ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs,
topicIdsByNames, topicNameById);
invokeDriver(handler, future, options.timeoutMs);
return new ListOffsetsResult(future.all());
}
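For context, a minimal sketch of how this path is exercised from the public Admin API; the admin instance and topic name are assumed for illustration:

// Latest-offset query; the new topicId/name caches are threaded into
// ListOffsetsHandler transparently on this path.
Map<TopicPartition, OffsetSpec> query =
    Map.of(new TopicPartition("orders", 0), OffsetSpec.latest());
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
    admin.listOffsets(query).all().get();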
@@ -5109,6 +5112,8 @@ AbstractRequest.Builder<?> createRequest(int timeoutMs) {
void handleResponse(AbstractResponse response) {
long currentTimeMs = time.milliseconds();
driver.onResponse(currentTimeMs, spec, response, this.curNode());
topicIdsByNames.putAll(driver.getTopicIdByName());
topicNameById.putAll(driver.getTopicNameById());
maybeSendRequests(driver, currentTimeMs);
}

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -92,6 +93,8 @@ public class AdminApiDriver<K, V> {
private final BiMultimap<ApiRequestScope, K> lookupMap = new BiMultimap<>();
private final BiMultimap<FulfillmentScope, K> fulfillmentMap = new BiMultimap<>();
private final Map<ApiRequestScope, RequestState> requestStates = new HashMap<>();
private final Map<String, Uuid> topicIdByName = new HashMap<>();
private final Map<Uuid, String> topicNameById = new HashMap<>();

public AdminApiDriver(
AdminApiHandler<K, V> handler,
@@ -243,11 +246,22 @@ public void onResponse(
);

result.completedKeys.forEach(lookupMap::remove);
this.topicIdByName.putAll(result.topicIdByName);
this.topicNameById.putAll(result.topicNameById);

completeLookup(result.mappedKeys);
completeLookupExceptionally(result.failedKeys);
}
}

public Map<String, Uuid> getTopicIdByName() {
return topicIdByName;
}

public Map<Uuid, String> getTopicNameById() {
return topicNameById;
}

/**
* Callback that is invoked when a `Call` is failed.
*/
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@@ -113,21 +114,46 @@ class LookupResult<K> {
// phase. The driver will not attempt lookup or fulfillment for failed keys.
public final Map<K, Throwable> failedKeys;

public final Map<String, Uuid> topicIdByName;

public final Map<Uuid, String> topicNameById;

public LookupResult(
Map<K, Throwable> failedKeys,
Map<K, Integer> mappedKeys
) {
this(Collections.emptyList(), failedKeys, mappedKeys);
}

public LookupResult(
Map<K, Throwable> failedKeys,
Map<K, Integer> mappedKeys,
Map<String, Uuid> topicIdByName,
Map<Uuid, String> topicNameById
) {
this(Collections.emptyList(), failedKeys, mappedKeys, topicIdByName, topicNameById);
}

public LookupResult(
List<K> completedKeys,
Map<K, Throwable> failedKeys,
Map<K, Integer> mappedKeys
) {
this(completedKeys, failedKeys, mappedKeys, Collections.emptyMap(), Collections.emptyMap());
}

public LookupResult(
List<K> completedKeys,
Map<K, Throwable> failedKeys,
Map<K, Integer> mappedKeys,
Map<String, Uuid> topicIdByName,
Map<Uuid, String> topicNameById
) {
this.completedKeys = Collections.unmodifiableList(completedKeys);
this.failedKeys = Collections.unmodifiableMap(failedKeys);
this.mappedKeys = Collections.unmodifiableMap(mappedKeys);
this.topicIdByName = Collections.unmodifiableMap(topicIdByName);
this.topicNameById = Collections.unmodifiableMap(topicNameById);
}

static <K> LookupResult<K> empty() {
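A hedged sketch of how a lookup strategy can populate the new fields through the four-argument constructor; the topic name and id below are fabricated for illustration:

Uuid ordersId = Uuid.randomUuid(); // illustrative; real ids come from MetadataResponse
LookupResult<TopicPartition> result = new LookupResult<>(
    Collections.emptyMap(),                      // failedKeys
    Map.of(new TopicPartition("orders", 0), 1),  // mappedKeys: partition -> leader broker id
    Map.of("orders", ordersId),                  // topicIdByName
    Map.of(ordersId, "orders"));                 // topicNameById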
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -21,6 +21,7 @@
import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -43,6 +44,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -54,18 +56,24 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffsetsResultInfo> {
private final Logger log;
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
private final int defaultApiTimeoutMs;
private final Map<String, Uuid> topicIdByName;
private final Map<Uuid, String> topicNameById;

public ListOffsetsHandler(
Map<TopicPartition, Long> offsetTimestampsByPartition,
ListOffsetsOptions options,
LogContext logContext,
int defaultApiTimeoutMs
int defaultApiTimeoutMs,
Map<String, Uuid> topicIdByName,
Map<Uuid, String> topicNameById
) {
this.offsetTimestampsByPartition = offsetTimestampsByPartition;
this.options = options;
this.log = logContext.logger(ListOffsetsHandler.class);
this.lookupStrategy = new PartitionLeaderStrategy(logContext, false);
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.topicIdByName = topicIdByName;
this.topicNameById = topicNameById;
}

@Override
@@ -82,7 +90,8 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic(
keys,
topicName -> new ListOffsetsTopic().setName(topicName),
topicName -> new ListOffsetsTopic().setName(topicName)
.setTopicId(topicIdByName.getOrDefault(topicName, Uuid.ZERO_UUID)),
(listOffsetsTopic, partitionId) -> {
TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId);
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
@@ -91,6 +100,15 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
.setPartitionIndex(partitionId)
.setTimestamp(offsetTimestamp));
});

// Only allow topicId-based protocol (v12) if ALL topics have valid topicIds
// If any topic has ZERO_UUID, we must restrict to name-based protocol (v11 or lower)
// This is because in a given protocol version, we can only use topicId OR topicName, not both
boolean canUseTopicIds = !topicsByName.isEmpty() && topicsByName.values().stream()
.filter(Objects::nonNull)
.map(ListOffsetsTopic::topicId)
.allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));

boolean supportsMaxTimestamp = keys
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
@@ -113,7 +131,8 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
supportsMaxTimestamp,
requireEarliestLocalTimestamp,
requireTieredStorageTimestamp,
requireEarliestPendingUploadTimestamp)
requireEarliestPendingUploadTimestamp,
canUseTopicIds)
.setTargetTimes(new ArrayList<>(topicsByName.values()))
.setTimeoutMs(timeoutMs);
}
@@ -132,7 +151,18 @@ public ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(

for (ListOffsetsTopicResponse topic : response.topics()) {
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
// Determine topic name based on response version:
// Version 12+: uses topicId (name will be null/empty)
// Version < 12: uses name (topicId will be null or ZERO_UUID)
TopicPartition topicPartition;
if (topic.topicId() != null && !topic.topicId().equals(Uuid.ZERO_UUID)) {
// Version 12+: resolve topicName from topicId
String topicName = topicNameById.get(topic.topicId());
topicPartition = new TopicPartition(topicName, partition.partitionIndex());
} else {
// Version < 12: use topicName directly
topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
}
Errors error = Errors.forCode(partition.errorCode());
if (!offsetTimestampsByPartition.containsKey(topicPartition)) {
log.warn("ListOffsets response includes unknown topic partition {}", topicPartition);
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -163,9 +164,14 @@ public LookupResult<TopicPartition> handleResponse(
MetadataResponse response = (MetadataResponse) abstractResponse;
Map<TopicPartition, Throwable> failed = new HashMap<>();
Map<TopicPartition, Integer> mapped = new HashMap<>();
Map<String, Uuid> topicIdByName = new HashMap<>();
Map<Uuid, String> topicNameById = new HashMap<>();

for (MetadataResponseData.MetadataResponseTopic topicMetadata : response.data().topics()) {
String topic = topicMetadata.name();
Uuid topicId = topicMetadata.topicId();
topicIdByName.put(topic, topicId);
topicNameById.put(topicId, topic);
Errors topicError = Errors.forCode(topicMetadata.errorCode());
if (topicError != Errors.NONE) {
handleTopicError(topic, topicError, requestPartitions, failed);
@@ -196,7 +202,7 @@ public LookupResult<TopicPartition> handleResponse(
}
}
}
return new LookupResult<>(failed, mapped);
return new LookupResult<>(failed, mapped, topicIdByName, topicNameById);
}

/**
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -18,6 +18,8 @@

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
@@ -60,7 +62,13 @@ public static class Builder extends AbstractRequest.Builder<ListOffsetsRequest>

public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel) {
return forConsumer(requireTimestamp, isolationLevel, false, false, false, false);
return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, false);
}

public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel,
boolean canUseTopicIds) {
return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, canUseTopicIds);
}

public static Builder forConsumer(boolean requireTimestamp,
@@ -69,6 +77,18 @@ public static Builder forConsumer(boolean requireTimestamp,
boolean requireEarliestLocalTimestamp,
boolean requireTieredStorageTimestamp,
boolean requireEarliestPendingUploadTimestamp) {
return forConsumer(requireTimestamp, isolationLevel, requireMaxTimestamp,
requireEarliestLocalTimestamp, requireTieredStorageTimestamp,
requireEarliestPendingUploadTimestamp, false);
}

public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel,
boolean requireMaxTimestamp,
boolean requireEarliestLocalTimestamp,
boolean requireTieredStorageTimestamp,
boolean requireEarliestPendingUploadTimestamp,
boolean canUseTopicIds) {
short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion();
if (requireEarliestPendingUploadTimestamp)
minVersion = 11;
@@ -82,7 +102,10 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED)
minVersion = 2;
else if (requireTimestamp)
minVersion = 1;
return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);

// When canUseTopicIds is false, limit maxVersion to 11 to use name-based protocol
short maxVersion = canUseTopicIds ? ApiKeys.LIST_OFFSETS.latestVersion() : (short) 11;
return new Builder(minVersion, maxVersion, CONSUMER_REPLICA_ID, isolationLevel);
}
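A hedged usage sketch of the new overload; all flag values below are illustrative. Passing canUseTopicIds = false caps the negotiated range at v11, so the request is encoded with topic names:

ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder.forConsumer(
    true,                              // requireTimestamp
    IsolationLevel.READ_UNCOMMITTED,
    false,                             // requireMaxTimestamp
    false,                             // requireEarliestLocalTimestamp
    false,                             // requireTieredStorageTimestamp
    false,                             // requireEarliestPendingUploadTimestamp
    false);                            // canUseTopicIds -> maxVersion capped at 11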

public static Builder forReplica(short allowedVersion, int replicaId) {
@@ -111,6 +134,22 @@ public Builder setTimeoutMs(int timeoutMs) {

@Override
public ListOffsetsRequest build(short version) {
if (version >= 12) {
data.topics().forEach(topic -> {
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does require usage of topic ids.");
}
});
} else {
data.topics().forEach(topic -> {
if (topic.name() == null || topic.name().isEmpty()) {
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does require usage of topic names.");
}
});
}

return new ListOffsetsRequest(data, version);
}
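A sketch of the failure mode this validation introduces, assuming a builder whose topics carry names but no topic ids:

try {
    builder.build((short) 12);         // v12+ encodes topics by id only
} catch (UnsupportedVersionException e) {
    // expected: fall back to a name-based version (v11 or lower)
}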

@@ -144,14 +183,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {

List<ListOffsetsTopicResponse> responses = new ArrayList<>();
for (ListOffsetsTopic topic : data.topics()) {
ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse().setName(topic.name());
ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse()
.setName(topic.name())
.setTopicId(topic.topicId());
List<ListOffsetsPartitionResponse> partitions = new ArrayList<>();
for (ListOffsetsPartition partition : topic.partitions()) {
ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(errorCode)
.setPartitionIndex(partition.partitionIndex());
partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
partitions.add(partitionResponse);
}
topicResponse.setPartitions(partitions);
@@ -201,4 +242,4 @@ public static List<ListOffsetsTopic> toListOffsetsTopics(Map<TopicPartition, ListOffsetsPartition> timestampsToSearch) {
}
return new ArrayList<>(topics.values());
}
}
}
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
@@ -102,6 +102,10 @@ public boolean shouldClientThrottle(short version) {
return version >= 3;
}

public static boolean useTopicIds(short version) {
return version >= 12;
}
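A sketch of the intended use when parsing a response, assuming a topicNameById cache and a topic entry from the response:

String topicName = ListOffsetsResponse.useTopicIds(version)
    ? topicNameById.get(topic.topicId()) // v12+: resolve the name from the id
    : topic.name();                      // v11 and below: the name is on the wire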

public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) {
return new ListOffsetsTopicResponse()
.setName(tp.topic())
@@ -112,4 +116,6 @@ public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) {
.setOffset(offset)
.setLeaderEpoch(epoch)));
}
}
}