Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

function to read compacted topics #7193

Merged
merged 6 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;

Expand All @@ -54,6 +55,8 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -73,6 +76,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -90,6 +94,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
Expand Down Expand Up @@ -501,6 +506,135 @@ public void testE2EPulsarFunctionWithUrl() throws Exception {
testE2EPulsarFunction(jarFilePathUrl);
}

/**
 * Verifies that a function whose input spec carries the consumer property
 * {@code readCompacted=true} only receives the latest value of each key once the
 * source topic has been compacted.
 *
 * <p>The test publishes {@code messageNum} keyed messages over {@code maxKeys} distinct keys,
 * compacts the source topic, deploys the function and then drains the sink topic. Exactly one
 * message per key is expected, each carrying the last value written for that key with
 * {@code "!"} appended.
 *
 * @throws Exception if any admin, client or compaction call fails
 */
@Test(timeOut = 30000)
public void testReadCompactedFunction() throws Exception {
    final String namespacePortion = "io";
    final String replNamespace = tenant + "/" + namespacePortion;
    final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
    final String sinkTopic = "persistent://" + replNamespace + "/output";
    final String functionName = "PulsarFunction-test";
    final String subscriptionName = "test-sub";
    admin.namespaces().createNamespace(replNamespace);
    Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
    admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
    final int messageNum = 20;
    final int maxKeys = 10;

    // Declared up front so the finally block can release whatever was actually created,
    // even when an assertion or a client call fails half-way through the test.
    ScheduledExecutorService compactionScheduler = null;
    Consumer<String> consumer = null;

    // 1 Setup producer
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(sourceTopic)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    try {
        // Create the subscription (with readCompacted enabled) before any message is published.
        pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();

        // 2 Send messages and record the expected values after compaction
        Map<String, String> expected = new HashMap<>();
        for (int j = 0; j < messageNum; j++) {
            String key = "key" + j % maxKeys;
            String value = "my-message-" + key + j;
            producer.newMessage().key(key).value(value).send();
            // Keys repeat; only the most recent value of each key is expected to be retained.
            expected.put(key, value);
        }

        // 3 Trigger compaction
        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
                pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler);
        twoPhaseCompactor.compact(sourceTopic).get();

        // 4 Setup function
        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                sourceTopic, sinkTopic, subscriptionName);
        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
        ConsumerConfig consumerConfig = new ConsumerConfig();
        Map<String, String> consumerProperties = new HashMap<>();
        consumerProperties.put("readCompacted", "true");
        consumerConfig.setConsumerProperties(consumerProperties);
        inputSpecs.put(sourceTopic, consumerConfig);
        functionConfig.setInputSpecs(inputSpecs);
        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

        // 5 Function should only read compacted values, so only compacted messages are received
        consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sink-sub").subscribe();
        int count = 0;
        while (true) {
            // A null result means nothing arrived within the timeout: the sink topic is drained.
            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            consumer.acknowledge(message);
            count++;
            // TestNG argument order is (actual, expected).
            Assert.assertEquals(message.getValue(), expected.remove(message.getKey()) + "!");
        }
        Assert.assertEquals(count, maxKeys);
        Assert.assertTrue(expected.isEmpty());
    } finally {
        if (compactionScheduler != null) {
            compactionScheduler.shutdownNow();
        }
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            producer.close();
        }
    }
}

/**
 * Verifies that a sink whose input spec carries the consumer property
 * {@code readCompacted=true} only receives the latest value of each key once the
 * source topic has been compacted.
 *
 * <p>The test publishes {@code messageNum} keyed messages over {@code maxKeys} distinct keys,
 * compacts the source topic, deploys the sink and then polls the Prometheus metric
 * {@code pulsar_sink_received_total} until it equals {@code maxKeys}. The final metric value
 * is asserted explicitly, so the test fails if the expected count is never reached.
 *
 * @throws Exception if any admin, client, compaction or metrics call fails
 */
