Skip to content

Commit

Permalink
Clean up and correct properties to producer and consumers created by …
Browse files Browse the repository at this point in the history
…Functions/Sinks/Sources (#3315)

* clean up and correct properties to producer and consumers created by functions

* fix test

* cleaning up comment

* refactoring
  • Loading branch information
jerrypeng committed Jan 7, 2019
1 parent e3b03fb commit 7b239ec
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 43 deletions.
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
Expand Down Expand Up @@ -95,6 +96,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
}
private final Utils.ComponentType componentType;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Expand Down Expand Up @@ -148,6 +150,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, Li
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.componentType = componentType;
}

public void setCurrentMessageContext(Record<?> record) {
Expand Down Expand Up @@ -307,7 +310,15 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O>
if (producer == null) {
try {
Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
.schema(schema).topic(topicName).create();
.schema(schema)
.topic(topicName)
.properties(InstanceUtils.getProperties(componentType,
FunctionDetailsUtils.getFullyQualifiedName(
this.config.getFunctionDetails().getTenant(),
this.config.getFunctionDetails().getNamespace(),
this.config.getFunctionDetails().getName()),
this.config.getInstanceId()))
.create();

Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);

Expand Down
Expand Up @@ -36,6 +36,9 @@
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.utils.Utils;

import java.util.HashMap;
import java.util.Map;

@UtilityClass
public class InstanceUtils {
public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg,
Expand Down Expand Up @@ -103,4 +106,23 @@ public Utils.ComponentType calculateSubjectType(Function.FunctionDetails functio
}
return SINK;
}

/**
 * Builds the property map attached to producers and consumers created on behalf of a
 * function, source or sink instance.
 *
 * <p>The returned map contains:
 * <ul>
 *   <li>{@code application} - {@code pulsar-function}, {@code pulsar-source} or
 *       {@code pulsar-sink}, chosen by {@code componentType}; left unset for any other
 *       component type</li>
 *   <li>{@code id} - the given fully qualified name</li>
 *   <li>{@code instance_id} - the instance id rendered as a decimal string</li>
 * </ul>
 *
 * @param componentType kind of component the properties describe
 * @param fullyQualifiedName value stored under the {@code id} key
 * @param instanceId numeric instance id stored under the {@code instance_id} key
 * @return a new, mutable map holding the entries described above
 */
public static Map<String, String> getProperties(Utils.ComponentType componentType,
                                                String fullyQualifiedName, int instanceId) {
    // Resolve the application tag first; null means "no tag for this component type".
    String application = null;
    switch (componentType) {
        case FUNCTION:
            application = "pulsar-function";
            break;
        case SOURCE:
            application = "pulsar-source";
            break;
        case SINK:
            application = "pulsar-sink";
            break;
        default:
            break;
    }

    final Map<String, String> result = new HashMap<>();
    if (application != null) {
        result.put("application", application);
    }
    result.put("id", fullyQualifiedName);
    result.put("instance_id", Integer.toString(instanceId));
    return result;
}
}
Expand Up @@ -132,6 +132,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private InstanceCache instanceCache;

private final Utils.ComponentType componentType;

private final Map<String, String> properties;

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
Expand All @@ -156,6 +160,12 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
};

this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());

this.properties = InstanceUtils.getProperties(this.componentType,
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
this.instanceConfig.getInstanceId());

// Declare function local collector registry so that it will not clash with other function instances'
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
Expand Down Expand Up @@ -205,7 +215,7 @@ ContextImpl setupContext() {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
collectorRegistry, metricsLabels, InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
collectorRegistry, metricsLabels, this.componentType);
}

