Skip to content

Conversation

matriv
Copy link
Contributor

@matriv matriv commented Dec 16, 2021

What is the purpose of the change

Fix precision for agg functions on decimal types.

Brief change log

  • Small code optimisations for aggregate functions
  • Fix existing test for SumWithRetractAggFunction which didn't actually use it but instead used the normal SumAggFunction.
  • Since Sum0AggFunction,SumWithRetractAggFunction and AvgAggFunction are using internally plus() and minus() operators to implement the sum and avg aggregation (minus() is used also for the WithRetract), the decimal return type calculated by LogicalTypeMerging#findSumAggType() (also for the AvgAggFunctions internal SumType) gets overriden by the calculation for the plus() (and/or minus()) operator done by LogicalTypeMerging#findAdditionDecimalType(). To prevent this add a special aggDecimalMinus() operator and use it together with the previously added aggDecimalPlus() in those aggregate functions to avoid overriding the calculated precision of their decimal return type.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests to AggregateITCase.scala both for SQL and TableApi.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 6dcc619 (Thu Dec 16 15:35:15 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 16, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @matriv. I had only minor comments. Should be good in the next iteration.

* on a Decimal type. Uses the {@link LogicalTypeMerging#findSumAggType(LogicalType)} to avoid
* the normal {@link #PLUS} override the special calculation for precision and scale needed by
* SUM.
* Special "+" operator used internally by {@code SumAggFunction}, {@code Sum0AggFunction},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove IncrSumAggFunction. how about we keep this section a bit more generic to not update it whenever we update the implementation? maybe just used internally by for implementing SUM/AVG aggregations (with and without retractions)

return ifThenElse(equalTo(count, literal(0L)), ifTrue, ifFalse);
}

protected UnresolvedCallExpression doPlus(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doPlus sounds a bit generic, how about specializedPlus or adjustedPlus?

tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
env.execute()

val expected = List("6.41671935,65947.23071935707000000000,609.02867403703699700000")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment to explain the result? for me as a reviewer that is not super familiar with the topic anymore, it is difficult to see something in those numbers

@twalthr
Copy link
Contributor

twalthr commented Dec 20, 2021

Btw the issue number is incorrect for both the PR and commit

@matriv matriv changed the title [FLINK-25304][table-common][table-planner] Fix precision for aggs on DECIMAL types [FLINK-24809][table-common][table-planner] Fix precision for aggs on DECIMAL types Dec 20, 2021
@matriv
Copy link
Contributor Author

matriv commented Dec 20, 2021

Btw the issue number is incorrect for both the PR and commit

Thx, fixing.

@matriv
Copy link
Contributor Author

matriv commented Dec 20, 2021

@cshuo @JingsongLi Could you please also take a look at this one?

@matriv
Copy link
Contributor Author

matriv commented Dec 21, 2021

@flinkbot run azure

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Looks good to me overall. Left minor comments.

changelogRow("+I", Byte.box(3), Short.box(3), Int.box(3), Long.box(3),
Float.box(3.0F), Double.box(3.0), "a"))

val upsertSourceDataId = registerData(upsertSourceCurrencyData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: remove ";"

return literal(0);
}

protected UnresolvedCallExpression adjustedPlus(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

override?

return aggDecimalPlus(arg1, arg2);
}

protected UnresolvedCallExpression adjustedMinus(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@twalthr
Copy link
Contributor

twalthr commented Dec 22, 2021

@flinkbot run azure

@matriv matriv force-pushed the FLINK-25304 branch 2 times, most recently from 7f37df4 to dee9315 Compare December 27, 2021 10:07
Apply some small code optimisations for the build-in aggregate functions.
Use `TestValuesTableFactory` to setup the source changelog stream any make
sure that the SQL query will generate `SumWithRetractAggFunction`  to
implement sum aggregations.
…DECIMAL types

Since `Sum0AggFunction`,`SumWithRetractAggFunction` and `AvgAggFunction` are
 using internally `plus()` and `minus()` operators to implement the sum and avg
aggregation (`minus()` is used also for the `WithRetract`), the decimal return
type calculated by `LogicalTypeMerging#findSumAggType()` (also for the
`AvgAggFunction`s internal `SumType`) gets overriden by the calculation for
the `plus()` (and/or `minus()`) operator done by
`LogicalTypeMerging#findAdditionDecimalType()`. To prevent this add a special
`aggDecimalMinus()` operator and use it together with the previously added
 `aggDecimalPlus()` in those aggregate functions to avoid overriding the
calculated precision of their decimal return type.

Follows: apache#17634
@twalthr twalthr closed this in ec893d2 Dec 30, 2021
@matriv matriv deleted the FLINK-25304 branch January 17, 2022 08:14
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
…DECIMAL types

Since `Sum0AggFunction`,`SumWithRetractAggFunction` and `AvgAggFunction` are
 using internally `plus()` and `minus()` operators to implement the sum and avg
aggregation (`minus()` is used also for the `WithRetract`), the decimal return
type calculated by `LogicalTypeMerging#findSumAggType()` (also for the
`AvgAggFunction`s internal `SumType`) gets overriden by the calculation for
the `plus()` (and/or `minus()`) operator done by
`LogicalTypeMerging#findAdditionDecimalType()`. To prevent this add a special
`aggDecimalMinus()` operator and use it together with the previously added
 `aggDecimalPlus()` in those aggregate functions to avoid overriding the
calculated precision of their decimal return type.

See also FLINK-24691.

This closes apache#18135.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants