Skip to content

Commit

Permalink
fix: allow structs in schema provider return types (#5287)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed May 7, 2020
1 parent bd9302a commit 2e604f0
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

public final class StructType extends ObjectType {

/**
* An empty struct accepts any struct as an instantiation.
*/
public static final StructType ANY_STRUCT = StructType.builder().build();

private final ImmutableMap<String, ParamType> schema;

private StructType(final Map<String, ParamType> schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ public void shouldLoadFunctionWithSchemaProvider() {
assertThat(function.getReturnType(args), equalTo(decimal));
}

@Test
public void shouldLoadFunctionWithStructSchemaProvider() {
  // Given: the factory for the test UDF whose schema provider returns a struct type
  // (renamed from 'returnDecimal', a copy-paste from the decimal variant of this test)
  final UdfFactory structUdfFactory = FUNC_REG.getUdfFactory(FunctionName.of("KsqlStructUdf"));

  // When: resolving the zero-argument variant of the function
  final List<SqlType> args = ImmutableList.of();
  final KsqlScalarFunction function = structUdfFactory.getFunction(args);

  // Then: the return type comes from the @UdfSchemaProvider, not from Java type inference
  assertThat(function.getReturnType(args), equalTo(KsqlStructUdf.RETURN));
}


@Test
public void shouldLoadFunctionWithNestedDecimalSchema() {
// Given:
Expand All @@ -270,7 +284,6 @@ public void shouldLoadFunctionWithNestedDecimalSchema() {
equalTo(SqlStruct.builder().field("VAL", SqlDecimal.of(64, 2)).build()));
}


@Test
public void shouldThrowOnReturnTypeMismatch() {
// Given:
Expand Down Expand Up @@ -1391,6 +1404,25 @@ public Struct getDecimalStruct() {
}
}

/**
 * A test-only UDF whose {@code @UdfSchemaProvider} resolves to a struct type. Used to
 * verify that schema providers may declare struct return types.
 *
 * <p>Loaded and invoked via reflection by the UDF loader, so member names referenced
 * by annotations must not change.
 */
@SuppressWarnings({"unused", "MethodMayBeStatic"}) // Invoked via reflection in test.
@UdfDescription(
    name = "KsqlStructUdf",
    description = "A test-only UDF for testing struct return types")
public static class KsqlStructUdf {

  // Expected return schema; also asserted against by tests in this file.
  private static final SqlStruct RETURN =
      SqlStruct.builder().field("VAL", SqlTypes.STRING).build();

  /** Resolves the return type to the fixed struct schema for any parameter list. */
  @UdfSchemaProvider
  public SqlType provide(final List<SqlType> params) {
    return RETURN;
  }

  // The body is never exercised here; only the declared Java return type (Struct)
  // and the schemaProvider wiring matter for these tests.
  @Udf(schemaProvider = "provide")
  public Struct getDecimalStruct() {
    return null;
  }
}

@SuppressWarnings({"unused", "MethodMayBeStatic"}) // Invoked via reflection in test.
@UdfDescription(
name = "ReturnIncompatible",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@

package io.confluent.ksql.function.udf;

import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.List;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(name="test_udf", description = "test")
@SuppressWarnings("unused")
public class TestUdf {

private static final SqlStruct RETURN =
SqlStruct.builder().field("A", SqlTypes.STRING).build();

@Udf(description = "returns the method name")
public String doStuffIntString(final int arg1, final String arg2) {
return "doStuffIntString";
Expand All @@ -47,4 +55,16 @@ public String doStuffStruct(
) {
return struct.getString("A");
}

// Builds a connect struct matching 'STRUCT<A VARCHAR>' and populates field A.
@Udf(description = "returns the value of 'STRUCT<A VARCHAR>'", schemaProvider = "structProvider")
public Struct returnStructStuff() {
  final Struct result = new Struct(
      SchemaBuilder.struct()
          .field("A", SchemaBuilder.OPTIONAL_STRING_SCHEMA)
          .optional()
          .build());
  result.put("A", "foo");
  return result;
}

// Schema provider for returnStructStuff: always resolves to the fixed
// STRUCT<A VARCHAR> schema, regardless of the supplied parameter types.
// Referenced by name from the @Udf(schemaProvider = "structProvider") annotation.
@UdfSchemaProvider
public SqlType structProvider(final List<SqlType> params) {
  return RETURN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.function.types.ParamType;
import io.confluent.ksql.function.types.ParamTypes;
import io.confluent.ksql.function.types.StringType;
import io.confluent.ksql.function.types.StructType;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.GenericArrayType;
Expand All @@ -32,6 +33,7 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;

public final class UdfUtil {

Expand Down Expand Up @@ -104,7 +106,9 @@ public static ParamType getSchemaFromType(final Type type) {
return schema;
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
private static ParamType handleParameterizedType(final Type type) {
// CHECKSTYLE_RULES.ON: CyclomaticComplexity
if (type instanceof ParameterizedType) {
final ParameterizedType parameterizedType = (ParameterizedType) type;
if (parameterizedType.getRawType() == Map.class) {
Expand Down Expand Up @@ -133,6 +137,11 @@ private static ParamType handleParameterizedType(final Type type) {
return ArrayType.of(
GenericType.of(
((GenericArrayType) type).getGenericComponentType().getTypeName()));
} else if (type instanceof Class<?> && Struct.class.isAssignableFrom((Class<?>) type)) {
// we don't have enough information here to return a more specific type of struct,
// but there are other parts of the code that enforce having a schema provider or
// schema annotation if a struct is being used
return StructType.ANY_STRUCT;
}

throw new KsqlException("Type inference is not supported for: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import io.confluent.ksql.function.types.MapType;
import io.confluent.ksql.function.types.ParamType;
import io.confluent.ksql.function.types.ParamTypes;
import io.confluent.ksql.function.types.StructType;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
Expand Down Expand Up @@ -173,6 +175,14 @@ public void shouldGetStringSchemaFromStringClass() {
);
}

@Test
public void shouldGetStructSchemaFromStructClass() {
  // Renamed from shouldGetStringSchemaFromStructClass (copy-paste from the String
  // test above): this verifies the Struct mapping, not the String one.
  // A bare Struct class carries no field information, so it must map to the
  // "any struct" marker type rather than a specific struct schema.
  assertThat(
      UdfUtil.getSchemaFromType(Struct.class),
      equalTo(StructType.ANY_STRUCT)
  );
}

@Test(expected = KsqlException.class)
public void shouldThrowExceptionIfClassDoesntMapToSchema() {
UdfUtil.getSchemaFromType(System.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (VAL STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` STRING KEY, `VAL` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT TEST_UDF() VALUE\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `VALUE` STRUCT<`A` STRING>",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ROWKEY` STRING KEY, `VAL` STRING"
},
"selectExpressions" : [ "TEST_UDF() AS VALUE" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.any.key.name.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"version" : "6.0.0",
"timestamp" : 1588882528086,
"path" : "query-validation-tests/struct-udfs.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<VAL VARCHAR> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<VALUE STRUCT<A VARCHAR>> NOT NULL"
},
"testCase" : {
"name" : "Output struct",
"inputs" : [ {
"topic" : "test_topic",
"key" : "1",
"value" : {
"val" : "foo"
},
"timestamp" : 0
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"VALUE" : {
"A" : "foo"
}
},
"timestamp" : 0
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM test (val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT TEST_UDF() AS VALUE FROM test;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

0 comments on commit 2e604f0

Please sign in to comment.