Skip to content

Commit

Permalink
Attach Producer/Consumer property tags so its easier to identify topi…
Browse files Browse the repository at this point in the history
…cs being produced/consumed by functions (#2490)
  • Loading branch information
srkukarni committed Aug 31, 2018
1 parent 5fb58d2 commit affbe17
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 42 deletions.
Expand Up @@ -553,7 +553,8 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
if (sourceSpec.getTimeoutMs() > 0 ) { if (sourceSpec.getTimeoutMs() > 0 ) {
pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs()); pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
} }
object = new PulsarSource(this.client, pulsarSourceConfig); object = new PulsarSource(this.client, pulsarSourceConfig,
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
} else { } else {
object = Reflections.createInstance( object = Reflections.createInstance(
sourceSpec.getClassName(), sourceSpec.getClassName(),
Expand Down Expand Up @@ -599,7 +600,8 @@ public void setupOutput(ContextImpl contextImpl) throws Exception {


pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());


object = new PulsarSink(this.client, pulsarSinkConfig); object = new PulsarSink(this.client, pulsarSinkConfig,
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
} }
} else { } else {
object = Reflections.createInstance( object = Reflections.createInstance(
Expand Down
Expand Up @@ -54,23 +54,20 @@ static <T> ProducerBuilder<T> newProducerBuilder(PulsarClient client, Schema<T>
.messageRouter(FunctionResultRouter.of()); .messageRouter(FunctionResultRouter.of());
} }


protected Producer<T> createProducer(String topic, Schema<T> schema) protected Producer<T> createProducerWithProducerName(String topic, String producerName, Schema<T> schema, String fqfn)
throws PulsarClientException { throws PulsarClientException {
return createProducer(client, topic, schema); return createProducer(client, topic, producerName, schema, fqfn);
} }


public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema) public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn)
throws PulsarClientException { throws PulsarClientException {
return newProducerBuilder(client, schema).topic(topic).create(); ProducerBuilder<T> builder = newProducerBuilder(client, schema).topic(topic);
} if (producerName != null) {
builder.producerName(producerName);
}


protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema) return builder
throws PulsarClientException { .property("application", "pulsarfunction")
return createProducer(client, topic, schema, producerName); .property("fqfn", fqfn).create();
}

public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema, String producerName)
throws PulsarClientException {
return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create();
} }
} }
Expand Up @@ -42,14 +42,17 @@ public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTop
private final Map<String, Producer<T>> producers; private final Map<String, Producer<T>> producers;


private final Schema<T> schema; private final Schema<T> schema;
private final String fqfn;




public MultiConsumersOneOuputTopicProducers(PulsarClient client, public MultiConsumersOneOuputTopicProducers(PulsarClient client,
String outputTopic, Schema<T> schema) String outputTopic, Schema<T> schema,
String fqfn)
throws PulsarClientException { throws PulsarClientException {
super(client, outputTopic); super(client, outputTopic);
this.producers = new ConcurrentHashMap<>(); this.producers = new ConcurrentHashMap<>();
this.schema = schema; this.schema = schema;
this.fqfn = fqfn;
} }


@Override @Override
Expand All @@ -65,7 +68,7 @@ static String makeProducerName(String srcTopicName, String srcTopicPartition) {
public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException { public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException {
Producer<T> producer = producers.get(srcPartitionId); Producer<T> producer = producers.get(srcPartitionId);
if (null == producer) { if (null == producer) {
producer = createProducer(outputTopic, srcPartitionId, schema); producer = createProducerWithProducerName(outputTopic, srcPartitionId, schema, fqfn);
producers.put(srcPartitionId, producer); producers.put(srcPartitionId, producer);
} }
return producer; return producer;
Expand Down
Expand Up @@ -57,9 +57,10 @@ public class PulsarSink<T> implements Sink<T> {
private PulsarSinkProcessor<T> pulsarSinkProcessor; private PulsarSinkProcessor<T> pulsarSinkProcessor;


private final TopicSchema topicSchema; private final TopicSchema topicSchema;
private final String fqfn;


private interface PulsarSinkProcessor<T> { private interface PulsarSinkProcessor<T> {
void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception; void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception;


TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception; TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;


Expand All @@ -72,9 +73,9 @@ private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> {
private Producer<T> producer; private Producer<T> producer;


@Override @Override
public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception { public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception {
this.producer = AbstractOneOuputTopicProducers.createProducer( this.producer = AbstractOneOuputTopicProducers.createProducer(
client, pulsarSinkConfig.getTopic(), schema); client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
} }


@Override @Override
Expand Down Expand Up @@ -103,9 +104,9 @@ private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> {
private Producer<T> producer; private Producer<T> producer;


@Override @Override
public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception { public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception {
this.producer = AbstractOneOuputTopicProducers.createProducer( this.producer = AbstractOneOuputTopicProducers.createProducer(
client, pulsarSinkConfig.getTopic(), schema); client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
} }


@Override @Override
Expand Down Expand Up @@ -136,8 +137,8 @@ private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<
protected Producers<T> outputProducer; protected Producers<T> outputProducer;


@Override @Override
public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception { public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception {
outputProducer = new MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema); outputProducer = new MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema, fqfn);
outputProducer.initialize(); outputProducer.initialize();
} }


Expand Down Expand Up @@ -195,10 +196,11 @@ public void becameInactive(Consumer<?> consumer, int partitionId) {
} }
} }


public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) { public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) {
this.client = client; this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig; this.pulsarSinkConfig = pulsarSinkConfig;
this.topicSchema = new TopicSchema(client); this.topicSchema = new TopicSchema(client);
this.fqfn = fqfn;
} }


@Override @Override
Expand All @@ -217,7 +219,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(); this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
break; break;
} }
this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(), schema); this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(), schema, fqfn);
} }


