Skip to content

Commit

Permalink
Support gateway start from partitioned topics offsets (#588)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Oct 16, 2023
1 parent e3b1567 commit 62bc303
Showing 1 changed file with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import ai.langstream.api.runtime.Topic;
import ai.langstream.pulsar.PulsarClientUtils;
import ai.langstream.pulsar.PulsarTopic;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
Expand Down Expand Up @@ -68,6 +69,8 @@

@Slf4j
public class PulsarTopicConnectionsRuntimeProvider implements TopicConnectionsRuntimeProvider {
private static final ObjectMapper mapper = new ObjectMapper();

@Override
public boolean supports(String streamingClusterType) {
return "pulsar".equals(streamingClusterType);
Expand Down Expand Up @@ -102,14 +105,6 @@ public TopicReader createReader(
Map<String, Object> configuration,
TopicOffsetPosition initialPosition) {
Map<String, Object> copy = new HashMap<>(configuration);
switch (initialPosition.position()) {
case Earliest -> copy.put(
"subscriptionInitialPosition", SubscriptionInitialPosition.Earliest);
case Latest -> copy.put(
"subscriptionInitialPosition", SubscriptionInitialPosition.Latest);
default -> throw new IllegalArgumentException(
"Unsupported initial position: " + initialPosition.position());
}
return new PulsarTopicReader(copy, initialPosition);
}

Expand Down Expand Up @@ -361,6 +356,8 @@ private class PulsarTopicReader implements TopicReader {
private final Map<String, Object> configuration;
private final MessageId startMessageId;

private Map<String, byte[]> topicMessageIds = new HashMap<>();

private Reader<GenericRecord> reader;

private PulsarTopicReader(
Expand All @@ -369,15 +366,17 @@ private PulsarTopicReader(
this.startMessageId =
switch (initialPosition.position()) {
case Earliest -> MessageId.earliest;
case Latest -> MessageId.latest;
case Absolute -> {
try {
yield MessageId.fromByteArray(initialPosition.offset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
case Latest, Absolute -> MessageId.latest;
};
if (initialPosition.position() == TopicOffsetPosition.Position.Absolute) {
try {
this.topicMessageIds =
mapper.readerForMapOf(byte[].class)
.readValue(initialPosition.offset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
Expand All @@ -389,6 +388,18 @@ public void start() throws Exception {
.startMessageId(this.startMessageId)
.loadConf(configuration)
.create();

reader.seek(
topicPartition -> {
try {
String topicName = TopicName.get(topicPartition).toString();
return MessageId.fromByteArray(
topicMessageIds.getOrDefault(
topicName, startMessageId.toByteArray()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

@Override
Expand All @@ -415,7 +426,9 @@ public TopicReadResult read() throws Exception {
final Object finalValue = value;
log.info("Received message: {}", receive);
records = List.of(new PulsarConsumerRecord(finalKey, finalValue, receive));
offset = receive.getMessageId().toByteArray();
topicMessageIds.put(
receive.getTopicName(), receive.getMessageId().toByteArray());
offset = mapper.writeValueAsBytes(topicMessageIds);
} else {
records = List.of();
offset = null;
Expand Down

0 comments on commit 62bc303

Please sign in to comment.