Skip to content

Commit

Permalink
Functions schema integration (#1845)
Browse files Browse the repository at this point in the history
* wip

* Removed internal shading

* wip

* Fixed handling of IllegalArgumentException in ZK client wrapper

* wip

* Extend SerDe<T> with Schema<T> and implement PulsarSink

* Add Typed Consumers to functions

* Munge functions and schemas together

* Remove consumer/producer changes

* formatting

* Review changes

* Remove context cache and add setConsumer to context

* Addressed Jerry's comments

* Use pulsar-client-original and fix the tests
  • Loading branch information
mgodave authored and jerrypeng committed Jun 20, 2018
1 parent da48f42 commit 97b56cf
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 110 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;


import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
Expand Down Expand Up @@ -117,8 +118,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.namespaceName = conf.getTopicNames().stream().findFirst() this.namespaceName = conf.getTopicNames().stream().findFirst()
.flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get(); .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();


List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t)) List<CompletableFuture<Void>> futures =
conf.getTopicNames().stream()
.map(this::subscribeAsync)
.collect(Collectors.toList()); .collect(Collectors.toList());

FutureUtil.waitForAll(futures) FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> { .thenAccept(finalFuture -> {
try { try {
Expand All @@ -127,7 +131,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
} }
setState(State.Ready); setState(State.Ready);
// We have successfully created N consumers, so we can start receiving messages now // We have successfully created N consumers, so we can start receiving messages now
startReceivingMessages(consumers.values().stream().collect(Collectors.toList())); startReceivingMessages(new ArrayList<>(consumers.values()));
subscribeFuture().complete(MultiTopicsConsumerImpl.this); subscribeFuture().complete(MultiTopicsConsumerImpl.this);
log.info("[{}] [{}] Created topics consumer with {} sub-consumers", log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
topic, subscription, allTopicPartitionsNumber.get()); topic, subscription, allTopicPartitionsNumber.get());
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/api-java/pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>


<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency> <dependency>
<groupId>net.jodah</groupId> <groupId>net.jodah</groupId>
<artifactId>typetools</artifactId> <artifactId>typetools</artifactId>
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,10 +18,36 @@
*/ */
package org.apache.pulsar.functions.api; package org.apache.pulsar.functions.api;


import java.util.Collections;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

/** /**
* An interface for serializer/deserializer. * An interface for serializer/deserializer.
*/ */
public interface SerDe<T> { public interface SerDe<T> extends Schema<T> {
T deserialize(byte[] input); T deserialize(byte[] input);

byte[] serialize(T input); byte[] serialize(T input);

/**
 * Supplies placeholder schema metadata for a SerDe.
 *
 * <p>The returned {@link SchemaInfo} has an empty name, type
 * {@link SchemaType#NONE}, a zero-length schema definition and an empty
 * properties map. A fresh instance is built on every call.
 *
 * @return a newly created placeholder {@code SchemaInfo}
 */
@Override
default SchemaInfo getSchemaInfo() {
    final SchemaInfo placeholder = new SchemaInfo();
    placeholder.setName("");
    placeholder.setType(SchemaType.NONE);
    placeholder.setSchema(new byte[0]);
    placeholder.setProperties(Collections.emptyMap());
    return placeholder;
}

/**
 * Encodes a message by delegating to {@link #serialize(Object)}.
 *
 * @param message the value to encode
 * @return whatever {@code serialize(message)} returns
 */
@Override
default byte[] encode(T message) {
return serialize(message);
}

/**
 * Decodes a payload by delegating to {@link #deserialize(byte[])}.
 *
 * @param bytes the raw bytes to decode
 * @return whatever {@code deserialize(bytes)} returns
 */
@Override
default T decode(byte[] bytes) {
return deserialize(bytes);
}
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
} }
} }


/**
 * Convenience constructor that forwards to the five-argument constructor,
 * passing {@code null} as the last argument.
 *
 * <p>NOTE(review): the fifth argument is presumably the input consumer,
 * to be supplied later through {@code setInputConsumer} - confirm against
 * the five-argument constructor.
 */
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
ClassLoader classLoader) {
this(config, logger, client, classLoader, null);
}

