
[SPARK-8682][SQL][WIP] Range Join #7379

Closed
wants to merge 8 commits

Conversation

hvanhovell
Contributor

...copied from JIRA (SPARK-8682):

Currently Spark SQL uses a Broadcast Nested Loop join (or a filtered Cartesian Join) when it has to execute the following range query:

SELECT A.*,
       B.*
FROM   tableA A
       JOIN tableB B
        ON A.start <= B.end
         AND A.end > B.start

This is horribly inefficient. The performance of this query can be greatly improved, when one of the tables can be broadcast, by creating a range index. A range index is basically a sorted map containing the rows of the smaller table, indexed by both the high and low keys. Using this structure, the complexity of the query drops from O(N * M) to O(N * 2 * LOG(M)), where N is the number of records in the larger table and M is the number of records in the smaller (indexed) table.
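A minimal sketch of the range-index idea in plain Scala (the `Row` class and all names here are hypothetical, not the PR's actual implementation, and this single-keyed version is a simplification of the double-keyed index described above): keep the smaller table sorted by its low key, binary-search for the last candidate row, and filter the candidates on the high key.

```scala
// Hypothetical sketch, not the PR's code: a range index over the
// broadcast side of the join. Rows carry an interval [start, end].
case class Row(start: Int, end: Int, payload: String)

class RangeIndex(rows: Seq[Row]) {
  // Rows sorted by their low key, so a binary search can locate
  // the candidate range instead of scanning the whole table.
  private val byStart: Array[Row] = rows.sortBy(_.start).toArray
  private val starts: Array[Int]  = byStart.map(_.start)

  // Index of the first row whose start is > key (binary search).
  private def upperBound(key: Int): Int = {
    var lo = 0
    var hi = starts.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (starts(mid) <= key) lo = mid + 1 else hi = mid
    }
    lo
  }

  // All indexed rows overlapping the probe interval, i.e. rows
  // satisfying start <= probeEnd && end > probeStart -- the same
  // shape as the join predicate in the query above.
  def probe(probeStart: Int, probeEnd: Int): Seq[Row] =
    byStart.take(upperBound(probeEnd)).filter(_.end > probeStart).toSeq
}
```

Each probe then costs one O(LOG(M)) search plus a pass over the candidates, which is where the O(N * LOG(M))-style bound in the description comes from.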

This is currently a work in progress. I will be adding more tests and a small benchmark in the next couple of days. If you want to try this out, set the spark.sql.planner.rangeJoin option to true in the SQL configuration.

@marmbrus
Contributor

ok to test

@SparkQA

SparkQA commented Jul 14, 2015

Test build #37182 has finished for PR 7379 at commit d2bd793.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastRangeJoin(

@SparkQA

SparkQA commented Jul 14, 2015

Test build #37193 has finished for PR 7379 at commit 6727807.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastRangeJoin(

@hvanhovell
Contributor Author

Current test errors are a bit weird. They shouldn't have been caused by this change, because the functionality is disabled by default.

Rebased onto the most recent master; let's see if this helps.

@SparkQA

SparkQA commented Jul 14, 2015

Test build #37229 has finished for PR 7379 at commit 773c009.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Least(children: Seq[Expression]) extends Expression
    • case class Greatest(children: Seq[Expression]) extends Expression
    • case class BroadcastRangeJoin(

@marmbrus
Contributor

This looks pretty cool! I can try to do a more thorough review in a bit, but a few testing suggestions:

It would be great to add a test for the query planner in PlannerSuite.
I would also add some unit tests for the operator itself, probably in a new class modeled after OuterJoinSuite.

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37448 has finished for PR 7379 at commit b405e45.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastRangeJoin(

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37456 has finished for PR 7379 at commit 8204eae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastRangeJoin(

private[this] var iterator: Iterator[InternalRow] = Iterator.empty

override final def hasNext: Boolean = {
  var result = iterator.hasNext
Contributor
It's a very rare case, but we cannot assume that the user will not call hasNext multiple times before calling next().

Contributor Author

Multiple calls to hasNext shouldn't be a problem. Granted, the first call can have a side effect (updating the state of the iterator), but the subsequent ones won't.

A problem will occur when next is called without calling hasNext first. I was inspired by the HashedRelation class in the same package when writing this.

@chenghao-intel
Contributor

This is a very interesting optimization, but would it be more general if we considered combining it with SortMergeJoin? As well as cases like:

SELECT A.*,
       B.*
FROM   tableA A
       JOIN tableB B
        ON A.start <= B.start

@hvanhovell
Contributor Author

The <= case is quite easy to implement.

This implementation is currently targeted at range joining a rather small (broadcastable) table to an arbitrarily large table. I don't think this matches the use case of SMJ, i.e. equi-joining arbitrarily large tables. But I might be missing something?

override final def hasNext: Boolean = {
  var result = iterator.hasNext
  while (!result && stream.hasNext) {
    row = stream.next()
Contributor

Sorry, what I actually meant here is that we may skip some rows if hasNext is called multiple times.

Contributor Author

Ah, I see. The current (WIP) implementation only allows inner joins, and we drop rows if they don't have a match in the index. Outer joins are possible; however, build-side outer joins will require a bit of bookkeeping.

@chenghao-intel
Contributor

Sorry, I shouldn't have used the word SMJ.
I mean that if we are planning to improve the performance of range joins, we can probably think about it in a more general way: not just binary conjunctive predicates, but also arbitrary n-ary predicates.

e.g.
join on a.key < b.key
join on a.key < b.key and a.key2 > b.key2 and a.key3 >= b.key3 ...

I just have a feeling that it might be more helpful/easier if we specified the same RangePartitioner (with the range key as the sort key) for the table(s), and then took the Cartesian product of the partitions.

The binary BroadcastRangeJoin would then just be a specific case, and lots of code could definitely be shared (like the binary search for the closest tuple).

Sorry if I misunderstood something.

@hvanhovell
Contributor Author

No problem.

Supporting N-Ary Predicates

In order to make the range join work we need the predicates to define a single interval for each side of the join. For instance the clause: a.low < b.high && b.low < a.high implies that there are two intervals: [a.low, a.high] & [b.low, b.high]. An open interval, for instance a.low < b.high, would also work.
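For concreteness, the two clauses together are exactly the classic interval-overlap test. A tiny illustration in Scala (the `Interval` class is hypothetical, introduced only for this example):

```scala
// Hypothetical illustration: the pair of range-join clauses
// a.low < b.high && b.low < a.high is the standard test for
// whether the intervals [a.low, a.high] and [b.low, b.high] overlap.
case class Interval(low: Int, high: Int)

def overlaps(a: Interval, b: Interval): Boolean =
  a.low < b.high && b.low < a.high
```

Note that with strict `<` two intervals that merely touch at an endpoint do not count as overlapping; swapping in `<=` gives the closed-interval variant.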

When we use more than two clauses, we can potentially have multiple intervals. In your example, for instance, a.key < b.key and a.key2 > b.key2 and a.key3 >= b.key3 would yield the following intervals: [a.key1, a.key2], [a.key1, a.key3], [b.key2, b.key1] & [b.key2, b.key3]. Creating a working index that can deal with the (partially) uncorrelated intervals will be quite a challenge (I haven't really looked into this yet). We could of course pick one pair of intervals to join on and use filtering to take care of the rest.

I think the Unary and Binary cases are the most common. Let's start there, and see if there is demand for N-ary designs.

Generalization

If you consider the fact that we are joining intervals (ranges, if you will), range partitioning will not work, because it assumes each interval falls entirely within a single partition (intervals can span multiple partitions). When dealing with larger tables we would have to use a special interval-aware partitioning: this would create partitions for a number of fully covering, non-overlapping intervals, and would multicast each row to every interval it belongs to. The subsequent step would be using an index or doing a Cartesian/BNL join.
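The multicast step could be sketched as follows (a hypothetical illustration with fixed split points, not anything from this PR): each row is sent to every partition its interval overlaps, so two overlapping intervals are guaranteed to meet in at least one partition.

```scala
// Hypothetical sketch of interval-aware partitioning. The key space is
// split at fixed boundaries into partitions
//   (-inf, b0), [b0, b1), ..., [b_last, +inf)
// and a row with interval [low, high) is multicast to every partition
// whose range it overlaps.
def partitionsFor(low: Int, high: Int, bounds: Seq[Int]): Seq[Int] = {
  val n = bounds.length
  (0 to n).filter { p =>
    val pLow  = if (p == 0) Int.MinValue else bounds(p - 1)
    val pHigh = if (p == n) Int.MaxValue else bounds(p)
    low < pHigh && pLow < high // interval overlaps this partition's range
  }
}
```

A wide interval lands in several partitions, which is exactly the duplication ("multicast") that plain range partitioning cannot express.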

Doing a Cartesian join in a single partition performs horribly. I thought it wouldn't be a problem either, but it completely killed the performance of an analysis I was doing for a client (account balances at specific dates).

I do see opportunities for code re-use, but this would be by generalizing HashedRelation and the broadcast join family.

val ordering = TypeUtils.getOrdering(buildKeys.head.dataType)

// Note that we use .execute().collect() because we don't want to convert data to Scala types
// TODO find out if the result of a sort and a collect is still sorted.
Contributor

it should be

@marmbrus
Contributor

marmbrus commented Sep 3, 2015

@hvanhovell thanks for working on this! To keep the PR queue manageable I propose we close this issue for now until you have time to bring it up to date and remove the WIP tag.

@asfgit asfgit closed this in 804a012 Sep 4, 2015
@saj1th

saj1th commented Jan 19, 2017

Facing huge performance issues with range joins. Hoping to see this implemented.

@zzeekk

zzeekk commented Feb 16, 2017

Same here. A workaround is to build blocks and add them as an equi-join condition. But then you need to do an additional join on the following block and coalesce the results.

@IceMan81

IceMan81 commented Jun 6, 2017

@zzeekk Would you mind explaining how your workaround works?

A Workaround is to build blocks and add them as equi-join condition

Not sure I understand what you are suggesting here.

@marmbrus The inability to do range joins efficiently results in very poor performance. Are there plans to address this directly in an upcoming release? I have scenarios where the optimizer sorts the results into a single partition for the join (all other partitions are empty) because the sort does not include the columns in the range condition. That task runs for more than a day, while a forced broadcast version of it runs in 3 hours. And here I'm only able to do the broadcast because I'm using a smaller data set on one side of the join.

@zzeekk

zzeekk commented Jun 9, 2017

@IceMan81 Here is an abstract example of our workaround, building blocks as additional equi-join conditions.
The task is to join points (df1) on a line back to segments of the same line (df2): df1.id_line = df2.id_line and df1.position >= df2.position_from and df1.position < df2.position_to.
By building blocks for the position, we can add a more detailed equi-join condition, but we need to join twice and coalesce the results to catch the case where position_from is in a different block than position. Additional note: the block size (/10) must be adapted to the expected data:

  df1.as("df1")
  .join( df2.as("df2a"), $"df1.id_line"===$"df2a.id_line" and floor($"df1.position"/10)===floor($"df2a.position_from"/10) 
                           and $"df1.position">=$"df2a.position_from" and $"df1.position"<$"df2a.position_to", "left")
  .join( df2.as("df2b"), $"df1.id_line"===$"df2b.id_line" and floor($"df1.position"/10)===floor($"df2b.position_to"/10) 
                           and $"df1.position">=$"df2b.position_from" and $"df1.position"<$"df2b.position_to", "left")

@IceMan81

@zzeekk Okay, I get the idea. But what would you do for timestamp ranges; how would you get additional equi-join conditions? The idea of floor($"df1.position"/10) === floor($"df2a.position_from"/10) (or, in the case of timestamps, $"df1.timestamp" - interval === $"df2a.timestamp" - interval) wouldn't apply, as you may not have timestamps that match that condition.

@zzeekk

zzeekk commented Jun 19, 2017

Hello @IceMan81, you need to truncate your timestamps to days, hours, or minutes depending on your use case, and use that for the additional equi-join condition.
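As a hypothetical sketch in plain Scala (java.time, not Spark's API), truncating to the hour gives the coarse block key, analogous to floor(position / 10) above; the granularity has to be tuned to the expected interval widths:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical sketch: derive a coarse equi-join block key from a
// timestamp by truncating it to the hour. Two timestamps get the same
// key iff they fall in the same hour, so as in the position/10 example
// above, the blocked equi-join must still be done twice (current and
// neighbouring block) and the results coalesced.
def hourBlock(ts: Instant): Long =
  ts.truncatedTo(ChronoUnit.HOURS).getEpochSecond / 3600
```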
