Skip to content

Commit

Permalink
fix for python config validation (#1874)
Browse files Browse the repository at this point in the history
* fix for python config validation

* removing unnecessary file
  • Loading branch information
jerrypeng committed May 31, 2018
1 parent 886022a commit a1ffa7f
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 26 deletions.
Expand Up @@ -385,7 +385,7 @@ private void validateFunctionConfigs(FunctionConfig functionConfig) {

try {
// Need to load jar and set context class loader before calling
ConfigValidation.validateConfig(functionConfig);
ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name());
} catch (Exception e) {
throw new ParameterException(e.getMessage());
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import lombok.ToString;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
Expand Down Expand Up @@ -86,7 +87,8 @@ public enum Runtime {
@isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
private Collection<String> inputs;
@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
private Map<String, String> customSerdeInputs;
@isValidTopicName
private String output;
Expand Down
Expand Up @@ -19,40 +19,76 @@
package org.apache.pulsar.functions.utils.validation;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.utils.FunctionConfig;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.ACTUAL_RUNTIME;
import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.TARGET_RUNTIME;
import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS;

@Slf4j
public class ConfigValidation {

public static void validateConfig(Object config) {
public enum Runtime {
ALL,
JAVA,
PYTHON
}

public static void validateConfig(Object config, String runtimeType) {
for (Field field : config.getClass().getDeclaredFields()) {
Object value = null;
Object value;
field.setAccessible(true);
try {
value = field.get(config);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
validateField(field, value);
validateField(field, value, Runtime.valueOf(runtimeType));
}
validateClass(config);
validateClass(config, Runtime.valueOf(runtimeType));
}

private static void validateClass(Object config) {
processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config);
private static void validateClass(Object config, Runtime runtime) {

List<Annotation> annotationList = new LinkedList<>();
Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
for (Class clazz : classes) {
try {
Annotation[] anots = config.getClass().getAnnotationsByType(clazz);
annotationList.addAll(Arrays.asList(anots));
} catch (ClassCastException e) {

}
}
processAnnotations(annotationList, config.getClass().getName(), config, runtime);
}

private static void validateField(Field field, Object value) {
processAnnotations(field.getAnnotations(), field.getName(), value);
private static void validateField(Field field, Object value, Runtime runtime) {
List<Annotation> annotationList = new LinkedList<>();
Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
for (Class clazz : classes) {
try {
Annotation[] anots = field.getAnnotationsByType(clazz);
annotationList.addAll(Arrays.asList(anots));
} catch (ClassCastException e) {

}
}
processAnnotations(annotationList, field.getName(), value, runtime);
}

private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) {
private static void processAnnotations( List<Annotation> annotations, String fieldName, Object value,
Runtime runtime) {
try {
for (Annotation annotation : annotations) {

Expand All @@ -68,27 +104,47 @@ private static void processAnnotations(Annotation[] annotations, String fieldNam
}
if (validatorClass != null) {
Object v = validatorClass.cast(annotation);
@SuppressWarnings("unchecked")
Class<Validator> clazz = (Class<Validator>) validatorClass
.getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
Validator o = null;
Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
//two constructor signatures used to initialize validators.
//One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
//If validator has a constructor that takes a Map as an argument call that constructor
if (hasConstructor(clazz, Map.class)) {
o = clazz.getConstructor(Map.class).newInstance(params);
} else { //If not call default constructor
o = clazz.newInstance();
if (hasMethod(validatorClass, VALIDATOR_CLASS)) {

@SuppressWarnings("unchecked")
Class<Validator> clazz = (Class<Validator>) validatorClass
.getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
Validator o = null;
Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);

if (params.containsKey(TARGET_RUNTIME)
&& params.get(TARGET_RUNTIME) != Runtime.ALL
&& params.get(TARGET_RUNTIME) != runtime) {
continue;
}
params.put(ACTUAL_RUNTIME, runtime);
//two constructor signatures used to initialize validators.
//One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)

//If validator has a constructor that takes a Map as an argument call that constructor
if (hasConstructor(clazz, Map.class)) {
o = clazz.getConstructor(Map.class).newInstance(params);
} else { //If not call default constructor
o = clazz.newInstance();
}
o.validateField(fieldName, value);
}
o.validateField(fieldName, value);
}
}
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static boolean hasMethod(Class<?> clazz, String method) {
try {
clazz.getMethod(method);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}

private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
throws InvocationTargetException, IllegalAccessException {
Map<String, Object> params = new HashMap<String, Object>();
Expand Down
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.pulsar.functions.utils.validation;

import org.apache.pulsar.functions.utils.FunctionConfig;

import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
Expand All @@ -32,6 +35,8 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface NotNull {
Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

/**
Expand All @@ -43,6 +48,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;

boolean includeZero() default false;

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}


Expand All @@ -54,6 +61,8 @@ public class ConfigValidationAnnotations {
public @interface isValidResources {

Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

/**
Expand All @@ -65,6 +74,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;

Class<?> type();

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

@Retention(RetentionPolicy.RUNTIME)
Expand All @@ -73,6 +84,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;

Class<?> type() default String.class;

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

/**
Expand All @@ -84,6 +97,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;

Class<?>[] entryValidatorClasses();

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}


Expand All @@ -98,6 +113,8 @@ public class ConfigValidationAnnotations {
Class<?> keyType();

Class<?> valueType();

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

/**
Expand All @@ -109,6 +126,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;

Class<?> implementsClass();

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
}

/**
Expand All @@ -120,20 +139,31 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;

Class<?>[] implementsClasses();

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
}

/**
* Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses,
* valueValidatorClasses
*/
@Repeatable(isMapEntryCustoms.class)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isMapEntryCustom {
Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;

Class<?>[] keyValidatorClasses();
Class<?>[] keyValidatorClasses() default {};

Class<?>[] valueValidatorClasses() default {};

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

Class<?>[] valueValidatorClasses();
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isMapEntryCustoms {
isMapEntryCustom[] value();
}

/**
Expand All @@ -143,6 +173,8 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface isValidTopicName {
Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

/**
Expand All @@ -152,6 +184,8 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface isValidWindowConfig {
Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;

ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}

/**
Expand Down Expand Up @@ -179,5 +213,7 @@ public static class ValidatorParams {
static final String ACCEPTED_VALUES = "acceptedValues";
static final String IMPLEMENTS_CLASS = "implementsClass";
static final String IMPLEMENTS_CLASSES = "implementsClasses";
static final String ACTUAL_RUNTIME = "actualRuntime";
static final String TARGET_RUNTIME = "targetRuntime";
}
}

0 comments on commit a1ffa7f

Please sign in to comment.