
[SPARK-12656] [SQL] Implement Intersect with Left-semi Join #10630

Closed
wants to merge 44 commits

Commits (44)
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
0bd1771
replace Intersect with Left-semi Join
gatorsmile Jan 7, 2016
7bd102b
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 7, 2016
bfa99c5
address comments.
gatorsmile Jan 7, 2016
cd23b03
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 7, 2016
100174a
clean code.
gatorsmile Jan 7, 2016
9aad1cf
clean code.
gatorsmile Jan 7, 2016
6742984
address comments.
gatorsmile Jan 7, 2016
e4c34f0
added one more case for duplicate values
gatorsmile Jan 7, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
9864b3f
added an exception in conversion from logical to physical operators.
gatorsmile Jan 8, 2016
24cea7d
Add DISTINCT and test cases.
gatorsmile Jan 8, 2016
27192be
test case updates.
gatorsmile Jan 8, 2016
a932cdb
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 8, 2016
04a26bd
resolve the ambiguous attributes
gatorsmile Jan 8, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
6a52e2b
Merge branch 'IntersectBySemiJoin' into IntersectBySemiJoinMerged
gatorsmile Jan 8, 2016
f820c61
resolve comments.
gatorsmile Jan 9, 2016
4372170
style change.
gatorsmile Jan 9, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
d59b37b
Merge branch 'IntersectBySemiJoinMerged' into IntersectBySemiJoinMerg…
gatorsmile Jan 23, 2016
6a7979d
fix failed test cases
gatorsmile Jan 23, 2016
fd87585
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJo…
gatorsmile Jan 27, 2016
e566d79
address comments.
gatorsmile Jan 27, 2016
3be78c4
address comments.
gatorsmile Jan 28, 2016
e51de8f
fixed the failed cases.
gatorsmile Jan 28, 2016
b600089
addressed comments.
gatorsmile Jan 29, 2016
@@ -344,6 +344,63 @@ class Analyzer(
}
}

/**
* Generate a new logical plan for the right child with different expression IDs
* for all conflicting attributes.
*/
private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
s"between $left and $right")

right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.newInstance()
(oldVersion, newVersion)

// Handle projects that create conflicting aliases.
case oldVersion @ Project(projectList, _)
if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
(oldVersion, oldVersion.copy(projectList = newAliases(projectList)))

case oldVersion @ Aggregate(_, aggregateExpressions, _)
if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
(oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))

case oldVersion: Generate
if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
(oldVersion, oldVersion.copy(generatorOutput = newOutput))

case oldVersion @ Window(_, windowExpressions, _, _, child)
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}
// Only handle first case, others will be fixed on the next pass.
.headOption match {
case None =>
/*
* No result implies that there is a logical plan node that produces new references
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
right
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}
newRight
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p: LogicalPlan if !p.childrenResolved => p

@@ -388,57 +445,11 @@ class Analyzer(
.map(_.asInstanceOf[NamedExpression])
a.copy(aggregateExpressions = expanded)

// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if !j.selfJoinResolved =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")

right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.newInstance()
(oldVersion, newVersion)

// Handle projects that create conflicting aliases.
case oldVersion @ Project(projectList, _)
if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
(oldVersion, oldVersion.copy(projectList = newAliases(projectList)))

case oldVersion @ Aggregate(_, aggregateExpressions, _)
if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
(oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))

case oldVersion: Generate
if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
(oldVersion, oldVersion.copy(generatorOutput = newOutput))

case oldVersion @ Window(_, windowExpressions, _, _, child)
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}
// Only handle first case, others will be fixed on the next pass.
.headOption match {
case None =>
/*
* No result implies that there is a logical plan node that produces new references
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
j
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}
j.copy(right = newRight)
}
// To resolve duplicate expression IDs for Join and Intersect
case j @ Join(left, right, _, _) if !j.duplicateResolved =>
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
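The mechanics of dedupRight are easier to see outside the diff. Below is a minimal, self-contained sketch of the idea in plain Scala; Attr and Relation are invented stand-ins for Catalyst's AttributeReference and a leaf plan, not real classes. When the right child of a self-join or self-intersect exposes the same expression IDs as the left child, the right side gets freshly minted IDs so downstream rules can tell the two sides apart.

// Minimal stand-in for Catalyst's AttributeReference/ExprId; illustration only.
final case class Attr(name: String, exprId: Long)
final case class Relation(output: Seq[Attr])

object DedupSketch {
  private var nextId = 100L
  private def freshId(): Long = { nextId += 1; nextId }

  // Analogue of dedupRight: re-mint IDs on the right side for attributes that
  // clash with the left side's output, and return the rewritten right plan.
  def dedupRight(left: Relation, right: Relation): Relation = {
    val conflicting = left.output.map(_.exprId).toSet
    Relation(right.output.map { a =>
      if (conflicting.contains(a.exprId)) a.copy(exprId = freshId()) else a
    })
  }

  def main(args: Array[String]): Unit = {
    val t = Relation(Seq(Attr("a", 1), Attr("b", 2)))
    // Self-intersect: both children initially expose exprIds 1 and 2.
    val dedupedRight = dedupRight(t, t)
    println(dedupedRight.output) // e.g. List(Attr(a,101), Attr(b,102)) -- no clash with the left child
  }
}

The real rule additionally rewrites every reference to the replaced IDs elsewhere in the right subtree (the transformUp pass over attributeRewrites above), which this toy version omits.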
@@ -214,12 +214,22 @@ trait CheckAnalysis {
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)

// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
case j: Join if !j.duplicateResolved =>
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
|Failure when resolving conflicting references
|in operator ${operator.simpleString}:
Review comment (Contributor): now we can keep this message as it only checks join :)

Review comment (Member): Can users observe the error, or can it be considered an internal error? BTW, we are about to convert it to an internal error in PR #41476.

|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

case i: Intersect if !i.duplicateResolved =>
val conflictingAttributes = i.left.outputSet.intersect(i.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references
|in operator ${operator.simpleString}:
Review comment (Contributor): same here, we could say Intersect directly

|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)
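Both new branches lean on the same duplicateResolved guard: the overlap of the two children's output attribute sets must be empty, and if no analyzer rule manages to clear it, CheckAnalysis fails with the message shown above. A rough REPL-style sketch of that decision over plain Scala sets (hypothetical names, not the Catalyst API):

// Hypothetical model of the duplicateResolved check shared by Join and Intersect;
// exprIds are modeled as plain Longs.
def duplicateResolved(leftOutput: Set[Long], rightOutput: Set[Long]): Boolean =
  leftOutput.intersect(rightOutput).isEmpty

val leftIds  = Set(1L, 2L)   // exprIds exposed by the left child
val rightIds = Set(2L, 3L)   // exprIds exposed by the right child

if (!duplicateResolved(leftIds, rightIds)) {
  // This is where CheckAnalysis would call failAnalysis with the conflicting attributes.
  println(s"Conflicting attributes: ${leftIds.intersect(rightIds).mkString(",")}")
}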
@@ -52,8 +52,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
Batch("Replace Operators", FixedPoint(100),
ReplaceIntersectWithSemiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", FixedPoint(100),
ReplaceDistinctWithAggregate,
RemoveLiteralFromGroupExpressions) ::
Batch("Operator Optimizations", FixedPoint(100),
// Operator push down
@@ -124,18 +126,13 @@ object EliminateSerialization extends Rule[LogicalPlan] {
}

/**
* Pushes certain operations to both sides of a Union, Intersect or Except operator.
* Pushes certain operations to both sides of a Union or Except operator.
* Operations that are safe to pushdown are listed as follows.
* Union:
* Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
* safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
* we will not be able to pushdown Projections.
*
* Intersect:
* It is not safe to pushdown Projections through it because we need to get the
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
* with deterministic condition.
*
* Except:
* It is not safe to pushdown Projections through it because we need to get the
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
@@ -153,7 +150,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {

/**
* Rewrites an expression so that it can be pushed to the right side of a
* Union, Intersect or Except operator. This method relies on the fact that the output attributes
* Union or Except operator. This method relies on the fact that the output attributes
* of a union/intersect/except are always equal to the left child's output.
*/
private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
@@ -210,17 +207,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
}
Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))

// Push down filter through INTERSECT
case Filter(condition, Intersect(left, right)) =>
val (deterministic, nondeterministic) = partitionByDeterministic(condition)
val rewrites = buildRewrites(left, right)
Filter(nondeterministic,
Intersect(
Filter(deterministic, left),
Filter(pushToRight(deterministic, rewrites), right)
)
)

// Push down filter through EXCEPT
case Filter(condition, Except(left, right)) =>
val (deterministic, nondeterministic) = partitionByDeterministic(condition)
@@ -1054,6 +1040,27 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
}
}

/**
* Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator.
* {{{
* SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
* ==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
* }}}
*
* Note:
* 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
* 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
* join conditions will be incorrect.
*/
object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
Review comment (Contributor): I think we need to add a comment here to mention that this rewrite is just for INTERSECT (i.e. INTERSECT DISTINCT) and is not applicable to INTERSECT ALL.

Review comment (Member Author): Yeah, will do it. Actually, I will also implement INTERSECT ALL after this one. Thanks!

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Review comment (Contributor): use transformUp? cc @yhuai

Review comment (Contributor): actually nvm.

case Intersect(left, right) =>
assert(left.output.size == right.output.size)
val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And)))
}
}

/**
* Removes literals from group expressions in [[Aggregate]], as they have no effect to the result
* but only makes the grouping key bigger.
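As a sanity check on the ReplaceIntersectWithSemiJoin rewrite, the following self-contained sketch models both sides of the equivalence on plain Scala collections; rows are Seq[Option[Int]], with None playing the role of SQL NULL, and all names are invented for illustration. INTERSECT keeps the distinct rows that occur in both inputs, treating NULLs as equal, which matches a left-semi join on null-safe equality followed by DISTINCT:

object IntersectRewriteSketch {
  type Row = Seq[Option[Int]]

  // SQL <=> (EqualNullSafe): NULL <=> NULL is true, NULL <=> x is false.
  private def nullSafeEq(l: Option[Int], r: Option[Int]): Boolean = l == r
  private def sameRow(l: Row, r: Row): Boolean =
    l.length == r.length && l.zip(r).forall { case (a, b) => nullSafeEq(a, b) }

  // Specification: INTERSECT (DISTINCT) -- distinct left rows that also occur on the right.
  def intersectDistinct(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    left.distinct.filter(l => right.exists(sameRow(l, _)))

  // Rewrite: DISTINCT on top of a left-semi join whose condition is <=> per column.
  def distinctOfLeftSemiJoin(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    left.filter(l => right.exists(sameRow(l, _))).distinct

  def main(args: Array[String]): Unit = {
    val tab1: Seq[Row] = Seq(Seq(Some(1), Some(2)), Seq(Some(1), Some(2)), Seq(None, Some(3)))
    val tab2: Seq[Row] = Seq(Seq(Some(1), Some(2)), Seq(None, Some(3)), Seq(Some(9), Some(9)))
    assert(intersectDistinct(tab1, tab2) == distinctOfLeftSemiJoin(tab1, tab2))
    println(intersectDistinct(tab1, tab2)) // List(List(Some(1), Some(2)), List(None, Some(3)))
  }
}

Under bag semantics (INTERSECT ALL) the two sides would differ, since duplicates surviving the semi join are collapsed by the DISTINCT, which is why the rule's scaladoc restricts it to INTERSECT DISTINCT.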
@@ -315,4 +315,7 @@ abstract class BinaryNode extends LogicalPlan {
def right: LogicalPlan

override def children: Seq[LogicalPlan] = Seq(left, right)

override lazy val resolved: Boolean =
expressions.forall(_.resolved) && childrenResolved
Review comment (Contributor): why override this?

}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -90,28 +91,38 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
final override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}
abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode

private[sql] object SetOperation {
def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
}

case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

override def output: Seq[Attribute] =
left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
}

// Intersect are only resolved if they don't introduce ambiguous expression ids,
// since the Optimizer will convert Intersect to Join.
override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved
}

case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output

override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}

/** Factory for constructing new `Union` nodes. */
@@ -169,13 +180,13 @@ case class Join(
}
}

def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

// Joins are only resolved if they don't introduce ambiguous expression ids.
override lazy val resolved: Boolean = {
childrenResolved &&
expressions.forall(_.resolved) &&
selfJoinResolved &&
duplicateResolved &&
condition.forall(_.dataType == BooleanType)
}
}
@@ -249,7 +260,7 @@ case class Range(
end: Long,
step: Long,
numSlices: Int,
output: Seq[Attribute]) extends LeafNode {
output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
require(step != 0, "step cannot be 0")
val numElements: BigInt = {
val safeStart = BigInt(start)
@@ -262,6 +273,9 @@
}
}

override def newInstance(): Range =
Range(start, end, step, numSlices, output.map(_.newInstance()))

override def statistics: Statistics = {
val sizeInBytes = LongType.defaultSize * numElements
Statistics( sizeInBytes = sizeInBytes )
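One detail worth pulling out of the hunk above is Intersect's output nullability: a result column is nullable only if the corresponding column is nullable on both sides, because a NULL can survive the intersection only when both children can produce it. A small REPL-style illustration (the Col type is hypothetical, not a Catalyst class):

// Illustration of: leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
final case class Col(name: String, nullable: Boolean)

def intersectOutput(left: Seq[Col], right: Seq[Col]): Seq[Col] =
  left.zip(right).map { case (l, r) => l.copy(nullable = l.nullable && r.nullable) }

val out = intersectOutput(
  Seq(Col("a", nullable = true), Col("b", nullable = true)),
  Seq(Col("x", nullable = true), Col("y", nullable = false)))

println(out) // List(Col(a,true), Col(b,false)) -- "b" can never be NULL in the result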
@@ -270,7 +270,8 @@ class AnalysisErrorSuite extends AnalysisTest {
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(join)
}
assert(error.message.contains("Failure when resolving conflicting references in Join"))
assert(error.message.contains("Failure when resolving conflicting references\n" +
Review comment (Contributor): we can revert this after reverting the error message change.
"in operator 'Join"))
assert(error.message.contains("Conflicting attributes"))
}

@@ -154,6 +154,11 @@ class AnalysisSuite extends AnalysisTest {
checkAnalysis(plan, expected)
}

test("self intersect should resolve duplicate expression IDs") {
val plan = testRelation.intersect(testRelation)
assertAnalysisSuccess(plan)
}

test("SPARK-8654: invalid CAST in NULL IN(...) expression") {
val plan = Project(Alias(In(Literal(null), Seq(Literal(1), Literal(2))), "a")() :: Nil,
LocalRelation()
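The new analysis test exercises the fix at the logical-plan level; at the user-facing API the same situation arises when a DataFrame is intersected with a DataFrame derived from itself. A rough spark-shell style example against the Spark 1.6-era API (assuming the usual sqlContext is in scope; output not shown verbatim):

// Spark 1.6-era sketch (assumes a spark-shell with `sqlContext` available).
// Both sides of the intersect originate from the same relation, so they start out
// with identical expression IDs; with this PR the analyzer de-duplicates them and
// the optimizer then rewrites the Intersect into Distinct + left-semi Join.
val df = sqlContext.range(0, 10)            // single column "id"
val result = df.intersect(df.filter("id < 5"))
result.show()                               // prints ids 0 through 4 (order not guaranteed)
result.explain()                            // the optimized plan should contain a LeftSemi join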