Skip to content

Commit

Permalink
addressed comment and fixed tests
Browse files Browse the repository at this point in the history
Update AsyncKafkaConsumer.java
  • Loading branch information
philipnee committed Mar 29, 2024
1 parent 6a02f76 commit 6fb9bd8
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsForTimeEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
Expand Down Expand Up @@ -95,6 +94,7 @@

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1094,16 +1094,22 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
return Collections.emptyMap();
}
final Timer timer = time.timer(timeout);
final ListOffsetsForTimeEvent listOffsetsEvent = new ListOffsetsForTimeEvent(
timestampsToSearch,
timer);
ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampsToSearch,
timer,
true);

// If timeout is set to zero return empty immediately; otherwise try to get the results
// and throw timeout exception if it cannot complete in time.
if (timeout.toMillis() == 0L)
return listOffsetsEvent.emptyResult();

return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
return listOffsetsEvent.emptyResults();

return applicationEventHandler.addAndGet(listOffsetsEvent, timer)
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().buildOffsetAndTimestamp()));
} finally {
release();
}
Expand Down Expand Up @@ -1143,23 +1149,27 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
}

Map<TopicPartition, Long> timestampToSearch = partitions
.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
Timer timer = time.timer(timeout);
ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampToSearch,
timer);
timestampToSearch,
timer,
false);

Map<TopicPartition, Long> offsetAndTimestampMap;
Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
if (timeout.isZero()) {
applicationEventHandler.add(listOffsetsEvent);
offsetAndTimestampMap = listOffsetsEvent.emptyResult();
} else {
offsetAndTimestampMap = applicationEventHandler.addAndGet(
return listOffsetsEvent.emptyResults();
}
offsetAndTimestampMap = applicationEventHandler.addAndGet(
listOffsetsEvent,
timer);
}
return offsetAndTimestampMap;
return offsetAndTimestampMap.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().offset()));
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

import java.util.Optional;

/**
* Internal representation of {@link OffsetAndTimestamp}.
*/
public class OffsetAndTimestampInternal {
private final long timestamp;
private final long offset;
private final Optional<Integer> leaderEpoch;

public OffsetAndTimestampInternal(long offset, long timestamp, Optional<Integer> leaderEpoch) {
this.offset = offset;
this.timestamp = timestamp;
this.leaderEpoch = leaderEpoch;
}

long offset() {
return offset;
}

long timestamp() {
return timestamp;
}

Optional<Integer> leaderEpoch() {
return leaderEpoch;
}

public OffsetAndTimestamp buildOffsetAndTimestamp() {
return new OffsetAndTimestamp(offset, timestamp, leaderEpoch);
}

@Override
public int hashCode() {
int result = (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + (int) (offset ^ (offset >>> 32));
result = 31 * result + leaderEpoch.hashCode();
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof OffsetAndTimestampInternal)) return false;

OffsetAndTimestampInternal that = (OffsetAndTimestampInternal) o;

if (timestamp != that.timestamp) return false;
if (offset != that.offset) return false;
return leaderEpoch.equals(that.leaderEpoch);
}

@Override
public String toString() {
return "OffsetAndTimestampInternal{" +
"timestamp=" + timestamp +
", offset=" + offset +
", leaderEpoch=" + leaderEpoch +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,24 @@ static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(
offsetData.leaderEpoch));
}

static Map<TopicPartition, OffsetAndTimestampInternal> buildOffsetsForTimeInternalResult(
final Map<TopicPartition, Long> timestampsToSearch,
final Map<TopicPartition, ListOffsetData> fetchedOffsets) {

HashMap<TopicPartition, OffsetAndTimestampInternal> offsetsResults = new HashMap<>(timestampsToSearch.size());
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
offsetsResults.put(entry.getKey(), null);
}
for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) {
ListOffsetData offsetData = entry.getValue();
offsetsResults.put(entry.getKey(), new OffsetAndTimestampInternal(
offsetData.offset,
offsetData.timestamp,
offsetData.leaderEpoch));
}
return offsetsResults;
}

