Skip to content

Commit

Permalink
Add possibility to set retention policies in Pulsar (#624)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Oct 19, 2023
1 parent 092046a commit d483fc5
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ protected StreamingCluster getStreamingCluster() {
Map.of("serviceUrl", pulsarContainer.getHttpServiceUrl()),
"service",
Map.of("serviceUrl", pulsarContainer.getBrokerUrl()),
"defaultTenant",
"default-tenant",
"public",
"defaultNamespace",
"default-namespace",
"default"));
}

Expand All @@ -60,8 +60,8 @@ public ApplicationStore store() {
serviceUrl: "%s"
service:
serviceUrl: "%s"
defaultTenant: "public"
defaultNamespace: "default"
default-tenant: "public"
default-namespace: "default"
computeCluster:
type: "none"
"""
Expand Down
20 changes: 0 additions & 20 deletions langstream-pulsar-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,10 @@
<artifactId>pulsar</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,37 @@
public class PulsarClientUtils {
private static final ObjectMapper MAPPER = new ObjectMapper();

public static PulsarAdmin buildPulsarAdmin(StreamingCluster streamingCluster) throws Exception {
final PulsarClusterRuntimeConfiguration pulsarClusterRuntimeConfiguration =
getPulsarClusterRuntimeConfiguration(streamingCluster);
Map<String, Object> adminConfig = pulsarClusterRuntimeConfiguration.getAdmin();
public static PulsarAdmin buildPulsarAdmin(
PulsarClusterRuntimeConfiguration pulsarClusterRuntimeConfiguration) throws Exception {
Map<String, Object> adminConfig = pulsarClusterRuntimeConfiguration.admin();
if (adminConfig == null) {
adminConfig = new HashMap<>();
} else {
adminConfig = new HashMap<>(adminConfig);
}
if (pulsarClusterRuntimeConfiguration.getAuthentication() != null) {
adminConfig.putAll(pulsarClusterRuntimeConfiguration.getAuthentication());
if (pulsarClusterRuntimeConfiguration.authentication() != null) {
adminConfig.putAll(pulsarClusterRuntimeConfiguration.authentication());
}
adminConfig.putIfAbsent("serviceUrl", "http://localhost:8080");
return PulsarAdmin.builder().loadConf(adminConfig).build();
}

public static PulsarAdmin buildPulsarAdmin(StreamingCluster streamingCluster) throws Exception {
return buildPulsarAdmin(getPulsarClusterRuntimeConfiguration(streamingCluster));
}

public static PulsarClient buildPulsarClient(StreamingCluster streamingCluster)
throws Exception {
final PulsarClusterRuntimeConfiguration pulsarClusterRuntimeConfiguration =
getPulsarClusterRuntimeConfiguration(streamingCluster);
Map<String, Object> clientConfig = pulsarClusterRuntimeConfiguration.getService();
Map<String, Object> clientConfig = pulsarClusterRuntimeConfiguration.service();
if (clientConfig == null) {
clientConfig = new HashMap<>();
} else {
clientConfig = new HashMap<>(clientConfig);
}
if (pulsarClusterRuntimeConfiguration.getAuthentication() != null) {
clientConfig.putAll(pulsarClusterRuntimeConfiguration.getAuthentication());
if (pulsarClusterRuntimeConfiguration.authentication() != null) {
clientConfig.putAll(pulsarClusterRuntimeConfiguration.authentication());
}
clientConfig.putIfAbsent("serviceUrl", "pulsar://localhost:6650");
return PulsarClient.builder().loadConf(clientConfig).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ai.langstream.pulsar.runner;

import static ai.langstream.pulsar.PulsarClientUtils.buildPulsarAdmin;
import static ai.langstream.pulsar.PulsarClientUtils.getPulsarClusterRuntimeConfiguration;
import static java.util.Map.entry;

import ai.langstream.api.model.Application;
Expand All @@ -35,6 +36,7 @@
import ai.langstream.api.runtime.ExecutionPlan;
import ai.langstream.api.runtime.Topic;
import ai.langstream.pulsar.PulsarClientUtils;
import ai.langstream.pulsar.PulsarClusterRuntimeConfiguration;
import ai.langstream.pulsar.PulsarTopic;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
Expand Down Expand Up @@ -71,6 +73,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -165,14 +168,46 @@ public TopicAdmin createTopicAdmin(
@SneakyThrows
public void deploy(ExecutionPlan applicationInstance) {
Application logicalInstance = applicationInstance.getApplication();
try (PulsarAdmin admin =
buildPulsarAdmin(logicalInstance.getInstance().streamingCluster())) {
PulsarClusterRuntimeConfiguration configuration =
getPulsarClusterRuntimeConfiguration(
logicalInstance.getInstance().streamingCluster());
try (PulsarAdmin admin = buildPulsarAdmin(configuration)) {
applyRetentionPolicies(
admin,
"%s/%s"
.formatted(
configuration.defaultTenant(),
configuration.defaultNamespace()),
configuration.defaultRetentionPolicies());
for (Topic topic : applicationInstance.getLogicalTopics()) {
deployTopic(admin, (PulsarTopic) topic);
}
}
}

private static void applyRetentionPolicies(
PulsarAdmin admin,
String namespace,
PulsarClusterRuntimeConfiguration.RetentionPolicies policies)
throws PulsarAdminException {
if (policies == null) {
return;
}
if (policies.retentionSizeInMB() == null && policies.retentionTimeInMinutes() == null) {
return;
}
admin.namespaces()
.setRetention(
namespace,
new RetentionPolicies(
policies.retentionTimeInMinutes() == null
? -1
: policies.retentionTimeInMinutes(),
policies.retentionSizeInMB() == null
? -1
: policies.retentionSizeInMB()));
}

private static void deployTopic(PulsarAdmin admin, PulsarTopic topic)
throws PulsarAdminException {
String createMode = topic.createMode();
Expand Down
20 changes: 0 additions & 20 deletions langstream-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@
*/
package ai.langstream.pulsar;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PulsarClusterRuntimeConfiguration {
public record PulsarClusterRuntimeConfiguration(
Map<String, Object> admin,
Map<String, Object> service,
Map<String, Object> authentication,
@JsonProperty("default-tenant") String defaultTenant,
@JsonProperty("default-namespace") String defaultNamespace,
@JsonProperty("default-retention-policies") RetentionPolicies defaultRetentionPolicies) {
public record RetentionPolicies(
@JsonProperty("retention-time-in-minutes") Integer retentionTimeInMinutes,
@JsonProperty("retention-size-in-mb") Long retentionSizeInMB) {}

private Map<String, Object> admin;
private Map<String, Object> service;

private Map<String, Object> authentication;
private String defaultTenant = "public";
private String defaultNamespace = "default";
public PulsarClusterRuntimeConfiguration {
if (defaultTenant == null) {
defaultTenant = "public";
}
if (defaultNamespace == null) {
defaultNamespace = "public";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public Topic createTopicImplementation(
SchemaDefinition keySchema = topicDefinition.getKeySchema();
SchemaDefinition valueSchema = topicDefinition.getValueSchema();
String name = topicDefinition.getName();
String tenant = config.getDefaultTenant();
String tenant = config.defaultTenant();
String creationMode = topicDefinition.getCreationMode();
String namespace = config.getDefaultNamespace();
String namespace = config.defaultNamespace();
PulsarName topicName = new PulsarName(tenant, namespace, name);
return new PulsarTopic(
topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ private String buildInstanceYaml() {
serviceUrl: "%s"
service:
serviceUrl: "%s"
defaultTenant: "public"
defaultNamespace: "default"
default-tenant: "public"
default-namespace: "default"
computeCluster:
type: "kubernetes"
"""
Expand Down

0 comments on commit d483fc5

Please sign in to comment.