-
Notifications
You must be signed in to change notification settings - Fork 34
/
Consumer.java
105 lines (90 loc) · 4.7 KB
/
Consumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.example;
import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.protocol.SubscribeRequest.AutoOffsetReset;
import com.google.protobuf.ByteString;
import io.grpc.netty.NettyChannelBuilder;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.reactivestreams.Publisher;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Stream;
@Slf4j
public class Consumer {

    /**
     * Demo entry point: starts a local Liiklus + Kafka pair (via Testcontainers),
     * publishes a random event every second, and concurrently consumes those events
     * with periodic ACKs. Blocks forever on the consuming pipeline.
     */
    public static void main(String[] args) {
        // This variable should point to your Liiklus deployment (possibly behind a Load Balancer)
        String liiklusTarget = getLiiklusTarget();

        val channel = NettyChannelBuilder.forTarget(liiklusTarget)
                .directExecutor()
                .usePlaintext(true) // demo only — use TLS for a real deployment
                .build();

        // Subscription parameters: topic, consumer group, and where to start when no offset is stored
        val subscribeAction = SubscribeRequest.newBuilder()
                .setTopic("events-topic")
                .setGroup("my-group")
                .setAutoOffsetReset(AutoOffsetReset.EARLIEST)
                .build();

        val stub = ReactorLiiklusServiceGrpc.newReactorStub(channel);

        // Send an event every second; drop ticks if publishing cannot keep up with the interval
        Flux.interval(Duration.ofSeconds(1))
                .onBackpressureDrop()
                .concatMap(it -> stub.publish(Mono.just(
                        PublishRequest.newBuilder()
                                .setTopic(subscribeAction.getTopic())
                                .setKey(ByteString.copyFromUtf8(UUID.randomUUID().toString()))
                                .setValue(ByteString.copyFromUtf8(UUID.randomUUID().toString()))
                                .build()
                )))
                .subscribe();

        // Per-partition record handler; the returned Publisher completes when "processing" is done
        Function<Integer, Function<ReceiveReply.Record, Publisher<?>>> businessLogic = partition -> record -> {
            log.info("Processing record from partition {} offset {}", partition, record.getOffset());
            // simulate processing
            return Mono.delay(Duration.ofMillis(200));
        };

        // Consume the events: one receive stream per assigned partition
        stub
                .subscribe(Mono.just(subscribeAction))
                .filter(it -> it.getReplyCase() == SubscribeReply.ReplyCase.ASSIGNMENT)
                .map(SubscribeReply::getAssignment)
                .doOnNext(assignment -> log.info("Assigned to partition {}", assignment.getPartition()))
                .flatMap(assignment -> stub
                        // Start receiving the events from a partition
                        .receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(assignment).build()))
                        .window(1000) // ACK every 1000th record
                        .concatMap(
                                batch -> batch
                                        .map(ReceiveReply::getRecord)
                                        .delayUntil(businessLogic.apply(assignment.getPartition()))
                                        .sample(Duration.ofSeconds(5)) // ACK every 5 seconds
                                        .onBackpressureLatest()
                                        .delayUntil(record -> {
                                            log.info("ACKing partition {} offset {}", assignment.getPartition(), record.getOffset());
                                            return stub.ack(Mono.just(
                                                    AckRequest.newBuilder()
                                                            .setAssignment(assignment)
                                                            .setOffset(record.getOffset())
                                                            .build()
                                            ));
                                        }),
                                1 // process windows one at a time so offsets are ACKed in order
                        )
                )
                .blockLast();
    }

    /**
     * Spins up a Kafka container and a Liiklus container on a shared network
     * (demo/testing setup) and returns the {@code host:port} gRPC target for Liiklus.
     *
     * @return the address of the started Liiklus instance, e.g. {@code localhost:32768}
     */
    private static String getLiiklusTarget() {
        val kafka = new KafkaContainer()
                .withEnv("KAFKA_NUM_PARTITIONS", "4");

        // Wildcard type (not raw) keeps the fluent self-typed builder chain unchecked-warning-free
        GenericContainer<?> liiklus = new GenericContainer<>("bsideup/liiklus:0.1.8")
                .withNetwork(kafka.getNetwork())
                .withExposedPorts(6565)
                // Liiklus reaches Kafka through the internal network alias on the broker's listener port
                .withEnv("kafka_bootstrapServers", kafka.getNetworkAliases().get(0) + ":9093")
                .withEnv("storage_positions_type", "MEMORY"); // Fine for testing, NOT FINE I WARNED YOU for production :D

        // Start both containers concurrently to shorten demo startup time
        Stream.of(kafka, liiklus).parallel().forEach(GenericContainer::start);
        log.info("Containers started");

        return String.format("%s:%d", liiklus.getContainerIpAddress(), liiklus.getFirstMappedPort());
    }
}