
[SPARK-32753][SQL][3.0] Only copy tags to node with no tags #29665

Closed
manuzhang wants to merge 2 commits into branch-3.0 from manuzhang:spark-32753-3.0

Conversation

manuzhang (Contributor)

This PR backports #29593 to branch-3.0

What changes were proposed in this pull request?

Only copy tags to nodes that have no tags when transforming plans.

Why are the changes needed?

@cloud-fan made a good point that it doesn't make sense to append tags to existing nodes when nodes are removed. That can cause bugs such as duplicate rows when deduplicating and repartitioning by the same column with AQE.

spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id") 
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)

// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)

// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2)

It's too expensive to detect node removal, so we compromise and only copy tags to nodes that have no tags.
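
To illustrate the idea, here is a small, self-contained sketch of the guarded tag copy. It is an editor's illustration only, not the actual Spark `TreeNode` patch; the class and tag names (`TagCopyDemo`, `Node`, `"reusedExchange"`, etc.) are made up for the example.

```
import scala.collection.mutable

object TagCopyDemo {
  final class Node(val name: String) {
    val tags: mutable.Map[String, Any] = mutable.Map.empty

    // Guarded copy: adopt tags from the node being replaced only when this
    // node carries no tags of its own, so tags from removed nodes are not
    // appended to surviving nodes during plan transformations.
    def copyTagsFrom(other: Node): Unit = {
      if (tags.isEmpty) {
        tags ++= other.tags
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val removed = new Node("removedShuffleExchange")
    removed.tags += ("reusedExchange" -> true)

    val survivor = new Node("existingHashAggregate")
    survivor.tags += ("origin" -> "parser")

    survivor.copyTagsFrom(removed) // no-op: survivor already has tags
    println(survivor.tags)         // only the survivor's own tag remains

    val fresh = new Node("newHashAggregate")
    fresh.copyTagsFrom(removed)    // a fresh, untagged node adopts the tags
    println(fresh.tags)
  }
}
```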

Does this PR introduce any user-facing change?

Yes. It fixes a bug.

How was this patch tested?

Add test.

SparkQA commented Sep 7, 2020

Test build #128366 has finished for PR 29665 at commit be84096.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

test("SPARK-32753: Only copy tags to node with no tags") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
) {
Member

Indentation?

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {

Contributor Author

Sure. BTW, if this indentation style must be followed, should it be added to the checkstyle test?

@dongjoon-hyun (Member)

@manuzhang and @cloud-fan, if this is a correctness issue, could you add a label to the JIRA, please?

withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
) {
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
Member

Shall we use withTempView("v1")?
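
Taken together with the indentation comment above, the test setup could look roughly like this. This is an illustrative sketch, not the merged test; the exact assertion (here, a row count of 10) is an assumption based on the duplicate-row reproduction in the PR description.

```
test("SPARK-32753: Only copy tags to node with no tags") {
  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
    withTempView("v1") {
      // withTempView drops v1 automatically when the block exits.
      spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
      val df = spark.sql("select id from v1 group by id distribute by id")
      // With the fix, deduplication holds under AQE: 10 rows, not 20.
      assert(df.collect().length == 10)
    }
  }
}
```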

@cloud-fan (Contributor)

label added.

SparkQA commented Sep 8, 2020

Test build #128374 has finished for PR 29665 at commit 15b1673.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 8, 2020

Test build #128387 has finished for PR 29665 at commit 6973697.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to 3.0!

cloud-fan pushed a commit that referenced this pull request Sep 8, 2020
[SPARK-32753][SQL][3.0] Only copy tags to node with no tags
Closes #29665 from manuzhang/spark-32753-3.0.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan (Contributor)

cloud-fan commented Sep 8, 2020

@manuzhang can you create a new PR against master to add the new code updates from this PR?

@manuzhang (Contributor Author)

@dongjoon-hyun @cloud-fan, thanks for the review. #29682 has been opened as a follow-up against master.

@dongjoon-hyun (Member)

Thank you, @cloud-fan and @manuzhang.

HyukjinKwon pushed a commit that referenced this pull request Sep 9, 2020
### What changes were proposed in this pull request?
Fix indentation and clean up the temp view in the test added by #29593.

### Why are the changes needed?
Address review comments in #29665.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Updated test.

Closes #29682 from manuzhang/spark-32753-followup.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
rshkv pushed a commit to palantir/spark that referenced this pull request Feb 17, 2021
rshkv pushed a commit to palantir/spark that referenced this pull request Feb 19, 2021