Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-23055][docs][table] Add documentation for window tvf offset. #17015

Merged
merged 2 commits into from Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 63 additions & 4 deletions docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
Expand Up @@ -58,15 +58,16 @@ The `TUMBLE` function assigns each element to a window of specified window size.

The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`TUMBLE` function takes three required parameters:
`TUMBLE` function takes three required parameters, one optional parameter:

```sql
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
```

- `data`: is a table parameter that can be any relation with a time attribute column.
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `size`: is a duration specifying the width of the tumbling windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -142,7 +143,7 @@ For example, you could have windows of size 10 minutes that slides by 5 minutes.

The `HOP` function assigns windows that cover rows within the interval of size and shifting every slide based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `HOP` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after windowing TVF.

`HOP` takes four required parameters.
`HOP` takes four required parameters, one optional parameter:

```sql
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
Expand All @@ -152,6 +153,7 @@ HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to hopping windows.
- `slide`: is a duration specifying the duration between the start of sequential hopping windows
- `size`: is a duration specifying the width of the hopping windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -214,7 +216,7 @@ For example, you could have a cumulating window for 1 hour step and 1 day max si

The `CUMULATE` functions assigns windows based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `CUMULATE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`CUMULATE` takes four required parameters.
`CUMULATE` takes four required parameters, one optional parameter:

```sql
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
Expand All @@ -224,6 +226,7 @@ CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `step`: is a duration specifying the increased window size between the end of sequential cumulating windows.
- `size`: is a duration specifying the max width of the cumulating windows. `size` must be an integral multiple of `step`.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

Expand Down Expand Up @@ -282,4 +285,60 @@ Here is an example invocation on the Bid table:
+------------------+------------------+-------+
```

## Window Offset
`Offset` is an optional parameter which could be used to change the window assignment. It could be positive duration and negative duration. Default values for window offset is 0. The same record maybe assigned to the different window if set different offset value.
For example, which window would be assigned to for a record with timestamp `2021-06-30 00:00:04` for a Tumble window with 10 MINUTE as size?
- If `offset` value is `-16 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
- If `offset` value is `-6 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
- If `offset` is `-4 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
- If `offset` is `0`, the record assigns to window [`2021-06-30 00:00:00`, `2021-06-30 00:10:00`).
- If `offset` is `4 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
- If `offset` is `6 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
- If `offset` is `16 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
We could find that, some windows offset parameters may have same effect on the assignment of windows. In the above case, `-16 MINUTE`, `-6 MINUTE` and `4 MINUTE` have same effect for a Tumble window with 10 MINUTE as size.

*Note: The effect of window offset is just for updating window assignment, it has no effect on Watermark.*

We show an example to describe how to use offset in Tumble window in the following SQL.

```sql
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
-- window table-valued function should be used with aggregate operation,
-- this example is just used for explaining the syntax and the data produced by table-valued function.
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
+------------------+------------------+-------+
```

*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. `2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.*

{{< top >}}
67 changes: 63 additions & 4 deletions docs/content/docs/dev/table/sql/queries/window-tvf.md
Expand Up @@ -58,15 +58,16 @@ The `TUMBLE` function assigns each element to a window of specified window size.

The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`TUMBLE` function takes three required parameters:
`TUMBLE` function takes three required parameters, one optional parameter:

```sql
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
```

- `data`: is a table parameter that can be any relation with a time attribute column.
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `size`: is a duration specifying the width of the tumbling windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -142,7 +143,7 @@ For example, you could have windows of size 10 minutes that slides by 5 minutes.

The `HOP` function assigns windows that cover rows within the interval of size and shifting every slide based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `HOP` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after windowing TVF.

`HOP` takes four required parameters.
`HOP` takes four required parameters, one optional parameter:

```sql
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
Expand All @@ -152,6 +153,7 @@ HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to hopping windows.
- `slide`: is a duration specifying the duration between the start of sequential hopping windows
- `size`: is a duration specifying the width of the hopping windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -214,7 +216,7 @@ For example, you could have a cumulating window for 1 hour step and 1 day max si

The `CUMULATE` functions assigns windows based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `CUMULATE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`CUMULATE` takes four required parameters.
`CUMULATE` takes four required parameters, one optional parameter:

```sql
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
Expand All @@ -224,6 +226,7 @@ CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `step`: is a duration specifying the increased window size between the end of sequential cumulating windows.
- `size`: is a duration specifying the max width of the cumulating windows. `size` must be an integral multiple of `step`.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

Expand Down Expand Up @@ -282,4 +285,60 @@ Here is an example invocation on the Bid table:
+------------------+------------------+-------+
```

## Window Offset
`Offset` is an optional parameter which could be used to change the window assignment. It could be positive duration and negative duration. Default values for window offset is 0. The same record maybe assigned to the different window if set different offset value.
For example, which window would be assigned to for a record with timestamp `2021-06-30 00:00:04` for a Tumble window with 10 MINUTE as size?
- If `offset` value is `-16 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
- If `offset` value is `-6 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
- If `offset` is `-4 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
- If `offset` is `0`, the record assigns to window [`2021-06-30 00:00:00`, `2021-06-30 00:10:00`).
- If `offset` is `4 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
- If `offset` is `6 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
- If `offset` is `16 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
We could find that, some windows offset parameters may have same effect on the assignment of windows. In the above case, `-16 MINUTE`, `-6 MINUTE` and `4 MINUTE` have same effect for a Tumble window with 10 MINUTE as size.

*Note: The effect of window offset is just for updating window assignment, it has no effect on Watermark.*

We show an example to describe how to use offset in Tumble window in the following SQL.

```sql
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
-- window table-valued function should be used with aggregate operation,
-- this example is just used for explaining the syntax and the data produced by table-valued function.
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
+------------------+------------------+-------+
```

*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. `2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.*

{{< top >}}