Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-23614][table-planner] The resulting scale of TRUNCATE(DECIMAL,… #16740

Merged
merged 1 commit into from Sep 7, 2021

Conversation

paul8263
Copy link
Contributor

@paul8263 paul8263 commented Aug 6, 2021

… ...) is not correct

What is the purpose of the change

Fixe the issue that the resulting scale of TRUNCATE(DECIMAL, ...) is not correct.

Brief change log

  • flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java
  • flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java

Verifying this change

This change added tests and can be verified as follows:

  • Added truncate decimal test case in flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 6, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit aceb2a2 (Fri Aug 06 10:17:54 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

new SqlFunction(
"TRUNCATE",
SqlKind.OTHER_FUNCTION,
FlinkReturnTypes.ROUND_FUNCTION_NULLABLE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're reusing this strategy, the name doesn't seem appropriate anymore. We should rename this to something that explains what it's doing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look into this method you'll find that it is calling LogicalTypeMerging#findRoundDecimalType. In that method there is a line stating that

// NOTE: rounding may increase the digits by 1, therefore we need +1 on precisions.
return new DecimalType(false, 1 + precision - scale + round, round);

However for truncate function the number of digits will not increase, thus FlinkReturnTypes.ROUND_FUNCTION_NULLABLE is not the best choice to use here.

What I'll suggest is that you create a new method in LogicalTypeMerging called findTruncateDecimalType. This method and findRoundDecimalType can together reuse some code. Then you might want to create FlinkReturnTypes.TRUNCATE_FUNCTION_NULLABLE which will also reuse a lot of code with ROUND_FUNCTION_NULLABLE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the second review request below, if round and truncate could use the same logic, could I refactor those method name to things like 'findRoundOrTruncateDecimalType'?

Copy link
Contributor

@tsreaper tsreaper Aug 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer "ROUNDING_NULLABLE", as "rounding" is a more general behavior (we can round down or round up), not specified to a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I'll keep it the same as the round method.

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 6, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

new SqlFunction(
"TRUNCATE",
SqlKind.OTHER_FUNCTION,
FlinkReturnTypes.ROUND_FUNCTION_NULLABLE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look into this method you'll find that it is calling LogicalTypeMerging#findRoundDecimalType. In that method there is a line stating that

// NOTE: rounding may increase the digits by 1, therefore we need +1 on precisions.
return new DecimalType(false, 1 + precision - scale + round, round);

However for truncate function the number of digits will not increase, thus FlinkReturnTypes.ROUND_FUNCTION_NULLABLE is not the best choice to use here.

What I'll suggest is that you create a new method in LogicalTypeMerging called findTruncateDecimalType. This method and findRoundDecimalType can together reuse some code. Then you might want to create FlinkReturnTypes.TRUNCATE_FUNCTION_NULLABLE which will also reuse a lot of code with ROUND_FUNCTION_NULLABLE.

Comment on lines +118 to +126
DataTypes.DECIMAL(8, 2).notNull()),
TestSpec.forFunction(BuiltInFunctionDefinitions.TRUNCATE)
.onFieldsWithData(new BigDecimal("123.456"))
// TRUNCATE(DECIMAL(6, 3) NOT NULL, 2) => DECIMAL(6, 2) NOT NULL
.testResult(
$("f0").truncate(2),
"TRUNCATE(f0, 2)",
new BigDecimal("123.45"),
DataTypes.DECIMAL(6, 2).notNull()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This MathFunctionITCase, as stated in the java docs, is for BuiltInFunctionDefinitions. If you follow the usage of BuiltinFunctionDefinitions you'll see that it is converted to FlinkSqlOperatorTable.TRUNCATE. For Flink SQL scalar functions we always add tests in ScalarFunctionsTest. Please add your tests there.

A thorough test for a function should include all data types it supported as well as their corresponding null values. The tests in ScalarFunctionTest#testTruncate may not be complete so please complete them with all supported data types. See FlinkSqlOperatorTable.TRUNCATE to get all its supported types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tsreaper ,
Thank you for your advice.
In flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java there is an implementation of truncating method designed for DecimalData:

    public static DecimalData struncate(DecimalData b0, int b1) {
        if (b1 >= b0.scale()) {
            return b0;
        }

        BigDecimal b2 =
                b0.toBigDecimal()
                        .movePointRight(b1)
                        .setScale(0, RoundingMode.DOWN)
                        .movePointLeft(b1);
        int p = b0.precision();
        int s = b0.scale();

        if (b1 < 0) {
            return DecimalData.fromBigDecimal(b2, Math.min(38, 1 + p - s), 0);
        } else {
            return DecimalData.fromBigDecimal(b2, 1 + p - s + b1, b1);
        }
    }

It uses the same logic as the round method.

After I thought over this issue, I suggest that we should add 1 on precision (same logic as round method). If we did not do that, for example, given a number f1 0.333 with the type of DECIMAL(3, 3), if we call truncate(f1, 0), the precision of the result would be 0, which would trigger an exception of 'Decimal precision must be between 1 and 38 (both inclusive)'.

Those info above is my point of view. If you have any suggestion, please comment below. Thanks a lot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. With this in mind we shall keep using FlinkReturnTypes.ROUND_FUNCTION_NULLABLE instead of modifying it. Still the test cases should be moved to the appropriate place. You can also add this special "truncate to zero" test case in your test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tsreaper ,
I added several "truncated to zero" test cases.

@paul8263
Copy link
Contributor Author

@flinkbot run azure

@tsreaper
Copy link
Contributor

I've checked this issue again and found that adding tests to ScalarFunctionsTest.scala is not enough. This is because function test use the generated function code directly and will not check for the return type of the function. So you may also need to add some tests in org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.

Also there are CI failures. Please keep an eye on the comment of @flinkbot for the results. If there are CI failures then the PR will never be merged.

@paul8263 paul8263 force-pushed the FLINK-23614 branch 2 times, most recently from d7e70d6 to 56b3375 Compare August 12, 2021 06:21
Copy link
Contributor

@tsreaper tsreaper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. @JingsongLi any other thoughts?

@tsreaper
Copy link
Contributor

Hi @paul8263 , this PR is conflicting with the master branch. Please resolve the conflicts.

@paul8263
Copy link
Contributor Author

Hi @tsreaper ,
I solved the conflicts in 9296b4e.
When I ran the unit tests I encountered an error: org.apache.flink.changelog.fs.FsStateChangelogStorageFactory in org.apache.flink.changelog.fs is not public, cannot access it from external packages.
In FLINK-23279, org.apache.flink.changelog.fs.FsStateChangelogStorageFactory was added in StreamFaultToleranceTestBase.java. It seems that changes in other commits might lead to the test failure.

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me!

@JingsongLi JingsongLi merged commit e55e664 into apache:master Sep 7, 2021
godfreyhe pushed a commit that referenced this pull request Oct 29, 2021
… is not correct

This closes #16740

(cherry picked from commit e55e664)
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants