Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gateway start from partitioned topics offsets #588

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import ai.langstream.api.runtime.Topic;
import ai.langstream.pulsar.PulsarClientUtils;
import ai.langstream.pulsar.PulsarTopic;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
Expand Down Expand Up @@ -68,6 +69,8 @@

@Slf4j
public class PulsarTopicConnectionsRuntimeProvider implements TopicConnectionsRuntimeProvider {
private static final ObjectMapper mapper = new ObjectMapper();

@Override
public boolean supports(String streamingClusterType) {
return "pulsar".equals(streamingClusterType);
Expand Down Expand Up @@ -102,14 +105,6 @@ public TopicReader createReader(
Map<String, Object> configuration,
TopicOffsetPosition initialPosition) {
Map<String, Object> copy = new HashMap<>(configuration);
switch (initialPosition.position()) {
case Earliest -> copy.put(
"subscriptionInitialPosition", SubscriptionInitialPosition.Earliest);
case Latest -> copy.put(
"subscriptionInitialPosition", SubscriptionInitialPosition.Latest);
default -> throw new IllegalArgumentException(
"Unsupported initial position: " + initialPosition.position());
}
return new PulsarTopicReader(copy, initialPosition);
}

Expand Down Expand Up @@ -361,6 +356,8 @@ private class PulsarTopicReader implements TopicReader {
private final Map<String, Object> configuration;
private final MessageId startMessageId;

private Map<String, byte[]> topicMessageIds = new HashMap<>();

private Reader<GenericRecord> reader;

private PulsarTopicReader(
Expand All @@ -369,15 +366,17 @@ private PulsarTopicReader(
this.startMessageId =
switch (initialPosition.position()) {
case Earliest -> MessageId.earliest;
case Latest -> MessageId.latest;
case Absolute -> {
try {
yield MessageId.fromByteArray(initialPosition.offset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
case Latest, Absolute -> MessageId.latest;
};
if (initialPosition.position() == TopicOffsetPosition.Position.Absolute) {
try {
this.topicMessageIds =
mapper.readerForMapOf(byte[].class)
.readValue(initialPosition.offset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
Expand All @@ -389,6 +388,18 @@ public void start() throws Exception {
.startMessageId(this.startMessageId)
.loadConf(configuration)
.create();

reader.seek(
topicPartition -> {
try {
String topicName = TopicName.get(topicPartition).toString();
return MessageId.fromByteArray(
topicMessageIds.getOrDefault(
topicName, startMessageId.toByteArray()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

@Override
Expand All @@ -415,7 +426,9 @@ public TopicReadResult read() throws Exception {
final Object finalValue = value;
log.info("Received message: {}", receive);
records = List.of(new PulsarConsumerRecord(finalKey, finalValue, receive));
offset = receive.getMessageId().toByteArray();
topicMessageIds.put(
receive.getTopicName(), receive.getMessageId().toByteArray());
offset = mapper.writeValueAsBytes(topicMessageIds);
} else {
records = List.of();
offset = null;
Expand Down
Loading