private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
OffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
if (strategy == OffsetResetStrategy.EARLIEST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,33 +149,12 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
* found .The future will complete when the requests responses are received and
* processed, following a call to {@link #poll(long)}
*/
public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsForTime(
Map<TopicPartition, Long> timestampsToSearch) {
public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsets(
Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamps) {
if (timestampsToSearch.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
ListOffsetsRequestState listOffsetsRequestState = fetchOffsets(timestampsToSearch, true);
return listOffsetsRequestState.globalResult.thenApply(
result -> OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
}

/**
* Handles the API request for getting the first or the last offsets for the given partitions.
*/
public CompletableFuture<Map<TopicPartition, Long>> beginningOrEndOffset(Map<TopicPartition, Long> timestampsToSearch) {
if (timestampsToSearch.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
ListOffsetsRequestState listOffsetsRequestState = fetchOffsets(timestampsToSearch, false);
return listOffsetsRequestState.globalResult.thenApply(
result -> OffsetFetcherUtils.buildListOffsetsResult(
timestampsToSearch,
result.fetchedOffsets,
(topicPartition, offsetData) -> offsetData.offset));
}

private ListOffsetsRequestState fetchOffsets(
Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamps) {
metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));
ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState(
timestampsToSearch,
Expand All @@ -194,7 +173,10 @@ private ListOffsetsRequestState fetchOffsets(
});

prepareFetchOffsetsRequests(timestampsToSearch, requireTimestamps, listOffsetsRequestState);
return listOffsetsRequestState;
return listOffsetsRequestState.globalResult.thenApply(
result -> OffsetFetcherUtils.buildOffsetsForTimeInternalResult(
timestampsToSearch,
result.fetchedOffsets));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -107,10 +107,6 @@ public void process(ApplicationEvent event) {
process((ListOffsetsEvent) event);
return;

case LIST_OFFSETS_FOR_TIME:
process((ListOffsetsForTimeEvent) event);
return;

case RESET_POSITIONS:
process((ResetPositionsEvent) event);
return;
Expand Down Expand Up @@ -206,17 +202,8 @@ private void process(final AssignmentChangeEvent event) {
* fetchOffsetsForTime is used, otherwise beginningOrEndOffsets is used.
*/
private void process(final ListOffsetsEvent event) {
final CompletableFuture<Map<TopicPartition, Long>> future =
requestManagers.offsetsRequestManager.beginningOrEndOffset(event.timestampsToSearch());
future.whenComplete(complete(event.future()));
}

/**
* List the offsets for the given partitions at the given timestamps.
*/
private void process(final ListOffsetsForTimeEvent event) {
final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future =
requestManagers.offsetsRequestManager.fetchOffsetsForTime(event.timestampsToSearch());
final CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> future =
requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(), event.requireTimestamps);
future.whenComplete(complete(event.future()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Timer;

Expand All @@ -25,28 +26,26 @@
import java.util.Map;

/**
* Application Event for retrieving partition offsets by performing a
* {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}.
* Application event to list offsets for partitions by timestamps. It is completed with the map of
* {@link TopicPartition} and {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is
* greater than or equals to the target timestamp)
*/
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, Long>> {
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
private final Map<TopicPartition, Long> timestampsToSearch;
public final boolean requireTimestamps;

public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, final Timer timer) {
public ListOffsetsEvent(Map<TopicPartition, Long> timestampToSearch,
Timer timer,
boolean requireTimestamps) {
super(Type.LIST_OFFSETS, timer);
this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch);
this.requireTimestamps = requireTimestamps;
}

/**
* Build result representing that no offsets were found as part of the current event.
*
* @return Map containing all the partitions the event was trying to get offsets for, and
* null {@link OffsetAndTimestamp} as value
*/
public Map<TopicPartition, Long> emptyResult() {
HashMap<TopicPartition, Long> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet())
offsetsByTimes.put(entry.getKey(), null);
return offsetsByTimes;
public <T> Map<TopicPartition, T> emptyResults() {
Map<TopicPartition, T> result = new HashMap<>();
timestampsToSearch.keySet().forEach(tp -> result.put(tp, null));
return result;
}

public Map<TopicPartition, Long> timestampsToSearch() {
Expand Down

This file was deleted.

0 comments on commit 6fb9bd8

Please sign in to comment.