
[SPARK-27359] [OPTIMIZER] [SQL] Rewrite ArraysOverlap Join #24563

Closed
wants to merge 2 commits into from

Conversation

@nvander1 (Contributor) commented May 9, 2019

What changes were proposed in this pull request?

This adds an optimizer rule for joins whose condition is arrays_overlap. I believe this is worthwhile to integrate into Spark given the several new array functions released in Spark 2.4, since it lets users make better use of the arrays_overlap function. The technique proposed in the patch can also be extended straightforwardly to joins whose condition involves array_contains (a sketch of that case follows the optimized plans below).

The following code produces a cartesian product in the physical plan.

import spark.implicits._
import org.apache.spark.sql.functions._

val a = Seq((Seq(1, 2, 3), "one")).toDF("num", "name")
val b = Seq((Seq(1, 5), "two")).toDF("num", "name")
val j = a.join(b, arrays_overlap(b("num"), a("num")))
j.explain(true)
== Parsed Logical Plan ==
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Analyzed Logical Plan ==
num: array<int>, name: string, num: array<int>, name: string
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Optimized Logical Plan ==
Join Inner, arrays_overlap(num#158, num#149)
:- LocalRelation [num#149, name#150]
+- LocalRelation [num#158, name#159]

== Physical Plan ==
CartesianProduct arrays_overlap(num#158, num#149)
:- LocalTableScan [num#149, name#150]
+- LocalTableScan [num#158, name#159]

This is unacceptable for joins on large datasets.
The query can be rewritten as an equivalent equijoin by:

  1. exploding the arrays
  2. joining on the exploded columns
  3. dropping the exploded columns on the joined data
  4. removing duplicates from the result of 3)

Doing so brings a query that might otherwise never complete down to a reasonable runtime (a DataFrame-level sketch of the four steps is shown below).
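For illustration, here is a rough DataFrame-level sketch of those four steps on the toy data above (the temporary column names a_elem/b_elem are made up for this sketch; the patch performs an equivalent transformation on the logical plan):

import spark.implicits._
import org.apache.spark.sql.functions._

val a = Seq((Seq(1, 2, 3), "one")).toDF("num", "name")
val b = Seq((Seq(1, 5), "two")).toDF("num", "name")

// 1. Explode each array into a temporary element column.
val aExploded = a.withColumn("a_elem", explode(a("num")))
val bExploded = b.withColumn("b_elem", explode(b("num")))

// 2. Equijoin on the exploded element columns.
val joined = aExploded.join(bExploded, aExploded("a_elem") === bExploded("b_elem"))

// 3. Drop the temporary columns, and 4. remove the duplicates introduced when
//    two rows share more than one array element. (If the inputs can contain
//    genuinely duplicate rows, add a per-row id before exploding and
//    deduplicate on that instead.)
val rewritten = joined.drop("a_elem", "b_elem").dropDuplicates()

With the proposed rule applied, the original arrays_overlap query itself produces the following plans: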

== Parsed Logical Plan ==
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Analyzed Logical Plan ==
num: array<int>, name: string, num: array<int>, name: string
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Optimized Logical Plan ==
Aggregate [1], [first(num#149, false) AS num#149, first(name#150, false) AS name#150, first(num#158, false) AS num#158, first(name#159, false) AS name#159]
+- Project [num#149, name#150, num#158, name#159]
   +- Join Inner, (explode_larr#178 = explode_rarr#180)
      :- Project [num#149, name#150, explode_larr#178]
      :  +- Generate explode(num#149), false, [explode_larr#178]
      :     +- LocalRelation [num#149, name#150]
      +- Project [num#158, name#159, explode_rarr#180]
         +- Generate explode(num#158), false, [explode_rarr#180]
            +- LocalRelation [num#158, name#159]

== Physical Plan ==
SortAggregate(key=[1#185], functions=[finalmerge_first(merge first#188, valueSet#189) AS first(num#149)()#181, finalmerge_first(merge first#192, valueSet#193) AS first(name#150)()#182, finalmerge_first(merge first#196, valueSet#197) AS first(num#158)()#183, finalmerge_first(merge first#200, valueSet#201) AS first(name#159)()#184], output=[num#149, name#150, num#158, name#159])
+- Sort [1#185 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(1#185, 200)
      +- SortAggregate(key=[1 AS 1#185], functions=[partial_first(num#149, false) AS (first#188, valueSet#189), partial_first(name#150, false) AS (first#192, valueSet#193), partial_first(num#158, false) AS (first#196, valueSet#197), partial_first(name#159, false) AS (first#200, valueSet#201)], output=[1#185, first#188, valueSet#189, first#192, valueSet#193, first#196, valueSet#197, first#200, valueSet#201])
         +- *(3) Sort [1 AS 1#185 ASC NULLS FIRST], false, 0
            +- *(3) Project [num#149, name#150, num#158, name#159]
               +- *(3) SortMergeJoin [explode_larr#178], [explode_rarr#180], Inner
                  :- Sort [explode_larr#178 ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(explode_larr#178, 200)
                  :     +- *(1) Project [num#149, name#150, explode_larr#178]
                  :        +- *(1) Generate explode(num#149), [num#149, name#150], false, [explode_larr#178]
                  :           +- LocalTableScan [num#149, name#150]
                  +- Sort [explode_rarr#180 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(explode_rarr#180, 200)
                        +- *(2) Project [num#158, name#159, explode_rarr#180]
                           +- *(2) Generate explode(num#158), [num#158, name#159], false, [explode_rarr#180]
                              +- LocalTableScan [num#158, name#159]
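As noted above, the same idea extends to array_contains. That case is not part of this patch, but a sketch of the equivalent rewrite looks like the following (the DataFrames c and d and their column names are invented for illustration): explode only the array side and equijoin on the element.

import spark.implicits._
import org.apache.spark.sql.functions._

// Hypothetical inputs: `c` carries an array column, `d` carries a scalar key.
val c = Seq((Seq(1, 2, 3), "one")).toDF("nums", "cname")
val d = Seq((1, "two")).toDF("num", "dname")

// Original: c.join(d, array_contains(c("nums"), d("num"))) -- a cartesian product.
// Rewrite: explode only the array side and equijoin on the element.
val cExploded = c.withColumn("elem", explode(c("nums")))
val rewritten = cExploded
  .join(d, cExploded("elem") === d("num"))
  .drop("elem")
  .dropDuplicates() // only needed if an array can contain the key more than once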

How was this patch tested?

So far, this patch has been tested only manually, on a large dataset.
I've used the technique implemented by this patch to perform similar joins with ~300 million records on either side of the join. If you agree that this is a worthwhile optimization, I'll happily contribute some unit tests to ensure the robustness of the optimization.
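For what it's worth, one possible shape for such a test (just a sketch of a correctness check on a small input, not the optimizer test harness this would actually use) is to compare the explode-based rewrite against the naive arrays_overlap join:

import spark.implicits._
import org.apache.spark.sql.functions._

val left  = Seq((Seq(1, 2, 3), "one"), (Seq(7), "seven")).toDF("num", "name")
val right = Seq((Seq(1, 5), "two"), (Seq(7, 8), "eight")).toDF("num", "name")

// Baseline: the cartesian-product plan.
val expected = left.join(right, arrays_overlap(left("num"), right("num")))

// Explode-based rewrite, as proposed above.
val l = left.withColumn("le", explode(left("num")))
val r = right.withColumn("re", explode(right("num")))
val actual = l.join(r, l("le") === r("re")).drop("le", "re").dropDuplicates()

// Same rows either way (set comparison is enough for this data).
assert(expected.except(actual).isEmpty && actual.except(expected).isEmpty)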

@AmplabJenkins

Can one of the admins verify this patch?

@viirya (Member) left a comment

Thanks for your work!

I have a few questions for now:

  1. Do you have some benchmark numbers regarding this optimization?
  2. This explodes the arrays, so if the arrays are long, is there a big impact on performance?

@viirya (Member) commented May 9, 2019

Btw, the PR title is currently empty. Could you write a proper title for this work?

@nvander1 nvander1 changed the title [SPARK-27359] [OPTIMIZER] [SQL] [SPARK-27359] [OPTIMIZER] [SQL] Rewrite ArraysOverlap Join May 9, 2019
@nvander1 (Contributor, Author) commented May 9, 2019

@viirya

Oops, thanks for pointing out the missing title! :)

I've only used this when the size of the arrays is several orders of magnitude smaller than the number of records on the larger side of the join. I don't have any benchmarks to back this up yet (I'll run some experiments and post the results here).

The underlying assumption is that the number of items in the largest array is several orders of magnitude smaller than the number of records on either side of the join. This feels similar to how the replication factor used to optimize skew joins is also kept small.

@nvander1 (Contributor, Author) commented May 9, 2019

Re: benchmarks: this is only anecdotal, but I've used this technique at work to bring a join that ran for a day without making progress down to only a few hours. As part of the experiments mentioned above, I'll try to generate some dummy data similar to that use case.
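For example, something along these lines could serve as a starting point for that dummy data (the row counts, array length, and key-space size are placeholders, not the numbers from the real workload):

import org.apache.spark.sql.functions._

// Small arrays of ids drawn from a shared key space, so a fraction of rows overlap.
val numKeys = 1000000L
def randomArrays(rows: Long, arrayLen: Int) =
  spark.range(rows).select(
    col("id"),
    array((1 to arrayLen).map(_ => (rand() * numKeys).cast("long")): _*).as("num"))

val bigLeft  = randomArrays(10000000L, 5)
val bigRight = randomArrays(10000000L, 5)
val joined   = bigLeft.join(bigRight, arrays_overlap(bigLeft("num"), bigRight("num")))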

val (leftArray, rightArray) =
  if (isIn(left, arrA) && isIn(right, arrB)) {
    (arrA, arrB)
  } else { // other cases would be caught be the analyzer
nit: by the analyzer?

private def isIn(p: LogicalPlan, e: Expression) = p.output.map(_.expr).contains(e)

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Join(left, right, joinType, Some(ArraysOverlap(arrA: NamedExpression, arrB: NamedExpression))) =>

This may fail the scala style test?

val (leftAlias, rightAlias) = ("explode_larr", "explode_rarr")
val (leftPrime, leftExp) = makePrime(left, leftArray, leftAlias)
val (rightPrime, rightExp) = makePrime(right, rightArray, rightAlias)
val joined = Join(leftPrime, rightPrime, joinType, Some(leftExp === rightExp))

I remember we usually use EqualTo(leftExp, rightExp) instead of the DSL here.
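The suggested form would be roughly the following (a sketch of the suggested change, reusing the local vals from the diff above, not the committed code):

import org.apache.spark.sql.catalyst.expressions.EqualTo

val joined = Join(leftPrime, rightPrime, joinType, Some(EqualTo(leftExp, rightExp)))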

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 30, 2019
@github-actions github-actions bot closed this Dec 31, 2019