-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12656] [SQL] Implement Intersect with Left-semi Join #10630
Changes from 1 commit
01e4cdf
6835704
9180687
b38a21e
d2b84af
fda8025
ac0dccd
6e0018b
0546772
b37a64f
c2a872c
ab6dbd7
4276356
0bd1771
7bd102b
bfa99c5
cd23b03
100174a
9aad1cf
6742984
e4c34f0
2dab708
9864b3f
24cea7d
27192be
a932cdb
04a26bd
0458770
6a52e2b
f820c61
4372170
1debdfa
763706d
4de6ec1
9422a4f
52bdf48
1e95df3
d59b37b
6a7979d
fd87585
e566d79
3be78c4
e51de8f
b600089
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -40,6 +40,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { | |||||||||||
Batch("Aggregate", FixedPoint(100), | ||||||||||||
ReplaceDistinctWithAggregate, | ||||||||||||
RemoveLiteralFromGroupExpressions) :: | ||||||||||||
Batch("Intersect", FixedPoint(100), | ||||||||||||
ReplaceIntersectWithLeftSemi) :: | ||||||||||||
Batch("Operator Optimizations", FixedPoint(100), | ||||||||||||
// Operator push down | ||||||||||||
SetOperationPushDown, | ||||||||||||
|
@@ -93,18 +95,13 @@ object SamplePushDown extends Rule[LogicalPlan] { | |||||||||||
} | ||||||||||||
|
||||||||||||
/** | ||||||||||||
* Pushes certain operations to both sides of a Union, Intersect or Except operator. | ||||||||||||
* Pushes certain operations to both sides of a Union or Except operator. | ||||||||||||
* Operations that are safe to pushdown are listed as follows. | ||||||||||||
* Union: | ||||||||||||
* Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is | ||||||||||||
* safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT, | ||||||||||||
* we will not be able to pushdown Projections. | ||||||||||||
* | ||||||||||||
* Intersect: | ||||||||||||
* It is not safe to pushdown Projections through it because we need to get the | ||||||||||||
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters | ||||||||||||
* with deterministic condition. | ||||||||||||
* | ||||||||||||
* Except: | ||||||||||||
* It is not safe to pushdown Projections through it because we need to get the | ||||||||||||
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters | ||||||||||||
|
@@ -116,15 +113,15 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { | |||||||||||
* Maps Attributes from the left side to the corresponding Attribute on the right side. | ||||||||||||
*/ | ||||||||||||
private def buildRewrites(bn: BinaryNode): AttributeMap[Attribute] = { | ||||||||||||
assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Intersect] || bn.isInstanceOf[Except]) | ||||||||||||
assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Except]) | ||||||||||||
assert(bn.left.output.size == bn.right.output.size) | ||||||||||||
|
||||||||||||
AttributeMap(bn.left.output.zip(bn.right.output)) | ||||||||||||
} | ||||||||||||
|
||||||||||||
/** | ||||||||||||
* Rewrites an expression so that it can be pushed to the right side of a | ||||||||||||
* Union, Intersect or Except operator. This method relies on the fact that the output attributes | ||||||||||||
* Union or Except operator. This method relies on the fact that the output attributes | ||||||||||||
* of a union/intersect/except are always equal to the left child's output. | ||||||||||||
*/ | ||||||||||||
private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = { | ||||||||||||
|
@@ -175,17 +172,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { | |||||||||||
p | ||||||||||||
} | ||||||||||||
|
||||||||||||
// Push down filter through INTERSECT | ||||||||||||
case Filter(condition, i @ Intersect(left, right)) => | ||||||||||||
val (deterministic, nondeterministic) = partitionByDeterministic(condition) | ||||||||||||
val rewrites = buildRewrites(i) | ||||||||||||
Filter(nondeterministic, | ||||||||||||
Intersect( | ||||||||||||
Filter(deterministic, left), | ||||||||||||
Filter(pushToRight(deterministic, rewrites), right) | ||||||||||||
) | ||||||||||||
) | ||||||||||||
|
||||||||||||
// Push down filter through EXCEPT | ||||||||||||
case Filter(condition, e @ Except(left, right)) => | ||||||||||||
val (deterministic, nondeterministic) = partitionByDeterministic(condition) | ||||||||||||
|
@@ -965,6 +951,27 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] { | |||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
/** | ||||||||||||
* Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator. | ||||||||||||
* {{{ | ||||||||||||
* SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2 | ||||||||||||
* ==> SELECT a1, a2 FROM Tab1, Tab2 ON a1<=>b1 AND a2<=>b2 | ||||||||||||
* }}} | ||||||||||||
*/ | ||||||||||||
object ReplaceIntersectWithLeftSemi extends Rule[LogicalPlan] { | ||||||||||||
private def buildCond (left: LogicalPlan, right: LogicalPlan): Seq[Expression] = { | ||||||||||||
require(left.output.length == right.output.length) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just inline this into the rule, since it is used only once? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you look into the analyzer to make sure we have the proper check there for intersect/except? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, will do it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it has a checking logic in analyzer: spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala Lines 186 to 190 in d084a2d
|
||||||||||||
left.output.zip(right.output).map { case (l, r) => | ||||||||||||
EqualNullSafe(l, r) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use transformUp? cc @yhuai There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually nvm. |
||||||||||||
case Intersect(left, right) => | ||||||||||||
Join(left, right, LeftSemi, buildCond(left, right).reduceLeftOption(And)) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
/** | ||||||||||||
* Removes literals from group expressions in [[Aggregate]], as they have no effect to the result | ||||||||||||
* but only makes the grouping key bigger. | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,8 @@ import scala.util.Random | |
import org.scalatest.Matchers._ | ||
|
||
import org.apache.spark.SparkException | ||
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation | ||
import org.apache.spark.sql.catalyst.plans.LeftSemi | ||
import org.apache.spark.sql.catalyst.plans.logical.{Intersect, OneRowRelation, Join} | ||
import org.apache.spark.sql.execution.Exchange | ||
import org.apache.spark.sql.execution.aggregate.TungstenAggregate | ||
import org.apache.spark.sql.functions._ | ||
|
@@ -322,13 +323,32 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { | |
} | ||
|
||
test("intersect") { | ||
val intersectDF = lowerCaseData.intersect(lowerCaseData) | ||
|
||
// Before Optimizer, the operator is Intersect | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should go into one of the optimizer unit test suite, not here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, will add a new test suite for it. |
||
assert(intersectDF.queryExecution.analyzed.collect { | ||
case j@Intersect(_, _) => j | ||
}.size === 1) | ||
|
||
// Before Optimizer, the operator is converted to LeftSemi Join | ||
assert(intersectDF.queryExecution.optimizedPlan.collect { | ||
case j@Join(_, _, LeftSemi, _) => j | ||
}.size === 1) | ||
|
||
checkAnswer( | ||
lowerCaseData.intersect(lowerCaseData), | ||
intersectDF, | ||
Row(1, "a") :: | ||
Row(2, "b") :: | ||
Row(3, "c") :: | ||
Row(4, "d") :: Nil) | ||
checkAnswer(lowerCaseData.intersect(upperCaseData), Nil) | ||
|
||
checkAnswer( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. leave a line of comment on what this is testing (e.g. null equality). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, will do it. |
||
nullInts.intersect(nullInts), | ||
Row(1) :: | ||
Row(2) :: | ||
Row(3) :: | ||
Row(null) :: Nil) | ||
} | ||
|
||
test("udf") { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LeftSemi -> LeftSemiJoin or just SemiJoin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. Forgot to specify the join type