[FLINK-37865] Adds documentation for AsyncTableFunction #26611
base: master
@@ -1172,6 +1173,153 @@ If you intend to implement or call functions in Python, please refer to the [Pyt

{{< top >}}

Asynchronous Table Functions
you should add placeholder information into the Chinese docs.

Similar to `AsyncScalarFunction`, there also exists a `AsyncTableFunction` for returning multiple row results rather than a single scalar value. Similarly, this is most useful when interacting with external systems (for example when enriching stream events with data stored in a database).

Asynchronous interaction with an external system means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cased to much higher streaming throughput.
nit: cased -> cases

#### Defining an AsyncTableFunction

A user-defined asynchronous table function maps zero, one, or multiple scalar values to zero, one, or multiple Rows, but does it asynchronously. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method.
nit: Do we need to say "but does it asynchronously." as we have already said it is asynchronous

In order to define an asynchronous table function, one has to extend the base class `AsyncTableFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function.
nit: we could remove one has to
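
For illustration, the `eval` contract described in the diff (first argument a `CompletableFuture`, remaining arguments the call parameters, one or more overloads) might look roughly like the sketch below. To stay runnable with only the JDK, the class does not extend `AsyncTableFunction`; a real function would, and would typically emit `Row` values rather than strings. `LookupFunctionSketch` and `fetchRows` are hypothetical names, and the lookup is a stand-in for a real external call.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in: a real function would extend
// org.apache.flink.table.functions.AsyncTableFunction.
final class LookupFunctionSketch {

    // The first parameter is the future that carries the result rows;
    // the remaining parameters are the arguments passed to the function.
    public void eval(CompletableFuture<Collection<String>> future, String key) {
        CompletableFuture
                .supplyAsync(() -> fetchRows(key)) // runs off the calling thread
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        // Report the failure through the future.
                        future.completeExceptionally(error);
                    } else {
                        future.complete(rows);
                    }
                });
    }

    // A second overload with a different parameter type.
    public void eval(CompletableFuture<Collection<String>> future, Integer id) {
        eval(future, "id-" + id);
    }

    // Assumption: placeholder for a query against an external system.
    static Collection<String> fetchRows(String key) {
        return List.of(key + "#1", key + "#2");
    }
}
```

The method returns immediately after scheduling the lookup, so the caller can issue further requests while earlier ones are still outstanding.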

The number of outstanding calls to `eval` may be configured by `table.exec.async-scalar.buffer-capacity`.
maybe include a link to config parameter

#### Asynchronous Semantics
While calls to an `AsyncTableFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for the use-case.
nit: the use-case -> their use-case
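
The ordering guarantee in that paragraph can be pictured with a small JDK-only model: futures are queued in input order, may complete in any order, and are drained strictly from the head of the queue. This is only a sketch of the idea, not how Flink implements it; `OrderedOutputSketch` and `emitInInputOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class OrderedOutputSketch {

    // Emits results in the order the calls were issued, regardless of
    // the order in which the futures actually completed.
    static <T> List<T> emitInInputOrder(Queue<CompletableFuture<T>> pending) {
        List<T> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Block on the oldest outstanding call; later calls that have
            // already finished simply wait in the queue.
            out.add(pending.poll().join());
        }
        return out;
    }
}
```

If three calls are queued as first, second, third and complete in the order third, first, second, the emitted list is still first, second, third. A value such as a fetch timestamp inside each result would nevertheless show the real completion order.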

#### Error Handling
The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by `table.exec.async-table.retry-strategy`. If this is `NO_RETRY`, it will fail the job immediately. If it is set to `FIXED_DELAY`, a period of `table.exec.async-table.retry-delay` will be waited, and the function call will be retried and given another attempt to succeed. If the number of retries exceeds `table.exec.async-table.max-attempts` or if the timeout `table.exec.async-table.timeout` expires (including all retry attempts), the job will fail.
nit: "it will fail the job immediately." -> "the job is failed." What "it" refers to is not very clear to me.
nit: do we need the text "and given another attempt to succeed"?
As we are talking about `NO_RETRY` and `FIXED_DELAY` values, I suggest a table or list would be easier to read.
nit: "If the number of retries exceeds `table.exec.async-table.max-attempts`"
I assume it can never exceed the max retries. Also, this is the number of attempts, not retries (which would be one less than attempts).
I suggest: "If there have been `table.exec.async-table.max-attempts` failed attempts"
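
To make the attempts-versus-retries distinction in this thread concrete, here is a simplified, blocking model of a fixed-delay strategy in plain Java. It is only a sketch of the behaviour the paragraph describes: `FixedDelayRetrySketch` and `callWithRetry` are hypothetical names, `maxAttempts` and `delay` stand in for `table.exec.async-table.max-attempts` and `table.exec.async-table.retry-delay`, and the overall timeout is left out.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class FixedDelayRetrySketch {

    // Makes at most maxAttempts attempts (so at most maxAttempts - 1 retries),
    // waiting `delay` after each failed attempt that is followed by another one.
    static <T> T callWithRetry(Supplier<CompletableFuture<T>> call,
                               int maxAttempts,
                               Duration delay) {
        Throwable last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // join() rethrows an exceptional completion as CompletionException;
                // an exception thrown while invoking the call is caught as well.
                return call.get().join();
            } catch (RuntimeException e) {
                last = (e instanceof CompletionException && e.getCause() != null)
                        ? e.getCause()
                        : e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        // All attempts failed: in the documented behaviour, the job would fail here.
        throw new IllegalStateException("all " + maxAttempts + " attempts failed", last);
    }
}
```

With `maxAttempts` set to 1 the model behaves like `NO_RETRY`: the first failure is final.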

```
{{< /tab >}}
{{< tab "Scala" >}}
I question whether we want to give a Scala sample. The Scala API was removed at v2. Is there precedent at v2 for Scala examples in the docs?
{{< tab "Java" >}}

```java
import org.apache.flink.table.api.*;
Is there a sample / test including this code, so we can ensure that it stays valid? Ideally we would have a sample that is built as part of the Flink build, and we would bring the source into the docs as some sort of snippet.
WDYT?
In order to define an asynchronous table function, one has to extend the base class `AsyncTableFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. |
It looks like `eval` overloading is supported? Maybe add that to the example code as well.
What is the purpose of the change
Adds documentation for `AsyncTableFunction`.
Brief change log
(for example:)
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
`@Public(Evolving)`: (yes / no)
Documentation