clients/src/main/java/org/apache/kafka/clients/Metadata.java (8 additions, 0 deletions)
@@ -292,6 +292,14 @@ public Map<String, Uuid> topicIds() {
return metadataSnapshot.topicIds();
}

/** Returns the topic ID for the given topic name, or {@link Uuid#ZERO_UUID} if the ID is unknown. */
public Uuid getTopicIdByName(String topic) {
return metadataSnapshot.topicIds().getOrDefault(topic, Uuid.ZERO_UUID);
}

/** Returns the topic name for the given topic ID, or {@code null} if the ID is unknown. */
public String getTopicNameById(Uuid topicId) {
return metadataSnapshot.topicNames().get(topicId);
}

public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
if (maybeMetadata.isEmpty())
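The two new accessors are asymmetric: the name-to-id lookup falls back to Uuid.ZERO_UUID, while the id-to-name lookup returns null for unknown ids. A minimal usage sketch ("orders" is a hypothetical topic name, not from this PR; assumes an up-to-date Metadata instance):

```java
Uuid id = metadata.getTopicIdByName("orders");   // Uuid.ZERO_UUID when unknown
String name = metadata.getTopicNameById(id);     // null when the id is unknown
boolean resolvable = !id.equals(Uuid.ZERO_UUID) && name != null;
```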
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -29,6 +29,7 @@
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
@@ -54,6 +55,7 @@
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.topicsForPartitions;


/**
* {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
* a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
@@ -393,9 +395,14 @@ private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequ
private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean requireTimestamp) {
boolean canUseTopicIds = !timestampsToSearch.isEmpty() && timestampsToSearch.keySet().stream()
.map(tp -> metadata.getTopicIdByName(tp.topic()))
.allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));


ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
.forConsumer(requireTimestamp, isolationLevel)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch))
.forConsumer(requireTimestamp, isolationLevel, canUseTopicIds)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch, metadata))
.setTimeoutMs(requestTimeoutMs);

log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
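The topic-id gate above is all-or-nothing: a ListOffsets request is serialized at a single negotiated version, so one topic without a cached id pushes the entire request back onto the name-based protocol (version 11 or lower). A small illustration of the predicate, assuming hypothetical topics where only "orders" has an id in local metadata:

```java
// "orders" resolves to a real id; "payments" resolves to Uuid.ZERO_UUID
// (e.g. stale metadata), so allMatch(...) yields false and the builder
// is created with canUseTopicIds = false.
boolean canUseTopicIds = Stream.of("orders", "payments")
        .map(metadata::getTopicIdByName)
        .allMatch(id -> !id.equals(Uuid.ZERO_UUID));   // -> false
```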
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -25,6 +25,7 @@
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
@@ -110,8 +111,13 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse
Set<String> unauthorizedTopics = new HashSet<>();

for (ListOffsetsResponseData.ListOffsetsTopicResponse topic : listOffsetsResponse.topics()) {
String topicName = topic.name();
if (topic.topicId() != null && !topic.topicId().equals(Uuid.ZERO_UUID)) {
topicName = metadata.getTopicNameById(topic.topicId());
}

for (ListOffsetsResponseData.ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
TopicPartition topicPartition = new TopicPartition(topicName, partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -31,6 +31,7 @@
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -615,9 +616,14 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> targetTimes,
boolean requireTimestamps,
List<NetworkClientDelegate.UnsentRequest> unsentRequests) {

boolean canUseTopicIds = !targetTimes.isEmpty() && targetTimes.keySet().stream()
.map(tp -> metadata.getTopicIdByName(tp.topic()))
.allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));

ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
.forConsumer(requireTimestamps, isolationLevel)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes))
.forConsumer(requireTimestamps, isolationLevel, canUseTopicIds)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes, metadata))
.setTimeoutMs(requestTimeoutMs);

log.debug("Creating ListOffset request {} for broker {} to reset positions", builder,
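The same predicate now appears in both OffsetFetcher and OffsetsRequestManager. A possible shared helper — hypothetical, not part of this PR — that both call sites could delegate to:

```java
// Hypothetical extraction (sketch; assumes java.util and consumer-internal
// imports): true only when every partition's topic resolves to a non-zero
// id in the local metadata snapshot.
static boolean allTopicIdsKnown(Collection<TopicPartition> partitions, ConsumerMetadata metadata) {
    return !partitions.isEmpty() && partitions.stream()
            .map(tp -> metadata.getTopicIdByName(tp.topic()))
            .allMatch(id -> !id.equals(Uuid.ZERO_UUID));
}
```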
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
@@ -60,7 +63,13 @@ public static class Builder extends AbstractRequest.Builder<ListOffsetsRequest>

public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel) {
return forConsumer(requireTimestamp, isolationLevel, false, false, false, false);
return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, false);
}

public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel,
boolean canUseTopicIds) {
return forConsumer(requireTimestamp, isolationLevel, false, false, false, false, canUseTopicIds);
}

public static Builder forConsumer(boolean requireTimestamp,
@@ -69,6 +78,18 @@ public static Builder forConsumer(boolean requireTimestamp,
boolean requireEarliestLocalTimestamp,
boolean requireTieredStorageTimestamp,
boolean requireEarliestPendingUploadTimestamp) {
return forConsumer(requireTimestamp, isolationLevel, requireMaxTimestamp,
requireEarliestLocalTimestamp, requireTieredStorageTimestamp,
requireEarliestPendingUploadTimestamp, false);
}

public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel,
boolean requireMaxTimestamp,
boolean requireEarliestLocalTimestamp,
boolean requireTieredStorageTimestamp,
boolean requireEarliestPendingUploadTimestamp,
boolean canUseTopicIds) {
short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion();
if (requireEarliestPendingUploadTimestamp)
minVersion = 11;
@@ -82,7 +103,10 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED)
minVersion = 2;
else if (requireTimestamp)
minVersion = 1;
return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);

// When canUseTopicIds is false, cap maxVersion at 11 so the request stays on the name-based protocol.
short maxVersion = canUseTopicIds ? ApiKeys.LIST_OFFSETS.latestVersion() : (short) 11;
return new Builder(minVersion, maxVersion, CONSUMER_REPLICA_ID, isolationLevel);
}

public static Builder forReplica(short allowedVersion, int replicaId) {
@@ -111,6 +135,22 @@ public Builder setTimeoutMs(int timeoutMs) {

@Override
public ListOffsetsRequest build(short version) {
if (version >= 12) {
data.topics().forEach(topic -> {
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
throw new UnsupportedVersionException("The list offsets api version " +
version + " requires usage of topic ids.");
}
});
} else {
data.topics().forEach(topic -> {
if (topic.name() == null || topic.name().isEmpty()) {
throw new UnsupportedVersionException("The list offsets api version " +
version + " requires usage of topic names.");
}
});
}

return new ListOffsetsRequest(data, version);
}

@@ -144,14 +184,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {

List<ListOffsetsTopicResponse> responses = new ArrayList<>();
for (ListOffsetsTopic topic : data.topics()) {
ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse().setName(topic.name());
ListOffsetsTopicResponse topicResponse = new ListOffsetsTopicResponse()
.setName(topic.name())
.setTopicId(topic.topicId());
List<ListOffsetsPartitionResponse> partitions = new ArrayList<>();
for (ListOffsetsPartition partition : topic.partitions()) {
ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(errorCode)
.setPartitionIndex(partition.partitionIndex());
partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
partitions.add(partitionResponse);
}
topicResponse.setPartitions(partitions);
@@ -192,13 +234,17 @@ public static ListOffsetsRequest parse(Readable readable, short version) {
return new ListOffsetsRequest(new ListOffsetsRequestData(readable, version), version);
}

public static List<ListOffsetsTopic> toListOffsetsTopics(Map<TopicPartition, ListOffsetsPartition> timestampsToSearch) {
public static List<ListOffsetsTopic> toListOffsetsTopics(
Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
ConsumerMetadata metadata
) {
Map<String, ListOffsetsTopic> topics = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsPartition> entry : timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
ListOffsetsTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic()));
ListOffsetsTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic()
.setName(tp.topic()).setTopicId(metadata.getTopicIdByName(tp.topic())));
topic.partitions().add(entry.getValue());
}
return new ArrayList<>(topics.values());
}
}
}
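Taken together with the forConsumer gating, the build-time check gives the following caller-visible behavior — a hedged sketch, with an example topic and timestamp that are not from this PR:

```java
// Name-based path: canUseTopicIds = false caps the builder at version 11,
// so the request always carries topic names.
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
        .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
        .setTargetTimes(List.of(new ListOffsetsTopic()
                .setName("orders")   // topicId left as Uuid.ZERO_UUID
                .setPartitions(List.of(new ListOffsetsPartition()
                        .setPartitionIndex(0)
                        .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)))));

builder.build((short) 11);   // fine: v11 is name-based
// builder.build((short) 12) would throw UnsupportedVersionException,
// because v12 requires a non-zero topic id on every topic.
```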
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
@@ -102,6 +102,10 @@ public boolean shouldClientThrottle(short version) {
return version >= 3;
}

public static boolean useTopicIds(short version) {
return version >= 12;
}

public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) {
return new ListOffsetsTopicResponse()
.setName(tp.topic())
@@ -112,4 +116,6 @@ public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPa
.setOffset(offset)
.setLeaderEpoch(epoch)));
}
}


}
clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -42,7 +42,9 @@
// Version 10 enables async remote list offsets support (KIP-1075)
//
// Version 11 enables listing offsets by earliest pending upload offset (KIP-1023)
"validVersions": "1-11",
//
// Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "1-12",
"flexibleVersions": "6+",
"latestVersionUnstable": false,
"fields": [
@@ -52,8 +54,10 @@
"about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
{ "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+",
"about": "Each topic in the request.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "12+", "ignorable": true,
"about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]ListOffsetsPartition", "versions": "0+",
"about": "Each partition in the request.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -42,15 +42,19 @@
// Version 10 enables async remote list offsets support (KIP-1075)
//
// Version 11 enables listing offsets by earliest pending upload offset (KIP-1023)
"validVersions": "1-11",
//
// Version 12 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "1-12",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Topics", "type": "[]ListOffsetsTopicResponse", "versions": "0+",
"about": "Each topic in the response.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-11", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "12+", "ignorable": true,
"about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]ListOffsetsPartitionResponse", "versions": "0+",
"about": "Each partition in the response.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
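On the receive side, useTopicIds(version) — or a non-zero topic id on the response, as in the OffsetFetcherUtils change above — tells the client which key the broker populated. A sketch of the resolution step, assuming a metadata instance that can map ids back to names:

```java
// v12+ responses carry only the topic id; older versions carry the name.
String topicName = ListOffsetsResponse.useTopicIds(version)
        ? metadata.getTopicNameById(topic.topicId())
        : topic.name();
```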
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -780,6 +780,7 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
List<ListOffsetsTopicResponse> topics = Collections.singletonList(
new ListOffsetsTopicResponse()
.setName(tp0.topic())
.setTopicId(topicId)
.setPartitions(Arrays.asList(
tp0NoError,
new ListOffsetsPartitionResponse()
@@ -798,6 +799,7 @@
List<ListOffsetsTopic> expectedTopics = Collections.singletonList(
new ListOffsetsTopic()
.setName(tp0.topic())
.setTopicId(topicId)
.setPartitions(Arrays.asList(
new ListOffsetsPartition()
.setPartitionIndex(tp1.partition())
@@ -820,6 +822,7 @@
List<ListOffsetsTopicResponse> topicsWithFatalError = Collections.singletonList(
new ListOffsetsTopicResponse()
.setName(tp0.topic())
.setTopicId(topicId)
.setPartitions(Arrays.asList(
tp0NoError,
new ListOffsetsPartitionResponse()
@@ -1015,6 +1018,7 @@ public void testBatchedListOffsetsMetadataErrors() {
.setThrottleTimeMs(0)
.setTopics(Collections.singletonList(new ListOffsetsTopicResponse()
.setName(tp0.topic())
.setTopicId(topicId)
.setPartitions(Arrays.asList(
new ListOffsetsPartitionResponse()
.setPartitionIndex(tp0.partition())
@@ -1095,6 +1099,7 @@ private void testGetOffsetsForTimesWithUnknownOffset() {
.setThrottleTimeMs(0)
.setTopics(Collections.singletonList(new ListOffsetsTopicResponse()
.setName(tp0.topic())
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ListOffsetsPartitionResponse()
.setPartitionIndex(tp0.partition())
.setErrorCode(Errors.NONE.code())
@@ -1665,6 +1670,7 @@ private ListOffsetsResponse listOffsetResponse(Map<TopicPartition, Long> offsets
for (Map.Entry<String, List<ListOffsetsPartitionResponse>> response : responses.entrySet()) {
topics.add(new ListOffsetsTopicResponse()
.setName(response.getKey())
.setTopicId(topicIds.getOrDefault(response.getKey(), Uuid.ZERO_UUID))
.setPartitions(response.getValue()));
}
ListOffsetsResponseData data = new ListOffsetsResponseData().setTopics(topics);