Skip to content

Comments

[SPARK-38221][SQL] Eagerly iterate over groupingExpressions when moving complex grouping expressions out of an Aggregate node#35537

Closed
bersprockets wants to merge 1 commit intoapache:masterfrom
bersprockets:groupby_stream_issue
Closed

[SPARK-38221][SQL] Eagerly iterate over groupingExpressions when moving complex grouping expressions out of an Aggregate node#35537
bersprockets wants to merge 1 commit intoapache:masterfrom
bersprockets:groupby_stream_issue

Conversation

@bersprockets
Copy link
Contributor

What changes were proposed in this pull request?

Change PullOutGroupingExpressions to eagerly iterate over groupingExpressions when building complexGroupingExpressionMap.

Why are the changes needed?

Consider this query:

Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id").show(false)

It fails with

java.lang.IllegalStateException: Couldn't find _groupingexpression#24 in [id#4,_groupingexpression#23]
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:425)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.foreach(Stream.scala:534)
  at scala.collection.TraversableOnce.count(TraversableOnce.scala:152)
  at scala.collection.TraversableOnce.count$(TraversableOnce.scala:145)
  at scala.collection.AbstractTraversable.count(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:293)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithKeys(HashAggregateExec.scala:623)
  ... etc ...

When HashAggregateExec attempts to bind the references in the group-by expressions, attribute _groupingexpression#24 is missing from the child ProjectExec's output.

This is due to the way PullOutGroupingExpressions, when determining which grouping expressions to shift from the Aggregate node to a Project node, populates complexGroupingExpressionMap. PullOutGroupingExpressions uses a map operation to iterate over groupingExpressions and updates complexGroupingExpressionMap in the closure passed to map(). However, if groupingExpressions is a Stream, the map operation is evaluated lazily, and isn't fully completed until ComputeCurrentTime calls transformAllExpressionsWithPruning, which is long after PullOutGroupingExpressions completes. Therefore, at the time PullOutGroupingExpressions is ready to create the Project node, complexGroupingExpressionMap is not fully populated. As result, the Project node is missing all but the first complex grouping expression.

Does this PR introduce any user-facing change?

No, other than the above query now works.

How was this patch tested?

New unit test.

@github-actions github-actions bot added the SQL label Feb 16, 2022
Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@HyukjinKwon
Copy link
Member

The PR description is very detailed and easy to follow, thanks @bersprockets

@bersprockets
Copy link
Contributor Author

Test failure seems unrelated:

MariaDBKrbIntegrationSuite.(It is not a test it is a sbt.testing.SuiteSelector)
org.scalatest.exceptions.TestFailedDueToTimeoutException: 
...etc...
Socket fail to connect to host:10.1.0.81, port:34883. Connection refused (Connection refused).

HyukjinKwon pushed a commit that referenced this pull request Feb 16, 2022
…ng complex grouping expressions out of an Aggregate node

### What changes were proposed in this pull request?

Change `PullOutGroupingExpressions` to eagerly iterate over `groupingExpressions` when building `complexGroupingExpressionMap`.

### Why are the changes needed?

Consider this query:
```
Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id").show(false)
```
It fails with
```
java.lang.IllegalStateException: Couldn't find _groupingexpression#24 in [id#4,_groupingexpression#23]
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:425)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.foreach(Stream.scala:534)
  at scala.collection.TraversableOnce.count(TraversableOnce.scala:152)
  at scala.collection.TraversableOnce.count$(TraversableOnce.scala:145)
  at scala.collection.AbstractTraversable.count(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:293)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithKeys(HashAggregateExec.scala:623)
  ... etc ...
```
When `HashAggregateExec` attempts to bind the references in the group-by expressions, attribute _groupingexpression#24 is missing from the child `ProjectExec`'s output.

This is due to the way `PullOutGroupingExpressions`, when determining which grouping expressions to shift from the `Aggregate` node to a `Project` node,  populates `complexGroupingExpressionMap`. `PullOutGroupingExpressions` uses a map operation to iterate over `groupingExpressions` and updates `complexGroupingExpressionMap` in the closure passed to `map()`. However, if `groupingExpressions` is a `Stream`, the map operation is evaluated lazily, and isn't fully completed until `ComputeCurrentTime` calls `transformAllExpressionsWithPruning`, which is long after `PullOutGroupingExpressions` completes. Therefore, at the time `PullOutGroupingExpressions` is ready to create the `Project` node, `complexGroupingExpressionMap` is not fully populated. As result, the `Project` node is missing all but the first complex grouping expression.

### Does this PR introduce _any_ user-facing change?

No, other than the above query now works.

### How was this patch tested?

New unit test.

Closes #35537 from bersprockets/groupby_stream_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit ad2bc7d)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon
Copy link
Member

Yeah.

Merged to master and branch-3.2.

@bersprockets
Copy link
Contributor Author

Thanks @HyukjinKwon

@bersprockets bersprockets deleted the groupby_stream_issue branch August 10, 2022 18:25
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…ng complex grouping expressions out of an Aggregate node

### What changes were proposed in this pull request?

Change `PullOutGroupingExpressions` to eagerly iterate over `groupingExpressions` when building `complexGroupingExpressionMap`.

### Why are the changes needed?

Consider this query:
```
Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id").show(false)
```
It fails with
```
java.lang.IllegalStateException: Couldn't find _groupingexpression#24 in [id#4,_groupingexpression#23]
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:425)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.foreach(Stream.scala:534)
  at scala.collection.TraversableOnce.count(TraversableOnce.scala:152)
  at scala.collection.TraversableOnce.count$(TraversableOnce.scala:145)
  at scala.collection.AbstractTraversable.count(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:293)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithKeys(HashAggregateExec.scala:623)
  ... etc ...
```
When `HashAggregateExec` attempts to bind the references in the group-by expressions, attribute _groupingexpression#24 is missing from the child `ProjectExec`'s output.

This is due to the way `PullOutGroupingExpressions`, when determining which grouping expressions to shift from the `Aggregate` node to a `Project` node,  populates `complexGroupingExpressionMap`. `PullOutGroupingExpressions` uses a map operation to iterate over `groupingExpressions` and updates `complexGroupingExpressionMap` in the closure passed to `map()`. However, if `groupingExpressions` is a `Stream`, the map operation is evaluated lazily, and isn't fully completed until `ComputeCurrentTime` calls `transformAllExpressionsWithPruning`, which is long after `PullOutGroupingExpressions` completes. Therefore, at the time `PullOutGroupingExpressions` is ready to create the `Project` node, `complexGroupingExpressionMap` is not fully populated. As result, the `Project` node is missing all but the first complex grouping expression.

### Does this PR introduce _any_ user-facing change?

No, other than the above query now works.

### How was this patch tested?

New unit test.

Closes apache#35537 from bersprockets/groupby_stream_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit ad2bc7d)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants