diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0986d8a67bc36..50ab3279104ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -292,6 +292,14 @@ public Map<String, Uuid> topicIds() {
         return metadataSnapshot.topicIds();
     }
 
+    public Uuid getTopicIdByName(String topic) {
+        return metadataSnapshot.topicIds().getOrDefault(topic, Uuid.ZERO_UUID);
+    }
+
+    public String getTopicNameById(Uuid topicId) {
+        return metadataSnapshot.topicNames().get(topicId);
+    }
+
     public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
         Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
         if (maybeMetadata.isEmpty())
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f9cca5c1339a2..bdad73eab678d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
@@ -54,6 +55,7 @@
 import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader;
 import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.topicsForPartitions;
 
+
 /**
  * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
  * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
@@ -393,9 +395,14 @@ private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequests(
     private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
                                                                   final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
                                                                   boolean requireTimestamp) {
+        boolean canUseTopicIds = !timestampsToSearch.isEmpty() && timestampsToSearch.keySet().stream()
+                .map(tp -> metadata.getTopicIdByName(tp.topic()))
+                .allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));
+
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamp, isolationLevel)
-                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch))
+                .forConsumer(requireTimestamp, isolationLevel, canUseTopicIds)
+                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch, metadata))
                 .setTimeoutMs(requestTimeoutMs);
 
         log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index a92237d0fc9fc..8eed57e4504bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
@@ -110,8 +111,13 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse) {
         Set<String> unauthorizedTopics = new HashSet<>();
 
         for (ListOffsetsResponseData.ListOffsetsTopicResponse topic : listOffsetsResponse.topics()) {
+            String topicName = topic.name();
+            if (topic.topicId() != null && !topic.topicId().equals(Uuid.ZERO_UUID)) {
+                topicName = metadata.getTopicNameById(topic.topicId());
+            }
+
             for (ListOffsetsResponseData.ListOffsetsPartitionResponse partition : topic.partitions()) {
-                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topicName, partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
                 switch (error) {
                     case NONE:
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 080dfa5cd961d..2cea7ad160e0e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -615,9 +616,14 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
             Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> targetTimes,
             boolean requireTimestamps,
             List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
+
+        boolean canUseTopicIds = !targetTimes.isEmpty() && targetTimes.keySet().stream()
+                .map(tp -> metadata.getTopicIdByName(tp.topic()))
+                .allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));
+
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamps, isolationLevel)
-                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes))
+                .forConsumer(requireTimestamps, isolationLevel, canUseTopicIds)
+                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes, metadata))
                 .setTimeoutMs(requestTimeoutMs);
 
         log.debug("Creating ListOffset request {} for broker {} to reset positions", builder,
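Both call sites above gate the upgrade on the same all-or-nothing rule. As a minimal standalone sketch (not part of the patch; `topicIdsByName` is a hypothetical stand-in for the metadata snapshot lookup):

```java
import org.apache.kafka.common.Uuid;

import java.util.Map;
import java.util.Set;

// Sketch: a request may switch to the ID-based ListOffsets version only if
// every topic it touches resolves to a non-zero topic ID; an empty request
// never triggers the upgrade.
final class TopicIdEligibilitySketch {
    static boolean canUseTopicIds(Set<String> topics, Map<String, Uuid> topicIdsByName) {
        return !topics.isEmpty() && topics.stream()
                .map(topic -> topicIdsByName.getOrDefault(topic, Uuid.ZERO_UUID))
                .allMatch(id -> !id.equals(Uuid.ZERO_UUID));
    }
}
```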
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 5862ebdfafc67..064f62302cf33 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
@@ -60,7 +63,13 @@ public static class Builder extends AbstractRequest.Builder<ListOffsetsRequest> {
 
         public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
-            return forConsumer(requireTimestamp, isolationLevel, false, false, false, false);
+            return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, false);
+        }
+
+        public static Builder forConsumer(boolean requireTimestamp,
+                                          IsolationLevel isolationLevel,
+                                          boolean canUseTopicIds) {
+            return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, canUseTopicIds);
         }
 
         public static Builder forConsumer(boolean requireTimestamp,
@@ -69,6 +78,18 @@ public static Builder forConsumer(boolean requireTimestamp,
                                           boolean requireMaxTimestamp,
                                           boolean requireEarliestLocalTimestamp,
                                           boolean requireTieredStorageTimestamp,
                                           boolean requireEarliestPendingUploadTimestamp) {
+            return forConsumer(requireTimestamp, isolationLevel, requireMaxTimestamp,
+                    requireEarliestLocalTimestamp, requireTieredStorageTimestamp,
+                    requireEarliestPendingUploadTimestamp, false);
+        }
+
+        public static Builder forConsumer(boolean requireTimestamp,
+                                          IsolationLevel isolationLevel,
+                                          boolean requireMaxTimestamp,
+                                          boolean requireEarliestLocalTimestamp,
+                                          boolean requireTieredStorageTimestamp,
+                                          boolean requireEarliestPendingUploadTimestamp,
+                                          boolean canUseTopicIds) {
             short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion();
             if (requireEarliestPendingUploadTimestamp)
                 minVersion = 11;
@@ -82,7 +103,10 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED)
                 minVersion = 2;
             else if (requireTimestamp)
                 minVersion = 1;
-            return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
+
+            // When canUseTopicIds is false, limit maxVersion to 11 so the name-based protocol is used
+            short maxVersion = canUseTopicIds ? ApiKeys.LIST_OFFSETS.latestVersion() : (short) 11;
+            return new Builder(minVersion, maxVersion, CONSUMER_REPLICA_ID, isolationLevel);
         }
 
         public static Builder forReplica(short allowedVersion, int replicaId) {
@@ -111,6 +135,22 @@ public Builder setTimeoutMs(int timeoutMs) {
 
         @Override
         public ListOffsetsRequest build(short version) {
+            if (version >= 12) {
+                data.topics().forEach(topic -> {
+                    if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
+                        throw new UnsupportedVersionException("ListOffsets API version " +
+                                version + " requires the use of topic IDs.");
+                    }
+                });
+            } else {
+                data.topics().forEach(topic -> {
+                    if (topic.name() == null || topic.name().isEmpty()) {
+                        throw new UnsupportedVersionException("ListOffsets API version " +
+                                version + " requires the use of topic names.");
+                    }
+                });
+            }
+
             return new ListOffsetsRequest(data, version);
         }
 
@@ -144,14 +184,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         List<ListOffsetsTopicResponse> responses = new ArrayList<>();
         for (ListOffsetsTopic topic : data.topics()) {
-            ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse().setName(topic.name());
+            ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse()
+                    .setName(topic.name())
+                    .setTopicId(topic.topicId());
             List<ListOffsetsPartitionResponse> partitions = new ArrayList<>();
             for (ListOffsetsPartition partition : topic.partitions()) {
                 ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse()
                         .setErrorCode(errorCode)
                         .setPartitionIndex(partition.partitionIndex());
                 partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
-                    .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
+                        .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
                 partitions.add(partitionResponse);
             }
             topicResponse.setPartitions(partitions);
@@ -192,13 +234,17 @@ public static ListOffsetsRequest parse(Readable readable, short version) {
         return new ListOffsetsRequest(new ListOffsetsRequestData(readable, version), version);
     }
 
-    public static List<ListOffsetsTopic> toListOffsetsTopics(Map<TopicPartition, ListOffsetsPartition> timestampsToSearch) {
+    public static List<ListOffsetsTopic> toListOffsetsTopics(
+            Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
+            ConsumerMetadata metadata
+    ) {
         Map<String, ListOffsetsTopic> topics = new HashMap<>();
         for (Map.Entry<TopicPartition, ListOffsetsPartition> entry : timestampsToSearch.entrySet()) {
             TopicPartition tp = entry.getKey();
-            ListOffsetsTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic()));
+            ListOffsetsTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic()
+                    .setName(tp.topic()).setTopicId(metadata.getTopicIdByName(tp.topic())));
             topic.partitions().add(entry.getValue());
         }
         return new ArrayList<>(topics.values());
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
index cadff02033958..7db7ca0302652 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
@@ -102,6 +102,10 @@ public boolean shouldClientThrottle(short version) {
         return version >= 3;
     }
 
+    public static boolean useTopicIds(short version) {
+        return version >= 12;
+    }
+
     public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) {
         return new ListOffsetsTopicResponse()
             .setName(tp.topic())
@@ -112,4 +116,4 @@ public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) {
             .setOffset(offset)
             .setLeaderEpoch(epoch)));
     }
 }
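For orientation, a short usage sketch of the builder semantics introduced above; it relies only on the APIs this patch adds:

```java
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetsRequest;

public final class BuilderVersionClampExample {
    public static void main(String[] args) {
        // Without topic IDs the builder clamps its max version to 11, so
        // version negotiation can never select the ID-only wire format.
        ListOffsetsRequest.Builder nameBased = ListOffsetsRequest.Builder
                .forConsumer(true, IsolationLevel.READ_COMMITTED, false);
        System.out.println(nameBased.latestAllowedVersion()); // 11

        // With topic IDs available for every requested topic, the latest
        // version (12+) becomes eligible.
        ListOffsetsRequest.Builder idBased = ListOffsetsRequest.Builder
                .forConsumer(true, IsolationLevel.READ_COMMITTED, true);
        System.out.println(idBased.latestAllowedVersion());
    }
}
```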
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 1a2de6ca30a2f..101be979bf169 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -42,7 +42,9 @@
   // Version 10 enables async remote list offsets support (KIP-1075)
   //
   // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023)
-  "validVersions": "1-11",
+  //
+  // Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
+  "validVersions": "1-12",
   "flexibleVersions": "6+",
   "latestVersionUnstable": false,
   "fields": [
@@ -52,8 +54,10 @@
       "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
     { "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+", "about": "Each topic in the request.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "ignorable": true,
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "12+", "ignorable": true,
+        "about": "The unique topic ID." },
       { "name": "Partitions", "type": "[]ListOffsetsPartition", "versions": "0+", "about": "Each partition in the request.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 1407273bf4d8c..960d5c024e015 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -42,15 +42,19 @@
   // Version 10 enables async remote list offsets support (KIP-1075)
   //
   // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023)
-  "validVersions": "1-11",
+  //
+  // Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
+  "validVersions": "1-12",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]ListOffsetsTopicResponse", "versions": "0+", "about": "Each topic in the response.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "ignorable": true,
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "12+", "ignorable": true,
+        "about": "The unique topic ID." },
       { "name": "Partitions", "type": "[]ListOffsetsPartitionResponse", "versions": "0+", "about": "Each partition in the response.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
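Because a v12 response carries only the TopicId field, the client must map IDs back to names before rebuilding its TopicPartition keys, mirroring the OffsetFetcherUtils change earlier in this patch. A minimal sketch (`topicNamesById` is a hypothetical stand-in for the consumer-metadata lookup):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;

import java.util.Map;

final class ResponseTopicResolutionSketch {
    // Prefer the topic ID when the broker sent one; fall back to the name
    // for responses at versions 11 and below.
    static String topicNameOf(ListOffsetsTopicResponse topic, Map<Uuid, String> topicNamesById) {
        if (topic.topicId() != null && !topic.topicId().equals(Uuid.ZERO_UUID))
            return topicNamesById.get(topic.topicId());
        return topic.name();
    }

    static TopicPartition partitionKey(ListOffsetsTopicResponse topic,
                                       ListOffsetsPartitionResponse partition,
                                       Map<Uuid, String> topicNamesById) {
        return new TopicPartition(topicNameOf(topic, topicNamesById), partition.partitionIndex());
    }
}
```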
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 182900c0207ac..5e2d7dc7bf3ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -780,6 +780,7 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
             List<ListOffsetsTopicResponse> topics = Collections.singletonList(
                     new ListOffsetsTopicResponse()
                         .setName(tp0.topic())
+                        .setTopicId(topicId)
                         .setPartitions(Arrays.asList(
                                 tp0NoError,
                                 new ListOffsetsPartitionResponse()
@@ -798,6 +799,7 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
             List<ListOffsetsTopic> expectedTopics = Collections.singletonList(
                     new ListOffsetsTopic()
                         .setName(tp0.topic())
+                        .setTopicId(topicId)
                         .setPartitions(Arrays.asList(
                                 new ListOffsetsPartition()
                                     .setPartitionIndex(tp1.partition())
@@ -820,6 +822,7 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
             List<ListOffsetsTopicResponse> topicsWithFatalError = Collections.singletonList(
                     new ListOffsetsTopicResponse()
                         .setName(tp0.topic())
+                        .setTopicId(topicId)
                         .setPartitions(Arrays.asList(
                                 tp0NoError,
                                 new ListOffsetsPartitionResponse()
@@ -1015,6 +1018,7 @@ public void testBatchedListOffsetsMetadataErrors() {
                 .setThrottleTimeMs(0)
                 .setTopics(Collections.singletonList(new ListOffsetsTopicResponse()
                         .setName(tp0.topic())
+                        .setTopicId(topicId)
                         .setPartitions(Arrays.asList(
                                 new ListOffsetsPartitionResponse()
                                     .setPartitionIndex(tp0.partition())
@@ -1095,6 +1099,7 @@ private void testGetOffsetsForTimesWithUnknownOffset() {
                 .setThrottleTimeMs(0)
                 .setTopics(Collections.singletonList(new ListOffsetsTopicResponse()
                         .setName(tp0.topic())
+                        .setTopicId(topicId)
                         .setPartitions(Collections.singletonList(new ListOffsetsPartitionResponse()
                                 .setPartitionIndex(tp0.partition())
                                 .setErrorCode(Errors.NONE.code())
@@ -1665,6 +1670,7 @@ private ListOffsetsResponse listOffsetResponse(Map<TopicPartition, Long> offsets
         for (Map.Entry<String, List<ListOffsetsPartitionResponse>> response : responses.entrySet()) {
             topics.add(new ListOffsetsTopicResponse()
                     .setName(response.getKey())
+                    .setTopicId(topicIds.getOrDefault(response.getKey(), Uuid.ZERO_UUID))
                     .setPartitions(response.getValue()));
         }
         ListOffsetsResponseData data = new ListOffsetsResponseData().setTopics(topics);
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 0bcd9731c462d..960fbbda891c8 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -182,43 +182,47 @@ public void testJoinGroupRequestVersions() throws Exception {
         testAllMessageRoundTripsFromVersion((short) 5, newRequest.get().setGroupInstanceId("instanceId"));
     }
 
-    @Test
-    public void testListOffsetsRequestVersions() throws Exception {
-        List<ListOffsetsTopic> v = Collections.singletonList(new ListOffsetsTopic()
-                .setName("topic")
-                .setPartitions(Collections.singletonList(new ListOffsetsPartition()
-                        .setPartitionIndex(0)
-                        .setTimestamp(123L))));
-        Supplier<ListOffsetsRequestData> newRequest = () -> new ListOffsetsRequestData()
-                .setTopics(v)
-                .setReplicaId(0);
-        testAllMessageRoundTrips(newRequest.get());
-        testAllMessageRoundTripsFromVersion((short) 2, newRequest.get().setIsolationLevel(IsolationLevel.READ_COMMITTED.id()));
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS)
+    public void testListOffsetsRequestVersions(short version) throws Exception {
+        ListOffsetsRequestData request = new ListOffsetsRequestData()
+                .setReplicaId(0)
+                .setIsolationLevel(version >= 2 ? IsolationLevel.READ_COMMITTED.id() : 0)
+                .setTopics(singletonList(
+                        new ListOffsetsTopic()
+                                .setTopicId(version >= 12 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
+                                .setName(version < 12 ? "topic" : "")
+                                .setPartitions(singletonList(
+                                        new ListOffsetsPartition()
+                                                .setPartitionIndex(0)
+                                                .setCurrentLeaderEpoch(version >= 4 ? 10 : -1)
+                                                .setTimestamp(123L)
+                                ))
+                ));
+
+        testMessageRoundTrip(version, request, request);
     }
 
-    @Test
-    public void testListOffsetsResponseVersions() throws Exception {
-        ListOffsetsPartitionResponse partition = new ListOffsetsPartitionResponse()
-                .setErrorCode(Errors.NONE.code())
-                .setPartitionIndex(0);
-        List<ListOffsetsTopicResponse> topics = Collections.singletonList(new ListOffsetsTopicResponse()
-                .setName("topic")
-                .setPartitions(Collections.singletonList(partition)));
-        Supplier<ListOffsetsResponseData> response = () -> new ListOffsetsResponseData()
-                .setTopics(topics);
-        for (short version = ApiKeys.LIST_OFFSETS.oldestVersion(); version <= ApiKeys.LIST_OFFSETS.latestVersion(); ++version) {
-            ListOffsetsResponseData responseData = response.get();
-            responseData.topics().get(0).partitions().get(0)
-                    .setOffset(456L)
-                    .setTimestamp(123L);
-            if (version > 1) {
-                responseData.setThrottleTimeMs(1000);
-            }
-            if (version > 3) {
-                partition.setLeaderEpoch(1);
-            }
-            testEquivalentMessageRoundTrip(version, responseData);
-        }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS)
+    public void testListOffsetsResponseVersions(short version) throws Exception {
+        ListOffsetsResponseData response = new ListOffsetsResponseData()
+                .setThrottleTimeMs(version >= 2 ? 1000 : 0)
+                .setTopics(singletonList(
+                        new ListOffsetsTopicResponse()
+                                .setTopicId(version >= 12 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
+                                .setName(version < 12 ? "topic" : "")
+                                .setPartitions(singletonList(
+                                        new ListOffsetsPartitionResponse()
+                                                .setPartitionIndex(0)
+                                                .setErrorCode(Errors.NONE.code())
+                                                .setTimestamp(123L)
+                                                .setOffset(456L)
+                                                .setLeaderEpoch(version >= 4 ? 1 : -1)
+                                ))
+                ));
+
+        testMessageRoundTrip(version, response, response);
     }
 
     @Test
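The parameterized tests above reduce to a serialize/re-parse round trip at each version. A condensed sketch of that core step (assuming `MessageUtil.toByteBuffer`, which the message test utilities rely on):

```java
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;

import java.nio.ByteBuffer;

final class RoundTripSketch {
    // Serialize at the target version, then re-parse at the same version; the
    // result should equal the original as long as the original only populates
    // fields that the target version can carry.
    static ListOffsetsRequestData roundTrip(ListOffsetsRequestData data, short version) {
        ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
        return new ListOffsetsRequestData(new ByteBufferAccessor(buffer), version);
    }
}
```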
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index 48542c1a2fd7d..abe486ba1f9e0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
@@ -37,7 +40,10 @@
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ListOffsetsRequestTest {
 
@@ -61,22 +67,27 @@ public void testDuplicatePartitions() {
 
     @Test
     public void testGetErrorResponse() {
+        Uuid topicId = Uuid.randomUuid();
         for (short version = 1; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) {
+            // For version >= 12, we need to use topic IDs
+            boolean useTopicIds = version >= 12;
             List<ListOffsetsTopic> topics = Collections.singletonList(
                     new ListOffsetsTopic()
                         .setName("topic")
+                        .setTopicId(useTopicIds ? topicId : Uuid.ZERO_UUID)
                         .setPartitions(Collections.singletonList(
                                 new ListOffsetsPartition()
                                     .setPartitionIndex(0))));
             ListOffsetsRequest request = ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, useTopicIds)
                     .setTargetTimes(topics)
                     .build(version);
             ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
 
             List<ListOffsetsTopicResponse> v = Collections.singletonList(
                     new ListOffsetsTopicResponse()
                         .setName("topic")
+                        .setTopicId(useTopicIds ? topicId : Uuid.ZERO_UUID)
                         .setPartitions(Collections.singletonList(
                                 new ListOffsetsPartitionResponse()
                                     .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
@@ -95,6 +106,10 @@ public void testGetErrorResponse() {
 
     @Test
     public void testToListOffsetsTopics() {
+        Uuid topicId = Uuid.randomUuid();
+        ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+        when(metadata.getTopicIdByName("topic")).thenReturn(topicId);
+
         ListOffsetsPartition lop0 = new ListOffsetsPartition()
                 .setPartitionIndex(0)
                 .setCurrentLeaderEpoch(1)
@@ -106,10 +121,11 @@ public void testToListOffsetsTopics() {
         Map<TopicPartition, ListOffsetsPartition> timestampsToSearch = new HashMap<>();
         timestampsToSearch.put(new TopicPartition("topic", 0), lop0);
         timestampsToSearch.put(new TopicPartition("topic", 1), lop1);
-        List<ListOffsetsTopic> listOffsetTopics = ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch);
+        List<ListOffsetsTopic> listOffsetTopics = ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch, metadata);
         assertEquals(1, listOffsetTopics.size());
         ListOffsetsTopic topic = listOffsetTopics.get(0);
         assertEquals("topic", topic.name());
+        assertEquals(topicId, topic.topicId());
         assertEquals(2, topic.partitions().size());
         assertTrue(topic.partitions().contains(lop0));
         assertTrue(topic.partitions().contains(lop1));
@@ -146,4 +162,98 @@ public void testListOffsetsRequestOldestVersion() {
         assertEquals((short) 9, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion());
         assertEquals((short) 11, requireEarliestPendingUploadTimestampRequestBuilder.oldestAllowedVersion());
     }
+
+    @Test
+    public void testBuildWithTopicIdsForVersion12() {
+        Uuid topicId = Uuid.randomUuid();
+        List<ListOffsetsTopic> topics = Collections.singletonList(
+                new ListOffsetsTopic()
+                    .setName("topic")
+                    .setTopicId(topicId)
+                    .setPartitions(Collections.singletonList(
+                            new ListOffsetsPartition()
+                                .setPartitionIndex(0)
+                                .setTimestamp(123L))));
+
+        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
+                .forConsumer(true, IsolationLevel.READ_COMMITTED, true)
+                .setTargetTimes(topics);
+
+        // Version 12 should succeed with topic IDs
+        ListOffsetsRequest request = builder.build((short) 12);
+        assertEquals(1, request.data().topics().size());
+        assertEquals(topicId, request.data().topics().get(0).topicId());
+    }
+
+    @Test
+    public void testBuildWithTopicNamesForVersion11() {
+        List<ListOffsetsTopic> topics = Collections.singletonList(
+                new ListOffsetsTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                            new ListOffsetsPartition()
+                                .setPartitionIndex(0)
+                                .setTimestamp(123L))));
+
+        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
+                .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+                .setTargetTimes(topics);
+
+        // Version 11 should succeed with topic names
+        ListOffsetsRequest request = builder.build((short) 11);
+        assertEquals(1, request.data().topics().size());
+        assertEquals("topic", request.data().topics().get(0).name());
+    }
+
+    @Test
+    public void testBuildVersion12RequiresTopicIds() {
+        // Version 12 without a topic ID should throw an exception
+        List<ListOffsetsTopic> topics = Collections.singletonList(
+                new ListOffsetsTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                            new ListOffsetsPartition()
+                                .setPartitionIndex(0)
+                                .setTimestamp(123L))));
+
+        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
+                .forConsumer(true, IsolationLevel.READ_COMMITTED, true)
+                .setTargetTimes(topics);
+
+        assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 12));
+    }
+
+    @Test
+    public void testBuildVersion11RequiresTopicNames() {
+        // Version 11 without a topic name should throw an exception
+        Uuid topicId = Uuid.randomUuid();
+        List<ListOffsetsTopic> topics = Collections.singletonList(
+                new ListOffsetsTopic()
+                    .setTopicId(topicId)
+                    .setPartitions(Collections.singletonList(
+                            new ListOffsetsPartition()
+                                .setPartitionIndex(0)
+                                .setTimestamp(123L))));
+
+        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
+                .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+                .setTargetTimes(topics);
+
+        assertThrows(UnsupportedVersionException.class, () -> builder.build((short) 11));
+    }
+
+    @Test
+    public void testCanUseTopicIdsLimitsMaxVersion() {
+        // When canUseTopicIds is false, maxVersion should be limited to 11
+        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
+                .forConsumer(true, IsolationLevel.READ_COMMITTED, false);
+
+        assertEquals((short) 11, builder.latestAllowedVersion());
+
+        // When canUseTopicIds is true, maxVersion should be the latest
+        ListOffsetsRequest.Builder builderWithTopicIds = ListOffsetsRequest.Builder
+                .forConsumer(true, IsolationLevel.READ_COMMITTED, true);
+
+        assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), builderWithTopicIds.latestAllowedVersion());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b3fb3939b1aae..145f4462bcf4c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2309,9 +2309,15 @@ private ListOffsetsRequest createListOffsetRequest(short version) {
                     .setTimestamp(1000000L)
                     .setCurrentLeaderEpoch(5);
 
-            ListOffsetsTopic topic = new ListOffsetsTopic()
-                    .setName("test")
-                    .setPartitions(singletonList(partition));
+            ListOffsetsTopic topic = new ListOffsetsTopic();
+            // Version 12+ requires a topic ID instead of a topic name
+            if (version >= 12) {
+                topic.setTopicId(Uuid.randomUuid());
+            } else {
+                topic.setName("test");
+            }
+            topic.setPartitions(singletonList(partition));
+
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, version >= 12)
                     .setTargetTimes(singletonList(topic))
@@ -2331,10 +2337,17 @@ private ListOffsetsResponse createListOffsetResponse(short version) {
             if (version >= 4) {
                 partition.setLeaderEpoch(27);
             }
+            ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse();
+            // Version 12+ uses a topic ID instead of a topic name
+            if (version >= 12) {
+                topicResponse.setTopicId(Uuid.randomUuid());
+            } else {
+                topicResponse.setName("test");
+            }
+            topicResponse.setPartitions(singletonList(partition));
+
             ListOffsetsResponseData data = new ListOffsetsResponseData()
-                    .setTopics(singletonList(new ListOffsetsTopicResponse()
-                            .setName("test")
-                            .setPartitions(singletonList(partition))));
+                    .setTopics(singletonList(topicResponse));
             return new ListOffsetsResponse(data);
         } else {
             throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b7f2e3364f077..5a815447518ab 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
 import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
 import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
 import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
-import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
+import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
 import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
@@ -784,12 +784,42 @@ class KafkaApis(val requestChannel: RequestChannel,
         .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
     }
 
+    // For version >= 12, resolve topic IDs to names and handle unknown topic IDs
+    val (knownTopics, unknownTopicIdResponses) = if (ListOffsetsResponse.useTopicIds(version)) {
+      val known = new util.ArrayList[ListOffsetsTopic]()
+      val unknown = new util.ArrayList[ListOffsetsTopicResponse]()
+      offsetRequest.topics.asScala.foreach { topic =>
+        val topicName = metadataCache.getTopicName(topic.topicId()).orElse(null)
+
+        if (topicName == null) {
+          // The topic ID cannot be resolved to a name
+          unknown.add(new ListOffsetsTopicResponse()
+            .setName(topic.name())
+            .setTopicId(topic.topicId())
+            .setPartitions(topic.partitions.asScala.map(partition =>
+              buildErrorResponse(Errors.UNKNOWN_TOPIC_ID, partition)).asJava))
+        } else {
+          // The topic ID resolved successfully; create a topic with the resolved name
+          val resolvedTopic = new ListOffsetsTopic()
+            .setName(topicName)
+            .setTopicId(topic.topicId())
+            .setPartitions(topic.partitions())
+          known.add(resolvedTopic)
+        }
+      }
+      (known, unknown)
+    } else {
+      // version < 12, use topic names directly
+      (offsetRequest.topics(), new util.ArrayList[ListOffsetsTopicResponse]())
+    }
+
     val (authorizedRequestInfo, unauthorizedRequestInfo) = authHelper.partitionSeqByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
+      DESCRIBE, TOPIC, knownTopics.asScala.toSeq)(_.name)
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
       new ListOffsetsTopicResponse()
         .setName(topic.name)
+        .setTopicId(topic.topicId)
         .setPartitions(topic.partitions.asScala.map(partition =>
           buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava)
     ).asJava
@@ -797,6 +827,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(response: util.Collection[ListOffsetsTopicResponse]): Void = {
       val mergedResponses = new util.ArrayList(response)
       mergedResponses.addAll(unauthorizedResponseStatus)
+      mergedResponses.addAll(unknownTopicIdResponses)
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         new ListOffsetsResponse(new ListOffsetsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
@@ -804,7 +835,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       null
     }
 
-    if (authorizedRequestInfo.isEmpty) {
+    if (authorizedRequestInfo.isEmpty || knownTopics.isEmpty) {
       sendResponseCallback(util.List.of)
     } else {
       replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala,
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index 80d41e3b0cf13..8fbaaabfa1d7a 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -106,21 +106,34 @@ class RemoteLeaderEndPoint(logPrefix: String,
   }
 
   private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = {
-    val topic = new ListOffsetsTopic()
-      .setName(topicPartition.topic)
-      .setPartitions(Collections.singletonList(
-        new ListOffsetsPartition()
-          .setPartitionIndex(topicPartition.partition)
-          .setCurrentLeaderEpoch(currentLeaderEpoch)
-          .setTimestamp(timestamp)))
     val metadataVersion = metadataVersionSupplier()
+    val topicIdPartition = replicaManager.topicIdPartition(topicPartition)
+    val topicId = if (topicIdPartition != null) topicIdPartition.topicId() else Uuid.ZERO_UUID
+    val useTopicId = metadataVersion.listOffsetRequestVersion >= 12 && topicId != Uuid.ZERO_UUID
+    // Fall back to the name-based version when no topic ID is known; building a
+    // v12 request without a topic ID would throw UnsupportedVersionException.
+    val requestVersion: Short =
+      if (useTopicId) metadataVersion.listOffsetRequestVersion
+      else Math.min(metadataVersion.listOffsetRequestVersion.toInt, 11).toShort
+
+    val topic = new ListOffsetsTopic()
+    if (useTopicId) {
+      topic.setTopicId(topicId)
+    } else {
+      topic.setName(topicPartition.topic)
+    }
+    topic.setPartitions(Collections.singletonList(
+      new ListOffsetsPartition()
+        .setPartitionIndex(topicPartition.partition)
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setTimestamp(timestamp)))
+
-    val requestBuilder = ListOffsetsRequest.Builder.forReplica(metadataVersion.listOffsetRequestVersion, brokerConfig.brokerId)
+    val requestBuilder = ListOffsetsRequest.Builder.forReplica(requestVersion, brokerConfig.brokerId)
       .setTargetTimes(Collections.singletonList(topic))
 
     val clientResponse = blockingSender.sendRequest(requestBuilder)
     val response = clientResponse.responseBody.asInstanceOf[ListOffsetsResponse]
-    val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get
-      .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
+    val responseTopic = if (useTopicId) {
+      response.topics.asScala.find(_.topicId == topicId).get
+    } else {
+      response.topics.asScala.find(_.name == topicPartition.topic).get
+    }
+    val responsePartition = responseTopic.partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
 
     Errors.forCode(responsePartition.errorCode) match {
       case Errors.NONE => new OffsetAndEpoch(responsePartition.offset, responsePartition.leaderEpoch)
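The KafkaApis change above splits a v12 request into resolvable and unresolvable topics before authorization. A Java rendering of that split (the patch itself does this in Scala; `namesById` is a hypothetical stand-in for the metadata cache):

```java
import org.apache.kafka.common.Uuid;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TopicIdSplitSketch {
    // IDs the cache can resolve proceed to authorization and fetching under
    // their resolved names; the rest are answered per partition with
    // UNKNOWN_TOPIC_ID.
    static void split(List<Uuid> requestedTopicIds,
                      Map<Uuid, String> namesById,
                      List<Uuid> known,
                      List<Uuid> unknown) {
        for (Uuid id : requestedTopicIds) {
            if (namesById.containsKey(id))
                known.add(id);
            else
                unknown.add(id);
        }
    }

    public static void main(String[] args) {
        List<Uuid> known = new ArrayList<>();
        List<Uuid> unknown = new ArrayList<>();
        Uuid resolvable = Uuid.randomUuid();
        split(List.of(resolvable, Uuid.randomUuid()), Map.of(resolvable, "foo"), known, unknown);
        System.out.println(known.size() + " known, " + unknown.size() + " unknown");
    }
}
```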
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4ee24f2e41428..cf8a55c52f78d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1477,17 +1477,31 @@ class ReplicaManager(val config: KafkaConfig,
                   buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
                   responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]],
                   timeoutMs: Int = 0): Unit = {
-    val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
+    val statusByPartition = mutable.Map[TopicIdPartition, ListOffsetsPartitionStatus]()
     topics.foreach { topic =>
       topic.partitions.asScala.foreach { partition =>
-        val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-        if (duplicatePartitions.contains(topicPartition)) {
-          debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
+        var topicId = topic.topicId()
+        if (topicId == null) {
+          topicId = Uuid.ZERO_UUID
+        }
+
+        val topicIdPartition = new TopicIdPartition(topicId, partition.partitionIndex, topic.name)
+        val requestTopicIdOpt = if (topicIdPartition.topicId == Uuid.ZERO_UUID) None else Some(topicIdPartition.topicId)
+        val metadataTopicId = metadataCache.getTopicId(topic.name)
+        val cachedTopicIdOpt = Option(metadataTopicId).filterNot(_ == Uuid.ZERO_UUID)
+
+        if (duplicatePartitions.contains(topicIdPartition.topicPartition())) {
+          debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicIdPartition " +
             s"failed because the partition is duplicated in the request.")
-          statusByPartition += topicPartition ->
+          statusByPartition += topicIdPartition ->
             ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.INVALID_REQUEST, partition))).build()
+        } else if (requestTopicIdOpt.isDefined && !cachedTopicIdOpt.contains(requestTopicIdOpt.get)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicIdPartition " +
+            s"failed because the provided topic ID ${requestTopicIdOpt.get} does not match the current topic ID $cachedTopicIdOpt.")
+          statusByPartition += topicIdPartition ->
+            ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.INCONSISTENT_TOPIC_ID, partition))).build()
         } else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) {
-          statusByPartition += topicPartition ->
+          statusByPartition += topicIdPartition ->
             ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition))).build()
         } else {
           try {
@@ -1498,7 +1512,7 @@ class ReplicaManager(val config: KafkaConfig,
               else
                 None
 
-            val resultHolder = fetchOffsetForTimestamp(topicPartition,
+            val resultHolder = fetchOffsetForTimestamp(topicIdPartition.topicPartition(),
               partition.timestamp,
               isolationLevelOpt,
               if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch),
@@ -1538,33 +1552,34 @@ class ReplicaManager(val config: KafkaConfig,
                 throw new IllegalStateException(s"Unexpected result holder state $resultHolder")
             }
-            statusByPartition += topicPartition -> status
+            statusByPartition += topicIdPartition -> status
           } catch {
             // NOTE: These exceptions are special cases since these error messages are typically transient or the client
             // would have received a clear exception and there is no value in logging the entire stack trace for the same
             case e @ (_ : UnknownTopicOrPartitionException |
+                      _ : UnknownTopicIdException |
                       _ : NotLeaderOrFollowerException |
                       _ : UnknownLeaderEpochException |
                       _ : FencedLeaderEpochException |
                       _ : KafkaStorageException |
                       _ : UnsupportedForMessageFormatException) =>
               debug(s"Offset request with correlation id $correlationId from client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-              statusByPartition += topicPartition ->
+                s"partition ${topicIdPartition.topicPartition()} failed due to ${e.getMessage}")
+              statusByPartition += topicIdPartition ->
                 ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
             // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
             case e: OffsetNotAvailableException =>
               if (version >= 5) {
-                statusByPartition += topicPartition ->
+                statusByPartition += topicIdPartition ->
                   ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
               } else {
-                statusByPartition += topicPartition ->
+                statusByPartition += topicIdPartition ->
                   ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition))).build()
               }
             case e: Throwable =>
               error("Error while responding to offset request", e)
-              statusByPartition += topicPartition ->
+              statusByPartition += topicIdPartition ->
                 ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
           }
         }
@@ -1581,15 +1596,18 @@ class ReplicaManager(val config: KafkaConfig,
       delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys.asJava)
     } else {
       // we can respond immediately
-      val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
-        case (topic, status) =>
-          new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
+      val responseTopics = statusByPartition.groupBy(e => (e._1.topic(), e._1.topicId())).map {
+        case ((topicName, topicId), statuses) =>
+          new ListOffsetsTopicResponse()
+            .setName(topicName)
+            .setTopicId(topicId)
+            .setPartitions(statuses.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
       }.toList
       responseCallback.accept(responseTopics.asJava)
     }
   }
 
-  private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicPartition, ListOffsetsPartitionStatus]): Boolean = {
+  private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicIdPartition, ListOffsetsPartitionStatus]): Boolean = {
     responseByPartition.values.exists(status => status.futureHolderOpt.isPresent)
   }
 
@@ -2569,4 +2587,4 @@ class ReplicaManager(val config: KafkaConfig,
       () => ()
     )
   }
 }
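The per-partition ID validation ReplicaManager.fetchOffset now performs can be stated compactly. A sketch of the rule (not the patch's Scala, but the same logic):

```java
import org.apache.kafka.common.Uuid;

final class TopicIdConsistencySketch {
    // A zero or absent request ID means a name-based (pre-v12) request, which
    // skips the check; a non-zero request ID must match the broker's current
    // ID for the topic, otherwise the partition fails with INCONSISTENT_TOPIC_ID.
    static boolean isInconsistent(Uuid requestTopicId, Uuid cachedTopicId) {
        if (requestTopicId == null || requestTopicId.equals(Uuid.ZERO_UUID))
            return false;
        return cachedTopicId == null
                || cachedTopicId.equals(Uuid.ZERO_UUID)
                || !cachedTopicId.equals(requestTopicId);
    }
}
```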
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4d4fbe5b71084..c887c796ad830 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4246,6 +4246,154 @@ class KafkaApisTest extends Logging {
     testConsumerListOffsetWithUnsupportedVersion(-6, 1)
   }
 
+  @Test
+  def testHandleListOffsetRequestWithTopicId(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val topicId = Uuid.randomUuid()
+    val isolationLevel = IsolationLevel.READ_UNCOMMITTED
+    val currentLeaderEpoch = Optional.of[Integer](15)
+
+    // Add the topic to the metadata cache
+    addTopicToMetadataCache(tp.topic(), numPartitions = 1, topicId = topicId)
+
+    when(replicaManager.fetchOffset(
+      ArgumentMatchers.any[Seq[ListOffsetsTopic]](),
+      ArgumentMatchers.eq(Set.empty[TopicPartition]),
+      ArgumentMatchers.eq(isolationLevel),
+      ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID),
+      ArgumentMatchers.eq[String](clientId),
+      ArgumentMatchers.anyInt(), // correlationId
+      ArgumentMatchers.anyShort(), // version
+      ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
+      ArgumentMatchers.anyInt() // timeoutMs
+    )).thenAnswer(ans => {
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
+      val partitionResponse = new ListOffsetsPartitionResponse()
+        .setErrorCode(Errors.NONE.code())
+        .setOffset(42L)
+        .setTimestamp(123456L)
+        .setPartitionIndex(tp.partition())
+      callback.accept(util.List.of(new ListOffsetsTopicResponse()
+        .setName(tp.topic())
+        .setTopicId(topicId)
+        .setPartitions(util.List.of(partitionResponse))))
+    })
+
+    // Version 12: use the topic ID
+    val targetTimes = util.List.of(new ListOffsetsTopic()
+      .setName(tp.topic)
+      .setTopicId(topicId)
+      .setPartitions(util.List.of(new ListOffsetsPartition()
+        .setPartitionIndex(tp.partition)
+        .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
+        .setCurrentLeaderEpoch(currentLeaderEpoch.get))))
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, true)
+      .setTargetTimes(targetTimes).build(12.toShort)
+    val request = buildRequest(listOffsetRequest)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleListOffsetRequest(request)
+
+    val response = verifyNoThrottling[ListOffsetsResponse](request)
+    // v12 does not return the topic name; it returns the topic ID instead
+    assertTrue(response.topics.asScala.exists(_.name == ""), s"Topic ${tp.topic} not found in response. Found: ${response.topics.asScala.map(_.name).mkString(", ")}")
+    val topicResponse = response.topics.asScala.find(_.name == "").get
+    assertEquals(topicId, topicResponse.topicId)
+    val partitionData = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition).get
+    assertEquals(Errors.NONE.code, partitionData.errorCode)
+    assertEquals(42L, partitionData.offset)
+    assertEquals(123456L, partitionData.timestamp)
+  }
+
+  @Test
+  def testHandleListOffsetRequestWithUnknownTopicId(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val unknownTopicId = Uuid.randomUuid()
+    val isolationLevel = IsolationLevel.READ_UNCOMMITTED
+    val currentLeaderEpoch = Optional.of[Integer](15)
+
+    // Don't add the topic to the metadata cache - simulate an unknown topic ID
+
+    // Version 12: use an unknown topic ID
+    val targetTimes = util.List.of(new ListOffsetsTopic()
+      .setName(tp.topic)
+      .setTopicId(unknownTopicId)
+      .setPartitions(util.List.of(new ListOffsetsPartition()
+        .setPartitionIndex(tp.partition)
+        .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
+        .setCurrentLeaderEpoch(currentLeaderEpoch.get))))
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, true)
+      .setTargetTimes(targetTimes).build(12.toShort)
+    val request = buildRequest(listOffsetRequest)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleListOffsetRequest(request)
+
+    val response = verifyNoThrottling[ListOffsetsResponse](request)
+    val topicResponse = response.topics.asScala.find(_.topicId == unknownTopicId).get
+    assertEquals(unknownTopicId, topicResponse.topicId)
+    val partitionData = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition).get
+    assertEquals(Errors.UNKNOWN_TOPIC_ID.code, partitionData.errorCode)
+  }
+
+  @Test
+  def testHandleListOffsetRequestVersion11WithTopicName(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val isolationLevel = IsolationLevel.READ_UNCOMMITTED
+    val currentLeaderEpoch = Optional.of[Integer](15)
+
+    // Add the topic to the metadata cache
+    addTopicToMetadataCache(tp.topic(), 1)
+
+    when(replicaManager.fetchOffset(
+      ArgumentMatchers.any[Seq[ListOffsetsTopic]](),
+      ArgumentMatchers.eq(Set.empty[TopicPartition]),
+      ArgumentMatchers.eq(isolationLevel),
+      ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID),
+      ArgumentMatchers.eq[String](clientId),
+      ArgumentMatchers.anyInt(), // correlationId
+      ArgumentMatchers.anyShort(), // version
+      ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
+      ArgumentMatchers.anyInt() // timeoutMs
+    )).thenAnswer(ans => {
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
+      val partitionResponse = new ListOffsetsPartitionResponse()
+        .setErrorCode(Errors.NONE.code())
+        .setOffset(42L)
+        .setTimestamp(123456L)
+        .setPartitionIndex(tp.partition())
+      callback.accept(util.List.of(new ListOffsetsTopicResponse()
+        .setName(tp.topic())
+        .setPartitions(util.List.of(partitionResponse))))
+    })
+
+    // Version 11: use the topic name
+    val targetTimes = util.List.of(new ListOffsetsTopic()
+      .setName(tp.topic)
+      .setPartitions(util.List.of(new ListOffsetsPartition()
+        .setPartitionIndex(tp.partition)
+        .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
+        .setCurrentLeaderEpoch(currentLeaderEpoch.get))))
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
+      .setTargetTimes(targetTimes).build(11.toShort)
+    val request = buildRequest(listOffsetRequest)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleListOffsetRequest(request)
+
+    val response = verifyNoThrottling[ListOffsetsResponse](request)
+    val topicResponse = response.topics.asScala.find(_.name == tp.topic).get
+    val partitionData = topicResponse.partitions.asScala.find(_.partitionIndex == tp.partition).get
+    assertEquals(Errors.NONE.code, partitionData.errorCode)
+    assertEquals(42L, partitionData.offset)
+    assertEquals(123456L, partitionData.timestamp)
+  }
+
   /**
    * Verifies that the metadata response is correct if the broker listeners are inconsistent (i.e. one broker has
    * more listeners than another) and the request is sent on the listener that exists in both brokers.
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 304e63602a3d6..a6117386d93e2 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource -import org.apache.kafka.common.{IsolationLevel, TopicPartition} +import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -46,6 +46,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { @Test def testListOffsetsErrorCodes(): Unit = { + // Use version 11 which supports topic names instead of version 12 which requires topic IDs val targetTimes = List(new ListOffsetsTopic() .setName(topic) .setPartitions(List(new ListOffsetsPartition() @@ -56,17 +57,17 @@ class ListOffsetsRequestTest extends BaseRequestTest { val consumerRequest = ListOffsetsRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(targetTimes) - .build() + .build(11.toShort) val replicaRequest = ListOffsetsRequest.Builder - .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, brokers.head.config.brokerId) + .forReplica(11.toShort, brokers.head.config.brokerId) .setTargetTimes(targetTimes) - .build() + .build(11.toShort) val debugReplicaRequest = ListOffsetsRequest.Builder - .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, ListOffsetsRequest.DEBUGGING_REPLICA_ID) + .forReplica(11.toShort, ListOffsetsRequest.DEBUGGING_REPLICA_ID) .setTargetTimes(targetTimes) - .build() + .build(11.toShort) // Unknown topic val randomBrokerId = brokers.head.config.brokerId @@ -138,11 +139,27 @@ class ListOffsetsRequestTest extends BaseRequestTest { private[this] def sendRequest(serverId: Int, timestamp: Long, version: Short): ListOffsetsPartitionResponse = { - val targetTimes = List(new ListOffsetsTopic() - .setName(topic) - .setPartitions(List(new ListOffsetsPartition() - .setPartitionIndex(partition.partition) - .setTimestamp(timestamp)).asJava)).asJava + // For version >= 12, we need to get the actual topic ID + val listOffsetsTopic = new ListOffsetsTopic() + if (version >= 12) { + try { + val topicDescription = createAdminClient().describeTopics(Seq(partition.topic).asJava).allTopicNames.get + val topicId = topicDescription.get(partition.topic).topicId() + listOffsetsTopic.setTopicId(topicId) + } catch { + case _: Exception => + // Topic doesn't exist, use ZERO_UUID + listOffsetsTopic.setTopicId(Uuid.ZERO_UUID) + } + } else { + listOffsetsTopic.setName(topic) + } + + listOffsetsTopic.setPartitions(List(new ListOffsetsPartition() + .setPartitionIndex(partition.partition) + .setTimestamp(timestamp)).asJava) + + val targetTimes = List(listOffsetsTopic).asJava val builder = ListOffsetsRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) @@ -150,8 +167,14 @@ class ListOffsetsRequestTest extends BaseRequestTest { val request = if (version == -1) builder.build() else builder.build(version) - sendRequest(serverId, request).topics.asScala.find(_.name == topic).get - .partitions.asScala.find(_.partitionIndex == 
partition.partition).get + val response = sendRequest(serverId, request) + // For version >= 12, response uses topic ID instead of name + val topicResponse = if (version >= 12) { + response.topics.asScala.head + } else { + response.topics.asScala.find(_.name == topic).get + } + topicResponse.partitions.asScala.find(_.partitionIndex == partition.partition).get } // -1 indicate "latest" @@ -260,4 +283,123 @@ class ListOffsetsRequestTest extends BaseRequestTest { def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = { super.createTopic(topic, numPartitions, replicationFactor) } + + @Test + def testListOffsetsWithTopicId(): Unit = { + // Create topic + val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2) + val leader = partitionToLeader(partition.partition) + + // Get the actual topic ID + val topicDescription = createAdminClient().describeTopics(Seq(partition.topic).asJava).allTopicNames.get + val topicId = topicDescription.get(partition.topic).topicId() + + // Produce some messages + TestUtils.generateAndProduceMessages(brokers, topic, 10) + + // Test with correct topic ID (version 12) + val targetTimesWithCorrectId = List(new ListOffsetsTopic() + .setTopicId(topicId) + .setPartitions(List(new ListOffsetsPartition() + .setPartitionIndex(partition.partition) + .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava + + val requestWithCorrectId = ListOffsetsRequest.Builder + .forReplica(12.toShort, brokers.head.config.brokerId) + .setTargetTimes(targetTimesWithCorrectId) + .build(12.toShort) + + val responseWithCorrectId = sendRequest(leader, requestWithCorrectId) + assertEquals(1, responseWithCorrectId.topics.size) + val topicResponse = responseWithCorrectId.topics.asScala.head + assertEquals(1, topicResponse.partitions.size) + val partitionResponse = topicResponse.partitions.asScala.head + assertEquals(Errors.NONE.code, partitionResponse.errorCode) + assertEquals(10L, partitionResponse.offset) + } + + @Test + def testListOffsetsWithIncorrectTopicId(): Unit = { + // Create topic + val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2) + val leader = partitionToLeader(partition.partition) + + // Produce some messages + TestUtils.generateAndProduceMessages(brokers, topic, 10) + + // Test with incorrect topic ID (version 12) + val randomTopicId = Uuid.randomUuid() + val targetTimesWithIncorrectId = List(new ListOffsetsTopic() + .setTopicId(randomTopicId) + .setPartitions(List(new ListOffsetsPartition() + .setPartitionIndex(partition.partition) + .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava + + val requestWithIncorrectId = ListOffsetsRequest.Builder + .forReplica(12.toShort, brokers.head.config.brokerId) + .setTargetTimes(targetTimesWithIncorrectId) + .build(12.toShort) + + val responseWithIncorrectId = sendRequest(leader, requestWithIncorrectId) + assertEquals(1, responseWithIncorrectId.topics.size) + val topicResponse = responseWithIncorrectId.topics.asScala.head + assertEquals(1, topicResponse.partitions.size) + val partitionResponse = topicResponse.partitions.asScala.head + assertEquals(Errors.UNKNOWN_TOPIC_ID.code, partitionResponse.errorCode) + } + + @Test + def testListOffsetsVersion12RequiresTopicId(): Unit = { + // Create topic + createTopic(numPartitions = 1, replicationFactor = 2) + + // Produce some messages + TestUtils.generateAndProduceMessages(brokers, topic, 10) + + // Test version 12 with ZERO_UUID - should throw UnsupportedVersionException when building request + val 
+
+  @Test
+  def testListOffsetsVersion11UsesTopicName(): Unit = {
+    // Create topic
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2)
+    val leader = partitionToLeader(partition.partition)
+
+    // Produce some messages
+    TestUtils.generateAndProduceMessages(brokers, topic, 10)
+
+    // Test version 11 with topic name (no topic ID needed)
+    val targetTimesWithName = List(new ListOffsetsTopic()
+      .setName(topic)
+      .setPartitions(List(new ListOffsetsPartition()
+        .setPartitionIndex(partition.partition)
+        .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
+
+    val requestWithName = ListOffsetsRequest.Builder
+      .forReplica(11.toShort, brokers.head.config.brokerId)
+      .setTargetTimes(targetTimesWithName)
+      .build(11.toShort)
+
+    val responseWithName = sendRequest(leader, requestWithName)
+    assertEquals(1, responseWithName.topics.size)
+    val topicResponse = responseWithName.topics.asScala.head
+    assertEquals(topic, topicResponse.name)
+    assertEquals(1, topicResponse.partitions.size)
+    val partitionResponse = topicResponse.partitions.asScala.head
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(10L, partitionResponse.offset)
+  }
 }
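Taken together, the four new tests pin down the wire contract: v11 and earlier identify topics by name, while v12 identifies them by ID and refuses to build a request whose topic ID is `Uuid.ZERO_UUID`. A small Java sketch of that branching, assuming `ListOffsetsTopic.setTopicId` is the request field this patch adds next to the existing `setName`:

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.requests.ListOffsetsRequest;

import java.util.List;

public class ListOffsetsTopicFactory {

    // Builds the per-topic entry of a ListOffsets request: topic ID on v12+,
    // topic name on older versions. The version value is the one the caller
    // pins via ListOffsetsRequest.Builder#build(version).
    static ListOffsetsTopic topicEntry(short version, String name, Uuid topicId, int partitionIndex) {
        ListOffsetsTopic entry = new ListOffsetsTopic();
        if (version >= 12) {
            entry.setTopicId(topicId); // must be non-zero, or the builder rejects the request
        } else {
            entry.setName(name);
        }
        entry.setPartitions(List.of(new ListOffsetsPartition()
            .setPartitionIndex(partitionIndex)
            .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)));
        return entry;
    }
}
```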
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 753b831b594df..d97e6b7e51c56 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -37,6 +37,8 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.InvalidPidMappingException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.{DeleteRecordsResponseData, FetchResponseData, ShareFetchResponseData}
+import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsTopic, ListOffsetsPartition}
+import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsTopicResponse, ListOffsetsPartitionResponse}
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
@@ -93,7 +95,7 @@ import java.io.{ByteArrayInputStream, File}
 import java.net.InetAddress
 import java.nio.file.{Files, Paths}
 import java.util
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Future, TimeUnit}
 import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
@@ -6096,6 +6098,140 @@ class ReplicaManagerTest {
       }
     }
   }
+
+  @Test
+  def testFetchOffsetWithInconsistentTopicId(): Unit = {
+    // Use class-level topicId as the correct one, and create a wrong one
+    val wrongTopicId = Uuid.randomUuid()
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
+
+    try {
+      // Create topic with class-level topicId in metadata cache
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicId)
+      val image = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, image)
+
+      val responseReceived = new AtomicBoolean(false)
+      val responseTopics = new AtomicReference[util.Collection[ListOffsetsTopicResponse]]()
+
+      val buildErrorResponse = (error: Errors, partition: ListOffsetsPartition) => {
+        new ListOffsetsPartitionResponse()
+          .setPartitionIndex(partition.partitionIndex)
+          .setErrorCode(error.code)
+          .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
+          .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
+      }
+
+      val responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]] = (topics: util.Collection[ListOffsetsTopicResponse]) => {
+        responseTopics.set(topics)
+        responseReceived.set(true)
+      }
+
+      val listOffsetsTopic = new ListOffsetsTopic()
+        .setName(topic)
+        .setTopicId(wrongTopicId) // Use wrong topic ID
+        .setPartitions(util.Arrays.asList(
+          new ListOffsetsPartition()
+            .setPartitionIndex(0)
+            .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)))
+
+      // Call fetchOffset with wrong topic ID
+      replicaManager.fetchOffset(
+        topics = Seq(listOffsetsTopic),
+        duplicatePartitions = Set.empty,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        replicaId = ListOffsetsRequest.CONSUMER_REPLICA_ID,
+        clientId = "test-client",
+        correlationId = 1,
+        version = 12,
+        buildErrorResponse = buildErrorResponse,
+        responseCallback = responseCallback,
+        timeoutMs = 0)
+
+      // Verify response contains INCONSISTENT_TOPIC_ID error
+      assertTrue(responseReceived.get(), "Response should be received")
+      val topics = responseTopics.get()
+      assertEquals(1, topics.size(), "Should have 1 topic in response")
+      val topicResponse = topics.iterator().next()
+      assertEquals(topic, topicResponse.name())
+      assertEquals(wrongTopicId, topicResponse.topicId())
+      assertEquals(1, topicResponse.partitions().size())
+      val partitionResponse = topicResponse.partitions().get(0)
+      assertEquals(0, partitionResponse.partitionIndex())
+      assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, partitionResponse.errorCode())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testFetchOffsetWithZeroUuid(): Unit = {
+    val tp = new TopicPartition(topic, 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
+
+    try {
+      // Create topic with class-level topicId in metadata cache (legacy clients use ZERO_UUID)
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicId)
+      val image = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, image)
+
+      // Append some records to create offsets
+      val records = MemoryRecords.withRecords(Compression.NONE,
+        new SimpleRecord("message1".getBytes),
+        new SimpleRecord("message2".getBytes))
+      appendRecords(replicaManager, tp, records)
+
+      val responseReceived = new AtomicBoolean(false)
+      val responseTopics = new AtomicReference[util.Collection[ListOffsetsTopicResponse]]()
+
+      val buildErrorResponse = (error: Errors, partition: ListOffsetsPartition) => {
+        new ListOffsetsPartitionResponse()
+          .setPartitionIndex(partition.partitionIndex)
+          .setErrorCode(error.code)
+          .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
+          .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
+      }
+
+      val responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]] = (topics: util.Collection[ListOffsetsTopicResponse]) => {
+        responseTopics.set(topics)
+        responseReceived.set(true)
+      }
+
+      val listOffsetsTopic = new ListOffsetsTopic()
+        .setName(topic)
+        .setTopicId(Uuid.ZERO_UUID) // Use ZERO_UUID for backward compatibility
+        .setPartitions(util.Arrays.asList(
+          new ListOffsetsPartition()
+            .setPartitionIndex(0)
+            .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)))
+
+      // Call fetchOffset with ZERO_UUID (legacy behavior)
+      replicaManager.fetchOffset(
+        topics = Seq(listOffsetsTopic),
+        duplicatePartitions = Set.empty,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        replicaId = ListOffsetsRequest.CONSUMER_REPLICA_ID,
+        clientId = "test-client",
+        correlationId = 1,
+        version = 11, // Version 11 uses topic names
+        buildErrorResponse = buildErrorResponse,
+        responseCallback = responseCallback,
+        timeoutMs = 0)
+
+      // Verify response - should succeed with ZERO_UUID
+      assertTrue(responseReceived.get(), "Response should be received")
+      val topics = responseTopics.get()
+      assertEquals(1, topics.size(), "Should have 1 topic in response")
+      val topicResponse = topics.iterator().next()
+      assertEquals(topic, topicResponse.name())
+      assertEquals(1, topicResponse.partitions().size())
+      val partitionResponse = topicResponse.partitions().get(0)
+      assertEquals(0, partitionResponse.partitionIndex())
+      assertEquals(Errors.NONE.code, partitionResponse.errorCode())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
 }
 
 class MockReplicaSelector extends ReplicaSelector {
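The two new `ReplicaManagerTest` cases encode the resolution rule the broker is expected to apply to the topic ID field. The following is only a sketch of that rule as the assertions imply it, with a hypothetical helper name and shape, not the actual `ReplicaManager.fetchOffset` implementation:

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;

import java.util.Map;

public class ListOffsetsTopicResolver {

    record Resolution(String topicName, Errors error) {}

    // ZERO_UUID means a pre-v12 client: trust the name as-is.
    // A known ID resolves through the metadata cache.
    // An unknown ID with a name attached conflicts with the cache: INCONSISTENT_TOPIC_ID.
    // An unknown ID with no name cannot be resolved at all: UNKNOWN_TOPIC_ID.
    static Resolution resolve(Uuid requestTopicId, String requestTopicName, Map<Uuid, String> namesById) {
        if (requestTopicId.equals(Uuid.ZERO_UUID)) {
            return new Resolution(requestTopicName, Errors.NONE);
        }
        String cachedName = namesById.get(requestTopicId);
        if (cachedName == null) {
            return requestTopicName != null && !requestTopicName.isEmpty()
                ? new Resolution(requestTopicName, Errors.INCONSISTENT_TOPIC_ID)
                : new Resolution(null, Errors.UNKNOWN_TOPIC_ID);
        }
        return new Resolution(cachedName, Errors.NONE);
    }
}
```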
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8bdce25808fa9..90c9bb6dd94f3 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -274,7 +274,9 @@ public short fetchRequestVersion() {
     }
 
     public short listOffsetRequestVersion() {
-        if (this.isAtLeast(IBP_4_2_IV1)) {
+        if (this.isAtLeast(IBP_4_3_IV0)) {
+            return 12;
+        } else if (this.isAtLeast(IBP_4_2_IV1)) {
             return 11;
         } else if (this.isAtLeast(IBP_4_0_IV3)) {
             return 10;
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index e0bcfb7f146ec..fcd11087a0aac 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -284,8 +284,8 @@ public void assertLatestIsNotProduction() {
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class)
     public void testListOffsetsValueVersion(MetadataVersion metadataVersion) {
-        final short expectedVersion = 11;
-        if (metadataVersion.isAtLeast(IBP_4_2_IV1)) {
+        final short expectedVersion = 12;
+        if (metadataVersion.isAtLeast(IBP_4_3_IV0)) {
            assertEquals(expectedVersion, metadataVersion.listOffsetRequestVersion());
         } else {
             assertTrue(metadataVersion.listOffsetRequestVersion() < expectedVersion);
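The IBP gate above means the broker side only moves to v12 once the cluster is finalized at IBP_4_3_IV0; mixed-version clusters keep sending name-based v11 requests. A small usage sketch of the mapping (the loop and printout are illustrative only):

```java
import org.apache.kafka.server.common.MetadataVersion;

public class ListOffsetsVersionGate {
    public static void main(String[] args) {
        // The metadata version decides the highest ListOffsets version to use:
        // below IBP_4_3_IV0 the broker stays on v11 (topic names); at or above,
        // it moves to v12 (topic IDs).
        for (MetadataVersion mv : new MetadataVersion[] {
                MetadataVersion.IBP_4_2_IV1, MetadataVersion.latestProduction()}) {
            System.out.println(mv + " -> ListOffsets v" + mv.listOffsetRequestVersion());
        }
    }
}
```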
diff --git a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
index 200fb2262acb2..1b6ce507f0016 100644
--- a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.purgatory;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
@@ -53,13 +54,13 @@ public class DelayedRemoteListOffsets extends DelayedOperation {
     static final Map<TopicPartition, Meter> PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<>();
 
     private final int version;
-    private final Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition;
+    private final Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition;
     private final Consumer<TopicPartition> partitionOrException;
     private final Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback;
 
     public DelayedRemoteListOffsets(long delayMs,
                                     int version,
-                                    Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition,
+                                    Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition,
                                     Consumer<TopicPartition> partitionOrException,
                                     Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback) {
         super(delayMs);
@@ -83,11 +84,11 @@ public DelayedRemoteListOffsets(long delayMs,
      */
     @Override
     public void onExpiration() {
-        statusByPartition.forEach((topicPartition, status) -> {
+        statusByPartition.forEach((topicIdPartition, status) -> {
             if (!status.completed()) {
-                LOG.debug("Expiring list offset request for partition {} with status {}", topicPartition, status);
+                LOG.debug("Expiring list offset request for partition {} with status {}", topicIdPartition.topicPartition(), status);
                 status.futureHolderOpt().ifPresent(futureHolder -> futureHolder.jobFuture().cancel(true));
-                recordExpiration(topicPartition);
+                recordExpiration(topicIdPartition.topicPartition());
             }
         });
     }
@@ -99,9 +100,13 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         Map<String, ListOffsetsResponseData.ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
-        statusByPartition.forEach((tp, status) -> {
-            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(tp.topic(), k ->
-                new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(tp.topic()));
+        statusByPartition.forEach((topicIdPartition, status) -> {
+            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(
+                topicIdPartition.topic(),
+                k -> new ListOffsetsResponseData.ListOffsetsTopicResponse()
+                    .setName(topicIdPartition.topic())
+                    .setTopicId(topicIdPartition.topicId())
+            );
             status.responseOpt().ifPresent(res -> response.partitions().add(res));
         });
         responseCallback.accept(groupedByTopic.values());
@@ -118,7 +123,7 @@ public boolean tryComplete() {
         statusByPartition.forEach((partition, status) -> {
             if (!status.completed()) {
                 try {
-                    partitionOrException.accept(partition);
+                    partitionOrException.accept(partition.topicPartition());
                 } catch (ApiException e) {
                     status.futureHolderOpt().ifPresent(futureHolder -> {
                         futureHolder.jobFuture().cancel(false);
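`onComplete()` now groups the per-partition statuses, keyed by `TopicIdPartition`, into one response entry per topic that carries both the name and the ID. A self-contained sketch of that grouping pattern, assuming `ListOffsetsTopicResponse.setTopicId` is the response-side field this patch introduces:

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTopicExample {
    public static void main(String[] args) {
        // Two partitions of the same topic should collapse into a single topic
        // response, which is exactly what computeIfAbsent achieves in onComplete().
        Uuid id = Uuid.randomUuid();
        List<TopicIdPartition> partitions = List.of(
            new TopicIdPartition(id, 0, "test"),
            new TopicIdPartition(id, 1, "test"));

        Map<String, ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
        for (TopicIdPartition tip : partitions) {
            ListOffsetsTopicResponse topicResponse = groupedByTopic.computeIfAbsent(
                tip.topic(),
                k -> new ListOffsetsTopicResponse().setName(tip.topic()).setTopicId(tip.topicId()));
            topicResponse.partitions().add(new ListOffsetsPartitionResponse().setPartitionIndex(tip.partition()));
        }
        System.out.println(groupedByTopic.values()); // one topic entry with two partitions
    }
}
```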
diff --git a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
index 6586bf35ae091..b7701743e38ce 100644
--- a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.server.purgatory;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -82,10 +84,10 @@ public void testResponseOnRequestExpiration() throws InterruptedException {
             return true;
         });
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
         );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
@@ -133,10 +135,10 @@ public void testResponseOnSuccess() {
             return true;
         });
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
         );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
@@ -188,10 +190,10 @@ public void testResponseOnPartialError() {
         when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
         when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
        );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
@@ -244,11 +246,11 @@ public void testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition() {
         when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
         when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
 
-        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
-            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
-            new TopicPartition("test1", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
+            new TopicIdPartition(Uuid.randomUuid(), 1, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
         );
 
         DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
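One detail these test updates rely on: `TopicIdPartition` equality includes the topic ID, so the `Map.of(...)` literals above stay well-formed even though every entry gets a fresh random ID. A short demonstration of the accessors the production change uses:

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

public class TopicIdPartitionDemo {
    public static void main(String[] args) {
        Uuid id = Uuid.randomUuid();
        TopicIdPartition a = new TopicIdPartition(id, 0, "test");
        TopicIdPartition b = new TopicIdPartition(Uuid.randomUuid(), 0, "test");

        // Same name and partition but different IDs: not equal, distinct map keys.
        System.out.println(a.equals(b));            // false
        // topicPartition() strips the ID, which is what onExpiration() and
        // tryComplete() pass to the name-based callbacks.
        System.out.println(a.topicPartition());     // test-0
        System.out.println(a.topicId().equals(id)); // true
    }
}
```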