163 changes: 122 additions & 41 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -1015,6 +1015,107 @@ not needed anymore via `Context#clearAllTimers()` or `TimeContext#clearTimer(Str

{{< top >}}

Multiple Tables
---------------

A PTF can process multiple tables simultaneously. This enables a variety of use cases, including:

- Implementing **custom joins** that efficiently manage state.
- Enriching the main table with information from dimension tables as **side inputs**.
- Sending **control events** to the keyed virtual processor during runtime.

The `eval()` method can specify multiple table arguments to support multiple inputs. All table arguments must be declared
with set semantics and use consistent partitioning. In other words, the number of columns and their data types in the
`PARTITION BY` clause must match across all involved table arguments.
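
The partitioning rule above can be sketched as a small check. This is a plain-Java illustration only: the class `PartitionKeyCheckSketch`, the method `isConsistent`, and the use of plain strings such as `"STRING"` for data types are hypothetical and not part of Flink, which performs its own validation during planning. Column names are not compared, which is why `Visits PARTITION BY name` and `Purchases PARTITION BY customer` can be combined in the example further down.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: compares the key types of two PARTITION BY clauses.
final class PartitionKeyCheckSketch {

    // Returns true if both key lists have the same number of columns and the
    // same data type at every position. Column names are not compared.
    static boolean isConsistent(List<String> firstKeyTypes, List<String> secondKeyTypes) {
        if (firstKeyTypes.size() != secondKeyTypes.size()) {
            return false;
        }
        for (int i = 0; i < firstKeyTypes.size(); i++) {
            if (!firstKeyTypes.get(i).equals(secondKeyTypes.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```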

Rows from either input are passed to the function one at a time. Thus, only one table argument is non-null at a time. Use
null checks to determine which input is currently being processed.

{{< hint warning >}}
The system decides which input row is streamed through the virtual processor next. If not handled properly in the PTF,
this can lead to race conditions between inputs and, consequently, to non-deterministic results. It is recommended to
design the function in such a way that the join is either time-based (i.e., waiting for all rows to arrive up to a given
watermark) or condition-based, where the PTF buffers one or more input rows until a specific condition is met.
{{< /hint >}}
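
The condition-based approach from the hint can be sketched without any Flink dependency. The class `BufferingJoinSketch` and its members are hypothetical names chosen for illustration; the map stands in for the per-key state that a PTF would declare via `@StateHint`. The sketch buffers whichever side arrives first and emits once both sides are present, so the result does not depend on which input the system streams first.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java simulation (not Flink API) of a condition-based join per partition key.
final class BufferingJoinSketch {

    // Per-key buffer, comparable to a state entry of a PTF.
    private static final class Pending {
        String left;
        String right;
        boolean emitted;
    }

    private final Map<String, Pending> stateByKey = new HashMap<>();

    // Mirrors an eval() with two table arguments: exactly one of left/right is non-null.
    // Returns a joined result once both sides have been seen for the key, at most once per key.
    Optional<String> eval(String key, String left, String right) {
        Pending pending = stateByKey.computeIfAbsent(key, k -> new Pending());

        // Buffer the first row of each side; later duplicates are ignored.
        if (left != null && pending.left == null) {
            pending.left = left;
        }
        if (right != null && pending.right == null) {
            pending.right = right;
        }

        // The join condition: both sides are present and nothing was emitted yet.
        if (pending.left != null && pending.right != null && !pending.emitted) {
            pending.emitted = true;
            return Optional.of(key + ": " + pending.left + " + " + pending.right);
        }
        return Optional.empty();
    }
}
```

Feeding the left row before the right row, or the other way around, yields the same single result for a key. Unlike a real PTF with a state TTL, this sketch never evicts buffered rows.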

### Example: Custom Join

The following example illustrates how to implement a custom join between two tables:

{{< tabs "2137eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
```java
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

env.executeSql("CREATE VIEW Visits(name) AS VALUES ('Bob'), ('Alice'), ('Bob')");
env.executeSql("CREATE VIEW Purchases(customer, item) AS VALUES ('Alice', 'milk')");

env.createFunction("Greeting", GreetingWithLastPurchase.class);

env
  .executeSql("SELECT * FROM Greeting(TABLE Visits PARTITION BY name, TABLE Purchases PARTITION BY customer)")
  .print();

// --------------------
// Function declaration
// --------------------

// Function that greets a customer and suggests the last purchase made, if available.
public static class GreetingWithLastPurchase extends ProcessTableFunction<String> {

  // Keep the last purchased item in state
  public static class LastItemState {
    public String lastItem;
  }

  // The eval() method takes two @ArgumentHint(TABLE_AS_SET) arguments
  public void eval(
      @StateHint LastItemState state,
      @ArgumentHint(TABLE_AS_SET) Row visit,
      @ArgumentHint(TABLE_AS_SET) Row purchase) {

    // Process row from table Purchases
    if (purchase != null) {
      state.lastItem = purchase.getFieldAs("item");
    }

    // Process row from table Visits
    else if (visit != null) {
      if (state.lastItem == null) {
        collect("Hello " + visit.getFieldAs("name") + ", let me know if I can help!");
      } else {
        collect("Hello " + visit.getFieldAs("name") + ", here to buy " + state.lastItem + " again?");
      }
    }
  }
}
```
{{< /tab >}}
{{< /tabs >}}

The result will look similar to:

```text
+----+--------------------------------+--------------------------------+--------------------------------+
| op |                           name |                       customer |                         EXPR$0 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                            Bob |                            Bob | Hello Bob, let me know if I... |
| +I |                          Alice |                          Alice | Hello Alice, here to buy mi... |
| +I |                            Bob |                            Bob | Hello Bob, let me know if I... |
+----+--------------------------------+--------------------------------+--------------------------------+
```

### Efficiency and Design Principles

A high number of input tables can negatively impact a single TaskManager or subtask. Network buffers must be allocated
for each input, which increases memory consumption. For this reason, the number of table arguments is limited to a
maximum of 20 tables.

Unevenly distributed keys may overload a single virtual processor, leading to backpressure. It is important to select
appropriate partition keys.
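
One way to judge a candidate partition key before using it is to measure how concentrated a sample of its values is. The following is a minimal sketch under that assumption; `KeySkewSketch` and `largestKeyShare` are hypothetical names, not Flink API, and the threshold at which skew becomes a problem depends on the workload.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: estimates key skew from a sample of key values.
final class KeySkewSketch {

    // Returns the fraction of rows (0.0 to 1.0) that belong to the most frequent key.
    // A value close to 1.0 means that one virtual processor would receive most rows.
    static double largestKeyShare(List<String> sampledKeys) {
        if (sampledKeys.isEmpty()) {
            return 0.0;
        }
        Map<String, Integer> countsByKey = new HashMap<>();
        int largestCount = 0;
        for (String key : sampledKeys) {
            // merge() returns the updated count for this key
            int count = countsByKey.merge(key, 1, Integer::sum);
            largestCount = Math.max(largestCount, count);
        }
        return (double) largestCount / sampledKeys.size();
    }
}
```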

{{< top >}}

Query Evolution with UIDs
-------------------------

@@ -1061,7 +1162,8 @@ END;

{{< top >}}

## Pass-Through Columns
Pass-Through Columns
--------------------

Depending on the table semantics and whether an `on_time` argument has been defined, the system adds additional columns for
every function output.
@@ -1089,7 +1191,7 @@ With pass-through columns: | k | v | c1 | c2 |

This allows the PTF to focus on the main aggregation without the need to manually forward input columns.

*Note*: Timers are not available when pass-through columns are enabled.
*Note*: Pass-through columns are only available for append-only PTFs that take a single table argument and don't use timers.

{{< top >}}

@@ -1610,9 +1712,6 @@ while the PTF exists once in the pipeline.
The following example shows how a PTF can be used for joining. Additionally, it also showcases how a PTF can be used as
a data generator for creating bounded tables with dummy data.

Because PTFs don't support multiple table arguments yet, we use `unionAll` to pass multiple partitioned tables
into the PTF. Because a union requires a unified schema, the data generators transform the data into a `UnifiedEvent`.

{{< tabs "1637eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
```java
@@ -1625,58 +1724,40 @@ TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMo
Table orders = env.fromCall(OrderGenerator.class);
Table payments = env.fromCall(PaymentGenerator.class);

// Union orders and payments before
// partitioning and passing them into the Joiner function
Table joined = orders.unionAll(payments)
.partitionBy($("orderId"))
.process(Joiner.class);
// Partition orders and payments and pass them into the Joiner function
Table joined = env.fromCall(
Joiner.class,
orders.partitionBy($("id")).asArgument("order"),
payments.partitionBy($("orderId")).asArgument("payment"));

joined.execute().print();

// ---------------------------
// Data Generation
// ---------------------------

// A unified event for all input tables.
// One of the sides is always empty.
public static class UnifiedEvent {
public int orderId;
public Order order;
public Payment payment;

public static UnifiedEvent of(int orderId, Order order, Payment payment) {
UnifiedEvent unifiedEvent = new UnifiedEvent();
unifiedEvent.orderId = orderId;
unifiedEvent.order = order;
unifiedEvent.payment = payment;
return unifiedEvent;
}
}

// A PTF that generates Orders
public static class OrderGenerator extends ProcessTableFunction<UnifiedEvent> {
public static class OrderGenerator extends ProcessTableFunction<Order> {
public void eval() {
Stream.of(
Order.of("Bob", 1000001, 23.46, "USD"),
Order.of("Bob", 1000021, 6.99, "USD"),
Order.of("Alice", 1000601, 0.79, "EUR"),
Order.of("Charly", 1000703, 100.60, "EUR")
)
.map(order -> UnifiedEvent.of(order.id, order, null))
.forEach(this::collect);
}
}

// A PTF that generates Payments
public static class PaymentGenerator extends ProcessTableFunction<UnifiedEvent> {
public static class PaymentGenerator extends ProcessTableFunction<Payment> {
public void eval() {
Stream.of(
Payment.of(999997870, 1000001),
Payment.of(999997870, 1000001),
Payment.of(999993331, 1000021),
Payment.of(999994111, 1000601)
)
.map(payment -> UnifiedEvent.of(payment.orderId, null, payment))
.forEach(this::collect);
}
}
@@ -1714,8 +1795,8 @@ public static class Payment {
{{< /tab >}}
{{< /tabs >}}

After generating the data and performing the union, the stateful Joiner buffers events until a matching pair is
found. Any duplicates in either of the input tables are ignored.
After generating the data, the stateful Joiner buffers events until a matching pair is found. Any duplicates in either
of the input tables are ignored.

{{< tabs "1737eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
@@ -1727,7 +1808,8 @@ public static class Joiner extends ProcessTableFunction<JoinResult> {
public void eval(
Context ctx,
@StateHint(ttl = "1 hour") JoinResult seen,
@ArgumentHint(TABLE_AS_SET) UnifiedEvent input
@ArgumentHint(TABLE_AS_SET) Order order,
@ArgumentHint(TABLE_AS_SET) Payment payment
) {
if (order != null) {
if (seen.order != null) {
@@ -1767,19 +1849,18 @@ The output could look similar to the following. Duplicate events for payment `99
for `Charly` could not be found.

```text
+----+-------------+--------------------------------+--------------------------------+
| op | orderId | order | payment |
+----+-------------+--------------------------------+--------------------------------+
| +I | 1000021 | (amount=6.99, currency=USD,... | (id=999993331, orderId=1000... |
| +I | 1000601 | (amount=0.79, currency=EUR,... | (id=999994111, orderId=1000... |
| +I | 1000001 | (amount=23.46, currency=USD... | (id=999997870, orderId=1000... |
+----+-------------+--------------------------------+--------------------------------+
+----+-------------+-------------+--------------------------------+--------------------------------+
| op | id | orderId | order | payment |
+----+-------------+-------------+--------------------------------+--------------------------------+
| +I | 1000021 | 1000021 | (amount=6.99, currency=USD,... | (id=999993331, orderId=1000... |
| +I | 1000601 | 1000601 | (amount=0.79, currency=EUR,... | (id=999994111, orderId=1000... |
| +I | 1000001 | 1000001 | (amount=23.46, currency=USD... | (id=999997870, orderId=1000... |
+----+-------------+-------------+--------------------------------+--------------------------------+
```

Limitations
-----------

PTFs are in an early stage. The following limitations apply:
- Multiple table arguments are not supported.
- PTFs cannot run in batch mode.
- Broadcast state