Skip to content

Commit

Permalink
Fix for FQFN flag (#1528)
Browse files Browse the repository at this point in the history
* move fqfn flag into proper command base

* Extend FunctionCommand from BaseCommand rather than NamespaceCommand

* clean up input topics function

* Add special processArguments function for FunctionCommand

* Match wording of error output across commands

* Add checks for tenant, namespace, and function name
  • Loading branch information
lucperkins authored and merlimat committed Apr 10, 2018
1 parent ab67386 commit 9525047
Showing 1 changed file with 56 additions and 21 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -103,9 +104,6 @@ void processArguments() throws Exception {}
*/ */
@Getter @Getter
abstract class NamespaceCommand extends BaseCommand { abstract class NamespaceCommand extends BaseCommand {
@Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function", required = false)
protected String fqfn;

@Parameter(names = "--tenant", description = "The function's tenant", required = true) @Parameter(names = "--tenant", description = "The function's tenant", required = true)
protected String tenant; protected String tenant;


Expand All @@ -117,9 +115,47 @@ abstract class NamespaceCommand extends BaseCommand {
* Function level command * Function level command
*/ */
@Getter @Getter
abstract class FunctionCommand extends NamespaceCommand { abstract class FunctionCommand extends BaseCommand {
@Parameter(names = "--name", description = "The function's name", required = true) @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
protected String fqfn;

@Parameter(names = "--tenant", description = "The function's tenant")
protected String tenant;

@Parameter(names = "--namespace", description = "The function's namespace")
protected String namespace;

@Parameter(names = "--name", description = "The function's name")
protected String functionName; protected String functionName;

@Override
void processArguments() throws Exception {
super.processArguments();

boolean usesSetters = (null != tenant || null != namespace || null != functionName);
boolean usesFqfn = (null != fqfn);

// Throw an exception if --fqfn is set alongside any combination of --tenant, --namespace, and --name
if (usesFqfn && usesSetters) {
throw new RuntimeException(
"You must specify either a Fully Qualified Function Name (FQFN) or tenant, namespace, and function name");
} else if (usesFqfn) {
// If the --fqfn flag is used, parse tenant, namespace, and name using that flag
String[] fqfnParts = fqfn.split("/");
if (fqfnParts.length != 3) {
throw new RuntimeException(
"Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
}
tenant = fqfnParts[0];
namespace = fqfnParts[1];
functionName = fqfnParts[2];
} else {
if (null == tenant || null == namespace || null == functionName) {
throw new RuntimeException(
"You must specify a tenant, namespace, and name for the function or a Fully Qualified Function Name (FQFN)");
}
}
}
} }


/** /**
Expand All @@ -135,7 +171,7 @@ abstract class FunctionConfigCommand extends BaseCommand {
protected String namespace; protected String namespace;
@Parameter(names = "--name", description = "The function's name") @Parameter(names = "--name", description = "The function's name")
protected String functionName; protected String functionName;
@Parameter(names = "--className", description = "The function's class name", required = true) @Parameter(names = "--className", description = "The function's class name")
protected String className; protected String className;
@Parameter( @Parameter(
names = "--jar", names = "--jar",
Expand Down Expand Up @@ -173,6 +209,8 @@ abstract class FunctionConfigCommand extends BaseCommand {


@Override @Override
void processArguments() throws Exception { void processArguments() throws Exception {
super.processArguments();

FunctionConfig.Builder functionConfigBuilder; FunctionConfig.Builder functionConfigBuilder;


// Initialize config builder either from a supplied YAML config file or from scratch // Initialize config builder either from a supplied YAML config file or from scratch
Expand All @@ -197,10 +235,7 @@ void processArguments() throws Exception {
} }


if (null != inputs) { if (null != inputs) {
String[] topicNames = inputs.split(","); Arrays.asList(inputs.split(",")).forEach(functionConfigBuilder::addInputs);
for (int i = 0; i < topicNames.length; ++i) {
functionConfigBuilder.addInputs(topicNames[i]);
}
} }
if (null != customSerdeInputString) { if (null != customSerdeInputString) {
Type type = new TypeToken<Map<String, String>>(){}.getType(); Type type = new TypeToken<Map<String, String>>(){}.getType();
Expand Down Expand Up @@ -385,17 +420,6 @@ private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
} }
} }


private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig.Builder functionConfigBuilder) {
String[] args = fqfn.split("/");
if (args.length != 3) {
throw new RuntimeException("Fully qualified function names must be of the form tenant/namespace/name");
} else {
functionConfigBuilder.setTenant(args[0]);
functionConfigBuilder.setNamespace(args[1]);
functionConfigBuilder.setName(args[2]);
}
}

private void doPythonSubmitChecks(FunctionConfig.Builder functionConfigBuilder) { private void doPythonSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
if (functionConfigBuilder.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { if (functionConfigBuilder.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python"); throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
Expand Down Expand Up @@ -746,4 +770,15 @@ private org.apache.pulsar.functions.proto.Function.FunctionConfig convert(Functi
Utils.mergeJson(FunctionsImpl.printJson(functionConfig), functionConfigBuilder); Utils.mergeJson(FunctionsImpl.printJson(functionConfig), functionConfigBuilder);
return functionConfigBuilder.build(); return functionConfigBuilder.build();
} }

private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig.Builder functionConfigBuilder) {
String[] args = fqfn.split("/");
if (args.length != 3) {
throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
} else {
functionConfigBuilder.setTenant(args[0]);
functionConfigBuilder.setNamespace(args[1]);
functionConfigBuilder.setName(args[2]);
}
}
} }

0 comments on commit 9525047

Please sign in to comment.