From e272d43daee33e2214d05d135005c2cbc0b7c50f Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Wed, 3 Jul 2019 21:43:32 +0200 Subject: [PATCH] add `GetEndOffsets` API --- .../liiklus/records/FiniteRecordsStorage.java | 13 ++++ .../service/ReactorLiiklusServiceImpl.java | 19 ++++++ .../bsideup/liiklus/EndOffsetsTest.java | 57 ++++++++++++++++ .../github/bsideup/liiklus/LiiklusClient.java | 2 + .../bsideup/liiklus/RSocketLiiklusClient.java | 5 ++ .../inmemory/InMemoryRecordsStorage.java | 18 ++++- .../liiklus/kafka/KafkaRecordsStorage.java | 38 ++++++++++- protocol/src/main/proto/LiiklusService.proto | 12 ++++ .../liiklus/records/RecordStorageTests.java | 3 +- .../liiklus/records/tests/EndOffsetsTest.java | 67 +++++++++++++++++++ 10 files changed, 229 insertions(+), 5 deletions(-) create mode 100644 api/src/main/java/com/github/bsideup/liiklus/records/FiniteRecordsStorage.java create mode 100644 app/src/test/java/com/github/bsideup/liiklus/EndOffsetsTest.java create mode 100644 tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java diff --git a/api/src/main/java/com/github/bsideup/liiklus/records/FiniteRecordsStorage.java b/api/src/main/java/com/github/bsideup/liiklus/records/FiniteRecordsStorage.java new file mode 100644 index 00000000..3d44eed7 --- /dev/null +++ b/api/src/main/java/com/github/bsideup/liiklus/records/FiniteRecordsStorage.java @@ -0,0 +1,13 @@ +package com.github.bsideup.liiklus.records; + +import java.util.Map; +import java.util.concurrent.CompletionStage; + +public interface FiniteRecordsStorage extends RecordsStorage { + + /** + * Returns a {@link Map} where key is partition's number and value is the latest offset. + * The offset can be zero. Offset -1 means that there is no offset for this partition. + */ + CompletionStage> getEndOffsets(String topic); +} \ No newline at end of file diff --git a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java b/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java index eb411b4f..b9b81f74 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java +++ b/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java @@ -5,6 +5,7 @@ import com.github.bsideup.liiklus.positions.GroupId; import com.github.bsideup.liiklus.positions.PositionsStorage; import com.github.bsideup.liiklus.protocol.*; +import com.github.bsideup.liiklus.records.FiniteRecordsStorage; import com.github.bsideup.liiklus.records.RecordPostProcessor; import com.github.bsideup.liiklus.records.RecordPreProcessor; import com.github.bsideup.liiklus.records.RecordsStorage; @@ -298,6 +299,24 @@ public Mono getOffsets(Mono request) { ); } + @Override + public Mono getEndOffsets(GetEndOffsetsRequest message, ByteBuf metadata) { + return getEndOffsets(Mono.just(message)); + } + + @Override + public Mono getEndOffsets(Mono request) { + return request.flatMap(getEndOffsets -> { + if (!(recordsStorage instanceof FiniteRecordsStorage)) { + return Mono.error(Status.INTERNAL.withDescription("The record storage is not finite").asException()); + } + + var topic = getEndOffsets.getTopic(); + return Mono.fromCompletionStage(((FiniteRecordsStorage) recordsStorage).getEndOffsets(topic)) + .map(endOffsets -> GetEndOffsetsReply.newBuilder().putAllOffsets(endOffsets).build()); + }); + } + private Mono>> getLatestOffsetsOfGroup(String topic, String groupName) { return getOffsetsByGroupName(topic, groupName) .map(ackedOffsets -> ackedOffsets.values().stream() diff --git a/app/src/test/java/com/github/bsideup/liiklus/EndOffsetsTest.java b/app/src/test/java/com/github/bsideup/liiklus/EndOffsetsTest.java new file mode 100644 index 00000000..7a8a703a --- /dev/null +++ b/app/src/test/java/com/github/bsideup/liiklus/EndOffsetsTest.java @@ -0,0 +1,57 @@ +package com.github.bsideup.liiklus; + +import com.github.bsideup.liiklus.protocol.GetEndOffsetsRequest; +import com.github.bsideup.liiklus.protocol.PublishRequest; +import com.github.bsideup.liiklus.test.AbstractIntegrationTest; +import com.google.protobuf.ByteString; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class EndOffsetsTest extends AbstractIntegrationTest { + + private String topic; + + @Before + public final void setUpEndOffsetsTest() { + topic = testName.getMethodName(); + } + + @Test + public void testEndOffsets() { + var value = ByteString.copyFromUtf8("foo"); + + for (int partition = 0; partition < NUM_PARTITIONS; partition++) { + for (int i = 0; i < partition + 1; i++) { + stub.publish(PublishRequest.newBuilder() + .setTopic(topic) + .setKey(ByteString.copyFromUtf8(PARTITION_KEYS.get(partition))) + .setValue(value) + .build() + ).block(); + } + } + + var reply = stub.getEndOffsets(GetEndOffsetsRequest.newBuilder().setTopic(topic).build()).block(); + + assertThat(reply.getOffsetsMap()) + .hasSize(NUM_PARTITIONS) + .allSatisfy((partition, offset) -> { + assertThat(offset) + .as("offset of p" + partition) + .isEqualTo(partition.longValue()); + }); + } + + @Test + public void testEndOffsets_unknownTopic() { + var randomTopic = UUID.randomUUID().toString(); + var reply = stub.getEndOffsets(GetEndOffsetsRequest.newBuilder().setTopic(randomTopic).build()).block(); + + assertThat(reply.getOffsetsMap()).isEmpty(); + } + +} diff --git a/client/src/main/java/com/github/bsideup/liiklus/LiiklusClient.java b/client/src/main/java/com/github/bsideup/liiklus/LiiklusClient.java index 5ca6d5d5..8814773b 100644 --- a/client/src/main/java/com/github/bsideup/liiklus/LiiklusClient.java +++ b/client/src/main/java/com/github/bsideup/liiklus/LiiklusClient.java @@ -16,4 +16,6 @@ public interface LiiklusClient { Mono ack(AckRequest message); Mono getOffsets(GetOffsetsRequest message); + + Mono getEndOffsets(GetEndOffsetsRequest message); } diff --git a/client/src/main/java/com/github/bsideup/liiklus/RSocketLiiklusClient.java b/client/src/main/java/com/github/bsideup/liiklus/RSocketLiiklusClient.java index a835ac6d..c7c15730 100644 --- a/client/src/main/java/com/github/bsideup/liiklus/RSocketLiiklusClient.java +++ b/client/src/main/java/com/github/bsideup/liiklus/RSocketLiiklusClient.java @@ -43,4 +43,9 @@ public Mono ack(AckRequest message) { public Mono getOffsets(GetOffsetsRequest message) { return liiklusServiceClient.getOffsets(message); } + + @Override + public Mono getEndOffsets(GetEndOffsetsRequest message) { + return liiklusServiceClient.getEndOffsets(message); + } } diff --git a/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java b/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java index cf6c9f40..484d0320 100644 --- a/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java +++ b/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java @@ -1,6 +1,6 @@ package com.github.bsideup.liiklus.records.inmemory; -import com.github.bsideup.liiklus.records.RecordsStorage; +import com.github.bsideup.liiklus.records.FiniteRecordsStorage; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.Value; @@ -32,7 +32,7 @@ */ @RequiredArgsConstructor @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) -public class InMemoryRecordsStorage implements RecordsStorage { +public class InMemoryRecordsStorage implements FiniteRecordsStorage { public static int partitionByKey(String key, int numberOfPartitions) { return partitionByKey(ByteBuffer.wrap(key.getBytes()), numberOfPartitions); @@ -71,6 +71,20 @@ public CompletionStage publish(Envelope envelope) { )); } + @Override + public CompletionStage> getEndOffsets(String topic) { + var partitions = state.getOrDefault(topic, new StoredTopic(numberOfPartitions)).getPartitions(); + return CompletableFuture.completedFuture( + partitions.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + it -> Math.max( + 0, + it.getValue().getNextOffset().get() - 1 + ) + )) + ); + } + @Override public Subscription subscribe(String topic, String groupName, Optional autoOffsetReset) { var storedTopic = state.computeIfAbsent(topic, __ -> new StoredTopic(numberOfPartitions)); diff --git a/plugins/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/KafkaRecordsStorage.java b/plugins/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/KafkaRecordsStorage.java index e47cc57a..2d32b0a1 100644 --- a/plugins/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/KafkaRecordsStorage.java +++ b/plugins/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/KafkaRecordsStorage.java @@ -1,6 +1,6 @@ package com.github.bsideup.liiklus.kafka; -import com.github.bsideup.liiklus.records.RecordsStorage; +import com.github.bsideup.liiklus.records.FiniteRecordsStorage; import lombok.SneakyThrows; import lombok.Value; import lombok.experimental.FieldDefaults; @@ -33,11 +33,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; @FieldDefaults(makeFinal = true) @Slf4j -public class KafkaRecordsStorage implements RecordsStorage { +public class KafkaRecordsStorage implements FiniteRecordsStorage { private static final Scheduler KAFKA_POLL_SCHEDULER = Schedulers.elastic(); @@ -60,6 +61,39 @@ public KafkaRecordsStorage(String bootstrapServers) { ); } + @Override + public CompletionStage> getEndOffsets(String topic) { + return Mono.fromCallable(() -> { + var properties = new HashMap(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "0"); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); + + try ( + var consumer = new KafkaConsumer( + properties, + new ByteBufferDeserializer(), + new ByteBufferDeserializer() + ) + ) { + consumer.subscribe(List.of(topic)); + + var endOffsets = consumer.endOffsets( + consumer.partitionsFor(topic).stream() + .map(it -> new TopicPartition(topic, it.partition())) + .collect(Collectors.toSet()) + ); + + return endOffsets.entrySet().stream().collect(Collectors.toMap( + it -> it.getKey().partition(), + it -> it.getValue() - 1 + )); + } + }).subscribeOn(Schedulers.elastic()).toFuture(); + } + @Override public CompletionStage publish(Envelope envelope) { String topic = envelope.getTopic(); diff --git a/protocol/src/main/proto/LiiklusService.proto b/protocol/src/main/proto/LiiklusService.proto index d802d3b9..c693c204 100644 --- a/protocol/src/main/proto/LiiklusService.proto +++ b/protocol/src/main/proto/LiiklusService.proto @@ -28,6 +28,10 @@ service LiiklusService { rpc GetOffsets(GetOffsetsRequest) returns (GetOffsetsReply) { } + + rpc GetEndOffsets(GetEndOffsetsRequest) returns (GetEndOffsetsReply) { + + } } message PublishRequest { @@ -122,4 +126,12 @@ message GetOffsetsRequest { message GetOffsetsReply { map offsets = 1; +} + +message GetEndOffsetsRequest { + string topic = 1; +} + +message GetEndOffsetsReply { + map offsets = 1; } \ No newline at end of file diff --git a/tck/src/main/java/com/github/bsideup/liiklus/records/RecordStorageTests.java b/tck/src/main/java/com/github/bsideup/liiklus/records/RecordStorageTests.java index ee3fc329..11624882 100644 --- a/tck/src/main/java/com/github/bsideup/liiklus/records/RecordStorageTests.java +++ b/tck/src/main/java/com/github/bsideup/liiklus/records/RecordStorageTests.java @@ -2,8 +2,9 @@ import com.github.bsideup.liiklus.records.tests.BackPressureTest; import com.github.bsideup.liiklus.records.tests.ConsumerGroupTest; +import com.github.bsideup.liiklus.records.tests.EndOffsetsTest; import com.github.bsideup.liiklus.records.tests.PublishTest; import com.github.bsideup.liiklus.records.tests.SubscribeTest; -public interface RecordStorageTests extends PublishTest, SubscribeTest, ConsumerGroupTest, BackPressureTest { +public interface RecordStorageTests extends PublishTest, SubscribeTest, ConsumerGroupTest, BackPressureTest, EndOffsetsTest { } diff --git a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java new file mode 100644 index 00000000..62acb4d4 --- /dev/null +++ b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java @@ -0,0 +1,67 @@ +package com.github.bsideup.liiklus.records.tests; + +import com.github.bsideup.liiklus.records.FiniteRecordsStorage; +import com.github.bsideup.liiklus.records.RecordStorageTestSupport; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.lang.reflect.Method; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public interface EndOffsetsTest extends RecordStorageTestSupport { + + default FiniteRecordsStorage getFiniteTarget() { + return (FiniteRecordsStorage) getTarget(); + } + + int getNumberOfPartitions(); + + String keyByPartition(int partition); + + @BeforeEach + default void blah(TestInfo testInfo) { + if (EndOffsetsTest.class == testInfo.getTestMethod().map(Method::getDeclaringClass).orElse(null)) { + Assumptions.assumeTrue(getTarget() instanceof FiniteRecordsStorage, "target is finite"); + } + } + + @Test + default void testEndOffsets() throws Exception { + var topic = getTopic(); + + for (int partition = 0; partition < getNumberOfPartitions(); partition++) { + for (int i = 0; i < partition + 1; i++) { + publish(keyByPartition(partition).getBytes(), new byte[1]); + } + } + + var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS); + + assertThat(offsets) + .hasSize(getNumberOfPartitions()) + .allSatisfy((partition, offset) -> { + assertThat(offset) + .as("offset of p" + partition) + .isEqualTo(partition.longValue()); + }); + } + + @Test + default void testEndOffsets_unknownTopic() throws Exception { + var topic = UUID.randomUUID().toString(); + + var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS); + + assertThat(offsets) + .allSatisfy((partition, offset) -> { + assertThat(offset) + .as("offset of p" + partition) + .isEqualTo(-1L); + }); + } +}