
[FLINK-10976][table] Add Aggregate operator to Table API #8311

Closed
hequn8128 wants to merge 5 commits into apache:master from hequn8128:flip29_aggregate2

Conversation

@hequn8128
Contributor

What is the purpose of the change

This pull request adds a row-based aggregate function to the Table API. Note: this PR is based on #7235. Thank you @dianfu for your excellent work.

Brief change log

  • Add a row-based aggregate function to the Table API.
  • Add tests.
  • Add docs.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests in AggregateTest, AggregateITCase, AggregateStringExpressionTest, AggregateValidationTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Collaborator

flinkbot commented Apr 29, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ✅ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@rmetzger
Contributor

@flinkbot approve description

* AggregateFunction aggFunc = new MyAggregateFunction()
* tableEnv.registerFunction("aggFunc", aggFunc);
* table.aggregate("aggFunc(a, b) as (f0, f1, f2)")
* .select("key, f0, f1")
Contributor

select("f0, f1")?

}

/**
* The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys.
Contributor

a -> an
that has been performed on an aggregate function.

* The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys.
*/
class AggregatedTableImpl(
private[flink] val table: Table,
Contributor

indentation

}

def rowBasedAggregate(
groupingExpressions: JList[Expression],
Contributor

indentation

: TableOperation = {
// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
val resolver = resolverFor(tableCatalog, functionCatalog, child).build
val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
Contributor

use resolveExpression?

tableEnv.registerFunction("myAggFunc", myAggFunc);
Table table = input
.groupBy("key")
.aggregate("myAggFunc(a, b) as (x, y, z)")
Contributor

as (x, y)

}
}

def resetAccumulator(acc: MyMinMaxAcc): Unit = {
Contributor

The Scala example has resetAccumulator, while the Java example does not.
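For readers following this comment, here is a rough sketch of what a min/max aggregate with a resetAccumulator method might look like in Java. It is deliberately dependency-free: in Flink the class would extend org.apache.flink.table.functions.AggregateFunction, which is not done here. The names MinMaxSketch and MinMaxAcc, the int-typed fields, and the int[] result are assumptions made for illustration only and are not taken from the PR or its docs.

```java
// Dependency-free sketch of the accumulator contract discussed above.
// All names and types are illustrative assumptions, not code from the PR.
class MinMaxSketch {

    /** Mutable accumulator holding the running minimum and maximum. */
    static class MinMaxAcc {
        int min;
        int max;
    }

    /** Creates an accumulator in its neutral (empty) state. */
    MinMaxAcc createAccumulator() {
        MinMaxAcc acc = new MinMaxAcc();
        resetAccumulator(acc);
        return acc;
    }

    /** Restores the neutral state so the accumulator can be reused. */
    void resetAccumulator(MinMaxAcc acc) {
        acc.min = Integer.MAX_VALUE;
        acc.max = Integer.MIN_VALUE;
    }

    /** Folds one input value into the accumulator. */
    void accumulate(MinMaxAcc acc, int value) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
    }

    /** Returns the two-field result {min, max}. */
    int[] getValue(MinMaxAcc acc) {
        return new int[] {acc.min, acc.max};
    }

    public static void main(String[] args) {
        MinMaxSketch function = new MinMaxSketch();
        MinMaxAcc acc = function.createAccumulator();
        for (int v : new int[] {3, 7, 1}) {
            function.accumulate(acc, v);
        }
        int[] result = function.getValue(acc);
        System.out.println(result[0] + ", " + result[1]); // prints "1, 7"
    }
}
```

Because the result has exactly two fields, an alias list for such a function would name two columns, which is consistent with the "as (x, y)" remark earlier in this review.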

}

@Test(expected = classOf[ValidationException])
def testTableFunctionInSelection(): Unit = {
Contributor

This test failed with the exception:
org.apache.flink.table.api.ValidationException: Given parameters of function 'func' do not match any signature.
Actual: (java.lang.Long)
Expected: (java.lang.String)


table
.groupBy('a)
// must fail. Only AggregateFunction can be used in aggregate
Contributor

The exception message is not friendly for users:
org.apache.flink.table.api.ValidationException: Invalid arguments [log(b), 'd'] for function: as

util.tableEnv.registerFunction("func", new TableFunc0)
table
.groupBy('a)
// must fail. Only AggregateFunction can be used in aggregate
Contributor

Only AggregateFunction -> Only one AggregateFunction

@dianfu
Contributor

dianfu commented May 5, 2019

@hequn8128 Thanks a lot for the PR. LGTM overall with just a few comments.

@sunjincheng121
Member

@flinkbot approve-until architecture
I am glad to review this change after you rebase the code. :) @hequn8128
Best, Jincheng

@sunjincheng121 sunjincheng121 self-requested a review May 5, 2019 08:03
@sunjincheng121 sunjincheng121 self-assigned this May 5, 2019
@hequn8128 hequn8128 force-pushed the flip29_aggregate2 branch from 2ccf79b to 09e3e73 Compare May 5, 2019 08:32
@hequn8128
Contributor Author

@sunjincheng121 @dianfu Thanks a lot for your review. I have updated the PR and rebased it onto master.

Best, Hequn

@dianfu
Contributor

dianfu commented May 5, 2019

@hequn8128 Thanks for the update. LGTM. +1

@sunjincheng121
Member

@flinkbot approve-until architecture

Member

@sunjincheng121 sunjincheng121 left a comment

Thanks for the update @hequn8128
I only left some suggestions.
Best,
Jincheng

/**
* Performs an aggregate operation with an aggregate function. Use this before a selection
* to perform the selection operation. The output will be flattened if the output type is a
* composite type.
Member

Use this before a selection to perform the selection operation. -> You have to close the "aggregate" with a select statement ?
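To make the "flattened" wording in the Javadoc concrete, the following plain-Java sketch groups rows by key, computes a composite (min, max) result per key, and emits it as two top-level fields next to the key instead of one nested field. This is not Flink code and does not reflect how the PR implements the operator; the class FlattenSketch, the method aggregateAndFlatten, and the Object[] row layout are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of grouping, aggregating, and flattening.
// Not Flink code; class and method names are hypothetical.
class FlattenSketch {

    /**
     * Groups {key, value} rows by key, computes (min, max) per key, and
     * returns flat rows {key, min, max} rather than {key, (min, max)}.
     */
    static List<Object[]> aggregateAndFlatten(List<Object[]> rows) {
        // LinkedHashMap keeps keys in first-seen order for stable output.
        Map<String, int[]> perKey = new LinkedHashMap<>();
        for (Object[] row : rows) {
            String key = (String) row[0];
            int value = (Integer) row[1];
            int[] acc = perKey.computeIfAbsent(
                key, k -> new int[] {Integer.MAX_VALUE, Integer.MIN_VALUE});
            acc[0] = Math.min(acc[0], value);
            acc[1] = Math.max(acc[1], value);
        }
        List<Object[]> result = new ArrayList<>();
        for (Map.Entry<String, int[]> entry : perKey.entrySet()) {
            int[] acc = entry.getValue();
            // Flattening: the composite (min, max) becomes two top-level fields.
            result.add(new Object[] {entry.getKey(), acc[0], acc[1]});
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"a", 3});
        rows.add(new Object[] {"b", 10});
        rows.add(new Object[] {"a", 7});
        for (Object[] row : aggregateAndFlatten(rows)) {
            System.out.println(row[0] + ", " + row[1] + ", " + row[2]);
        }
    }
}
```

In the API under review, a select call would then pick from these flat fields, which is why the suggested wording stresses that the aggregate has to be closed with a select statement.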


/**
* Performs a global aggregate operation with an aggregate function. Use this before a selection
* to perform the selection operation. The output will be flattened if the output type is a
Member

Same as GroupedTable.

aggregate(ExpressionParser.parseExpression(aggregateFunction))
}

override def aggregate(aggregateFunction: Expression): AggregatedTable = {
Member

Can we make the naming consistent for aggregateFunction and tableAggFunction?
I suggest using the complete word, i.e., Agg -> Aggregate. What do you think?


override def select(fields: Expression*): Table = {
new TableImpl(tableImpl.tableEnv,
tableImpl.operationTreeBuilder.project(fields,
Member

I think we can unify the aggregate and flatAggregate implementations, i.e., both AggregatedTableImpl#select and FlatAggregateTableImpl#select can use operationTreeBuilder.project or xxTable.select(fields: _*). What do you think?

aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
}

def rowBasedAggregate(
Member

I think we can rename rowBasedAggregate -> aggregate and add some comments to illustrate the difference between the two aggregate methods. What do you think?

verifyTableEquals(resJava, resScala)
}

def testNonGroupedAggregate2(): Unit = {
Member

testNonGroupedAggregate2->testRowBasedNonGroupedAggregate?

@hequn8128
Contributor Author

@sunjincheng121 Thanks a lot for your review and suggestions. I have updated the PR according to your comments.

Best, Hequn

@sunjincheng121
Member

LGTM. +1 to merge
@flinkbot approve all

sunjincheng121 pushed a commit to sunjincheng121/flink that referenced this pull request May 7, 2019
@asfgit asfgit closed this in 560b2bd May 7, 2019

5 participants