[FLINK-5386] [table] Refactor window clause.
- move window() before groupBy()
- make window alias mandatory
- groupBy() must include window alias

This closes #3046.
sunjincheng121 authored and fhueske committed Jan 20, 2017
1 parent 8ccedd1 commit 6bf556e
Showing 9 changed files with 501 additions and 314 deletions.
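
The three rules in the commit message (window before `groupBy()`, mandatory alias, `groupBy()` must reference the alias) can be modeled without any Flink dependency. The following is a minimal sketch, not the code from this commit: the class `WindowClauseSketch`, its nested `Window` type, and the use of `IllegalArgumentException` in place of Flink's `ValidationException` are all assumptions made for illustration. Only the two error messages and the "exactly one alias" check are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical, Flink-free model of the validation rules described in the commit message. */
public class WindowClauseSketch {

    /** Stand-in for a window definition; only the optional alias matters here. */
    static final class Window {
        final Optional<String> alias;

        Window(Optional<String> alias) {
            this.alias = alias;
        }
    }

    /** Models the alias check in window(): a window without an alias is rejected. */
    static Window window(Window w) {
        if (!w.alias.isPresent()) {
            // The real code throws ValidationException; this exception type is a stand-in.
            throw new IllegalArgumentException("An alias must be specified for the window.");
        }
        return w;
    }

    /**
     * Models the check in groupBy(): the field list must contain the window alias
     * exactly once. Returns the remaining (non-window) grouping keys.
     */
    static List<String> groupBy(Window w, String... fields) {
        String alias = w.alias.get();
        List<String> keys = new ArrayList<>();
        for (String field : fields) {
            if (!field.equals(alias)) {
                keys.add(field);
            }
        }
        // Exactly one field may have been filtered out as the window alias.
        if (fields.length != keys.size() + 1) {
            throw new IllegalArgumentException("GroupBy must contain exactly one window alias.");
        }
        return keys;
    }

    public static void main(String[] args) {
        Window w = window(new Window(Optional.of("w")));
        System.out.println(groupBy(w, "w", "a")); // prints [a]
    }
}
```

In this model, calling `groupBy(w, "a")` (no alias) or `groupBy(w, "w", "w")` (alias twice) fails, which corresponds to the "exactly one window alias" rule.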
57 changes: 30 additions & 27 deletions docs/dev/table_api.md
@@ -1022,69 +1022,72 @@ Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTH

### Windows

The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Group-window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, group-windows are a convenient shortcut to group records by time intervals.
The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.

Group-windows are defined using the `window(w: GroupWindow)` clause. The following example shows how to define a group-window aggregation on a table.
Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute.
The following example shows how to define a window aggregation on a table.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Table table = input
.window(GroupWindow w) // define window
.select("b.sum") // aggregate
.window([Window w].as("w")) // define window with alias w
.groupBy("w") // group the table by window w
.select("b.sum") // aggregate
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val table = input
.window(w: GroupWindow) // define window
.select('b.sum) // aggregate
.window([w: Window] as 'w) // define window with alias w
.groupBy('w) // group the table by window w
.select('b.sum) // aggregate
{% endhighlight %}
</div>
</div>

In streaming environments, group-window aggregates can only be computed in parallel, if they are *keyed*, i.e., there is an additional `groupBy` attribute. Group-window aggregates without additional `groupBy`, such as in the example above, can only be evaluated in a single, non-parallel task. The following example shows how to define a keyed group-window aggregation on a table.
In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task.
The following example shows how to define a window aggregation with additional grouping attributes.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Table table = input
.groupBy("a")
.window(GroupWindow w) // define window
.select("a, b.sum") // aggregate
.window([Window w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, b.sum") // aggregate
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val table = input
.groupBy('a)
.window(w: GroupWindow) // define window
.select('a, 'b.sum) // aggregate
.window([w: Window] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'b.sum) // aggregate
{% endhighlight %}
</div>
</div>

The `GroupWindow` parameter defines how rows are mapped to windows. `GroupWindow` is not an interface that users can implement. Instead, the Table API provides a set of predefined `GroupWindow` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below.
By assigning the group-window an alias using `as`, properties such as the start and end timestamp of a time window can be accessed in the `select` statement.
The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below. The start and end timestamps of a time window can be selected in the `select` statement as properties of the window alias, i.e., as `w.start` and `w.end`, respectively.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Table table = input
.groupBy("a")
.window(XXX.as("myWin")) // define window alias
.select("a, myWin.start, myWin.end, b.count") // aggregate
.window([Window w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, w.start, w.end, b.count") // aggregate and add window start and end timestamps
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val table = input
.groupBy('a)
.window(XXX as 'myWin) // define window alias
.select('a, 'myWin.start, 'myWin.end, 'b.count) // aggregate
.window([w: Window] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps
{% endhighlight %}
</div>
</div>
@@ -1117,8 +1120,8 @@ Tumbling windows are defined by using the `Tumble` class as follows:
</tr>
<tr>
<td><code>as</code></td>
<td>Optional.</td>
<td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
<td>Required.</td>
<td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
</tr>
</tbody>
</table>
@@ -1184,8 +1187,8 @@ Sliding windows are defined by using the `Slide` class as follows:
</tr>
<tr>
<td><code>as</code></td>
<td>Optional.</td>
<td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
<td>Required.</td>
<td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
</tr>
</tbody>
</table>
@@ -1246,8 +1249,8 @@ A session window is defined by using the `Session` class as follows:
</tr>
<tr>
<td><code>as</code></td>
<td>Optional.</td>
<td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
<td>Required.</td>
<td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
</tr>
</tbody>
</table>
@@ -795,14 +795,19 @@ class Table(
* For batch tables of finite size, windowing essentially provides shortcuts for time-based
* groupBy.
*
* __Note__: window on non-grouped streaming table is a non-parallel operation, i.e., all data
* will be processed by a single operator.
* __Note__: Computing windowed aggregates on a streaming table is only a parallel operation
* if additional grouping attributes are added to the `groupBy(...)` clause.
* If the `groupBy(...)` only references a window alias, the streamed table will be processed
* by a single task, i.e., with parallelism 1.
*
* @param groupWindow group-window that specifies how elements are grouped.
* @param window window that specifies how elements are grouped.
* @return A windowed table.
*/
def window(groupWindow: GroupWindow): GroupWindowedTable = {
new GroupWindowedTable(this, Seq(), groupWindow)
def window(window: Window): WindowedTable = {
if (window.alias.isEmpty) {
throw new ValidationException("An alias must be specified for the window.")
}
new WindowedTable(this, window)
}
}

@@ -855,57 +860,96 @@ class GroupedTable(
val fieldExprs = ExpressionParser.parseExpressionList(fields)
select(fieldExprs: _*)
}
}

class WindowedTable(
private[flink] val table: Table,
private[flink] val window: Window) {

/**
* Groups the records of a table by assigning them to windows defined by a time or row interval.
* Groups the elements by a mandatory window and zero or more additional grouping attributes.
* The window is specified by referring to its alias.
*
* For streaming tables of infinite size, grouping into windows is required to define finite
* groups on which group-based aggregates can be computed.
* If no additional grouping attribute is specified and if the input is a streaming table,
* the aggregation will be performed by a single task, i.e., with parallelism 1.
*
* For batch tables of finite size, windowing essentially provides shortcuts for time-based
* groupBy.
* Aggregations are performed per group and defined by a subsequent `select(...)` clause, similar
* to a SQL SELECT-GROUP-BY query.
*
* @param groupWindow group-window that specifies how elements are grouped.
* @return A windowed table.
* Example:
*
* {{{
* tab.window([window] as 'w).groupBy('w, 'key).select('key, 'value.avg)
* }}}
*/
def groupBy(fields: Expression*): WindowGroupedTable = {
val fieldsWithoutWindow = fields.filterNot(window.alias.get.equals(_))
if (fields.size != fieldsWithoutWindow.size + 1) {
throw new ValidationException("GroupBy must contain exactly one window alias.")
}

new WindowGroupedTable(table, fieldsWithoutWindow, window)
}

/**
* Groups the elements by a mandatory window and zero or more additional grouping attributes.
* The window is specified by referring to its alias.
*
* If no additional grouping attribute is specified and if the input is a streaming table,
* the aggregation will be performed by a single task, i.e., with parallelism 1.
*
* Aggregations are performed per group and defined by a subsequent `select(...)` clause, similar
* to a SQL SELECT-GROUP-BY query.
*
* Example:
*
* {{{
* tab.window([window].as("w")).groupBy("w, key").select("key, value.avg")
* }}}
*/
def window(groupWindow: GroupWindow): GroupWindowedTable = {
new GroupWindowedTable(table, groupKey, groupWindow)
def groupBy(fields: String): WindowGroupedTable = {
val fieldsExpr = ExpressionParser.parseExpressionList(fields)
groupBy(fieldsExpr: _*)
}

}

class GroupWindowedTable(
class WindowGroupedTable(
private[flink] val table: Table,
private[flink] val groupKey: Seq[Expression],
private[flink] val window: GroupWindow) {
private[flink] val groupKeys: Seq[Expression],
private[flink] val window: Window) {

/**
* Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
* Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
*
* Example:
*
* {{{
* groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
* windowGroupedTable.select('key, 'window.start, 'value.avg as 'valavg)
* }}}
*/
def select(fields: Expression*): Table = {
// get group keys by removing window alias

val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)

val projectsOnAgg = replaceAggregationsAndProperties(
fields, table.tableEnv, aggNames, propNames)

val projectFields = (table.tableEnv, window) match {
// event time can be arbitrary field in batch environment
case (_: BatchTableEnvironment, w: EventTimeWindow) =>
extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
extractFieldReferences(fields ++ groupKeys ++ Seq(w.timeField))
case (_, _) =>
extractFieldReferences(fields ++ groupKey)
extractFieldReferences(fields ++ groupKeys)
}

new Table(table.tableEnv,
Project(
projectsOnAgg,
WindowAggregate(
groupKey,
groupKeys,
window.toLogicalWindow,
propNames.map(a => Alias(a._1, a._2)).toSeq,
aggNames.map(a => Alias(a._1, a._2)).toSeq,
@@ -915,13 +959,13 @@ class GroupWindowedTable(
}

/**
* Performs a selection operation on a group-windows table. Similar to an SQL SELECT statement.
* Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
*
* Example:
*
* {{{
* groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
* windowGroupedTable.select("key, window.start, value.avg as valavg")
* }}}
*/
def select(fields: String): Table = {