
[SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect #40116

Closed
wants to merge 16 commits

Conversation

@ritikam2 (Contributor) commented Feb 22, 2023

What changes were proposed in this pull request?

Correct the output column name of groupBy.agg(count_distinct) so that "*" is expanded into the underlying column names and the output column name includes the DISTINCT keyword.

Why are the changes needed?

The output column name for groupBy.agg(count_distinct) is incorrect, while similar queries in Spark SQL produce the correct output column name. For groupBy.agg queries on a DataFrame, "*" is not expanded into column names in the output column, and the DISTINCT keyword is missing.

// initializing data
scala> val df = spark.range(1, 10).withColumn("value", lit(1))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
scala> df.createOrReplaceTempView("table")

// DataFrame aggregate queries with incorrect output columns
scala> df.groupBy("id").agg(count_distinct($"*"))
res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): bigint]
scala> df.groupBy("id").agg(count_distinct($"value"))
res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]

// Spark Sql aggregate queries with correct output column
scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, value): bigint]
scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added UT

@github-actions bot added the SQL label Feb 22, 2023
@zhengruifeng (Contributor)

I guess you may need to go to the “Actions” tab on your forked repository and enable the “Build and test” and “Report test results” workflows.

https://spark.apache.org/contributing.html

@ritikam2 (Contributor, Author)

Did that

@srowen (Member) commented Feb 25, 2023

Eh, this does not explain the issue at all. Please do so.

@ritikam2 (Contributor, Author)

I have enabled the workflows on the branch. Is there something else that I need to do?

@ritikam2 (Contributor, Author)

Sean, I'm not sure which issue you were referring to. I updated the “Why are the changes needed” section of the pull request to mirror what Zheng had already put in his pull request.

@srowen (Member) commented Feb 26, 2023

This is about SPARK-41391? It also doesn't contain a simple description of what you're reporting, just code snippets. I can work it out, but this could be explained in just a few sentences.

@srowen (Member) commented Feb 26, 2023

Please fix the PR description too https://spark.apache.org/contributing.html

@ritikam2 ritikam2 changed the title [WIP]SPARK-41391 Fix SPARK-41391[SQL][WIP] Feb 26, 2023
@ritikam2 (Contributor, Author)

Sean, I tried to correct the two things you pointed out. Let me know if that works.

@srowen (Member) commented Feb 26, 2023

Looks better. The title should start with [SPARK-41391] to link it. Please include the description in the title; there is nothing there now.

@ritikam2 ritikam2 changed the title SPARK-41391[SQL][WIP] SPARK-41391[SQL][WIP] The output column name of groupBy.agg(count_distinct) is incorrect Feb 26, 2023
@ritikam2 ritikam2 changed the title SPARK-41391[SQL][WIP] The output column name of groupBy.agg(count_distinct) is incorrect SPARK-41391[SQL] The output column name of groupBy.agg(count_distinct) is incorrect Feb 27, 2023
@zhengruifeng zhengruifeng changed the title SPARK-41391[SQL] The output column name of groupBy.agg(count_distinct) is incorrect [SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect Feb 27, 2023
@ritikam2 (Contributor, Author)

Not sure how my check-ins are causing a javadoc generation error.

@srowen (Member) commented Feb 27, 2023

It's the [[Star]] in the scaladoc you added. Just don't make it a reference
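For illustration, a sketch of the kind of change Sean is describing (hypothetical doc text, not the exact patch): scaladoc treats [[...]] as a link and doc generation fails when the link target cannot be resolved, while backticks simply render the name as inline code.

```scala
// Breaks doc generation: [[Star]] is a scaladoc link whose target
// may not be resolvable from the documentation classpath.
/** Returns true if `exprs` contains a [[Star]]. */

// Safe: backticks render Star as inline code, with no link resolution.
/** Returns true if `exprs` contains a `Star`. */
```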

@ritikam2 (Contributor, Author) commented Mar 1, 2023

Is there anything else that I need to do for the fix to be accepted?

@srowen (Member) commented Mar 1, 2023

@cloud-fan or @HyukjinKwon do you have an opinion?

/**
* Returns true if `exprs` contains a star.
*/
def containsStar(exprs: Seq[Expression]): Boolean =
Contributor (review comment):

It should be private. Since it is only used once, I think we can inline it.
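The suggestion above can be sketched with toy expression types (hypothetical names, not Spark's real Catalyst classes) to show the shape of the star check and why, with a single caller, it can be inlined as one `exists` expression:

```scala
// Toy stand-in for Catalyst's Expression hierarchy (hypothetical types).
sealed trait Expr
case object Star extends Expr
final case class Col(name: String) extends Expr
final case class Func(name: String, isDistinct: Boolean, args: Seq[Expr]) extends Expr

// Recursively checks whether any expression contains a star; at a single
// call site this body can be inlined as a plain exists(...) call.
def hasStar(exprs: Seq[Expr]): Boolean = exprs.exists {
  case Star             => true
  case Func(_, _, args) => hasStar(args)
  case _                => false
}
```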

@@ -89,9 +89,18 @@ class RelationalGroupedDataset protected[sql](
case expr: NamedExpression => expr
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
UnresolvedAlias(a, Some(Column.generateAlias))
case ag: UnresolvedFunction if (containsStar(Seq(ag))) || ag.isDistinct =>
Contributor (review comment):

It's weird to have this special case. Shall we always use UnresolvedAlias?

@ritikam2 (Contributor, Author) commented Mar 2, 2023

Not sure why the suggested changes made the build fail in the catalyst, hive-thriftserver, and sql-other test modules.

2023-03-01T22:23:36.6700903Z Error instrumenting class: org.apache.spark.sql.execution.streaming.state.SchemaHelper$SchemaV2Reader
2023-03-01T22:23:36.8662344Z Error instrumenting class: org.apache.spark.sql.v2.avro.AvroScan
2023-03-01T22:23:36.8712474Z Error instrumenting class: org.apache.spark.api.python.DoubleArrayWritable

/**
* Returns true if `exprs` contains a star.
*/
@inline final private def containsStar(exprs: Seq[Expression]): Boolean =
Member (review comment):

Let's probably remove this.

@ritikam2 (Contributor, Author) commented Mar 3, 2023

Any comments? Apparently, making every expr an UnresolvedAlias is not working.

@cloud-fan (Contributor)

Apparently having all expr as unresolvedAlias is not working.

Can you share the test failures? Maybe we just need to update the tests with the different alias name.

@ritikam2 (Contributor, Author) commented Mar 8, 2023

@cloud-fan (Contributor)

I think the test is easy to fix. It wants to test the aggregate function result, but not the generated alias, so we just change the testing query to add alias explicitly.

val avgDF = intervalData.select(
      avg($"year-month").as("a1"),
      avg($"year").as("a2"),
      ...

@ritikam2 (Contributor, Author)

I think the test is easy to fix. It wants to test the aggregate function result, but not the generated alias, so we just change the testing query to add alias explicitly.

val avgDF = intervalData.select(
      avg($"year-month").as("a1"),
      avg($"year").as("a2"),
      ...

Couple of questions:

  1. Is it required and documented that we should add an alias to aggregate functions? If that is not a requirement, then fixing the test case is potentially covering up an issue.
  2. The thread leaks reported in the sql-other tests are not just from DataFrameAggregateSuite, but from multiple other suites:

2023-03-03T04:05:16.9822203Z 04:05:16.978 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 393.0 failed 1 times; aborting job
2023-03-03T04:05:16.9866693Z [info] - SPARK-30668: use legacy timestamp parser in to_timestamp (154 milliseconds)
2023-03-03T04:05:17.0464670Z [info] - SPARK-30752: convert time zones on a daylight saving day (62 milliseconds)
2023-03-03T04:05:17.1930942Z [info] - SPARK-30766: date_trunc of old timestamps to hours and days (142 milliseconds)
2023-03-03T04:05:17.3358608Z [info] - SPARK-30793: truncate timestamps before the epoch to seconds and minutes (146 milliseconds)
2023-03-03T04:05:17.3824844Z 04:05:17.382 WARN org.apache.spark.sql.DateFunctionsSuite:
2023-03-03T04:05:17.3846873Z ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.DateFunctionsSuite,

@cloud-fan (Contributor)

The auto-generated alias name is fragile and we are trying to improve it at #40126

Can you give some examples of how the new update changes the alias name? If it's not reasonable, we should keep the previous code.

@ritikam2 (Contributor, Author) commented Mar 14, 2023

The auto-generated alias name is fragile and we are trying to improve it at #40126

Can you give some examples of how the new update changes the alias name? If it's not reasonable, we should keep the previous code.

I am attaching a file showing some failures from when all of the aggregate expressions were made UnresolvedAlias. My latest check-in, where I only make the aggregate expressions that contain "*" into UnresolvedAlias, works; the build went through. So it is essentially the unresolvedstar() produced by toPrettySQL for an agg expr with a star that the Analyzer is not able to resolve.
sqlOtherTests.txt

@ritikam2 (Contributor, Author) commented Mar 15, 2023

Can anyone tell me how I am getting this single quote in the count expression? Attaching a picture. This can potentially cause problems down the line, where tree nodes compared in transformDownWithPruning are not the same because of this single quote.
[Screenshot: Screen Shot 2023-03-15 at 9 34 51 AM]

@cloud-fan (Contributor)

The single quote indicates that the expression is unresolved, I think it doesn't matter here.
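As a toy illustration of that printing convention (hypothetical types, not Spark's real classes): Catalyst renders unresolved nodes with a leading single quote, so 'count(...) simply marks an expression the Analyzer has not yet resolved.

```scala
// Toy sketch of the convention: unresolved nodes print with a leading
// single quote, resolved ones print plainly (hypothetical classes).
sealed trait Node {
  def resolved: Boolean
  def name: String
  override def toString: String = if (resolved) name else s"'$name"
}
final class ResolvedFn(val name: String) extends Node { val resolved = true }
final class UnresolvedFn(val name: String) extends Node { val resolved = false }
```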

@@ -40,12 +40,15 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
*/
implicit class StringToColumn(val sc: StringContext) {
def $(args: Any*): ColumnName = {
new ColumnName(sc.s(args: _*))
if (sc.parts.length == 1 && sc.parts.contains("*")) {
Contributor (review comment):

What does this change fix?

Contributor Author (review reply):

Yup, this is redundant. Removed it in the latest build.

if (containsStar(Seq(expr))) {
UnresolvedAlias(expr, None)
} else {
Alias(expr, toPrettySQL(expr))()
Contributor (review comment):

If we want a surgical fix, shall we fix how toPrettySQL handles star?

Contributor Author (review reply):

If we want a surgical fix, shall we fix how toPrettySQL handles star?

Sure, we can fix toPrettySQL. But the best we can do there is produce count(distinct *), which is not the same as what spark.sql produces.

If we want to duplicate the spark.sql behavior, the best option is to create an UnresolvedAlias for expressions containing "*", as pushed in the latest build.

@ritikam2 (Contributor, Author) commented Mar 22, 2023

Perhaps the following would be a better solution: instead of looking for a star, any UnresolvedFunction should get an UnresolvedAlias. Any comments?

private[this] def alias(expr: Expression): NamedExpression = expr match {
  case expr: NamedExpression => expr
  case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
    UnresolvedAlias(a, Some(Column.generateAlias))
  case expr: Expression =>
    if (expr.isInstanceOf[UnresolvedFunction]) {
      UnresolvedAlias(expr, None)
    } else {
      Alias(expr, toPrettySQL(expr))()
    }
}

@cloud-fan (Contributor) commented Mar 24, 2023

any UnresolvedFunction should have UnresolvedAlias.

SGTM. Or more aggressively, any expression should have UnresolvedAlias, and update failed tests.

@ritikam2 (Contributor, Author)

Right. This is a simple one-file fix plus a test case, versus the other approach, which may involve a number of files.

@ritikam2 (Contributor, Author)

Please see if this fix can be pulled.

@@ -89,7 +89,12 @@ class RelationalGroupedDataset protected[sql](
case expr: NamedExpression => expr
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
UnresolvedAlias(a, Some(Column.generateAlias))
case expr: Expression => Alias(expr, toPrettySQL(expr))()
case expr: Expression =>
Contributor (review comment):

nit:

case u: UnresolvedFunction => UnresolvedAlias(expr, None)
case expr: Expression => Alias(expr, toPrettySQL(expr))() 

@@ -40,12 +40,11 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
*/
implicit class StringToColumn(val sc: StringContext) {
def $(args: Any*): ColumnName = {
new ColumnName(sc.s(args: _*))
new ColumnName(sc.s(args: _*))
Contributor (review comment):

Unnecessary change.

@@ -45,7 +45,7 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
}

// Primitives

Contributor (review comment):

I think this will fail scalastyle check

@cloud-fan (Contributor)

Thanks, merging to master!

@cloud-fan cloud-fan closed this in cb7d082 Mar 31, 2023