Skip to content

Commit

Permalink
Update based on comments
Browse files Browse the repository at this point in the history
wip
  • Loading branch information
philipnee committed Mar 25, 2024
1 parent e2ca63c commit bfadb18
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsForTimeEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
Expand Down Expand Up @@ -1093,9 +1094,8 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
return Collections.emptyMap();
}
final Timer timer = time.timer(timeout);
final ListOffsetsEvent<OffsetAndTimestamp> listOffsetsEvent = new ListOffsetsEvent<>(
final ListOffsetsForTimeEvent listOffsetsEvent = new ListOffsetsForTimeEvent(
timestampsToSearch,
true,
timer);

// If timeout is set to zero return empty immediately; otherwise try to get the results
Expand Down Expand Up @@ -1145,9 +1145,8 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
Timer timer = time.timer(timeout);
ListOffsetsEvent<Long> listOffsetsEvent = new ListOffsetsEvent<>(
ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampToSearch,
false,
timer);
Map<TopicPartition, Long> offsetAndTimestampMap = applicationEventHandler.addAndGet(
listOffsetsEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ public int memberEpoch() {
public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) {
if (response.errorCode() != Errors.NONE.code()) {
String errorMessage = String.format(
"Unexpected error in Heartbeat response. Expected no error, but received: %s",
Errors.forCode(response.errorCode())
"Unexpected error in Heartbeat response. Expected no error, but received: %s",
Errors.forCode(response.errorCode())
);
throw new IllegalArgumentException(errorMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public abstract class ApplicationEvent {

public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
LIST_OFFSETS, LIST_OFFSETS_FOR_TIME, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA,
SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, LEAVE_ON_CLOSE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public void process(ApplicationEvent event) {
process((ListOffsetsEvent) event);
return;

case LIST_OFFSETS_FOR_TIME:
process((ListOffsetsForTimeEvent) event);
return;

case RESET_POSITIONS:
process((ResetPositionsEvent) event);
return;
Expand Down Expand Up @@ -202,17 +206,20 @@ private void process(final AssignmentChangeEvent event) {
* fetchOffsetsForTime is used, otherwise beginningOrEndOffsets is used.
*/

@SuppressWarnings("unchecked")
private void process(final ListOffsetsEvent event) {
if (event.requireTimestamps()) {
final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future =
requestManagers.offsetsRequestManager.fetchOffsetsForTime(event.timestampsToSearch());
future.whenComplete(complete(event.future()));
} else {
final CompletableFuture<Map<TopicPartition, Long>> future =
requestManagers.offsetsRequestManager.beginningOrEndOffset(event.timestampsToSearch());
future.whenComplete(complete(event.future()));
}
final CompletableFuture<Map<TopicPartition, Long>> future =
requestManagers.offsetsRequestManager.beginningOrEndOffset(event.timestampsToSearch());
future.whenComplete(complete(event.future()));
}

/**
* List the offsets for the given partitions at the given timestamps.
*/

private void process(final ListOffsetsForTimeEvent event) {
final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future =
requestManagers.offsetsRequestManager.fetchOffsetsForTime(event.timestampsToSearch());
future.whenComplete(complete(event.future()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,15 @@
import java.util.Map;

/**
* Event for retrieving partition offsets by performing a
* Application Event for retrieving partition offsets by performing a
* {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}.
* This event is created with a map of {@link TopicPartition} and target timestamps to search
* offsets for. It is completed with the map of {@link TopicPartition} and
* {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than
* or equals to the target timestamp)
*/
public class ListOffsetsEvent<T> extends CompletableApplicationEvent<Map<TopicPartition, T>> {
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, Long>> {
private final Map<TopicPartition, Long> timestampsToSearch;
private final boolean requireTimestamps;

public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, final boolean requireTimestamps, final Timer timer) {
public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, final Timer timer) {
super(Type.LIST_OFFSETS, timer);
this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch);
this.requireTimestamps = requireTimestamps;
}

/**
Expand All @@ -59,15 +53,10 @@ public Map<TopicPartition, Long> timestampsToSearch() {
return timestampsToSearch;
}

public boolean requireTimestamps() {
return requireTimestamps;
}

@Override
public String toStringBase() {
return super.toStringBase() +
", timestampsToSearch=" + timestampsToSearch +
", requireTimestamps=" + requireTimestamps;
", timestampsToSearch=" + timestampsToSearch;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Timer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Application event to list offsets for partitions by timestamps. It is completed with the map of
* {@link TopicPartition} and {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is
* greater than or equals to the target timestamp)
*/
public class ListOffsetsForTimeEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
private final Map<TopicPartition, Long> timestampsToSearch;

public ListOffsetsForTimeEvent(final Map<TopicPartition, Long> timestampToSearch, final Timer timer) {
super(Type.LIST_OFFSETS_FOR_TIME, timer);
this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch);
}

/**
* Build result representing that no offsets were found as part of the current event.
*
* @return Map containing all the partitions the event was trying to get offsets for, and
* null {@link OffsetAndTimestamp} as value
*/
public Map<TopicPartition, OffsetAndTimestamp> emptyResult() {
HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet())
offsetsByTimes.put(entry.getKey(), null);
return offsetsByTimes;
}

public Map<TopicPartition, Long> timestampsToSearch() {
return timestampsToSearch;
}

@Override
public String toStringBase() {
return super.toStringBase() +
", timestampsToSearch=" + timestampsToSearch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsForTimeEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
Expand Down Expand Up @@ -916,7 +917,7 @@ public void testOffsetsForTimes() {
Map<TopicPartition, OffsetAndTimestamp> result =
assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1)));
assertEquals(expectedResult, result);
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsForTimeEvent.class),
ArgumentMatchers.isA(Timer.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsForTimeEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
Expand Down Expand Up @@ -170,11 +170,22 @@ public void testSyncCommitEvent() {
verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
}

@Test
public void testListOffsetsForTimeEventIsProcessed() {
Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L);
Timer timer = time.timer(100);
ApplicationEvent e = new ListOffsetsForTimeEvent(timestamps, timer);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
verify(applicationEventProcessor).process(any(ListOffsetsForTimeEvent.class));
assertTrue(applicationEventsQueue.isEmpty());
}

@Test
public void testListOffsetsEventIsProcessed() {
Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L);
Timer timer = time.timer(100);
ApplicationEvent e = new ListOffsetsEvent<OffsetAndTimestamp>(timestamps, true, timer);
ApplicationEvent e = new ListOffsetsEvent(timestamps, timer);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
Expand Down

0 comments on commit bfadb18

Please sign in to comment.