diff --git a/docs/developer-guide/udf.rst b/docs/developer-guide/udf.rst index 85e7dbb89e18..b42c046c2cba 100644 --- a/docs/developer-guide/udf.rst +++ b/docs/developer-guide/udf.rst @@ -113,7 +113,7 @@ implement a UDF with a non-deterministic return type. A UDF which returns ``BigD for example, may vary the precision and scale of the output based on the input schema. To use this functionality, you need to specify a method with signature -``public Schema (final List params)`` and annotate it with ``@SchemaProvider``. +``public SqlType (final List params)`` and annotate it with ``@SchemaProvider``. Also, you need to link it to the corresponding UDF by using the ``schemaProvider=`` parameter of the ``@Udf`` annotation. diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java index c3b5928f2701..ee7e4e01a487 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import io.confluent.ksql.function.udf.Kudf; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; @@ -177,9 +178,11 @@ private void checkMatchingReturnTypes(final Schema s1, final Schema s2) { if (!SchemaUtil.areCompatible(s1, s2)) { throw new KsqlException(String.format("Return type %s of UDF %s does not match the declared " + "return type %s.", - s1.toString(), + SchemaConverters.connectToSqlConverter().toSqlType( + s1).toString(), functionName, - s2.toString())); + SchemaConverters.connectToSqlConverter().toSqlType( + s2).toString())); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index ed6a54fd6465..ac61e266f7e1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -28,6 +28,7 @@ import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.TypeContextUtil; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConfig; @@ -404,8 +405,12 @@ private Function,Schema> handleUdfSchemaProviderAnnotation( final Method m = findSchemaProvider(theClass, schemaProviderName); final Object instance = instantiateUdfClass(theClass, annotation); - return parameterTypes -> { - return invokeSchemaProviderMethod(instance, m, parameterTypes, annotation); + return parameterSchemas -> { + final List parameterTypes = parameterSchemas.stream() + .map(p -> SchemaConverters.connectToSqlConverter().toSqlType(p)) + .collect(Collectors.toList()); + return SchemaConverters.sqlToConnectConverter().toConnectSchema(invokeSchemaProviderMethod( + instance, m, parameterTypes, annotation)); }; } @@ -421,15 +426,17 @@ private Method findSchemaProvider(final Class theClass, return m; } catch (NoSuchMethodException e) { throw new KsqlException(String.format( - "Cannot find schema provider method with name %s and parameter List in class %s.", - schemaProviderName,theClass.getName()),e); + "Cannot find schema provider method with name %s and parameter List in class " + + "%s.", schemaProviderName,theClass.getName()),e); } } - private Schema invokeSchemaProviderMethod(final Object instance, final Method m, - final List args, final UdfDescription annotation) { + private SqlType invokeSchemaProviderMethod(final Object instance, + final Method m, + final List args, + final UdfDescription annotation) { try { - return (Schema) m.invoke(instance, args); + return (SqlType) m.invoke(instance, args); } catch (IllegalAccessException | InvocationTargetException e) { throw new KsqlException(String.format("Cannot invoke the schema provider " diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java index 5f0fff13a646..017f9d9c0980 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java @@ -19,11 +19,11 @@ import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.function.udf.UdfSchemaProvider; -import io.confluent.ksql.util.DecimalUtil; +import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; import java.util.List; -import org.apache.kafka.connect.data.Schema; @UdfDescription(name = "Abs", description = Abs.DESCRIPTION) public class Abs { @@ -54,12 +54,12 @@ public BigDecimal abs(@UdfParameter final BigDecimal val) { } @UdfSchemaProvider - public Schema provideSchema(final List params) { + public SqlType provideSchema(final List params) { if (params.size() != 1) { throw new KsqlException("Abs udf accepts one parameter"); } - final Schema s = params.get(0); - if (!DecimalUtil.isDecimal(s)) { + final SqlType s = params.get(0); + if (s.baseType() != SqlBaseType.DECIMAL) { throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter" + "type"); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index da7e7957aef5..ac5a8e324e0e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -36,6 +36,8 @@ import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.function.udf.UdfSchemaProvider; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -246,9 +248,8 @@ public void shouldThrowOnReturnTypeMismatch() { // Expect: expectedException.expect(KsqlException.class); - expectedException.expectMessage(is("Return type Schema{org.apache.kafka.connect.data." - + "Decimal:BYTES} of UDF ReturnIncompatible does not " - + "match the declared return type Schema{STRING}.")); + expectedException.expectMessage(is("Return type DECIMAL(2, 1) of UDF ReturnIncompatible does not " + + "match the declared return type STRING.")); // When: function.getReturnType(args); @@ -301,7 +302,7 @@ public void shouldThrowOnMissingSchemaProvider() throws ClassNotFoundException { // Expect: expectedException.expect(KsqlException.class); expectedException.expectMessage(is("Cannot find schema provider method with name provideSchema " - + "and parameter List in class org.damian.ksql.udf." + + "and parameter List in class org.damian.ksql.udf." + "MissingSchemaProviderUdf.")); /// When: @@ -671,8 +672,8 @@ public BigDecimal foo(@UdfParameter("justValue") final BigDecimal p) { } @UdfSchemaProvider - public Schema provideSchema(List params) { - return DecimalUtil.builder(2, 1).build(); + public SqlType provideSchema(List params) { + return SqlDecimal.of(2, 1); } } @@ -689,8 +690,8 @@ public String foo(@UdfParameter("justValue") final BigDecimal p) { } @UdfSchemaProvider - public Schema provideSchema(List params) { - return DecimalUtil.builder(2, 1).build(); + public SqlType provideSchema(List params) { + return SqlDecimal.of(2, 1); } } } \ No newline at end of file diff --git a/ksql-engine/src/test/resources/udf-failing-tests.jar b/ksql-engine/src/test/resources/udf-failing-tests.jar index be374a0909c9..f96cfa98040b 100644 Binary files a/ksql-engine/src/test/resources/udf-failing-tests.jar and b/ksql-engine/src/test/resources/udf-failing-tests.jar differ diff --git a/ksql-engine/src/test/resources/udf-failing-tests/src/main/java/org/damian/ksql/udf/MissingAnnotationUdf.java b/ksql-engine/src/test/resources/udf-failing-tests/src/main/java/org/damian/ksql/udf/MissingAnnotationUdf.java index d6455098cb00..00e2b35aab69 100644 --- a/ksql-engine/src/test/resources/udf-failing-tests/src/main/java/org/damian/ksql/udf/MissingAnnotationUdf.java +++ b/ksql-engine/src/test/resources/udf-failing-tests/src/main/java/org/damian/ksql/udf/MissingAnnotationUdf.java @@ -19,6 +19,8 @@ import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.function.udf.UdfSchemaProvider; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.DecimalUtil; import java.math.BigDecimal; import java.util.List; @@ -41,7 +43,7 @@ public BigDecimal foo(@UdfParameter("justValue") final BigDecimal p) { } @UdfSchemaProvider - public Schema provideSchema(List params) { - return DecimalUtil.builder(2, 1).build(); + public SqlType provideSchema(List params) { + return SqlDecimal.of(2, 1); } } diff --git a/ksql-streams/pom.xml b/ksql-streams/pom.xml index 40d9b26c9be5..a247d84682e9 100644 --- a/ksql-streams/pom.xml +++ b/ksql-streams/pom.xml @@ -69,6 +69,10 @@ ${project.version} test + + io.confluent.ksql + ksql-common +