@Override @Override
Expand Down
Expand Up @@ -22,10 +22,7 @@


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;


import java.util.Collections; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,11 +55,14 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private List<String> inputTopics; private List<String> inputTopics;
private List<Consumer<T>> inputConsumers; private List<Consumer<T>> inputConsumers;
private final TopicSchema topicSchema; private final TopicSchema topicSchema;
private final String fqfn;


public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) { public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig,
String fqfn) {
this.pulsarClient = pulsarClient; this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarConfig; this.pulsarSourceConfig = pulsarConfig;
this.topicSchema = new TopicSchema(pulsarClient); this.topicSchema = new TopicSchema(pulsarClient);
this.fqfn = fqfn;
} }


@Override @Override
Expand All @@ -71,6 +71,10 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
log.info("Opening pulsar source with config: {}", pulsarSourceConfig); log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs(); Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();


Map<String, String> properties = new HashMap<>();
properties.put("application", "pulsarfunction");
properties.put("fqfn", fqfn);

inputConsumers = configs.entrySet().stream().map(e -> { inputConsumers = configs.entrySet().stream().map(e -> {
String topic = e.getKey(); String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue(); ConsumerConfig<T> conf = e.getValue();
Expand All @@ -87,6 +91,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
} else { } else {
cb.topic(topic); cb.topic(topic);
} }
cb.properties(properties);


if (pulsarSourceConfig.getTimeoutMs() != null) { if (pulsarSourceConfig.getTimeoutMs() != null) {
cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -199,7 +199,7 @@ public void setup() throws Exception {
when(mockClient.newProducer(any(Schema.class))) when(mockClient.newProducer(any(Schema.class)))
.thenReturn(new MockProducerBuilder()); .thenReturn(new MockProducerBuilder());


producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES); producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES, "test");
producers.initialize(); producers.initialize();
} }


Expand Down
Expand Up @@ -117,7 +117,7 @@ public void testVoidOutputClasses() throws Exception {
PulsarSinkConfig pulsarConfig = getPulsarConfigs(); PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void // set type to void
pulsarConfig.setTypeClassName(Void.class.getName()); pulsarConfig.setTypeClassName(Void.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");


try { try {
pulsarSink.initializeSchema(); pulsarSink.initializeSchema();
Expand All @@ -134,7 +134,7 @@ public void testInconsistentOutputType() throws IOException {
// set type to be inconsistent to that of SerDe // set type to be inconsistent to that of SerDe
pulsarConfig.setTypeClassName(Integer.class.getName()); pulsarConfig.setTypeClassName(Integer.class.getName());
pulsarConfig.setSerdeClassName(TestSerDe.class.getName()); pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
try { try {
pulsarSink.initializeSchema(); pulsarSink.initializeSchema();
fail("Should fail constructing java instance if function type is inconsistent with serde type"); fail("Should fail constructing java instance if function type is inconsistent with serde type");
Expand All @@ -156,7 +156,7 @@ public void testDefaultSerDe() throws PulsarClientException {
PulsarSinkConfig pulsarConfig = getPulsarConfigs(); PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void // set type to void
pulsarConfig.setTypeClassName(String.class.getName()); pulsarConfig.setTypeClassName(String.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");


try { try {
pulsarSink.initializeSchema(); pulsarSink.initializeSchema();
Expand All @@ -175,7 +175,7 @@ public void testExplicitDefaultSerDe() throws PulsarClientException {
// set type to void // set type to void
pulsarConfig.setTypeClassName(String.class.getName()); pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");


try { try {
pulsarSink.initializeSchema(); pulsarSink.initializeSchema();
Expand All @@ -191,7 +191,7 @@ public void testComplexOuputType() throws PulsarClientException {
// set type to void // set type to void
pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName()); pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");


try { try {
pulsarSink.initializeSchema(); pulsarSink.initializeSchema();
Expand Down
Expand Up @@ -127,7 +127,7 @@ public void testVoidInputClasses() throws IOException {
PulsarSourceConfig pulsarConfig = getPulsarConfigs(); PulsarSourceConfig pulsarConfig = getPulsarConfigs();
// set type to void // set type to void
pulsarConfig.setTypeClassName(Void.class.getName()); pulsarConfig.setTypeClassName(Void.class.getName());
PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");


try { try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
Expand All @@ -153,7 +153,7 @@ public void testInconsistentInputType() throws IOException {
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build()); ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(topicSerdeClassNameMap); pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
try { try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
fail("Should fail constructing java instance if function type is inconsistent with serde type"); fail("Should fail constructing java instance if function type is inconsistent with serde type");
Expand All @@ -178,7 +178,7 @@ public void testDefaultSerDe() throws Exception {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result", consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
pulsarConfig.setTopicSchema(consumerConfigs); pulsarConfig.setTopicSchema(consumerConfigs);
PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");


pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
} }
Expand All @@ -194,7 +194,7 @@ public void testExplicitDefaultSerDe() throws Exception {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result", consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
pulsarConfig.setTopicSchema(consumerConfigs); pulsarConfig.setTopicSchema(consumerConfigs);
PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");


pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
} }
Expand All @@ -207,7 +207,7 @@ public void testComplexOuputType() throws Exception {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result", consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build()); ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(consumerConfigs); pulsarConfig.setTopicSchema(consumerConfigs);
PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");


pulsarSource.setupConsumerConfigs(); pulsarSource.setupConsumerConfigs();
} }
Expand Down

0 comments on commit affbe17

Please sign in to comment.