Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add GetEndOffsets API #129

Merged
merged 3 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.bsideup.liiklus.records;

import java.util.Map;
import java.util.concurrent.CompletionStage;

public interface FiniteRecordsStorage extends RecordsStorage {

/**
* Returns a {@link Map} where key is partition's number and value is the latest offset.
* The offset can be zero. Offset -1 means that there is no offset for this partition.
*/
CompletionStage<Map<Integer, Long>> getEndOffsets(String topic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.github.bsideup.liiklus.positions.GroupId;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import com.github.bsideup.liiklus.records.RecordPostProcessor;
import com.github.bsideup.liiklus.records.RecordPreProcessor;
import com.github.bsideup.liiklus.records.RecordsStorage;
Expand Down Expand Up @@ -302,6 +303,24 @@ public Mono<GetOffsetsReply> getOffsets(Mono<GetOffsetsRequest> request) {
);
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message, ByteBuf metadata) {
return getEndOffsets(Mono.just(message));
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(Mono<GetEndOffsetsRequest> request) {
return request.flatMap(getEndOffsets -> {
if (!(recordsStorage instanceof FiniteRecordsStorage)) {
return Mono.error(Status.INTERNAL.withDescription("The record storage is not finite").asException());
}

var topic = getEndOffsets.getTopic();
return Mono.fromCompletionStage(((FiniteRecordsStorage) recordsStorage).getEndOffsets(topic))
.map(endOffsets -> GetEndOffsetsReply.newBuilder().putAllOffsets(endOffsets).build());
});
}

private Mono<Map<Integer, Optional<Long>>> getLatestOffsetsOfGroup(String topic, String groupName) {
return getOffsetsByGroupName(topic, groupName)
.map(ackedOffsets -> ackedOffsets.values().stream()
Expand Down
57 changes: 57 additions & 0 deletions app/src/test/java/com/github/bsideup/liiklus/EndOffsetsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.github.bsideup.liiklus;

import com.github.bsideup.liiklus.protocol.GetEndOffsetsRequest;
import com.github.bsideup.liiklus.protocol.PublishRequest;
import com.github.bsideup.liiklus.test.AbstractIntegrationTest;
import com.google.protobuf.ByteString;
import org.junit.Before;
import org.junit.Test;

import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class EndOffsetsTest extends AbstractIntegrationTest {

private String topic;

@Before
public final void setUpEndOffsetsTest() {
topic = testName.getMethodName();
}

@Test
public void testEndOffsets() {
var value = ByteString.copyFromUtf8("foo");

for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
for (int i = 0; i < partition + 1; i++) {
stub.publish(PublishRequest.newBuilder()
.setTopic(topic)
.setKey(ByteString.copyFromUtf8(PARTITION_KEYS.get(partition)))
.setValue(value)
.build()
).block();
}
}

var reply = stub.getEndOffsets(GetEndOffsetsRequest.newBuilder().setTopic(topic).build()).block();

assertThat(reply.getOffsetsMap())
.hasSize(NUM_PARTITIONS)
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(partition.longValue());
});
}

@Test
public void testEndOffsets_unknownTopic() {
var randomTopic = UUID.randomUUID().toString();
var reply = stub.getEndOffsets(GetEndOffsetsRequest.newBuilder().setTopic(randomTopic).build()).block();

assertThat(reply.getOffsetsMap()).isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface LiiklusClient {
Mono<Empty> ack(AckRequest message);

Mono<GetOffsetsReply> getOffsets(GetOffsetsRequest message);

Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public Mono<Empty> ack(AckRequest message) {
public Mono<GetOffsetsReply> getOffsets(GetOffsetsRequest message) {
return liiklusServiceClient.getOffsets(message);
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message) {
return liiklusServiceClient.getEndOffsets(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.bsideup.liiklus.records.inmemory;

import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Value;
Expand Down Expand Up @@ -33,7 +33,7 @@
*/
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class InMemoryRecordsStorage implements RecordsStorage {
public class InMemoryRecordsStorage implements FiniteRecordsStorage {

public static int partitionByKey(String key, int numberOfPartitions) {
return partitionByKey(ByteBuffer.wrap(key.getBytes()), numberOfPartitions);
Expand Down Expand Up @@ -74,6 +74,20 @@ public CompletionStage<OffsetInfo> publish(Envelope envelope) {
));
}

@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
var partitions = state.getOrDefault(topic, new StoredTopic(numberOfPartitions)).getPartitions();
return CompletableFuture.completedFuture(
partitions.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
it -> Math.max(
0,
it.getValue().getNextOffset().get() - 1
)
))
);
}

@Override
public Subscription subscribe(String topic, String groupName, Optional<String> autoOffsetReset) {
var storedTopic = state.computeIfAbsent(topic, __ -> new StoredTopic(numberOfPartitions));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.bsideup.liiklus.kafka;

import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.FieldDefaults;
Expand Down Expand Up @@ -33,11 +33,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@FieldDefaults(makeFinal = true)
@Slf4j
public class KafkaRecordsStorage implements RecordsStorage {
public class KafkaRecordsStorage implements FiniteRecordsStorage {

private static final Scheduler KAFKA_POLL_SCHEDULER = Schedulers.elastic();

Expand All @@ -60,6 +61,39 @@ public KafkaRecordsStorage(String bootstrapServers) {
);
}

@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
return Mono.fromCallable(() -> {
var properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "0");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

try (
var consumer = new KafkaConsumer<ByteBuffer, ByteBuffer>(
properties,
new ByteBufferDeserializer(),
new ByteBufferDeserializer()
)
) {
consumer.subscribe(List.of(topic));

var endOffsets = consumer.endOffsets(
consumer.partitionsFor(topic).stream()
.map(it -> new TopicPartition(topic, it.partition()))
.collect(Collectors.toSet())
);

return endOffsets.entrySet().stream().collect(Collectors.toMap(
it -> it.getKey().partition(),
it -> it.getValue() - 1
));
}
}).subscribeOn(Schedulers.elastic()).toFuture();
}

@Override
public CompletionStage<OffsetInfo> publish(Envelope envelope) {
String topic = envelope.getTopic();
Expand Down
12 changes: 12 additions & 0 deletions protocol/src/main/proto/LiiklusService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ service LiiklusService {
rpc GetOffsets(GetOffsetsRequest) returns (GetOffsetsReply) {

}

rpc GetEndOffsets(GetEndOffsetsRequest) returns (GetEndOffsetsReply) {

}
}

message PublishRequest {
Expand Down Expand Up @@ -122,4 +126,12 @@ message GetOffsetsRequest {

message GetOffsetsReply {
map<uint32, uint64> offsets = 1;
}

message GetEndOffsetsRequest {
string topic = 1;
}

message GetEndOffsetsReply {
map<uint32, uint64> offsets = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import com.github.bsideup.liiklus.records.tests.BackPressureTest;
import com.github.bsideup.liiklus.records.tests.ConsumerGroupTest;
import com.github.bsideup.liiklus.records.tests.EndOffsetsTest;
import com.github.bsideup.liiklus.records.tests.PublishTest;
import com.github.bsideup.liiklus.records.tests.SubscribeTest;

public interface RecordStorageTests extends PublishTest, SubscribeTest, ConsumerGroupTest, BackPressureTest {
public interface RecordStorageTests extends PublishTest, SubscribeTest, ConsumerGroupTest, BackPressureTest, EndOffsetsTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.github.bsideup.liiklus.records.tests;

import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import com.github.bsideup.liiklus.records.RecordStorageTestSupport;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public interface EndOffsetsTest extends RecordStorageTestSupport {

default FiniteRecordsStorage getFiniteTarget() {
return (FiniteRecordsStorage) getTarget();
}

int getNumberOfPartitions();

String keyByPartition(int partition);

@BeforeEach
default void blah(TestInfo testInfo) {
if (EndOffsetsTest.class == testInfo.getTestMethod().map(Method::getDeclaringClass).orElse(null)) {
Assumptions.assumeTrue(getTarget() instanceof FiniteRecordsStorage, "target is finite");
}
}

@Test
default void testEndOffsets() throws Exception {
var topic = getTopic();

for (int partition = 0; partition < getNumberOfPartitions(); partition++) {
for (int i = 0; i < partition + 1; i++) {
publish(keyByPartition(partition).getBytes(), new byte[1]);
}
}

var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS);

assertThat(offsets)
.hasSize(getNumberOfPartitions())
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(partition.longValue());
});
}

@Test
default void testEndOffsets_unknownTopic() throws Exception {
var topic = UUID.randomUUID().toString();

var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS);

assertThat(offsets)
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(-1L);
});
}
}