Skip to content

Conversation

@hequn8128
Copy link
Contributor

@hequn8128 hequn8128 commented May 7, 2019

What is the purpose of the change

Goal

This PR supports (streaming, window) flatAggregate on Table API.

Background

FLINK-10977 add streaming non-window flatAggregate on TableAPI. The behavior of table aggregates is most like GroupReduceFunction did, which computed for a group of elements, and output a group of elements. The TableAggregateFunction can be applied on Table/GroupedTable.

Motivation

For group window cases, it would be nice if we can also support flatAggregate. This means we can perform flatAggregate on Tumble/Slide/Session Windows. The API looks like:

table.window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
    .groupBy("a, w") // group by key and window
    .flatAggregate("tableAggFunc(b) as (x, y)")

Solution

This PR adds flatAggregate API on WindowGroupedTable and adds window table aggregate logical node for the plan optimization. For the runtime code, window table aggregate reused most of the code with window aggregate and non-window table aggregate.

The difference between window aggregate and window table aggregate is, for window table aggregate, the runtime aggregate function(org.apache.flink.table.runtime.aggregate.AggregateAggFunction) returns both accumulator and function to the window function and emit value in it while for window aggregate, the runtime aggregate function returns the final results to the window function. This is because we can't emit results in the runtime aggregate function for table aggregate as there is no collector to be used.

Brief change log

  • 494d356 Add flatAggregate API on WindowGroupedTable. Add java and scala API. In this commit, the implementation has not been implemented.
  • 9990a47 Resolve expression and build LogicalWindowTableAggregate. TableAggregate is added to make it different from Aggregate. The two kinds of RelNode contain different semantics. For Aggregate, it represents a relational operator that eliminates duplicates and computes totals while for TableAggregate, it can output 0 or more records for a group.
  • 3b2e2bb Add Plan support for window table aggregate. Logical&Physical nodes and rules are added in the commit.
  • e1a3ed4 Add runtime support for window table aggregate. This commit reuses the code of window aggregate and adds support for window table aggregate.
  • 031a6e7 Add docs for window table aggregate. Also rename Table Aggregate to FlatAggregate as it is more consistent with the API for users.

Verifying this change

This change added tests and can be verified as follows:

  • Added plan test(GroupWindowTableAggregateTest) to verify plan
  • Add IT case(GroupWindowTableAggregateITCase) to verify code gen and runtime logic
  • Add GropuWindowTableAggregateStringExpressionTest to test java.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Copy link
Collaborator

flinkbot commented May 7, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ✅ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@sunjincheng121
Copy link
Member

@flinkbot approve consensus

@sunjincheng121
Copy link
Member

@flinkbot attention @aljoscha @twalthr @dawidwys

Copy link
Member

@sunjincheng121 sunjincheng121 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your great Job! @hequn8128
I just finish 2 commits review and left a few comments.
please let me know what do you think about the suggestions.

Thanks,
Jincheng

* }
* </pre>
*/
FlatAggregateTable flatAggregate(String tableAggregateFunction);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add the JavaDoc for WindowGroupedTable in FlatAggregateTable. e.g.:
A table that performs flatAggregate on a {@link Table}, a {@link GroupedTable} or a {@link WndowGroupedTable}.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more comment, we should make the change: Similar to an SQL SELECT -> Similar to a SQL SELECT in FlatAggregateTable JavaDoc as well.

private val tableImpl = table.asInstanceOf[TableImpl]

override def select(fields: String): Table = {
select(ExpressionParser.parseExpressionList(fields).asScala: _*)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove .asScala?

}

override def select(fields: Expression*): Table = {
val expressionsWithResolvedCalls = fields.map(_.accept(tableImpl.callResolver)).asJava
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asJava is very important for IDE check, but we do not have to add asJava for the functionality, is that correct? due to we already import the JavaConversions._.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I think we can remove it.

tableImpl.getUniqueAttributeSupplier)

override def flatAggregate(tableAggregateFunction: Expression): FlatAggregateTable = ???
if (!extracted.getAggregations.isEmpty) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted.getAggregations.nonEmpty is better than !extracted.getAggregations.isEmpty, right?

