[SPARK-35352][SQL] Add code-gen for full outer sort merge join #34581

Closed
wants to merge 7 commits into from

@c21 c21 commented Nov 13, 2021

What changes were proposed in this pull request?

This PR adds code-gen for FULL OUTER sort merge join. The change is in SortMergeJoinExec.scala:codegenFullOuter(). It follows the same algorithm as the iterator mode (SortMergeFullOuterJoinScanner): maintain a buffer of rows for each of the left and right sides, and iterate over the matched rows in the buffers.
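
As a rough illustration of the buffering algorithm described above, here is a minimal plain-Scala sketch. It is not the generated code and not the actual SortMergeFullOuterJoinScanner: the `Row` type, the `fullOuterJoin` name, and the `cond` parameter are hypothetical, keys are non-null integers, and both inputs are assumed to be already sorted by key.

```scala
import scala.collection.mutable.ArrayBuffer

object FullOuterSmjSketch {
  // (join key, payload): a hypothetical stand-in for Spark's InternalRow.
  type Row = (Int, String)

  /** Full outer join of two inputs that are already sorted by key.
    * `cond` stands in for the extra (non-equi) join condition. */
  def fullOuterJoin(
      left: Seq[Row],
      right: Seq[Row],
      cond: (Row, Row) => Boolean): Seq[(Option[Row], Option[Row])] = {
    val out = ArrayBuffer.empty[(Option[Row], Option[Row])]
    var l = 0
    var r = 0
    while (l < left.length && r < right.length) {
      val lk = left(l)._1
      val rk = right(r)._1
      if (lk < rk) {
        // Left key has no partner on the right: emit (left, null).
        out += ((Some(left(l)), None)); l += 1
      } else if (lk > rk) {
        // Right key has no partner on the left: emit (null, right).
        out += ((None, Some(right(r)))); r += 1
      } else {
        // Keys match: buffer every row on each side that shares this key.
        val leftBuf = ArrayBuffer.empty[Row]
        while (l < left.length && left(l)._1 == lk) { leftBuf += left(l); l += 1 }
        val rightBuf = ArrayBuffer.empty[Row]
        while (r < right.length && right(r)._1 == lk) { rightBuf += right(r); r += 1 }
        // Track which buffered right rows satisfied the condition at least once.
        val rightMatched = Array.fill(rightBuf.length)(false)
        for (lr <- leftBuf) {
          var leftMatched = false
          for (i <- rightBuf.indices if cond(lr, rightBuf(i))) {
            out += ((Some(lr), Some(rightBuf(i))))
            leftMatched = true
            rightMatched(i) = true
          }
          if (!leftMatched) out += ((Some(lr), None))
        }
        for (i <- rightBuf.indices if !rightMatched(i)) out += ((None, Some(rightBuf(i))))
      }
    }
    // Drain whichever side still has rows; they cannot match anything.
    while (l < left.length) { out += ((Some(left(l)), None)); l += 1 }
    while (r < right.length) { out += ((None, Some(right(r)))); r += 1 }
    out.toSeq
  }
}
```

The per-side "matched" tracking is what distinguishes full outer from inner join here: a buffered row that never satisfies `cond` must still be emitted once, padded with nulls on the other side.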

Example query:

val df1 = spark.range(5).select($"id".as("k1"))
val df2 = spark.range(10).select($"id".as("k2"))
df1.join(df2.hint(hint), $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "full_outer")

Example generated code: https://gist.github.com/c21/5cab9751f24ae448d77a259d28cb77d7
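
To make the expected output of the example query concrete, the following plain-Scala sketch computes the full outer join result with a naive nested loop, independent of Spark. The object and value names are hypothetical; only the two ranges and the join condition come from the query above.

```scala
object ExampleQueryExpected {
  // Join condition from the example query: k1 === k2 % 3 && k1 + 3 =!= k2
  def cond(k1: Long, k2: Long): Boolean = k1 == k2 % 3 && k1 + 3 != k2

  val k1s: Seq[Long] = 0L until 5L  // mirrors spark.range(5)
  val k2s: Seq[Long] = 0L until 10L // mirrors spark.range(10)

  // Pairs that satisfy the full join condition.
  val matched: Seq[(Option[Long], Option[Long])] =
    for (a <- k1s; b <- k2s if cond(a, b)) yield (Option(a), Option(b))

  // Left rows with no matching right row are padded with null (None) on the right.
  val leftOnly: Seq[(Option[Long], Option[Long])] =
    k1s.filterNot(a => k2s.exists(b => cond(a, b))).map(a => (Option(a), Option.empty[Long]))

  // Right rows with no matching left row are padded with null (None) on the left.
  val rightOnly: Seq[(Option[Long], Option[Long])] =
    k2s.filterNot(b => k1s.exists(a => cond(a, b))).map(b => (Option.empty[Long], Option(b)))

  val rows: Seq[(Option[Long], Option[Long])] = matched ++ leftOnly ++ rightOnly
}
```

Here k1 values 3 and 4 never equal `k2 % 3`, and k2 values 3, 4 and 5 are excluded by the `k1 + 3 =!= k2` clause even though their keys match, so both sides contribute null-padded rows. That is the case the buffered-match logic has to handle.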

To help with review, note that this PR changes several TPCDS plan files. Only the following files contain real code changes:

  • SortMergeJoinExec.scala
  • WholeStageCodegenSuite.scala

All other files are auto-generated golden file plan changes for TPCDS queries.

Why are the changes needed?

Improve the run-time/CPU performance of FULL OUTER sort merge join.

Micro benchmark (same query in JoinBenchmark.scala):

  def sortMergeJoin(): Unit = {
    val N = 2 << 20
    codegenBenchmark("sort merge join", N) {
      val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
      val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
      val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
      assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
      df.noop()
    }
  }

  def sortMergeJoinWithDuplicates(): Unit = {
    val N = 2 << 20
    codegenBenchmark("sort merge join with duplicates", N) {
      val df1 = spark.range(N)
        .selectExpr(s"(id * 15485863) % ${N*10} as k1")
      val df2 = spark.range(N)
        .selectExpr(s"(id * 15485867) % ${N*10} as k2")
      val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
      assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
      df.noop()
    }
  }

Seeing roughly a 20-30% run-time improvement (1.2X-1.3X relative speedup in the results below):

Running benchmark: sort merge join
  Running case: sort merge join wholestage off
  Stopped after 2 iterations, 2979 ms
  Running case: sort merge join wholestage on
  Stopped after 5 iterations, 5849 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
sort merge join:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
sort merge join wholestage off                     1453           1490          52          1.4         693.0       1.0X
sort merge join wholestage on                      1115           1170          43          1.9         531.6       1.3X

Running benchmark: sort merge join with duplicates
  Running case: sort merge join with duplicates wholestage off
  Stopped after 2 iterations, 3236 ms
  Running case: sort merge join with duplicates wholestage on
  Stopped after 5 iterations, 6768 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
sort merge join with duplicates:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------
sort merge join with duplicates wholestage off           1609           1618          13          1.3         767.2       1.0X
sort merge join with duplicates wholestage on            1330           1354          24          1.6         634.4       1.2X
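
As a quick sanity check of the figures above, the sketch below recomputes the speedup from the reported best times. It assumes the Relative column is the ratio of the baseline best time to the candidate best time, which is consistent with the numbers shown; the object and method names are hypothetical.

```scala
object SpeedupCheck {
  /** Relative speedup: baseline best time divided by candidate best time. */
  def relative(baselineMs: Double, candidateMs: Double): Double =
    baselineMs / candidateMs

  /** Fraction of wall-clock time saved relative to the baseline. */
  def timeSaved(baselineMs: Double, candidateMs: Double): Double =
    1.0 - candidateMs / baselineMs

  def main(args: Array[String]): Unit = {
    // (benchmark name, best time with wholestage off, best time with wholestage on)
    val results = Seq(
      ("sort merge join", 1453.0, 1115.0),
      ("sort merge join with duplicates", 1609.0, 1330.0))
    for ((name, off, on) <- results) {
      println(f"$name: ${relative(off, on)}%.2fX, ${timeSaved(off, on) * 100}%.1f%% less time")
    }
  }
}
```

The two views differ slightly: a 1.3X relative speedup corresponds to about 23% less wall-clock time, so the 20-30% figure reads most naturally as a throughput gain.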

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Added unit test in WholeStageCodegenSuite.scala.
  • Existing unit test in OuterJoinSuite.scala.

@github-actions github-actions bot added the SQL label Nov 13, 2021

SparkQA commented Nov 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49655/

SparkQA commented Nov 13, 2021

Test build #145186 has finished for PR 34581 at commit f712f74.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49655/

SparkQA commented Nov 13, 2021

Test build #145194 has finished for PR 34581 at commit aa6ebd9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49663/

SparkQA commented Nov 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49663/

SparkQA commented Nov 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49664/

SparkQA commented Nov 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49664/

SparkQA commented Nov 14, 2021

Test build #145195 has finished for PR 34581 at commit e8b2477.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49676/

SparkQA commented Nov 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49676/

SparkQA commented Nov 14, 2021

Test build #145207 has finished for PR 34581 at commit c846656.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

c21 commented Nov 15, 2021

cc @cloud-fan could you help take a look when you have time? Thanks.

SparkQA commented Nov 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49695/

val rightIndex = ctx.freshName("rightIndex")

// Generate code for join condition
// val leftVars = genOneSideJoinVars(ctx, leftOutputRow, left, setDefaultValue = false)
Contributor

shall we remove this line?

Contributor Author

@cloud-fan - ah sorry, was added during debugging, removed.

val leftAnyNull = leftKeyVars.map(_.isNull).mkString(" || ")
val rightKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, right.output)
val rightAnyNull = rightKeyVars.map(_.isNull).mkString(" || ")
val matchedKeyVars = copyKeys(ctx, leftKeyVars)
Contributor

why is it only related to left keys?

Contributor

oh i see, it's matched, so left and right keys are the same.

Contributor Author

@cloud-fan - yeah, it can be copying from either leftKeyVars or rightKeyVars.

SparkQA commented Nov 15, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49695/

SparkQA commented Nov 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49696/

SparkQA commented Nov 15, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49696/

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 2ef60f7 Nov 15, 2021

SparkQA commented Nov 15, 2021

Test build #145225 has finished for PR 34581 at commit 23eedba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

c21 commented Nov 15, 2021

Thank you @cloud-fan for review!

@c21 c21 deleted the smj-codegen branch November 15, 2021 12:20

SparkQA commented Nov 15, 2021

Test build #145226 has finished for PR 34581 at commit 34830f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
