Add an example for session window tvf with partition key
beyond1920 committed Aug 30, 2021
1 parent 0e63553 commit 555342f
Showing 2 changed files with 230 additions and 62 deletions.
146 changes: 115 additions & 31 deletions docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
@@ -161,11 +161,11 @@ Here is an example invocation on the `Bid` table:
-- NOTE: Flink currently does not support evaluating a window table-valued function on its own;
-- it must be used together with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must come first
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
HOP(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
@@ -189,7 +189,7 @@ Here is an example invocation on the `Bid` table:
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the hopping windowed table
> SELECT window_start, window_end, SUM(price)
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
@@ -234,11 +234,11 @@ Here is an example invocation on the Bid table:
-- NOTE: Flink currently does not support evaluating a window table-valued function on its own;
-- it must be used together with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must come first
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
@@ -267,7 +267,7 @@ Here is an example invocation on the Bid table:
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the cumulating windowed table
> SELECT window_start, window_end, SUM(price)
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
@@ -306,45 +306,129 @@ SESSION(TABLE data, DESCRIPTOR(timecol), gap)
- `KEY`: an optional parameter. If `KEY` is not specified, the session window is applied to all input records; otherwise, it is applied per key.
- `gap`: a duration specifying the length of the period of inactivity. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
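
The `KEY` and `gap` semantics can be sketched outside Flink. The Python below is a minimal illustration, not Flink's implementation: `session_windows`, `at`, and `BID_RICH` are hypothetical names, and the rows are copied from the `BidRich` example. The rule that an event arriving exactly at the current window end still extends the session is an assumption inferred from the example output, where supplier3's bids at 08:08 and 08:11 share one window.

```python
from datetime import datetime, timedelta


def session_windows(events, gap, key=None):
    """Group (timestamp, price, supplier_id) events into session windows.

    With key=None all events form one global group; otherwise `key` is a
    function extracting the partition key from an event.
    Returns a list of (key_value, window_start, window_end, price_sum).
    """
    keyfunc = key if key is not None else (lambda event: None)

    # Partition the events by key, keeping each partition in time order.
    groups = {}
    for event in sorted(events, key=lambda e: e[0]):
        groups.setdefault(keyfunc(event), []).append(event)

    results = []
    for key_value, group in groups.items():
        start = end = None
        total = 0.0
        for ts, price, _ in group:
            if start is not None and ts <= end:
                # Assumption: an event at or before the current window end
                # extends the session instead of starting a new one.
                end = max(end, ts + gap)
                total += price
            else:
                if start is not None:
                    results.append((key_value, start, end, total))
                start, end, total = ts, ts + gap, price
        if start is not None:
            results.append((key_value, start, end, total))
    return results


def at(hhmm):
    """Hypothetical helper: build a timestamp on 2020-04-15."""
    return datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")


# Rows copied from the BidRich example: (bidtime, price, supplier_id).
BID_RICH = [
    (at("08:05"), 4.0, "supplier1"),
    (at("08:06"), 4.0, "supplier2"),
    (at("08:07"), 2.0, "supplier1"),
    (at("08:08"), 2.0, "supplier3"),
    (at("08:09"), 5.0, "supplier4"),
    (at("08:11"), 2.0, "supplier3"),
    (at("08:13"), 1.0, "supplier1"),
    (at("08:15"), 3.0, "supplier2"),
    (at("08:17"), 6.0, "supplier5"),
]
GAP = timedelta(minutes=3)

global_sessions = session_windows(BID_RICH, GAP)
keyed_sessions = session_windows(BID_RICH, GAP, key=lambda e: e[2])

for row in global_sessions + keyed_sessions:
    print(row)
```

Without a key, every bid is at most two minutes after the previous one, so the whole table collapses into a single session. With `supplier_id` as the key, each supplier's bids are compared only with that supplier's earlier bids, which is why the same data splits into several shorter windows.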

Here is an example invocation on the Bid table:
Here is an example of a global session window on the `BidRich` table:

```sql
-- tables must have a time attribute, e.g. `bidtime` in this table
Flink SQL> desc BidRich;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
| supplier_id | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM BidRich;
+------------------+-------+------+-------------+
| bidtime | price | item | supplier_id |
+------------------+-------+------+-------------+
| 2020-04-15 08:05 | 4.00 | A | supplier1 |
| 2020-04-15 08:06 | 4.00 | C | supplier2 |
| 2020-04-15 08:07 | 2.00 | G | supplier1 |
| 2020-04-15 08:08 | 2.00 | B | supplier3 |
| 2020-04-15 08:09 | 5.00 | D | supplier4 |
| 2020-04-15 08:11 | 2.00 | B | supplier3 |
| 2020-04-15 08:13 | 1.00 | E | supplier1 |
| 2020-04-15 08:15 | 3.00 | H | supplier2 |
| 2020-04-15 08:17 | 6.00 | F | supplier5 |
+------------------+-------+------+-------------+

-- NOTE: Flink currently does not support evaluating a window table-valued function on its own;
-- it must be used together with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
> SELECT * FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES));
Flink SQL> SELECT * FROM TABLE(
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES));
-- or with the named params
-- note: the DATA param must come first
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
SESSION(
DATA => TABLE Bid,
DATA => TABLE BidRich,
TIMECOL => DESCRIPTOR(bidtime),
GAP => INTERVAL '3' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:17 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| bidtime | price | item | supplier_id | window_start | window_end | window_time |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | A | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:06 | 4.00 | C | supplier2 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:07 | 2.00 | G | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:08 | 2.00 | B | supplier3 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:11 | 2.00 | B | supplier3 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:15 | 3.00 | H | supplier2 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+

-- apply aggregation on the session windowed table
> SELECT window_start, window_end, SUM(price)
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES))
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:05 | 2020-04-15 08:16 | 15.00 |
| 2020-04-15 08:17 | 2020-04-15 08:20 | 6.00 |
| 2020-04-15 08:05 | 2020-04-15 08:20 | 29.00 |
+------------------+------------------+-------+
```

Here is an example of a partitioned session window on the `BidRich` table:

```sql
-- NOTE: Flink currently does not support evaluating a window table-valued function on its own;
-- it must be used together with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
Flink SQL> SELECT * FROM TABLE(
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), DESCRIPTOR(supplier_id), INTERVAL '3' MINUTES));
-- or with the named params
-- note: the DATA param must come first
Flink SQL> SELECT * FROM TABLE(
SESSION(
DATA => TABLE BidRich,
TIMECOL => DESCRIPTOR(bidtime),
KEY => DESCRIPTOR(supplier_id),
GAP => INTERVAL '3' MINUTES));
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| bidtime | price | item | supplier_id | window_start | window_end | window_time |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| 2020-04-15 08:06 | 4.00 | C | supplier2 | 2020-04-15 08:06 | 2020-04-15 08:09 | 2020-04-15 08:08:59.999 |
| 2020-04-15 08:05 | 4.00 | A | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | G | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:09 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:08 | 2.00 | B | supplier3 | 2020-04-15 08:08 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 | 2.00 | B | supplier3 | 2020-04-15 08:08 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 | 1.00 | E | supplier1 | 2020-04-15 08:13 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:15 | 3.00 | H | supplier2 | 2020-04-15 08:15 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:17 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+

-- apply aggregation on the session windowed table
Flink SQL> SELECT supplier_id, window_start, window_end, SUM(price)
FROM TABLE(
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), DESCRIPTOR(supplier_id), INTERVAL '3' MINUTES))
GROUP BY supplier_id, window_start, window_end;
+-------------+------------------+------------------+-------+
| supplier_id | window_start | window_end | price |
+-------------+------------------+------------------+-------+
| supplier2 | 2020-04-15 08:06 | 2020-04-15 08:09 | 4.00 |
| supplier1 | 2020-04-15 08:05 | 2020-04-15 08:10 | 6.00 |
| supplier4 | 2020-04-15 08:09 | 2020-04-15 08:12 | 5.00 |
| supplier3 | 2020-04-15 08:08 | 2020-04-15 08:14 | 4.00 |
| supplier1 | 2020-04-15 08:13 | 2020-04-15 08:16 | 1.00 |
| supplier2 | 2020-04-15 08:15 | 2020-04-15 08:18 | 3.00 |
| supplier5 | 2020-04-15 08:17 | 2020-04-15 08:20 | 6.00 |
+-------------+------------------+------------------+-------+
```

*Note: to make the windowing behavior easier to follow, timestamp values are shown without trailing zeros, e.g. `2020-04-15 08:05` would be displayed as `2020-04-15 08:05:00.000` in the Flink SQL Client if the type is `TIMESTAMP(3)`.*

## Window Offset
`Offset` is an optional parameter that can be used to change the window assignment. It can be a positive or a negative duration. The default window offset is 0. The same record may be assigned to a different window when a different offset value is set.
For example, which window would a record with timestamp `2021-06-30 00:00:04` be assigned to for a Tumble window with a size of 10 MINUTE?
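
The assignment can be worked out with a little modular arithmetic. The Python sketch below is an illustration only, not Flink's code: `tumble_window` is a hypothetical helper, and it assumes that windows are aligned to the Unix epoch and that time zones can be ignored.

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch (no time zone).
EPOCH = datetime(1970, 1, 1)


def tumble_window(ts, size, offset=timedelta(0)):
    """Return (window_start, window_end) of the tumbling window containing ts."""
    # timedelta % timedelta uses floor semantics, so the remainder is never
    # negative, even when the offset is larger than the elapsed time.
    start = ts - (ts - EPOCH - offset) % size
    return start, start + size


record = datetime(2021, 6, 30, 0, 0, 4)
size = timedelta(minutes=10)

# Try a few offsets, positive and negative, on the same record.
for minutes in (-4, 0, 4):
    start, end = tumble_window(record, size, timedelta(minutes=minutes))
    print(f"offset {minutes:>3} min -> [{start}, {end})")
```

Under the same assumptions, a bid at `2020-04-15 08:05` with a 1-minute offset falls in the window from 08:01 to 08:11, which agrees with the window bounds shown in the tumbling-window table in this diff.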
@@ -378,12 +462,12 @@ Flink SQL> SELECT * FROM TABLE(
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table