Skip to content

Commit

Permalink
BE: Messages: Fix first offsets retrieval for compacted topic (provec…
Browse files Browse the repository at this point in the history
…tus#406)

Co-authored-by: iliax <ilya.kuramshin@almatech.dev>
  • Loading branch information
iliax and iliax committed May 25, 2024
1 parent c06385d commit 8cf4e11
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
26 changes: 25 additions & 1 deletion api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

@Slf4j
@Getter
Expand All @@ -34,7 +36,7 @@ class OffsetsInfo {

OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
endOffsets.forEach((tp, endOffset) -> {
var beginningOffset = beginOffsets.get(tp);
Expand All @@ -46,6 +48,28 @@ class OffsetsInfo {
});
}


private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
try {
// we try to use offsetsForTimes() to find earliest offsets, since for
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
// even when effectively first offset can be very high
var offsets = consumer.offsetsForTimes(
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
);
// result of offsetsForTimes() can be null, if message version < 0.10.0
if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) {
return offsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
}
} catch (UnsupportedOperationException | UnsupportedVersionException e) {
// offsetsForTimes() not supported
}
//falling back to beginningOffsets() if offsetsForTimes() not supported
return consumer.beginningOffsets(partitions);
}

boolean assignedPartitionsFullyPolled() {
for (var tp : consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
Expand Down
10 changes: 7 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,20 +86,22 @@ private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));

List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
Set<TopicPartition> paused = new HashSet<>();
while (!sink.isCancelled() && paused.size() < range.size()) {
var polledRecords = poll(sink, consumer);
range.forEach((tp, fromTo) -> {
polledRecords.records(tp).stream()
.filter(r -> r.offset() < fromTo.to)
.forEach(result::add);

//next position is out of target range -> pausing partition
if (consumer.position(tp) >= fromTo.to) {
if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) {
paused.add(tp);
consumer.pause(List.of(tp));
}
});
}
consumer.resume(consumer.paused());
consumer.resume(paused);
return result;
}
}

0 comments on commit 8cf4e11

Please sign in to comment.