override def flatAggregate(tableAggregateFunction: Expression): FlatAggregateTable = ???
if (!extracted.getAggregations.isEmpty) {
throw new ValidationException("The select statement after flatAggregate does not support " +
"aggregate functions.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregate functions cannot be used in the select statement right after flatAggregate ?

aggregates.stream().flatMap(expr -> {
if (isTableAggregate) {
return Stream.of(UserDefinedFunctionUtils.getFieldInfo(
((AggregateFunctionDefinition) ((CallExpression) expr).getFunctionDefinition())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest defining a new method for extract user-defined table aggregate function expression names, so we can share the logic for both createWindowAggregate and createAggregate, or we can extract a method for both extractName and extractudtafNames, such as:

extractNames/extractTypes(expr){

if (isTableAggregate) {
					return Stream.of(extractUdtafName(expr));
				} else {
					return Stream.of(extractName(expr).orElseGet(expr::toString));
				}

}

what do you think?

.map(expr -> expr.accept(aggregateVisitor))
.collect(toList());
.map(expr -> {
if (isTableAggregate) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if(isTableAggFunctionCall(expr)){...} ?

GroupKey groupKey = relBuilder.groupKey(groupings);
LogicalWindow logicalWindow = toLogicalWindow(windowAggregate.getGroupWindow());
return flinkRelBuilder.aggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
return flinkRelBuilder.aggregate(logicalWindow, groupKey, windowProperties, aggregations, isTableAggregate).build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And put logic of isTableAggregate into flinkRelBuilder.aggregate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: It's better to Unified naming aggregate and windowAggregate, in AggregateOperationFactory and AggregateOperationFactory. What do you think?

AggregateOperationFactory#createAggregate
AggregateOperationFactory#createWindowAggregate

FlinkRelBuilder#aggregate(window: LogicalWindow)
FlinkRelBuilder#aggregate(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I think it would be nice if we can make these names consistent.

LogicalAggregate logicalAggregate =
LogicalTableAggregate.getCorrespondingAggregate((LogicalTableAggregate) relNode);
RelNode newAggregate = performExtract(logicalAggregate, input, relBuilder);
Aggregate aggregate =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to add a new performExtract for TableAggregate, in this way the code is more clearly.
We can not use the current preformExtract because of the Aggregate and TableAggregate if different logic node. What do you think?

groupKey: GroupKey,
windowProperties: JList[PlannerExpression],
aggCalls: Iterable[AggCall])
aggCalls: Iterable[AggCall],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can rename aggregate to windowAggregate due to AggregateOperationFactory#createAggregate and AggregateOperationFactory#createWindowAggregate. I am sorry for the aggregate PR not see this issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you make a good point here. I will make them consistent.

@hequn8128 hequn8128 force-pushed the flip29_flatagg_window_split branch from ed284d5 to ef5014d Compare May 10, 2019 14:18
@hequn8128
Copy link
Contributor Author

@sunjincheng121 Thank you very much for your suggestions and meticulous review. I have addressed all your comments and updated the PR. Would be great if you can take another look. Thank you.

Best, Hequn

@rmetzger rmetzger requested review from aljoscha, dawidwys and twalthr May 11, 2019 11:43
@hequn8128 hequn8128 force-pushed the flip29_flatagg_window_split branch from ef5014d to 5a1db3f Compare May 12, 2019 15:38
Copy link
Member

@sunjincheng121 sunjincheng121 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update! @hequn8128
only left a few suggestions!
Best, Jincheng

* Get result types for the aggregate or the table aggregate expression. For a table aggregate,
* it may return multi result types when the composite return type is flattened.
*/
private Stream<TypeInformation<?>> getAggregateResultTypes(PlannerExpression plannerExpression) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about getAggregateResultTypes ->extractAggregateResultTypes, That's because for tableAgg we should flatten the tableAgg expression's type, furthermore, it's more consistence with extractAggregateNames, what do you think?

return aggregateExpression.accept(tableAggregateVisitor);
} else {
return aggregateExpression.accept(aggregateVisitor);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, we also need to check the isAggFunctionCall and throw an exception if false returned from isAggFunctionCall. but I agree it works for the current use case.


private int getWindowTimeFieldIndex(LogicalWindowAggregate aggregate, RelNode input) {
LogicalWindow logicalWindow = aggregate.getWindow();
private int getWindowTimeFieldIndex(LogicalWindow logicalWindow, RelNode input) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we should do this change? I can see one reason is for LogicalWindowTableAggregate, but I do not find out the change about LogicalWindowTableAggregate deal with the time field. Can explain more about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only do some minor refactor. LogicalWindowTableAggregate.getCorrespondingAggregate returns a LogicalWindowAggregate and the time field will be processed same like LogicalWindowAggregate.


override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.itemIf("groupBy", groupingToString(inputSchema.relDataType, grouping), !grouping.isEmpty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!grouping.isEmpty -> grouping.nonEmpty ? just a suggestion. because the implantion of noEmpty is !isEmpty

agg.getGroupSet.toArray)
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useless change?


<tr>
<td>
<strong>GroupBy Window TableAggregation</strong><br>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GroupBy Window TableAggregation -> FlatAggregate, and I think it again It's better to describe the document from the perspective of an operator? such as: `FlatAggregate and FlatAggregate(Windowed). What do you think?

So did the GroupBy TableAggregation section.

val constantFields = constantExprs.map(addReusableBoxedConstant)

val isTableAggregate = aggregates.length == 1 &&
aggregates(0).isInstanceOf[TableAggregateFunction[_, _]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think aggregates.length == 1 is the requirement of aggreate operator, and there a so many places using the aggregates(0).isInstanceOf[TableAggregateFunction[_, _]], so I suggest add a util method in UserDefinedFunctionUtils. such as: containsTableAggFunction(aggregationList: Seq[UserDefinedAggregateFunction]). which can be using in isTableAggregate. It's that make sense to you?

BTW: I think that as long as the TableAggregateFunciton is included, it is the TableAggregate operation, so there is no need to judge the size of Aggregates here. The judgment of size==1 should be the check done by the logic level(validate step).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense, I think we don't need to check the size after validation.

/**
* The collector is used to concat group key and table aggregate function output.
*/
class ConcatKeyAndAggResultCollector(val keyNum: Int) extends CRowWrappingCollector {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is only using in Table Aggregate right? if so It's better to ConcatKeyAndAggResultCollector -> TableAggregateCollector, and the keyNum -> groupKeySize or groupKeyCount, What do you think?

* Aggregate Function used for the aggregate or table aggregate operator in
* [[org.apache.flink.streaming.api.datastream.WindowedStream]].
*
* @param genAggregations Generated aggregate helper function
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregate or table aggregate?

* @param namedAggregates List of calls to aggregate functions and their output field names
* @param inputRowType Input row type
* @param inputFieldTypes Types of the physical input fields
* @param outputType Output type of the aggregate node
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Output type of the aggregate node -> Output type of the (table)aggregate node ? please check all the JavaDoc,for aggregate and tableAggregate, and make the consistency of the descript.

hequn8128 added 5 commits May 14, 2019 20:55
- Add java and scala api. In this commit, the implementation has not been implemented.
…ggregate

- `TableAggregate` is added to make it different from `Aggregate`. The two kinds of RelNode contain different semantics. For `Aggregate`, it represents a relational operator that eliminates duplicates and computes totals while for `TableAggregate`, it can output 0 or more records for a group.
- Logical&Physical nodes and rules are added in the commit.
- This commit reuses the code of window aggregate and add support for window table aggregate. The difference between window aggregate and window table aggregate is, for window table aggregate, the runtime aggregate function(org.apache.flink.table.runtime.aggregate.AggregateAggFunction) returns both accumulator and function to the window function and emit value in it while for window aggregate, the runtime aggregate function returns the final results to the window function. This is because we can't emit results in the runtime aggregate function for table aggregate as there is no collector to be used.
- Also rename `Table Aggregate` to `FlatAggregate` as it is more consistent with the api for users.
@hequn8128 hequn8128 force-pushed the flip29_flatagg_window_split branch from 5a1db3f to f1a68be Compare May 14, 2019 12:59
@hequn8128
Copy link
Contributor Author

@sunjincheng121 Hi, thanks a lot for your review and suggestions. I have updated the PR according to your comments. Could you kindly take another look. Thank you.

Best, Hequn

@sunjincheng121
Copy link
Member

LGTM. +1 to merged

@sunjincheng121
Copy link
Member

@flinkbot approve all

sunjincheng121 pushed a commit to sunjincheng121/flink that referenced this pull request May 17, 2019
Brief change log:
- Add flatAggregate API on WindowGroupedTable. Add java and scala API. In this commit, the implementation has not been implemented.
- Resolve expression and build LogicalWindowTableAggregate. TableAggregate is added to make it different from Aggregate. The two kinds of RelNode contain different semantics. For Aggregate, it represents a relational operator that eliminates duplicates and computes totals while for TableAggregate, it can output 0 or more records for a group.
- Add Plan support for window table aggregate. Logical&Physical nodes and rules are added in the commit.
- Add runtime support for window table aggregate. This commit reuses the code of window aggregate and adds support for window table aggregate.
- Add docs for window table aggregate. Also rename Table Aggregate to FlatAggregate as it is more consistent with the API for users.

This closes apache#8359
@asfgit asfgit closed this in 142462f May 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants