Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect #38917

Closed

Conversation

zhengruifeng
Copy link
Contributor

What changes were proposed in this pull request?

correct the output column name of groupBy.agg(count_distinct)

Why are the changes needed?

before this PR: [id: bigint, count(value): bigint]

scala> val df = spark.range(1, 10).withColumn("value", lit(1))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]

scala> df.select(count_distinct($"value"))
res0: org.apache.spark.sql.DataFrame = [count(DISTINCT value): bigint]

scala> df.groupBy("id").agg(count_distinct($"value"))
res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]

scala> df.select(sum_distinct($"value"))
res2: org.apache.spark.sql.DataFrame = [sum(DISTINCT value): bigint]

scala> df.groupBy("id").agg(sum_distinct($"value"))
res3: org.apache.spark.sql.DataFrame = [id: bigint, sum(DISTINCT value): bigint]

scala> df.createOrReplaceTempView("table")

scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
res5: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]

after this PR: [id: bigint, count(DISTINCT value): bigint]

scala> val df = spark.range(1, 10).withColumn("value", lit(1))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]

scala> df.select(count_distinct($"value"))
res0: org.apache.spark.sql.DataFrame = [count(DISTINCT value): bigint]

scala> df.groupBy("id").agg(count_distinct($"value"))
res1: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]

Does this PR introduce any user-facing change?

the default column name changed

How was this patch tested?

added UT

init
Copy link
Contributor

@amaliujia amaliujia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM nice catch!

@@ -1134,6 +1134,11 @@ class DataFrameSuite extends QueryTest
checkAnswer(approxSummaryDF, approxSummaryResult)
}

test("SPARK-41391: Correct the output column name of groupBy.agg(count_distinct)") {
val df = person.groupBy("id").agg(count_distinct(col("name")))
assert(df.columns === Array("id", "count(DISTINCT name)"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does it make sense to compare with columns from the SQL example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, will update

@zhengruifeng
Copy link
Contributor Author

sql - other keeps failing, I need a bit more time to investigate

@zhengruifeng
Copy link
Contributor Author

need to take * into account, and groupBy.agg(count_distinct($"*")) output column count(unresolvedstar())

scala> df.select(count_distinct(col("*")))
res12: org.apache.spark.sql.DataFrame = [count(DISTINCT id, value): bigint]

scala> df.groupBy("id").agg(count_distinct($"*"))
res13: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): bigint]

scala> spark.sql(" SELECT COUNT(DISTINCT *) FROM table ")
res14: org.apache.spark.sql.DataFrame = [count(DISTINCT id, value): bigint]

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
res15: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, value): bigint]

@zhengruifeng
Copy link
Contributor Author

this PR causes SPARK-27581: DataFrame count_distinct("*") shouldn't fail with AnalysisException fail:

2022-12-06T10:00:45.0030472Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m- SPARK-27581: DataFrame count_distinct("*") shouldn't fail with AnalysisException *** FAILED *** (12 milliseconds)�[0m�[0m
2022-12-06T10:00:45.0035652Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'count'.�[0m�[0m
2022-12-06T10:00:45.0041055Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidStarUsageError(QueryCompilationErrors.scala:465)�[0m�[0m
2022-12-06T10:00:45.0045499Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$expandStarExpression$1.applyOrElse(Analyzer.scala:1759)�[0m�[0m
2022-12-06T10:00:45.0050364Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$expandStarExpression$1.applyOrElse(Analyzer.scala:1715)�[0m�[0m
2022-12-06T10:00:45.0054806Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:566)�[0m�[0m
2022-12-06T10:00:45.0060748Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)�[0m�[0m
2022-12-06T10:00:45.0065073Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:566)�[0m�[0m
2022-12-06T10:00:45.0075998Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:563)�[0m�[0m
2022-12-06T10:00:45.0080489Z �[0m[�[0m�[0minfo�[0m] �[0m�[0m�[31m  at scala.collection.immutable.List.map(List.scala:293)�[0m�[0m

I believe the analyzer need to be changed to fix this issue, let me close this PR and ping @cloud-fan and @viirya to take a look since I think it's related to #24482.

scala> val df = spark.range(1, 10).withColumn("value", lit(1))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]

scala> df.createOrReplaceTempView("table")

scala> df.groupBy("id").agg(count_distinct($"value"))
res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]

scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]

scala> df.groupBy("id").agg(count_distinct($"*"))
res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): bigint]

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, value): bigint]

@zhengruifeng zhengruifeng deleted the sql_fix_count_distinct_name branch March 30, 2023 03:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants