diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/StructType.java b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/StructType.java index fc6cb18eba92..a55bf7a60659 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/StructType.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/StructType.java @@ -22,6 +22,11 @@ public final class StructType extends ObjectType { + /** + * An empty struct accepts any struct as an instantiation. + */ + public static final StructType ANY_STRUCT = StructType.builder().build(); + private final ImmutableMap schema; private StructType(final Map schema) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index 32f277732bb1..ffa2ccb06d5b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -256,6 +256,20 @@ public void shouldLoadFunctionWithSchemaProvider() { assertThat(function.getReturnType(args), equalTo(decimal)); } + @Test + public void shouldLoadFunctionWithStructSchemaProvider() { + // Given: + final UdfFactory returnDecimal = FUNC_REG.getUdfFactory(FunctionName.of("KsqlStructUdf")); + + // When: + final List args = ImmutableList.of(); + final KsqlScalarFunction function = returnDecimal.getFunction(args); + + // Then: + assertThat(function.getReturnType(args), equalTo(KsqlStructUdf.RETURN)); + } + + @Test public void shouldLoadFunctionWithNestedDecimalSchema() { // Given: @@ -270,7 +284,6 @@ public void shouldLoadFunctionWithNestedDecimalSchema() { equalTo(SqlStruct.builder().field("VAL", SqlDecimal.of(64, 2)).build())); } - @Test public void shouldThrowOnReturnTypeMismatch() { // Given: @@ -1391,6 +1404,25 @@ public Struct getDecimalStruct() { } } + @UdfDescription( + name = "KsqlStructUdf", + description = "A test-only UDF for testing struct return types") + public static class KsqlStructUdf { + + private static final SqlStruct RETURN = + SqlStruct.builder().field("VAL", SqlTypes.STRING).build(); + + @UdfSchemaProvider + public SqlType provide(final List params) { + return RETURN; + } + + @Udf(schemaProvider = "provide") + public Struct getDecimalStruct() { + return null; + } + } + @SuppressWarnings({"unused", "MethodMayBeStatic"}) // Invoked via reflection in test. @UdfDescription( name = "ReturnIncompatible", diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java index 43cffca82316..2040efd67f34 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java @@ -15,12 +15,20 @@ package io.confluent.ksql.function.udf; +import io.confluent.ksql.schema.ksql.types.SqlStruct; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import java.util.List; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @UdfDescription(name="test_udf", description = "test") @SuppressWarnings("unused") public class TestUdf { + private static final SqlStruct RETURN = + SqlStruct.builder().field("A", SqlTypes.STRING).build(); + @Udf(description = "returns the method name") public String doStuffIntString(final int arg1, final String arg2) { return "doStuffIntString"; @@ -47,4 +55,16 @@ public String doStuffStruct( ) { return struct.getString("A"); } + + @Udf(description = "returns the value of 'STRUCT'", schemaProvider = "structProvider") + public Struct returnStructStuff() { + return new Struct( + SchemaBuilder.struct().field("A", SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build() + ).put("A", "foo"); + } + + @UdfSchemaProvider + public SqlType structProvider(final List params) { + return RETURN; + } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java index 052c7bf4f87e..d1e016992777 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java @@ -23,6 +23,7 @@ import io.confluent.ksql.function.types.ParamType; import io.confluent.ksql.function.types.ParamTypes; import io.confluent.ksql.function.types.StringType; +import io.confluent.ksql.function.types.StructType; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.util.KsqlException; import java.lang.reflect.GenericArrayType; @@ -32,6 +33,7 @@ import java.math.BigDecimal; import java.util.List; import java.util.Map; +import org.apache.kafka.connect.data.Struct; public final class UdfUtil { @@ -104,7 +106,9 @@ public static ParamType getSchemaFromType(final Type type) { return schema; } + // CHECKSTYLE_RULES.OFF: CyclomaticComplexity private static ParamType handleParameterizedType(final Type type) { + // CHECKSTYLE_RULES.ON: CyclomaticComplexity if (type instanceof ParameterizedType) { final ParameterizedType parameterizedType = (ParameterizedType) type; if (parameterizedType.getRawType() == Map.class) { @@ -133,6 +137,11 @@ private static ParamType handleParameterizedType(final Type type) { return ArrayType.of( GenericType.of( ((GenericArrayType) type).getGenericComponentType().getTypeName())); + } else if (type instanceof Class && Struct.class.isAssignableFrom((Class) type)) { + // we don't have enough information here to return a more specific type of struct, + // but there are other parts of the code that enforce having a schema provider or + // schema annotation if a struct is being used + return StructType.ANY_STRUCT; } throw new KsqlException("Type inference is not supported for: " + type); diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdfUtilTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdfUtilTest.java index b8a1a8c7dcd9..0bb7f309e6f3 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdfUtilTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdfUtilTest.java @@ -25,12 +25,14 @@ import io.confluent.ksql.function.types.MapType; import io.confluent.ksql.function.types.ParamType; import io.confluent.ksql.function.types.ParamTypes; +import io.confluent.ksql.function.types.StructType; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.util.KsqlException; import java.lang.reflect.Type; import java.math.BigDecimal; import java.util.List; import java.util.Map; +import org.apache.kafka.connect.data.Struct; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.junit.Test; @@ -173,6 +175,14 @@ public void shouldGetStringSchemaFromStringClass() { ); } + @Test + public void shouldGetStringSchemaFromStructClass() { + assertThat( + UdfUtil.getSchemaFromType(Struct.class), + equalTo(StructType.ANY_STRUCT) + ); + } + @Test(expected = KsqlException.class) public void shouldThrowExceptionIfClassDoesntMapToSchema() { UdfUtil.getSchemaFromType(System.class); diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/plan.json new file mode 100644 index 000000000000..48c679534824 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/plan.json @@ -0,0 +1,125 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (VAL STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `VAL` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT TEST_UDF() VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `VALUE` STRUCT<`A` STRING>", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `VAL` STRING" + }, + "selectExpressions" : [ "TEST_UDF() AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/spec.json new file mode 100644 index 000000000000..a6572a1c6ed8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/spec.json @@ -0,0 +1,67 @@ +{ + "version" : "6.0.0", + "timestamp" : 1588882528086, + "path" : "query-validation-tests/struct-udfs.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "testCase" : { + "name" : "Output struct", + "inputs" : [ { + "topic" : "test_topic", + "key" : "1", + "value" : { + "val" : "foo" + }, + "timestamp" : 0 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "VALUE" : { + "A" : "foo" + } + }, + "timestamp" : 0 + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT TEST_UDF() AS VALUE FROM test;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct/6.0.0_1588882528086/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json index 6f28dff0d225..9cf0a62902c8 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json @@ -28,6 +28,19 @@ "outputs": [ {"topic": "OUTPUT", "key": "1", "value": {"VALUE": "1"}, "timestamp": 0} ] + }, + { + "name": "Output struct", + "statements": [ + "CREATE STREAM test (val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT TEST_UDF() AS VALUE FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"val": "foo"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"VALUE": {"A": "foo"}}, "timestamp": 0} + ] } ] } \ No newline at end of file diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/function/udf/UdfSchemaProvider.java b/ksqldb-udf/src/main/java/io/confluent/ksql/function/udf/UdfSchemaProvider.java index 06c1a2a3f690..bbaa357ad568 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/function/udf/UdfSchemaProvider.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/function/udf/UdfSchemaProvider.java @@ -20,14 +20,14 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.METHOD}) /** * The {@code UdfSchemaProvider} annotation on a method tells KSQL to use this method to resolve * the return type of the udf at runtime. * *

The corresponding udf annotation must have the {@code schemaProvider} specified. */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) public @interface UdfSchemaProvider { } \ No newline at end of file