
KLIP-1: Improve UDF Interfaces #2503

Merged (6 commits, Apr 3, 2019)

Conversation

@agavra (Contributor) commented Mar 1, 2019

@agavra agavra requested a review from confluentinc/ksql Mar 1, 2019

@hjafarpour (Member) left a comment:

Thanks for updating the KLIP for the struct return type as we discussed offline.
Left a few comments and questions :)

improve the UDF/UDAF interface in KSQL. Of these features, four stand out:

- _Custom Struct Support_ - UDFs can only operate on standard Java types (`Map`, `List`, `String`,
etc...) and do not accept structured types. This dramatically limits what KSQL can handle, as

@hjafarpour (Member) commented Mar 6, 2019:

As of now, support for Map and List is also limited and you cannot have nested complex data.

@agavra (Author, Contributor) commented Mar 6, 2019:

That's rather unfortunate 😢 but I think nested complex data is out of scope for this specific one. I'll add that to the out of scope section unless you feel strongly that those should be covered as part of this KLIP.

@blueedgenick (Contributor) commented Mar 13, 2019:

might not be super-urgent compared to the other items under discussion, but "just enough" nested support to allow e.g. a "flatten" function (`List<T> flatten(List<List<T>>)`) would be useful.
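The body of such a flatten is trivial once the signature can be expressed; a plain-Java sketch of the shape under discussion (no KSQL annotations, class name mine):

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenSketch {

    // Hypothetical UDF body for List<T> flatten(List<List<T>>); the open
    // question in this thread is the schema/signature support, not the logic.
    public static <T> List<T> flatten(final List<List<T>> input) {
        final List<T> out = new ArrayList<>();
        for (final List<T> inner : input) {
            out.addAll(inner);
        }
        return out;
    }
}
```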

public static final Schema SCHEMA = SchemaBuilder.struct()...build();
@Udf("Checks if the employee has a valid record (i.e. contains a valid name and email)")
@UdfReturn(value = "STRUCT<'A' INT, 'B' VARCHAR>") // sample specification annotation

@hjafarpour (Member) commented Mar 6, 2019:

I'm more in favor of this one. Even better if we can build and supply the schema object as the return type.

@agavra (Author, Contributor) commented Mar 6, 2019:

What you are suggesting is similar to the third (inline) option. Thoughts? Also I'm not opposed to allowing all three ways. That way for simple ones we can support the inline one and for complicated ones we can support the second/third option.

@rodesai (Contributor) commented Mar 22, 2019:

Yeah I think we'll need both 1 and 3 at least.


### Generics

@hjafarpour (Member) commented Mar 6, 2019:

Considering we need to have the full struct schema for the return value, how would generics work with complex types such as structs?

@agavra (Author, Contributor) commented Mar 6, 2019:

The intention here was limited to Map and List. I'm not exactly sure what a generic struct output would look like (since there is no such thing as Struct<T>). Also check the note at the bottom, wildcards and non-inferable output types are not supported as of this KLIP.

@agavra agavra requested a review from confluentinc/ksql Mar 6, 2019

@agavra agavra added this to the 5.3 milestone Mar 11, 2019

@mitch-seymour mitch-seymour referenced this pull request Mar 12, 2019
@vcrfxia (Contributor) left a comment:

Very cool and thorough KLIP @agavra ! Looking forward to seeing this. Left a few questions.

must have some mechanism to create the output schema. There are two ways to resolve the output
schema:

* Resolve the schema at runtime, inferring it from the output object. This can work, but runs the

@vcrfxia (Contributor) commented Mar 13, 2019:

Does this option mean we'd have to wait for data to come through the UDF before KSQL could know the output schema of the UDF? If so, this seems unacceptably limiting, since it means a user wouldn't be able to use a stream created with such a UDF until the stream processed data, i.e., supposing the user issued CREATE STREAM foo AS SELECT myUDF(...) FROM ... ;, they wouldn't be able to follow that up with CREATE STREAM bar AS SELECT ... FROM foo; until foo had started processing data.

@agavra (Author, Contributor) commented Mar 13, 2019:

You are correct - perhaps with this limitation it isn't really even a viable option! I will update the doc to remove this alternative.

@apurvam (Contributor) commented Apr 2, 2019:

It looks like the unviable alternative still has to be removed :)

@agavra (Author, Contributor) commented Apr 2, 2019:

are you looking at the most recent revision? I can't find the reference to it.

@Udf("Checks if the employee has a valid record (i.e. contains a valid name and email)")
@UdfReturn(value = "STRUCT<'A' INT, 'B' VARCHAR>") // sample specification annotation
@UdfReturn(file = 'schema_def.kschema') // another way pointing to a file

@vcrfxia (Contributor) commented Mar 13, 2019:

What would the contents of the file be? Would the contents look like option 1?

@agavra (Author, Contributor) commented Mar 13, 2019:

That's how I imagined it for this KLIP, though perhaps in the future we could also support .avsc or .json to specify the schema


### Complex Aggregation

We will allow users to supply an additional method `VR export(A agg)`. This method will be taken

@vcrfxia (Contributor) commented Mar 13, 2019:

Out of curiosity, what does VR stand for? I gather it's the return type but can't piece together the acronym.

@agavra (Author, Contributor) commented Mar 13, 2019:

I assume it stands for ValueReturn. I took it from KStreams:

    <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);

@vcrfxia vcrfxia requested a review from confluentinc/ksql Mar 13, 2019

@blueedgenick (Contributor) left a comment:

Overall loving it! Few inline questions, suggestions, and details to maybe clarify. Thanks for pulling this together so thoroughly!

least one of the parameters. We can access this information via reflection.

**NOTE:** Supporting a wildcard type (e.g. `<?>`) is not covered by this design since we would not
be able to generate the output schema of select statements.

@blueedgenick (Contributor) commented Mar 13, 2019:

i guess what we mean here is that wildcard types are not supported for the output type - but they should be ok on the input side - e.g. Integer countMembers(List<?> input), or do i misunderstand the limitation ?

@agavra (Author, Contributor) commented Mar 13, 2019:

This limitation is correct, we should be able to do that - though I believe the following is nearly semantically equivalent:

    <T> Integer countMembers(List<T> input)
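For concreteness, the inferred-generic form discussed above is just an ordinary Java method; a minimal sketch (class name mine, no UDF wiring):

```java
import java.util.List;

public class CountMembersSketch {

    // T is inferred from the input and never constrains the output type,
    // so this covers the List<?> use case from the comment above.
    public static <T> Integer countMembers(final List<T> input) {
        return input.size();
    }
}
```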
will register a corresponding function with the wrapping array type (e.g. `String[]`). At runtime,
if no function matches the explicit parameter types, we will fallback to search for any array-type
parameter method declaration of the same type iff all parameters are of a single type. If any
parameter is null, it will be considered valid for any vararg declaration.

@blueedgenick (Contributor) commented Mar 13, 2019:

bit of a nit-pick but we might want to extend this fall-back slightly to match functions that have all trailing args of the same type, rather than all args of the same type. Motivating example: String concat_ws(String separator, String... fieldsToConcat) - here we want to match a fn with a signature of String, String[]

@agavra (Author, Contributor) commented Mar 13, 2019:

Good point - I hadn't thought of that
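A plain-Java sketch of the motivating signature from this thread (UDF wiring omitted; the design question is how a call site maps to the `(String, String[])` shape):

```java
public class ConcatWsSketch {

    // One leading argument of a fixed type, then trailing varargs of another:
    // resolution must match this against a registered (String, String[]) pair.
    public static String concatWs(final String separator, final String... fields) {
        return String.join(separator, fields);
    }
}
```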

interface will have the `mapValues` stage applied to them.

    class AverageUdaf implements Udaf<Long, Struct>, Exportable<Struct, Long> {

@blueedgenick (Contributor) commented Mar 13, 2019:

Few questions here:

- thinking from the perspective of someone who might implement a UDAF using this new capability, the Udaf<Long, Struct> part seems non-intuitive - i'd expect here to declare the output type of my UDAF (only). Feels like I'm telling you what i want to serialize between invocations in the Exportable declaration, and then unnecessarily repeating it in the Udaf part ? (or does this have to be here for backwards compatibility reasons i'm not quite seeing?)
- perhaps another option to try to simplify the ux could be a new ExportableUdaf interface, thereby not requiring the second interface ?
- another random thought on backwards compatibility: we could have a default no-op implementation of the export method on the interface itself ? Although i guess this simplification would be traded against the fact that we'd then do an unnecessary invocation of mapValues for UDAFs which didn't strictly require it ?

Apologies if those Qs are off the mark or missing something obvious, i'm very rusty!

@agavra (Author, Contributor) commented Mar 13, 2019:
> (or does this have to be here for backwards compatibility reasons i'm not quite seeing?)

The problem here is that existing signatures would be messed up. Imagine I have the following class:

    class MyUdaf implements Udaf<Long, Long> {
      public Long aggregate(final Long val, final Long agg);
    }

I want to implement the following (using my syntax):

    class MyExportUdaf implements Udaf<Long, Struct>, Exportable<Struct, Long> {
        public Struct aggregate(final Long val, final Struct agg);
        public Long export(Struct agg);
    }

If I change `Udaf<V, A>` to `Udaf<V, VR>`, then `MyExportUdaf#aggregate` doesn't work:

    class MyExportUdaf implements Udaf<Long, Long>, Exportable<Struct, Long> {
        public Struct aggregate(final Long val, final Long agg); // don't have access to `Struct`
    }

On the other hand, if I have `Udaf<V, A, VR>` then `MyUdaf` no longer compiles because it doesn't specify `VR`.

> we could have a default no-op implementation of the export method on the interface itself

See above. We can't do this because we need to know the export types. If we add `VR export(A)` to `Udaf`, we need a way to define `VR` (simply adding it will break all existing Udafs).

> perhaps another option to try to simplify the ux could be a new ExportableUdaf interface, thereby not requiring the second interface ?

Definitely a possibility, and it could be `ExportableUdaf<V, A, VR> extends Udaf<V, VR>`. I actually quite like this option.
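To make that single-interface option concrete, a sketch using minimal stand-ins for the KSQL interfaces (the real `Udaf` has more to it; whether the new interface should extend `Udaf<V, A>` or `Udaf<V, VR>` is the open question above, and `Udaf<V, A>` is assumed here since `aggregate` operates on the intermediate type):

```java
// Minimal stand-in for KSQL's Udaf interface, for illustration only.
interface Udaf<V, A> {
    A initialize();
    A aggregate(V value, A aggregate);
}

// Single-interface option: V = input, A = intermediate aggregate, VR = output.
interface ExportableUdaf<V, A, VR> extends Udaf<V, A> {
    VR export(A aggregate);
}

// Example: average keeps {sum, count} as the aggregate and exports a Double.
class AverageUdaf implements ExportableUdaf<Long, long[], Double> {
    @Override public long[] initialize() {
        return new long[]{0L, 0L};
    }

    @Override public long[] aggregate(final Long value, final long[] agg) {
        return new long[]{agg[0] + value, agg[1] + 1};
    }

    @Override public Double export(final long[] agg) {
        return agg[1] == 0 ? 0.0 : (double) agg[0] / agg[1];
    }
}
```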


If we expect UDAFs to commonly require exportable functionality, then we can make this change in a
backwards incompatible change and introduce the `export` (or `terminate`) method into the UDAF
interface directly.

@blueedgenick (Contributor) commented Mar 13, 2019:

i don't think this would be the end of the world from a UX perspective, just needs documenting properly in the release notes. We could obviously convert over the in-built UDAFs as we implement this. I do believe the majority of non-trivial UDAFs will need to avail themselves of this new capability.

@agavra (Author, Contributor) commented Mar 13, 2019:

@MichaelDrogalis thoughts?

>class AverageUdaf implements Udaf<Long, Struct>, Exportable<Struct, Long> {
>
> @Override
> public Long export(Struct runningSum) {

@blueedgenick (Contributor) commented Mar 13, 2019:

nit, also applicable to the proposed code implementation of 'average' above: i found it harder to follow the example because i kept wondering where you were going to allow for fractional numbers in the output :) - if we change the return type to Double then it will help clarify which Long in the various signatures represents the incoming values to aggregate vs which one is the return type.

| Varargs support | UDF interface will accept `...` in the method declarations |
| Complex Aggregation | UDAFs may include `VR export(A agg)` to convert aggregated to output |
| Structured UDFs | UDF/UDAF interfaces will accept the Kafka Connect `Struct` type |

@blueedgenick (Contributor) commented Mar 13, 2019:

can we consider adding a bullet for making it possible to access the record context from inside a UDF/UDAF ? Here i mean context in the Kafka Streams sense (see ). I guess we might consider that unnecessarily powerful and 'unsafe' in some way, in which case the core motivation here is to access the various metadata fields of the current record (offset, partition, timestamp, ...) so perhaps we can consider copying those out into some value object and giving access to that instead of to the actual ProcessorContext.

@agavra (Author, Contributor) commented Mar 13, 2019:

Stateful operations are a serious beast! Unless this is really one of the more requested features (or would dramatically change the way I design the other four) I'd prefer to leave it out of scope for this KLIP. Lots of small KLIPs make me a happy engineer 😄

@blueedgenick (Contributor) commented Mar 13, 2019:

Oops, my link got screwed up there: https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html#accessing-processor-context
Understood! Am asking here in case you feel the need to change the public interface in order to allow for it in some way that would be a PITA or require some compatibility-breaking change later.
My motivating example for this is a simple Earliest or Latest UDAF, which returns the specified field from the input record with the lowest or highest incoming timestamp for this window, respectively.

@agavra (Author, Contributor) commented Mar 13, 2019:

Makes sense - I will do some prototyping and thinking about APIs to see if there's any good way without changing public APIs for this. If public APIs need to change, then I agree that we should at least design around that here.

@apurvam (Contributor) commented Apr 2, 2019:

@agavra where did we land up on this thread?

@agavra (Author, Contributor) commented Apr 2, 2019:

We would need to change two things:

  1. Introduce an API to access it (and likely just a limited subset of information available in it)
  2. Change our SelectValueMapper to implement ValueTransformer instead of ValueMapper. Based on my initial exploration, this should be completely backwards compatible; if it is not or adds some performance overhead, we can add some scaffolding code around it to dynamically choose one or the other based on the presence of the API

Both of these can be easily done in backwards compatible fashion and can probably be done as just a PR. FYI @blueedgenick

are resolved properly. Anytime a method is registered with variable args (e.g. `String... args`) we
will register a corresponding function with the wrapping array type (e.g. `String[]`). At runtime,
we resolve methods using the following fallback logic:

@agavra (Author, Contributor) commented Mar 21, 2019:

I found a good way to implement this algorithm with a Trie data structure.
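One possible shape for that trie, sketched in plain Java (illustrative only; class and method names are mine, and ambiguity handling is simplified). Signatures are indexed by parameter type name, with vararg signatures stored at the node where their fixed prefix ends:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SignatureTrie {
    private final Map<String, SignatureTrie> children = new HashMap<>();
    private final Map<String, String> varargMethods = new HashMap<>(); // vararg type -> method
    private String exactMethod; // non-null if a non-vararg signature ends here

    void register(final List<String> params, final boolean vararg, final String method) {
        SignatureTrie node = this;
        final int fixed = vararg ? params.size() - 1 : params.size();
        for (int i = 0; i < fixed; i++) {
            node = node.children.computeIfAbsent(params.get(i), k -> new SignatureTrie());
        }
        if (vararg) {
            node.varargMethods.put(params.get(params.size() - 1), method);
        } else {
            node.exactMethod = method;
        }
    }

    String resolve(final List<String> args) {
        return resolve(args, 0);
    }

    private String resolve(final List<String> args, final int i) {
        if (i == args.size()) {
            if (exactMethod != null) {
                return exactMethod; // exact match wins
            }
            // a vararg may also match zero trailing args (only if unambiguous here)
            return varargMethods.size() == 1
                ? varargMethods.values().iterator().next() : null;
        }
        final SignatureTrie child = children.get(args.get(i));
        if (child != null) {
            final String match = child.resolve(args, i + 1);
            if (match != null) {
                return match; // prefer the longest fixed prefix
            }
        }
        // fallback: all remaining args must share one type accepted by a vararg
        final String type = args.get(i);
        for (int j = i; j < args.size(); j++) {
            if (!args.get(j).equals(type)) {
                return null;
            }
        }
        return varargMethods.get(type);
    }
}
```

Preferring the longest fixed prefix is what distinguishes collisions of the `foo(String, Integer, String...)` vs `foo(String, String...)` style; null-argument handling is left out of this sketch.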

@agavra agavra referenced this pull request Mar 21, 2019
@rodesai (Contributor) left a comment:

Thanks @agavra, nice write up! I've left a bunch of in-line comments.

- _Variable Argument Support_ - Varargs are required to create UDFs that accept an arbitrary
number of inputs. Creating a simple `concat` UDF that could sum multiple columns would require
creating method signatures for as many input combinations as desired.
- _Complex Aggregation Models_ - Today, KSQL only supports tracking a single variable for

@rodesai (Contributor) commented Mar 22, 2019:

Nice, we can have a proper TOPK function now 😄

@apurvam (Contributor) commented Apr 2, 2019:

Indeed. It should be trivial with a one line reduce implementation in most cases.


| Improvement | API change |
|:---------------------:|------------------------------------------------------------------------|
| Generics support | UDF interface will accept inferred generics (e.g. `List<T>`) |

@rodesai (Contributor) commented Mar 22, 2019:

How will these be rendered in API responses? For example, if I describe the function ARRAYCONTAINS I get something like:

      "@type": "describe_function",
      "statementText": "describe function ARRAYCONTAINS;",
      "name": "ARRAYCONTAINS",
      "description": "",
      "author": "Confluent",
      "version": "",
      "path": "internal",
      "functions": [
        {
          "arguments": [
            {
              "name": "",
              "type": "VARCHAR",
              "description": ""
            },
            {
              "name": "",
              "type": "VARCHAR",
              "description": ""
            }
          ],
          "returnType": "BOOLEAN",
          "description": "",
          "argumentTypes": [
            "VARCHAR",
            "VARCHAR"
          ]
        },

It would be nice to replace something like ARRAYCONTAINS with a function that takes a generic. What would be the type for the List? I think it's important to decide that now because down the road when we want to support writing functions with their code embedded in api calls (e.g. a python function), we need a syntax to describe this, and that syntax should match what the API returns.

Also, do we want to support generic types for args not in containers? Using ARRAYCONTAINS as an example again, the second parameter could have a generic type.

@agavra (Author, Contributor) commented Mar 25, 2019:

Signature for generic functions: It looks like neither Hive nor most other SQL languages support this type of functionality (Hive has it through the ObjectInspector interface, which is quite bulky...). I think we have two options here:

- Generate all possible signatures from a list of all supported types (e.g. DESCRIBE FUNCTION would show ARRAY<VARCHAR>, ARRAY<INT>, ARRAY<BIGINT>...). This would allow us to punt the problem to another time and instead "simulate" registering all possible generics. This can get super complicated when you infer more than one type (obviously it's exponential complexity), but we can likely limit it to one generic inference for now and cover 80% of use cases.
- Introduce a new syntax for generics. Off the top of my head, I imagine it would look something like ARRAY<?>.

Support for non-container generics: Yes. We should definitely support this, the same strategy should apply (e.g. we must be able to infer the output schema from the input parameters), and we would resolve method signatures in the same way as described above.

@agavra (Author, Contributor) commented Mar 29, 2019:

Talked with the team and we believe that the first alternative isn't a good option. The most updated revision has the up-to-date recommendation, which will be to increment a type T1...TN for each generic inferred type.

is already in place to support `Struct` types as input parameters. Minor changes need to be made to
ensure that the validation does not unnecessarily prevent usage of `Struct`.

A noteworthy extension that may be worth supporting is dedicated AVRO types in the signatures for

@rodesai (Contributor) commented Mar 22, 2019:

Just so I understand, the idea here is to use avro classes generated by the specific serializer, which generates code?

@agavra (Author, Contributor) commented Mar 22, 2019:

Yes - and leveraging reader/writer schema would allow us to easily upgrade the UDFs for schema evolution



### Structured UDFs

After refactoring done in [#2411](https://github.com/confluentinc/ksql/pull/2411), the type coercion

@rodesai (Contributor) commented Mar 22, 2019:

How do we know how the schema of the struct parameter maps to a udf implementation, in the case a user wants to overload a function? One use case for this might be if a user evolves the schema of the source stream/table of a query, and needs to provide 2 implementations for the function for handling the 2 different versions of his function. I think it's fine to only support a single implementation of a function that accepts Struct for a given parameter, but we should call that out.

I think it's also worth thinking about if/how we want to support this down the line. For example, strong-typing vs duck-typing. I think in the current proposal these functions are kind of "duck-typed" - if the schema in the stream has all the fields the function uses then its all good. But if tomorrow we want to support overloading these then we need some way to choose which one to bind to - which is hard if we're not strongly typed for structs.

@agavra (Author, Contributor) commented Mar 22, 2019:

Interesting... originally I had imagined just specifying the output struct schema but perhaps this brings up a good need to specify (using the same proposal above) all Struct parameters as well.

@agavra (Author, Contributor) commented Mar 22, 2019:

To solve schema evolution, however, I think we should just compare writer/reader schema and check if they are compatible (e.g. no fields have been removed). From there, data evolution is possible if they take care to only upgrade their data after all servers have the new udf that can handle both new and old data.

    vararg = desired[-1]
    while desired[-1] == vararg:
        if signature_exists(desired, arrayOf(vararg)) return method
        desired = desired[:-1]

@rodesai (Contributor) commented Mar 22, 2019:

This should fail if desired has a different type from desired[:-1] right? Not trying to nitpick pseudocode, just want to make sure I understand the intention correctly.

@agavra (Author, Contributor) commented Mar 22, 2019:

Correct - but it's actually even more complicated than that. My implementation covers all the edge cases (see #2595). Namely, imagine these two method signatures:

    void foo(String, Integer, String...)
    void foo(String, String...)

If I call `foo("hello", null, "world")`, resolving this gets kinda tricky. That's probably enough to illustrate the point.


### Varargs

Varargs boils down to supporting native Java arrays as parameters and ensuring that component types

@rodesai (Contributor) commented Mar 22, 2019:

How do we match struct varargs?

@agavra (Author, Contributor) commented Mar 22, 2019:

I think solving your question

> How do we know how the schema of the struct parameter maps to a udf implementation, in the case a user wants to overload a function?

would answer this as well. We make sure that each struct has the same schema as the one defined in the parameter schema.


### Complex Aggregation

We will allow users to supply an additional method `VR export(A agg)`. This method will be taken

@rodesai (Contributor) commented Mar 22, 2019:

I'm not understanding why this is necessary. Why can't we just change our aggregations to support arbitrary numbers of arguments, and arbitrary return types. Why do we need this extra export step. The fact that we only support 1 parameter which requires the same type is because we've hard-coded it this way. May be easier to discuss in person.

@agavra (Author, Contributor) commented Mar 22, 2019:

You might not want to aggregate on the value that you return. For example, in the avg case you want to aggregate on struct{long, long} but want to return double. Let's talk in person

@agavra agavra requested a review from confluentinc/ksql Mar 22, 2019

@agavra agavra force-pushed the agavra:udf_klip branch from 80dc680 to 8a7b360 Mar 25, 2019

@agavra agavra force-pushed the agavra:udf_klip branch from 8a7b360 to f76e6db Mar 25, 2019

@blueedgenick blueedgenick referenced this pull request Mar 28, 2019
@vcrfxia (Contributor) left a comment:

Thanks @agavra -- LGTM! Excited to see these new features in 5.3 :)

@UdfSchema
public static final Schema SCHEMA = SchemaBuilder.struct()...build();
@Udf(schema="STRUCT<'VAL' VARCHAR, 'LENGTH' INT>")

@vcrfxia (Contributor) commented Mar 29, 2019:

Minor point but probably good to clarify: are the single-quotes around the struct field names required? Are the field names case-sensitive?

@agavra (Author, Contributor) commented Mar 29, 2019:

I will use the exact same syntax as tableElement in SqlBase.g4. I think you are correct in that the quotes are not required and it is not case sensitive (unless you quote, but that behavior is broken as of today)


`UdfFactory` could then load the correct UDF given the schema by matching the schema of the type
against various different methods. It will be possible to support multiple methods with different
struct schemas if the name of the udf method is changed (not that the java method name is ignored

@vcrfxia (Contributor) commented Mar 29, 2019:
Suggested change:

    - struct schemas if the name of the udf method is changed (not that the java method name is ignored
    + struct schemas if the name of the udf method is changed (note that the java method name is ignored

nit: typo (had to read this a few times to make sense of it though...)

### Generics

* The `DESCRIBE FUNCTION` should indicate when a function accepts generics using `ANY<?>` where `?`
is a letter that begins with `T` and is incremented for each inferred type.

@vcrfxia (Contributor) commented Mar 29, 2019:

What happens if we run out of letters? Does it perhaps make sense to use T1, T2, ... instead?

@agavra (Author, Contributor) commented Mar 29, 2019:

That's a better idea!

* Syntax reference needs to be updated to reflect any new UDF/UDAFs that are implemented as part of
this KLIP.

@vcrfxia (Contributor) commented Mar 29, 2019:

Perhaps worth adding another note about the compatibility-breaking change around the new UDAF structure introduced by complex aggregations. (It's already mentioned above, but I think it'd be good to call it out again here.) Either a note here in the docs-update section about updating the upgrade notes, or a mention in the "Compatibility implications" section below.

@vcrfxia vcrfxia requested a review from confluentinc/ksql Mar 29, 2019


We need to ensure that the output type can be inferred. This means that we need to fail if the
signature is something like `<T> List<T> convert(List<String> list)`. This can be done during

@apurvam (Contributor) commented Apr 2, 2019:

Is this example accurate? The generic type in the output is indeed present in one of the parameters.

@agavra (Author, Contributor) commented Apr 2, 2019:

Perhaps the wording is unclear, but I think the example stands - I only consider `List<String>` as the parameter, and `T` does not show up in it (we can't infer `T` from `List<String>`).
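The rule in question ("any generic type used in the output must be present in at least one of the parameters") is mechanically checkable via reflection; a hedged sketch (helper and class names are mine, not KSQL's):

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GenericCheck {

    // Every type variable appearing in the return type must also appear
    // somewhere in the parameter types; otherwise the output is not inferable.
    public static boolean outputInferable(final Method m) {
        final Set<String> inParams = new HashSet<>();
        for (final Type p : m.getGenericParameterTypes()) {
            collectTypeVariables(p, inParams);
        }
        final Set<String> inReturn = new HashSet<>();
        collectTypeVariables(m.getGenericReturnType(), inReturn);
        return inParams.containsAll(inReturn);
    }

    private static void collectTypeVariables(final Type t, final Set<String> out) {
        if (t instanceof TypeVariable) {
            out.add(((TypeVariable<?>) t).getName());
        } else if (t instanceof ParameterizedType) {
            for (final Type arg : ((ParameterizedType) t).getActualTypeArguments()) {
                collectTypeVariables(arg, out);
            }
        }
    }

    // The failing example from the doc: T is not present in List<String>.
    public static <T> List<T> convert(final List<String> list) {
        return null;
    }

    // Fine: T can be inferred from the parameter.
    public static <T> List<T> identity(final List<T> list) {
        return list;
    }

    // Convenience lookup by method name for the examples above.
    public static Method method(final String name) {
        for (final Method m : GenericCheck.class.getDeclaredMethods()) {
            if (m.getName().equals(name)) {
                return m;
            }
        }
        throw new IllegalArgumentException(name);
    }
}
```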

@UdfSchema
public static final Schema SCHEMA = SchemaBuilder.struct()...build();
@Udf(schema="STRUCT<val VARCHAR, length INT>")

@apurvam (Contributor) commented Apr 2, 2019:

If I understand this correctly, the implementation will parse out the types using the existing grammar and then generate the schema from the parsed types while loading the UDFs?

@agavra (Author, Contributor) commented Apr 2, 2019:

yup!

@agavra agavra requested a review from confluentinc/ksql Apr 2, 2019

@hjafarpour (Member) left a comment:

LGTM with a minor comment.

We need to ensure that the output type can be inferred. This means that we need to fail if the
signature is something like `<T> List<T> convert(List<String> list)`. This can be done during
compilation by following a simple rule: any generic type used in the output must be present in at
least one of the parameters. We can access this information via reflection.

@hjafarpour (Member) commented Apr 2, 2019:

Wouldn't this prevent us from having UDFs where the generic type in output is not the same as the parameters?
Consider the following example where we have a list of user addresses and we want to extract the list of zipcodes from it. Assuming the address is a struct or string and zipcode is an integer.

@agavra (Author, Contributor) commented Apr 3, 2019:

I'm not sure I understand your question. In the example that you give, you would not need to use inferred generics:

    List<Integer> extractZipcodes(@UdfParameter(schema=...) List<Struct> users)

As of this KLIP we don't have a good way to specify schema for `List<Struct>`, but that's a discussion for another time.

@agavra agavra merged commit 26c9dac into confluentinc:master Apr 3, 2019

1 check passed: continuous-integration/jenkins/pr-merge (This commit looks good)
@agavra agavra referenced this pull request Apr 16, 2019