@Test(timeOut = 30000)
public void testReadCompactedSink() throws Exception {
    final String namespacePortion = "io";
    final String replNamespace = tenant + "/" + namespacePortion;
    final String sourceTopic = "persistent://" + replNamespace + "/my-topic2";
    final String sinkName = "PulsarFunction-test";
    final String subscriptionName = "test-sub";
    admin.namespaces().createNamespace(replNamespace);
    Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
    admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
    final int messageNum = 20;
    final int maxKeys = 10;

    // Declared up front so the finally block can shut it down even if the test fails early.
    ScheduledExecutorService compactionScheduler = null;

    // 1 Setup producer
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(sourceTopic)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    try {
        // Create the subscription (with readCompacted enabled) before any message is published.
        pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();

        // 2 Send messages and record the expected values after compaction
        Map<String, String> expected = new HashMap<>();
        for (int j = 0; j < messageNum; j++) {
            String key = "key" + j % maxKeys;
            String value = "my-message-" + key + j;
            producer.newMessage().key(key).value(value).send();
            // Keys repeat; only the most recent value of each key is expected to be retained.
            expected.put(key, value);
        }

        // 3 Trigger compaction
        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
                pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler);
        twoPhaseCompactor.compact(sourceTopic).get();

        // 4 Setup sink
        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        Map<String, String> consumerProperties = new HashMap<>();
        consumerProperties.put("readCompacted", "true");
        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().consumerProperties(consumerProperties).build()));
        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
        admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);

        // 5 Sink should only read compacted values, so only compacted messages are received
        retryStrategically((test) -> {
            try {
                String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
                Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
                Metric m = metrics.get("pulsar_sink_received_total");
                return m.value == (double) maxKeys;
            } catch (Exception e) {
                // Metrics may not be published yet; report "not ready" and let the retry loop go on.
                return false;
            }
        }, 50, 1000);

        // The polling above only waits; it does not fail the test. Assert the final state
        // explicitly so a sink that never receives the compacted messages is detected.
        Map<String, Metric> finalMetrics = parseMetrics(getPrometheusMetrics(pulsar.getListenPortHTTP().get()));
        Metric received = finalMetrics.get("pulsar_sink_received_total");
        Assert.assertNotNull(received, "metric pulsar_sink_received_total was not reported");
        Assert.assertEquals(received.value, (double) maxKeys, 0.0);
    } finally {
        if (compactionScheduler != null) {
            compactionScheduler.shutdownNow();
        }
        producer.close();
    }
}

