From 9ccaf2c4b8409ae37960cfb9ed16fb60d3924a7f Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Wed, 3 Oct 2018 17:33:55 -0700 Subject: [PATCH] Function Serverside Validation (#2701) * First cut of the serverside valiation * Revert unneeded changes * Do the easier checks first * Check for null * Simplify * Do the actual validation * Check for null runtime * Update Test * Fix build * Fix build * misc fixes * Add explicit check for serde * Changed for a better condition * Better serde check * Fixed some unittests * Fixed more tests * Fixed unittest * Fixed unittests * Fixed unittest * Stricter checks --- .../broker/admin/impl/FunctionsBase.java | 10 +- .../apache/pulsar/admin/cli/CmdFunctions.java | 224 ++------------- .../org/apache/pulsar/admin/cli/CmdSinks.java | 188 ++---------- .../apache/pulsar/admin/cli/CmdSources.java | 136 ++------- .../apache/pulsar/admin/cli/TestCmdSinks.java | 4 +- .../pulsar/admin/cli/TestCmdSources.java | 4 +- .../functions/utils/FunctionConfig.java | 3 +- .../functions/utils/FunctionConfigUtils.java | 199 +++++++++++++ .../functions/utils/SinkConfigUtils.java | 181 ++++++++++++ .../functions/utils/SourceConfigUtils.java | 136 +++++++++ .../apache/pulsar/functions/utils/Utils.java | 13 +- .../utils/validation/ConfigValidation.java | 18 +- .../ConfigValidationAnnotations.java | 11 + .../functions/utils/validation/Validator.java | 2 +- .../utils/validation/ValidatorImpls.java | 173 ++++++----- .../apache/pulsar/functions/worker/Utils.java | 11 +- .../worker/rest/api/FunctionsImpl.java | 227 +++++++++------ .../rest/api/v2/FunctionApiV2Resource.java | 10 +- .../api/v2/FunctionApiV2ResourceTest.java | 271 +++++++++++------- 19 files changed, 1014 insertions(+), 807 deletions(-) create mode 100644 pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java create mode 100644 pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java create mode 100644 pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 80f2df552b1e1..cff3c34651e81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -80,10 +80,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("functionDetails") String functionDetailsJson) { + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); } @PUT @@ -101,10 +102,11 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("functionDetails") String functionDetailsJson) { + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 9e3c6e8ece0c6..c771e7b5d455b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -45,7 +45,6 @@ import io.netty.buffer.Unpooled; import java.io.File; -import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.Arrays; @@ -84,10 +83,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.utils.Utils; -import org.apache.pulsar.functions.utils.WindowConfig; +import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator; @@ -229,8 +225,8 @@ abstract class FunctionDetailsCommand extends BaseCommand { description = "Path to the main Python file/Python Wheel file for the function (if the function is written in Python)", listConverter = StringConverter.class) protected String pyFile; - @Parameter(names = { "-i", - "--inputs" }, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)") + @Parameter(names = {"-i", + "--inputs"}, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)") protected String inputs; // for backwards compatibility purposes @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)", hidden = true) @@ -326,6 +322,8 @@ abstract class FunctionDetailsCommand extends BaseCommand { protected String deadLetterTopic; protected FunctionConfig functionConfig; protected String userCodeFile; + // The classLoader associated with this function defn + protected ClassLoader classLoader; private void mergeArgs() { @@ -379,12 +377,12 @@ void processArguments() throws Exception { functionConfig.setInputs(inputTopics); } if (null != customSerdeInputString) { - Type type = new TypeToken>(){}.getType(); + Type type = new TypeToken>() {}.getType(); Map customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type); functionConfig.setCustomSerdeInputs(customSerdeInputMap); } if (null != customSchemaInputString) { - Type type = new TypeToken>(){}.getType(); + Type type = new TypeToken>() {}.getType(); Map customschemaInputMap = new Gson().fromJson(customSchemaInputString, type); functionConfig.setCustomSchemaInputs(customschemaInputMap); } @@ -412,13 +410,13 @@ void processArguments() throws Exception { } functionConfig.setRetainOrdering(retainOrdering); - + if (isNotBlank(subsName)) { functionConfig.setSubName(subsName); } if (null != userConfigString) { - Type type = new TypeToken>(){}.getType(); + Type type = new TypeToken>() {}.getType(); Map userConfigMap = new Gson().fromJson(userConfigString, type); functionConfig.setUserConfig(userConfigMap); } @@ -490,6 +488,9 @@ void processArguments() throws Exception { // infer default vaues inferMissingArguments(functionConfig); + + // check if configs are valid + validateFunctionConfigs(functionConfig); } protected void validateFunctionConfigs(FunctionConfig functionConfig) { @@ -532,26 +533,22 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) { if (jarFilePath != null) { File file = new File(jarFilePath); - ClassLoader userJarLoader; try { - userJarLoader = Reflections.loadJar(file); + classLoader = Reflections.loadJar(file); } catch (MalformedURLException e) { throw new ParameterException( "Failed to load user jar " + file + " with error " + e.getMessage()); } - // make sure the function class loader is accessible thread-locally - Thread.currentThread().setContextClassLoader(userJarLoader); - (new ImplementsClassesValidator(Function.class, java.util.function.Function.class)) - .validateField("className", functionConfig.getClassName()); + .validateField("className", functionConfig.getClassName(), classLoader); } } try { // Need to load jar and set context class loader before calling - ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name()); + ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), classLoader); } catch (Exception e) { - throw new ParameterException(e.getMessage()); + throw new IllegalArgumentException(e.getMessage()); } } @@ -588,7 +585,7 @@ private void inferMissingFunctionName(FunctionConfig functionConfig) { throw new ParameterException("You must specify a class name for the function"); } - String [] domains = functionConfig.getClassName().split("\\."); + String[] domains = functionConfig.getClassName().split("\\."); if (domains.length == 0) { functionConfig.setName(functionConfig.getClassName()); } else { @@ -603,183 +600,6 @@ private void inferMissingTenant(FunctionConfig functionConfig) { private void inferMissingNamespace(FunctionConfig functionConfig) { functionConfig.setNamespace(DEFAULT_NAMESPACE); } - - protected FunctionDetails convert(FunctionConfig functionConfig) - throws IOException { - - // check if configs are valid - validateFunctionConfigs(functionConfig); - - Class[] typeArgs = null; - if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - if (functionConfig.getJar().startsWith(Utils.FILE)) { - // server derives the arg-type by loading a class - if (isBlank(functionConfig.getClassName())) { - throw new ParameterException("Class-name must be present for jar with file-url"); - } - } else { - typeArgs = Utils.getFunctionTypes(functionConfig); - } - } - - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - - // Setup source - SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - if (functionConfig.getInputs() != null) { - functionConfig.getInputs().forEach((topicName -> { - sourceSpecBuilder.putInputSpecs(topicName, - ConsumerSpec.newBuilder() - .setIsRegexPattern(false) - .build()); - })); - } - if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) { - sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(), - ConsumerSpec.newBuilder() - .setIsRegexPattern(true) - .build()); - } - if (functionConfig.getCustomSerdeInputs() != null) { - functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> { - sourceSpecBuilder.putInputSpecs(topicName, - ConsumerSpec.newBuilder() - .setSerdeClassName(serdeClassName) - .setIsRegexPattern(false) - .build()); - }); - } - if (functionConfig.getCustomSchemaInputs() != null) { - functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> { - sourceSpecBuilder.putInputSpecs(topicName, - ConsumerSpec.newBuilder() - .setSchemaType(schemaType) - .setIsRegexPattern(false) - .build()); - }); - } - if (functionConfig.getInputSpecs() != null) { - functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> { - ConsumerSpec.Builder bldr = ConsumerSpec.newBuilder() - .setIsRegexPattern(consumerConf.isRegexPattern()); - if (!StringUtils.isBlank(consumerConf.getSchemaType())) { - bldr.setSchemaType(consumerConf.getSchemaType()); - } else if (!StringUtils.isBlank(consumerConf.getSerdeClassName())) { - bldr.setSerdeClassName(consumerConf.getSerdeClassName()); - } - sourceSpecBuilder.putInputSpecs(topicName, bldr.build()); - }); - } - - // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics - SubscriptionType subType = (functionConfig.isRetainOrdering() - || ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) - ? SubscriptionType.FAILOVER - : SubscriptionType.SHARED; - sourceSpecBuilder.setSubscriptionType(subType); - - if (isNotBlank(functionConfig.getSubName())) { - sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName()); - } - - if (typeArgs != null) { - sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); - } - if (functionConfig.getTimeoutMs() != null) { - sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs()); - } - functionDetailsBuilder.setSource(sourceSpecBuilder); - - // Setup sink - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - if (functionConfig.getOutput() != null) { - sinkSpecBuilder.setTopic(functionConfig.getOutput()); - } - if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) { - sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName()); - } - if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) { - sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType()); - } - - if (typeArgs != null) { - sinkSpecBuilder.setTypeClassName(typeArgs[1].getName()); - } - functionDetailsBuilder.setSink(sinkSpecBuilder); - - if (functionConfig.getTenant() != null) { - functionDetailsBuilder.setTenant(functionConfig.getTenant()); - } - if (functionConfig.getNamespace() != null) { - functionDetailsBuilder.setNamespace(functionConfig.getNamespace()); - } - if (functionConfig.getName() != null) { - functionDetailsBuilder.setName(functionConfig.getName()); - } - if (functionConfig.getLogTopic() != null) { - functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic()); - } - if (functionConfig.getRuntime() != null) { - functionDetailsBuilder.setRuntime(Utils.convertRuntime(functionConfig.getRuntime())); - } - if (functionConfig.getProcessingGuarantees() != null) { - functionDetailsBuilder.setProcessingGuarantees( - Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees())); - } - - if (functionConfig.getMaxMessageRetries() >= 0) { - RetryDetails.Builder retryBuilder = RetryDetails.newBuilder(); - retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries()); - if (isNotEmpty(functionConfig.getDeadLetterTopic())) { - retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic()); - } - functionDetailsBuilder.setRetryDetails(retryBuilder); - } - - Map configs = new HashMap<>(); - configs.putAll(functionConfig.getUserConfig()); - - // windowing related - WindowConfig windowConfig = functionConfig.getWindowConfig(); - if (windowConfig != null) { - windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName()); - configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig); - // set class name to window function executor - functionDetailsBuilder.setClassName(WindowFunctionExecutor.class.getName()); - - } else { - if (functionConfig.getClassName() != null) { - functionDetailsBuilder.setClassName(functionConfig.getClassName()); - } - } - if (!configs.isEmpty()) { - functionDetailsBuilder.setUserConfig(new Gson().toJson(configs)); - } - - functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); - functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); - if (functionConfig.getResources() != null) { - Resources.Builder bldr = Resources.newBuilder(); - if (functionConfig.getResources().getCpu() != null) { - bldr.setCpu(functionConfig.getResources().getCpu()); - } - if (functionConfig.getResources().getRam() != null) { - bldr.setRam(functionConfig.getResources().getRam()); - } - if (functionConfig.getResources().getDisk() != null) { - bldr.setDisk(functionConfig.getResources().getDisk()); - } - functionDetailsBuilder.setResources(bldr.build()); - } - return functionDetailsBuilder.build(); - } - - protected org.apache.pulsar.functions.proto.Function.FunctionDetails convertProto2(FunctionConfig functionConfig) - throws IOException { - org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(convert(functionConfig)), functionDetailsBuilder); - return functionDetailsBuilder.build(); - } } @Parameters(commandDescription = "Run the Pulsar Function locally (rather than deploying it to the Pulsar cluster)") @@ -848,7 +668,7 @@ private void mergeArgs() { void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), + CmdFunctions.startLocalRun(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getParallelism(), instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) .clientAuthenticationParameters(clientAuthParams).useTls(useTls) @@ -864,9 +684,9 @@ class CreateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) { - admin.functions().createFunctionWithUrl(convert(functionConfig), functionConfig.getJar()); + admin.functions().createFunctionWithUrl(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getJar()); } else { - admin.functions().createFunction(convert(functionConfig), userCodeFile); + admin.functions().createFunction(FunctionConfigUtils.convert(functionConfig, classLoader), userCodeFile); } print("Created successfully"); @@ -956,9 +776,9 @@ class UpdateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) { - admin.functions().updateFunctionWithUrl(convert(functionConfig), functionConfig.getJar()); + admin.functions().updateFunctionWithUrl(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getJar()); } else { - admin.functions().updateFunction(convert(functionConfig), userCodeFile); + admin.functions().updateFunction(FunctionConfigUtils.convert(functionConfig, classLoader), userCodeFile); } print("Updated successfully"); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 8f9eefea94af7..bd5af791a8a20 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.fileExists; import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; @@ -34,14 +33,11 @@ import com.google.gson.reflect.TypeToken; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; @@ -56,20 +52,8 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.instance.AuthenticationConfig; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.Function.ConsumerSpec; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.Resources; -import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.proto.Function.SourceSpec; -import org.apache.pulsar.functions.proto.Function.SubscriptionType; -import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees; -import org.apache.pulsar.functions.utils.SinkConfig; -import org.apache.pulsar.functions.utils.ConsumerConfig; -import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; @@ -203,9 +187,9 @@ protected class CreateSink extends SinkCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { - admin.functions().createFunctionWithUrl(createSinkConfig(sinkConfig), sinkConfig.getArchive()); + admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); } else { - admin.functions().createFunction(createSinkConfig(sinkConfig), sinkConfig.getArchive()); + admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); } print("Created successfully"); } @@ -216,9 +200,9 @@ protected class UpdateSink extends SinkCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { - admin.functions().updateFunctionWithUrl(createSinkConfig(sinkConfig), sinkConfig.getArchive()); + admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); } else { - admin.functions().updateFunction(createSinkConfig(sinkConfig), sinkConfig.getArchive()); + admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); } print("Updated successfully"); } @@ -413,6 +397,9 @@ void processArguments() throws Exception { } inferMissingArguments(sinkConfig); + + // check if configs are valid + validateSinkConfigs(sinkConfig); } protected Map parseConfigs(String str) { @@ -462,6 +449,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { } // if jar file is present locally then load jar and validate SinkClass in it + ClassLoader classLoader = null; if (archivePath != null) { if (!fileExists(archivePath)) { throw new ParameterException("Archive file " + archivePath + " does not exist"); @@ -470,17 +458,20 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { try { ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath); log.info("Connector: {}", connector); - - // Validate sink class - ConnectorUtils.getIOSinkClass(archivePath); } catch (IOException e) { throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage()); } + + try { + classLoader = NarClassLoader.getFromArchive(new File(archivePath), Collections.emptySet()); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } } try { // Need to load jar and set context class loader before calling - ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name()); + ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), classLoader); } catch (Exception e) { throw new ParameterException(e.getMessage()); } @@ -491,152 +482,7 @@ protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkC throws IOException { org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(createSinkConfig(sinkConfig)), functionDetailsBuilder); - return functionDetailsBuilder.build(); - } - - protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOException { - - // check if configs are valid - validateSinkConfigs(sinkConfig); - - String sinkClassName = null; - String typeArg = null; - - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - - boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); - - if (!isBuiltin) { - if (sinkConfig.getArchive().startsWith(Utils.FILE)) { - if (isBlank(sinkConfig.getClassName())) { - throw new ParameterException("Class-name must be present for archive with file-url"); - } - sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class - } else { - sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), - Collections.emptySet())) { - typeArg = Utils.getSinkType(sinkClassName, ncl).getName(); - } - } - } - - if (sinkConfig.getTenant() != null) { - functionDetailsBuilder.setTenant(sinkConfig.getTenant()); - } - if (sinkConfig.getNamespace() != null) { - functionDetailsBuilder.setNamespace(sinkConfig.getNamespace()); - } - if (sinkConfig.getName() != null) { - functionDetailsBuilder.setName(sinkConfig.getName()); - } - functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); - functionDetailsBuilder.setParallelism(sinkConfig.getParallelism()); - functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); - if (sinkConfig.getProcessingGuarantees() != null) { - functionDetailsBuilder.setProcessingGuarantees( - convertProcessingGuarantee(sinkConfig.getProcessingGuarantees())); - } - - // set source spec - // source spec classname should be empty so that the default pulsar source will be used - SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); - if (sinkConfig.getInputs() != null) { - sinkConfig.getInputs().forEach(topicName -> - sourceSpecBuilder.putInputSpecs(topicName, - ConsumerSpec.newBuilder() - .setIsRegexPattern(false) - .build())); - } - if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) { - sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(), - ConsumerSpec.newBuilder() - .setIsRegexPattern(true) - .build()); - } - if (sinkConfig.getTopicToSerdeClassName() != null) { - sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) -> { - sourceSpecBuilder.putInputSpecs(topicName, - ConsumerSpec.newBuilder() - .setSerdeClassName(serde == null ? "" : serde) - .setIsRegexPattern(false) - .build()); - }); - } - if (sinkConfig.getTopicToSchemaType() != null) { - sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { - sourceSpecBuilder.putInputSpecs(topicName, - ConsumerSpec.newBuilder() - .setSchemaType(schemaType == null ? "" : schemaType) - .setIsRegexPattern(false) - .build()); - }); - } - if (sinkConfig.getInputSpecs() != null) { - sinkConfig.getInputSpecs().forEach((topic, spec) -> { - sourceSpecBuilder.putInputSpecs(topic, - ConsumerSpec.newBuilder() - .setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "") - .setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "") - .setIsRegexPattern(spec.isRegexPattern()) - .build()); - }); - } - - if (typeArg != null) { - sourceSpecBuilder.setTypeClassName(typeArg); - } - if (isNotBlank(sinkConfig.getSourceSubscriptionName())) { - sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); - } - - SubscriptionType subType = (sinkConfig.isRetainOrdering() - || ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees())) - ? SubscriptionType.FAILOVER - : SubscriptionType.SHARED; - sourceSpecBuilder.setSubscriptionType(subType); - - functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck()); - if (sinkConfig.getTimeoutMs() != null) { - sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); - } - - functionDetailsBuilder.setSource(sourceSpecBuilder); - - // set up sink spec - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - if (sinkClassName != null) { - sinkSpecBuilder.setClassName(sinkClassName); - } - - if (isBuiltin) { - String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", ""); - sinkSpecBuilder.setBuiltin(builtin); - } - - if (sinkConfig.getConfigs() != null) { - sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); - } - if (typeArg != null) { - sinkSpecBuilder.setTypeClassName(typeArg); - } - functionDetailsBuilder.setSink(sinkSpecBuilder); - - if (sinkConfig.getResources() != null) { - Resources.Builder bldr = Resources.newBuilder(); - if (sinkConfig.getResources().getCpu() != null) { - bldr.setCpu(sinkConfig.getResources().getCpu()); - } - if (sinkConfig.getResources().getRam() != null) { - bldr.setRam(sinkConfig.getResources().getRam()); - } - if (sinkConfig.getResources().getDisk() != null) { - bldr.setDisk(sinkConfig.getResources().getDisk()); - } - functionDetailsBuilder.setResources(bldr.build()); - } + Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), functionDetailsBuilder); return functionDetailsBuilder.build(); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index fb32a6a02e218..5d0c84c90d810 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -20,9 +20,7 @@ import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.fileExists; -import static org.apache.pulsar.functions.utils.Utils.getSourceType; import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; import com.beust.jcommander.Parameter; @@ -33,7 +31,6 @@ import com.google.gson.reflect.TypeToken; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; import java.nio.file.Paths; @@ -52,14 +49,10 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.instance.AuthenticationConfig; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.Resources; -import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.SourceConfig; +import org.apache.pulsar.functions.utils.SourceConfigUtils; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; @@ -194,9 +187,9 @@ protected class CreateSource extends SourceCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) { - admin.functions().createFunctionWithUrl(createSourceConfig(sourceConfig), sourceConfig.getArchive()); + admin.functions().createFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); } else { - admin.functions().createFunction(createSourceConfig(sourceConfig), sourceConfig.getArchive()); + admin.functions().createFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); } print("Created successfully"); } @@ -207,9 +200,9 @@ protected class UpdateSource extends SourceCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) { - admin.functions().updateFunctionWithUrl(createSourceConfig(sourceConfig), sourceConfig.getArchive()); + admin.functions().updateFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); } else { - admin.functions().updateFunction(createSourceConfig(sourceConfig), sourceConfig.getArchive()); + admin.functions().updateFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); } print("Updated successfully"); } @@ -357,6 +350,9 @@ void processArguments() throws Exception { } inferMissingArguments(sourceConfig); + + // check if source configs are valid + validateSourceConfigs(sourceConfig); } protected Map parseConfigs(String str) { @@ -407,6 +403,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) { // if jar file is present locally then load jar and validate SinkClass in it + ClassLoader classLoader = null; if (archivePath != null) { if (!fileExists(archivePath)) { throw new ParameterException("Archive file " + archivePath + " does not exist"); @@ -415,17 +412,21 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) { try { ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath); log.info("Connector: {}", connector); - - // Validate source class - ConnectorUtils.getIOSourceClass(archivePath); } catch (IOException e) { throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage()); } + + try { + classLoader = NarClassLoader.getFromArchive(new File(archivePath), + Collections.emptySet()); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } } try { // Need to load jar and set context class loader before calling - ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name()); + ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), classLoader); } catch (Exception e) { throw new ParameterException(e.getMessage()); } @@ -435,108 +436,7 @@ protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourc throws IOException { org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder); - return functionDetailsBuilder.build(); - } - - protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) throws IOException { - - // check if source configs are valid - validateSourceConfigs(sourceConfig); - - String sourceClassName = null; - String typeArg = null; - - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - - boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); - - if (!isBuiltin) { - if (sourceConfig.getArchive().startsWith(Utils.FILE)) { - if (StringUtils.isBlank(sourceConfig.getClassName())) { - throw new ParameterException("Class-name must be present for archive with file-url"); - } - sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class - } else { - sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); - - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), - Collections.emptySet())) { - typeArg = getSourceType(sourceClassName, ncl).getName(); - } - } - } - - if (sourceConfig.getTenant() != null) { - functionDetailsBuilder.setTenant(sourceConfig.getTenant()); - } - if (sourceConfig.getNamespace() != null) { - functionDetailsBuilder.setNamespace(sourceConfig.getNamespace()); - } - if (sourceConfig.getName() != null) { - functionDetailsBuilder.setName(sourceConfig.getName()); - } - functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); - functionDetailsBuilder.setParallelism(sourceConfig.getParallelism()); - functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); - functionDetailsBuilder.setAutoAck(true); - if (sourceConfig.getProcessingGuarantees() != null) { - functionDetailsBuilder.setProcessingGuarantees( - convertProcessingGuarantee(sourceConfig.getProcessingGuarantees())); - } - - // set source spec - SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - if (sourceClassName != null) { - sourceSpecBuilder.setClassName(sourceClassName); - } - - if (isBuiltin) { - String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", ""); - sourceSpecBuilder.setBuiltin(builtin); - } - - if (sourceConfig.getConfigs() != null) { - sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); - } - - if (typeArg != null) { - sourceSpecBuilder.setTypeClassName(typeArg); - } - functionDetailsBuilder.setSource(sourceSpecBuilder); - - // set up sink spec. - // Sink spec classname should be empty so that the default pulsar sink will be used - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) { - sinkSpecBuilder.setSchemaType(sourceConfig.getSchemaType()); - } - if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) { - sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName()); - } - - sinkSpecBuilder.setTopic(sourceConfig.getTopicName()); - - if (typeArg != null) { - sinkSpecBuilder.setTypeClassName(typeArg); - } - - functionDetailsBuilder.setSink(sinkSpecBuilder); - - if (sourceConfig.getResources() != null) { - Resources.Builder bldr = Resources.newBuilder(); - if (sourceConfig.getResources().getCpu() != null) { - bldr.setCpu(sourceConfig.getResources().getCpu()); - } - if (sourceConfig.getResources().getRam() != null) { - bldr.setRam(sourceConfig.getResources().getRam()); - } - if (sourceConfig.getResources().getDisk() != null) { - bldr.setDisk(sourceConfig.getResources().getDisk()); - } - functionDetailsBuilder.setResources(bldr.build()); - } - + Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig)), functionDetailsBuilder); return functionDetailsBuilder.build(); } diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index fb13d39913772..fc0f180bc1ee5 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -435,7 +435,7 @@ public void testMissingArchive() throws Exception { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-twitter.nar has error: The 'twitter' connector does not provide a sink implementation") + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "The 'twitter' connector does not provide a sink implementation") public void testInvalidJarWithNoSource() throws Exception { SinkConfig sinkConfig = getSinkConfig(); sinkConfig.setArchive(WRONG_JAR_PATH); @@ -783,7 +783,7 @@ public void testCmdSinkConfigFileInvalidJar() throws Exception { testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-twitter.nar has error: The 'twitter' connector does not provide a sink implementation") + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "The 'twitter' connector does not provide a sink implementation") public void testCmdSinkConfigFileInvalidJarNoSink() throws Exception { SinkConfig testSinkConfig = getSinkConfig(); testSinkConfig.setArchive(WRONG_JAR_PATH); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index fbd51519f864b..6548f2d73b5ad 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -355,7 +355,7 @@ public void testInvalidJar() throws Exception { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-cassandra.nar has error: The 'cassandra' connector does not provide a source implementation") + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive") public void testInvalidJarWithNoSource() throws Exception { SourceConfig sourceConfig = getSourceConfig(); sourceConfig.setArchive(WRONG_JAR_PATH); @@ -650,7 +650,7 @@ public void testCmdSourceConfigFileInvalidJar() throws Exception { testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-cassandra.nar has error: The 'cassandra' connector does not provide a source implementation") + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive") public void testCmdSourceConfigFileInvalidJarNoSource() throws Exception { SourceConfig testSourceConfig = getSourceConfig(); testSourceConfig.setArchive(WRONG_JAR_PATH); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index dc368126f42dc..d2b94e363917f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -32,6 +32,7 @@ import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.utils.validation.ConfigValidation; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; @@ -98,7 +99,7 @@ public enum Runtime { */ private String outputSchemaType; - @isImplementationOfClass(implementsClass = SerDe.class) + @ConfigValidationAnnotations.isValidSerde private String outputSerdeClassName; @isValidTopicName private String logTopic; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java new file mode 100644 index 0000000000000..d546830701fb8 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.utils; + +import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.commons.lang.StringUtils.isBlank; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang.StringUtils.isNotEmpty; + +public class FunctionConfigUtils { + + public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader) + throws IllegalArgumentException { + + Class[] typeArgs = null; + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + if (classLoader != null) { + typeArgs = Utils.getFunctionTypes(functionConfig, classLoader); + } + } + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + // Setup source + Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder(); + if (functionConfig.getInputs() != null) { + functionConfig.getInputs().forEach((topicName -> { + sourceSpecBuilder.putInputSpecs(topicName, + Function.ConsumerSpec.newBuilder() + .setIsRegexPattern(false) + .build()); + })); + } + if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) { + sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(), + Function.ConsumerSpec.newBuilder() + .setIsRegexPattern(true) + .build()); + } + if (functionConfig.getCustomSerdeInputs() != null) { + functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> { + sourceSpecBuilder.putInputSpecs(topicName, + Function.ConsumerSpec.newBuilder() + .setSerdeClassName(serdeClassName) + .setIsRegexPattern(false) + .build()); + }); + } + if (functionConfig.getCustomSchemaInputs() != null) { + functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> { + sourceSpecBuilder.putInputSpecs(topicName, + Function.ConsumerSpec.newBuilder() + .setSchemaType(schemaType) + .setIsRegexPattern(false) + .build()); + }); + } + if (functionConfig.getInputSpecs() != null) { + functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> { + Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder() + .setIsRegexPattern(consumerConf.isRegexPattern()); + if (!StringUtils.isBlank(consumerConf.getSchemaType())) { + bldr.setSchemaType(consumerConf.getSchemaType()); + } else if (!StringUtils.isBlank(consumerConf.getSerdeClassName())) { + bldr.setSerdeClassName(consumerConf.getSerdeClassName()); + } + sourceSpecBuilder.putInputSpecs(topicName, bldr.build()); + }); + } + + // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics + Function.SubscriptionType subType = (functionConfig.isRetainOrdering() + || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) + ? Function.SubscriptionType.FAILOVER + : Function.SubscriptionType.SHARED; + sourceSpecBuilder.setSubscriptionType(subType); + + if (isNotBlank(functionConfig.getSubName())) { + sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName()); + } + + if (typeArgs != null) { + sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); + } + if (functionConfig.getTimeoutMs() != null) { + sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs()); + } + functionDetailsBuilder.setSource(sourceSpecBuilder); + + // Setup sink + Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder(); + if (functionConfig.getOutput() != null) { + sinkSpecBuilder.setTopic(functionConfig.getOutput()); + } + if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) { + sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName()); + } + if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) { + sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType()); + } + + if (typeArgs != null) { + sinkSpecBuilder.setTypeClassName(typeArgs[1].getName()); + } + functionDetailsBuilder.setSink(sinkSpecBuilder); + + if (functionConfig.getTenant() != null) { + functionDetailsBuilder.setTenant(functionConfig.getTenant()); + } + if (functionConfig.getNamespace() != null) { + functionDetailsBuilder.setNamespace(functionConfig.getNamespace()); + } + if (functionConfig.getName() != null) { + functionDetailsBuilder.setName(functionConfig.getName()); + } + if (functionConfig.getLogTopic() != null) { + functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic()); + } + if (functionConfig.getRuntime() != null) { + functionDetailsBuilder.setRuntime(Utils.convertRuntime(functionConfig.getRuntime())); + } + if (functionConfig.getProcessingGuarantees() != null) { + functionDetailsBuilder.setProcessingGuarantees( + Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees())); + } + + if (functionConfig.getMaxMessageRetries() >= 0) { + Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder(); + retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries()); + if (isNotEmpty(functionConfig.getDeadLetterTopic())) { + retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic()); + } + functionDetailsBuilder.setRetryDetails(retryBuilder); + } + + Map configs = new HashMap<>(); + if (functionConfig.getUserConfig() != null) { + configs.putAll(functionConfig.getUserConfig()); + } + + // windowing related + WindowConfig windowConfig = functionConfig.getWindowConfig(); + if (windowConfig != null) { + windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName()); + configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig); + // set class name to window function executor + functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor"); + + } else { + if (functionConfig.getClassName() != null) { + functionDetailsBuilder.setClassName(functionConfig.getClassName()); + } + } + if (!configs.isEmpty()) { + functionDetailsBuilder.setUserConfig(new Gson().toJson(configs)); + } + + functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); + functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); + if (functionConfig.getResources() != null) { + Function.Resources.Builder bldr = Function.Resources.newBuilder(); + if (functionConfig.getResources().getCpu() != null) { + bldr.setCpu(functionConfig.getResources().getCpu()); + } + if (functionConfig.getResources().getRam() != null) { + bldr.setRam(functionConfig.getResources().getRam()); + } + if (functionConfig.getResources().getDisk() != null) { + bldr.setDisk(functionConfig.getResources().getDisk()); + } + functionDetailsBuilder.setResources(bldr.build()); + } + return functionDetailsBuilder.build(); + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java new file mode 100644 index 0000000000000..d44ea301a433d --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.utils; + +import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; + +public class SinkConfigUtils { + + public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException { + + String sinkClassName = null; + String typeArg = null; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); + + if (!isBuiltin) { + if (sinkConfig.getArchive().startsWith(Utils.FILE)) { + if (isBlank(sinkConfig.getClassName())) { + throw new IllegalArgumentException("Class-name must be present for archive with file-url"); + } + sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class + } else { + sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), + Collections.emptySet())) { + typeArg = Utils.getSinkType(sinkClassName, ncl).getName(); + } + } + } + + if (sinkConfig.getTenant() != null) { + functionDetailsBuilder.setTenant(sinkConfig.getTenant()); + } + if (sinkConfig.getNamespace() != null) { + functionDetailsBuilder.setNamespace(sinkConfig.getNamespace()); + } + if (sinkConfig.getName() != null) { + functionDetailsBuilder.setName(sinkConfig.getName()); + } + functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + functionDetailsBuilder.setParallelism(sinkConfig.getParallelism()); + functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + if (sinkConfig.getProcessingGuarantees() != null) { + functionDetailsBuilder.setProcessingGuarantees( + convertProcessingGuarantee(sinkConfig.getProcessingGuarantees())); + } + + // set source spec + // source spec classname should be empty so that the default pulsar source will be used + Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder(); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); + if (sinkConfig.getInputs() != null) { + sinkConfig.getInputs().forEach(topicName -> + sourceSpecBuilder.putInputSpecs(topicName, + Function.ConsumerSpec.newBuilder() + .setIsRegexPattern(false) + .build())); + } + if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) { + sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(), + Function.ConsumerSpec.newBuilder() + .setIsRegexPattern(true) + .build()); + } + if (sinkConfig.getTopicToSerdeClassName() != null) { + sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) -> { + sourceSpecBuilder.putInputSpecs(topicName, + Function.ConsumerSpec.newBuilder() + .setSerdeClassName(serde == null ? "" : serde) + .setIsRegexPattern(false) + .build()); + }); + } + if (sinkConfig.getTopicToSchemaType() != null) { + sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { + sourceSpecBuilder.putInputSpecs(topicName, + Function.ConsumerSpec.newBuilder() + .setSchemaType(schemaType == null ? "" : schemaType) + .setIsRegexPattern(false) + .build()); + }); + } + if (sinkConfig.getInputSpecs() != null) { + sinkConfig.getInputSpecs().forEach((topic, spec) -> { + sourceSpecBuilder.putInputSpecs(topic, + Function.ConsumerSpec.newBuilder() + .setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "") + .setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "") + .setIsRegexPattern(spec.isRegexPattern()) + .build()); + }); + } + + if (typeArg != null) { + sourceSpecBuilder.setTypeClassName(typeArg); + } + if (isNotBlank(sinkConfig.getSourceSubscriptionName())) { + sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); + } + + Function.SubscriptionType subType = (sinkConfig.isRetainOrdering() + || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees())) + ? Function.SubscriptionType.FAILOVER + : Function.SubscriptionType.SHARED; + sourceSpecBuilder.setSubscriptionType(subType); + + functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck()); + if (sinkConfig.getTimeoutMs() != null) { + sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); + } + + functionDetailsBuilder.setSource(sourceSpecBuilder); + + // set up sink spec + Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder(); + if (sinkClassName != null) { + sinkSpecBuilder.setClassName(sinkClassName); + } + + if (isBuiltin) { + String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", ""); + sinkSpecBuilder.setBuiltin(builtin); + } + + if (sinkConfig.getConfigs() != null) { + sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); + } + if (typeArg != null) { + sinkSpecBuilder.setTypeClassName(typeArg); + } + functionDetailsBuilder.setSink(sinkSpecBuilder); + + if (sinkConfig.getResources() != null) { + Function.Resources.Builder bldr = Function.Resources.newBuilder(); + if (sinkConfig.getResources().getCpu() != null) { + bldr.setCpu(sinkConfig.getResources().getCpu()); + } + if (sinkConfig.getResources().getRam() != null) { + bldr.setRam(sinkConfig.getResources().getRam()); + } + if (sinkConfig.getResources().getDisk() != null) { + bldr.setDisk(sinkConfig.getResources().getDisk()); + } + functionDetailsBuilder.setResources(bldr.build()); + } + return functionDetailsBuilder.build(); + } +} \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java new file mode 100644 index 0000000000000..73b331ad9da18 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.utils; + +import com.google.gson.Gson; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; +import static org.apache.pulsar.functions.utils.Utils.getSourceType; + +public class SourceConfigUtils { + + public static FunctionDetails convert(SourceConfig sourceConfig) + throws IllegalArgumentException, IOException { + + String sourceClassName = null; + String typeArg = null; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); + + if (!isBuiltin) { + if (sourceConfig.getArchive().startsWith(Utils.FILE)) { + if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) { + throw new IllegalArgumentException("Class-name must be present for archive with file-url"); + } + sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class + } else { + sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); + + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), + Collections.emptySet())) { + typeArg = getSourceType(sourceClassName, ncl).getName(); + } + } + } + + if (sourceConfig.getTenant() != null) { + functionDetailsBuilder.setTenant(sourceConfig.getTenant()); + } + if (sourceConfig.getNamespace() != null) { + functionDetailsBuilder.setNamespace(sourceConfig.getNamespace()); + } + if (sourceConfig.getName() != null) { + functionDetailsBuilder.setName(sourceConfig.getName()); + } + functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + functionDetailsBuilder.setParallelism(sourceConfig.getParallelism()); + functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + functionDetailsBuilder.setAutoAck(true); + if (sourceConfig.getProcessingGuarantees() != null) { + functionDetailsBuilder.setProcessingGuarantees( + convertProcessingGuarantee(sourceConfig.getProcessingGuarantees())); + } + + // set source spec + Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder(); + if (sourceClassName != null) { + sourceSpecBuilder.setClassName(sourceClassName); + } + + if (isBuiltin) { + String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", ""); + sourceSpecBuilder.setBuiltin(builtin); + } + + if (sourceConfig.getConfigs() != null) { + sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); + } + + if (typeArg != null) { + sourceSpecBuilder.setTypeClassName(typeArg); + } + functionDetailsBuilder.setSource(sourceSpecBuilder); + + // set up sink spec. + // Sink spec classname should be empty so that the default pulsar sink will be used + Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder(); + if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getSchemaType())) { + sinkSpecBuilder.setSchemaType(sourceConfig.getSchemaType()); + } + if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getSerdeClassName())) { + sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName()); + } + + sinkSpecBuilder.setTopic(sourceConfig.getTopicName()); + + if (typeArg != null) { + sinkSpecBuilder.setTypeClassName(typeArg); + } + + functionDetailsBuilder.setSink(sinkSpecBuilder); + + if (sourceConfig.getResources() != null) { + Function.Resources.Builder bldr = Function.Resources.newBuilder(); + if (sourceConfig.getResources().getCpu() != null) { + bldr.setCpu(sourceConfig.getResources().getCpu()); + } + if (sourceConfig.getResources().getRam() != null) { + bldr.setRam(sourceConfig.getResources().getRam()); + } + if (sourceConfig.getResources().getDisk() != null) { + bldr.setDisk(sourceConfig.getResources().getDisk()); + } + functionDetailsBuilder.setResources(bldr.build()); + } + + return functionDetailsBuilder.build(); + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 94c315df3fdbc..7befc859ee92a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -101,9 +101,8 @@ public static int findAvailablePort() { } } - public static Class[] getFunctionTypes(FunctionConfig functionConfig) { - Object userClass = createInstance(functionConfig.getClassName(), - Thread.currentThread().getContextClassLoader()); + public static Class[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader) { + Object userClass = createInstance(functionConfig.getClassName(), classLoader); boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null; return getFunctionTypes(userClass, isWindowConfigPresent); } @@ -187,10 +186,6 @@ public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees co throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name()); } - public static Class getSourceType(String className) { - return getSourceType(className, Thread.currentThread().getContextClassLoader()); - } - public static Class getSourceType(String className, ClassLoader classloader) { Object userClass = Reflections.createInstance(className, classloader); @@ -205,10 +200,6 @@ public static Class getSourceType(String className, ClassLoader classloader) return typeArg; } - public static Class getSinkType(String className) { - return getSinkType(className, Thread.currentThread().getContextClassLoader()); - } - public static Class getSinkType(String className, ClassLoader classLoader) { Object userClass = Reflections.createInstance(className, classLoader); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java index fe89a013fbbd6..88ee41b4f9390 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java @@ -44,7 +44,7 @@ public enum Runtime { PYTHON } - public static void validateConfig(Object config, String runtimeType) { + public static void validateConfig(Object config, String runtimeType, ClassLoader classLoader) { for (Field field : config.getClass().getDeclaredFields()) { Object value; field.setAccessible(true); @@ -53,12 +53,12 @@ public static void validateConfig(Object config, String runtimeType) { } catch (IllegalAccessException e) { throw new RuntimeException(e); } - validateField(field, value, Runtime.valueOf(runtimeType)); + validateField(field, value, Runtime.valueOf(runtimeType), classLoader); } - validateClass(config, Runtime.valueOf(runtimeType)); + validateClass(config, Runtime.valueOf(runtimeType), classLoader); } - private static void validateClass(Object config, Runtime runtime) { + private static void validateClass(Object config, Runtime runtime, ClassLoader classLoader) { List annotationList = new LinkedList<>(); Class[] classes = ConfigValidationAnnotations.class.getDeclaredClasses(); @@ -70,10 +70,10 @@ private static void validateClass(Object config, Runtime runtime) { } } - processAnnotations(annotationList, config.getClass().getName(), config, runtime); + processAnnotations(annotationList, config.getClass().getName(), config, runtime, classLoader); } - private static void validateField(Field field, Object value, Runtime runtime) { + private static void validateField(Field field, Object value, Runtime runtime, ClassLoader classLoader) { List annotationList = new LinkedList<>(); Class[] classes = ConfigValidationAnnotations.class.getDeclaredClasses(); for (Class clazz : classes) { @@ -84,11 +84,11 @@ private static void validateField(Field field, Object value, Runtime runtime) { } } - processAnnotations(annotationList, field.getName(), value, runtime); + processAnnotations(annotationList, field.getName(), value, runtime, classLoader); } private static void processAnnotations( List annotations, String fieldName, Object value, - Runtime runtime) { + Runtime runtime, ClassLoader classLoader) { try { for (Annotation annotation : annotations) { @@ -127,7 +127,7 @@ private static void processAnnotations( List annotations, String fie } else { //If not call default constructor o = clazz.newInstance(); } - o.validateField(fieldName, value); + o.validateField(fieldName, value, classLoader); } } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java index e6e0583a707b9..08f0d661ad8e3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java @@ -177,6 +177,17 @@ public class ConfigValidationAnnotations { ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL; } + /** + * checks if the topic name is valid + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isValidSerde { + Class validatorClass() default ValidatorImpls.SerdeValidator.class; + + ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA; + } + /** * checks if window configs is valid */ diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java index 59410482a137b..ab8782b546695 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java @@ -27,5 +27,5 @@ public Validator(Map params) { public Validator() { } - public abstract void validateField(String name, Object o); + public abstract void validateField(String name, Object o, ClassLoader classLoader); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index e8acc2839b5ee..dba6bd91cdbfa 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -92,7 +92,7 @@ public static void validateField(String name, boolean includeZero, Object o) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { validateField(name, this.includeZero, o); } } @@ -104,7 +104,7 @@ public void validateField(String name, Object o) { public static class NotNullValidator extends Validator { @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name)); } @@ -113,9 +113,9 @@ public void validateField(String name, Object o) { public static class ResourcesValidator extends Validator { @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { - throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name)); + return; } if (o instanceof Resources) { @@ -152,7 +152,7 @@ public static void validateField(String name, Class type, Object o) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { validateField(name, this.type, o); } } @@ -176,7 +176,7 @@ public static void validateField(String name, Class keyType, Class valueTy } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { validateField(name, this.keyType, this.valueType, o); } } @@ -194,7 +194,7 @@ public ImplementsClassValidator(Class classImplements) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { return; } @@ -206,7 +206,7 @@ public void validateField(String name, Object o) { Class objectClass; try { - objectClass = loadClass(className); + objectClass = loadClass(className, classLoader); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Cannot find/load class " + className); } @@ -235,7 +235,7 @@ public ImplementsClassesValidator(Class... classesImplements) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { return; } @@ -248,7 +248,7 @@ public void validateField(String name, Object o) { for (Class classImplements : classesImplements) { Class objectClass = null; try { - objectClass = loadClass(className); + objectClass = loadClass(className, classLoader); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Cannot find/load class " + className); } @@ -269,8 +269,9 @@ public void validateField(String name, Object o) { public static class SerdeValidator extends Validator { @Override - public void validateField(String name, Object o) { - new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o); + public void validateField(String name, Object o, ClassLoader classLoader) { + if (o != null && o.equals(DEFAULT_SERDE)) return; + new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o, classLoader); } } @@ -278,8 +279,8 @@ public void validateField(String name, Object o) { public static class SchemaValidator extends Validator { @Override - public void validateField(String name, Object o) { - new ValidatorImpls.ImplementsClassValidator(Schema.class).validateField(name, o); + public void validateField(String name, Object o, ClassLoader classLoader) { + new ValidatorImpls.ImplementsClassValidator(Schema.class).validateField(name, o, classLoader); } } @@ -298,7 +299,8 @@ public MapEntryCustomValidator(Map params) { } @SuppressWarnings("unchecked") - public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) + public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o, + ClassLoader classLoader) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { if (o == null) { return; @@ -309,7 +311,7 @@ public static void validateField(String name, Class[] keyValidators, Class for (Class kv : keyValidators) { Object keyValidator = kv.getConstructor().newInstance(); if (keyValidator instanceof Validator) { - ((Validator) keyValidator).validateField(name + " Map key", entry.getKey()); + ((Validator) keyValidator).validateField(name + " Map key", entry.getKey(), classLoader); } else { log.warn( "validator: {} cannot be used in MapEntryCustomValidator to validate keys. Individual entry validators must " + @@ -320,7 +322,7 @@ public static void validateField(String name, Class[] keyValidators, Class for (Class vv : valueValidators) { Object valueValidator = vv.getConstructor().newInstance(); if (valueValidator instanceof Validator) { - ((Validator) valueValidator).validateField(name + " Map value", entry.getValue()); + ((Validator) valueValidator).validateField(name + " Map value", entry.getValue(), classLoader); } else { log.warn( "validator: {} cannot be used in MapEntryCustomValidator to validate values. Individual entry validators " + @@ -332,9 +334,9 @@ public static void validateField(String name, Class[] keyValidators, Class } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { try { - validateField(name, this.keyValidators, this.valueValidators, o); + validateField(name, this.keyValidators, this.valueValidators, o, classLoader); } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -357,7 +359,7 @@ public StringValidator(Map params) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { SimpleTypeValidator.validateField(name, String.class, o); if (this.acceptedValues != null) { if (!this.acceptedValues.contains((String) o)) { @@ -370,11 +372,8 @@ public void validateField(String name, Object o) { @NoArgsConstructor public static class FunctionConfigValidator extends Validator { - private static void doJavaChecks(FunctionConfig functionConfig, String name) { - Class[] typeArgs = Utils.getFunctionTypes(functionConfig); - - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - + private static void doJavaChecks(FunctionConfig functionConfig, String name, ClassLoader clsLoader) { + Class[] typeArgs = Utils.getFunctionTypes(functionConfig, clsLoader); // inputs use default schema, so there is no check needed there // Check if the Input serialization/deserialization class exists in jar or already loaded and that it @@ -441,7 +440,7 @@ private static void validateSchema(String schemaType, Class typeArg, String n // If it's built-in, no need to validate } else { try { - new SchemaValidator().validateField(name, schemaType); + new SchemaValidator().validateField(name, schemaType, clsLoader); } catch (IllegalArgumentException ex) { throw new IllegalArgumentException( String.format("The input schema class %s does not not implement %s", @@ -455,9 +454,10 @@ private static void validateSchema(String schemaType, Class typeArg, String n private static void validateSerde(String inputSerializer, Class typeArg, String name, ClassLoader clsLoader, boolean deser) { if (StringUtils.isEmpty(inputSerializer)) return; + if (inputSerializer.equals(DEFAULT_SERDE)) return; Class serdeClass; try { - serdeClass = loadClass(inputSerializer); + serdeClass = loadClass(inputSerializer, clsLoader); } catch (ClassNotFoundException e) { throw new IllegalArgumentException( String.format("The input serialization/deserialization class %s does not exist", @@ -465,7 +465,7 @@ private static void validateSerde(String inputSerializer, Class typeArg, Stri } try { - new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer); + new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer, clsLoader); } catch (IllegalArgumentException ex) { throw new IllegalArgumentException( String.format("The input serialization/deserialization class %s does not not implement %s", @@ -473,35 +473,31 @@ private static void validateSerde(String inputSerializer, Class typeArg, Stri inputSerializer, SerDe.class.getCanonicalName())); } - if (inputSerializer.equals(DEFAULT_SERDE)) { - // No checks needed here - } else { - SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader); - if (serDe == null) { - throw new IllegalArgumentException(String.format("The SerDe class %s does not exist", - inputSerializer)); - } - Class[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); + SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader); + if (serDe == null) { + throw new IllegalArgumentException(String.format("The SerDe class %s does not exist", + inputSerializer)); + } + Class[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - // type inheritance information seems to be lost in generic type - // load the actual type class for verification - Class fnInputClass; - Class serdeInputClass; - try { - fnInputClass = Class.forName(typeArg.getName(), true, clsLoader); - serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Failed to load type class", e); - } + // type inheritance information seems to be lost in generic type + // load the actual type class for verification + Class fnInputClass; + Class serdeInputClass; + try { + fnInputClass = Class.forName(typeArg.getName(), true, clsLoader); + serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Failed to load type class", e); + } - if (deser) { - if (!fnInputClass.isAssignableFrom(serdeInputClass)) { - throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); - } - } else { - if (!serdeInputClass.isAssignableFrom(fnInputClass)) { - throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); - } + if (deser) { + if (!fnInputClass.isAssignableFrom(serdeInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + } + } else { + if (!serdeInputClass.isAssignableFrom(fnInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); } } } @@ -584,12 +580,12 @@ private static Collection collectAllInputTopics(FunctionConfig functionC } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { FunctionConfig functionConfig = (FunctionConfig) o; doCommonChecks(functionConfig); if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - if (!functionConfig.getJar().startsWith(Utils.FILE)) { - doJavaChecks(functionConfig, name); + if (classLoader != null) { + doJavaChecks(functionConfig, name, classLoader); } } else { doPythonChecks(functionConfig, name); @@ -609,7 +605,7 @@ public ListEntryCustomValidator(Map params) { this.entryValidators = (Class[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES); } - public static void validateField(String name, Class[] validators, Object o) + public static void validateField(String name, Class[] validators, Object o, ClassLoader classLoader) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { if (o == null) { return; @@ -620,7 +616,7 @@ public static void validateField(String name, Class[] validators, Object o) for (Class validator : validators) { Object v = validator.getConstructor().newInstance(); if (v instanceof Validator) { - ((Validator) v).validateField(name + " list entry", entry); + ((Validator) v).validateField(name + " list entry", entry, classLoader); } else { log.warn( "validator: {} cannot be used in ListEntryCustomValidator. Individual entry validators must a instance of " + @@ -632,9 +628,9 @@ public static void validateField(String name, Class[] validators, Object o) } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { try { - validateField(name, this.entryValidators, o); + validateField(name, this.entryValidators, o, classLoader); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) { throw new RuntimeException(e); } @@ -645,11 +641,11 @@ public void validateField(String name, Object o) { public static class TopicNameValidator extends Validator { @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { return; } - new StringValidator().validateField(name, o); + new StringValidator().validateField(name, o, classLoader); String topic = (String) o; if (!TopicName.isValid(topic)) { throw new IllegalArgumentException( @@ -715,7 +711,7 @@ public static void validateWindowConfig(WindowConfig windowConfig) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { return; } @@ -729,7 +725,7 @@ public void validateField(String name, Object o) { public static class SourceConfigValidator extends Validator { @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { SourceConfig sourceConfig = (SourceConfig) o; if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) { // We don't have to check the archive, since it's provided on the worker itself @@ -743,31 +739,27 @@ public void validateField(String name, Object o) { throw new IllegalArgumentException("Failed to extract source class from archive", e1); } - try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), - Collections.emptySet())) { - Class typeArg = getSourceType(sourceClassName, clsLoader); - // Only one of serdeClassName or schemaType should be set - if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty() - && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { - throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); - } + Class typeArg = getSourceType(sourceClassName, classLoader); - if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) { - FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader, false); - } - if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { - FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader, false); - } - } catch (IOException e) { - throw new IllegalArgumentException(e); + // Only one of serdeClassName or schemaType should be set + if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty() + && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { + throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); + } + + if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) { + FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false); + } + if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { + FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false); } } } public static class SinkConfigValidator extends Validator { @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { SinkConfig sinkConfig = (SinkConfig) o; if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) { // We don't have to check the archive, since it's provided on the worker itself @@ -822,7 +814,7 @@ public void validateField(String name, Object o) { }); } } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new IllegalArgumentException(e.getMessage()); } } @@ -849,11 +841,11 @@ private static Collection collectAllInputTopics(SinkConfig sinkConfig) { public static class FileValidator extends Validator { @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { if (o == null) { return; } - new StringValidator().validateField(name, o); + new StringValidator().validateField(name, o, classLoader); String path = (String) o; @@ -890,19 +882,18 @@ public static void validateField(String name, Class type, Object o) { } @Override - public void validateField(String name, Object o) { + public void validateField(String name, Object o, ClassLoader classLoader) { validateField(name, this.type, o); } } - private static Class loadClass(String className) throws ClassNotFoundException { + private static Class loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException { Class objectClass; try { objectClass = Class.forName(className); } catch (ClassNotFoundException e) { - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - if (clsLoader != null) { - objectClass = clsLoader.loadClass(className); + if (classLoader != null) { + objectClass = classLoader.loadClass(className); } else { throw e; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 6487dcfed2be6..aa80403009615 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -27,6 +27,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; +import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -46,6 +47,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; @@ -133,13 +135,19 @@ public static void uploadToBookeeper(Namespace dlogNamespace, } } - public static void validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException { + public static ClassLoader validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException { if (destPkgUrl.startsWith(FILE)) { URL url = new URL(destPkgUrl); File file = new File(url.toURI()); if (!file.exists()) { throw new IOException(destPkgUrl + " does not exists locally"); } + try { + return Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new IllegalArgumentException( + "Corrupt User PackageFile " + file + " with error " + e.getMessage()); + } } else if (destPkgUrl.startsWith("http")) { URL website = new URL(destPkgUrl); File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString()); @@ -150,6 +158,7 @@ public static void validateFileUrl(String destPkgUrl, String downloadPkgDir) thr if (tempFile.exists()) { tempFile.delete(); } + return null; } else { throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index ee3b4800f3cf7..312473187403e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -20,8 +20,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.pulsar.functions.utils.Reflections.createInstance; +import static org.apache.pulsar.functions.utils.Reflections.loadJar; import static org.apache.pulsar.functions.utils.Utils.FILE; import static org.apache.pulsar.functions.utils.Utils.HTTP; import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; @@ -38,6 +40,9 @@ import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.file.CopyOption; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.util.Base64; import java.util.Collection; import java.util.LinkedList; @@ -66,6 +71,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.join; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -83,7 +89,10 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.utils.FunctionConfig; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; +import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.Utils; @@ -126,7 +135,8 @@ private boolean isWorkerServiceAvailable() { public Response registerFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson, final String clientRole) { + final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson, + final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -145,16 +155,28 @@ public Response registerFunction(final String tenant, final String namespace, fi .entity(new ErrorData(e.getMessage())).build(); } + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + + if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + log.error("Function {}/{}/{} already exists", tenant, namespace, functionName); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Function %s already exists", functionName))).build(); + } + FunctionDetails functionDetails; boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); + File uploadedInputStreamAsFile = null; + if (uploadedInputStream != null) { + uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream); + } // validate parameters try { if (isPkgUrlProvided) { functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, - functionDetailsJson); + functionDetailsJson, functionConfigJson); } else { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, - fileDetail, functionDetailsJson); + functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile, + fileDetail, functionDetailsJson, functionConfigJson); } } catch (Exception e) { log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); @@ -162,14 +184,6 @@ public Response registerFunction(final String tenant, final String namespace, fi .entity(new ErrorData(e.getMessage())).build(); } - FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - - if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function {}/{}/{} already exists", tenant, namespace, functionName); - return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s already exists", functionName))).build(); - } - try { worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails); } catch (Exception e) { @@ -196,12 +210,13 @@ public Response registerFunction(final String tenant, final String namespace, fi functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) - : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); + : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile); } public Response updateFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson, final String clientRole) { + final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson, + final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -220,16 +235,27 @@ public Response updateFunction(final String tenant, final String namespace, fina .entity(new ErrorData(e.getMessage())).build(); } + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + + if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + } + FunctionDetails functionDetails; boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); + File uploadedInputStreamAsFile = null; + if (uploadedInputStream != null) { + uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream); + } // validate parameters try { if (isPkgUrlProvided) { functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, - functionDetailsJson); + functionDetailsJson, functionConfigJson); } else { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, - fileDetail, functionDetailsJson); + functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile, + fileDetail, functionDetailsJson, functionConfigJson); } } catch (Exception e) { log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); @@ -237,13 +263,6 @@ public Response updateFunction(final String tenant, final String namespace, fina .entity(new ErrorData(e.getMessage())).build(); } - FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); - } - try { worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails); } catch (Exception e) { @@ -271,7 +290,7 @@ public Response updateFunction(final String tenant, final String namespace, fina functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) - : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); + : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile); } public Response deregisterFunction(final String tenant, final String namespace, final String functionName, @@ -557,10 +576,11 @@ public Response listFunctions(final String tenant, final String namespace) { return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build(); } - private Response updateRequest(FunctionMetaData functionMetaData, InputStream uploadedInputStream) { + private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) { // Upload to bookkeeper try { log.info("Uploading function package to {}", functionMetaData.getPackageLocation()); + FileInputStream uploadedInputStream = new FileInputStream(uploadedInputStreamAsFile); Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, functionMetaData.getPackageLocation().getPackagePath()); @@ -857,30 +877,41 @@ private void validateDeregisterRequestParams(String tenant, String namespace, St } private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName, - String functionPkgUrl, String functionDetailsJson) + String functionPkgUrl, String functionDetailsJson, String functionConfigJson) throws IllegalArgumentException, IOException, URISyntaxException { if (!isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } - Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory()); FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionDetailsJson, functionPkgUrl); + functionDetailsJson, functionConfigJson, functionPkgUrl, null); return functionDetails; } private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, - InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionDetailsJson) - throws IllegalArgumentException { + File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson, + String functionConfigJson) + throws IllegalArgumentException, IOException, URISyntaxException { FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionDetailsJson, null); - if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStream == null || fileDetail == null)) { + functionDetailsJson, functionConfigJson, null, uploadedInputStreamAsFile); + if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) { throw new IllegalArgumentException("Function Package is not provided"); } return functionDetails; } + private static File dumpToTmpFile(InputStream uploadedInputStream) { + try { + File tmpFile = File.createTempFile("functions", null); + tmpFile.deleteOnExit(); + Files.copy(uploadedInputStream, tmpFile.toPath(), REPLACE_EXISTING); + return tmpFile; + } catch (IOException e) { + throw new RuntimeException("Cannot create a temporary file", e); + } + } + private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key) throws IllegalArgumentException { @@ -935,7 +966,7 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) { } private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, - String functionDetailsJson, String functionPkgUrl) throws IllegalArgumentException { + String functionDetailsJson, String functionConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } @@ -946,63 +977,87 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp throw new IllegalArgumentException("Function Name is not provided"); } - if (functionDetailsJson == null) { - throw new IllegalArgumentException("FunctionDetails is not provided"); + if (StringUtils.isEmpty(functionDetailsJson) && StringUtils.isEmpty(functionConfigJson)) { + throw new IllegalArgumentException("FunctionConfig is not provided"); } - try { - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder); - if (isNotBlank(functionPkgUrl)) { - // validate function details by loading function-jar from local file-system - File jarWithFileUrl = functionPkgUrl.startsWith(FILE) ? (new File((new URL(functionPkgUrl)).toURI())) - : null; - validateFunctionClassTypes(jarWithFileUrl, functionDetailsBuilder); - // set package-url if present - functionDetailsBuilder.setPackageUrl(functionPkgUrl); - } - FunctionDetails functionDetails = functionDetailsBuilder.build(); - - List missingFields = new LinkedList<>(); - if (functionDetails.getTenant() == null || functionDetails.getTenant().isEmpty()) { - missingFields.add("Tenant"); - } - if (functionDetails.getNamespace() == null || functionDetails.getNamespace().isEmpty()) { - missingFields.add("Namespace"); - } - if (functionDetails.getName() == null || functionDetails.getName().isEmpty()) { - missingFields.add("Name"); - } - if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) { - missingFields.add("ClassName"); - } - // TODO in the future add more check here for functions and connectors - if (!functionDetails.getSource().isInitialized()) { - missingFields.add("Source"); - } - // TODO in the future add more check here for functions and connectors - if (!functionDetails.getSink().isInitialized()) { - missingFields.add("Sink"); + if (!StringUtils.isEmpty(functionDetailsJson) && !StringUtils.isEmpty(functionConfigJson)) { + throw new IllegalArgumentException("Only one of FunctionDetails or FunctionConfig should be provided"); + } + if (!StringUtils.isEmpty(functionConfigJson)) { + FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class); + ClassLoader clsLoader = null; + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); } - if (!missingFields.isEmpty()) { - String errorMessage = join(missingFields, ","); - throw new IllegalArgumentException(errorMessage + " is not provided"); + if (functionConfig.getRuntime() == null) { + throw new IllegalArgumentException("Function Runtime no specified"); } - if (functionDetails.getParallelism() <= 0) { - throw new IllegalArgumentException("Parallelism needs to be set to a positive number"); + ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader); + return FunctionConfigUtils.convert(functionConfig, clsLoader); + } + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder); + if (isNotBlank(functionPkgUrl)) { + // set package-url if present + functionDetailsBuilder.setPackageUrl(functionPkgUrl); + } + ClassLoader clsLoader = null; + if (functionDetailsBuilder.getRuntime() == FunctionDetails.Runtime.JAVA) { + clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); + } + validateFunctionClassTypes(clsLoader, functionDetailsBuilder); + + FunctionDetails functionDetails = functionDetailsBuilder.build(); + + List missingFields = new LinkedList<>(); + if (functionDetails.getTenant() == null || functionDetails.getTenant().isEmpty()) { + missingFields.add("Tenant"); + } + if (functionDetails.getNamespace() == null || functionDetails.getNamespace().isEmpty()) { + missingFields.add("Namespace"); + } + if (functionDetails.getName() == null || functionDetails.getName().isEmpty()) { + missingFields.add("Name"); + } + if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) { + missingFields.add("ClassName"); + } + // TODO in the future add more check here for functions and connectors + if (!functionDetails.getSource().isInitialized()) { + missingFields.add("Source"); + } + // TODO in the future add more check here for functions and connectors + if (!functionDetails.getSink().isInitialized()) { + missingFields.add("Sink"); + } + if (!missingFields.isEmpty()) { + String errorMessage = join(missingFields, ","); + throw new IllegalArgumentException(errorMessage + " is not provided"); + } + if (functionDetails.getParallelism() <= 0) { + throw new IllegalArgumentException("Parallelism needs to be set to a positive number"); + } + return functionDetails; + } + + private ClassLoader extractClassLoader(String functionPkgUrl, File uploadedInputStreamAsFile) throws URISyntaxException, IOException { + if (isNotBlank(functionPkgUrl)) { + return Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory()); + } else if (uploadedInputStreamAsFile != null) { + try { + return loadJar(uploadedInputStreamAsFile); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Corrupted Jar File", e); } - return functionDetails; - } catch (IllegalArgumentException ex) { - throw ex; - } catch (Exception ex) { - throw new IllegalArgumentException("Invalid FunctionDetails"); + } else { + return null; } } - private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder functionDetailsBuilder) - throws MalformedURLException { + private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) { - // validate only if jar-file is provided - if (jarFile == null) { + // validate only if classLoader is provided + if (classLoader == null) { return; } @@ -1010,10 +1065,6 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu throw new IllegalArgumentException("function class-name can't be empty"); } - URL[] urls = new URL[1]; - urls[0] = jarFile.toURI().toURL(); - URLClassLoader classLoader = create(urls, FunctionClassLoaders.class.getClassLoader()); - // validate function class-type Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader); Class[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false); @@ -1076,7 +1127,7 @@ && isNotBlank(functionDetailsBuilder.getSink().getClassName())) { } - private Class getTypeArg(String className, Class funClass, URLClassLoader classLoader) + private Class getTypeArg(String className, Class funClass, ClassLoader classLoader) throws ClassNotFoundException { Class loadedClass = classLoader.loadClass(className); if (!funClass.isAssignableFrom(loadedClass)) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 789fbea67ce46..1e44a6072beac 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -56,10 +56,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("functionDetails") String functionDetailsJson) { + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); } @@ -72,10 +73,11 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("functionDetails") String functionDetailsJson) { + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 3d7620ed42702..9ce87d016e005 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -37,16 +37,14 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CompletableFuture; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; +import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; @@ -64,6 +62,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.source.TopicSchema; +import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; @@ -83,6 +82,7 @@ */ @PrepareForTest(Utils.class) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" }) +@Slf4j public class FunctionApiV2ResourceTest { @ObjectFactory @@ -92,7 +92,8 @@ public IObjectFactory getObjectFactory() { private static final class TestFunction implements Function { - public String process(String input, Context context) throws Exception { + @Override + public String process(String input, Context context) { return input; } } @@ -100,15 +101,15 @@ public String process(String input, Context context) throws Exception { public static final class TestSink implements Sink { @Override - public void close() throws Exception { + public void close() { } @Override - public void open(Map config, SinkContext sinkContext) throws Exception { + public void open(Map config, SinkContext sinkContext) { } @Override - public void write(Record record) throws Exception { + public void write(Record record) { } } @@ -176,12 +177,13 @@ public void testRegisterFunctionMissingTenant() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Tenant"); + "Tenant is not provided"); } @Test @@ -191,12 +193,13 @@ public void testRegisterFunctionMissingNamespace() throws IOException { null, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Namespace"); + "Namespace is not provided"); } @Test @@ -206,12 +209,13 @@ public void testRegisterFunctionMissingFunctionName() throws IOException { namespace, null, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Function Name"); + "Function Name is not provided"); } @Test @@ -221,12 +225,29 @@ public void testRegisterFunctionMissingPackage() throws IOException { namespace, function, null, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Function Package"); + "Function Package is not provided"); + } + + @Test + public void testRegisterFunctionMissingInputTopics() throws IOException { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + null, + null, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + parallelism, + "No input topic(s) specified for the function"); } @Test @@ -236,12 +257,13 @@ public void testRegisterFunctionMissingPackageDetails() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, null, outputTopic, outputSerdeClassName, className, parallelism, - "Function Package"); + "Function Package is not provided"); } @Test @@ -251,12 +273,13 @@ public void testRegisterFunctionMissingClassName() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, null, parallelism, - "ClassName"); + "Field 'className' cannot be null!"); } @Test @@ -266,12 +289,13 @@ public void testRegisterFunctionMissingParallelism() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, null, - "parallelism"); + "Field 'parallelism' must be a Positive Number"); } private void testRegisterFunctionMissingArguments( @@ -279,38 +303,40 @@ private void testRegisterFunctionMissingArguments( String namespace, String function, InputStream inputStream, + Map topicsToSerDeClassName, FormDataContentDisposition details, String outputTopic, String outputSerdeClassName, String className, Integer parallelism, - String missingFieldName) throws IOException { - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + String errorExpected) throws IOException { + FunctionConfig functionConfig = new FunctionConfig(); if (tenant != null) { - functionDetailsBuilder.setTenant(tenant); + functionConfig.setTenant(tenant); } if (namespace != null) { - functionDetailsBuilder.setNamespace(namespace); + functionConfig.setNamespace(namespace); } if (function != null) { - functionDetailsBuilder.setName(function); + functionConfig.setName(function); + } + if (topicsToSerDeClassName != null) { + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); } - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); if (outputTopic != null) { - sinkSpecBuilder.setTopic(outputTopic); + functionConfig.setOutput(outputTopic); } if (outputSerdeClassName != null) { - sinkSpecBuilder.setSerDeClassName(outputSerdeClassName); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); } - functionDetailsBuilder.setSink(sinkSpecBuilder); if (className != null) { - functionDetailsBuilder.setClassName(className); + functionConfig.setClassName(className); } if (parallelism != null) { - functionDetailsBuilder.setParallelism(parallelism); + functionConfig.setParallelism(parallelism); } + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); - FunctionDetails functionDetails = functionDetailsBuilder.build(); Response response = resource.registerFunction( tenant, namespace, @@ -318,28 +344,25 @@ private void testRegisterFunctionMissingArguments( inputStream, details, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null, + new Gson().toJson(functionConfig), null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - if (missingFieldName.equals("parallelism")) { - Assert.assertEquals(new ErrorData("Parallelism needs to be set to a positive number").reason, ((ErrorData) response.getEntity()).reason); - } else { - Assert.assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); - } + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason); } - private Response registerDefaultFunction() throws IOException { - SinkSpec sinkSpec = SinkSpec.newBuilder() - .setTopic(outputTopic) - .setSerDeClassName(outputSerdeClassName).build(); - FunctionDetails functionDetails = FunctionDetails.newBuilder() - .setTenant(tenant).setNamespace(namespace).setName(function) - .setSink(sinkSpec) - .setClassName(className) - .setParallelism(parallelism) - .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) - .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); + private Response registerDefaultFunction() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); return resource.registerFunction( tenant, namespace, @@ -347,7 +370,8 @@ private Response registerDefaultFunction() throws IOException { mockedInputStream, mockedFormData, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null, + new Gson().toJson(functionConfig), null); } @@ -452,12 +476,13 @@ public void testUpdateFunctionMissingTenant() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Tenant"); + "Tenant is not provided"); } @Test @@ -467,12 +492,13 @@ public void testUpdateFunctionMissingNamespace() throws IOException { null, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Namespace"); + "Namespace is not provided"); } @Test @@ -482,12 +508,13 @@ public void testUpdateFunctionMissingFunctionName() throws IOException { namespace, null, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Function Name"); + "Function Name is not provided"); } @Test @@ -497,12 +524,29 @@ public void testUpdateFunctionMissingPackage() throws IOException { namespace, function, null, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Function Package"); + "Function Package is not provided"); + } + + @Test + public void testUpdateFunctionMissingInputTopic() throws IOException { + testUpdateFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + null, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + parallelism, + "No input topic(s) specified for the function"); } @Test @@ -512,12 +556,13 @@ public void testUpdateFunctionMissingPackageDetails() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, null, outputTopic, outputSerdeClassName, className, parallelism, - "Function Package"); + "Function Package is not provided"); } @Test @@ -527,12 +572,13 @@ public void testUpdateFunctionMissingClassName() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, null, parallelism, - "ClassName"); + "Field 'className' cannot be null!"); } @Test public void testUpdateFunctionMissingParallelism() throws IOException { @@ -541,12 +587,13 @@ public void testUpdateFunctionMissingParallelism() throws IOException { namespace, function, mockedInputStream, + topicsToSerDeClassName, mockedFormData, outputTopic, outputSerdeClassName, className, null, - "parallelism"); + "Field 'parallelism' must be a Positive Number"); } private void testUpdateFunctionMissingArguments( @@ -554,38 +601,42 @@ private void testUpdateFunctionMissingArguments( String namespace, String function, InputStream inputStream, + Map topicsToSerDeClassName, FormDataContentDisposition details, String outputTopic, String outputSerdeClassName, String className, Integer parallelism, - String missingFieldName) throws IOException { - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + String expectedError) throws IOException { + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + + FunctionConfig functionConfig = new FunctionConfig(); if (tenant != null) { - functionDetailsBuilder.setTenant(tenant); + functionConfig.setTenant(tenant); } if (namespace != null) { - functionDetailsBuilder.setNamespace(namespace); + functionConfig.setNamespace(namespace); } if (function != null) { - functionDetailsBuilder.setName(function); + functionConfig.setName(function); + } + if (topicsToSerDeClassName != null) { + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); } - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); if (outputTopic != null) { - sinkSpecBuilder.setTopic(outputTopic); + functionConfig.setOutput(outputTopic); } if (outputSerdeClassName != null) { - sinkSpecBuilder.setSerDeClassName(outputSerdeClassName); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); } - functionDetailsBuilder.setSink(sinkSpecBuilder); if (className != null) { - functionDetailsBuilder.setClassName(className); + functionConfig.setClassName(className); } if (parallelism != null) { - functionDetailsBuilder.setParallelism(parallelism); + functionConfig.setParallelism(parallelism); } + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); - FunctionDetails functionDetails = functionDetailsBuilder.build(); Response response = resource.updateFunction( tenant, namespace, @@ -593,28 +644,26 @@ private void testUpdateFunctionMissingArguments( inputStream, details, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null, + new Gson().toJson(functionConfig), null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - if (missingFieldName.equals("parallelism")) { - Assert.assertEquals(new ErrorData("Parallelism needs to be set to a positive number").reason, ((ErrorData) response.getEntity()).reason); - } else { - Assert.assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); - } + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); } private Response updateDefaultFunction() throws IOException { - SinkSpec sinkSpec = SinkSpec.newBuilder() - .setTopic(outputTopic) - .setSerDeClassName(outputSerdeClassName).build(); - FunctionDetails functionDetails = FunctionDetails.newBuilder() - .setTenant(tenant).setNamespace(namespace).setName(function) - .setSink(sinkSpec) - .setClassName(className) - .setParallelism(parallelism) - .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) - .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + return resource.updateFunction( tenant, namespace, @@ -622,7 +671,8 @@ private Response updateDefaultFunction() throws IOException { mockedInputStream, mockedFormData, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null, + new Gson().toJson(functionConfig), null); } @@ -679,16 +729,16 @@ public void testUpdateFunctionWithUrl() throws IOException { String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); String filePackageUrl = "file://" + fileLocation; - SinkSpec sinkSpec = SinkSpec.newBuilder() - .setTopic(outputTopic) - .setSerDeClassName(outputSerdeClassName).build(); - FunctionDetails functionDetails = FunctionDetails.newBuilder() - .setTenant(tenant).setNamespace(namespace).setName(function) - .setSink(sinkSpec) - .setClassName(className) - .setParallelism(parallelism) - .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) - .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); RequestResult rr = new RequestResult() @@ -704,7 +754,8 @@ public void testUpdateFunctionWithUrl() throws IOException { null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null, + new Gson().toJson(functionConfig), null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -1046,19 +1097,24 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException { CompletableFuture requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); - SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(TestSink.class.getName()).setTopic(outputTopic) - .setSerDeClassName(outputSerdeClassName).build(); - FunctionDetails functionDetails = FunctionDetails - .newBuilder().setTenant(tenant).setNamespace(namespace).setName(function).setSink(sinkSpec) - .setClassName(className).setParallelism(parallelism).setSource(SourceSpec.newBuilder() - .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) - .build(); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null); + null, new Gson().toJson(functionConfig), null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + /* + TODO:- Needs to be moved to a source/sink specific unittest @Test public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException { Configurator.setRootLevel(Level.DEBUG); @@ -1071,6 +1127,16 @@ public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException CompletableFuture requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(className).setTopic(outputTopic) .setSerDeClassName(outputSerdeClassName).build(); FunctionDetails functionDetails = FunctionDetails @@ -1079,8 +1145,9 @@ public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) .build(); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null); + null, new Gson().toJson(functionConfig), null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(response.getStatus(), Status.BAD_REQUEST.getStatusCode()); } + */ }