Skip to content

Commit

Permalink
Add more type checks for Java submissions (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 79a3b78 commit 555a08b
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 15 deletions.
Expand Up @@ -44,7 +44,6 @@ public class DefaultSerDe implements SerDe<Object> {

public DefaultSerDe(Class type) {
this.type = type;
verifySupportedType();
}

@Override
Expand Down Expand Up @@ -90,9 +89,7 @@ public byte[] serialize(Object input) {
}
}

public void verifySupportedType() {
if (!supportedInputTypes.contains(type)) {
throw new RuntimeException("Non Basic types not yet supported: " + type);
}
public static boolean IsSupportedType(Class typ) {
return supportedInputTypes.contains(typ);
}
}
Expand Up @@ -29,10 +29,12 @@
import com.google.protobuf.util.JsonFormat;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.api.SerDe;
Expand Down Expand Up @@ -210,15 +212,23 @@ void processArguments() throws Exception {
}

private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
File file = new File(jarFile);
// check if the function class exists in Jar and it implements PulsarFunction class
if (!Reflections.classExistsInJar(new File(jarFile), functionConfigBuilder.getClassName())) {
if (!Reflections.classExistsInJar(file, functionConfigBuilder.getClassName())) {
throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
functionConfigBuilder.getClassName(), jarFile));
} else if (!Reflections.classInJarImplementsIface(new File(jarFile), functionConfigBuilder.getClassName(), PulsarFunction.class)) {
} else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), PulsarFunction.class)) {
throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s does not implemement PulsarFunction.class",
functionConfigBuilder.getClassName(), jarFile));
}

PulsarFunction pulsarFunction = (PulsarFunction) Reflections.createInstance(functionConfigBuilder.getClassName(), file);
if (pulsarFunction == null) {
throw new IllegalArgumentException(String.format("Pulsar function class %s could not be instantiated from jar %s",
functionConfigBuilder.getClassName(), jarFile));
}
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());

// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
functionConfigBuilder.getCustomSerdeInputsMap().forEach((topicName, inputSerializer) -> {
Expand All @@ -238,7 +248,43 @@ private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
inputSerializer, SerDe.class.getCanonicalName()));
}
}
if (inputSerializer.equals(DefaultSerDe.class.getName())) {
if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
}
} else {
SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, file);
if (serDe == null) {
throw new IllegalArgumentException(String.format("SerDe class %s does not exist in jar %s",
inputSerializer, jarFile));
}
Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
if (!serDeTypes[0].isAssignableFrom(typeArgs[0])) {
throw new RuntimeException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
}
}
});
functionConfigBuilder.getInputsList().forEach((topicName) -> {
if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
}
});
if (functionConfigBuilder.getOutputSerdeClassName() == null
|| functionConfigBuilder.getOutputSerdeClassName().isEmpty()) {
if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
}
} else {
SerDe serDe = (SerDe) Reflections.createInstance(functionConfigBuilder.getOutputSerdeClassName(), file);
if (serDe == null) {
throw new IllegalArgumentException(String.format("SerDe class %s does not exist in jar %s",
functionConfigBuilder.getOutputSerdeClassName(), jarFile));
}
Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
if (!serDeTypes[0].isAssignableFrom(typeArgs[1])) {
throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
}
}
}

private void inferMissingArguments(FunctionConfig.Builder builder) {
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
Expand Down Expand Up @@ -70,6 +71,13 @@ public IObjectFactory getObjectFactory() {
private Functions functions;
private CmdFunctions cmd;

public class DummyFunction implements PulsarFunction<String, String> {
@Override
public String process(String input, Context context) throws Exception {
return null;
}
}

@BeforeMethod
public void setup() throws Exception {
this.admin = mock(PulsarFunctionsAdmin.class);
Expand All @@ -86,6 +94,8 @@ public void setup() throws Exception {
when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(PulsarFunction.class)))
.thenReturn(true);
when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true);
when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction());
when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class));
}

@Test
Expand Down Expand Up @@ -152,7 +162,7 @@ public void testCreateFunction() throws Exception {
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--function-classname", "MyClass",
"--function-classname", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
Expand All @@ -178,7 +188,7 @@ public void testCreateWithoutTenant() throws Exception {
"--output-serde-classname", DefaultSerDe.class.getName(),
"--jar", "SomeJar.jar",
"--namespace", "ns1",
"--function-classname", "MyClass",
"--function-classname", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
Expand All @@ -199,7 +209,7 @@ public void testCreateWithoutNamespace() throws Exception {
"--custom-serde-classnames", DefaultSerDe.class.getName(),
"--output-serde-classname", DefaultSerDe.class.getName(),
"--jar", "SomeJar.jar",
"--function-classname", "MyClass",
"--function-classname", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
Expand All @@ -221,11 +231,11 @@ public void testCreateWithoutFunctionName() throws Exception {
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--function-classname", "MyClass",
"--function-classname", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
assertEquals("MyClass", creater.getFunctionConfig().getName());
assertEquals("CmdFunctionsTest$DummyFunction", creater.getFunctionConfig().getName());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
}

Expand All @@ -240,11 +250,11 @@ public void testCreateWithoutSinkTopic() throws Exception {
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--function-classname", "MyClass",
"--function-classname", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
assertEquals(sourceTopicName + "-MyClass-output", creater.getFunctionConfig().getSinkTopic());
assertEquals(sourceTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getSinkTopic());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
}

Expand Down Expand Up @@ -308,7 +318,7 @@ public void testUpdateFunction() throws Exception {
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--function-classname", "MyClass",
"--function-classname", DummyFunction.class.getName(),
});

UpdateFunction updater = cmd.getUpdater();
Expand Down
Expand Up @@ -483,8 +483,14 @@ private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoade
private static SerDe initializeDefaultSerDe(PulsarFunction pulsarFunction, boolean inputArgs) {
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
if (inputArgs) {
if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
throw new RuntimeException("Default Serializer does not support " + typeArgs[0]);
}
return new DefaultSerDe(typeArgs[0]);
} else {
if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
throw new RuntimeException("Default Serializer does not support " + typeArgs[1]);
}
return new DefaultSerDe(typeArgs[1]);
}
}
Expand Down
Expand Up @@ -116,6 +116,14 @@ public static Object createInstance(String userClassName,

}

public static Object createInstance(String userClassName, java.io.File jar) {
try {
return createInstance(userClassName, loadJar(jar));
} catch (Exception ex) {
return null;
}
}

/**
* Load a jar
* @param jar file of jar
Expand Down

0 comments on commit 555a08b

Please sign in to comment.