@Test(timeOut = 30000)
private void testPulsarSinkDLQ() throws Exception {
final String namespacePortion = "io";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
Expand Down Expand Up @@ -235,6 +236,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String customSchemaInputString;
@Parameter(names = "--custom-schema-outputs", description = "The map of input topics to Schema properties (as a JSON string)")
protected String customSchemaOutputString;
@Parameter(names = "--input-specs", description = "The map of inputs to custom configuration (as a JSON string)")
315157973 marked this conversation as resolved.
Show resolved Hide resolved
protected String inputSpecs;
// for backwards compatibility purposes
@Parameter(names = "--outputSerdeClassName", description = "The SerDe class to be used for messages output by the function", hidden = true)
protected String DEPRECATED_outputSerdeClassName;
Expand Down Expand Up @@ -375,6 +378,10 @@ void processArguments() throws Exception {
Map<String, String> customSchemaOutputMap = new Gson().fromJson(customSchemaOutputString, type);
functionConfig.setCustomSchemaOutputs(customSchemaOutputMap);
}
if (null != inputSpecs) {
Type type = new TypeToken<Map<String, ConsumerConfig>>() {}.getType();
functionConfig.setInputSpecs(new Gson().fromJson(inputSpecs, type));
}
if (null != topicsPattern) {
functionConfig.setTopicsPattern(topicsPattern);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
Expand Down Expand Up @@ -276,6 +277,9 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
protected String customSchemaInputString;

@Parameter(names = "--input-specs", description = "The map of inputs to custom configuration (as a JSON string)")
315157973 marked this conversation as resolved.
Show resolved Hide resolved
protected String inputSpecs;

@Parameter(names = "--max-redeliver-count", description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue")
protected Integer maxMessageRetries;
@Parameter(names = "--dead-letter-topic", description = "Name of the dead topic where the failing messages will be sent.")
Expand Down Expand Up @@ -386,6 +390,11 @@ void processArguments() throws Exception {
sinkConfig.setTopicToSchemaType(customSchemaInputMap);
}

if(null != inputSpecs){
Type type = new TypeToken<Map<String, ConsumerConfig>>(){}.getType();
sinkConfig.setInputSpecs(new Gson().fromJson(inputSpecs, type));
}

sinkConfig.setMaxMessageRetries(maxMessageRetries);
if (null != deadLetterTopic) {
sinkConfig.setDeadLetterTopic(deadLetterTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ConsumerConfig {
private boolean isRegexPattern;
@Builder.Default
private Map<String, String> schemaProperties = new HashMap<>();
@Builder.Default
private Map<String, String> consumerProperties = new HashMap<>();
private Integer receiverQueueSize;

public ConsumerConfig(String schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
consumerConfig.setSerdeClassName(conf.getSerdeClassName());
}
consumerConfig.setSchemaProperties(conf.getSchemaPropertiesMap());
consumerConfig.setConsumerProperties(conf.getConsumerPropertiesMap());
if (conf.hasReceiverQueueSize()) {
consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(pulsarSourceConfig.getSubscriptionName())
.subscriptionInitialPosition(pulsarSourceConfig.getSubscriptionPosition())
.subscriptionType(pulsarSourceConfig.getSubscriptionType())
.messageListener(this);
.subscriptionType(pulsarSourceConfig.getSubscriptionType());

if (conf.getConsumerProperties() != null && !conf.getConsumerProperties().isEmpty()) {
cb.loadConf(new HashMap<>(conf.getConsumerProperties()));
}
// messageListener is annotated with @JsonIgnore, so it must be set after loadConf is called
cb.messageListener(this);

if (conf.isRegexPattern) {
cb = cb.topicsPattern(topic);
Expand All @@ -94,7 +99,6 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
if (pulsarSourceConfig.getTimeoutMs() != null) {
cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
}

if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) {
DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
Expand Down Expand Up @@ -169,7 +173,11 @@ Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundExcept
schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf, true);
}
configs.put(topic,
ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).receiverQueueSize(conf.getReceiverQueueSize()).build());
ConsumerConfig.<T> builder().
schema(schema).
isRegexPattern(conf.isRegexPattern()).
receiverQueueSize(conf.getReceiverQueueSize()).
consumerProperties(conf.getConsumerProperties()).build());
});

return configs;
Expand All @@ -189,6 +197,7 @@ private static class ConsumerConfig<T> {
private Schema<T> schema;
private boolean isRegexPattern;
private Integer receiverQueueSize;
private Map<String, String> consumerProperties;
}

}
3 changes: 3 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message ConsumerSpec {
}
ReceiverQueueSize receiverQueueSize = 4;
map<string, string> schemaProperties = 5;
map<string, string> consumerProperties = 6;
}

message SourceSpec {
Expand Down Expand Up @@ -149,6 +150,8 @@ message SinkSpec {
bool forwardSourceMessageProperty = 8;

map<string, string> schemaProperties = 9;

map<string, string> consumerProperties = 10;
}

message PackageLocationMetaData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
Function.ConsumerSpec.newBuilder()
.setSchemaType(consumerConfig.getSchemaType())
.putAllSchemaProperties(consumerConfig.getSchemaProperties())
.putAllConsumerProperties(consumerConfig.getConsumerProperties())
.setIsRegexPattern(false)
.build());
} catch (JsonProcessingException e) {
Expand All @@ -132,6 +133,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
if (consumerConf.getSchemaProperties() != null) {
bldr.putAllSchemaProperties(consumerConf.getSchemaProperties());
}
bldr.putAllConsumerProperties(consumerConf.getConsumerProperties());
sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
});
}
Expand Down Expand Up @@ -180,6 +182,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
if (StringUtils.isNotEmpty(conf)) {
ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
sinkSpecBuilder.putAllSchemaProperties(consumerConfig.getSchemaProperties());
sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(String.format("Incorrect custom schema outputs ,Topic %s ", functionConfig.getOutput()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
.setValue(spec.getReceiverQueueSize()).build());
}
bldr.putAllConsumerProperties(spec.getConsumerProperties());
sourceSpecBuilder.putInputSpecs(topic, bldr.build());
});
}
Expand Down Expand Up @@ -259,6 +260,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
consumerConfig.setReceiverQueueSize(input.getValue().getReceiverQueueSize().getValue());
}
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
consumerConfig.setConsumerProperties(input.getValue().getConsumerPropertiesMap());
consumerConfigMap.put(input.getKey(), consumerConfig);
}
sinkConfig.setInputSpecs(consumerConfigMap);
Expand Down