/**
Expand All @@ -221,7 +231,7 @@ public void run() {
}
this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
this.instanceCache.getScheduledExecutorService(),
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
this.componentType);

ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
Expand Down Expand Up @@ -648,8 +658,7 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
object = new PulsarSource(this.client, pulsarSourceConfig,
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
Expand Down Expand Up @@ -695,8 +704,7 @@ public void setupOutput(ContextImpl contextImpl) throws Exception {

pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());

object = new PulsarSink(this.client, pulsarSinkConfig,
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
object = new PulsarSink(this.client, pulsarSinkConfig, this.properties);
}
} else {
object = Reflections.createInstance(
Expand Down
Expand Up @@ -32,11 +32,13 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

Expand All @@ -54,12 +56,12 @@ public class PulsarSink<T> implements Sink<T> {

private final PulsarClient client;
private final PulsarSinkConfig pulsarSinkConfig;
private final Map<String, String> properties;

@VisibleForTesting
PulsarSinkProcessor<T> pulsarSinkProcessor;

private final TopicSchema topicSchema;
private final String fqfn;

private interface PulsarSinkProcessor<T> {

Expand All @@ -78,7 +80,7 @@ protected PulsarSinkProcessorBase(Schema schema) {
this.schema = schema;
}

public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn)
public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
throws PulsarClientException {
ProducerBuilder<T> builder = client.newProducer(schema)
.blockIfQueueFull(true)
Expand All @@ -96,9 +98,7 @@ public <T> Producer<T> createProducer(PulsarClient client, String topic, String
builder.producerName(producerName);
}

return builder
.property("application", "pulsarfunction")
.property("fqfn", fqfn).create();
return builder.properties(properties).create();
}

protected Producer<T> getProducer(String destinationTopic) {
Expand All @@ -112,8 +112,7 @@ protected Producer<T> getProducer(String producerId, String producerName, String
client,
topicName,
producerName,
schema,
fqfn);
schema);
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -143,7 +142,7 @@ public PulsarSinkAtMostOnceProcessor(Schema schema) {
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
createProducer(client, pulsarSinkConfig.getTopic(), null, schema, fqfn));
createProducer(client, pulsarSinkConfig.getTopic(), null, schema));
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e); }
Expand Down Expand Up @@ -209,11 +208,11 @@ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record)
}
}

public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) {
public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties) {
this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig;
this.topicSchema = new TopicSchema(client);
this.fqfn = fqfn;
this.properties = properties;
}

@Override
Expand Down
Expand Up @@ -36,7 +36,9 @@
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;

Expand All @@ -45,17 +47,16 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>

private final PulsarClient pulsarClient;
private final PulsarSourceConfig pulsarSourceConfig;
private final Map<String, String> properties;
private List<String> inputTopics;
private List<Consumer<T>> inputConsumers;
private final TopicSchema topicSchema;
private final String fqfn;

public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig,
String fqfn) {
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarConfig;
this.topicSchema = new TopicSchema(pulsarClient);
this.fqfn = fqfn;
this.properties = properties;
}

@Override
Expand All @@ -64,10 +65,6 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();

Map<String, String> properties = new HashMap<>();
properties.put("application", "pulsarfunction");
properties.put("fqfn", fqfn);

inputConsumers = configs.entrySet().stream().map(e -> {
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
Expand Down
7 changes: 6 additions & 1 deletion pulsar-functions/instance/src/main/python/contextimpl.py
Expand Up @@ -147,7 +147,12 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p
batching_enabled=True,
batching_max_publish_delay_ms=1,
max_pending_messages=100000,
compression_type=pulsar_compression_type
compression_type=pulsar_compression_type,
properties=util.get_properties(util.getFullyQualifiedFunctionName(
self.instance_config.function_details.tenant,
self.instance_config.function_details.namespace,
self.instance_config.function_details.name),
self.instance_config.instance_id)
)

if serde_class_name not in self.publish_serializers:
Expand Down
24 changes: 20 additions & 4 deletions pulsar-functions/instance/src/main/python/python_instance.py
Expand Up @@ -122,6 +122,13 @@ def run(self):
subscription_name = str(self.instance_config.function_details.tenant) + "/" + \
str(self.instance_config.function_details.namespace) + "/" + \
str(self.instance_config.function_details.name)

properties = util.get_properties(util.getFullyQualifiedFunctionName(
self.instance_config.function_details.tenant,
self.instance_config.function_details.namespace,
self.instance_config.function_details.name),
self.instance_config.instance_id)

for topic, serde in self.instance_config.function_details.source.topicsToSerDeClassName.items():
if not serde:
serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER)
Expand All @@ -133,7 +140,8 @@ def run(self):
str(topic), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
properties=properties
)

for topic, consumer_conf in self.instance_config.function_details.source.inputSpecs.items():
Expand All @@ -148,14 +156,16 @@ def run(self):
re.compile(str(topic)), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
properties=properties
)
else:
self.consumers[topic] = self.pulsar_client.subscribe(
str(topic), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
properties=properties
)

function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className)
Expand Down Expand Up @@ -271,7 +281,13 @@ def setup_producer(self):
# set send timeout to be infinity to prevent potential deadlock with consumer
# that might happen when consumer is blocked due to unacked messages
send_timeout_millis=0,
max_pending_messages=100000)
max_pending_messages=100000,
properties=util.get_properties(util.getFullyQualifiedFunctionName(
self.instance_config.function_details.tenant,
self.instance_config.function_details.namespace,
self.instance_config.function_details.name),
self.instance_config.instance_id)
)

def message_listener(self, serde, consumer, message):
# increment number of received records from source
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/instance/src/main/python/util.py
Expand Up @@ -68,6 +68,12 @@ def import_class_from_path(from_path, full_class_name):
def getFullyQualifiedFunctionName(tenant, namespace, name):
    """Return the function's fully qualified name in the form ``tenant/namespace/name``."""
    parts = (tenant, namespace, name)
    return "%s/%s/%s" % parts

def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
    """Return the instance identifier in the form ``tenant/namespace/name:instance_id``."""
    fields = (tenant, namespace, name, instance_id)
    return "%s/%s/%s:%s" % fields

def get_properties(fullyQualifiedName, instanceId):
    """Build the properties dict passed to producers and consumers.

    The result always tags the application as ``pulsar-function`` and stores the
    string forms of ``fullyQualifiedName`` and ``instanceId`` under the ``id`` and
    ``instance_id`` keys respectively.
    """
    props = {"application": "pulsar-function"}
    props["id"] = str(fullyQualifiedName)
    props["instance_id"] = str(instanceId)
    return props

class FixedTimer():

def __init__(self, t, hFunction):
Expand Down

0 comments on commit 7b239ec

Please sign in to comment.