diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8478ed19f80c8..dea905fcf803c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -410,6 +410,8 @@ public class KafkaAdminClient extends AdminClient { private final Map<TopicPartition, Integer> partitionLeaderCache; private final AdminFetchMetricsManager adminFetchMetricsManager; private final Optional<ClientTelemetryReporter> clientTelemetryReporter; + private final Map<String, Uuid> topicIdByName = new HashMap<>(); + private final Map<Uuid, String> topicNameById = new HashMap<>(); /** * The telemetry requests client instance id. @@ -4263,7 +4265,8 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(), partitionLeaderCache); Map<TopicPartition, Long> offsetQueriesByPartition = topicPartitionOffsets.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> getOffsetFromSpec(e.getValue()))); - ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs); + ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs, + topicIdByName, topicNameById); invokeDriver(handler, future, options.timeoutMs); return new ListOffsetsResult(future.all()); } @@ -5109,6 +5112,8 @@ AbstractRequest.Builder<?> createRequest(int timeoutMs) { void handleResponse(AbstractResponse response) { long currentTimeMs = time.milliseconds(); driver.onResponse(currentTimeMs, spec, response, this.curNode()); + topicIdByName.putAll(driver.getTopicIdByName()); + topicNameById.putAll(driver.getTopicNameById()); maybeSendRequests(driver, currentTimeMs); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java index 6286f59ed7163..dc45727dfaf26 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractRequest; @@ -92,6 +93,8 @@ public class AdminApiDriver<K, V> { private final BiMultimap<ApiRequestScope, K> lookupMap = new BiMultimap<>(); private final BiMultimap<FulfillmentScope, K> fulfillmentMap = new BiMultimap<>(); private final Map<ApiRequestScope, RequestState> requestStates = new HashMap<>(); + private final Map<String, Uuid> topicIdByName = new HashMap<>(); + private final Map<Uuid, String> topicNameById = new HashMap<>(); public AdminApiDriver( AdminApiHandler<K, V> handler, @@ -243,11 +246,22 @@ public void onResponse( ); result.completedKeys.forEach(lookupMap::remove); + this.topicIdByName.putAll(result.topicIdByName); + this.topicNameById.putAll(result.topicNameById); + completeLookup(result.mappedKeys); completeLookupExceptionally(result.failedKeys); } } + public Map<String, Uuid> getTopicIdByName() { + return topicIdByName; + } + + public Map<Uuid, String> getTopicNameById() { + return topicNameById; + } + /** * Callback that is invoked when a `Call` is failed.
*/ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java index 084ae97d813b0..a7428d51c62cb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -113,6 +114,10 @@ class LookupResult<K> { // phase. The driver will not attempt lookup or fulfillment for failed keys. public final Map<K, Throwable> failedKeys; + public final Map<String, Uuid> topicIdByName; + + public final Map<Uuid, String> topicNameById; + public LookupResult( Map<K, Throwable> failedKeys, Map<K, Integer> mappedKeys @@ -120,14 +125,35 @@ public LookupResult( this(Collections.emptyList(), failedKeys, mappedKeys); } + public LookupResult( + Map<K, Throwable> failedKeys, + Map<K, Integer> mappedKeys, + Map<String, Uuid> topicIdByName, + Map<Uuid, String> topicNameById + ) { + this(Collections.emptyList(), failedKeys, mappedKeys, topicIdByName, topicNameById); + } + public LookupResult( List<K> completedKeys, Map<K, Throwable> failedKeys, Map<K, Integer> mappedKeys + ) { + this(completedKeys, failedKeys, mappedKeys, Collections.emptyMap(), Collections.emptyMap()); + } + + public LookupResult( + List<K> completedKeys, + Map<K, Throwable> failedKeys, + Map<K, Integer> mappedKeys, + Map<String, Uuid> topicIdByName, + Map<Uuid, String> topicNameById ) { this.completedKeys = Collections.unmodifiableList(completedKeys); this.failedKeys = Collections.unmodifiableMap(failedKeys); this.mappedKeys = Collections.unmodifiableMap(mappedKeys); + this.topicIdByName = Collections.unmodifiableMap(topicIdByName); + this.topicNameById = Collections.unmodifiableMap(topicNameById); } static <K> LookupResult<K> empty() { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java index 330a9efaf9b6c..a64f82b15738e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -43,6 +44,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -54,18 +56,24 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffsetsResultInfo> { private final Logger log; private final AdminApiLookupStrategy<TopicPartition> lookupStrategy; private final int defaultApiTimeoutMs; + private final Map<String, Uuid> topicIdByName; + private final Map<Uuid, String> topicNameById; public ListOffsetsHandler( Map<TopicPartition, Long> offsetTimestampsByPartition, ListOffsetsOptions options, LogContext logContext, - int defaultApiTimeoutMs + int defaultApiTimeoutMs, + Map<String, Uuid> topicIdByName, + Map<Uuid, String> topicNameById ) { this.offsetTimestampsByPartition = offsetTimestampsByPartition; this.options = options; this.log = logContext.logger(ListOffsetsHandler.class); this.lookupStrategy = new
PartitionLeaderStrategy(logContext, false); this.defaultApiTimeoutMs = defaultApiTimeoutMs; + this.topicIdByName = topicIdByName; + this.topicNameById = topicNameById; } @Override @@ -82,7 +90,8 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() { ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) { Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic( keys, - topicName -> new ListOffsetsTopic().setName(topicName), + topicName -> new ListOffsetsTopic().setName(topicName) + .setTopicId(topicIdByName.getOrDefault(topicName, Uuid.ZERO_UUID)), (listOffsetsTopic, partitionId) -> { TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId); long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); @@ -91,6 +100,15 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> .setPartitionIndex(partitionId) .setTimestamp(offsetTimestamp)); }); + + // Only allow topicId-based protocol (v12) if ALL topics have valid topicIds + // If any topic has ZERO_UUID, we must restrict to name-based protocol (v11 or lower) + // This is because in a given protocol version, we can only use topicId OR topicName, not both + boolean canUseTopicIds = !topicsByName.isEmpty() && topicsByName.values().stream() + .filter(Objects::nonNull) + .map(ListOffsetsTopic::topicId) + .allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID)); + boolean supportsMaxTimestamp = keys .stream() .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP); @@ -113,7 +131,8 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> supportsMaxTimestamp, requireEarliestLocalTimestamp, requireTieredStorageTimestamp, - requireEarliestPendingUploadTimestamp) + requireEarliestPendingUploadTimestamp, + canUseTopicIds) .setTargetTimes(new ArrayList<>(topicsByName.values())) .setTimeoutMs(timeoutMs); } @@ -132,7 +151,18 @@ public ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse( for (ListOffsetsTopicResponse topic : response.topics()) { for (ListOffsetsPartitionResponse partition : topic.partitions()) { - TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex()); + // Determine topic name based on response version: + // Version 12+: uses topicId (name will be null/empty) + // Version < 12: uses name (topicId will be null or ZERO_UUID) + TopicPartition topicPartition; + if (topic.topicId() != null && !topic.topicId().equals(Uuid.ZERO_UUID)) { + // Version 12+: resolve topicName from topicId + String topicName = topicNameById.get(topic.topicId()); + topicPartition = new TopicPartition(topicName, partition.partitionIndex()); + } else { + // Version < 12: use topicName directly + topicPartition = new TopicPartition(topic.name(), partition.partitionIndex()); + } Errors error = Errors.forCode(partition.errorCode()); if (!offsetTimestampsByPartition.containsKey(topicPartition)) { log.warn("ListOffsets response includes unknown topic partition {}", topicPartition); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java index ff7dff2db8e22..6552889f3404a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import
org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -163,9 +164,14 @@ public LookupResult<TopicPartition> handleResponse( MetadataResponse response = (MetadataResponse) abstractResponse; Map<TopicPartition, Throwable> failed = new HashMap<>(); Map<TopicPartition, Integer> mapped = new HashMap<>(); + Map<String, Uuid> topicIdByName = new HashMap<>(); + Map<Uuid, String> topicNameById = new HashMap<>(); for (MetadataResponseData.MetadataResponseTopic topicMetadata : response.data().topics()) { String topic = topicMetadata.name(); + Uuid topicId = topicMetadata.topicId(); + topicIdByName.put(topic, topicId); + topicNameById.put(topicId, topic); Errors topicError = Errors.forCode(topicMetadata.errorCode()); if (topicError != Errors.NONE) { handleTopicError(topic, topicError, requestPartitions, failed); @@ -196,7 +202,7 @@ public LookupResult<TopicPartition> handleResponse( } } } - return new LookupResult<>(failed, mapped); + return new LookupResult<>(failed, mapped, topicIdByName, topicNameById); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index 5862ebdfafc67..3ced2883ff943 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -18,6 +18,8 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic; @@ -60,7 +62,13 @@ public static class Builder extends AbstractRequest.Builder<ListOffsetsRequest> public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) { - return forConsumer(requireTimestamp, isolationLevel, false, false, false, false); + return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, false); + } + + public static Builder forConsumer(boolean requireTimestamp, + IsolationLevel isolationLevel, + boolean canUseTopicIds) { + return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, canUseTopicIds); } public static Builder forConsumer(boolean requireTimestamp, @@ -69,6 +77,18 @@ public static Builder forConsumer(boolean requireTimestamp, boolean requireMaxTimestamp, boolean requireEarliestLocalTimestamp, boolean requireTieredStorageTimestamp, boolean requireEarliestPendingUploadTimestamp) { + return forConsumer(requireTimestamp, isolationLevel, requireMaxTimestamp, + requireEarliestLocalTimestamp, requireTieredStorageTimestamp, + requireEarliestPendingUploadTimestamp, false); + } + + public static Builder forConsumer(boolean requireTimestamp, + IsolationLevel isolationLevel, + boolean requireMaxTimestamp, + boolean requireEarliestLocalTimestamp, + boolean requireTieredStorageTimestamp, + boolean requireEarliestPendingUploadTimestamp, + boolean canUseTopicIds) { short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion(); if (requireEarliestPendingUploadTimestamp) minVersion = 11; @@ -82,7 +102,10 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED) minVersion = 2; else if (requireTimestamp) minVersion = 1; - return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(),
CONSUMER_REPLICA_ID, isolationLevel); + + // When canUseTopicIds is false, limit maxVersion to 11 to use name-based protocol + short maxVersion = canUseTopicIds ? ApiKeys.LIST_OFFSETS.latestVersion() : (short) 11; + return new Builder(minVersion, maxVersion, CONSUMER_REPLICA_ID, isolationLevel); } public static Builder forReplica(short allowedVersion, int replicaId) { @@ -111,6 +134,22 @@ public Builder setTimeoutMs(int timeoutMs) { @Override public ListOffsetsRequest build(short version) { + if (version >= 12) { + data.topics().forEach(topic -> { + if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { + throw new UnsupportedVersionException("The list offsets API version " + + version + " requires usage of topic IDs."); + } + }); + } else { + data.topics().forEach(topic -> { + if (topic.name() == null || topic.name().isEmpty()) { + throw new UnsupportedVersionException("The list offsets API version " + + version + " requires usage of topic names."); + } + }); + } + return new ListOffsetsRequest(data, version); } @@ -144,14 +183,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { List<ListOffsetsTopicResponse> responses = new ArrayList<>(); for (ListOffsetsTopic topic : data.topics()) { - ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse().setName(topic.name()); + ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse() + .setName(topic.name()) + .setTopicId(topic.topicId()); List<ListOffsetsPartitionResponse> partitions = new ArrayList<>(); for (ListOffsetsPartition partition : topic.partitions()) { ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse() .setErrorCode(errorCode) .setPartitionIndex(partition.partitionIndex()); partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP); partitions.add(partitionResponse); } topicResponse.setPartitions(partitions); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java index cadff02033958..7db7ca0302652 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java @@ -102,6 +102,10 @@ public boolean shouldClientThrottle(short version) { return version >= 3; } + public static boolean useTopicIds(short version) { + return version >= 12; + } + public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) { return new ListOffsetsTopicResponse() .setName(tp.topic()) diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json index 1a2de6ca30a2f..101be979bf169 100644 --- a/clients/src/main/resources/common/message/ListOffsetsRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json @@ -42,7 +42,9 @@ // Version 10 enables async remote list offsets support (KIP-1075) // // Version 11 enables listing offsets by earliest pending upload
offset (KIP-1023) - "validVersions": "1-11", + // + // Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. + "validVersions": "1-12", "flexibleVersions": "6+", "latestVersionUnstable": false, "fields": [ @@ -52,8 +54,10 @@ "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." }, { "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+", "about": "Each topic in the request.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "ignorable": true, "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "12+", "ignorable": true, + "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]ListOffsetsPartition", "versions": "0+", "about": "Each partition in the request.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json index 1407273bf4d8c..960d5c024e015 100644 --- a/clients/src/main/resources/common/message/ListOffsetsResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json @@ -42,15 +42,19 @@ // Version 10 enables async remote list offsets support (KIP-1075) // // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023) - "validVersions": "1-11", + // + // Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. + "validVersions": "1-12", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]ListOffsetsTopicResponse", "versions": "0+", "about": "Each topic in the response.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "ignorable": true, "about": "The topic name." 
}, + { "name": "TopicId", "type": "uuid", "versions": "12+", "ignorable": true, + "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]ListOffsetsPartitionResponse", "versions": "0+", "about": "Each partition in the response.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java index a7156554001aa..91736722b3c10 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java @@ -79,7 +79,8 @@ public final class ListOffsetsHandlerTest { @Test public void testBuildRequestSimple() { ListOffsetsHandler handler = - new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs); + new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1)).build(); List topics = request.topics(); assertEquals(1, topics.size()); @@ -96,7 +97,8 @@ public void testBuildRequestSimple() { public void testBuildRequestMultipleTopicsWithReadCommitted() { ListOffsetsHandler handler = new ListOffsetsHandler( - offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext, defaultApiTimeoutMs); + offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), offsetTimestampsByPartition.keySet()).build(); List topics = request.topics(); @@ -117,14 +119,16 @@ public void testBuildRequestMultipleTopicsWithReadCommitted() { @Test public void testBuildRequestAllowedVersions() { ListOffsetsHandler defaultOptionsHandler = - new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs); + new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); ListOffsetsRequest.Builder builder = defaultOptionsHandler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1, t1p0)); assertEquals(1, builder.oldestAllowedVersion()); ListOffsetsHandler readCommittedHandler = new ListOffsetsHandler( - offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext, defaultApiTimeoutMs); + offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); builder = readCommittedHandler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1, t1p0)); assertEquals(2, builder.oldestAllowedVersion()); @@ -224,7 +228,8 @@ public void testHandleResponseUnsupportedVersion() { maxTimestampPartitions.put(t1p1, OffsetSpec.maxTimestamp()); ListOffsetsHandler handler = - new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs); + new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); final Map nonMaxTimestampPartitions = new HashMap<>(offsetTimestampsByPartition); maxTimestampPartitions.forEach((k, v) -> 
nonMaxTimestampPartitions.remove(k)); @@ -256,7 +261,8 @@ public void testHandleResponseUnsupportedVersion() { public void testBuildRequestWithDefaultApiTimeoutMs() { ListOffsetsOptions options = new ListOffsetsOptions(); ListOffsetsHandler handler = - new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext, defaultApiTimeoutMs); + new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1)).build(); assertEquals(defaultApiTimeoutMs, request.timeoutMs()); } @@ -266,7 +272,8 @@ public void testBuildRequestWithTimeoutMs() { Integer timeoutMs = 200; ListOffsetsOptions options = new ListOffsetsOptions().timeoutMs(timeoutMs); ListOffsetsHandler handler = - new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext, defaultApiTimeoutMs); + new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1)).build(); assertEquals(timeoutMs, request.timeoutMs()); } @@ -307,7 +314,8 @@ private static ListOffsetsResponse createResponse( private ApiResult handleResponse(ListOffsetsResponse response) { ListOffsetsHandler handler = - new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs); + new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, + defaultApiTimeoutMs, new HashMap<>(), new HashMap<>()); return handler.handleResponse(node, offsetTimestampsByPartition.keySet(), response); } diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 0bcd9731c462d..960fbbda891c8 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -182,43 +182,47 @@ public void testJoinGroupRequestVersions() throws Exception { testAllMessageRoundTripsFromVersion((short) 5, newRequest.get().setGroupInstanceId("instanceId")); } - @Test - public void testListOffsetsRequestVersions() throws Exception { - List v = Collections.singletonList(new ListOffsetsTopic() - .setName("topic") - .setPartitions(Collections.singletonList(new ListOffsetsPartition() - .setPartitionIndex(0) - .setTimestamp(123L)))); - Supplier newRequest = () -> new ListOffsetsRequestData() - .setTopics(v) - .setReplicaId(0); - testAllMessageRoundTrips(newRequest.get()); - testAllMessageRoundTripsFromVersion((short) 2, newRequest.get().setIsolationLevel(IsolationLevel.READ_COMMITTED.id())); + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS) + public void testListOffsetsRequestVersions(short version) throws Exception { + ListOffsetsRequestData request = new ListOffsetsRequestData() + .setReplicaId(0) + .setIsolationLevel(version >= 2 ? IsolationLevel.READ_COMMITTED.id() : 0) + .setTopics(singletonList( + new ListOffsetsTopic() + .setTopicId(version >= 12 ? Uuid.randomUuid() : Uuid.ZERO_UUID) + .setName(version < 12 ? "topic" : "") + .setPartitions(singletonList( + new ListOffsetsPartition() + .setPartitionIndex(0) + .setCurrentLeaderEpoch(version >= 4 ? 
10 : -1) + .setTimestamp(123L) + )) + )); + + testMessageRoundTrip(version, request, request); } - @Test - public void testListOffsetsResponseVersions() throws Exception { - ListOffsetsPartitionResponse partition = new ListOffsetsPartitionResponse() - .setErrorCode(Errors.NONE.code()) - .setPartitionIndex(0); - List topics = Collections.singletonList(new ListOffsetsTopicResponse() - .setName("topic") - .setPartitions(Collections.singletonList(partition))); - Supplier response = () -> new ListOffsetsResponseData() - .setTopics(topics); - for (short version = ApiKeys.LIST_OFFSETS.oldestVersion(); version <= ApiKeys.LIST_OFFSETS.latestVersion(); ++version) { - ListOffsetsResponseData responseData = response.get(); - responseData.topics().get(0).partitions().get(0) - .setOffset(456L) - .setTimestamp(123L); - if (version > 1) { - responseData.setThrottleTimeMs(1000); - } - if (version > 3) { - partition.setLeaderEpoch(1); - } - testEquivalentMessageRoundTrip(version, responseData); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS) + public void testListOffsetsResponseVersions(short version) throws Exception { + ListOffsetsResponseData response = new ListOffsetsResponseData() + .setThrottleTimeMs(version >= 2 ? 1000 : 0) + .setTopics(singletonList( + new ListOffsetsTopicResponse() + .setTopicId(version >= 12 ? Uuid.randomUuid() : Uuid.ZERO_UUID) + .setName(version < 12 ? "topic" : "") + .setPartitions(singletonList( + new ListOffsetsPartitionResponse() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(123L) + .setOffset(456L) + .setLeaderEpoch(version >= 4 ? 1 : -1) + )) + )); + + testMessageRoundTrip(version, response, response); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java index 48542c1a2fd7d..86efdc1c0ca88 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java @@ -18,6 +18,8 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic; @@ -37,6 +39,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class ListOffsetsRequestTest { @@ -61,22 +64,27 @@ public void testDuplicatePartitions() { @Test public void testGetErrorResponse() { + Uuid topicId = Uuid.randomUuid(); for (short version = 1; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) { + // For version >= 12, we need to use topic IDs + boolean useTopicIds = version >= 12; List topics = Collections.singletonList( new ListOffsetsTopic() .setName("topic") + .setTopicId(useTopicIds ? 
topicId : Uuid.ZERO_UUID) .setPartitions(Collections.singletonList( new ListOffsetsPartition() .setPartitionIndex(0)))); ListOffsetsRequest request = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED) + .forConsumer(true, IsolationLevel.READ_COMMITTED, useTopicIds) .setTargetTimes(topics) .build(version); ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); - + List v = Collections.singletonList( new ListOffsetsTopicResponse() .setName("topic") + .setTopicId(useTopicIds ? topicId : Uuid.ZERO_UUID) .setPartitions(Collections.singletonList( new ListOffsetsPartitionResponse() .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()) @@ -146,4 +154,98 @@ public void testListOffsetsRequestOldestVersion() { assertEquals((short) 9, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion()); assertEquals((short) 11, requireEarliestPendingUploadTimestampRequestBuilder.oldestAllowedVersion()); } + + @Test + public void testBuildWithTopicIdsForVersion12() { + Uuid topicId = Uuid.randomUuid(); + List topics = Collections.singletonList( + new ListOffsetsTopic() + .setName("topic") + .setTopicId(topicId) + .setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(0) + .setTimestamp(123L)))); + + ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED, true) + .setTargetTimes(topics); + + // Version 12 should succeed with topic IDs + ListOffsetsRequest request = builder.build((short) 12); + assertEquals(1, request.data().topics().size()); + assertEquals(topicId, request.data().topics().get(0).topicId()); + } + + @Test + public void testBuildWithTopicNamesForVersion11() { + List topics = Collections.singletonList( + new ListOffsetsTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(0) + .setTimestamp(123L)))); + + ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) + .setTargetTimes(topics); + + // Version 11 should succeed with topic names + ListOffsetsRequest request = builder.build((short) 11); + assertEquals(1, request.data().topics().size()); + assertEquals("topic", request.data().topics().get(0).name()); + } + + @Test + public void testBuildVersion12RequiresTopicIds() { + // Version 12 without topic ID should throw exception + List topics = Collections.singletonList( + new ListOffsetsTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(0) + .setTimestamp(123L)))); + + ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED, true) + .setTargetTimes(topics); + + assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 12)); + } + + @Test + public void testBuildVersion11RequiresTopicNames() { + // Version 11 without topic name should throw exception + Uuid topicId = Uuid.randomUuid(); + List topics = Collections.singletonList( + new ListOffsetsTopic() + .setTopicId(topicId) + .setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(0) + .setTimestamp(123L)))); + + ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) + .setTargetTimes(topics); + + assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 
11)); + } + + @Test + public void testCanUseTopicIdsLimitsMaxVersion() { + // When canUseTopicIds is false, maxVersion should be limited to 11 + ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED, false); + + assertEquals((short) 11, builder.latestAllowedVersion()); + + // When canUseTopicIds is true, maxVersion should be the latest + ListOffsetsRequest.Builder builderWithTopicIds = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED, true); + + assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), builderWithTopicIds.latestAllowedVersion()); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index b3fb3939b1aae..145f4462bcf4c 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -2309,9 +2309,15 @@ private ListOffsetsRequest createListOffsetRequest(short version) { .setTimestamp(1000000L) .setCurrentLeaderEpoch(5); - ListOffsetsTopic topic = new ListOffsetsTopic() - .setName("test") - .setPartitions(singletonList(partition)); + ListOffsetsTopic topic = new ListOffsetsTopic(); + // Version 12+ requires topic ID instead of topic name + if (version >= 12) { + topic.setTopicId(Uuid.randomUuid()); + } else { + topic.setName("test"); + } + topic.setPartitions(singletonList(partition)); + return ListOffsetsRequest.Builder .forConsumer(true, IsolationLevel.READ_COMMITTED) .setTargetTimes(singletonList(topic)) @@ -2331,10 +2337,17 @@ private ListOffsetsResponse createListOffsetResponse(short version) { if (version >= 4) { partition.setLeaderEpoch(27); } + ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse(); + // Version 12+ uses topic ID instead of topic name + if (version >= 12) { + topicResponse.setTopicId(Uuid.randomUuid()); + } else { + topicResponse.setName("test"); + } + topicResponse.setPartitions(singletonList(partition)); + ListOffsetsResponseData data = new ListOffsetsResponseData() - .setTopics(singletonList(new ListOffsetsTopicResponse() - .setName("test") - .setPartitions(singletonList(partition)))); + .setTopics(singletonList(topicResponse)); return new ListOffsetsResponse(data); } else { throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b7f2e3364f077..5a815447518ab 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartit import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult} import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic -import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic @@ -784,12 +784,42 @@ class KafkaApis(val requestChannel: RequestChannel, .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) } + // For version >= 12, resolve topic IDs to names and handle unknown topic IDs + val (knownTopics, unknownTopicIdResponses) = if (ListOffsetsResponse.useTopicIds(version)) { + val known = new util.ArrayList[ListOffsetsTopic]() + val unknown = new util.ArrayList[ListOffsetsTopicResponse]() + offsetRequest.topics.asScala.foreach { topic => + val topicName = metadataCache.getTopicName(topic.topicId()).orElse(null) + + if (topicName == null) { + // Topic ID cannot be resolved to a name + unknown.add(new ListOffsetsTopicResponse() + .setName(topic.name()) + .setTopicId(topic.topicId()) + .setPartitions(topic.partitions.asScala.map(partition => + buildErrorResponse(Errors.UNKNOWN_TOPIC_ID, partition)).asJava)) + } else { + // Topic ID successfully resolved, create topic with resolved name + val resolvedTopic = new ListOffsetsTopic() + .setName(topicName) + .setTopicId(topic.topicId()) + .setPartitions(topic.partitions()) + known.add(resolvedTopic) + } + } + (known, unknown) + } else { + // version < 12, use topic names directly + (offsetRequest.topics(), new util.ArrayList[ListOffsetsTopicResponse]()) + } + val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + DESCRIBE, TOPIC, knownTopics.asScala.toSeq)(_.name) val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => new ListOffsetsTopicResponse() .setName(topic.name) + .setTopicId(topic.topicId) .setPartitions(topic.partitions.asScala.map(partition => buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava) ).asJava @@ -797,6 +827,7 @@ class KafkaApis(val requestChannel: RequestChannel, def sendResponseCallback(response: util.Collection[ListOffsetsTopicResponse]): Void = { val mergedResponses = new util.ArrayList(response) mergedResponses.addAll(unauthorizedResponseStatus) + mergedResponses.addAll(unknownTopicIdResponses) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetsResponse(new ListOffsetsResponseData() .setThrottleTimeMs(requestThrottleMs) @@ -804,7 +835,7 @@ class KafkaApis(val requestChannel: RequestChannel, null } - if (authorizedRequestInfo.isEmpty) { + if (authorizedRequestInfo.isEmpty || knownTopics.isEmpty) { sendResponseCallback(util.List.of) } else { replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala, diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala index 80d41e3b0cf13..8fbaaabfa1d7a 100644 --- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala @@ -106,21 +106,34 @@ class RemoteLeaderEndPoint(logPrefix: String, } private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = { - val topic = new ListOffsetsTopic() - .setName(topicPartition.topic) - .setPartitions(Collections.singletonList( - new ListOffsetsPartition() - .setPartitionIndex(topicPartition.partition) - .setCurrentLeaderEpoch(currentLeaderEpoch) - .setTimestamp(timestamp))) val metadataVersion = 
metadataVersionSupplier() + val topicIdPartition = replicaManager.topicIdPartition(topicPartition) + val topicId = if (topicIdPartition != null) topicIdPartition.topicId() else Uuid.ZERO_UUID + val useTopicId = metadataVersion.listOffsetRequestVersion >= 12 && topicId != Uuid.ZERO_UUID + + val topic = new ListOffsetsTopic() + if (useTopicId) { + topic.setTopicId(topicId) + } else { + topic.setName(topicPartition.topic) + } + topic.setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(topicPartition.partition) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setTimestamp(timestamp))) + val requestBuilder = ListOffsetsRequest.Builder.forReplica(metadataVersion.listOffsetRequestVersion, brokerConfig.brokerId) .setTargetTimes(Collections.singletonList(topic)) val clientResponse = blockingSender.sendRequest(requestBuilder) val response = clientResponse.responseBody.asInstanceOf[ListOffsetsResponse] - val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get - .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get + val responseTopic = if (useTopicId) { + response.topics.asScala.find(_.topicId == topicId).get + } else { + response.topics.asScala.find(_.name == topicPartition.topic).get + } + val responsePartition = responseTopic.partitions.asScala.find(_.partitionIndex == topicPartition.partition).get Errors.forCode(responsePartition.errorCode) match { case Errors.NONE => new OffsetAndEpoch(responsePartition.offset, responsePartition.leaderEpoch) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4ee24f2e41428..cf8a55c52f78d 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1477,17 +1477,31 @@ class ReplicaManager(val config: KafkaConfig, buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse, responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]], timeoutMs: Int = 0): Unit = { - val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]() + val statusByPartition = mutable.Map[TopicIdPartition, ListOffsetsPartitionStatus]() topics.foreach { topic => topic.partitions.asScala.foreach { partition => - val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) - if (duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + var topicId = topic.topicId() + if (topicId == null) { + topicId = Uuid.ZERO_UUID + } + + val topicIdPartition = new TopicIdPartition(topicId, partition.partitionIndex, topic.name) + val requestTopicIdOpt = if (topicIdPartition.topicId == Uuid.ZERO_UUID) None else Some(topicIdPartition.topicId) + val metadataTopicId = metadataCache.getTopicId(topic.name) + val cachedTopicIdOpt = Option(metadataTopicId).filterNot(_ == Uuid.ZERO_UUID) + + if (duplicatePartitions.contains(topicIdPartition.topicPartition())) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition ${topicIdPartition} " + s"failed because the partition is duplicated in the request.") - statusByPartition += topicPartition -> + statusByPartition += topicIdPartition -> ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.INVALID_REQUEST, partition))).build() + } else if (requestTopicIdOpt.isDefined && 
!cachedTopicIdOpt.contains(requestTopicIdOpt.get)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition ${topicIdPartition} " + + s"failed because the provided topic ID ${requestTopicIdOpt.get} does not match the current topic ID $cachedTopicIdOpt.") + statusByPartition += topicIdPartition -> + ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.INCONSISTENT_TOPIC_ID, partition))).build() } else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) { - statusByPartition += topicPartition -> + statusByPartition += topicIdPartition -> ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition))).build() } else { try { @@ -1498,7 +1512,7 @@ class ReplicaManager(val config: KafkaConfig, else None - val resultHolder = fetchOffsetForTimestamp(topicPartition, + val resultHolder = fetchOffsetForTimestamp(topicIdPartition.topicPartition(), partition.timestamp, isolationLevelOpt, if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), @@ -1538,33 +1552,34 @@ class ReplicaManager(val config: KafkaConfig, throw new IllegalStateException(s"Unexpected result holder state $resultHolder") } } - statusByPartition += topicPartition -> status + statusByPartition += topicIdPartition -> status } catch { // NOTE: These exceptions are special cases since these error messages are typically transient or the client // would have received a clear exception and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | + _ : UnknownTopicIdException | _ : NotLeaderOrFollowerException | _ : UnknownLeaderEpochException | _ : FencedLeaderEpochException | _ : KafkaStorageException | _ : UnsupportedForMessageFormatException) => debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - statusByPartition += topicPartition -> + s"partition ${topicIdPartition.topicPartition()} failed due to ${e.getMessage}") + statusByPartition += topicIdPartition -> ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build() // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE case e: OffsetNotAvailableException => if (version >= 5) { - statusByPartition += topicPartition -> + statusByPartition += topicIdPartition -> ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build() } else { - statusByPartition += topicPartition -> + statusByPartition += topicIdPartition -> ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition))).build() } case e: Throwable => error("Error while responding to offset request", e) - statusByPartition += topicPartition -> + statusByPartition += topicIdPartition -> ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build() } } @@ -1581,15 +1596,18 @@ class ReplicaManager(val config: KafkaConfig, delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys.asJava) } else { // we can respond immediately - val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map { - case (topic, status) => - new
ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava) + val responseTopics = statusByPartition.groupBy(e => (e._1.topic(), e._1.topicId())).map { + case ((topicName, topicId), statuses) => + new ListOffsetsTopicResponse() + .setName(topicName) + .setTopicId(topicId) + .setPartitions(statuses.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava) }.toList responseCallback.accept(responseTopics.asJava) } } - private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicPartition, ListOffsetsPartitionStatus]): Boolean = { + private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicIdPartition, ListOffsetsPartitionStatus]): Boolean = { responseByPartition.values.exists(status => status.futureHolderOpt.isPresent) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 4d4fbe5b71084..c887c796ad830 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -4246,6 +4246,154 @@ class KafkaApisTest extends Logging { testConsumerListOffsetWithUnsupportedVersion(-6, 1) } + @Test + def testHandleListOffsetRequestWithTopicId(): Unit = { + val tp = new TopicPartition("foo", 0) + val topicId = Uuid.randomUuid() + val isolationLevel = IsolationLevel.READ_UNCOMMITTED + val currentLeaderEpoch = Optional.of[Integer](15) + + // Add topic to metadata cache + addTopicToMetadataCache(tp.topic(), numPartitions = 1, topicId = topicId) + + when(replicaManager.fetchOffset( + ArgumentMatchers.any[Seq[ListOffsetsTopic]](), + ArgumentMatchers.eq(Set.empty[TopicPartition]), + ArgumentMatchers.eq(isolationLevel), + ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID), + ArgumentMatchers.eq[String](clientId), + ArgumentMatchers.anyInt(), // correlationId + ArgumentMatchers.anyShort(), // version + ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](), + ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]], + ArgumentMatchers.anyInt() // timeoutMs + )).thenAnswer(ans => { + val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8) + val partitionResponse = new ListOffsetsPartitionResponse() + .setErrorCode(Errors.NONE.code()) + .setOffset(42L) + .setTimestamp(123456L) + .setPartitionIndex(tp.partition()) + callback.accept(util.List.of(new ListOffsetsTopicResponse() + .setName(tp.topic()) + .setTopicId(topicId) + .setPartitions(util.List.of(partitionResponse)))) + }) + + // Version 12: use topic ID + val targetTimes = util.List.of(new ListOffsetsTopic() + .setName(tp.topic) + .setTopicId(topicId) + .setPartitions(util.List.of(new ListOffsetsPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP) + .setCurrentLeaderEpoch(currentLeaderEpoch.get)))) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, true) + .setTargetTimes(targetTimes).build(12.toShort) + val request = buildRequest(listOffsetRequest) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + kafkaApis = createKafkaApis() + kafkaApis.handleListOffsetRequest(request) + + val response = verifyNoThrottling[ListOffsetsResponse](request) + // v12 does not
return the topic name; the topic ID is returned instead + assertTrue(response.topics.asScala.exists(_.name == ""), s"Topic ${tp.topic} not found in response. Found: ${response.topics.asScala.map(_.name).mkString(", ")}") + val topicResponse = response.topics.asScala.find(_.name == "").get + assertEquals(topicId, topicResponse.topicId) + val partitionData = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition).get + assertEquals(Errors.NONE.code, partitionData.errorCode) + assertEquals(42L, partitionData.offset) + assertEquals(123456L, partitionData.timestamp) + } + + @Test + def testHandleListOffsetRequestWithUnknownTopicId(): Unit = { + val tp = new TopicPartition("foo", 0) + val unknownTopicId = Uuid.randomUuid() + val isolationLevel = IsolationLevel.READ_UNCOMMITTED + val currentLeaderEpoch = Optional.of[Integer](15) + + // Don't add topic to metadata cache - simulate unknown topic ID + + // Version 12: use unknown topic ID + val targetTimes = util.List.of(new ListOffsetsTopic() + .setName(tp.topic) + .setTopicId(unknownTopicId) + .setPartitions(util.List.of(new ListOffsetsPartition() + .setPartitionIndex(tp.partition) + .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP) + .setCurrentLeaderEpoch(currentLeaderEpoch.get)))) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, true) + .setTargetTimes(targetTimes).build(12.toShort) + val request = buildRequest(listOffsetRequest) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + kafkaApis = createKafkaApis() + kafkaApis.handleListOffsetRequest(request) + + val response = verifyNoThrottling[ListOffsetsResponse](request) + val topicResponse = response.topics.asScala.find(_.topicId == unknownTopicId).get + assertEquals(unknownTopicId, topicResponse.topicId) + val partitionData = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition).get + assertEquals(Errors.UNKNOWN_TOPIC_ID.code, partitionData.errorCode) + } + + @Test + def testHandleListOffsetRequestVersion11WithTopicName(): Unit = { + val tp = new TopicPartition("foo", 0) + val isolationLevel = IsolationLevel.READ_UNCOMMITTED + val currentLeaderEpoch = Optional.of[Integer](15) + + // Add topic to metadata cache + addTopicToMetadataCache(tp.topic(), 1) + + when(replicaManager.fetchOffset( + ArgumentMatchers.any[Seq[ListOffsetsTopic]](), + ArgumentMatchers.eq(Set.empty[TopicPartition]), + ArgumentMatchers.eq(isolationLevel), + ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID), + ArgumentMatchers.eq[String](clientId), + ArgumentMatchers.anyInt(), // correlationId + ArgumentMatchers.anyShort(), // version + ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](), + ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]], + ArgumentMatchers.anyInt() // timeoutMs + )).thenAnswer(ans => { + val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8) + val partitionResponse = new ListOffsetsPartitionResponse() + .setErrorCode(Errors.NONE.code()) + .setOffset(42L) + .setTimestamp(123456L) + .setPartitionIndex(tp.partition()) + callback.accept(util.List.of(new ListOffsetsTopicResponse() + .setName(tp.topic()) + .setPartitions(util.List.of(partitionResponse)))) + }) + + // Version 11: use topic name + val targetTimes = util.List.of(new ListOffsetsTopic() + .setName(tp.topic) + .setPartitions(util.List.of(new ListOffsetsPartition() + .setPartitionIndex(tp.partition) +
.setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP) + .setCurrentLeaderEpoch(currentLeaderEpoch.get)))) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false) + .setTargetTimes(targetTimes).build(11.toShort) + val request = buildRequest(listOffsetRequest) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + kafkaApis = createKafkaApis() + kafkaApis.handleListOffsetRequest(request) + + val response = verifyNoThrottling[ListOffsetsResponse](request) + val topicResponse = response.topics.asScala.find(_.name == tp.topic).get + val partitionData = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition).get + assertEquals(Errors.NONE.code, partitionData.errorCode) + assertEquals(42L, partitionData.offset) + assertEquals(123456L, partitionData.timestamp) + } + /** * Verifies that the metadata response is correct if the broker listeners are inconsistent (i.e. one broker has * more listeners than another) and the request is sent on the listener that exists in both brokers. diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 304e63602a3d6..a6117386d93e2 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource -import org.apache.kafka.common.{IsolationLevel, TopicPartition} +import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -46,6 +46,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { @Test def testListOffsetsErrorCodes(): Unit = { + // Use version 11 which supports topic names instead of version 12 which requires topic IDs val targetTimes = List(new ListOffsetsTopic() .setName(topic) .setPartitions(List(new ListOffsetsPartition() @@ -56,17 +57,17 @@ class ListOffsetsRequestTest extends BaseRequestTest { val consumerRequest = ListOffsetsRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(targetTimes) - .build() + .build(11.toShort) val replicaRequest = ListOffsetsRequest.Builder - .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, brokers.head.config.brokerId) + .forReplica(11.toShort, brokers.head.config.brokerId) .setTargetTimes(targetTimes) - .build() + .build(11.toShort) val debugReplicaRequest = ListOffsetsRequest.Builder - .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, ListOffsetsRequest.DEBUGGING_REPLICA_ID) + .forReplica(11.toShort, ListOffsetsRequest.DEBUGGING_REPLICA_ID) .setTargetTimes(targetTimes) - .build() + .build(11.toShort) // Unknown topic val randomBrokerId = brokers.head.config.brokerId @@ -138,11 +139,27 @@ class ListOffsetsRequestTest extends BaseRequestTest { private[this] def sendRequest(serverId: Int, timestamp: Long, version: Short): ListOffsetsPartitionResponse = { - val targetTimes = List(new ListOffsetsTopic() - .setName(topic) - .setPartitions(List(new ListOffsetsPartition() - .setPartitionIndex(partition.partition) - .setTimestamp(timestamp)).asJava)).asJava + 
@@ -138,11 +139,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
   private[this] def sendRequest(serverId: Int, timestamp: Long, version: Short): ListOffsetsPartitionResponse = {
-    val targetTimes = List(new ListOffsetsTopic()
-      .setName(topic)
-      .setPartitions(List(new ListOffsetsPartition()
-        .setPartitionIndex(partition.partition)
-        .setTimestamp(timestamp)).asJava)).asJava
+    // For version >= 12, we need to get the actual topic ID
+    val listOffsetsTopic = new ListOffsetsTopic()
+    if (version >= 12) {
+      try {
+        val topicDescription = createAdminClient().describeTopics(Seq(partition.topic).asJava).allTopicNames.get
+        val topicId = topicDescription.get(partition.topic).topicId()
+        listOffsetsTopic.setTopicId(topicId)
+      } catch {
+        case _: Exception =>
+          // Topic doesn't exist, use ZERO_UUID
+          listOffsetsTopic.setTopicId(Uuid.ZERO_UUID)
+      }
+    } else {
+      listOffsetsTopic.setName(topic)
+    }
+
+    listOffsetsTopic.setPartitions(List(new ListOffsetsPartition()
+      .setPartitionIndex(partition.partition)
+      .setTimestamp(timestamp)).asJava)
+
+    val targetTimes = List(listOffsetsTopic).asJava
 
     val builder = ListOffsetsRequest.Builder
       .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
@@ -150,8 +167,14 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val request = if (version == -1) builder.build() else builder.build(version)
 
-    sendRequest(serverId, request).topics.asScala.find(_.name == topic).get
-      .partitions.asScala.find(_.partitionIndex == partition.partition).get
+    val response = sendRequest(serverId, request)
+    // For version >= 12, response uses topic ID instead of name
+    val topicResponse = if (version >= 12) {
+      response.topics.asScala.head
+    } else {
+      response.topics.asScala.find(_.name == topic).get
+    }
+    topicResponse.partitions.asScala.find(_.partitionIndex == partition.partition).get
   }
 
   // -1 indicate "latest"
@@ -260,4 +283,123 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = {
     super.createTopic(topic, numPartitions, replicationFactor)
   }
+
+  @Test
+  def testListOffsetsWithTopicId(): Unit = {
+    // Create topic
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2)
+    val leader = partitionToLeader(partition.partition)
+
+    // Get the actual topic ID
+    val topicDescription = createAdminClient().describeTopics(Seq(partition.topic).asJava).allTopicNames.get
+    val topicId = topicDescription.get(partition.topic).topicId()
+
+    // Produce some messages
+    TestUtils.generateAndProduceMessages(brokers, topic, 10)
+
+    // Test with correct topic ID (version 12)
+    val targetTimesWithCorrectId = List(new ListOffsetsTopic()
+      .setTopicId(topicId)
+      .setPartitions(List(new ListOffsetsPartition()
+        .setPartitionIndex(partition.partition)
+        .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
+
+    val requestWithCorrectId = ListOffsetsRequest.Builder
+      .forReplica(12.toShort, brokers.head.config.brokerId)
+      .setTargetTimes(targetTimesWithCorrectId)
+      .build(12.toShort)
+
+    val responseWithCorrectId = sendRequest(leader, requestWithCorrectId)
+    assertEquals(1, responseWithCorrectId.topics.size)
+    val topicResponse = responseWithCorrectId.topics.asScala.head
+    assertEquals(1, topicResponse.partitions.size)
+    val partitionResponse = topicResponse.partitions.asScala.head
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(10L, partitionResponse.offset)
+  }
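The reworked sendRequest helper and testListOffsetsWithTopicId above resolve the topic ID through the admin client before building a v12 request. A minimal standalone sketch of that lookup, assuming a broker on localhost:9092 and a topic named my-topic (both placeholders):

// Illustrative only: fetch a topic's ID with the Admin client, as the test helper does.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Uuid;

import java.util.List;
import java.util.Properties;

public class ResolveTopicId {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // describeTopics(...).allTopicNames() yields a map from topic name to TopicDescription.
            TopicDescription description = admin.describeTopics(List.of("my-topic"))
                .allTopicNames()
                .get()
                .get("my-topic");
            Uuid topicId = description.topicId();
            System.out.println("Topic ID for my-topic: " + topicId);
        }
    }
}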
+
+  @Test
+  def testListOffsetsWithIncorrectTopicId(): Unit = {
+    // Create topic
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2)
+    val leader = partitionToLeader(partition.partition)
+
+    // Produce some messages
+    TestUtils.generateAndProduceMessages(brokers, topic, 10)
+
+    // Test with incorrect topic ID (version 12)
+    val randomTopicId = Uuid.randomUuid()
+    val targetTimesWithIncorrectId = List(new ListOffsetsTopic()
+      .setTopicId(randomTopicId)
+      .setPartitions(List(new ListOffsetsPartition()
+        .setPartitionIndex(partition.partition)
+        .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
+
+    val requestWithIncorrectId = ListOffsetsRequest.Builder
+      .forReplica(12.toShort, brokers.head.config.brokerId)
+      .setTargetTimes(targetTimesWithIncorrectId)
+      .build(12.toShort)
+
+    val responseWithIncorrectId = sendRequest(leader, requestWithIncorrectId)
+    assertEquals(1, responseWithIncorrectId.topics.size)
+    val topicResponse = responseWithIncorrectId.topics.asScala.head
+    assertEquals(1, topicResponse.partitions.size)
+    val partitionResponse = topicResponse.partitions.asScala.head
+    assertEquals(Errors.UNKNOWN_TOPIC_ID.code, partitionResponse.errorCode)
+  }
+
+  @Test
+  def testListOffsetsVersion12RequiresTopicId(): Unit = {
+    // Create topic
+    createTopic(numPartitions = 1, replicationFactor = 2)
+
+    // Produce some messages
+    TestUtils.generateAndProduceMessages(brokers, topic, 10)
+
+    // Test version 12 with ZERO_UUID - building the request should throw UnsupportedVersionException
+    val targetTimesWithZeroUuid = List(new ListOffsetsTopic()
+      .setTopicId(Uuid.ZERO_UUID)
+      .setPartitions(List(new ListOffsetsPartition()
+        .setPartitionIndex(partition.partition)
+        .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
+
+    // Version 12 requires a non-zero topic ID, so building the request should throw an exception
+    assertThrows(classOf[org.apache.kafka.common.errors.UnsupportedVersionException], () => {
+      ListOffsetsRequest.Builder
+        .forReplica(12.toShort, brokers.head.config.brokerId)
+        .setTargetTimes(targetTimesWithZeroUuid)
+        .build(12.toShort)
+    })
+  }
+
+  @Test
+  def testListOffsetsVersion11UsesTopicName(): Unit = {
+    // Create topic
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2)
+    val leader = partitionToLeader(partition.partition)
+
+    // Produce some messages
+    TestUtils.generateAndProduceMessages(brokers, topic, 10)
+
+    // Test version 11 with a topic name (no topic ID needed)
+    val targetTimesWithName = List(new ListOffsetsTopic()
+      .setName(topic)
+      .setPartitions(List(new ListOffsetsPartition()
+        .setPartitionIndex(partition.partition)
+        .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
+
+    val requestWithName = ListOffsetsRequest.Builder
+      .forReplica(11.toShort, brokers.head.config.brokerId)
+      .setTargetTimes(targetTimesWithName)
+      .build(11.toShort)
+
+    val responseWithName = sendRequest(leader, requestWithName)
+    assertEquals(1, responseWithName.topics.size)
+    val topicResponse = responseWithName.topics.asScala.head
+    assertEquals(topic, topicResponse.name)
+    assertEquals(1, topicResponse.partitions.size)
+    val partitionResponse = topicResponse.partitions.asScala.head
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(10L, partitionResponse.offset)
+  }
 }
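testListOffsetsVersion12RequiresTopicId relies on the builder rejecting a zero topic ID at build time. The helper below only restates that rule in isolation (per the assertThrows above, the actual enforcement is inside ListOffsetsRequest.Builder when the request is built); the class and method names are illustrative:

// Illustrative only: the version gate the assertThrows above depends on.
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;

public class TopicIdVersionGate {
    // Throws if the topic entry cannot be represented at the given request version.
    static void validate(ListOffsetsTopic topic, short version) {
        if (version >= 12 && Uuid.ZERO_UUID.equals(topic.topicId())) {
            throw new UnsupportedVersionException(
                "ListOffsets v" + version + " addresses topics by ID and requires a non-zero topic ID");
        }
        if (version < 12 && topic.name().isEmpty()) {
            throw new UnsupportedVersionException(
                "ListOffsets v" + version + " addresses topics by name and requires a topic name");
        }
    }

    public static void main(String[] args) {
        validate(new ListOffsetsTopic().setName("my-topic"), (short) 11);          // passes
        validate(new ListOffsetsTopic().setTopicId(Uuid.ZERO_UUID), (short) 12);   // throws
    }
}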
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 753b831b594df..d97e6b7e51c56 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -37,6 +37,8 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.InvalidPidMappingException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.{DeleteRecordsResponseData, FetchResponseData, ShareFetchResponseData}
+import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsTopic, ListOffsetsPartition}
+import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsTopicResponse, ListOffsetsPartitionResponse}
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
@@ -93,7 +95,7 @@ import java.io.{ByteArrayInputStream, File}
 import java.net.InetAddress
 import java.nio.file.{Files, Paths}
 import java.util
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Future, TimeUnit}
 import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
@@ -6096,6 +6098,140 @@ class ReplicaManagerTest {
       }
     }
   }
+
+  @Test
+  def testFetchOffsetWithInconsistentTopicId(): Unit = {
+    // Use class-level topicId as the correct one, and create a wrong one
+    val wrongTopicId = Uuid.randomUuid()
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
+
+    try {
+      // Create topic with class-level topicId in metadata cache
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicId)
+      val image = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, image)
+
+      val responseReceived = new AtomicBoolean(false)
+      val responseTopics = new AtomicReference[util.Collection[ListOffsetsTopicResponse]]()
+
+      val buildErrorResponse = (error: Errors, partition: ListOffsetsPartition) => {
+        new ListOffsetsPartitionResponse()
+          .setPartitionIndex(partition.partitionIndex)
+          .setErrorCode(error.code)
+          .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
+          .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
+      }
+
+      val responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]] = (topics: util.Collection[ListOffsetsTopicResponse]) => {
+        responseTopics.set(topics)
+        responseReceived.set(true)
+      }
+
+      val listOffsetsTopic = new ListOffsetsTopic()
+        .setName(topic)
+        .setTopicId(wrongTopicId) // Use wrong topic ID
+        .setPartitions(util.Arrays.asList(
+          new ListOffsetsPartition()
+            .setPartitionIndex(0)
+            .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)))
+
+      // Call fetchOffset with wrong topic ID
+      replicaManager.fetchOffset(
+        topics = Seq(listOffsetsTopic),
+        duplicatePartitions = Set.empty,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        replicaId = ListOffsetsRequest.CONSUMER_REPLICA_ID,
+        clientId = "test-client",
+        correlationId = 1,
+        version = 12,
+        buildErrorResponse = buildErrorResponse,
+        responseCallback = responseCallback,
+        timeoutMs = 0)
+
+      // Verify response contains INCONSISTENT_TOPIC_ID error
+      assertTrue(responseReceived.get(), "Response should be received")
+      val topics = responseTopics.get()
+      assertEquals(1, topics.size(), "Should have 1 topic in response")
+      val topicResponse = topics.iterator().next()
+      assertEquals(topic, topicResponse.name())
+      assertEquals(wrongTopicId, topicResponse.topicId())
+      assertEquals(1, topicResponse.partitions().size())
+      val partitionResponse = topicResponse.partitions().get(0)
+      assertEquals(0, partitionResponse.partitionIndex())
+      assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, partitionResponse.errorCode())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
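The test above exercises the broker-side comparison between the request's topic ID and the ID registered in the metadata image. The sketch below restates that rule in isolation; it is an illustration of the expected outcomes, not ReplicaManager.fetchOffset itself:

// Illustrative only: the ID-consistency rule, separated from ReplicaManager.
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;

public class TopicIdConsistencyRule {
    static Errors check(Uuid requestTopicId, Uuid cachedTopicId) {
        if (Uuid.ZERO_UUID.equals(requestTopicId)) {
            return Errors.NONE; // legacy request: no ID supplied, resolve by name
        }
        return requestTopicId.equals(cachedTopicId) ? Errors.NONE : Errors.INCONSISTENT_TOPIC_ID;
    }

    public static void main(String[] args) {
        Uuid cached = Uuid.randomUuid();
        System.out.println(check(cached, cached));            // NONE
        System.out.println(check(Uuid.randomUuid(), cached)); // INCONSISTENT_TOPIC_ID
        System.out.println(check(Uuid.ZERO_UUID, cached));    // NONE
    }
}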
+
+  @Test
+  def testFetchOffsetWithZeroUuid(): Unit = {
+    val tp = new TopicPartition(topic, 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
+
+    try {
+      // Create topic with class-level topicId in metadata cache (legacy clients use ZERO_UUID)
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicId)
+      val image = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, image)
+
+      // Append some records to create offsets
+      val records = MemoryRecords.withRecords(Compression.NONE,
+        new SimpleRecord("message1".getBytes),
+        new SimpleRecord("message2".getBytes))
+      appendRecords(replicaManager, tp, records)
+
+      val responseReceived = new AtomicBoolean(false)
+      val responseTopics = new AtomicReference[util.Collection[ListOffsetsTopicResponse]]()
+
+      val buildErrorResponse = (error: Errors, partition: ListOffsetsPartition) => {
+        new ListOffsetsPartitionResponse()
+          .setPartitionIndex(partition.partitionIndex)
+          .setErrorCode(error.code)
+          .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
+          .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
+      }
+
+      val responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]] = (topics: util.Collection[ListOffsetsTopicResponse]) => {
+        responseTopics.set(topics)
+        responseReceived.set(true)
+      }
+
+      val listOffsetsTopic = new ListOffsetsTopic()
+        .setName(topic)
+        .setTopicId(Uuid.ZERO_UUID) // Use ZERO_UUID for backward compatibility
+        .setPartitions(util.Arrays.asList(
+          new ListOffsetsPartition()
+            .setPartitionIndex(0)
+            .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)))
+
+      // Call fetchOffset with ZERO_UUID (legacy behavior)
+      replicaManager.fetchOffset(
+        topics = Seq(listOffsetsTopic),
+        duplicatePartitions = Set.empty,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        replicaId = ListOffsetsRequest.CONSUMER_REPLICA_ID,
+        clientId = "test-client",
+        correlationId = 1,
+        version = 11, // Version 11 uses topic names
+        buildErrorResponse = buildErrorResponse,
+        responseCallback = responseCallback,
+        timeoutMs = 0)
+
+      // Verify response - should succeed with ZERO_UUID
+      assertTrue(responseReceived.get(), "Response should be received")
+      val topics = responseTopics.get()
+      assertEquals(1, topics.size(), "Should have 1 topic in response")
+      val topicResponse = topics.iterator().next()
+      assertEquals(topic, topicResponse.name())
+      assertEquals(1, topicResponse.partitions().size())
+      val partitionResponse = topicResponse.partitions().get(0)
+      assertEquals(0, partitionResponse.partitionIndex())
+      assertEquals(Errors.NONE.code, partitionResponse.errorCode())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
 }
 
 class MockReplicaSelector extends ReplicaSelector {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8bdce25808fa9..90c9bb6dd94f3 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -274,7 +274,9 @@ public short fetchRequestVersion() {
     }
 
     public short listOffsetRequestVersion() {
-        if (this.isAtLeast(IBP_4_2_IV1)) {
+        if (this.isAtLeast(IBP_4_3_IV0)) {
+            return 12;
+        } else if (this.isAtLeast(IBP_4_2_IV1)) {
             return 11;
         } else if (this.isAtLeast(IBP_4_0_IV3)) {
             return 10;
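With the bump above, the inter-broker ListOffsets version now follows the cluster's metadata version. A small sketch of how a caller would observe the mapping (the enum constants are the ones named in the hunk; the printed values follow its branches):

// Observing the metadata-version-to-request-version mapping changed above.
import org.apache.kafka.server.common.MetadataVersion;

public class ListOffsetsVersionByMetadataVersion {
    public static void main(String[] args) {
        // Per the branches in listOffsetRequestVersion():
        System.out.println(MetadataVersion.IBP_4_3_IV0.listOffsetRequestVersion()); // 12
        System.out.println(MetadataVersion.IBP_4_2_IV1.listOffsetRequestVersion()); // 11
        System.out.println(MetadataVersion.IBP_4_0_IV3.listOffsetRequestVersion()); // 10
    }
}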
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index e0bcfb7f146ec..fcd11087a0aac 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -284,8 +284,8 @@ public void assertLatestIsNotProduction() {
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class)
     public void testListOffsetsValueVersion(MetadataVersion metadataVersion) {
-        final short expectedVersion = 11;
-        if (metadataVersion.isAtLeast(IBP_4_2_IV1)) {
+        final short expectedVersion = 12;
+        if (metadataVersion.isAtLeast(IBP_4_3_IV0)) {
            assertEquals(expectedVersion, metadataVersion.listOffsetRequestVersion());
         } else {
             assertTrue(metadataVersion.listOffsetRequestVersion() < expectedVersion);
diff --git a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
index 200fb2262acb2..1b6ce507f0016 100644
--- a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.purgatory;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
@@ -53,13 +54,13 @@ public class DelayedRemoteListOffsets extends DelayedOperation {
     static final Map<TopicPartition, Meter> PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<>();
 
     private final int version;
-    private final Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition;
+    private final Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition;
     private final Consumer<TopicPartition> partitionOrException;
     private final Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback;
 
     public DelayedRemoteListOffsets(long delayMs,
                                     int version,
-                                    Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition,
+                                    Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition,
                                     Consumer<TopicPartition> partitionOrException,
                                     Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback) {
         super(delayMs);
@@ -83,11 +84,11 @@ public DelayedRemoteListOffsets(long delayMs,
      */
     @Override
     public void onExpiration() {
-        statusByPartition.forEach((topicPartition, status) -> {
+        statusByPartition.forEach((topicIdPartition, status) -> {
             if (!status.completed()) {
-                LOG.debug("Expiring list offset request for partition {} with status {}", topicPartition, status);
+                LOG.debug("Expiring list offset request for partition {} with status {}", topicIdPartition.topicPartition(), status);
                 status.futureHolderOpt().ifPresent(futureHolder -> futureHolder.jobFuture().cancel(true));
-                recordExpiration(topicPartition);
+                recordExpiration(topicIdPartition.topicPartition());
             }
         });
     }
@@ -99,9 +100,13 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         Map<String, ListOffsetsResponseData.ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
-        statusByPartition.forEach((tp, status) -> {
-            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(tp.topic(), k ->
-                new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(tp.topic()));
+        statusByPartition.forEach((topicIdPartition, status) -> {
+            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(
+                topicIdPartition.topic(),
+                k -> new ListOffsetsResponseData.ListOffsetsTopicResponse()
+                    .setName(topicIdPartition.topic())
+                    .setTopicId(topicIdPartition.topicId())
+            );
             status.responseOpt().ifPresent(res ->
                 response.partitions().add(res));
         });
         responseCallback.accept(groupedByTopic.values());
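onComplete now folds partition statuses keyed by TopicIdPartition into one response entry per topic, carrying both the name and the topic ID. The same computeIfAbsent pattern in isolation (topic name, ID, and partition numbers are made up for the demo):

// Illustrative only: the computeIfAbsent grouping onComplete uses, with made-up data.
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTopicDemo {
    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();
        List<TopicIdPartition> completed = List.of(
            new TopicIdPartition(topicId, 0, "test"),
            new TopicIdPartition(topicId, 1, "test"));

        Map<String, ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
        for (TopicIdPartition tip : completed) {
            // One response entry per topic, now carrying both the name and the topic ID.
            ListOffsetsTopicResponse topicResponse = groupedByTopic.computeIfAbsent(
                tip.topic(),
                k -> new ListOffsetsTopicResponse()
                    .setName(tip.topic())
                    .setTopicId(tip.topicId()));
            topicResponse.partitions().add(
                new ListOffsetsPartitionResponse().setPartitionIndex(tip.partition()));
        }
        System.out.println(groupedByTopic.values()); // one topic entry with two partitions
    }
}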
@@ -118,7 +123,7 @@ public boolean tryComplete() {
         statusByPartition.forEach((partition, status) -> {
             if (!status.completed()) {
                 try {
-                    partitionOrException.accept(partition);
+                    partitionOrException.accept(partition.topicPartition());
                 } catch (ApiException e) {
                     status.futureHolderOpt().ifPresent(futureHolder -> {
                         futureHolder.jobFuture().cancel(false);
diff --git a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
index 6586bf35ae091..b7701743e38ce 100644
--- a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.server.purgatory;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -82,10 +84,10 @@ public void testResponseOnRequestExpiration() throws InterruptedException {
             return true;
         });
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
         );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
@@ -133,10 +135,10 @@ public void testResponseOnSuccess() {
             return true;
         });
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
        );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
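tryComplete unwraps partition.topicPartition() because TopicIdPartition pairs the topic ID with the name-based TopicPartition, so consumers that are still keyed by name keep working. For illustration:

// Illustrative only: TopicIdPartition keeps both keys; topicPartition() recovers the name-based one.
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public class TopicIdPartitionDemo {
    public static void main(String[] args) {
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 0, "test");
        TopicPartition tp = tip.topicPartition(); // what name-keyed callbacks still consume
        System.out.println(tip.topicId() + " -> " + tp); // <uuid> -> test-0
    }
}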
@@ -188,10 +190,10 @@ public void testResponseOnPartialError() {
         when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
         when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
         );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
@@ -244,11 +246,11 @@ public void testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition()
         when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
         when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
-            new TopicPartition("test1", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
         );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
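For reference, the per-partition error entry the partial-error tests expect for the failing partition, built from the same constants the patch uses (the class name is illustrative):

// Illustrative only: the error-partition shape used when one partition fails while the rest succeed.
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsResponse;

public class ErrorPartitionShape {
    public static void main(String[] args) {
        ListOffsetsPartitionResponse errorEntry = new ListOffsetsPartitionResponse()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
            .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
            .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET);
        System.out.println(errorEntry);
    }
}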