UDF tidy ups #1429

Merged
merged 5 commits into from Jun 14, 2018
Changes from 3 commits
Expand Up @@ -21,6 +21,10 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Used to signal that a method in a class that has the @UdfDescription annotation
* as a function that can be invoked.
Contributor

do you mean "is a function that can be invoked"?

Contributor

Maybe: "The {@code Udf} annotation on a method tells KSQL that this method should be exposed as a user-defined function in KSQL. The enclosing class must also be annotated with {@code UdfDescription}."

*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Udf {
Expand Down
Expand Up @@ -21,6 +21,12 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Classes with this annotation will be scanned for the @Udf annotation. This tells KSQL
* that this class contains methods that you would like to add as functions to KSQL.
* The name of the udf will be the same for each of the @Udf annotated methods in your class.
* The parameters and return types can vary.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface UdfDescription {
Expand Down
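To make the two Javadoc comments above concrete, here is a minimal sketch of how a class-level and a method-level annotation could fit together. The `Udf` and `UdfDescription` types below are local stand-ins declared inside the example, not the real KSQL annotations; `Multiply`, `helper` and `udfMethods` are hypothetical names, and the scan is only an illustration of the idea, not the loader's actual logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class UdfAnnotationSketch {

  // Local stand-in for the method-level annotation (assumption, not the KSQL type).
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Udf {
  }

  // Local stand-in for the class-level annotation; only name() is modelled here.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface UdfDescription {
    String name();
  }

  // Hypothetical UDF class: both annotated overloads share the name "multiply".
  @UdfDescription(name = "multiply")
  static class Multiply {
    @Udf
    public Long multiply(final Long a, final Long b) {
      return a * b;
    }

    @Udf
    public Double multiply(final Double a, final Double b) {
      return a * b;
    }

    // Not annotated, so the scan below skips it.
    public String helper() {
      return "not a udf";
    }
  }

  // Returns the @Udf methods of a class, or nothing if the class itself is not annotated.
  static List<Method> udfMethods(final Class<?> clazz) {
    if (!clazz.isAnnotationPresent(UdfDescription.class)) {
      return Collections.emptyList();
    }
    return Arrays.stream(clazz.getDeclaredMethods())
        .filter(method -> method.isAnnotationPresent(Udf.class))
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final String name = Multiply.class.getAnnotation(UdfDescription.class).name();
    udfMethods(Multiply.class).forEach(method -> System.out.println(name + " -> " + method));
  }
}
```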
Expand Up @@ -26,31 +26,30 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Used to restrict the classes that can be loaded by user supplied UDFs
*/
public class Blacklist implements Predicate<String> {
private static final Logger logger = LoggerFactory.getLogger(Blacklist.class);
private static final String EMPTY_BLACKLIST = "^(?)\\.?.*$";
private static final String EMPTY_BLACKLIST = "^(?:)\\.?.*$";
Contributor

Split into two constants:

private static final String BLACKLIST_PREFIX = "^(?:";
private static final String BLACKLIST_SUFFIX = ")\\.?.*$";

Then your collect line can be:

.collect(Collectors.joining("|",  BLACKLIST_PREFIX, BLACKLIST_SUFFIX));

And your empty check can be:

if (this.blackList.equals(BLACKLIST_PREFIX + BLACKLIST_SUFFIX))

Which I think is easier to understand.


private String blackList;
private String blackList = ".*";
Contributor

nit: moving ".*" into constant BLACKLIST_ALL would help impart meaning more.


Blacklist(final File inputFile) {
try {
final StringBuilder builder = new StringBuilder("^(?:");
Files.readLines(inputFile, Charset.forName(StandardCharsets.UTF_8.name()))
.forEach(item -> {
final String trimmed = item.trim();
if (!(trimmed.isEmpty() || trimmed.startsWith("#"))) {
builder.append(trimmed.replaceAll("\\.", "\\\\.")).append("|");
}
});
builder.deleteCharAt(builder.length() - 1);
builder.append(")\\.?.*$");
this.blackList = builder.toString().equals(EMPTY_BLACKLIST)
? ""
: builder.toString();
this.blackList = Files.readLines(inputFile, Charset.forName(StandardCharsets.UTF_8.name()))
Contributor

It would be nice to have a comment describing how this block is parsing the blacklist file. Specifically, how are strings being transformed with replace and join? The regexes are not straightforward to interpret.
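As a rough illustration of what the comment above asks for, the sketch below runs the same stream steps over an in-memory list instead of a file and annotates each one. The class and method names (`BlacklistPatternSketch`, `toPattern`) are made up for the example; the prefix and suffix constants follow the naming suggested earlier in this review.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BlacklistPatternSketch {
  private static final String BLACKLIST_PREFIX = "^(?:";
  private static final String BLACKLIST_SUFFIX = ")\\.?.*$";

  // Turns the lines of a blacklist file into a single regular expression.
  static String toPattern(final List<String> lines) {
    return lines.stream()
        // ignore leading and trailing whitespace on each line
        .map(String::trim)
        // skip blank lines
        .filter(line -> !line.isEmpty())
        // skip comment lines
        .filter(line -> !line.startsWith("#"))
        // escape dots so "java.lang" only matches a literal '.'
        .map(line -> line.replaceAll("\\.", "\\\\."))
        // OR the entries together inside a non-capturing group; the suffix
        // then allows an optional '.' followed by anything
        .collect(Collectors.joining("|", BLACKLIST_PREFIX, BLACKLIST_SUFFIX));
  }

  public static void main(final String[] args) {
    final List<String> lines =
        Arrays.asList("# blocked classes", "", "  java.lang.Process ", "javax");
    System.out.println(toPattern(lines));
  }
}
```

With no usable lines the result is just the prefix followed by the suffix, which is the case the empty-blacklist check in the diff guards against.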

.stream()
.map(String::trim)
.filter(line -> !line.isEmpty())
.filter(line -> !line.startsWith("#"))
.map(line -> line.replaceAll("\\.", "\\\\."))
.collect(Collectors.joining("|", "^(?:",")\\.?.*$"));
Contributor

nit: formatting - needs space after comma, to make it easier to grok.


if (this.blackList.equals(EMPTY_BLACKLIST)) {
this.blackList = "";
}
logger.info("Setting UDF blacklisted classes to: " + blackList);
} catch (IOException e) {
logger.error("failed to load resource blacklist from " + inputFile
Expand All @@ -60,6 +59,6 @@ public class Blacklist implements Predicate<String> {

@Override
public boolean test(final String resourceName) {
return blackList == null || resourceName.matches(blackList);
return resourceName.matches(blackList);
Contributor

out of interest, why are we using a regex for such simple matching? Can't we just dump each line into a Set and use contains(resourceName)???

Contributor Author

It is intended to do partial matches, i.e., the regex produced would be something like:
^(?:java\.lang\.Process|java\.lang\.Runtime|javax)\.?.*$
which will match anything starting with java.lang.Process, java.lang.Runtime, javax

Contributor

Ah, gotcha.
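The partial-match behaviour described in this thread can be checked with a small sketch. It uses the example pattern quoted above verbatim; `BlacklistMatchSketch` and `isBlacklisted` are hypothetical names, and the class names tested are only illustrations.

```java
class BlacklistMatchSketch {
  // The example pattern from the thread above, written as a Java string literal.
  static final String PATTERN =
      "^(?:java\\.lang\\.Process|java\\.lang\\.Runtime|javax)\\.?.*$";

  // The pattern that results when the blacklist file has no usable entries.
  static final String EMPTY_PATTERN = "^(?:)\\.?.*$";

  static boolean isBlacklisted(final String resourceName) {
    return resourceName.matches(PATTERN);
  }

  public static void main(final String[] args) {
    // Anything that starts with a listed entry matches.
    System.out.println(isBlacklisted("java.lang.ProcessBuilder"));
    System.out.println(isBlacklisted("javax.swing.JFrame"));
    // Names that do not start with a listed entry do not match.
    System.out.println(isBlacklisted("java.lang.String"));
    // An empty alternation matches every name, which would block everything.
    System.out.println("java.lang.String".matches(EMPTY_PATTERN));
  }
}
```

A `Set.contains` lookup would only catch exact names, so it would miss `java.lang.ProcessBuilder` in this example. The last line also suggests why the diff replaces the empty pattern with `""`: left as-is, it would match every resource name.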

}
}
Expand Up @@ -152,10 +152,10 @@ private void addStringFunctions() {
addFunction(ucase);


KsqlFunction concat = new KsqlFunction(Schema.STRING_SCHEMA, Arrays.asList(Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA),
"CONCAT", ConcatKudf.class);

KsqlFunction concat = new KsqlFunction(Schema.OPTIONAL_STRING_SCHEMA,
Arrays.asList(Schema.OPTIONAL_STRING_SCHEMA,
Schema.OPTIONAL_STRING_SCHEMA),
"CONCAT", ConcatKudf.class);
addFunction(concat);

KsqlFunction trim = new KsqlFunction(Schema.OPTIONAL_STRING_SCHEMA,
Expand Down
Expand Up @@ -25,6 +25,8 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import avro.shaded.com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlException;
Expand All @@ -51,6 +53,22 @@ public class UdfCompiler {
.put(List.class, index -> typeConversionCode("List", index))
.build();

// Templates used to generate the UDF code
private static final String genericTemplate =
Contributor

Much nicer!

"#TYPE arg#INDEX;\n"
+ "if(args[#INDEX] == null) arg#INDEX = null;\n"
+ "else if (args[#INDEX] instanceof #TYPE) arg#INDEX = (#TYPE)args[#INDEX];\n"
+ "else if (args[#INDEX] instanceof String) arg#INDEX = "
+ "#TYPE.valueOf((String)args[#INDEX]);\n";
Contributor

Is this line basically coercing an unmatched type into a string? Wouldn't it be better to just throw an exception?

Contributor

Yay - grokable code!

I'm with @apurvam though - implicit type coercion scares me. Why do we need it? Why not just throw?

If we need this, then at the very least let's wrap this in a try/catch so we can throw a more meaningful error. Also, what if #TYPE.valueOf(String) does not exist?

Contributor Author

No - there are no unmatched types as the types need to be in typeConverters map.
For types that are not Map or List (as they are handled differently), it is converting it from a String to the type by using the valueOf() method, i.e., Boolean.valueOf(String), Long.valueOf(String)

Contributor Author

I added a test to UdfCompilerTest to show that an exception will be raised if the type isn't one that we accept.
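To see what the coercion discussed above amounts to, it may help to expand the templates by hand. The sketch below copies the three template strings from this diff and substitutes the placeholders for one type and index. `TemplateExpansionSketch` and `expand` are hypothetical names, the `Map`/`List` special case is left out, and literal `String.replace` is used where the PR uses `replaceAll`, so this is an approximation of `typeConversionCode` rather than the method itself.

```java
class TemplateExpansionSketch {
  private static final String GENERIC_TEMPLATE =
      "#TYPE arg#INDEX;\n"
          + "if(args[#INDEX] == null) arg#INDEX = null;\n"
          + "else if (args[#INDEX] instanceof #TYPE) arg#INDEX = (#TYPE)args[#INDEX];\n"
          + "else if (args[#INDEX] instanceof String) arg#INDEX = "
          + "#TYPE.valueOf((String)args[#INDEX]);\n";

  private static final String NUMBER_TEMPLATE =
      "else if (args[#INDEX] instanceof Number) arg#INDEX = "
          + "((Number)args[#INDEX]).#NUM_VALUE;\n";

  private static final String THROWS_TEMPLATE =
      "else throw new KsqlFunctionException(\"Type: \" + args[#INDEX].getClass() + \""
          + " is not supported by KSQL UDFS\");";

  // Builds the conversion snippet for one argument of the given boxed type name.
  static String expand(final String type, final int index) {
    final StringBuilder builder = new StringBuilder(GENERIC_TEMPLATE.replace("#TYPE", type));
    // Numeric types get an extra branch that narrows or widens any Number.
    if (!type.equals("String") && !type.equals("Boolean")) {
      final String numericValue =
          type.equals("Integer") ? "intValue()" : type.toLowerCase() + "Value()";
      builder.append(NUMBER_TEMPLATE.replace("#NUM_VALUE", numericValue));
    }
    // Any other runtime type falls through to the throwing branch.
    builder.append(THROWS_TEMPLATE);
    return builder.toString().replace("#INDEX", String.valueOf(index));
  }

  public static void main(final String[] args) {
    System.out.println(expand("Long", 0));
  }
}
```

For `Long`, the `String` branch expands to `Long.valueOf((String)args[0])`, and any runtime value that is not null, a `Long`, a `String` or a `Number` reaches the final `throw`.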


private static final String NUMBER_TEMPLATE =
"else if (args[#INDEX] instanceof Number) arg#INDEX = "
+ "((Number)args[#INDEX]).#NUM_VALUE;\n";

private static final String THROWS_TEMPLATE =
"else throw new KsqlFunctionException(\"Type: \" + args[#INDEX].getClass() + \""
+ " is not supported by KSQL UDFS\");";


UdfInvoker compile(final Method method, final ClassLoader loader) {
try {
Expand Down Expand Up @@ -87,26 +105,23 @@ UdfInvoker compile(final Method method, final ClassLoader loader) {
*/

private static String generateCode(final Method method) {
final StringBuilder builder = new StringBuilder();
final Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < params.length; i++) {

final String prefix = IntStream.range(0, params.length).mapToObj(i -> {
final Function<Integer, String> converter = typeConverters.get(params[i]);
if (converter == null) {
throw new KsqlException("Type " + params[i] + " is not supported in UDFs");
}
builder.append(converter.apply(i)).append("\n");
}
return converter.apply(i);
}).collect(Collectors.joining("\n", "", "\nreturn (("
+ method.getDeclaringClass().getSimpleName()
+ ") thiz)." + method.getName() + "("
));

builder.append("\nreturn ((").append(method.getDeclaringClass().getSimpleName())
.append(") thiz).").append(method.getName()).append("(");
final String code = IntStream.range(0, params.length).mapToObj(i -> "arg" + i)
.collect(Collectors.joining(",",
prefix, ");"));

for (int i = 0; i < params.length; i++) {
builder.append("arg").append(i).append(",");
}

builder.deleteCharAt(builder.length() - 1);
builder.append(");");
final String code = builder.toString();
logger.debug("generated code for udf method = {}\n{}", method, code);
return code;
}
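The switch from a `StringBuilder` with `deleteCharAt` to `Collectors.joining` with a prefix and suffix can be illustrated in isolation. This is a simplified sketch, not the PR's `generateCode`: `JoiningSketch` and `callExpression` are hypothetical names and the target expression is passed in as a plain string.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class JoiningSketch {
  // Builds "return <target>(arg0,arg1,...);" for the given number of parameters.
  static String callExpression(final String target, final int paramCount) {
    return IntStream.range(0, paramCount)
        .mapToObj(i -> "arg" + i)
        // the delimiter only appears between elements, so there is
        // no trailing comma to delete afterwards
        .collect(Collectors.joining(",", "return " + target + "(", ");"));
  }

  public static void main(final String[] args) {
    System.out.println(callExpression("thiz.concat", 2));
    System.out.println(callExpression("thiz.now", 0));
  }
}
```

One side effect worth noting: as far as can be read from the removed lines, the old loop followed by `deleteCharAt(builder.length() - 1)` would have deleted the opening parenthesis for a method with no parameters, whereas the joining version produces a well-formed empty argument list.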
Expand All @@ -126,33 +141,23 @@ private static IScriptEvaluator createScriptEvaluator(final Method method,
return scriptEvaluator;
}


private static String typeConversionCode(final String type, final int index) {
if (type.equals("Map") || type.equals("List")) {
return type + " arg" + index + " = (" + type + ")args[" + index + "];\n";
}
final String argArrayVal = "args[" + index + "]";
final String argVarAssignment = "arg" + index + " = ";

Contributor

numericValue can just be in the code template, and then you can simplify this code:

private static final String INTEGER_NUMBER_TEMPLATE =
      "else if (args[#INDEX] instanceof Number) arg#INDEX = "
          + "((Number)args[#INDEX]).intValue();\n";

private static final String NUMBER_TEMPLATE =
      "else if (args[#INDEX] instanceof Number) arg#INDEX = "
      + "((Number)args[#INDEX]).#LC_TYPEValue();\n";

private static String typeConversionCode(final String type, final int index) {
    if (type.equals("Map") || type.equals("List")) {
      return type + " arg" + index + " = (" + type + ")args[" + index + "];\n";
    }

    final StringBuilder builder = new StringBuilder();
    builder.append(genericTemplate);
    if (type.equals("Integer")) {
      builder.append(INTEGER_NUMBER_TEMPLATE);
    } else if (!type.equals("String") && !type.equals("Boolean")) {
      builder.append(NUMBER_TEMPLATE);
    }
    builder.append(THROWS_TEMPLATE);

    return builder.toString()
        .replaceAll("#TYPE", type)
        .replaceAll("#LC_TYPE", type.toLowerCase())
        .replaceAll("#INDEX", String.valueOf(index));
  }

Contributor

However, I'm still wondering if there is scope for this function to generate invalid code. Even if we know today that it's only going to be called with a known set of type parameter values, I think it would still be sensible to validate type against a supported list and throw if it is not supported. This is better than generating code that won't compile.

Contributor Author

See line 113

Contributor Author

I don't really see your code as a simplification!

Contributor

Well, it switches the code to have only one set of replaceAlls, i.e. it generates the whole block of code before doing the replacements, and moves the code generation in:

final String numericValue = type.equals("Integer") ? "intValue()" : type.toLowerCase()
          + "Value()";

into the easy-to-grok constants.

final StringBuilder builder = new StringBuilder();
final String numericValue = type.equals("Integer") ? "intValue()" : type.toLowerCase()
+ "Value()";
builder.append(type).append(" arg").append(index).append(";\n")
.append("if(").append(argArrayVal).append(" == null) ").append(argVarAssignment)
.append("null;\n")
.append("else if(").append(argArrayVal).append(" instanceof ").append(type).append(") ")
.append(argVarAssignment).append("(").append(type).append(")")
.append(argArrayVal).append(";\n")
.append("else if(").append(argArrayVal).append(" instanceof String) ")
.append(argVarAssignment).append(type).append(".valueOf((String)")
.append(argArrayVal).append(");\n");
builder.append(genericTemplate.replaceAll("#TYPE", type)
.replaceAll("#INDEX", String.valueOf(index)));

if (!type.equals("String") && !type.equals("Boolean")) {
builder.append("else if(").append(argArrayVal).append(" instanceof Number) ")
.append(argVarAssignment)
.append("((Number)").append(argArrayVal).append(").").append(numericValue)
.append(";\n");
final String numericValue = type.equals("Integer") ? "intValue()" : type.toLowerCase()
+ "Value()";
builder.append(NUMBER_TEMPLATE.replaceAll("#INDEX", String.valueOf(index))
.replaceAll("#NUM_VALUE", numericValue));
}
builder.append("else throw new KsqlFunctionException(\"Type: \" + ").append(argArrayVal)
.append(".getClass() + \"is not supported by KSQL UDFS\");");

builder.append(THROWS_TEMPLATE.replaceAll("#INDEX", String.valueOf(index)));
return builder.toString();
}
}
Expand Up @@ -16,7 +16,6 @@

package io.confluent.ksql.function;

import com.google.common.base.Preconditions;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -50,7 +49,7 @@

public class UdfLoader {

private static final Logger logger = LoggerFactory.getLogger(UdfLoader.class);
private static final Logger LOGGER = LoggerFactory.getLogger(UdfLoader.class);

private final MetaStore metaStore;
private final File pluginDir;
Expand Down Expand Up @@ -90,7 +89,7 @@ public void load() {
.map(path -> UdfClassLoader.newClassLoader(path, parentClassLoader, blacklist))
.forEach(this::loadUdfs);
} catch (IOException e) {
logger.error("Failed to load UDFs from location {}", pluginDir, e);
LOGGER.error("Failed to load UDFs from location {}", pluginDir, e);
}
}
}
Expand All @@ -116,7 +115,7 @@ private void loadUdfs(final ClassLoader loader) {
if (parentClassLoader == loader) {
throw e;
} else {
logger.warn("Failed to add UDF to the MetaStore. name={} method={}",
LOGGER.warn("Failed to add UDF to the MetaStore. name={} method={}",
annotation.name(),
method,
e);
Expand All @@ -130,33 +129,41 @@ private void loadUdfs(final ClassLoader loader) {
private void addFunction(final UdfDescription annotation,
final Method method,
final UdfInvoker udf) {
// sanity check
instantiateUdfClass(method, annotation);
final String sensorName = "ksql-udf-" + annotation.name();
addSensor(sensorName, annotation.name());

metaStore.addFunction(new KsqlFunction(
SchemaUtil.getSchemaFromType(method.getReturnType()),
Arrays.stream(method.getGenericParameterTypes())
.map(SchemaUtil::getSchemaFromType).collect(Collectors.toList()),
annotation.name(),
collectMetrics ? UdfMetricProducer.class : PluggableUdf.class,
() -> {
try {
final PluggableUdf theUdf
= new PluggableUdf(udf, method.getDeclaringClass().newInstance());
if (collectMetrics) {
return new UdfMetricProducer(metrics.getSensor(sensorName),
theUdf,
new SystemTime());
}
return theUdf;
} catch (Exception e) {
throw new KsqlException("Failed to create instance for UDF="
+ annotation.name()
+ ", method=" + method,
e);
final PluggableUdf theUdf
= new PluggableUdf(udf, instantiateUdfClass(method, annotation));
if (collectMetrics) {
return new UdfMetricProducer(metrics.getSensor(sensorName),
theUdf,
new SystemTime());
}
return theUdf;
}));
}

private Object instantiateUdfClass(final Method method,
Contributor

nit: Static

final UdfDescription annotation) {
try {
return method.getDeclaringClass().newInstance();
} catch (final Exception e) {
throw new KsqlException("Failed to create instance for UDF="
+ annotation.name()
+ ", method=" + method,
e);
}
}

private void addSensor(final String sensorName, final String udfName) {
if (collectMetrics && metrics.getSensor(sensorName) == null) {
final Sensor sensor = metrics.sensor(sensorName);
Expand All @@ -183,10 +190,6 @@ public static UdfLoader newInstance(final KsqlConfig config,
final Boolean loadCustomerUdfs = config.getBoolean(KsqlConfig.KSQL_ENABLE_UDFS);
final Boolean collectMetrics = config.getBoolean(KsqlConfig.KSQL_COLLECT_UDF_METRICS);
final File pluginDir = new File(ksqlInstallDir, "ext");

Preconditions.checkArgument(!loadCustomerUdfs || pluginDir.isDirectory(),
pluginDir.getPath() + " must be a directory when " + KsqlConfig.KSQL_ENABLE_UDFS
+ " is true");
return new UdfLoader(metaStore,
pluginDir,
Thread.currentThread().getContextClassLoader(),
Expand Down
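The "sanity check" call to `instantiateUdfClass` in the hunk above appears to move instantiation failures from first use to registration time. Below is a minimal sketch of that idea under simplifying assumptions: `EagerInstantiationSketch`, `instantiate`, `register` and `NoDefaultConstructor` are hypothetical names, `IllegalStateException` stands in for `KsqlException`, and `getDeclaredConstructor().newInstance()` is used instead of `Class.newInstance()`.

```java
import java.util.function.Supplier;

class EagerInstantiationSketch {

  // Creates an instance via the no-arg constructor, wrapping any failure.
  static Object instantiate(final Class<?> udfClass) {
    try {
      return udfClass.getDeclaredConstructor().newInstance();
    } catch (final Exception e) {
      throw new IllegalStateException(
          "Failed to create instance for UDF class=" + udfClass.getName(), e);
    }
  }

  // Instantiates once up front so a bad class is rejected at registration,
  // then hands back a supplier that creates a fresh instance per call.
  static Supplier<Object> register(final Class<?> udfClass) {
    instantiate(udfClass); // sanity check, result discarded
    return () -> instantiate(udfClass);
  }

  // Hypothetical class with no no-arg constructor, used to trigger the failure.
  static class NoDefaultConstructor {
    NoDefaultConstructor(final int ignored) {
    }
  }

  public static void main(final String[] args) {
    System.out.println(register(StringBuilder.class).get().getClass());
    try {
      register(NoDefaultConstructor.class);
    } catch (final IllegalStateException e) {
      System.out.println("rejected at registration: " + e.getMessage());
    }
  }
}
```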
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

import java.util.Objects;

import io.confluent.ksql.function.udf.Kudf;

/**
Expand All @@ -32,9 +34,9 @@ class UdfMetricProducer implements Kudf {
public UdfMetricProducer(final Sensor sensor,
final Kudf kudf,
final Time time) {
this.sensor = sensor;
this.kudf = kudf;
this.time = time;
this.sensor = Objects.requireNonNull(sensor, "sensor can't be null");
this.kudf = Objects.requireNonNull(kudf, "kudf can't be null");
this.time = Objects.requireNonNull(time, "time can't be null");
}

@Override
Expand Down
Expand Up @@ -16,6 +16,8 @@

package io.confluent.ksql.function.udf;

import java.util.Objects;

import io.confluent.ksql.function.UdfInvoker;

/**
Expand All @@ -29,10 +31,10 @@ public class PluggableUdf implements Kudf {
private final UdfInvoker udf;
private final Object actualUdf;

public PluggableUdf(final UdfInvoker udf,
public PluggableUdf(final UdfInvoker udfInvoker,
final Object actualUdf) {
this.udf = udf;
this.actualUdf = actualUdf;
this.udf = Objects.requireNonNull(udfInvoker, "udfInvoker can't be null");
this.actualUdf = Objects.requireNonNull(actualUdf, "actualUdf can't be null");
}

@Override
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

Expand All @@ -54,9 +55,9 @@ public class StandaloneExecutor implements Executable {
StandaloneExecutor(final KsqlEngine ksqlEngine,
final String queriesFile,
final UdfLoader udfLoader) {
this.ksqlEngine = ksqlEngine;
this.queriesFile = queriesFile;
this.udfLoader = udfLoader;
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine can't be null");
this.queriesFile = Objects.requireNonNull(queriesFile, "queriesFile can't be null");
this.udfLoader = Objects.requireNonNull(udfLoader, "udfLoader can't be null");
}

public void start() {
Expand Down
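The `Objects.requireNonNull` checks added to the constructors in the last three hunks follow a common fail-fast pattern. A minimal sketch, using a made-up `Holder` class with a single field, shows the effect: a null argument is rejected in the constructor with the supplied message rather than surfacing later as an unexplained `NullPointerException`.

```java
import java.util.Objects;

class NonNullSketch {

  // Hypothetical holder with one required dependency.
  static final class Holder {
    private final String queriesFile;

    Holder(final String queriesFile) {
      // Throws NullPointerException with this message if the argument is null.
      this.queriesFile = Objects.requireNonNull(queriesFile, "queriesFile can't be null");
    }

    String queriesFile() {
      return queriesFile;
    }
  }

  public static void main(final String[] args) {
    System.out.println(new Holder("queries.sql").queriesFile());
    try {
      new Holder(null);
    } catch (final NullPointerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```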