Skip to content

Commit

Permalink
[FLINK-23055][docs][table] Add documentation for window tvf offset.
Browse files Browse the repository at this point in the history
  • Loading branch information
beyond1920 committed Aug 27, 2021
1 parent b3dea50 commit 4b46a4c
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 8 deletions.
67 changes: 63 additions & 4 deletions docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
Expand Up @@ -58,15 +58,16 @@ The `TUMBLE` function assigns each element to a window of specified window size.

The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`TUMBLE` function takes three required parameters:
`TUMBLE` function takes three required parameters, one optional parameter:

```sql
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
```

- `data`: is a table parameter that can be any relation with a time attribute column.
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `size`: is a duration specifying the width of the tumbling windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -142,7 +143,7 @@ For example, you could have windows of size 10 minutes that slides by 5 minutes.

The `HOP` function assigns windows that cover rows within the interval of size and shifting every slide based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `HOP` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after windowing TVF.

`HOP` takes four required parameters.
`HOP` takes four required parameters, one optional parameter:

```sql
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
Expand All @@ -152,6 +153,7 @@ HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to hopping windows.
- `slide`: is a duration specifying the duration between the start of sequential hopping windows
- `size`: is a duration specifying the width of the hopping windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -214,7 +216,7 @@ For example, you could have a cumulating window for 1 hour step and 1 day max si

The `CUMULATE` functions assigns windows based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `CUMULATE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`CUMULATE` takes four required parameters.
`CUMULATE` takes four required parameters, one optional parameter:

```sql
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
Expand All @@ -224,6 +226,7 @@ CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `step`: is a duration specifying the increased window size between the end of sequential cumulating windows.
- `size`: is a duration specifying the max width of the cumulating windows. `size` must be an integral multiple of `step`.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

Expand Down Expand Up @@ -282,4 +285,60 @@ Here is an example invocation on the Bid table:
+------------------+------------------+-------+
```

### Window offset
`Offset` is an optional parameter which could be used to change the window assignment. It could be positive duration and negative duration. Default values for window offset is 0. The same record maybe assigned to the different window if set different offset value.
For example, which window would be assigned to for a record with timestamp `2021-06-30 00:00:04` for a Tumble window with 10 MINUTE as size?
If `offset` value is `-16 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
If `offset` value is `-6 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
If `offset` is `-4 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
If `offset` is `0`, the record assigns to window [`2021-06-30 00:00:00`, `2021-06-30 00:10:00`).
If `offset` is `4 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
If `offset` is `6 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
If `offset` is `16 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
We could find that, some windows offset parameters may have same effect on the assignment of windows. In the above case, `-16 MINUTE`, `-6 MINUTE` and `4 MINUTE` have same effect for a Tumble window with 10 MINUTE as size.

*Note: The effect of window offset is just for updating window assignment, it has no effect on Watermark.*

We show an example to describe how to use offset in Tumble window in the following SQL.

```sql
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
-- window table-valued function should be used with aggregate operation,
-- this example is just used for explaining the syntax and the data produced by table-valued function.
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
+------------------+------------------+-------+
```

*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. `2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.*

{{< top >}}
67 changes: 63 additions & 4 deletions docs/content/docs/dev/table/sql/queries/window-tvf.md
Expand Up @@ -58,15 +58,16 @@ The `TUMBLE` function assigns each element to a window of specified window size.

The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`TUMBLE` function takes three required parameters:
`TUMBLE` function takes three required parameters, one optional parameter:

```sql
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
```

- `data`: is a table parameter that can be any relation with a time attribute column.
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `size`: is a duration specifying the width of the tumbling windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -142,7 +143,7 @@ For example, you could have windows of size 10 minutes that slides by 5 minutes.

The `HOP` function assigns windows that cover rows within the interval of size and shifting every slide based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `HOP` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after windowing TVF.

`HOP` takes four required parameters.
`HOP` takes four required parameters, one optional parameter:

```sql
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
Expand All @@ -152,6 +153,7 @@ HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to hopping windows.
- `slide`: is a duration specifying the duration between the start of sequential hopping windows
- `size`: is a duration specifying the width of the hopping windows.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the `Bid` table:

Expand Down Expand Up @@ -214,7 +216,7 @@ For example, you could have a cumulating window for 1 hour step and 1 day max si

The `CUMULATE` functions assigns windows based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `CUMULATE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.

`CUMULATE` takes four required parameters.
`CUMULATE` takes four required parameters, one optional parameter:

```sql
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
Expand All @@ -224,6 +226,7 @@ CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
- `step`: is a duration specifying the increased window size between the end of sequential cumulating windows.
- `size`: is a duration specifying the max width of the cumulating windows. `size` must be an integral multiple of `step`.
- `offset`: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

Expand Down Expand Up @@ -282,4 +285,60 @@ Here is an example invocation on the Bid table:
+------------------+------------------+-------+
```

### Window offset
`Offset` is an optional parameter which could be used to change the window assignment. It could be positive duration and negative duration. Default values for window offset is 0. The same record maybe assigned to the different window if set different offset value.
For example, which window would be assigned to for a record with timestamp `2021-06-30 00:00:04` for a Tumble window with 10 MINUTE as size?
If `offset` value is `-16 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
If `offset` value is `-6 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
If `offset` is `-4 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
If `offset` is `0`, the record assigns to window [`2021-06-30 00:00:00`, `2021-06-30 00:10:00`).
If `offset` is `4 MINUTE`, the record assigns to window [`2021-06-29 23:54:00`, `2021-06-30 00:04:00`).
If `offset` is `6 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
If `offset` is `16 MINUTE`, the record assigns to window [`2021-06-29 23:56:00`, `2021-06-30 00:06:00`).
We could find that, some windows offset parameters may have same effect on the assignment of windows. In the above case, `-16 MINUTE`, `-6 MINUTE` and `4 MINUTE` have same effect for a Tumble window with 10 MINUTE as size.

*Note: The effect of window offset is just for updating window assignment, it has no effect on Watermark.*

We show an example to describe how to use offset in Tumble window in the following SQL.

```sql
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
-- window table-valued function should be used with aggregate operation,
-- this example is just used for explaining the syntax and the data produced by table-valued function.
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
+------------------+------------------+-------+
```

*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. `2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.*

{{< top >}}

0 comments on commit 4b46a4c

Please sign in to comment.