@sunjincheng121
Member

What is the purpose of the change

This pull request ports window flatAggregate from the Flink planner to the Blink planner.

The changes mainly cover two modules: blink-planner and blink-runtime. The API code can be reused because the two planners share the same API.

Brief change log

This PR is further split into the following 4 commits:

  1. Add plan support for table aggregate. 0e875a2
    This commit adds RelNodes (Logical, FlinkLogical and StreamExec) and Rules to the Blink planner. The execution procedure is:
    First, the WindowAggregate QueryOperation node at the API level is converted to a LogicalWindowTableAggregate RelNode in QueryOperationConverter.
    Second, the LogicalWindowTableAggregate is converted to a FlinkLogicalWindowTableAggregate by a rule in the logical optimization phase.
    Third, the FlinkLogicalWindowTableAggregate is converted to a StreamExecGroupWindowTableAggregate by a rule in the physical optimization phase.

  2. Add runtime support for table aggregate. a1120036
    This commit adds codegen and window operator support for window table aggregate.
    The AggCodeGen generates a NamespaceTableAggsHandleFunction, which is used in the window operator.
    For the window operator, this commit refactors the previous window operator into a base window operator (WindowOperator) and two sub window operators (AggregateWindowOperator and TableAggregateWindowOperator).
    Additionally, to make the refactoring possible, this commit also changes the window processing function (InternalWindowProcessFunction) by replacing the getWindowAggregationResult method with prepareAggregateAccumulatorForEmit.
    The prepareAggregateAccumulatorForEmit method returns the accumulator, on which the AggregateWindowOperator can call getValue to return the result and the TableAggregateWindowOperator can call emitValue to return results.

  3. Add window operator tests for window table aggregate. 686ab9df
    Change the WindowOperatorTest into a parameterized test that covers both aggregate and table aggregate.
    To simplify the testing logic, the table aggregate outputs the same values as the aggregate, except that it outputs two identical records each time.

  4. Support WindowTableAggregate in some MetadataHandler. 9a5651f1d
    MetadataHandler is used to provide metadata about a relational expression, for example the unique key information of a RelNode. This information can be used during optimization.
    This commit adds WindowTableAggregate support in FlinkRelMdColumnInterval, FlinkRelMdFilteredColumnInterval and FlinkRelMdModifiedMonotonicity. WindowTableAggregate is absent from the other MetadataHandlers mainly because:

  • The WindowTableAggregate cannot provide the uniqueness information required by FlinkRelMdColumnUniqueness, FlinkRelMdDistinctRowCount, FlinkRelMdUniqueGroups and FlinkRelMdUniqueKeys.
  • Most of the MetadataHandlers are not used in streaming jobs, such as FlinkRelMdPercentageOriginalRows, FlinkRelMdPopulationSize, FlinkRelMdRowCount, etc.
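
As a rough illustration of the operator split described in commit 2 (not the actual Blink runtime code), the sketch below shows a shared base operator that only prepares the accumulator, while the two subclasses decide how to turn it into output. All names here (`SketchWindowOperator`, `prepareAccumulatorForEmit`, `fire`, and so on) and the `List<Integer>` accumulator are hypothetical simplifications. The table aggregate emits the same value twice, mirroring the test setup in commit 3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the shared base window operator.
abstract class SketchWindowOperator {
    // Plays the role of prepareAggregateAccumulatorForEmit: hand back the
    // accumulator of a window instead of a finished result.
    protected List<Integer> prepareAccumulatorForEmit(List<Integer> windowState) {
        return new ArrayList<>(windowState);
    }

    // Each subclass decides how the accumulator becomes output records.
    abstract void emitWindowResult(List<Integer> windowState, Consumer<Integer> out);

    protected static int sum(List<Integer> acc) {
        int total = 0;
        for (int v : acc) {
            total += v;
        }
        return total;
    }
}

// Aggregate case: exactly one result per window (getValue style).
final class SketchAggregateOperator extends SketchWindowOperator {
    @Override
    void emitWindowResult(List<Integer> windowState, Consumer<Integer> out) {
        List<Integer> acc = prepareAccumulatorForEmit(windowState);
        out.accept(sum(acc));
    }
}

// Table aggregate case: any number of results per window (emitValue style).
// Here it emits the same value twice.
final class SketchTableAggregateOperator extends SketchWindowOperator {
    @Override
    void emitWindowResult(List<Integer> windowState, Consumer<Integer> out) {
        List<Integer> acc = prepareAccumulatorForEmit(windowState);
        int value = sum(acc);
        out.accept(value);
        out.accept(value);
    }
}

final class SketchOperatorDemo {
    // Fires one window and collects everything the operator emits.
    static List<Integer> fire(SketchWindowOperator op, List<Integer> windowState) {
        List<Integer> results = new ArrayList<>();
        op.emitWindowResult(windowState, results::add);
        return results;
    }

    public static void main(String[] args) {
        List<Integer> state = Arrays.asList(1, 2, 3);
        System.out.println(fire(new SketchAggregateOperator(), state));      // [6]
        System.out.println(fire(new SketchTableAggregateOperator(), state)); // [6, 6]
    }
}
```

The point of the design is that the base class no longer needs to know whether a window produces one row or many; only the final emit step differs.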

BTW: I did not add the emit policy for window aggregate because we want to stay aligned with the Flink planner.
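
The three conversion steps listed under commit 1 can be pictured as a chain of rewrites. The sketch below is a toy model only: it uses the node names from the description as plain strings, and the map-based `RULES` table and the `SketchPlanPipeline` class are assumptions made for illustration, not how QueryOperationConverter or the optimizer rules are implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SketchPlanPipeline {
    // Each entry maps a node kind to the node kind the next phase produces.
    // Purely illustrative; real rules match and rewrite RelNode trees.
    private static final Map<String, String> RULES = new LinkedHashMap<>();

    static {
        RULES.put("WindowAggregate QueryOperation", "LogicalWindowTableAggregate");
        RULES.put("LogicalWindowTableAggregate", "FlinkLogicalWindowTableAggregate");
        RULES.put("FlinkLogicalWindowTableAggregate", "StreamExecGroupWindowTableAggregate");
    }

    // Follows the rewrite chain from the given node kind until no rule applies.
    static List<String> convert(String start) {
        List<String> path = new ArrayList<>();
        String current = start;
        path.add(current);
        while (RULES.containsKey(current)) {
            current = RULES.get(current);
            path.add(current);
        }
        return path;
    }

    public static void main(String[] args) {
        for (String node : convert("WindowAggregate QueryOperation")) {
            System.out.println(node);
        }
    }
}
```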

Verifying this change

This change added tests and can be verified as follows:

  • WindowOperatorContractTest
  • WindowOperatorTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Collaborator

flinkbot commented Aug 8, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 0df9723 (Fri Aug 23 10:16:20 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ✅ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@sunjincheng121
Member Author

@flinkbot attention @JingsongLi @wuchong @hequn8128 @aljoscha I would appreciate it if you could have a look at this PR :)

@flinkbot
Collaborator

flinkbot commented Aug 8, 2019

CI report:

@wuchong
Member

wuchong commented Aug 9, 2019

I would suggest modifying the component names in the commit messages a bit. How about:

[FLINK-13473][table-planner-blink] Support windowed TableAggregate in some MetadataHandler
[FLINK-13473][table-runtime-blink] Add tests for window operator
[FLINK-13473][table-blink] Add runtime support for windowed flatAggregate on blink planner
[FLINK-13473][table-planner-blink] Add plan support for windowed flatAggregate on blink planner

And for the pull request title, I would suggest using "[table-blink]" because it doesn't contain API changes.

What do you think?

@sunjincheng121
Member Author

Thanks for the review @wuchong!
The commits will be squashed on merge, so I think [table-blink] makes sense to me.

@sunjincheng121 changed the title from "[FLINK-13473][table] Add stream Windowed FlatAggregate support for blink planner" to "[FLINK-13473][table-blink] Add stream Windowed FlatAggregate support for blink planner" on Aug 9, 2019
@JingsongLi left a comment
Contributor

Thanks @sunjincheng121 , just left some minor comments.

if (isWindow) {
// no need to bind input
val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
var valueExprs = Seq[GeneratedExpression]()
Contributor

Just use val valueExprs = getWindowExpressions(windowProperties)? Because windowProperties is empty when hasNamespace is false.
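
A small sketch of why the extra branch may be unnecessary, assuming getWindowExpressions simply maps each window property to one expression. The real signature and return type are not shown in this thread, and the planner code is Scala; Java and the `SketchWindowExpressions` class are used here only for illustration. With an empty property list the result is empty as well, so the caller needs no special case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SketchWindowExpressions {
    // Hypothetical stand-in: one generated expression per window property.
    static List<String> getWindowExpressions(List<String> windowProperties) {
        List<String> exprs = new ArrayList<>();
        for (String property : windowProperties) {
            exprs.add("expr(" + property + ")");
        }
        return exprs;
    }

    public static void main(String[] args) {
        // No namespace: no window properties, hence no expressions.
        System.out.println(getWindowExpressions(Collections.emptyList()));
        // With a namespace: one expression per property.
        System.out.println(getWindowExpressions(Arrays.asList("windowStart", "windowEnd")));
    }
}
```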

val COLLECTOR_TERM = "out"
val MEMBER_COLLECTOR_TERM = "convertCollector"
val CONVERT_COLLECTOR_TYPE_TERM = "ConvertCollector"
val KEY_TERM = "groupKey"
Contributor

Maybe you can name it NAMESPACE_TABLE_AGG_KEY.

Member Author

Sounds good. How about NAMESPACE_TABLE_AGG_KEY_TERM? It looks more consistent with the other variables, which end with _TERM.


val builder = getWindowOperatorBuilder(inputFields, timeIdx)
builder
.aggregate(aggsHandler, accTypes, aggValueTypes, windowPropertyTypes)
Contributor

Should window table agg support withAllowedLateness?

Member Author

You are right. We can support withAllowedLateness for table aggregate. However, there are two reasons why it is not supported in this PR.

  • This PR is mainly dedicated to porting table aggregate from Flink to Blink. It may be good to keep the two aligned with each other first.
  • withAllowedLateness is not a simple change for table aggregate. We also need to think about how to support emitStrategy in the Table API.
    So, in this PR we just align with the Flink planner's table aggregate. What do you think?

/**
* A {@link WindowOperator} that dedicates for group aggregate.
*
* <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
Contributor

Repeated comments? Can we remove them?

Member Author

If we remove this paragraph, the next paragraph ("Each pane gets its own instance...") would be hard to understand.
Maybe we can keep this paragraph, or remove these two paragraphs in the base class? What do you think?

/**
* A {@link WindowOperator} that dedicates for group table aggregate.
*
* <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
Contributor

Repeated comments? Can we remove them?

public WindowOperator build() {
checkNotNull(trigger, "trigger is not set");
if (generatedAggregateFunction != null && generatedEqualiser != null) {
if (generatedTableAggregateFunction != null) {
Contributor

There are a lot of branches. Can we split this builder into multiple builders?

Member Author

Good idea. We can make the WindowOperatorBuilder a base builder and add two more builders for aggregate and table aggregate.
What do you think?

@sunjincheng121
Member Author

@JingsongLi Thanks for the review. I have updated the PR and shared my thoughts on your comments. :)
Please let me know what you think.

@wuchong left a comment
Member

Thanks @sunjincheng121 , I left some comments.


override def replaceInputNode(
ordinalInParent: Int,
newInputNode: ExecNode[StreamPlanner, _]): Unit = {
Member

Indent

}

override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[BaseRow] = {
Member

Indent

val window: LogicalWindow,
namedProperties: Seq[PlannerNamedWindowProperty],
inputTimeFieldIndex: Int,
val emitStrategy: Option[WindowEmitStrategy],
Member

I think window TableAggregate can also have an emit strategy, because we also pass the sendRetraction and allowedLateness parameters into TableAggregateWindowOperator.

If we don't support it currently, we can throw an exception in translateToPlan if an emit strategy is defined, so that we can keep the physical node more consistent between agg and tableAgg.

* .build();
* </pre>
*/
public final class AggregateWindowOperatorBuilder extends WindowOperatorBuilder {
Member

I would like to avoid introducing AggregateXXX and TableAggregateXXX for all window-operator-related classes. For the WindowOperatorBuilder, I think we can keep a single WindowOperatorBuilder entry point, and return an inner AggregateWindowOperatorBuilder/TableAggregateWindowOperatorBuilder class when the corresponding aggregate(...) is called.

That way, users don't need to choose a builder, but the build() logic is still separated. What do you think?
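
A minimal sketch of that idea, under assumptions: one entry point whose overloaded aggregate(...) returns a dedicated inner builder, so each build() stays free of aggregate-vs-table-aggregate branches. The `Sketch*` classes, the string-valued trigger and the string results are placeholders for illustration and not the real WindowOperatorBuilder API.

```java
import java.util.Objects;

// Hypothetical marker types for the two kinds of generated functions.
final class SketchAggFunction {
    final String name;
    SketchAggFunction(String name) { this.name = name; }
}

final class SketchTableAggFunction {
    final String name;
    SketchTableAggFunction(String name) { this.name = name; }
}

// Single entry point; settings shared by both operator kinds live here.
class SketchEntryBuilder {
    private String trigger;

    static SketchEntryBuilder builder() {
        return new SketchEntryBuilder();
    }

    SketchEntryBuilder withTrigger(String trigger) {
        this.trigger = trigger;
        return this;
    }

    // The overload selected by the argument type picks the inner builder.
    AggregateBuilder aggregate(SketchAggFunction fn) {
        return new AggregateBuilder(this, fn);
    }

    TableAggregateBuilder aggregate(SketchTableAggFunction fn) {
        return new TableAggregateBuilder(this, fn);
    }

    private String checkedTrigger() {
        return Objects.requireNonNull(trigger, "trigger is not set");
    }

    static final class AggregateBuilder {
        private final SketchEntryBuilder base;
        private final SketchAggFunction fn;

        private AggregateBuilder(SketchEntryBuilder base, SketchAggFunction fn) {
            this.base = base;
            this.fn = fn;
        }

        String build() {
            return "AggregateWindowOperator(" + fn.name + ", " + base.checkedTrigger() + ")";
        }
    }

    static final class TableAggregateBuilder {
        private final SketchEntryBuilder base;
        private final SketchTableAggFunction fn;

        private TableAggregateBuilder(SketchEntryBuilder base, SketchTableAggFunction fn) {
            this.base = base;
            this.fn = fn;
        }

        String build() {
            return "TableAggregateWindowOperator(" + fn.name + ", " + base.checkedTrigger() + ")";
        }
    }
}
```

A caller would write something like `SketchEntryBuilder.builder().withTrigger("eventTime").aggregate(new SketchTableAggFunction("top2")).build()` and never has to name the inner builder type.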

return new AggregateWindowOperatorBuilder();
}

public AggregateWindowOperatorBuilder withAllowedLateness(Duration allowedLateness) {
Member

I think we can push withAllowedLateness into the base builder, because table aggregate can also support it. We can throw an exception in build() if allowedLateness is specified for now.
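
One way this could look, as a hedged sketch rather than the actual change: the lateness setter lives in a shared base builder, and only the table aggregate variant rejects a non-zero value at build() time. The class names (`SketchBaseBuilder`, `SketchAggBuilder`, `SketchTableAggBuilder`), the string results and the exception type are assumptions for illustration.

```java
import java.time.Duration;

// Hypothetical base builder that owns the allowed-lateness setting.
abstract class SketchBaseBuilder {
    protected Duration allowedLateness = Duration.ZERO;

    SketchBaseBuilder withAllowedLateness(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
        return this;
    }

    abstract String build();
}

// Aggregate: allowed lateness is supported and simply passed on.
final class SketchAggBuilder extends SketchBaseBuilder {
    @Override
    String build() {
        return "AggregateWindowOperator[lateness=" + allowedLateness.toMillis() + "ms]";
    }
}

// Table aggregate: the setter is available, but unsupported values fail fast.
final class SketchTableAggBuilder extends SketchBaseBuilder {
    @Override
    String build() {
        if (!allowedLateness.isZero()) {
            throw new UnsupportedOperationException(
                "allowedLateness is not supported for table aggregate yet");
        }
        return "TableAggregateWindowOperator";
    }
}
```

Keeping the setter in the base class means that enabling lateness for table aggregate later only requires removing the check, without changing the builder's surface.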

generator
}

protected def enrichWindowOperatorBuilder(
Member

I would like to finish the build logic in one place, rather than passing a half-built builder around.
Can we use an if-else in the base class to generate the window operator depending on whether it is a table aggregate, like what we do in DataStreamGroupWindowAggregateBase?

@aljoscha left a comment
Contributor

This looks good as far as I can see. But I don't have much familiarity with the Blink code. I had two comments about the wording of javadocs.

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link WindowOperator} that dedicates for group aggregate.
Contributor

Suggested change
- * A {@link WindowOperator} that dedicates for group aggregate.
+ * A {@link WindowOperator} for grouped window aggregates.

import org.apache.flink.util.Collector;

/**
* A {@link WindowOperator} that dedicates for group table aggregate.
Contributor

Suggested change
- * A {@link WindowOperator} that dedicates for group table aggregate.
+ * A {@link WindowOperator} for grouped and windowed table aggregates.

@sunjincheng121
Member Author

Hi @wuchong and @aljoscha, thanks a lot for your nice review. 👍
Glad to see the nice suggestions. I have addressed all your comments and updated the PR.
Would be great if you can take another look! ;)

Best, Jincheng

@wuchong left a comment
Member

Thanks for the update @sunjincheng121 . Looks good from my side.

@sunjincheng121
Member Author

Thanks for the review @JingsongLi @wuchong @aljoscha
@flinkbot approve all
Merging...

@JingsongLi
Contributor

LGTM

@asfgit closed this in 29f6c3f on Aug 12, 2019
becketqin pushed a commit to becketqin/flink that referenced this pull request on Aug 17, 2019