[SPARK-37597][WIP] Deduplicate the right side of left-semi join and left-anti join#34850

Closed
zhengruifeng wants to merge 2 commits into apache:master from zhengruifeng:semi_join_distinct

@zhengruifeng
Contributor

What changes were proposed in this pull request?

Deduplicate the right side of left-semi join and left-anti join

Why are the changes needed?

1. Reduce the amount of data shuffled on the right side.
2. Improve the chance of broadcasting the right side.
3. Resolve skewed keys on the right side.
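
The three points above rest on one property: a left-semi or left-anti join only asks whether a matching key exists on the right side, so duplicate right-side rows cannot change the result. A minimal sketch of that property with plain Scala collections follows. It is a simplified model, not Spark code: `SemiAntiDedup`, `leftSemi`, and `leftAnti` are hypothetical names, only a single equi-join key is modeled, and SQL NULL semantics are ignored.

```scala
object SemiAntiDedup {
  // Left-semi: keep each left row whose key has at least one match on the right.
  def leftSemi[K, V](left: Seq[(K, V)], rightKeys: Seq[K]): Seq[(K, V)] =
    left.filter { case (k, _) => rightKeys.contains(k) }

  // Left-anti: keep each left row whose key has no match on the right.
  def leftAnti[K, V](left: Seq[(K, V)], rightKeys: Seq[K]): Seq[(K, V)] =
    left.filterNot { case (k, _) => rightKeys.contains(k) }

  def main(args: Array[String]): Unit = {
    val left = Seq(1 -> "a", 2 -> "b", 3 -> "c", 2 -> "d")
    val right = Seq(2, 2, 2, 5, 5)   // heavily duplicated keys
    val deduped = right.distinct     // what the proposed rule would feed into the join

    // Removing duplicates on the right side leaves both join results unchanged.
    assert(leftSemi(left, right) == leftSemi(left, deduped))
    assert(leftAnti(left, right) == leftAnti(left, deduped))
    println(s"right rows: ${right.size}, after dedup: ${deduped.size}")
  }
}
```

The smaller the deduplicated right side is relative to the original, the more the shuffle, broadcast, and skew arguments apply.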

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added test suites.

@github-actions github-actions Bot added the SQL label Dec 9, 2021
RewriteCorrelatedScalarSubquery,
RewriteLateralSubquery,
EliminateSerialization,
DeduplicateLeftSemiLeftAntiRightSide,
Member

It should be placed after RewriteSubquery to cover more cases.

@zhengruifeng
Contributor Author

test case:

spark.range(0, 10000, 1, 10).selectExpr("id % 1000 as key1", "id % 3 as value1").createOrReplaceTempView("table1")

spark.range(0, 10000000, 1, 10).selectExpr("id % 100 as key2", "id as value2").createOrReplaceTempView("table2")

spark.sql("SELECT key1 FROM table1 LEFT ANTI JOIN table2 ON key1 = key2").write.mode("overwrite").parquet("/tmp/tmp1.parquet")

master:

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (22)
+- AdaptiveSparkPlan (21)
   +- == Final Plan ==
      * Project (14)
      +- * SortMergeJoin LeftAnti (13)
         :- * Sort (5)
         :  +- AQEShuffleRead (4)
         :     +- ShuffleQueryStage (3)
         :        +- Exchange (2)
         :           +- * Range (1)
         +- * Sort (12)
            +- AQEShuffleRead (11)
               +- ShuffleQueryStage (10)
                  +- Exchange (9)
                     +- * Project (8)
                        +- * Filter (7)
                           +- * Range (6)



this pr:

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (25)
+- AdaptiveSparkPlan (24)
   +- == Final Plan ==
      * Project (16)
      +- * BroadcastHashJoin LeftAnti BuildRight (15)
         :- AQEShuffleRead (4)
         :  +- ShuffleQueryStage (3)
         :     +- Exchange (2)
         :        +- * Range (1)
         +- BroadcastQueryStage (14)
            +- BroadcastExchange (13)
               +- * HashAggregate (12)
                  +- AQEShuffleRead (11)
                     +- ShuffleQueryStage (10)
                        +- Exchange (9)
                           +- * HashAggregate (8)
                              +- * Project (7)
                                 +- * Filter (6)
                                    +- * Range (5)
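
In the two plans above, the right side gains a partial and a final `HashAggregate` around its shuffle, and the join changes from `SortMergeJoin` to `BroadcastHashJoin`. A similar effect can be approximated without the rule by deduplicating the right side by hand in SQL. The sketch below is an assumption-laden illustration, not part of the PR: `ManualDedupAntiJoin` is a hypothetical name, the session is a local one created only for the example, `table2` is scaled down from the test case, and the resulting physical plan depends on the Spark version and configuration.

```scala
import org.apache.spark.sql.SparkSession

object ManualDedupAntiJoin {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("semi-anti-dedup-sketch")
      .getOrCreate()

    // Same shape as the test case above, with a smaller table2.
    spark.range(0, 10000, 1, 10)
      .selectExpr("id % 1000 as key1", "id % 3 as value1")
      .createOrReplaceTempView("table1")
    spark.range(0, 100000, 1, 10)
      .selectExpr("id % 100 as key2", "id as value2")
      .createOrReplaceTempView("table2")

    val original = spark.sql(
      "SELECT key1 FROM table1 LEFT ANTI JOIN table2 ON key1 = key2")
    // Hand-written equivalent: deduplicate the join key before the anti join.
    val deduped = spark.sql(
      "SELECT key1 FROM table1 LEFT ANTI JOIN " +
        "(SELECT DISTINCT key2 FROM table2) t ON key1 = key2")

    // Both queries return the same multiset of rows.
    assert(original.count() == deduped.count())
    assert(original.exceptAll(deduped).isEmpty)
    assert(deduped.exceptAll(original).isEmpty)

    // Print the physical plan of the hand-deduplicated query for comparison.
    deduped.explain()
    spark.stop()
  }
}
```

Comparing `original.explain()` with `deduped.explain()` on a given cluster is a quick way to see whether deduplication changes the join strategy there.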

Comment on lines +2158 to +2159
case join @ Join(_, right, LeftSemiOrAnti(_), _, _) if !right.isInstanceOf[Aggregate] =>
join.copy(right = Aggregate(right.output, right.output, right))
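
For context, the two lines above are a single `case` inside an optimizer rule. A minimal sketch of how that case could sit in a complete Catalyst rule is shown below. The wrapper is a reconstruction, not the PR's actual file: the object name `DeduplicateLeftSemiLeftAntiRightSideSketch` is hypothetical, and the sketch relies on internal Catalyst classes whose signatures can differ between Spark versions.

```scala
import org.apache.spark.sql.catalyst.plans.LeftSemiOrAnti
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical wrapper around the case quoted in the review comment.
object DeduplicateLeftSemiLeftAntiRightSideSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // Match left-semi and left-anti joins whose right child is not already an Aggregate.
    case join @ Join(_, right, LeftSemiOrAnti(_), _, _) if !right.isInstanceOf[Aggregate] =>
      // Group by every output column of the right side, which acts as a DISTINCT.
      join.copy(right = Aggregate(right.output, right.output, right))
  }
}
```

Note that the guard only checks whether the right child is already an `Aggregate`; the rewrite is otherwise unconditional and does not estimate whether the extra aggregation pays off. For local experiments, a rule of this shape could be registered through `spark.experimental.extraOptimizations`.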
Member

Deduplication is not always beneficial. This is my initial PR: b559901

Contributor Author

Yes, there are some cases that this PR does not take into account. I think b559901 is much better. I will close this one.

@SparkQA

SparkQA commented Dec 9, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50513/

@SparkQA

SparkQA commented Dec 9, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50513/

@SparkQA

SparkQA commented Dec 9, 2021

Test build #146038 has finished for PR 34850 at commit 49dd1ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng zhengruifeng deleted the semi_join_distinct branch December 10, 2021 02:32

3 participants