Add an example for session window tvf with partition key
beyond1920 committed Aug 30, 2021
1 parent 0e63553 commit e5730d5
Showing 2 changed files with 218 additions and 50 deletions.
134 changes: 109 additions & 25 deletions docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
@@ -161,11 +161,11 @@ Here is an example invocation on the `Bid` table:
-- NOTE: Currently Flink doesn't support evaluating an individual window table-valued function;
-- a window table-valued function should be used with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
HOP(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
@@ -189,7 +189,7 @@ Here is an example invocation on the `Bid` table:
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the hopping windowed table
> SELECT window_start, window_end, SUM(price)
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
@@ -234,11 +234,11 @@ Here is an example invocation on the Bid table:
-- NOTE: Currently Flink doesn't support evaluating an individual window table-valued function;
-- a window table-valued function should be used with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
@@ -267,7 +267,7 @@ Here is an example invocation on the Bid table:
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the cumulating windowed table
> SELECT window_start, window_end, SUM(price)
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
@@ -306,45 +306,129 @@ SESSION(TABLE data, DESCRIPTOR(timecol), gap)
- `KEY`: an optional parameter. If `KEY` is not specified, the session window is applied to all input records; otherwise, it is applied per key.
- `gap`: a duration specifying the period of inactivity. When this period expires, the current session closes and subsequent elements are assigned to a new session window.

Here is an example invocation on the Bid table:
Here is an example of a global session window (one with no partition key) on the BidRich table:

```sql
-- tables must have a time attribute, e.g. `bidtime` in this table
Flink SQL> desc BidRich;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
| supplier_id | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM BidRich;
+------------------+-------+------+-------------+
| bidtime | price | item | supplier_id |
+------------------+-------+------+-------------+
| 2020-04-15 08:05 | 4.00 | A | supplier1 |
| 2020-04-15 08:06 | 4.00 | C | supplier2 |
| 2020-04-15 08:07 | 2.00 | G | supplier1 |
| 2020-04-15 08:08 | 2.00 | B | supplier3 |
| 2020-04-15 08:09 | 5.00 | D | supplier4 |
| 2020-04-15 08:11 | 2.00 | B | supplier3 |
| 2020-04-15 08:13 | 1.00 | E | supplier1 |
| 2020-04-15 08:15 | 3.00 | H | supplier2 |
| 2020-04-15 08:17 | 6.00 | F | supplier5 |
+------------------+-------+------+-------------+

-- NOTE: Currently Flink doesn't support evaluating an individual window table-valued function;
-- a window table-valued function should be used with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
> SELECT * FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES));
Flink SQL> SELECT * FROM TABLE(
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
Flink SQL> SELECT * FROM TABLE(
SESSION(
DATA => TABLE Bid,
DATA => TABLE BidRich,
TIMECOL => DESCRIPTOR(bidtime),
GAP => INTERVAL '3' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:05 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:17 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| bidtime | price | item | supplier_id | window_start | window_end | window_time |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | A | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:06 | 4.00 | C | supplier2 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:07 | 2.00 | G | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:08 | 2.00 | B | supplier3 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:11 | 2.00 | B | supplier3 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:15 | 3.00 | H | supplier2 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:05 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+

-- apply aggregation on the session windowed table
> SELECT window_start, window_end, SUM(price)
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES))
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:05 | 2020-04-15 08:16 | 15.00 |
| 2020-04-15 08:17 | 2020-04-15 08:20 | 6.00 |
| 2020-04-15 08:05 | 2020-04-15 08:20 | 29.00 |
+------------------+------------------+-------+
```

Here is an example of a session window with a partition key on the BidRich table:

```sql
-- NOTE: Currently Flink doesn't support evaluating an individual window table-valued function;
-- a window table-valued function should be used with an aggregate operation.
-- This example only illustrates the syntax and the data produced by the table-valued function.
Flink SQL> SELECT * FROM TABLE(
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), DESCRIPTOR(supplier_id), INTERVAL '3' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
SESSION(
DATA => TABLE BidRich,
TIMECOL => DESCRIPTOR(bidtime),
KEY => DESCRIPTOR(supplier_id),
GAP => INTERVAL '3' MINUTES));
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| bidtime | price | item | supplier_id | window_start | window_end | window_time |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+
| 2020-04-15 08:06 | 4.00 | C | supplier2 | 2020-04-15 08:06 | 2020-04-15 08:09 | 2020-04-15 08:08:59.999 |
| 2020-04-15 08:05 | 4.00 | A | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | G | supplier1 | 2020-04-15 08:05 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:09 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:08 | 2.00 | B | supplier3 | 2020-04-15 08:08 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 | 2.00 | B | supplier3 | 2020-04-15 08:08 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 | 1.00 | E | supplier1 | 2020-04-15 08:13 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:15 | 3.00 | H | supplier2 | 2020-04-15 08:15 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:17 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+-------------+------------------+------------------+-------------------------+

-- apply aggregation on the session windowed table
Flink SQL> SELECT supplier_id, window_start, window_end, SUM(price)
FROM TABLE(
SESSION(TABLE BidRich, DESCRIPTOR(bidtime), DESCRIPTOR(supplier_id), INTERVAL '3' MINUTES))
GROUP BY supplier_id, window_start, window_end;
+-------------+------------------+------------------+-------+
| supplier_id | window_start | window_end | price |
+-------------+------------------+------------------+-------+
| supplier2 | 2020-04-15 08:06 | 2020-04-15 08:09 | 4.00 |
| supplier1 | 2020-04-15 08:05 | 2020-04-15 08:10 | 6.00 |
| supplier4 | 2020-04-15 08:09 | 2020-04-15 08:12 | 5.00 |
| supplier3 | 2020-04-15 08:08 | 2020-04-15 08:14 | 4.00 |
| supplier1 | 2020-04-15 08:13 | 2020-04-15 08:16 | 1.00 |
| supplier2 | 2020-04-15 08:15 | 2020-04-15 08:18 | 3.00 |
| supplier5 | 2020-04-15 08:17 | 2020-04-15 08:20 | 6.00 |
+-------------+------------------+------------------+-------+
```
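
The session boundaries in both examples can be checked with a small simulation outside Flink. The following is a minimal Python sketch, not Flink's implementation: `ts`, `BID_RICH`, and `session_sums` are hypothetical names, and the rule that a record arriving exactly at the current session end still extends that session is an assumption inferred from the two `supplier3` rows above (08:08 and 08:11 share one window with a 3-minute gap).

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal


def ts(hhmm):
    """Build a timestamp on 2020-04-15, the day used in the BidRich rows."""
    return datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")


# (bidtime, price, item, supplier_id), copied from the BidRich table above.
BID_RICH = [
    (ts("08:05"), Decimal("4.00"), "A", "supplier1"),
    (ts("08:06"), Decimal("4.00"), "C", "supplier2"),
    (ts("08:07"), Decimal("2.00"), "G", "supplier1"),
    (ts("08:08"), Decimal("2.00"), "B", "supplier3"),
    (ts("08:09"), Decimal("5.00"), "D", "supplier4"),
    (ts("08:11"), Decimal("2.00"), "B", "supplier3"),
    (ts("08:13"), Decimal("1.00"), "E", "supplier1"),
    (ts("08:15"), Decimal("3.00"), "H", "supplier2"),
    (ts("08:17"), Decimal("6.00"), "F", "supplier5"),
]


def session_sums(rows, gap, by_supplier=False):
    """Return {(key, window_start, window_end): SUM(price)} per session."""
    # Without a partition key, every row falls into the single group None.
    groups = defaultdict(list)
    for row in rows:
        groups[row[3] if by_supplier else None].append(row)

    result = {}
    for key, group in groups.items():
        group.sort(key=lambda r: r[0])
        start = end = total = None
        for bidtime, price, _item, _supplier in group:
            if start is not None and bidtime <= end:
                # Assumption: arriving at or before the session end extends it.
                end = max(end, bidtime + gap)
                total += price
            else:
                # The gap expired: close the previous session, open a new one.
                if start is not None:
                    result[(key, start, end)] = total
                start, end, total = bidtime, bidtime + gap, price
        result[(key, start, end)] = total
    return result


if __name__ == "__main__":
    gap = timedelta(minutes=3)
    for by_supplier in (False, True):
        sums = session_sums(BID_RICH, gap, by_supplier)
        for (key, start, end), total in sorted(sums.items(), key=lambda kv: kv[0][2]):
            print(key, start.strftime("%H:%M"), end.strftime("%H:%M"), total)
```

With a 3-minute gap, the global run yields one session from 08:05 to 08:20 with a sum of 29.00, and the per-supplier run yields the seven sessions listed in the aggregation result above.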

*Note: to make the windowing behavior easier to follow, timestamp values are shown without trailing zeros; e.g. `2020-04-15 08:05` would be displayed as `2020-04-15 08:05:00.000` in the Flink SQL Client if the type is `TIMESTAMP(3)`.*

## Window Offset
`Offset` is an optional parameter that changes the window assignment. It can be a positive or a negative duration; the default window offset is 0. The same record may be assigned to a different window if a different offset value is set.
For example, which window would a record with timestamp `2021-06-30 00:00:04` be assigned to by a Tumble window with a size of 10 MINUTE?
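
One way to explore this question is to compute the window bounds by hand. The sketch below is written in Python rather than SQL and is an illustration only: `tumble_window` and `EPOCH` are hypothetical names, and it assumes that tumbling windows are aligned to the Unix epoch and then shifted by the offset, so that the window start is `timestamp - (timestamp - offset) mod size`.

```python
from datetime import datetime, timedelta

# Assumption: windows are aligned to the Unix epoch before the offset is applied.
EPOCH = datetime(1970, 1, 1)
ONE_MS = timedelta(milliseconds=1)


def tumble_window(timestamp, size, offset=timedelta(0)):
    """Return (window_start, window_end) of the tumbling window holding timestamp."""
    ts_ms = (timestamp - EPOCH) // ONE_MS
    size_ms = size // ONE_MS
    offset_ms = offset // ONE_MS
    # Python's % is non-negative for a positive divisor, so negative offsets work too.
    start_ms = ts_ms - (ts_ms - offset_ms) % size_ms
    start = EPOCH + timedelta(milliseconds=start_ms)
    return start, start + size


if __name__ == "__main__":
    record = datetime(2021, 6, 30, 0, 0, 4)
    size = timedelta(minutes=10)
    for minutes in (-16, -6, -4, 0, 4, 6, 16):
        start, end = tumble_window(record, size, timedelta(minutes=minutes))
        print(f"offset {minutes:>3} MINUTE -> [{start}, {end})")
```

Under these assumptions, offset 0 gives `[2021-06-30 00:00:00, 2021-06-30 00:10:00)`, while offsets of -16, -6, and 4 minutes all give `[2021-06-29 23:54:00, 2021-06-30 00:04:00)`, which shows that offsets differing by a multiple of the window size produce the same assignment.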