KLIP-1: Improve UDF Interfaces #2503

Merged (6 commits, Apr 3, 2019). Changes shown from 4 commits.
9 changes: 5 additions & 4 deletions design-proposals/README.md
@@ -13,11 +13,12 @@ Here is the process for submitting a KLIP and getting it approved.

# KSQL Improvement Proposals (aka KLIPs)

-Next KLIP number: 0.
+Next KLIP number: 2.

-| KLIP | Status | Release |
-|----------------------------------------|:------:| ------: |
-| [KLIP-X: Template](klip-template.md) | Open | N/A |
+| KLIP | Status | Release |
+|----------------------------------------------------------------------|:------:| ------: |
+| [KLIP-X: Template](klip-template.md) | Open | N/A |
+| [KLIP-1: Improve UDF Interfaces](klip-1-improve-udf-interfaces.md) | Open | N/A |



314 changes: 314 additions & 0 deletions design-proposals/klip-1-improve-udf-interfaces.md
@@ -0,0 +1,314 @@
# KLIP-1: Improve UDF Interfaces

**Author**: agavra |
**Release Target**: 5.3 |
**Status**: In Discussion |
**Discussion**: link

**tl;dr:** *Address the most common feature requests related to UDF/UDAFs including
struct support, generic types, variable arguments and complex aggregation*


## Motivation and background

There have been many requests ([#2163](https://github.com/confluentinc/ksql/issues/2163)) to
improve the UDF/UDAF interface in KSQL. Of these features, four stand out:

- _Custom Struct Support_ - UDFs can only operate on standard Java types (`Map`, `List`, `String`,
etc...) and do not accept structured types. This dramatically limits what KSQL can handle, as
much of the data flowing through Kafka is structured Avro/JSON.

  Contributor: As of now, support for Map and List is also limited and you cannot have nested complex data.

  Author: That's rather unfortunate 😢 but I think nested complex data is out of scope for this specific one. I'll add that to the out of scope section unless you feel strongly that those should be covered as part of this KLIP.

  Contributor: Might not be super-urgent compared to the other items under discussion, but "just enough" nested support to allow e.g. a "flatten" function (`List<T> flatten(List<List<T>>)`) would be useful.

- _Generic Type Support_ - Today, there is no way to implement a single UDF that supports data
structures with implicit types. For example, implementing a generic `list_union` operator would
require different method signatures for each supported type. This quickly becomes unmanageable
when there are multiple arguments.
- _Variable Argument Support_ - Varargs are required to create UDFs that accept an arbitrary
number of inputs. Creating a simple `concat` UDF that could concatenate multiple columns would require
creating method signatures for as many input combinations as desired.
- _Complex Aggregation Models_ - Today, KSQL only supports tracking a single variable for
aggregation, and it must mirror the output type. To implement an `avg` UDAF, it is necessary to store
both a `count` and a `running_total`.

  Contributor: Nice, we can have a proper TOPK function now 😄

  Author: Indeed. It should be trivial with a one-line reduce implementation in most cases.

## Scope

All changes will be made for the new annotation-based UDF interfaces and will not be backported to
the legacy implementation. The bullet points mentioned above are all in scope, but the following
will not be included in this KLIP:

- UDTFs - multiple output operations are more complicated and will require a KLIP of their own
- Generics/Varargs for UDAFs - the support for Generics/Varargs will only extend to UDFs.

## Value

Basic UDF operations that are supported in other SQL dialects will be unblocked for KSQL. Some of
these are outlined below:

| Improvement | Value |
|:---------------------:|------------------------------------------------------------------------|
| Generics support | Unblock `ARRAY_LENGTH`, `ARRAY_SLICE`, `ARRAY_TO_STRING`, etc... |
| Varargs support | `CONCAT`, `MAX`, `MIN` etc... |
| Complex Aggregation | `AVG`, `PERCENTILE`, etc... |
| Structured UDFs | Arbitrary data handling without intermediary transforms |

## Public APIs

Some public APIs will change, though all changes will be made in a backwards compatible fashion:

| Improvement | API change |
|:---------------------:|------------------------------------------------------------------------|
| Generics support | UDF interface will accept inferred generics (e.g. `List<T>`) |
| Varargs support | UDF interface will accept `...` in the method declarations |
| Complex Aggregation | UDAFs may include `VR export(A agg)` to convert aggregated to output |
| Structured UDFs | UDF/UDAF interfaces will accept the Kafka Connect `Struct` type |

Contributor (on the generics row): How will these be rendered in API responses? For example, if I describe the function ARRAYCONTAINS I get something like:

  "@type": "describe_function",
  "statementText": "describe function ARRAYCONTAINS;",
  "name": "ARRAYCONTAINS",
  "description": "",
  "author": "Confluent",
  "version": "",
  "path": "internal",
  "functions": [
    {
      "arguments": [
        {
          "name": "",
          "type": "VARCHAR",
          "description": ""
        },
        {
          "name": "",
          "type": "VARCHAR",
          "description": ""
        }
      ],
      "returnType": "BOOLEAN",
      "description": "",
      "argumentTypes": [
        "VARCHAR",
        "VARCHAR"
      ]
    },

It would be nice to replace something like ARRAYCONTAINS with a function that takes a generic. What would be the type for the List? I think it's important to decide that now, because down the road, when we want to support writing functions with their code embedded in API calls (e.g. a Python function), we need a syntax to describe this, and that syntax should match what the API returns.

Also, do we want to support generic types for args not in containers? Using ARRAYCONTAINS as an example again, the second parameter could have a generic type.

Author: Signature for generic functions: it looks like neither Hive nor most other SQL languages support this type of functionality (Hive has it through the ObjectInspector interface, which is quite bulky...). I think we have two options here:

  • Generate all possible signatures from a list of all supported types (e.g. DESCRIBE FUNCTION would show ARRAY<VARCHAR>, ARRAY<INT>, ARRAY<BIGINT>...). This would allow us to punt the problem to another time and instead "simulate" registering all possible generics. This can get super complicated when you infer more than one type (obviously it's exponential complexity), but we can likely limit it to one generic inference for now and cover 80% of use cases.
  • Introduce a new syntax for generics. Off the top of my head, I imagine it would look something like ARRAY<?>.

Support for non-container generics: yes, we should definitely support this. The same strategy applies (e.g. we must be able to infer the output schema from the input parameters), and we would resolve method signatures in the same way as described above.

Author: Talked with the team and we believe that the first alternative isn't a good option. The most updated revision has the up-to-date recommendation, which is to increment a type T1...TN for each inferred generic type.

Contributor: Can we consider adding a bullet for making it possible to access the record context from inside a UDF/UDAF? Here I mean context in the Kafka Streams sense (see ). I guess we might consider that unnecessarily powerful and 'unsafe' in some way, in which case the core motivation here is to access the various metadata fields of the current record (offset, partition, timestamp, ...), so perhaps we can consider copying those out into some value object and giving access to that instead of to the actual ProcessorContext.

Author: Stateful operations are a serious beast! Unless this is really one of the more requested features (or would dramatically change the way I design the other four) I'd prefer to leave it out of scope for this KLIP. Lots of small KLIPs make me a happy engineer 😄

Contributor: Oops, my link got screwed up there: https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html#accessing-processor-context

Understood! Am asking here in case you feel the need to change the public interface in order to allow for it in some way that would be a PITA or require some compatibility-breaking change later.

My motivating example for this is a simple Earliest or Latest UDAF, which returns the specified field from the input record with the lowest or highest incoming timestamp for this window, respectively.

Author: Makes sense - I will do some prototyping and thinking about APIs to see if there's any good way to do this without changing public APIs. If public APIs need to change, then I agree that we should at least design around that here.

Contributor: @agavra where did we land on this thread?

Author: We would need to change two things:

  1. Introduce an API to access it (and likely just a limited subset of the information available in it)
  2. Change our SelectValueMapper to implement ValueTransformer instead of ValueMapper. Based on my initial exploration, this should be completely backwards compatible; if it is not, or adds some performance overhead, we can add some scaffolding code around it to dynamically choose one or the other based on the presence of the API

Both of these can easily be done in a backwards compatible fashion and can probably be done as just a PR. FYI @blueedgenick
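
To ground that last point, here is a minimal sketch of the idea (the class name and bodies are hypothetical, not KSQL's actual code): a `ValueTransformer`, unlike a `ValueMapper`, receives the `ProcessorContext` on `init`, which is what would expose record metadata to UDFs.

```java
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical replacement for SelectValueMapper: implementing ValueTransformer
// gives access to the ProcessorContext, and therefore to the current record's
// offset, partition and timestamp, without changing the projection logic itself.
class SelectValueTransformer<V, VR> implements ValueTransformer<V, VR> {

  private ProcessorContext context;

  @Override
  public void init(final ProcessorContext context) {
    this.context = context;
  }

  @Override
  public VR transform(final V value) {
    // context.offset(), context.partition() and context.timestamp() are available
    // here and could be surfaced to UDFs through a limited metadata value object
    return null; // evaluate the select expressions as today
  }

  @Override
  public void close() {
  }
}
```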

## Design

Each of these improvements is relatively self-contained and can be worked on independently and in
parallel. Below are detailed designs for each:

### Structured UDFs

After the refactoring done in [#2411](https://github.com/confluentinc/ksql/pull/2411), the type coercion
is already in place to support `Struct` types as input parameters. Minor changes need to be made to
ensure that the validation does not unnecessarily prevent usage of `Struct`.

Contributor: How do we know how the schema of the struct parameter maps to a udf implementation in the case a user wants to overload a function? One use case might be if a user evolves the schema of the source stream/table of a query and needs to provide 2 implementations of the function for handling the 2 different versions. I think it's fine to only support a single implementation of a function that accepts Struct for a given parameter, but we should call that out.

I think it's also worth thinking about if/how we want to support this down the line. For example, strong-typing vs duck-typing. I think in the current proposal these functions are kind of "duck-typed" - if the schema in the stream has all the fields the function uses then it's all good. But if tomorrow we want to support overloading these, then we need some way to choose which one to bind to - which is hard if we're not strongly typed for structs.

Author: Interesting... originally I had imagined just specifying the output struct schema, but perhaps this brings up a good need to specify (using the same proposal above) all Struct parameters as well.

Author: To solve schema evolution, however, I think we should just compare writer/reader schemas and check if they are compatible (e.g. no fields have been removed). From there, data evolution is possible if users take care to only upgrade their data after all servers have the new UDF that can handle both new and old data.

A noteworthy extension that may be worth supporting is dedicated Avro types in the signatures for
UDFs, but this is not covered in this KLIP.

Contributor: Just so I understand, the idea here is to use Avro classes generated by the specific serializer, which generates code?

Author: Yes - and leveraging reader/writer schemas would allow us to easily upgrade the UDFs for schema evolution.

```java
@Udf("Checks if the employee has a valid record (i.e. contains a valid name and email)")
public boolean isValid(
@UdfParameter final Struct employee) {
return employee.get("firstname").matches("[A-Z][a-z]*")
&& employee.get("email").endsWith("@company.io");
}
```

There is more work to be done to support `Struct` as the return value of UDFs; namely, we
must have some mechanism to create the output schema. To address this, we can resolve the schema as
part of the UDF specification. This adds more structure and predictability, but may become tricky to
evolve, and we need a good API to do this, especially if it is necessary to specify complicated
nested structs. Below are three candidate ways to specify the schema:

```java
@UdfSchema
public static final Schema SCHEMA = SchemaBuilder.struct()...build();

@Udf("Checks if the employee has a valid record (i.e. contains a valid name and email)")
@UdfReturn(value = "STRUCT<'A' INT, 'B' VARCHAR>") // sample specification annotation
@UdfReturn(file = "schema_def.kschema") // another way, pointing to a file
@UdfReturn(schema = "name.space.MyClass.SCHEMA") // another way, resolving a java object
public Struct generate() {
  return new Struct(...);
}
```

Contributor (on the inline `value` option): I'm more in favor of this one. Even better if we can build and supply the schema object as the return type.

Author (Mar 6, 2019): What you are suggesting is similar to the third (inline) option. Thoughts? Also, I'm not opposed to allowing all three ways. That way, for simple ones we can support the inline option and for complicated ones we can support the second/third option.

Contributor: Yeah, I think we'll need both 1 and 3 at least.

Contributor (on the `file` option): What would the contents of the file be? Would the contents look like option 1?

Author: That's how I imagined it for this KLIP, though perhaps in the future we could also support .avsc or .json to specify the schema.
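
For reference, a sketch of what the elided `SchemaBuilder` call in the first option might look like for the `STRUCT<'A' INT, 'B' VARCHAR>` example above (field names taken from the annotation; the KSQL-to-Connect type mapping shown is the standard one):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// STRUCT<'A' INT, 'B' VARCHAR> expressed as a Connect schema:
// KSQL INT maps to Connect INT32, and KSQL VARCHAR maps to Connect STRING.
public static final Schema SCHEMA = SchemaBuilder.struct()
    .field("A", Schema.INT32_SCHEMA)
    .field("B", Schema.STRING_SCHEMA)
    .build();
```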

### Generics

Contributor: Considering we need to have the full struct schema for the return value, how would generics work with complex types such as structs?

Author: The intention here was limited to Map and List. I'm not exactly sure what a generic struct output would look like (since there is no such thing as Struct<T>). Also check the note at the bottom: wildcards and non-inferable output types are not supported as of this KLIP.

Today, we eagerly convert UDF output types to corresponding Connect schema types at the time that we
compile the UDFs, not at the time that we use them in a `SELECT` clause. Since there is no
corresponding "unresolved" type in Connect, this conversion fails for generics. To solve this problem,
we can infer the type from the source schema and compile the code to work with any component type
(e.g. omit the generics from the compiled code).

```java
@Udf("returns a sublist of 'list'")
public <T> List<T> sublist(
@UdfParameter final List<T> list,
@UdfParameter final int start,
@UdfParameter final int end) {
return list.subList(start, end);
}
```
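
Per the note above about omitting generics from the compiled code, the erased form the engine would actually invoke might look like the following (illustrative only, not the actual generated code):

```java
import java.util.List;

// Illustrative only: the erased form of sublist with component types dropped;
// the output schema is instead inferred from the source schema at statement
// compile time.
class ErasedSublist {
  @SuppressWarnings({"rawtypes", "unchecked"})
  public List sublist(final List list, final int start, final int end) {
    return list.subList(start, end);
  }
}
```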

We need to ensure that the output type can be inferred. This means that we need to fail if the
signature is something like `<T> List<T> convert(List<String> list)`. This can be done during
compilation by following a simple rule: any generic type used in the output must be present in at
least one of the parameters. We can access this information via reflection.

Contributor: Is this example accurate? The generic type in the output is indeed present in one of the parameters.

Author: Perhaps the wording is unclear, but I think the example stands - I only consider List<String> as the parameter, while <T> does not show up in that (we can't infer <T> from List<String>).

Contributor: Wouldn't this prevent us from having UDFs where the generic type in the output is not the same as in the parameters? Consider the following example where we have a list of user addresses and we want to extract the list of zipcodes from it, assuming the address is a struct or string and the zipcode is an integer.

Author: I'm not sure I understand your question. In the example that you give, you would not need to use inferred generics:

    List<Integer> extractZipcodes(@UdfParameter(schema=...) List<Struct> users)

As of this KLIP we don't have a good way to specify a schema for List<Struct>, but that's a discussion for another time.

**NOTE:** Supporting a wildcard output type is not covered by this design, since we would not be able
to generate the output schema for select statements; however, supporting wildcards in the input types
(e.g. `Long length(List<?> list)`) should be possible.
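
A minimal sketch of how that reflection check could work (class and method names are hypothetical, not the proposed implementation): collect the type variables reachable from the return type and from each parameter, and fail if the former is not a subset of the latter.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashSet;
import java.util.Set;

final class GenericReturnCheck {

  // Fails for signatures like <T> List<T> convert(List<String> list), where T
  // cannot be inferred from any parameter.
  static void validate(final Method method) {
    final Set<String> returnVars = typeVariablesOf(method.getGenericReturnType());
    final Set<String> paramVars = new HashSet<>();
    for (final Type param : method.getGenericParameterTypes()) {
      paramVars.addAll(typeVariablesOf(param));
    }
    if (!paramVars.containsAll(returnVars)) {
      throw new IllegalArgumentException(
          "Cannot infer generic return type of " + method.getName());
    }
  }

  // Recursively collects type variables, e.g. T from List<T> or Map<String, T>.
  private static Set<String> typeVariablesOf(final Type type) {
    final Set<String> vars = new HashSet<>();
    if (type instanceof TypeVariable) {
      vars.add(((TypeVariable<?>) type).getName());
    } else if (type instanceof ParameterizedType) {
      for (final Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
        vars.addAll(typeVariablesOf(arg));
      }
    }
    return vars;
  }
}
```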

### Varargs

Varargs support boils down to supporting native Java arrays as parameters and ensuring that component
types are resolved properly. Any time a method is registered with variable args (e.g. `String... args`)
we will register a corresponding function with the wrapping array type (e.g. `String[]`). At runtime,
we resolve methods using the following fallback logic:
```
Type[] desired
if signature_exists(desired):
    return method
else:
    vararg = desired[-1]
    while desired[-1] == vararg:
        if signature_exists(desired, arrayOf(vararg)): return method
        desired = desired[:-1]
    fail
```

Contributor: How do we match struct varargs?

Author: I think solving your earlier question ("How do we know how the schema of the struct parameter maps to a udf implementation in the case a user wants to overload a function?") would answer this as well. We make sure that each struct has the same schema as the one defined in the parameter schema.

Author: I found a good way to implement this algorithm with a Trie data structure.

Contributor: This should fail if desired has a different type from desired[:-1], right? Not trying to nitpick pseudocode, just want to make sure I understand the intention correctly.

Author: Correct - but it's actually even more complicated than that. My implementation covers all the edge cases (see #2595). Namely, imagine these two method signatures:

    void foo(String, Integer, String...)
    void foo(String, String...)

If I call foo("hello", null, "world"), resolving this gets kinda tricky. That's probably enough to illustrate the point.
This will allow us to resolve methods such as `foo(Int, String...)` and `foo(String, String...)`,
as well as accept empty arguments to `foo(String...)`. If any parameter is `null`, it will be
considered valid for any vararg declaration.
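
A rough sketch of that fallback loop, under one reading of the pseudocode above (the `registry` map and `resolve` helper are hypothetical; null handling and the empty-argument case are omitted for brevity):

```java
import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class VarargResolution {

  // registry maps parameter-type lists to implementations; vararg signatures are
  // stored with the wrapping array type, e.g. foo(String, String...) as [String, String[]]
  static Optional<Method> resolve(
      final Map<List<Class<?>>, Method> registry,
      final List<Class<?>> desired) {
    if (registry.containsKey(desired)) {
      return Optional.of(registry.get(desired)); // exact match wins
    }
    if (desired.isEmpty()) {
      return Optional.empty(); // matching foo(String...) with no args needs extra logic
    }
    final Class<?> vararg = desired.get(desired.size() - 1);
    final Class<?> arrayType = Array.newInstance(vararg, 0).getClass();
    // Strip a (possibly empty) trailing run of vararg-typed parameters, each time
    // trying the signature with the wrapping array type in their place.
    final List<Class<?>> prefix = new ArrayList<>(desired);
    while (true) {
      final List<Class<?>> candidate = new ArrayList<>(prefix);
      candidate.add(arrayType);
      if (registry.containsKey(candidate)) {
        return Optional.of(registry.get(candidate));
      }
      if (prefix.isEmpty() || !prefix.get(prefix.size() - 1).equals(vararg)) {
        return Optional.empty();
      }
      prefix.remove(prefix.size() - 1);
    }
  }
}
```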

This proposal does not cover supporting `Object` as a parameter to a method in order to allow for
"generic variable argument" UDFs, but supporting it can be an extension.

```java
@Udf("returns a sublist of 'list' t")
public int sum(@UdfParameter final int... args) {
return Arrays.stream(args).sum();
}
```

### Complex Aggregation

We will allow users to supply an additional method `VR export(A agg)`. This method will be taken
as the behavioral parameter to a `mapValues` task that will be applied after the `aggregate` task
in the generated KStreams topology. To support this in a backwards compatible fashion, we will
introduce a new interface `Exportable`, which UDAFs may implement. Only UDAFs that implement this
interface will have the `mapValues` stage applied to them.

Contributor: Out of curiosity, what does VR stand for? I gather it's the return type but can't piece together the acronym.

Author: I assume it stands for ValueReturn. I took it from KStreams:

    <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);

Contributor: I'm not understanding why this is necessary. Why can't we just change our aggregations to support arbitrary numbers of arguments and arbitrary return types? Why do we need this extra export step? The fact that we only support one parameter, which must mirror the output type, is because we've hard-coded it this way. May be easier to discuss in person.

Author: You might not want to aggregate on the value that you return. For example, in the avg case you want to aggregate on struct{long, long} but want to return double. Let's talk in person.

```java
// This UDAF accepts `Long` values, aggregates on a `Struct` and exports a `Double`
class AverageUdaf implements Udaf<Long, Struct>, Exportable<Struct, Double> {

  private static final Schema SUM_SCHEMA =
      SchemaBuilder.struct()
          .field("sum", Schema.INT64_SCHEMA)
          .field("count", Schema.INT64_SCHEMA)
          .build();

  @Override
  public Struct initialize() {
    return new Struct(SUM_SCHEMA);
  }

  @Override
  public Struct aggregate(final Long val, final Struct agg) {
    agg.put("sum", agg.getInt64("sum") + val);
    agg.put("count", agg.getInt64("count") + 1);
    return agg;
  }

  @Override
  public Struct merge(final Struct agg1, final Struct agg2) {
    agg1.put("sum", agg1.getInt64("sum") + agg2.getInt64("sum"));
    agg1.put("count", agg1.getInt64("count") + agg2.getInt64("count"));
    return agg1;
  }

  /**
   * This is the new method that overrides {@code Exportable#export(A agg)} (note that
   * {@code Exportable<A, VR>} is defined with type parameters for the input and output
   * of this method).
   */
  @Override
  public Double export(final Struct agg) {
    return ((double) agg.getInt64("sum")) / agg.getInt64("count");
  }
}
```
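
To make the wiring concrete, here is a minimal sketch of where the export stage could land in the generated topology (the `grouped` stream, `structSerde` and method shape are hypothetical, not the actual code generation):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

class AverageTopologySketch {

  // The aggregate stage is built exactly as today; export is appended as mapValues.
  static KTable<String, Double> average(
      final KGroupedStream<String, Long> grouped,
      final AverageUdaf udaf,
      final Serde<Struct> structSerde) {
    return grouped
        .aggregate(
            udaf::initialize,                              // Struct{sum, count}
            (key, value, agg) -> udaf.aggregate(value, agg),
            Materialized.with(Serdes.String(), structSerde))
        .mapValues(udaf::export);                          // Struct -> Double
  }
}
```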

**NOTE:** It should be possible to support arbitrary Java objects for aggregation without much
extra work. For the scope of this KLIP, however, supporting just KSQL types (including `Struct`)
should suffice.

#### Alternative 1

Instead of introducing a new `Exportable<A, VR>` interface, we can introduce an interface that
inherits from `Udaf` instead:

```java
interface ExportableUdaf<V, VR, A> extends Udaf<V, A> {
  VR export(A agg);
}
```

The behavior will be the same, but it allows us to specify the type parameter for `A` only once.

#### Alternative 2

If we expect UDAFs to commonly require exportable functionality, then we can make this change as a
backwards incompatible change and introduce the `export` (or `terminate`) method into the UDAF
interface directly.
Contributor: I don't think this would be the end of the world from a UX perspective; it just needs documenting properly in the release notes. We could obviously convert over the in-built UDAFs as we implement this. I do believe the majority of non-trivial UDAFs will need to avail themselves of this new capability.

Author: @MichaelDrogalis thoughts?


## Test plan

Each of these improvements will come with some built-in UDF/UDAF examples. These examples will then
be comprehensively covered by both unit tests as well as corresponding Query Translation Tests.
Specifically, the UDFs described in the [Value](#value) section will all be implemented alongside
this KLIP. For the new UDAF complex aggregation, we will ensure that the generated topologies do not
include any unexpected steps such as a repartition.

## Documentation Updates

* The `Example UDF Class` in `udf.rst` will be extended to showcase the new features. Namely, we
will replace the multiply double method to include varargs:

>```java
> @Udf(description = "multiply N non-nullable DOUBLEs.")
> public double multiply(final double... values) {
>   return Arrays.stream(values).reduce(1.0, (a, b) -> a * b);
> }
>```

* The `Supported Types` section in `udf.rst` will include `Struct` and have an updated note at the
bottom which reads:

> Note: Complex types other than List, Map and Struct are not currently supported. Generic types
> for List and Map are supported, but component elements must be of one of the supported types. If
> the types are generic, the output type must be able to be inferred from one or more of the input
> parameters.

* The `UDAF` section in `udf.rst` will have an additional section to describe usage of the
`Exportable` interface:

> A UDAF can support complex aggregation types if it implements `io.confluent.common.Exportable`.
> This will allow your custom UDAF to convert some intermediate type used for aggregation to a
> KSQL output value. For example:
>
>```java
>@UdfFactory(description = "Computes a running average")
>class AverageUdaf implements Udaf<Long, Struct>, Exportable<Struct, Long> {
>
>  @Override
>  public Long export(Struct runningSum) {
>    return runningSum.getInt64("sum") / runningSum.getInt64("count");
>  }
>
>  // ...
>}
>```
>
> You can then use this UDAF like any other: `SELECT AVERAGE(val) as avg FROM ...` and the output
> value will adhere to the export specification in place of the Udaf specification.

Contributor: Nit (also applicable to the proposed code implementation of 'average' above): I found it harder to follow the example because I kept wondering where you were going to allow for fractional numbers in the output :) - if we change the return type to Double then it will help clarify which Long in the various signatures represents the incoming values to aggregate vs which one is the return type.

* For exportable UDAFs, we will also need to generate updated documentation for `DESCRIBE FUNCTION`
calls to make sure that the output value is not the intermediate value, but rather the final
exported value. This is a straightforward change in compiling the generated code. For example:

> ```
> ksql> DESCRIBE FUNCTION AVERAGE;
>
> Name : AVERAGE
> Author : Confluent
> Overview : Returns the average value for a window.
> Type : aggregate
> Jar : internal
> Variations :
>
> Variation : AVERAGE(BIGINT)
> Returns : BIGINT
> // Note that the signature for this is actually Udaf<Long, Struct> as seen above
>```

* Syntax reference needs to be updated to reflect any new UDF/UDAFs that are implemented as part of
this KLIP.
Contributor:

Perhaps worth adding another note about the compatibility-breaking change around the new UDAF structure introduced by complex aggregations. (It's already mentioned above, but I think it'd be good to call it out again here.) Either a note here in the docs-update section about updating the upgrade notes, or a mention in the "Compatibility implications" section below.


## Compatibility implications

N/A

## Performance implications

N/A