Skip to content

Commit

Permalink
Forward user-properties to sink (#2057)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Jul 1, 2018
1 parent f14b9e9 commit 7cebe23
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 21 deletions.
Expand Up @@ -32,10 +32,6 @@
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.test.PortManager;
Expand All @@ -49,9 +45,12 @@
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.SubscriptionStats;
Expand Down Expand Up @@ -80,7 +79,6 @@
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.gson.Gson; import com.google.gson.Gson;


import io.netty.util.concurrent.DefaultThreadFactory;
import jersey.repackaged.com.google.common.collect.Lists; import jersey.repackaged.com.google.common.collect.Lists;


/** /**
Expand Down Expand Up @@ -240,22 +238,26 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
@Test(timeOut = 20000) @Test(timeOut = 20000)
public void testE2EPulsarSink() throws Exception { public void testE2EPulsarSink() throws Exception {


final String namespacePortion = "myReplNs"; final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion; final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String propertyKey = "key";
final String propertyValue = "value";
admin.namespaces().createNamespace(replNamespace); admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);


// create a producer that creates a topic at broker // create a producer that creates a topic at broker
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(sourceTopic); Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();
Producer<byte[]> producer = producerBuilder.create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(sinkTopic).subscriptionName("sub").subscribe();


String jarFilePathUrl = Utils.FILE + ":" String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test"); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test",
sinkTopic);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);

// try to update function to test: update-function functionality // try to update function to test: update-function functionality
admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);


Expand All @@ -271,8 +273,8 @@ public void testE2EPulsarSink() throws Exception {


int totalMsgs = 5; int totalMsgs = 5;
for (int i = 0; i < totalMsgs; i++) { for (int i = 0; i < totalMsgs; i++) {
String message = "my-message-" + i; String data = "my-message-" + i;
producer.send(message.getBytes()); producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send();
} }
retryStrategically((test) -> { retryStrategically((test) -> {
try { try {
Expand All @@ -283,14 +285,20 @@ public void testE2EPulsarSink() throws Exception {
return false; return false;
} }
}, 5, 150); }, 5, 150);

Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedPropertyValue = msg.getProperty(propertyKey);
Assert.assertEquals(propertyValue, receivedPropertyValue);

// validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
// due to publish failure // due to publish failure
Assert.assertNotEquals( Assert.assertNotEquals(
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs); admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
totalMsgs);


} }


protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName) { protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName, String sinkTopic) {


File file = new File(jarFile); File file = new File(jarFile);
try { try {
Expand Down Expand Up @@ -322,7 +330,7 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String
// set up sink spec // set up sink spec
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
// sinkSpecBuilder.setClassName(PulsarSink.class.getName()); // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", tenant, namespace, "output")); sinkSpecBuilder.setTopic(sinkTopic);
Map<String, Object> sinkConfigMap = Maps.newHashMap(); Map<String, Object> sinkConfigMap = Maps.newHashMap();
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap)); sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
sinkSpecBuilder.setTypeClassName(typeArg.getName()); sinkSpecBuilder.setTypeClassName(typeArg.getName());
Expand Down
Expand Up @@ -184,8 +184,8 @@ public void run() {


if (currentRecord instanceof PulsarRecord) { if (currentRecord instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) currentRecord; PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
messageId = pulsarRecord.getMessageId(); messageId = pulsarRecord.getMessageId();
topicName = pulsarRecord.getTopicName(); topicName = pulsarRecord.getTopicName();
} }
result = javaInstance.handleMessage(messageId, topicName, currentRecord.getValue()); result = javaInstance.handleMessage(messageId, topicName, currentRecord.getValue());


Expand Down
Expand Up @@ -221,10 +221,13 @@ public void write(RecordContext recordContext, T value) throws Exception {
msgBuilder.setContent(output); msgBuilder.setContent(output);
if (recordContext instanceof PulsarRecord) { if (recordContext instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) recordContext; PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
msgBuilder // forward user properties to sink-topic
.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()) if (pulsarRecord.getProperties() != null) {
.setProperty("__pfn_input_msg_id__", new String( msgBuilder.setProperties(pulsarRecord.getProperties());
Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray()))); }
msgBuilder.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()).setProperty(
"__pfn_input_msg_id__",
new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
} }


this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext); this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
Expand Down
Expand Up @@ -23,6 +23,9 @@
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.ToString; import lombok.ToString;

import java.util.Map;

import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.io.core.Record; import org.apache.pulsar.io.core.Record;


Expand All @@ -38,6 +41,7 @@ public class PulsarRecord<T> implements Record<T> {
private T value; private T value;
private MessageId messageId; private MessageId messageId;
private String topicName; private String topicName;
private Map<String, String> properties;
private Runnable failFunction; private Runnable failFunction;
private Runnable ackFunction; private Runnable ackFunction;


Expand Down
Expand Up @@ -132,6 +132,7 @@ public Record<T> read() throws Exception {
.partitionId(String.format("%s-%s", topicName, partitionId)) .partitionId(String.format("%s-%s", topicName, partitionId))
.recordSequence(Utils.getSequenceId(message.getMessageId())) .recordSequence(Utils.getSequenceId(message.getMessageId()))
.topicName(topicName) .topicName(topicName)
.properties(message.getProperties())
.ackFunction(() -> { .ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message); inputConsumer.acknowledgeCumulativeAsync(message);
Expand Down

0 comments on commit 7cebe23

Please sign in to comment.