[SPARK-12656] [SQL] Implement Intersect with Left-semi Join #10630

Closed
wants to merge 44 commits into from

Changes from 6 commits
44 commits
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
0bd1771
replace Intersect with Left-semi Join
gatorsmile Jan 7, 2016
7bd102b
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 7, 2016
bfa99c5
address comments.
gatorsmile Jan 7, 2016
cd23b03
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 7, 2016
100174a
clean code.
gatorsmile Jan 7, 2016
9aad1cf
clean code.
gatorsmile Jan 7, 2016
6742984
address comments.
gatorsmile Jan 7, 2016
e4c34f0
added one more case for duplicate values
gatorsmile Jan 7, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
9864b3f
added an exception in conversion from logical to physical operators.
gatorsmile Jan 8, 2016
24cea7d
Add DISTINCT and test cases.
gatorsmile Jan 8, 2016
27192be
test case updates.
gatorsmile Jan 8, 2016
a932cdb
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 8, 2016
04a26bd
resolve the ambiguous attributes
gatorsmile Jan 8, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
6a52e2b
Merge branch 'IntersectBySemiJoin' into IntersectBySemiJoinMerged
gatorsmile Jan 8, 2016
f820c61
resolve comments.
gatorsmile Jan 9, 2016
4372170
style change.
gatorsmile Jan 9, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
d59b37b
Merge branch 'IntersectBySemiJoinMerged' into IntersectBySemiJoinMerg…
gatorsmile Jan 23, 2016
6a7979d
fix failed test cases
gatorsmile Jan 23, 2016
fd87585
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJo…
gatorsmile Jan 27, 2016
e566d79
address comments.
gatorsmile Jan 27, 2016
3be78c4
address comments.
gatorsmile Jan 28, 2016
e51de8f
fixed the failed cases.
gatorsmile Jan 28, 2016
b600089
addressed comments.
gatorsmile Jan 29, 2016
@@ -40,6 +40,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Aggregate", FixedPoint(100),
ReplaceDistinctWithAggregate,
RemoveLiteralFromGroupExpressions) ::
Batch("Intersect", FixedPoint(100),
ReplaceIntersectWithSemiJoin) ::
Batch("Operator Optimizations", FixedPoint(100),
// Operator push down
SetOperationPushDown,
@@ -93,18 +95,13 @@ object SamplePushDown extends Rule[LogicalPlan] {
}

/**
* Pushes certain operations to both sides of a Union, Intersect or Except operator.
* Pushes certain operations to both sides of a Union or Except operator.
* Operations that are safe to pushdown are listed as follows.
* Union:
* Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
* safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
* we will not be able to pushdown Projections.
*
* Intersect:
* It is not safe to pushdown Projections through it because we need to get the
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
* with deterministic condition.
*
* Except:
* It is not safe to pushdown Projections through it because we need to get the
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
@@ -116,15 +113,15 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* Maps Attributes from the left side to the corresponding Attribute on the right side.
*/
private def buildRewrites(bn: BinaryNode): AttributeMap[Attribute] = {
assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Intersect] || bn.isInstanceOf[Except])
assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Except])
assert(bn.left.output.size == bn.right.output.size)

AttributeMap(bn.left.output.zip(bn.right.output))
}

/**
* Rewrites an expression so that it can be pushed to the right side of a
* Union, Intersect or Except operator. This method relies on the fact that the output attributes
* Union or Except operator. This method relies on the fact that the output attributes
* of a union/intersect/except are always equal to the left child's output.
*/
private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
@@ -175,17 +172,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
p
}

// Push down filter through INTERSECT
case Filter(condition, i @ Intersect(left, right)) =>
val (deterministic, nondeterministic) = partitionByDeterministic(condition)
val rewrites = buildRewrites(i)
Filter(nondeterministic,
Intersect(
Filter(deterministic, left),
Filter(pushToRight(deterministic, rewrites), right)
)
)

// Push down filter through EXCEPT
case Filter(condition, e @ Except(left, right)) =>
val (deterministic, nondeterministic) = partitionByDeterministic(condition)
@@ -965,6 +951,22 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
}
}

/**
* Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator.
* {{{
* SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
* ==> SELECT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
* }}}
*/
object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
Contributor:

I think we need to add a comment here to mention that this rewrite is just for INTERSECT (i.e. INTERSECT DISTINCT) and is not applicable to INTERSECT ALL.

Member Author:

Yeah, will do it. Actually, I will also implement INTERSECT ALL after this one. Thanks!
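
To make that distinction concrete, here is a small hypothetical spark-shell illustration (fixture names and the implicits import are assumptions, not part of this PR): INTERSECT de-duplicates its result, while INTERSECT ALL would keep duplicate matches, so the two need different rewrites.

    // Assumes: import sqlContext.implicits._ in a Spark 1.6-era shell
    val tab1 = Seq((1, "a"), (1, "a"), (2, "b")).toDF("c1", "c2")
    val tab2 = Seq((1, "a"), (1, "a")).toDF("c1", "c2")

    tab1.intersect(tab2).collect()
    // INTERSECT (DISTINCT) semantics: a single Row(1, "a").
    // INTERSECT ALL semantics would instead keep both (1, "a") rows,
    // which a left-semi join plus de-duplication cannot express.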

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Contributor:

use transformUp?

cc @yhuai

Contributor:

actually nvm.

case Intersect(left, right) =>
val joinCond = left.output.zip(right.output).map { case (l, r) =>
Contributor:

one nit - might as well add assert(left.output.size == right.output.size)

Member Author:

Added. Thanks!

EqualNullSafe(l, r) }
Contributor:

nit: can we put it in one line?

Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))
Contributor:

Looks like intersect also implies de-dup unless INTERSECT ALL is used (see http://www.postgresql.org/docs/9.4/static/queries-union.html and https://msdn.microsoft.com/en-us/library/ms188055.aspx).

Member Author:

You are right! It should remove duplicates. I just tried it in DB2.

Contributor:

Does our current intersect do de-dup?


Member Author:

Just ran it. The existing Intersect removes the duplicates.

Member Author:

I also checked the underlying RDD API, intersection(); it removes the duplicates. If we add a Distinct above Intersect, I think we need to measure the performance difference.

Contributor:

You can just insert a distinct into this.

Member Author:

Sure. Will do

}
}
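
Putting the review threads above together (the arity assert, the null-safe join keys, and the extra de-duplication), the rule presumably ends up looking roughly like the sketch below; the authoritative version is whatever lands in the later commits of this PR.

    object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Intersect(left, right) =>
          assert(left.output.size == right.output.size)
          // Join on every output column pair with null-safe equality.
          val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
          // INTERSECT also removes duplicates, so wrap the semi join in a Distinct.
          Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And)))
      }
    }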

/**
* Removes literals from group expressions in [[Aggregate]], as they have no effect to the result
* but only makes the grouping key bigger.
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class ReplaceOperatorSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Intersect", FixedPoint(100),
ReplaceIntersectWithSemiJoin) :: Nil
}

test("replace Intersect with Left-semi Join") {
Contributor:

also move the test for ReplaceDistinctWithAggregate here?

val table1 = LocalRelation('a.int, 'b.int)
val table2 = LocalRelation('c.int, 'd.int)

val query = Intersect(table1, table2)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd)).analyze

comparePlans(optimized, correctAnswer)
}
}
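
If the rule is changed to emit a Distinct on top of the join, as discussed in the optimizer file above, the expected plan in this suite would change accordingly. A sketch under that assumption (keeping the Optimize batch exactly as defined here):

    // Hypothetical expected answer once the rule produces Distinct(Join(...)):
    val correctAnswerWithDistinct =
      Distinct(Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd))).analyze
    comparePlans(Optimize.execute(query.analyze), correctAnswerWithDistinct)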
@@ -37,27 +37,21 @@ class SetOperationPushDownSuite extends PlanTest {
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
val testUnion = Union(testRelation, testRelation2)
val testIntersect = Intersect(testRelation, testRelation2)
val testExcept = Except(testRelation, testRelation2)

test("union/intersect/except: filter to each side") {
test("union/except: filter to each side") {
val unionQuery = testUnion.where('a === 1)
val intersectQuery = testIntersect.where('b < 10)
val exceptQuery = testExcept.where('c >= 5)

val unionOptimized = Optimize.execute(unionQuery.analyze)
val intersectOptimized = Optimize.execute(intersectQuery.analyze)
val exceptOptimized = Optimize.execute(exceptQuery.analyze)

val unionCorrectAnswer =
Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze
val intersectCorrectAnswer =
Intersect(testRelation.where('b < 10), testRelation2.where('e < 10)).analyze
val exceptCorrectAnswer =
Except(testRelation.where('c >= 5), testRelation2.where('f >= 5)).analyze

comparePlans(unionOptimized, unionCorrectAnswer)
comparePlans(intersectOptimized, intersectCorrectAnswer)
comparePlans(exceptOptimized, exceptCorrectAnswer)
}

@@ -70,13 +64,8 @@ class SetOperationPushDownSuite extends PlanTest {
}

test("SPARK-10539: Project should not be pushed down through Intersect or Except") {
val intersectQuery = testIntersect.select('b, 'c)
val exceptQuery = testExcept.select('a, 'b, 'c)

val intersectOptimized = Optimize.execute(intersectQuery.analyze)
val exceptOptimized = Optimize.execute(exceptQuery.analyze)

comparePlans(intersectOptimized, intersectQuery.analyze)
comparePlans(exceptOptimized, exceptQuery.analyze)
}
}
@@ -351,8 +351,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Union(unionChildren.map(planLater)) :: Nil
case logical.Except(left, right) =>
execution.Except(planLater(left), planLater(right)) :: Nil
case logical.Intersect(left, right) =>
execution.Intersect(planLater(left), planLater(right)) :: Nil
case g @ logical.Generate(generator, join, outer, _, _, child) =>
execution.Generate(
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
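
Per the "added an exception in conversion from logical to physical operators" commit in this PR, the removed planning case is presumably replaced by a guard along these lines (a fragment of the strategy, sketched here; the exact message may differ):

    // Any Intersect that survives to physical planning means the optimizer
    // rewrite did not run, so fail fast instead of planning it directly.
    case logical.Intersect(left, right) =>
      throw new IllegalStateException(
        "logical intersect operator should have been replaced by semi-join in the optimizer")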
@@ -307,18 +307,6 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
}

/**
* Returns the rows in left that also appear in right using the built in spark
* intersection function.
*/
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = children.head.output

protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}

/**
* A plan node that does nothing but lie about the output of its child. Used to spice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
@@ -329,6 +329,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
Row(3, "c") ::
Row(4, "d") :: Nil)
checkAnswer(lowerCaseData.intersect(upperCaseData), Nil)

// check null equality
Contributor:

Do we have a test case where there are duplicates on the left side?

Contributor:

A good case might be two tables that each contain two int rows, [1] and [1], and intersecting them.

Member Author:

Thank you! Let me add the test case. It passes but we should still add this case!
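
A hypothetical shape for that test (the column name and the implicits import are assumptions, not from this PR):

    // Both inputs contain the duplicate row (1); INTERSECT should return it once.
    val df1 = Seq(Tuple1(1), Tuple1(1)).toDF("i")
    val df2 = Seq(Tuple1(1), Tuple1(1)).toDF("i")
    checkAnswer(df1.intersect(df2), Row(1) :: Nil)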

checkAnswer(
Contributor:

leave a comment line on what this is testing (e.g. null equality).

Member Author:

ok, will do it.

nullInts.intersect(nullInts),
Row(1) ::
Row(2) ::
Row(3) ::
Row(null) :: Nil)
}
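
The Row(null) expectation above only holds because the rewritten join condition uses null-safe equality; with ordinary equality, NULL = NULL evaluates to NULL and the null row would be dropped. A hypothetical Catalyst-level check of the two operators (imports assumed):

    import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, EqualTo, Literal}
    import org.apache.spark.sql.types.IntegerType

    val nullLit = Literal.create(null, IntegerType)
    EqualTo(nullLit, nullLit).eval()        // null: plain equality on two NULLs is unknown
    EqualNullSafe(nullLit, nullLit).eval()  // true: <=> treats two NULLs as equal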

test("udf") {