From 7b239ec838f36ef2d61a817f858d0b4711b6c219 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Mon, 7 Jan 2019 14:01:46 -0800 Subject: [PATCH] Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315) * clean up and correct properties to producer and consumers created by functions * fix test * cleaning up comment * refactoring --- .../functions/instance/ContextImpl.java | 13 +++++++++- .../functions/instance/InstanceUtils.java | 22 +++++++++++++++++ .../instance/JavaInstanceRunnable.java | 20 +++++++++++----- .../pulsar/functions/sink/PulsarSink.java | 19 +++++++-------- .../pulsar/functions/source/PulsarSource.java | 13 ++++------ .../instance/src/main/python/contextimpl.py | 7 +++++- .../src/main/python/python_instance.py | 24 +++++++++++++++---- .../instance/src/main/python/util.py | 6 +++++ .../pulsar/functions/sink/PulsarSinkTest.java | 18 +++++++------- .../functions/source/PulsarSourceTest.java | 11 +++++---- 10 files changed, 110 insertions(+), 43 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 460ba75ac7277..97956b86746d0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.source.TopicSchema; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; @@ -95,6 +96,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1); userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric"; } + private final Utils.ComponentType componentType; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List inputTopics, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, @@ -148,6 +150,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, Li .quantile(0.99, 0.01) .quantile(0.999, 0.01) .register(collectorRegistry); + this.componentType = componentType; } public void setCurrentMessageContext(Record record) { @@ -307,7 +310,15 @@ public CompletableFuture publish(String topicName, O object, Schema if (producer == null) { try { Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()) - .schema(schema).topic(topicName).create(); + .schema(schema) + .topic(topicName) + .properties(InstanceUtils.getProperties(componentType, + FunctionDetailsUtils.getFullyQualifiedName( + this.config.getFunctionDetails().getTenant(), + this.config.getFunctionDetails().getNamespace(), + this.config.getFunctionDetails().getName()), + this.config.getInstanceId())) + .create(); Producer existingProducer = (Producer) publishProducers.putIfAbsent(topicName, newProducer); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 9e3273685f7d5..88a9df30f88f1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -36,6 +36,9 @@ import net.jodah.typetools.TypeResolver; import org.apache.pulsar.functions.utils.Utils; +import java.util.HashMap; +import java.util.Map; + @UtilityClass public class InstanceUtils { public static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class typeArg, @@ -103,4 +106,23 @@ public Utils.ComponentType calculateSubjectType(Function.FunctionDetails functio } return SINK; } + + public static Map getProperties(Utils.ComponentType componentType, + String fullyQualifiedName, int instanceId) { + Map properties = new HashMap<>(); + switch (componentType) { + case FUNCTION: + properties.put("application", "pulsar-function"); + break; + case SOURCE: + properties.put("application", "pulsar-source"); + break; + case SINK: + properties.put("application", "pulsar-sink"); + break; + } + properties.put("id", fullyQualifiedName); + properties.put("instance_id", String.valueOf(instanceId)); + return properties; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 7c36b584d02ce..fde96282eac17 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -132,6 +132,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private InstanceCache instanceCache; + private final Utils.ComponentType componentType; + + private final Map properties; + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -156,6 +160,12 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()) }; + this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails()); + + this.properties = InstanceUtils.getProperties(this.componentType, + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()), + this.instanceConfig.getInstanceId()); + // Declare function local collector registry so that it will not clash with other function instances' // metrics collection especially in threaded mode // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down @@ -205,7 +215,7 @@ ContextImpl setupContext() { Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider, - collectorRegistry, metricsLabels, InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails())); + collectorRegistry, metricsLabels, this.componentType); } /** @@ -221,7 +231,7 @@ public void run() { } this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), - InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails())); + this.componentType); ContextImpl contextImpl = setupContext(); javaInstance = setupJavaInstance(contextImpl); @@ -648,8 +658,7 @@ public void setupInput(ContextImpl contextImpl) throws Exception { pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()); pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic()); } - object = new PulsarSource(this.client, pulsarSourceConfig, - FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + object = new PulsarSource(this.client, pulsarSourceConfig, this.properties); } else { object = Reflections.createInstance( sourceSpec.getClassName(), @@ -695,8 +704,7 @@ public void setupOutput(ContextImpl contextImpl) throws Exception { pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); - object = new PulsarSink(this.client, pulsarSinkConfig, - FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + object = new PulsarSink(this.client, pulsarSinkConfig, this.properties); } } else { object = Reflections.createInstance( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 1c61b45af9a02..3195a82b742ea 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -32,11 +32,13 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.FunctionResultRouter; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; @@ -54,12 +56,12 @@ public class PulsarSink implements Sink { private final PulsarClient client; private final PulsarSinkConfig pulsarSinkConfig; + private final Map properties; @VisibleForTesting PulsarSinkProcessor pulsarSinkProcessor; private final TopicSchema topicSchema; - private final String fqfn; private interface PulsarSinkProcessor { @@ -78,7 +80,7 @@ protected PulsarSinkProcessorBase(Schema schema) { this.schema = schema; } - public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) + public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema) throws PulsarClientException { ProducerBuilder builder = client.newProducer(schema) .blockIfQueueFull(true) @@ -96,9 +98,7 @@ public Producer createProducer(PulsarClient client, String topic, String builder.producerName(producerName); } - return builder - .property("application", "pulsarfunction") - .property("fqfn", fqfn).create(); + return builder.properties(properties).create(); } protected Producer getProducer(String destinationTopic) { @@ -112,8 +112,7 @@ protected Producer getProducer(String producerId, String producerName, String client, topicName, producerName, - schema, - fqfn); + schema); } catch (PulsarClientException e) { log.error("Failed to create Producer while doing user publish", e); throw new RuntimeException(e); @@ -143,7 +142,7 @@ public PulsarSinkAtMostOnceProcessor(Schema schema) { // initialize default topic try { publishProducers.put(pulsarSinkConfig.getTopic(), - createProducer(client, pulsarSinkConfig.getTopic(), null, schema, fqfn)); + createProducer(client, pulsarSinkConfig.getTopic(), null, schema)); } catch (PulsarClientException e) { log.error("Failed to create Producer while doing user publish", e); throw new RuntimeException(e); } @@ -209,11 +208,11 @@ public void sendOutputMessage(TypedMessageBuilder msg, Record record) } } - public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) { + public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map properties) { this.client = client; this.pulsarSinkConfig = pulsarSinkConfig; this.topicSchema = new TopicSchema(client); - this.fqfn = fqfn; + this.properties = properties; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index ff41dc846db59..869c706f63684 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -36,7 +36,9 @@ import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; @@ -45,17 +47,16 @@ public class PulsarSource extends PushSource implements MessageListener private final PulsarClient pulsarClient; private final PulsarSourceConfig pulsarSourceConfig; + private final Map properties; private List inputTopics; private List> inputConsumers; private final TopicSchema topicSchema; - private final String fqfn; - public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, - String fqfn) { + public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map properties) { this.pulsarClient = pulsarClient; this.pulsarSourceConfig = pulsarConfig; this.topicSchema = new TopicSchema(pulsarClient); - this.fqfn = fqfn; + this.properties = properties; } @Override @@ -64,10 +65,6 @@ public void open(Map config, SourceContext sourceContext) throws log.info("Opening pulsar source with config: {}", pulsarSourceConfig); Map> configs = setupConsumerConfigs(); - Map properties = new HashMap<>(); - properties.put("application", "pulsarfunction"); - properties.put("fqfn", fqfn); - inputConsumers = configs.entrySet().stream().map(e -> { String topic = e.getKey(); ConsumerConfig conf = e.getValue(); diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 63332d0171deb..6b56163c36d86 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -147,7 +147,12 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p batching_enabled=True, batching_max_publish_delay_ms=1, max_pending_messages=100000, - compression_type=pulsar_compression_type + compression_type=pulsar_compression_type, + properties=util.get_properties(util.getFullyQualifiedFunctionName( + self.instance_config.function_details.tenant, + self.instance_config.function_details.namespace, + self.instance_config.function_details.name), + self.instance_config.instance_id) ) if serde_class_name not in self.publish_serializers: diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index d4c3da5a953cd..cfd9eae48387c 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -122,6 +122,13 @@ def run(self): subscription_name = str(self.instance_config.function_details.tenant) + "/" + \ str(self.instance_config.function_details.namespace) + "/" + \ str(self.instance_config.function_details.name) + + properties = util.get_properties(util.getFullyQualifiedFunctionName( + self.instance_config.function_details.tenant, + self.instance_config.function_details.namespace, + self.instance_config.function_details.name), + self.instance_config.instance_id) + for topic, serde in self.instance_config.function_details.source.topicsToSerDeClassName.items(): if not serde: serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER) @@ -133,7 +140,8 @@ def run(self): str(topic), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None, + properties=properties ) for topic, consumer_conf in self.instance_config.function_details.source.inputSpecs.items(): @@ -148,14 +156,16 @@ def run(self): re.compile(str(topic)), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None, + properties=properties ) else: self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None, + properties=properties ) function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className) @@ -271,7 +281,13 @@ def setup_producer(self): # set send timeout to be infinity to prevent potential deadlock with consumer # that might happen when consumer is blocked due to unacked messages send_timeout_millis=0, - max_pending_messages=100000) + max_pending_messages=100000, + properties=util.get_properties(util.getFullyQualifiedFunctionName( + self.instance_config.function_details.tenant, + self.instance_config.function_details.namespace, + self.instance_config.function_details.name), + self.instance_config.instance_id) + ) def message_listener(self, serde, consumer, message): # increment number of received records from source diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 76f75bd2285db..0978f39ac2470 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -68,6 +68,12 @@ def import_class_from_path(from_path, full_class_name): def getFullyQualifiedFunctionName(tenant, namespace, name): return "%s/%s/%s" % (tenant, namespace, name) +def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id): + return "%s/%s/%s:%s" % (tenant, namespace, name, instance_id) + +def get_properties(fullyQualifiedName, instanceId): + return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)} + class FixedTimer(): def __init__(self, t, hFunction): diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index fb3d2c2af62a1..53cfeb68557d1 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SinkContext; import org.mockito.ArgumentMatcher; import org.testng.Assert; @@ -100,6 +101,7 @@ private static PulsarClientImpl getPulsarClient() throws PulsarClientException { doReturn(producerBuilder).when(producerBuilder).topic(anyString()); doReturn(producerBuilder).when(producerBuilder).producerName(anyString()); doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString()); + doReturn(producerBuilder).when(producerBuilder).properties(any()); doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any()); CompletableFuture completableFuture = new CompletableFuture<>(); @@ -158,7 +160,7 @@ public void testVoidOutputClasses() throws Exception { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { Schema schema = pulsarSink.initializeSchema(); @@ -176,7 +178,7 @@ public void testInconsistentOutputType() throws IOException { // set type to be inconsistent to that of SerDe pulsarConfig.setTypeClassName(Integer.class.getName()); pulsarConfig.setSerdeClassName(TestSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -198,7 +200,7 @@ public void testDefaultSerDe() throws PulsarClientException { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(String.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); @@ -217,7 +219,7 @@ public void testExplicitDefaultSerDe() throws PulsarClientException { // set type to void pulsarConfig.setTypeClassName(String.class.getName()); pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); @@ -233,7 +235,7 @@ public void testComplexOuputType() throws PulsarClientException { // set type to void pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); @@ -257,7 +259,7 @@ public void testSinkAndMessageRouting() throws Exception { /** test At-least-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -314,7 +316,7 @@ public boolean matches(Object o) { /** test At-most-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test"); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -371,7 +373,7 @@ public boolean matches(Object o) { /** test Effectively-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test"); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 8e59e002cbf35..88c9637c31f43 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SourceContext; import org.testng.annotations.Test; @@ -125,7 +126,7 @@ public void testVoidInputClasses() throws IOException { PulsarSourceConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); @@ -151,7 +152,7 @@ public void testInconsistentInputType() throws IOException { topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build()); pulsarConfig.setTopicSchema(topicSerdeClassNameMap); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -176,7 +177,7 @@ public void testDefaultSerDe() throws Exception { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @@ -192,7 +193,7 @@ public void testExplicitDefaultSerDe() throws Exception { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @@ -205,7 +206,7 @@ public void testComplexOuputType() throws Exception { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); pulsarSource.setupConsumerConfigs(); }