/**
 * Replaces the consumer stored in this context's {@code inputConsumer} field.
 *
 * @param inputConsumer the consumer to store; no validation is performed here
 */
public void setInputConsumer(Consumer inputConsumer) {
this.inputConsumer = inputConsumer;
}

public void setCurrentMessageContext(MessageId messageId, String topicName) { public void setCurrentMessageContext(MessageId messageId, String topicName) {
this.messageId = messageId; this.messageId = messageId;
this.currentTopicName = topicName; this.currentTopicName = topicName;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
*/ */
package org.apache.pulsar.functions.instance; package org.apache.pulsar.functions.instance;


import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.swing.text.html.Option;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.io.core.Source;

import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -38,11 +41,12 @@
*/ */
@Slf4j @Slf4j
public class JavaInstance implements AutoCloseable { public class JavaInstance implements AutoCloseable {
private ContextImpl context;


@Getter(AccessLevel.PACKAGE) @Getter(AccessLevel.PACKAGE)
private final ContextImpl context;
private Function function; private Function function;
private java.util.function.Function javaUtilFunction; private java.util.function.Function javaUtilFunction;
private Optional<PulsarSource> optionalPulsarSource = Optional.empty();


public JavaInstance(InstanceConfig config, Object userClassObject, public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader, ClassLoader clsLoader,
Expand All @@ -52,8 +56,8 @@ public JavaInstance(InstanceConfig config, Object userClassObject,
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName()); Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName());


if (source instanceof PulsarSource) { if (source instanceof PulsarSource) {
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader);
((PulsarSource) source).getInputConsumer()); optionalPulsarSource = Optional.of((PulsarSource) source);
} else { } else {
this.context = null; this.context = null;
} }
Expand All @@ -64,13 +68,17 @@ public JavaInstance(InstanceConfig config, Object userClassObject,
} else { } else {
this.javaUtilFunction = (java.util.function.Function) userClassObject; this.javaUtilFunction = (java.util.function.Function) userClassObject;
} }

} }


public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) { public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) {
if (context != null) { optionalPulsarSource.ifPresent((pulsarSource) -> {
context.setCurrentMessageContext(messageId, topicName); this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName));
} this.context.setCurrentMessageContext(messageId, topicName);
});

JavaExecutionResult executionResult = new JavaExecutionResult(); JavaExecutionResult executionResult = new JavaExecutionResult();

