From 13242851cebc33212da991bb99abd78315d80b43 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 4 Sep 2019 15:20:01 -0700 Subject: [PATCH] feat: change the public API of schema provider method (#3287) * change public api of method schema provider to use SqlType instead of Schema * Updated docs and tests * forgot test jar * reverted pom changes, applied almog's comments * applied Andy's comment --- docs/developer-guide/udf.rst | 2 +- .../confluent/ksql/function/KsqlFunction.java | 7 ++++-- .../io/confluent/ksql/function/UdfLoader.java | 21 ++++++++++++------ .../confluent/ksql/function/udf/math/Abs.java | 10 ++++----- .../ksql/function/UdfLoaderTest.java | 17 +++++++------- .../src/test/resources/udf-failing-tests.jar | Bin 5058 -> 5017 bytes .../damian/ksql/udf/MissingAnnotationUdf.java | 6 +++-- ksql-streams/pom.xml | 4 ++++ 8 files changed, 42 insertions(+), 25 deletions(-) diff --git a/docs/developer-guide/udf.rst b/docs/developer-guide/udf.rst index 85e7dbb89e18..b42c046c2cba 100644 --- a/docs/developer-guide/udf.rst +++ b/docs/developer-guide/udf.rst @@ -113,7 +113,7 @@ implement a UDF with a non-deterministic return type. A UDF which returns ``BigD for example, may vary the precision and scale of the output based on the input schema. To use this functionality, you need to specify a method with signature -``public Schema (final List params)`` and annotate it with ``@SchemaProvider``. +``public SqlType (final List params)`` and annotate it with ``@SchemaProvider``. Also, you need to link it to the corresponding UDF by using the ``schemaProvider=`` parameter of the ``@Udf`` annotation. diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java index c3b5928f2701..ee7e4e01a487 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import io.confluent.ksql.function.udf.Kudf; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; @@ -177,9 +178,11 @@ private void checkMatchingReturnTypes(final Schema s1, final Schema s2) { if (!SchemaUtil.areCompatible(s1, s2)) { throw new KsqlException(String.format("Return type %s of UDF %s does not match the declared " + "return type %s.", - s1.toString(), + SchemaConverters.connectToSqlConverter().toSqlType( + s1).toString(), functionName, - s2.toString())); + SchemaConverters.connectToSqlConverter().toSqlType( + s2).toString())); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index ed6a54fd6465..ac61e266f7e1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -28,6 +28,7 @@ import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.TypeContextUtil; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConfig; @@ -404,8 +405,12 @@ private Function,Schema> handleUdfSchemaProviderAnnotation( final Method m = findSchemaProvider(theClass, schemaProviderName); final Object instance = instantiateUdfClass(theClass, annotation); - return parameterTypes -> { - return invokeSchemaProviderMethod(instance, m, parameterTypes, annotation); + return parameterSchemas -> { + final List parameterTypes = parameterSchemas.stream() + .map(p -> SchemaConverters.connectToSqlConverter().toSqlType(p)) + .collect(Collectors.toList()); + return SchemaConverters.sqlToConnectConverter().toConnectSchema(invokeSchemaProviderMethod( + instance, m, parameterTypes, annotation)); }; } @@ -421,15 +426,17 @@ private Method findSchemaProvider(final Class theClass, return m; } catch (NoSuchMethodException e) { throw new KsqlException(String.format( - "Cannot find schema provider method with name %s and parameter List in class %s.", - schemaProviderName,theClass.getName()),e); + "Cannot find schema provider method with name %s and parameter List in class " + + "%s.", schemaProviderName,theClass.getName()),e); } } - private Schema invokeSchemaProviderMethod(final Object instance, final Method m, - final List args, final UdfDescription annotation) { + private SqlType invokeSchemaProviderMethod(final Object instance, + final Method m, + final List args, + final UdfDescription annotation) { try { - return (Schema) m.invoke(instance, args); + return (SqlType) m.invoke(instance, args); } catch (IllegalAccessException | InvocationTargetException e) { throw new KsqlException(String.format("Cannot invoke the schema provider " diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java index 5f0fff13a646..017f9d9c0980 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java @@ -19,11 +19,11 @@ import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.function.udf.UdfSchemaProvider; -import io.confluent.ksql.util.DecimalUtil; +import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; import java.util.List; -import org.apache.kafka.connect.data.Schema; @UdfDescription(name = "Abs", description = Abs.DESCRIPTION) public class Abs { @@ -54,12 +54,12 @@ public BigDecimal abs(@UdfParameter final BigDecimal val) { } @UdfSchemaProvider - public Schema provideSchema(final List params) { + public SqlType provideSchema(final List params) { if (params.size() != 1) { throw new KsqlException("Abs udf accepts one parameter"); } - final Schema s = params.get(0); - if (!DecimalUtil.isDecimal(s)) { + final SqlType s = params.get(0); + if (s.baseType() != SqlBaseType.DECIMAL) { throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter" + "type"); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index da7e7957aef5..ac5a8e324e0e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -36,6 +36,8 @@ import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.function.udf.UdfSchemaProvider; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -246,9 +248,8 @@ public void shouldThrowOnReturnTypeMismatch() { // Expect: expectedException.expect(KsqlException.class); - expectedException.expectMessage(is("Return type Schema{org.apache.kafka.connect.data." - + "Decimal:BYTES} of UDF ReturnIncompatible does not " - + "match the declared return type Schema{STRING}.")); + expectedException.expectMessage(is("Return type DECIMAL(2, 1) of UDF ReturnIncompatible does not " + + "match the declared return type STRING.")); // When: function.getReturnType(args); @@ -301,7 +302,7 @@ public void shouldThrowOnMissingSchemaProvider() throws ClassNotFoundException { // Expect: expectedException.expect(KsqlException.class); expectedException.expectMessage(is("Cannot find schema provider method with name provideSchema " - + "and parameter List in class org.damian.ksql.udf." + + "and parameter List in class org.damian.ksql.udf." + "MissingSchemaProviderUdf.")); /// When: @@ -671,8 +672,8 @@ public BigDecimal foo(@UdfParameter("justValue") final BigDecimal p) { } @UdfSchemaProvider - public Schema provideSchema(List params) { - return DecimalUtil.builder(2, 1).build(); + public SqlType provideSchema(List params) { + return SqlDecimal.of(2, 1); } } @@ -689,8 +690,8 @@ public String foo(@UdfParameter("justValue") final BigDecimal p) { } @UdfSchemaProvider - public Schema provideSchema(List params) { - return DecimalUtil.builder(2, 1).build(); + public SqlType provideSchema(List params) { + return SqlDecimal.of(2, 1); } } } \ No newline at end of file diff --git a/ksql-engine/src/test/resources/udf-failing-tests.jar b/ksql-engine/src/test/resources/udf-failing-tests.jar index be374a0909c92ef3fa6b5f2b24f6477a520c5204..f96cfa98040b2515bb44f4860eee33b734902365 100644 GIT binary patch delta 1195 zcmX@4K2x1Hz?+$civa{Sc`8rjRbk!;q)e89sk`!E>c1L@nykU74W`2w^@V_Z|9l?y z_4P~)43}9M81xt>CkhzUPYucS7j_hwr~5XP%fr$2b+%(mP@I|J5q$v;FAonsRk2^S zJ8y66F5msMtiPqcb-qd62lgL}dp4J+XLGA9^(-$>`&RjE=FGhMzyCf8Gnh3_Q{X?X zy5gXh;49Y%)dxA;nrHYLH{SA^?C`JolVxLOxW_+3zVj6&ht4s5IkW3ja=qhZ*F(2v z8s}R%39s0#l@&a{waN3BYdnj|x9Be(8M-Oe{$8mw1ij|{TC02Ja!}U+x3jkC(|g*^ zl)R|Ec<=crvLi+0aucqugFcl?H$BfjoGkLx5Un%g+}PCdWTMCpm|Le{Les}I~* zq%HDwN)D^xbhXesT}#sscYX1|w0B`n`+*X{KvSYRm*>~U!B$^ z#J_0O>E=n()fY1O30_l6lr`hH;#OPvBiQ4Zpux>$^Us~znf^hv@5aM7O9fA8s@z&5 zE7xZ;XMexQvqYZ%HE+^+I&_(?D@BQvH&rkE$9p2&=1KhnxpjVzVwwtL)Gu=fG;!YH zycO(`dhq>)Q;}z0eHGX=PvQBcjJ7Sye;f3E-1JmAV@jf0^O;Bg4ybJ|l6&-hqMJqI z@eeBA&y?<5vTXfRej-+V>KEZl^+HyATlhjAXdavY_0)!W&uckF@5HH`iu<-?kJp>5 zP3P3_8C8j2s9KvScuz1R;ukY4L31zw6Y1tE7852=s@=SiO$*GJe2>EoB(PYWlZhFa zvsn!9rTi$XTRg2pJ%DJa>WI$e${M#uBAl>!fk&g$WI9O+qga?E3iJm zn~_O`8J;=)CqLvf2IoL=etR$-$*&KlyZOz)^bvk*FwG=j&xe{rC;JQdfu$A*NHe*y z05hxXaIMP`mY9}CTlQigXu6veL*1K-!o(R zn`KN4403D?40@9jS>z^%3JBLv4T<&_b`-H=-g_fx$$~`>)+(GT(p2tfV{u{<5}Kss zvgEv9dHU_fyEAuLdmq|=NZi!!2lJo6W5&C;Ze^Xk)ieFvSL^$h>E-qRe*I%+xc0DA za8rV4BBNOM2A0#6E1o5`I^=nKEZok?6U|-9G;{l@hb+gkoIaILy#L@J`?U`jjOqh} z%|5@<5pa7iVZ2M#iK*LE)MVx1nu98~{PvY5MKM(xhXPJK>zm?btUJY~^p5YE$6Brl zM_0|=$n93UEV?Azb}<=>f3nrtjmQrtcN|AS~qK4 z`RXE<%l^vA;?I%1!uf(XUht@2I<(kSFiWO;ra`uG&w}S?Y}1!5d$R4=#%#CAJ}u|B z`szLjm1{A0{5tw!>hJSX=4?sZFNNNAJ9DTr!)w`MhD1BQOMbnRckSIOa@nt`fAYjx z6UF^QL~hjA#LtLZo>!$4&Xdn%!um^&|EKK3PdM7~Z(-3-H094$tzdo4vSv%ZWS)Io%kh-Sa;E1B z`hJ+izt@cmd9G{aI@4jFASb`3K*Ux)zDsLU%{>oh>il)Dx7f#7sGD=zYiHezMxMGO zXYwUFW(v;R9#b^+MjfLM2SiO0)dKzU(eZtM4D|B@fl^kvkBw{D$B%=^ajHpLyH% z9rzG>$7g-o*Y&en^DNFX`5bosV*jcvNvFSBXW7Rpp~=;^QiD6c&ipZ_cWv7@<15dc zD){uBX8eaIeGUd-a^4)wV!{MU-SemSN%qD)3h?zk9rR@=M-;`h#4D40sqWRPR^X@VV>mj28*)jw1X`?DYz_ zwy>nH%C!FQ{_e+*vy3@4#Xf!baZprT=<=K?4+E4-Y) z@`=xt5PNZZ)!u85X9vE~u1}AyHCWNgYdDSRuk}300})3NY8%G$iT3WnSsFw zTtYyIC5^c(lPv^fC)cxxPtFkF0V^(G&r-SoRGa`*Y>%RN3ESkEY>ps>@*q=xUs9eS z3REBr#9}B4FHTMnRILZ8QR$vzc@=0_CeSbe6g5(u3=BE>=_Xlv1x5Jb3nSiCKm{)X`^d^ zD*-Z0;T(|qZlHP>d0-0h*ETy params) { - return DecimalUtil.builder(2, 1).build(); + public SqlType provideSchema(List params) { + return SqlDecimal.of(2, 1); } } diff --git a/ksql-streams/pom.xml b/ksql-streams/pom.xml index 40d9b26c9be5..a247d84682e9 100644 --- a/ksql-streams/pom.xml +++ b/ksql-streams/pom.xml @@ -69,6 +69,10 @@ ${project.version} test + + io.confluent.ksql + ksql-common +