Skip to content

Commit

Permalink
Added ability to add annotations to Connector Configs (apache#6983)
Browse files Browse the repository at this point in the history
* Added sourceConfigClass and sinkConfigClass

* Add Validator annotation helpers to validate class parameters

* Fix build errors

* Take feedback into account

* Connected with validation

* Fix bugs

* Added tests

* Fix class name

* Address feedback

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
  • Loading branch information
2 people authored and Huanli-Meng committed Jun 1, 2020
1 parent b4a8876 commit f9742c7
Show file tree
Hide file tree
Showing 29 changed files with 229 additions and 21 deletions.
Expand Up @@ -51,4 +51,26 @@ public class ConnectorDefinition {
* <p>If not defined, it will be assumed this connector cannot act as a data sink.
*/
private String sinkClass;

/**
* The class name for the source config implementation.
* Most of the sources are using a config class for managing their config
* and directly convert the supplied Map object at open to this object.
* These connector can declare their config class in this variable that will allow
* the framework to check for config parameter checking at submission time.
*
* <p>If not defined, the framework will not be able to do any submission time checks.
*/
private String sourceConfigClass;

/**
* The class name for the sink config implementation.
* Most of the sink are using a config class for managing their config
* and directly convert the supplied Map object at open to this object.
* These connector can declare their config class in this variable that will allow
* the framework to check for config parameter checking at submission time.
*
* <p>If not defined, the framework will not be able to do any submission time checks.
*/
private String sinkConfigClass;
}
Expand Up @@ -257,14 +257,14 @@ public void start(boolean blocking) throws Exception {

if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));

} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
}
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));
}
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
Expand All @@ -285,13 +285,13 @@ public void start(boolean blocking) throws Exception {

if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Sink archive does not exist");
}
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
}
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
Expand Down
Expand Up @@ -140,6 +140,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Should we validate connector config during submission"
)
private Boolean validateConnectorConfig = false;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
Expand All @@ -31,9 +32,13 @@
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
Expand Down Expand Up @@ -302,7 +307,8 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
}

public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
File sinkPackageFile, String narExtractionDirectory) {
File sinkPackageFile, String narExtractionDirectory,
boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
Expand Down Expand Up @@ -374,6 +380,9 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
}
if (validateConnectorConfig) {
validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
Expand All @@ -398,12 +407,18 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
if (validateConnectorConfig) {
validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
}
} else {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
}
}
} else if (narClassLoader != null) {
if (validateConnectorConfig) {
validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
Expand Down Expand Up @@ -580,4 +595,24 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
}
return mergedConfig;
}

public static void validateConnectorConfig(SinkConfig sinkConfig, ClassLoader classLoader) {
try {
ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
if (defn.getSinkConfigClass() != null) {
Class configClass = Class.forName(defn.getSinkConfigClass(), true, classLoader);
Object configObject =
ObjectMapperFactory.getThreadLocal().convertValue(sinkConfig.getConfigs(), configClass);
if (configObject != null) {
ConfigValidation.validateConfig(configObject);
}
}
} catch (IOException e) {
throw new IllegalArgumentException("Error validating sink config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find sink config class", e);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not validate sink config: " + e.getMessage());
}
}
}
Expand Up @@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
Expand All @@ -28,10 +29,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
Expand Down Expand Up @@ -209,7 +212,8 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
}

public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath,
File sourcePackageFile, String narExtractionDirectory) {
File sourcePackageFile, String narExtractionDirectory,
boolean validateConnectorConfig) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
}
Expand Down Expand Up @@ -268,6 +272,9 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract source class from archive", e);
}
if (validateConnectorConfig) {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
Expand All @@ -292,12 +299,18 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
if (validateConnectorConfig) {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
} else {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
}
}
} else if (narClassLoader != null) {
if (validateConnectorConfig) {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
Expand Down Expand Up @@ -386,4 +399,23 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
return mergedConfig;
}

public static void validateConnectorConfig(SourceConfig sourceConfig, ClassLoader classLoader) {
try {
ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
if (defn.getSourceConfigClass() != null) {
Class configClass = Class.forName(defn.getSourceConfigClass(), true, classLoader);
Object configObject = ObjectMapperFactory.getThreadLocal().convertValue(sourceConfig.getConfigs(), configClass);
if (configObject != null) {
ConfigValidation.validateConfig(configObject);
}
}
} catch (IOException e) {
throw new IllegalArgumentException("Error validating source config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find source config class");
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not validate source config: " + e.getMessage());
}
}

}
Expand Up @@ -74,11 +74,8 @@ public static String getIOSourceClass(NarClassLoader ncl) throws IOException {
* Extract the Pulsar IO Sink class from a connector archive.
*/
public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
ConnectorDefinition conf = getConnectorDefinition(classLoader);
NarClassLoader ncl = (NarClassLoader) classLoader;
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
ConnectorDefinition.class);
if (StringUtils.isEmpty(conf.getSinkClass())) {
throw new IOException(
String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
Expand All @@ -100,12 +97,17 @@ public static String getIOSinkClass(ClassLoader classLoader) throws IOException

public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
return getConnectorDefinition(ncl);
}
}

public static ConnectorDefinition getConnectorDefinition(ClassLoader classLoader) throws IOException {
NarClassLoader narClassLoader = (NarClassLoader) classLoader;
String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
}

public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
Expand Down
Expand Up @@ -19,12 +19,22 @@
package org.apache.pulsar.functions.utils;

import com.google.gson.Gson;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.annotations.Test;

import java.io.IOException;
Expand All @@ -33,12 +43,26 @@
import java.util.Map;

import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
import static org.testng.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.*;

/**
* Unit test of {@link Reflections}.
*/
public class SinkConfigUtilsTest {
@PrepareForTest(ConnectorUtils.class)
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.springframework.context.*", "org.apache.log4j.*", "com.sun.org.apache.xerces.*", "javax.management.*" })
public class SinkConfigUtilsTest extends PowerMockTestCase {

private ConnectorDefinition defn;

@Data
@Accessors(chain = true)
@NoArgsConstructor
public static class TestSinkConfig {
@ConfigValidationAnnotations.NotNull
private String configParameter;
}

@Test
public void testConvertBackFidelity() throws IOException {
Expand Down Expand Up @@ -279,6 +303,25 @@ public void testMergeRuntimeFlags() {
);
}

@Test
public void testValidateConfig() throws IOException {
mockStatic(ConnectorUtils.class);
defn = new ConnectorDefinition();
defn.setSinkConfigClass(TestSinkConfig.class.getName());
PowerMockito.when(ConnectorUtils.getConnectorDefinition(any())).thenReturn(defn);

SinkConfig sinkConfig = createSinkConfig();

// Good config
sinkConfig.getConfigs().put("configParameter", "Test");
SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader());

// Bad config
sinkConfig.getConfigs().put("configParameter", null);
Exception e = expectThrows(IllegalArgumentException.class, () -> SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader()));
assertTrue(e.getMessage().contains("Could not validate sink config: Field 'configParameter' cannot be null!"));
}

private SinkConfig createSinkConfig() {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant("test-tenant");
Expand Down

0 comments on commit f9742c7

Please sign in to comment.