[SPARK-12656] [SQL] Implement Intersect with Left-semi Join #10630

Closed
wants to merge 44 commits into from

gatorsmile
Member

Our current Intersect physical operator simply delegates to RDD.intersect. We should remove the Intersect physical operator and simply transform a logical intersect into a semi-join with distinct. This way, we can take advantage of all the benefits of join implementations (e.g. managed memory, code generation, broadcast joins).

After a search, I found that one of the mainstream RDBMSs does the same: in its query plan output, Intersect is replaced by a left-semi join. A left-semi join could also help outer-join elimination in the Optimizer, as shown in PR #10566.
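
To make the proposed rewrite concrete, here is a minimal sketch in plain Scala collections, not Catalyst code. The object name `IntersectRewriteSketch`, the `Row` alias, and both helper functions are hypothetical and exist only for illustration; the sketch assumes rows without NULL values.

```scala
object IntersectRewriteSketch {
  // Hypothetical row shape used only for this illustration.
  type Row = (Int, String)

  // Left-semi join: keep each left row that has at least one equal row on the
  // right. Only left-side rows are returned, and each is returned once per
  // occurrence on the left, however many matches exist on the right.
  def leftSemiJoin(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    left.filter(l => right.contains(l))

  // INTERSECT has set semantics, so duplicates that survive the semi-join
  // are removed with a distinct step.
  def intersectViaSemiJoin(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    leftSemiJoin(left, right).distinct
}
```

The distinct step is what keeps the result a set: without it, a row that appears twice on the left would also appear twice in the output.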

@gatorsmile
Member Author

@rxin Please review the implementation. Thank you!

@rxin
Contributor

rxin commented Jan 7, 2016

Which mainstream RDBMS is that?

* ==> SELECT a1, a2 FROM Tab1, Tab2 ON a1<=>b1 AND a2<=>b2
* }}}
*/
object ReplaceIntersectWithLeftSemi extends Rule[LogicalPlan] {
Contributor

LeftSemi -> LeftSemiJoin or just SemiJoin

Member Author

yeah. Forgot to specify the join type

@gatorsmile
Member Author

MS SQL Server did that

@@ -322,13 +323,32 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("intersect") {
val intersectDF = lowerCaseData.intersect(lowerCaseData)

// Before Optimizer, the operator is Intersect
Contributor

This should go into one of the optimizer unit test suites, not here.

Member Author

ok, will add a new test suite for it.

@rxin
Contributor

rxin commented Jan 7, 2016

LGTM.

cc @cloud-fan to take a look too.

@SparkQA

SparkQA commented Jan 7, 2016

Test build #48900 has finished for PR 10630 at commit 0bd1771.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Contributor

use transformUp?

cc @yhuai

Contributor

actually nvm.

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Intersect(left, right) =>
val joinCond = left.output.zip(right.output).map { case (l, r) =>
EqualNullSafe(l, r) }
Contributor

nit: can we put it in one line?
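
For readers wondering why the condition under review uses `EqualNullSafe` rather than plain equality, the following is a rough sketch in plain Scala, not Catalyst code. It models a nullable column value as `Option[Int]`, with `None` standing in for SQL NULL. The object and function names are hypothetical, and combining the per-column comparisons with AND is an assumption about how the zipped conditions are used.

```scala
object NullSafeJoinCondition {
  // Hypothetical row shape: a sequence of nullable integer columns.
  type Row = Seq[Option[Int]]

  // Models SQL '=': a comparison involving NULL never counts as a match.
  def sqlEquals(l: Option[Int], r: Option[Int]): Boolean =
    l.isDefined && r.isDefined && l == r

  // Models SQL '<=>': two NULLs compare as equal.
  def nullSafeEquals(l: Option[Int], r: Option[Int]): Boolean = l == r

  // Pairs columns by position and requires every pair to match, in the
  // spirit of zipping left.output with right.output.
  def rowsMatch(l: Row, r: Row, eq: (Option[Int], Option[Int]) => Boolean): Boolean =
    l.zip(r).forall { case (a, b) => eq(a, b) }
}
```

INTERSECT treats two NULLs in the same column as the same value, so a row containing NULL should still match its twin on the other side. In this model only the null-safe comparison gives that behaviour.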

@gatorsmile
Member Author

When resolving the conflicts, I realized that the multi-children Union might introduce duplicate exprId values. So far, I have not added or changed the corresponding function to de-duplicate them. This would not be trivial work, if it is needed. When a Union has hundreds of children, the current per-pair de-duplication is infeasible, which means we would need to rewrite the whole function dedupRight.

Let me know if we need to open a separate PR to do it now. So far, unlike with Intersect, we have not hit any issue even when duplicate exprId values exist in Union. Thanks!

@SparkQA

SparkQA commented Jan 23, 2016

Test build #49936 has finished for PR 10630 at commit 6a7979d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

I don't think it's a problem for there to be conflicting attribute ids for set operations, because only one child's attribute references need to be propagated up (unlike with a join).
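
The point about propagated attributes can be illustrated with a small model in plain Scala. This is a simplified sketch, not Catalyst's actual classes: `Attr`, `setOperationOutput`, `innerJoinOutput`, and `hasAmbiguousIds` are all hypothetical names, and the model assumes a set operation exposes only its left child's attributes while an inner join exposes both sides.

```scala
object AttributeIdSketch {
  // Hypothetical stand-in for an attribute reference with a unique id.
  final case class Attr(name: String, exprId: Long)

  // A set operation exposes only its left child's attributes to parents.
  def setOperationOutput(left: Seq[Attr], right: Seq[Attr]): Seq[Attr] = left

  // An inner join exposes both sides, so parents may reference either child.
  def innerJoinOutput(left: Seq[Attr], right: Seq[Attr]): Seq[Attr] = left ++ right

  // True when two attributes in one output share an id and cannot be told apart.
  def hasAmbiguousIds(output: Seq[Attr]): Boolean =
    output.map(_.exprId).distinct.size != output.size
}
```

In this model, using the same child on both sides is harmless for a set operation but leaves a join's output with clashing ids.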

@gatorsmile
Member Author

Yeah, agree! Thank you!

@@ -125,17 +128,15 @@ object EliminateSerialization extends Rule[LogicalPlan] {

/**
* Pushes certain operations to both sides of a Union, Intersect or Except operator.
=======
* Pushes certain operations to both sides of a Union or Except operator.
>>>>>>> IntersectBySemiJoinMerged
Contributor

Seems we need to remove this.

Member Author

yeah, sure, will do.

@SparkQA

SparkQA commented Jan 27, 2016

Test build #50176 has finished for PR 10630 at commit e566d79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -111,6 +113,7 @@ object SamplePushDown extends Rule[LogicalPlan] {
}

/**
<<<<<<< HEAD
Contributor

remove this

Member Author

will do.

@SparkQA

SparkQA commented Jan 28, 2016

Test build #50257 has finished for PR 10630 at commit 3be78c4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2016

Test build #50313 has finished for PR 10630 at commit e51de8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode

failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
Contributor

now we can keep this message as it only checks join :)

Member

Can users observe this error, or can it be considered an internal error? BTW, we are about to convert it to an internal error in PR #41476.

@cloud-fan
Contributor

LGTM. We can merge it first, and @gatorsmile can address the remaining comments in a follow-up PR.

@rxin
Copy link
Contributor

rxin commented Jan 29, 2016

This is not that big. Let's just do it together here.

@gatorsmile
Member Author

Thank you! Just cleaned up the code. :)

@cloud-fan
Contributor

LGTM, pending test

@SparkQA

SparkQA commented Jan 29, 2016

Test build #50368 has finished for PR 10630 at commit b600089.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jan 29, 2016

Thanks - I'm going to merge this.
