
[SPARK-32816][SQL] Fix analyzer bug when aggregating multiple distinct DECIMAL columns #29673

Closed
wants to merge 5 commits

Conversation

linhongliu-db
Contributor

What changes were proposed in this pull request?

This PR fixes a conflict between RewriteDistinctAggregates and DecimalAggregates.
In some cases, DecimalAggregates wraps a decimal column in UnscaledValue, applying
different rules to different aggregate functions.

As a result, the same distinct column used by different aggregates can be rewritten
into different distinct columns after DecimalAggregates. For example:
avg(distinct decimal_col), sum(distinct decimal_col) may become
avg(distinct UnscaledValue(decimal_col)), sum(distinct decimal_col)

We assume that after RewriteDistinctAggregates there is at most one distinct column
across the aggregates, but DecimalAggregates breaks this assumption. To fix this, we
have to switch the order of these two rules.
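To illustrate the broken invariant, here is a minimal sketch (plain Python, not Spark's actual implementation) of how DISTINCT aggregate calls can be grouped by their distinct argument; the function and data names are hypothetical:

```python
# Illustrative sketch: RewriteDistinctAggregates can plan DISTINCT aggregates into a
# single pass only if they all share one distinct expression. If DecimalAggregates
# later rewrites avg's argument to UnscaledValue(decimal_col) but leaves sum's
# argument alone, the single distinct group splits into two.

def distinct_groups(agg_calls):
    """Return the set of distinct arguments used by (func, arg) aggregate calls."""
    return {arg for func, arg in agg_calls}

# Before DecimalAggregates: one shared distinct column, the assumption holds.
before = [("avg", "decimal_col"), ("sum", "decimal_col")]
assert len(distinct_groups(before)) == 1

# After DecimalAggregates rewrites only avg: two distinct columns, assumption broken.
after = [("avg", "UnscaledValue(decimal_col)"), ("sum", "decimal_col")]
assert len(distinct_groups(after)) == 2
```

Running RewriteDistinctAggregates after DecimalAggregates means the grouping happens on the already-rewritten arguments, so the single-group assumption is evaluated against the final expressions.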

Why are the changes needed?

bug fix

Does this PR introduce any user-facing change?

no

How was this patch tested?

added test cases

Comment on lines 2561 to 2565
spark.range(0, 100, 1, 1)
.selectExpr("id", "cast(id as decimal(9, 0)) as decimal_col")
.write.mode("overwrite")
.parquet(path.getAbsolutePath)
spark.read.parquet(path.getAbsolutePath).createOrReplaceTempView("test_table")
Contributor

It seems we don't need to write Parquet here.

Contributor
@TJX2014 commented Sep 8, 2020

val df = spark.range(0, 50000, 1, 1).selectExpr("id", "cast(id as decimal(9, 0)) as ss_ext_list_price")
df.createOrReplaceTempView("test_table")
sql("select avg(distinct ss_ext_list_price), sum(distinct ss_ext_list_price) from test_table").explain
This seems enough to reproduce the issue.

@linhongliu-db
Contributor Author

@cloud-fan

@@ -196,6 +195,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
EliminateSorts) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Distinct Aggregate Rewrite", Once,
Contributor

Can we add a comment saying that this batch must run after "Decimal Optimizations", since "Decimal Optimizations" may change the aggregate's distinct column?

@SparkQA

SparkQA commented Sep 8, 2020

Test build #128394 has finished for PR 29673 at commit b1ce4b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128428 has finished for PR 29673 at commit 4df4f7c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

test("SPARK-32816: aggregating multiple distinct DECIMAL columns") {
spark.range(0, 100, 1, 1)
.selectExpr("id", "cast(id as decimal(9, 0)) as decimal_col")
.createOrReplaceTempView("test_table")
Contributor

nit: wrap the test with withTempView

Member

How about writing it like this without a temp view:

  test("SPARK-32816: aggregating multiple distinct DECIMAL columns") {
    checkAnswer(
      sql(
        s"""
           |SELECT AVG(DISTINCT decimal_col), SUM(DISTINCT decimal_col)
           |  FROM VALUES (CAST(1 AS DECIMAL(9, 0))) t(decimal_col)
         """.stripMargin),
      Row(XXX, XXX))
  }

Member

nit: Also, could you move this test into SQLQueryTestSuite?

Contributor Author

Let me try this; I'm not sure if a literal will behave differently.

withTempView("test_table") {
spark.range(0, 100, 1, 1)
.selectExpr("id", "cast(id as decimal(9, 0)) as decimal_col")
.createOrReplaceTempView("test_table")
Contributor

Can you follow #29673 (comment)? That's a better idea.

Contributor Author

sure

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128480 has finished for PR 29673 at commit 737f996.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

|SELECT AVG(DISTINCT decimal_col), SUM(DISTINCT decimal_col)
| FROM VALUES (CAST(1 AS DECIMAL(9, 0))) t(decimal_col)
""".stripMargin),
Row(1, 1))
Member

nit: the test in group-by.sql looks sufficient for this issue, so could you remove this one?

Contributor Author

sorry, I forgot to remove this.

Member
@maropu left a comment

LGTM except for one minor comment.

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128504 has finished for PR 29673 at commit f2111df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 11, 2020

Test build #128540 has finished for PR 29673 at commit 8510ff9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 40ef5c9 Sep 16, 2020
linhongliu-db added a commit to linhongliu-db/spark that referenced this pull request Oct 15, 2020
…t DECIMAL columns


Closes apache#29673 from linhongliu-db/SPARK-32816.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 40ef5c9)
linhongliu-db added a commit to linhongliu-db/spark that referenced this pull request Oct 16, 2020
…t DECIMAL columns
