Skip to content

Commit

Permalink
Function Serverside Validation (#2701)
Browse files Browse the repository at this point in the history
* First cut of the serverside valiation

* Revert unneeded changes

* Do the easier checks first

* Check for null

* Simplify

* Do the actual validation

* Check for null runtime

* Update Test

* Fix build

* Fix build

* misc fixes

* Add explicit check for serde

* Changed for a better condition

* Better serde check

* Fixed some unittests

* Fixed more tests

* Fixed unittest

* Fixed unittests

* Fixed unittest

* Stricter checks
  • Loading branch information
srkukarni committed Oct 4, 2018
1 parent 474645f commit 9ccaf2c
Show file tree
Hide file tree
Showing 19 changed files with 1,014 additions and 807 deletions.
Expand Up @@ -80,10 +80,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("functionDetails") String functionDetailsJson) {
final @FormDataParam("functionDetails") String functionDetailsJson,
final @FormDataParam("functionConfig") String functionConfigJson) {

return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, clientAppId());
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
}

@PUT
Expand All @@ -101,10 +102,11 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("functionDetails") String functionDetailsJson) {
final @FormDataParam("functionDetails") String functionDetailsJson,
final @FormDataParam("functionConfig") String functionConfigJson) {

return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, clientAppId());
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());

}

Expand Down

Large diffs are not rendered by default.

Expand Up @@ -22,7 +22,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.fileExists;
import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;

Expand All @@ -34,14 +33,11 @@
import com.google.gson.reflect.TypeToken;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -56,20 +52,8 @@
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.ConsumerConfig;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.*;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
Expand Down Expand Up @@ -203,9 +187,9 @@ protected class CreateSink extends SinkCommand {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(archive)) {
admin.functions().createFunctionWithUrl(createSinkConfig(sinkConfig), sinkConfig.getArchive());
admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
} else {
admin.functions().createFunction(createSinkConfig(sinkConfig), sinkConfig.getArchive());
admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
}
print("Created successfully");
}
Expand All @@ -216,9 +200,9 @@ protected class UpdateSink extends SinkCommand {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(archive)) {
admin.functions().updateFunctionWithUrl(createSinkConfig(sinkConfig), sinkConfig.getArchive());
admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
} else {
admin.functions().updateFunction(createSinkConfig(sinkConfig), sinkConfig.getArchive());
admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
}
print("Updated successfully");
}
Expand Down Expand Up @@ -413,6 +397,9 @@ void processArguments() throws Exception {
}

inferMissingArguments(sinkConfig);

// check if configs are valid
validateSinkConfigs(sinkConfig);
}

protected Map<String, Object> parseConfigs(String str) {
Expand Down Expand Up @@ -462,6 +449,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
}

// if jar file is present locally then load jar and validate SinkClass in it
ClassLoader classLoader = null;
if (archivePath != null) {
if (!fileExists(archivePath)) {
throw new ParameterException("Archive file " + archivePath + " does not exist");
Expand All @@ -470,17 +458,20 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
try {
ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath);
log.info("Connector: {}", connector);

// Validate sink class
ConnectorUtils.getIOSinkClass(archivePath);
} catch (IOException e) {
throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage());
}

try {
classLoader = NarClassLoader.getFromArchive(new File(archivePath), Collections.emptySet());
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

try {
// Need to load jar and set context class loader before calling
ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name());
ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), classLoader);
} catch (Exception e) {
throw new ParameterException(e.getMessage());
}
Expand All @@ -491,152 +482,7 @@ protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkC
throws IOException {
org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
= org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
Utils.mergeJson(FunctionsImpl.printJson(createSinkConfig(sinkConfig)), functionDetailsBuilder);
return functionDetailsBuilder.build();
}

protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOException {

// check if configs are valid
validateSinkConfigs(sinkConfig);

String sinkClassName = null;
String typeArg = null;

FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();

boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN);

if (!isBuiltin) {
if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
if (isBlank(sinkConfig.getClassName())) {
throw new ParameterException("Class-name must be present for archive with file-url");
}
sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
} else {
sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
Collections.emptySet())) {
typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
}
}
}

if (sinkConfig.getTenant() != null) {
functionDetailsBuilder.setTenant(sinkConfig.getTenant());
}
if (sinkConfig.getNamespace() != null) {
functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
}
if (sinkConfig.getName() != null) {
functionDetailsBuilder.setName(sinkConfig.getName());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
if (sinkConfig.getProcessingGuarantees() != null) {
functionDetailsBuilder.setProcessingGuarantees(
convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
}

// set source spec
// source spec classname should be empty so that the default pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
if (sinkConfig.getInputs() != null) {
sinkConfig.getInputs().forEach(topicName ->
sourceSpecBuilder.putInputSpecs(topicName,
ConsumerSpec.newBuilder()
.setIsRegexPattern(false)
.build()));
}
if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) {
sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(),
ConsumerSpec.newBuilder()
.setIsRegexPattern(true)
.build());
}
if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) -> {
sourceSpecBuilder.putInputSpecs(topicName,
ConsumerSpec.newBuilder()
.setSerdeClassName(serde == null ? "" : serde)
.setIsRegexPattern(false)
.build());
});
}
if (sinkConfig.getTopicToSchemaType() != null) {
sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
sourceSpecBuilder.putInputSpecs(topicName,
ConsumerSpec.newBuilder()
.setSchemaType(schemaType == null ? "" : schemaType)
.setIsRegexPattern(false)
.build());
});
}
if (sinkConfig.getInputSpecs() != null) {
sinkConfig.getInputSpecs().forEach((topic, spec) -> {
sourceSpecBuilder.putInputSpecs(topic,
ConsumerSpec.newBuilder()
.setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "")
.setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "")
.setIsRegexPattern(spec.isRegexPattern())
.build());
});
}

if (typeArg != null) {
sourceSpecBuilder.setTypeClassName(typeArg);
}
if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
}

SubscriptionType subType = (sinkConfig.isRetainOrdering()
|| ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
? SubscriptionType.FAILOVER
: SubscriptionType.SHARED;
sourceSpecBuilder.setSubscriptionType(subType);

functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
if (sinkConfig.getTimeoutMs() != null) {
sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
}

functionDetailsBuilder.setSource(sourceSpecBuilder);

// set up sink spec
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
if (sinkClassName != null) {
sinkSpecBuilder.setClassName(sinkClassName);
}

if (isBuiltin) {
String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", "");
sinkSpecBuilder.setBuiltin(builtin);
}

if (sinkConfig.getConfigs() != null) {
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
}
if (typeArg != null) {
sinkSpecBuilder.setTypeClassName(typeArg);
}
functionDetailsBuilder.setSink(sinkSpecBuilder);

if (sinkConfig.getResources() != null) {
Resources.Builder bldr = Resources.newBuilder();
if (sinkConfig.getResources().getCpu() != null) {
bldr.setCpu(sinkConfig.getResources().getCpu());
}
if (sinkConfig.getResources().getRam() != null) {
bldr.setRam(sinkConfig.getResources().getRam());
}
if (sinkConfig.getResources().getDisk() != null) {
bldr.setDisk(sinkConfig.getResources().getDisk());
}
functionDetailsBuilder.setResources(bldr.build());
}
Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), functionDetailsBuilder);
return functionDetailsBuilder.build();
}

Expand Down

0 comments on commit 9ccaf2c

Please sign in to comment.