try { try {
Object output; Object output;
if (function != null) { if (function != null) {
Expand All @@ -85,11 +93,15 @@ public JavaExecutionResult handleMessage(MessageId messageId, String topicName,
return executionResult; return executionResult;
} }


/**
 * Returns the context held by this instance.
 *
 * @return the {@code context} field; may be {@code null}, since the
 *         constructor assigns {@code null} when the source is not a
 *         {@code PulsarSource}
 */
public ContextImpl getContext() {
return this.context;
}

@Override @Override
public void close() { public void close() {
} }


public InstanceCommunication.MetricsData getAndResetMetrics() { public MetricsData getAndResetMetrics() {
return context.getAndResetMetrics(); return context.getAndResetMetrics();
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@
import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.instance.FunctionResultRouter; import org.apache.pulsar.functions.instance.FunctionResultRouter;


public abstract class AbstractOneOuputTopicProducers implements Producers { public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> {


protected final PulsarClient client; protected final PulsarClient client;
protected final String outputTopic; protected final String outputTopic;
protected final Schema<T> schema;


AbstractOneOuputTopicProducers(PulsarClient client, AbstractOneOuputTopicProducers(PulsarClient client,
String outputTopic) String outputTopic,
Schema<T> schema)
throws PulsarClientException { throws PulsarClientException {
this.client = client; this.client = client;
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
this.schema = schema;
} }


static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) { static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, Schema<U> schema) {
// use function result router to deal with different processing guarantees. // use function result router to deal with different processing guarantees.
return client.newProducer() // return client.newProducer(schema) //
.blockIfQueueFull(true) // .blockIfQueueFull(true) //
.enableBatching(true) // .enableBatching(true) //
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
Expand All @@ -53,23 +57,23 @@ static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
.messageRouter(FunctionResultRouter.of()); .messageRouter(FunctionResultRouter.of());
} }


protected Producer<byte[]> createProducer(String topic) protected Producer<T> createProducer(String topic, Schema<T> schema)
throws PulsarClientException { throws PulsarClientException {
return createProducer(client, topic); return createProducer(client, topic, schema);
} }


public static Producer<byte[]> createProducer(PulsarClient client, String topic) public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema)
throws PulsarClientException { throws PulsarClientException {
return newProducerBuilder(client).topic(topic).create(); return newProducerBuilder(client, schema).topic(topic).create();
} }


protected Producer<byte[]> createProducer(String topic, String producerName) protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema)
throws PulsarClientException { throws PulsarClientException {
return createProducer(client, topic, producerName); return createProducer(client, topic, producerName, schema);
} }


public static Producer<byte[]> createProducer(PulsarClient client, String topic, String producerName) public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
throws PulsarClientException { throws PulsarClientException {
return newProducerBuilder(client).topic(topic).producerName(producerName).create(); return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create();
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.functions.instance.producers; package org.apache.pulsar.functions.instance.producers;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand All @@ -31,19 +30,21 @@
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;


@Slf4j @Slf4j
public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers { public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> {


@Getter(AccessLevel.PACKAGE) @Getter(AccessLevel.PACKAGE)
// PartitionId -> producer // PartitionId -> producer
private final Map<String, Producer<byte[]>> producers; private final Map<String, Producer<T>> producers;




public MultiConsumersOneOuputTopicProducers(PulsarClient client, public MultiConsumersOneOuputTopicProducers(PulsarClient client,
String outputTopic) String outputTopic,
Schema<T> schema)
throws PulsarClientException { throws PulsarClientException {
super(client, outputTopic); super(client, outputTopic, schema);
this.producers = new ConcurrentHashMap<>(); this.producers = new ConcurrentHashMap<>();
} }


Expand All @@ -57,18 +58,18 @@ static String makeProducerName(String srcTopicName, String srcTopicPartition) {
} }


@Override @Override
public synchronized Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException { public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException {
Producer<byte[]> producer = producers.get(srcPartitionId); Producer<T> producer = producers.get(srcPartitionId);
if (null == producer) { if (null == producer) {
producer = createProducer(outputTopic, srcPartitionId); producer = createProducer(outputTopic, srcPartitionId, schema);
producers.put(srcPartitionId, producer); producers.put(srcPartitionId, producer);
} }
return producer; return producer;
} }


@Override @Override
public synchronized void closeProducer(String srcPartitionId) { public synchronized void closeProducer(String srcPartitionId) {
Producer<byte[]> producer = producers.get(srcPartitionId); Producer<T> producer = producers.get(srcPartitionId);
if (null != producer) { if (null != producer) {
producer.closeAsync(); producer.closeAsync();
producers.remove(srcPartitionId); producers.remove(srcPartitionId);
Expand All @@ -78,7 +79,7 @@ public synchronized void closeProducer(String srcPartitionId) {
@Override @Override
public synchronized void close() { public synchronized void close() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size()); List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
for (Producer<byte[]> producer: producers.values()) { for (Producer<T> producer: producers.values()) {
closeFutures.add(producer.closeAsync()); closeFutures.add(producer.closeAsync());
} }
try { try {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/** /**
* An interface for managing publishers within a java instance. * An interface for managing publishers within a java instance.
*/ */
public interface Producers extends AutoCloseable { public interface Producers<T> extends AutoCloseable {


/** /**
* Initialize all the producers. * Initialize all the producers.
Expand All @@ -40,7 +40,7 @@ public interface Producers extends AutoCloseable {
* src partition Id * src partition Id
* @return the producer instance to produce messages * @return the producer instance to produce messages
*/ */
Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException; Producer<T> getProducer(String srcPartitionId) throws PulsarClientException;


/** /**
* Close a producer specified by <tt>srcPartitionId</tt>. * Close a producer specified by <tt>srcPartitionId</tt>.
Expand Down
Loading

0 comments on commit 97b56cf

Please sign in to comment.