
[SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account #37525

Conversation

peter-toth
Contributor

@peter-toth peter-toth commented Aug 15, 2022

What changes were proposed in this pull request?

Currently AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering
take only the last alias of an aliased expression into account. We could avoid some extra shuffles and sorts with better alias handling.
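
For illustration, a minimal sketch of the problem (a hypothetical example, not one of the PR's tests): with a projection like c AS a, c AS b, only the alias b used to be tracked, so a downstream requirement on a could introduce an avoidable shuffle.

```
// Hypothetical repro sketch: the aggregate output is hash-partitioned by `id`,
// and the projection aliases `id` twice. Before this change only the last
// alias (`b`) was kept, so grouping by `a` could add an extra Exchange.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

val df = spark.range(5)
  .groupBy($"id").count()                         // output partitioned by `id`
  .select($"id".as("a"), $"id".as("b"), $"count")

df.groupBy($"a").count().explain()                // with this PR: no extra Exchange on `a`
```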

Why are the changes needed?

Performance improvement, and this also fixes the issue in #39475.

Does this PR introduce any user-facing change?

Yes, this PR fixes the issue in #39475.

How was this patch tested?

Added new UT.

@github-actions github-actions bot added the SQL label Aug 15, 2022
@@ -681,7 +681,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
df.createTempView("df")

val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key"
val expectedCodegenText = "Found 2 WholeStageCodegen subtrees."
val expectedCodegenText = "Found 1 WholeStageCodegen subtrees."
Contributor Author

Explain codegen output (plan) changed from:

Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:348(0.53% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
   +- *(1) Range (0, 5, step=1, splits=2)


== Subtree 2 / 2 (maxMethodCodeSize:252; maxConstantPoolSize:214(0.33% used); numInnerClasses:0) ==
*(2) HashAggregate(keys=[key#xL], functions=[max(value#xL)], output=[key#xL, max(value)#xL])
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(key#xL, 5), ENSURE_REQUIREMENTS, [plan_id=55]
         +- *(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
            +- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
               +- *(1) Range (0, 5, step=1, splits=2)

to:

Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:308; maxConstantPoolSize:374(0.57% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#xL], functions=[max(value#xL)], output=[key#xL, max(value)#xL])
+- *(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
   +- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
      +- *(1) Range (0, 5, step=1, splits=2)

@@ -339,12 +339,12 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
// ShuffleQueryStage 0
// ShuffleQueryStage 2
// ReusedQueryStage 0
val grouped = df.groupBy("key").agg(max("value").as("value"))
val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
Contributor Author

I had to modify the test because the fix changed the explain plan of the original query from:

Union
:- *(5) HashAggregate(keys=[_groupingexpression#79L], functions=[max(value#38L)], output=[(key + 1)#44L, max(value)#45L])
:  +- AQEShuffleRead coalesced
:     +- ShuffleQueryStage 3
:        +- Exchange hashpartitioning(_groupingexpression#79L, 5), ENSURE_REQUIREMENTS, [plan_id=693]
:           +- *(3) HashAggregate(keys=[_groupingexpression#79L], functions=[partial_max(value#38L)], output=[_groupingexpression#79L, max#62L])
:              +- *(3) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#79L])
:                 +- AQEShuffleRead coalesced
:                    +- ShuffleQueryStage 0
:                       +- Exchange hashpartitioning(key#12L, 5), ENSURE_REQUIREMENTS, [plan_id=623]
:                          +- *(1) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
:                             +- *(1) Project [id#10L AS key#12L, id#10L AS value#13L]
:                                +- *(1) Range (0, 6, step=1, splits=10)
+- *(6) HashAggregate(keys=[_groupingexpression#80L], functions=[max(value#38L)], output=[(key + 2)#51L, max(value)#52L])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 4
         +- Exchange hashpartitioning(_groupingexpression#80L, 5), ENSURE_REQUIREMENTS, [plan_id=719]
            +- *(4) HashAggregate(keys=[_groupingexpression#80L], functions=[partial_max(value#38L)], output=[_groupingexpression#80L, max#66L])
               +- *(4) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#80L])
                  +- AQEShuffleRead coalesced
                     +- ShuffleQueryStage 2
                        +- ReusedExchange [key#12L, max#64L], Exchange hashpartitioning(key#12L, 5), ENSURE_REQUIREMENTS, [plan_id=623]

to (1 less exchange):

Union
:- *(3) HashAggregate(keys=[_groupingexpression#75L], functions=[max(value#38L)], output=[(key + 1)#44L, max(value)#45L])
:  +- AQEShuffleRead coalesced
:     +- ShuffleQueryStage 0
:        +- Exchange hashpartitioning(_groupingexpression#75L, 5), ENSURE_REQUIREMENTS, [plan_id=514]
:           +- *(1) HashAggregate(keys=[_groupingexpression#75L], functions=[partial_max(value#38L)], output=[_groupingexpression#75L, max#62L])
:              +- *(1) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#75L])
:                 +- *(1) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
:                    +- *(1) Project [id#10L AS key#12L, id#10L AS value#13L]
:                       +- *(1) Range (0, 6, step=1, splits=10)
+- *(4) HashAggregate(keys=[_groupingexpression#76L], functions=[max(value#38L)], output=[(key + 2)#51L, max(value)#52L])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(_groupingexpression#76L, 5), ENSURE_REQUIREMENTS, [plan_id=532]
            +- *(2) HashAggregate(keys=[_groupingexpression#76L], functions=[partial_max(value#38L)], output=[_groupingexpression#76L, max#66L])
               +- *(2) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#76L])
                  +- *(2) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
                     +- *(2) Project [id#55L AS key#12L, id#55L AS value#13L]
                        +- *(2) Range (0, 6, step=1, splits=10)

and so the query didn't match the test case 2 description.

@peter-toth peter-toth changed the title [WIP][SPARK-40086][SQL] Improve AliasAwareOutputPartitioning to take all aliases into account [SPARK-40086][SQL] Improve AliasAwareOutputPartitioning to take all aliases into account Aug 16, 2022
@peter-toth
Copy link
Contributor Author

cc @cloud-fan

@peter-toth peter-toth force-pushed the SPARK-40086-fix-aliasawareoutputexpression branch from eb3cd45 to e044ff8 Compare August 17, 2022 14:02
@peter-toth
Copy link
Contributor Author

@cloud-fan, @imback82, can you please help to review this PR?

@peter-toth peter-toth changed the title [SPARK-40086][SQL] Improve AliasAwareOutputPartitioning to take all aliases into account [WIP][SPARK-40086][SQL] Improve AliasAwareOutputPartitioning to take all aliases into account Sep 26, 2022
@peter-toth peter-toth force-pushed the SPARK-40086-fix-aliasawareoutputexpression branch 3 times, most recently from 04c77d6 to 730e785 Compare September 28, 2022 13:33
@peter-toth peter-toth force-pushed the SPARK-40086-fix-aliasawareoutputexpression branch from 730e785 to ddf16c9 Compare September 28, 2022 18:38
@peter-toth peter-toth force-pushed the SPARK-40086-fix-aliasawareoutputexpression branch from ddf16c9 to 7cf5260 Compare December 27, 2022 14:33
@peter-toth peter-toth force-pushed the SPARK-40086-fix-aliasawareoutputexpression branch from 7cf5260 to 1fe49c3 Compare January 15, 2023 09:17
@peter-toth peter-toth changed the title [WIP][SPARK-40086][SQL] Improve AliasAwareOutputPartitioning to take all aliases into account [WIP][SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account Jan 15, 2023
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))

-def pruneCandidate(candidate: Expression): Option[Expression] = {
+expr.multiTransform {
Contributor Author

@cloud-fan, @ulysses-you I've updated this PR. Now it is based on multiTransform and contains changes from both this PR and #39556 (see the description).

normalizeExpression() becomes as simple as this with multiTransform.

Please note that currently pruneFunc is used only for "after transformation" filtering, but multiTransform does the mapping in "one run" (unlike the removed code, which ran a transform for each alias), so it is much more efficient than the removed version when we have a high number of aliases.

Some early pruning would also be possible using multiTransform; I will show you that version a bit later.
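
A standalone sketch of the multiTransform idea (a simplified toy expression tree, not Spark's actual TreeNode API):

```
// Given alternative subtrees for some nodes, lazily enumerate every
// combination of substitutions, so all alias variants of an expression
// come out of a single traversal instead of one transform per alias.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

def multiTransform(e: Expr, alts: Map[Expr, Seq[Expr]]): LazyList[Expr] = e match {
  case _ if alts.contains(e) => alts(e).to(LazyList)
  case Add(l, r) =>
    for {
      l2 <- multiTransform(l, alts)
      r2 <- multiTransform(r, alts)
    } yield Add(l2, r2)
  case leaf => LazyList(leaf)
}

// Projection `c AS a, c AS b` gives `c` the alternatives `a` and `b`:
multiTransform(Add(Attr("c"), Attr("c")), Map(Attr("c") -> Seq(Attr("a"), Attr("b"))))
// => a+a, a+b, b+a, b+b — lazily, so a candidate limit can cut enumeration short
```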

Contributor

According to my usage, multiTransform should support at least 4 cases of pruning:

  • a max limit on the size of the returned result
  • prune results whose references are not a subset of the output
  • prune an intermediate result if the alias map does not contain any other sub-expression
  • prune sub-expressions, e.g. PartitionCollection(a, b) -> PartitionCollection(a) if b is not a subset of the output

If all these requirements can be met, I think it's good to switch to multiTransform.
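
Two of these pruning cases can be sketched like this (attributes as plain strings and a partitioning collection as a sequence of attribute sets; illustrative only, not the PR's code):

```
// Drop candidates whose references are not a subset of the output, and inside
// a collection keep only the surviving members.
def pruneCollection(members: Seq[Set[String]], output: Set[String]): Option[Seq[Set[String]]] = {
  val valid = members.filter(_.subsetOf(output)) // PartitionCollection(a, b) -> PartitionCollection(a)
  if (valid.nonEmpty) Some(valid) else None      // nothing valid left: prune the whole collection
}

pruneCollection(Seq(Set("a"), Set("b")), Set("a")) // Some(List(Set(a)))
pruneCollection(Seq(Set("b")), Set("a"))           // None
```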

@peter-toth peter-toth force-pushed the SPARK-40086-fix-aliasawareoutputexpression branch 3 times, most recently from 4c0a1ea to 98b7a15 Compare January 17, 2023 13:20
@peter-toth
Contributor Author

peter-toth commented Jan 17, 2023

@ulysses-you, @cloud-fan, I've rebased this PR on top of master, which now includes multiTransform().

Let me know your thoughts.

@peter-toth peter-toth changed the title [WIP][SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account [SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account Jan 17, 2023
projectExpression(e)
  .filter(e => partitioningSet.add(e.canonicalized))
  .take(aliasCandidateLimit)
  .asInstanceOf[Stream[Partitioning]]
Contributor

do we need to cast to Stream? It will be pattern matched by Seq(...) immediately, which will materialize it.

Contributor Author

This cast is required to avoid a compile error, as projectExpression returns Stream[Expression] but the flatMap requires Seq[Partitioning].
We could use .asInstanceOf[Seq[Partitioning]] here, but I'm not sure it makes any difference.

if (requiredOrdering.length > outputOrdering.length) {
  false
} else {
  requiredOrdering.zip(outputOrdering).forall {
-   case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder)
+   case (requiredOrder, outputOrder) =>
+     // Follow `SortOrder.satisfies` that respects `SortOrder.sameOrderExpressions`
Contributor

this comment is not needed, as we are not following SortOrder.satisfies, but using it.

Contributor Author

indeed, fixed in 733ecb5

" It can preserve the output partitioning and ordering." +
" Negative value means disable this optimization.")
.internal()
.version("3.4.0")
Contributor

This PR targets master, which is 3.5.0. Is this going to be merged into branch-3.4, which is feature-frozen? If not, this line should be adjusted.

Contributor

I'd like to merge it to 3.4 as it fixes a bug in planned write, which is a new feature in 3.4.

Comment on lines +41 to +44
val partitioningSet = mutable.Set.empty[Expression]
projectExpression(e)
  .filter(e => partitioningSet.add(e.canonicalized))
  .take(aliasCandidateLimit)
Contributor

Scala 2.13 would allow simplifying this. It's a shame...

Suggested change
-val partitioningSet = mutable.Set.empty[Expression]
-projectExpression(e)
-  .filter(e => partitioningSet.add(e.canonicalized))
-  .take(aliasCandidateLimit)
+projectExpression(e)
+  .distinctBy(_.canonicalized)
+  .take(aliasCandidateLimit)

Contributor

hmm I think we still need to support Scala 2.12 for now?

Contributor

sure, that's the shame bit
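
For reference, a minimal Scala 2.12-compatible stand-in for distinctBy (the helper name is made up; this just shows why the filter-against-a-mutable-Set idiom above keeps the Stream lazy):

```
import scala.collection.mutable

// Dedup lazily: elements past `take(n)` are never even inspected, which is
// exactly what the mutable-set filter in the PR achieves without distinctBy.
def distinctByCompat[A, B](xs: Stream[A])(key: A => B): Stream[A] = {
  val seen = mutable.Set.empty[B]
  xs.filter(a => seen.add(key(a)))
}

distinctByCompat(Stream(1, 2, 2, 3, 3, 3))(identity).take(2).toList // List(1, 2)
```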

Contributor

@EnricoMi EnricoMi left a comment

Once again, I can confirm that for the example in #38356 master writes

WriteFiles
+- *(1) Sort [p#19 ASC NULLS FIRST], false, 0
   +- *(1) Project [id#10, sort_col#11, empty2null(p#12) AS p#19]
      +- ShuffleQueryStage 0
         +- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=18]
            +- LocalTableScan [id#10, sort_col#11, p#12]

while this fix writes

WriteFiles
+- *(1) Project [id#10, sort_col#11, empty2null(p#12) AS p#19]
   +- *(1) Sort [p#12 ASC NULLS FIRST, sort_col#11 ASC NULLS FIRST], false, 0
      +- ShuffleQueryStage 0
         +- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=18]
            +- LocalTableScan [id#10, sort_col#11, p#12]

LGTM!

@cloud-fan
Contributor

@peter-toth can you retrigger the tests? The pyspark failures may be flaky.

  case _ =>
}
outputExpressions.foreach {
  case a: Attribute if aliases.contains(a.canonicalized) =>
Contributor

What does this do? Do you mean !aliases.contains(a.canonicalized)?

Contributor Author

If we have a c, c AS a projection, then we need to add both the original c attribute and a to the alternatives of c. But we don't need to add an attribute if it isn't aliased.

Contributor

what's the behavior of c as a? This code seems to return both c and a. I think the right way should be
if AttributeSet(outputExpressions).contains(a) => // add a to buffer

Contributor Author

This case doesn't match c AS a. This case makes sure that if c -> a has been added to the map by the previous foreach, then c -> c is added too.

I think your code would add all attributes, but that is not needed.

Contributor

why should c -> c be added? The outputExpressions only contain c as a; shall we only return c -> a?

Contributor

ah, I see it. How about this case: c1, c2 as x? We should also add c1 into aliasMap, right?

Contributor Author

No. If c1 is not aliased otherwise, then we don't need to add it to the map. If the map doesn't contain any alias for an expression, then the transformation does nothing with that c1 attribute; it just leaves it in the expression tree as it is...

Contributor

But the aliasMap is not empty due to c2 as x. For this case, how can we preserve c1 if c1 is not added into aliasMap?

Contributor

oh I see it, nvm. @peter-toth Thank you for the patience!

Contributor Author

Np, thanks for reviewing this PR @ulysses-you!
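
To summarize the thread, a simplified sketch of the alias-map semantics (plain strings instead of Spark's Expression/Attribute types; not the PR's code):

```
import scala.collection.mutable

// For `c, c AS a`: c maps to both a and itself, because c survives in the
// output. For `c1, c2 AS x`: c1 gets no entry at all — expressions without an
// entry are simply left untouched by the transformation.
def buildAliasMap(projectList: Seq[(String, Option[String])]): Map[String, Seq[String]] = {
  val aliases = mutable.Map.empty[String, mutable.ArrayBuffer[String]]
  // First pass: collect every alias of an aliased child expression.
  projectList.foreach {
    case (child, Some(alias)) =>
      aliases.getOrElseUpdate(child, mutable.ArrayBuffer.empty) += alias
    case _ =>
  }
  // Second pass: an attribute that is also aliased elsewhere stays as its own alternative.
  projectList.foreach {
    case (attr, None) if aliases.contains(attr) => aliases(attr) += attr
    case _ =>
  }
  aliases.view.mapValues(_.toSeq).toMap
}

buildAliasMap(Seq("c" -> Some("a"), "c" -> None))   // Map(c -> Seq(a, c))
buildAliasMap(Seq("c1" -> None, "c2" -> Some("x"))) // Map(c2 -> Seq(x)); no entry for c1
```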

val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
outputExpressions.reverse.foreach {
  case a @ Alias(child, _) =>
    val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
Contributor

I prefer strip(child).canonicalized. I have not seen other code places that match a canonicalized expression.

Contributor

+1

Contributor Author

Ok, thanks. I will change it today.

Contributor Author

fixed in 1f1f093

Comment on lines 99 to 110
orderingExpressions.flatMap { sortOrder =>
  val orderingSet = mutable.Set.empty[Expression]
  val sameOrderings = sortOrder.children.toStream
    .flatMap(projectExpression)
    .filter(e => orderingSet.add(e.canonicalized))
    .take(aliasCandidateLimit)
  if (sameOrderings.nonEmpty) {
    Some(sortOrder.copy(child = sameOrderings.head,
      sameOrderExpressions = sameOrderings.tail))
  } else {
    None
  }
Contributor

Is it correct? If orderingExpressions is c1, c2 and aliasMap is c2 as x, then the outputOrdering should be c1, x or empty rather than x.

Contributor

Good point! Partitioning is fine as the entire HashPartitioning is one expression.

Contributor

let's also add tests for it.

Contributor Author

I'm not sure I get this. If we have 2 orderings in orderingExpressions (SortOrder(c1), SortOrder(c2)) and the projection is c2 as x, then why don't we expect SortOrder(x)?

Contributor Author

Hmm, I might see the issue here... Let me fix it today.

Contributor

If data is sorted by a, b, we can't say the data is sorted by b, but we can say it's sorted by a.

Contributor Author

Got it already, thanks! I will submit a fix today.

Contributor Author

Here is the fix and a new test: 1f1f093
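
The prefix rule discussed above can be sketched like this (ordering columns as strings; illustrative, not the fix itself):

```
// Map each ordering column through the alias map, but stop at the first one
// with no valid projection: a sort on [a, b] guarantees a sort on [a], never
// on [b] alone, so only a leading prefix of the ordering may be preserved.
def projectOrdering(
    ordering: Seq[String],
    aliasMap: Map[String, String],
    output: Set[String]): Seq[String] = {
  ordering
    .map(c => aliasMap.get(c).orElse(Some(c).filter(output.contains)))
    .takeWhile(_.isDefined)
    .flatten
}

projectOrdering(Seq("c1", "c2"), Map("c2" -> "x"), Set("c1", "x")) // Seq(c1, x)
projectOrdering(Seq("c1", "c2"), Map("c2" -> "x"), Set("x"))       // Seq() — c1 is lost
```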

@cloud-fan
Contributor

thanks, merging to master/3.4 (as it fixes a bug in planned write)!

cloud-fan pushed a commit that referenced this pull request Jan 31, 2023
…and AliasAwareQueryOutputOrdering to take all aliases into account

### What changes were proposed in this pull request?
Currently `AliasAwareOutputPartitioning` and `AliasAwareQueryOutputOrdering`
take only the last alias of an aliased expression into account. We could avoid some extra shuffles and sorts with better alias handling.

### Why are the changes needed?
Performance improvement, and this also fixes the issue in #39475.

### Does this PR introduce _any_ user-facing change?
Yes, this PR fixes the issue in #39475.

### How was this patch tested?
Added new UT.

Closes #37525 from peter-toth/SPARK-40086-fix-aliasawareoutputexpression.

Lead-authored-by: Peter Toth <peter.toth@gmail.com>
Co-authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6341b06)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan cloud-fan closed this in 6341b06 Jan 31, 2023
@peter-toth
Contributor Author

Thanks for the review!

dongjoon-hyun pushed a commit that referenced this pull request Feb 24, 2023
…itioning

### What changes were proposed in this pull request?

This is a follow-up of #37525 . When the project list has aliases, we go to the `projectExpression` branch which filters away invalid partitioning/ordering that reference non-existing attributes in the current plan node. However, this filtering is missing when the project list has no alias, where we directly return the child partitioning/ordering.

This PR fixes it.

### Why are the changes needed?

to make sure we always return valid output partitioning/ordering.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #40137 from cloud-fan/alias.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Feb 24, 2023
…itioning

Closes #40137 from cloud-fan/alias.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 72922ad)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Mar 10, 2023
### What changes were proposed in this pull request?

After #37525 (SPARK-40086 / SPARK-42049) the following simple query containing a subselect expression:
```
select (select sum(id) from t1)
```
fails with:
```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
	at scala.runtime.Statics.anyHash(Statics.java:122)
        ...
	at org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
	at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
	at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
	at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
	at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
	at scala.collection.mutable.HashTable.init(HashTable.scala:110)
	at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
	at scala.collection.mutable.HashMap.init(HashMap.scala:44)
	at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
        ...
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
when DSv2 is enabled.

This PR proposes to fix `BatchScanExec`'s `equals()` and `hashCode()`, as those shouldn't throw NPE in any circumstances.

But if we dig deeper we realize that the NPE occurs since #37525 and the root cause of the problem is changing `AliasAwareOutputExpression.aliasMap` from immutable to mutable. The mutable map deserialization invokes the `hashCode()` of the keys, while that is not the case with immutable maps. In this case the key is a subquery expression whose plan contains the `BatchScanExec`.
Please note that the mutability of `aliasMap` shouldn't be an issue as it is a `private` field of `AliasAwareOutputExpression` (though adding a simple `.toMap` would also help to avoid the NPE).
Based on the above findings this PR also proposes making `aliasMap` transient as it isn't needed on executors.

A side question is whether adding any subquery expressions to `AliasAwareOutputExpression.aliasMap` makes sense, because `AliasAwareOutputExpression.projectExpression()` mainly projects `child.outputPartitioning` and `child.outputOrdering`, which can't contain subquery expressions. But there are a few exceptions (`SortAggregateExec`, `TakeOrderedAndProjectExec`) where `AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the `child`, and actually leaving those expressions in the map doesn't do any harm.

### Why are the changes needed?
To fix regression introduced with #37525.

### Does this PR introduce _any_ user-facing change?
Yes, the query works again.

### How was this patch tested?
Added new UT.

Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
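
The mutable-vs-immutable deserialization difference described in that commit message can be demonstrated standalone (illustrative, not Spark code):

```
import java.io._
import scala.collection.mutable

// A key that reports whenever it is hashed.
class LoudKey extends Serializable {
  override def hashCode(): Int = { println("hashCode called"); 42 }
  override def equals(o: Any): Boolean = o.isInstanceOf[LoudKey]
}

def roundTrip[A](a: A): A = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(a)
  oos.close()
  new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    .readObject().asInstanceOf[A]
}

roundTrip(mutable.HashMap(new LoudKey -> 1)) // readObject rebuilds the table, hashing each key
roundTrip(Map(new LoudKey -> 1))             // a small immutable Map is restored without rehashing
```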
cloud-fan pushed a commit that referenced this pull request Mar 10, 2023

Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 93d5816)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
…and AliasAwareQueryOutputOrdering to take all aliases into account

Closes apache#37525 from peter-toth/SPARK-40086-fix-aliasawareoutputexpression.

Lead-authored-by: Peter Toth <peter.toth@gmail.com>
Co-authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6341b06)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
…itioning

Closes apache#40137 from cloud-fan/alias.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 72922ad)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023

Closes apache#40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 93d5816)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request Jan 5, 2024
…ingUnaryExecNode

### What changes were proposed in this pull request?

This is a followup of #37525. When expanding the output partitioning/ordering with aliases, we have a threshold to avoid exponential explosion. However, we missed applying this threshold in one place. This PR fixes it.

### Why are the changes needed?

to avoid OOM

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44614 from cloud-fan/oom.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
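
For a sense of scale behind the "exponential explosion" (illustrative arithmetic): if an expression references n attributes and each has k aliases, the expansion can emit k^n variants, which is why aliasCandidateLimit has to cap every expansion site.

```
val k = 4  // aliases per attribute (assumed for illustration)
val n = 16 // attributes referenced by the expression
println(BigInt(k).pow(n)) // 4294967296 candidate expressions without the cap
```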
dongjoon-hyun pushed a commit that referenced this pull request Jan 5, 2024
…ingUnaryExecNode

Closes #44614 from cloud-fan/oom.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit f8115da)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>