Skip to content

Commit

Permalink
Allow two types of input. (#170)
Browse files Browse the repository at this point in the history
1) one with DefaultSerDe
2) one with custom SerDe
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent efce272 commit edbae3c
Show file tree
Hide file tree
Showing 25 changed files with 297 additions and 298 deletions.
Expand Up @@ -110,11 +110,13 @@ abstract class FunctionConfigCommand extends FunctionCommand {
protected String pyFile;
@Parameter(names = "--source-topics", description = "Input Topic Name\n")
protected String sourceTopicNames;
@Parameter(names = "--custom-serde-source-topics", description = "Input Topic Name that have custom deserializers\n")
protected String customSourceTopics;
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
protected String sinkTopicName;

@Parameter(names = "--input-serde-classnames", description = "Input SerDe\n")
protected String inputSerdeClassNames;
@Parameter(names = "--custom-serde-classnames", description = "Input SerDe for custom serde source topics\n")
protected String customSerdeClassNames;

@Parameter(names = "--output-serde-classname", description = "Output SerDe\n")
protected String outputSerdeClassName;
Expand All @@ -136,14 +138,20 @@ void processArguments() throws Exception {
} else {
functionConfigBuilder = FunctionConfig.newBuilder();
}
if (null != sourceTopicNames && null != inputSerdeClassNames) {
String[] sourceTopicName = sourceTopicNames.split(",");
String[] inputSerdeClassName = inputSerdeClassNames.split(",");
if (null != sourceTopicNames) {
String[] topicNames = sourceTopicNames.split(",");
for (int i = 0; i < topicNames.length; ++i) {
functionConfigBuilder.addInputs(topicNames[i]);
}
}
if (null != customSourceTopics && null != customSerdeClassNames) {
String[] sourceTopicName = customSourceTopics.split(",");
String[] inputSerdeClassName = customSerdeClassNames.split(",");
if (sourceTopicName.length != inputSerdeClassName.length) {
throw new IllegalArgumentException(String.format("SourceTopics and InputSerde should match"));
throw new IllegalArgumentException(String.format("CustomSerde Topics and InputSerde should match"));
}
for (int i = 0; i < sourceTopicName.length; ++i) {
functionConfigBuilder.putInputs(sourceTopicName[i], inputSerdeClassName[i]);
functionConfigBuilder.putCustomSerdeInputs(sourceTopicName[i], inputSerdeClassName[i]);
}
}
if (null != sinkTopicName) {
Expand Down Expand Up @@ -193,7 +201,7 @@ private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {

// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
functionConfigBuilder.getInputsMap().forEach((topicName, inputSerializer) -> {
functionConfigBuilder.getCustomSerdeInputsMap().forEach((topicName, inputSerializer) -> {
if (!Reflections.classExists(inputSerializer)
&& !Reflections.classExistsInJar(new File(jarFile), inputSerializer)) {
throw new IllegalArgumentException(
Expand Down
Expand Up @@ -98,6 +98,8 @@ public void testLocalRunnerCmdNoArguments() throws Exception {
assertNull(runner.getFnConfigFile());
}

/*
TODO(sijie):- Can we fix this?
@Test
public void testLocalRunnerCmdSettings() throws Exception {
String fnName = TEST_NAME + "-function";
Expand All @@ -117,8 +119,6 @@ public void testLocalRunnerCmdSettings() throws Exception {
assertNull(runner.getFnConfigFile());
}
/*
TODO(sijie):- Can we fix this?
@Test
public void testLocalRunnerCmdYaml() throws Exception {
URL yamlUrl = getClass().getClassLoader().getResource("test_function_config.yml");
Expand All @@ -144,9 +144,9 @@ public void testCreateFunction() throws Exception {
cmd.run(new String[] {
"create",
"--function-name", fnName,
"--source-topics", sourceTopicName,
"--custom-serde-source-topics", sourceTopicName,
"--sink-topic", sinkTopicName,
"--input-serde-classnames", Utf8StringSerDe.class.getName(),
"--custom-serde-classnames", Utf8StringSerDe.class.getName(),
"--output-serde-classname", Utf8StringSerDe.class.getName(),
"--jar", "SomeJar.jar",
"--tenant", "sample",
Expand All @@ -156,7 +156,7 @@ public void testCreateFunction() throws Exception {

CreateFunction creater = cmd.getCreater();
assertEquals(fnName, creater.getFunctionName());
assertEquals(sourceTopicName, creater.getSourceTopicNames());
assertEquals(sourceTopicName, creater.getCustomSourceTopics());
assertEquals(sinkTopicName, creater.getSinkTopicName());

verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
Expand Down Expand Up @@ -216,9 +216,9 @@ public void testUpdateFunction() throws Exception {
cmd.run(new String[] {
"update",
"--function-name", fnName,
"--source-topics", sourceTopicName,
"--custom-serde-source-topics", sourceTopicName,
"--sink-topic", sinkTopicName,
"--input-serde-classnames", Utf8StringSerDe.class.getName(),
"--custom-serde-classnames", Utf8StringSerDe.class.getName(),
"--output-serde-classname", Utf8StringSerDe.class.getName(),
"--jar", "SomeJar.jar",
"--tenant", "sample",
Expand All @@ -228,7 +228,7 @@ public void testUpdateFunction() throws Exception {

UpdateFunction updater = cmd.getUpdater();
assertEquals(fnName, updater.getFunctionName());
assertEquals(sourceTopicName, updater.getSourceTopicNames());
assertEquals(sourceTopicName, updater.getCustomSourceTopics());
assertEquals(sinkTopicName, updater.getSinkTopicName());

verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
Expand Down
3 changes: 1 addition & 2 deletions pulsar-functions/conf/example.yml
Expand Up @@ -21,8 +21,7 @@ tenant: "test"
namespace: "test-namespace"
name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs:
"persistent://sample/standalone/ns1/test_src" : "org.apache.pulsar.functions.api.utils.Utf8StringSerDe"
inputs: ["persistent://sample/standalone/ns1/test_src"]
userConfig:
"PublishTopic" : "persistent://sample/standalone/ns1/test_result"

Expand Down
3 changes: 2 additions & 1 deletion pulsar-functions/proto/src/main/proto/Function.proto
Expand Up @@ -40,8 +40,9 @@ message FunctionConfig {
string namespace = 2;
string name = 3;
string className = 4;
repeated string inputs = 14;
// map from input topic name to serde
map<string, string> inputs = 5;
map<string, string> custom_serde_inputs = 5;
string outputSerdeClassName = 6;
string sinkTopic = 7;
ProcessingGuarantees processingGuarantees = 9;
Expand Down
2 changes: 0 additions & 2 deletions pulsar-functions/run-counter-examples.sh
Expand Up @@ -19,9 +19,7 @@

bin/pulsar-functions --admin-url http://localhost:8080 functions localrun \
--function-config conf/example.yml \
--source-topics persistent://sample/standalone/ns1/test_src \
--sink-topic persistent://sample/standalone/ns1/test_result \
--input-serde-classnames org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--output-serde-classname org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--function-classname org.apache.pulsar.functions.api.examples.CounterFunction \
--jar `pwd`/java-examples/target/pulsar-functions-api-examples.jar \
Expand Down
2 changes: 0 additions & 2 deletions pulsar-functions/run-examples.sh
Expand Up @@ -19,9 +19,7 @@

bin/pulsar-functions --admin-url http://localhost:8080 functions localrun \
--function-config conf/example.yml \
--source-topics persistent://sample/standalone/ns1/test_src \
--sink-topic persistent://sample/standalone/ns1/test_result \
--input-serde-classnames org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--output-serde-classname org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--function-classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--jar `pwd`/java-examples/target/pulsar-functions-api-examples.jar
2 changes: 0 additions & 2 deletions pulsar-functions/run-logging-examples.sh
Expand Up @@ -19,9 +19,7 @@

bin/pulsar-functions --admin-url http://localhost:8080 functions localrun \
--function-config conf/example.yml \
--source-topics persistent://sample/standalone/ns1/test_src \
--sink-topic persistent://sample/standalone/ns1/test_result \
--input-serde-classnames org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--output-serde-classname org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--function-classname org.apache.pulsar.functions.api.examples.LoggingFunction \
--jar `pwd`/java-examples/target/pulsar-functions-api-examples.jar
2 changes: 0 additions & 2 deletions pulsar-functions/run-publish-example.sh
Expand Up @@ -19,7 +19,5 @@

bin/pulsar-functions --admin-url http://localhost:8080 functions localrun \
--function-config conf/example.yml \
--source-topics persistent://sample/standalone/ns1/test_src \
--input-serde-classnames org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--function-classname org.apache.pulsar.functions.api.examples.LoggingFunction \
--jar `pwd`/java-examples/target/pulsar-functions-api-examples.jar
Expand Up @@ -56,7 +56,6 @@ class ProcessFunctionContainer implements FunctionContainer {
private InstanceControlGrpc.InstanceControlFutureStub stub;
private Timer alivenessTimer;
private int alivenessCheckInterval;
private int nProcessStarts;

ProcessFunctionContainer(InstanceConfig instanceConfig,
int maxBufferedTuples,
Expand Down Expand Up @@ -100,24 +99,38 @@ class ProcessFunctionContainer implements FunctionContainer {
args.add(instanceConfig.getFunctionConfig().getName());
args.add("--function_classname");
args.add(instanceConfig.getFunctionConfig().getClassName());
String sourceTopicString = "";
String inputSerdeClassNameString = "";
for (Map.Entry<String, String> entry: instanceConfig.getFunctionConfig().getInputsMap().entrySet()) {
if (sourceTopicString.isEmpty()) {
sourceTopicString = entry.getKey();
} else {
sourceTopicString = sourceTopicString + "," + entry.getKey();
if (instanceConfig.getFunctionConfig().getCustomSerdeInputsCount() > 0) {
String sourceTopicString = "";
String inputSerdeClassNameString = "";
for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
if (sourceTopicString.isEmpty()) {
sourceTopicString = entry.getKey();
} else {
sourceTopicString = sourceTopicString + "," + entry.getKey();
}
if (inputSerdeClassNameString.isEmpty()) {
inputSerdeClassNameString = entry.getValue();
} else {
inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue();
}
}
if (inputSerdeClassNameString.isEmpty()) {
inputSerdeClassNameString = entry.getValue();
} else {
inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue();
args.add("--custom_serde_source_topics");
args.add(sourceTopicString);
args.add("--custom_serde_classnames");
args.add(inputSerdeClassNameString);
}
if (instanceConfig.getFunctionConfig().getInputsCount() > 0) {
String sourceTopicString = "";
for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
if (sourceTopicString.isEmpty()) {
sourceTopicString = topicName;
} else {
sourceTopicString = sourceTopicString + "," + topicName;
}
}
args.add("--source_topics");
args.add(sourceTopicString);
}
args.add("--source_topics");
args.add(sourceTopicString);
args.add("--input_serde_classnames");
args.add(inputSerdeClassNameString);
args.add("--auto_ack");
if (instanceConfig.getFunctionConfig().getAutoAck()) {
args.add("true");
Expand Down Expand Up @@ -258,7 +271,6 @@ private int findAvailablePort() {
}

private void startProcess() {
nProcessStarts++;
try {
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
process = processBuilder.start();
Expand Down
Expand Up @@ -56,19 +56,7 @@ public class JavaInstance implements AutoCloseable {
private PulsarFunction pulsarFunction;
private ExecutorService executorService;

public JavaInstance(InstanceConfig config, ClassLoader clsLoader,
PulsarClient pulsarClient,
List<SerDe> inputSerDe,
SerDe outputSerDe,
Map<String, Consumer> sourceConsumers) {
this(
config,
Reflections.createInstance(
config.getFunctionConfig().getClassName(),
clsLoader), clsLoader, pulsarClient, inputSerDe, outputSerDe, sourceConsumers);
}

JavaInstance(InstanceConfig config, Object object,
public JavaInstance(InstanceConfig config, PulsarFunction pulsarFunction,
ClassLoader clsLoader,
PulsarClient pulsarClient,
List<SerDe> inputSerDe, SerDe outputSerDe, Map<String, Consumer> sourceConsumers) {
Expand All @@ -78,12 +66,9 @@ public JavaInstance(InstanceConfig config, ClassLoader clsLoader,
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);

// create the functions
if (object instanceof PulsarFunction) {
pulsarFunction = (PulsarFunction) object;
verifyTypes(inputSerDe, outputSerDe);
} else {
throw new RuntimeException("User class must be either a Request or Raw Request Handler");
}
this.pulsarFunction = pulsarFunction;
verifyTypes(inputSerDe, outputSerDe);


if (config.getLimitsConfig() != null && config.getLimitsConfig().getMaxTimeMs() > 0) {
log.info("Spinning up a executor service since time budget is infinite");
Expand All @@ -96,7 +81,7 @@ private void verifyTypes(List<SerDe> inputSerDe, SerDe outputSerDe) {

for (SerDe serDe : inputSerDe) {
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
if (!typeArgs[0].equals(inputSerdeTypeArgs[0])) {
if (!inputSerdeTypeArgs[0].isAssignableFrom(typeArgs[0])) {
throw new RuntimeException("Inconsistent types found between function input type and input serde type: "
+ " function type = " + typeArgs[0] + ", serde type = " + inputSerdeTypeArgs[0]);
}
Expand All @@ -107,7 +92,7 @@ private void verifyTypes(List<SerDe> inputSerDe, SerDe outputSerDe) {
throw new RuntimeException("Output serde class is null even though return type is not Void!");
}
Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
if (!typeArgs[1].equals(outputSerdeTypeArgs[0])) {
if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArgs[1])) {
throw new RuntimeException("Inconsistent types found between function output type and output serde type: "
+ " function type = " + typeArgs[1] + ", serde type = " + outputSerdeTypeArgs[0]);
}
Expand Down
Expand Up @@ -58,13 +58,16 @@ public class JavaInstanceMain {
protected String tenant;
@Parameter(names = "--namespace", description = "Namespace Name\n", required = true)
protected String namespace;
@Parameter(names = "--source_topics", description = "Input Topic Name\n", required = true)
protected String sourceTopicName;

@Parameter(names = "--sink_topic", description = "Output Topic Name\n")
protected String sinkTopicName;

@Parameter(names = "--input_serde_classnames", description = "Input SerDe\n", required = true)
protected String inputSerdeClassName;
@Parameter(names = "--custom_serde_source_topics", description = "Input Topics that need custom deserialization\n", required = false)
protected String customSerdeSourceTopics;
@Parameter(names = "--custom_serde_classnames", description = "Input SerDe\n", required = false)
protected String customSerdeClassnames;
@Parameter(names = "--source_topics", description = "Input Topics\n", required = false)
protected String defaultSerdeSourceTopics;

@Parameter(names = "--output_serde_classname", description = "Output SerDe\n")
protected String outputSerdeClassName;
Expand Down Expand Up @@ -114,13 +117,21 @@ public void start() throws Exception {
functionConfigBuilder.setNamespace(namespace);
functionConfigBuilder.setName(functionName);
functionConfigBuilder.setClassName(className);
String[] sourceTopics = sourceTopicName.split(",");
String[] inputSerdeClassNames = inputSerdeClassName.split(",");
if (sourceTopics.length != inputSerdeClassNames.length) {
throw new RuntimeException("Error specifying inputs");
if (defaultSerdeSourceTopics != null) {
String[] sourceTopics = defaultSerdeSourceTopics.split(",");
for (String sourceTopic : sourceTopics) {
functionConfigBuilder.addInputs(sourceTopic);
}
}
for (int i = 0; i < sourceTopics.length; ++i) {
functionConfigBuilder.putInputs(sourceTopics[i], inputSerdeClassNames[i]);
if (customSerdeSourceTopics != null && customSerdeClassnames != null) {
String[] sourceTopics = customSerdeSourceTopics.split(",");
String[] inputSerdeClassNames = customSerdeClassnames.split(",");
if (sourceTopics.length != inputSerdeClassNames.length) {
throw new RuntimeException("Error specifying inputs");
}
for (int i = 0; i < sourceTopics.length; ++i) {
functionConfigBuilder.putCustomSerdeInputs(sourceTopics[i], inputSerdeClassNames[i]);
}
}
if (outputSerdeClassName != null) {
functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
Expand Down

0 comments on commit edbae3c

Please sign in to comment.