Skip to content

Commit

Permalink
fixed the failed cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Jan 28, 2016
1 parent 3be78c4 commit e51de8f
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,11 @@ class Analyzer(
.map(_.asInstanceOf[NamedExpression])
a.copy(aggregateExpressions = expanded)

// To resolve duplicate expression IDs for all the BinaryNode
case b: BinaryNode if !b.duplicateResolved => b match {
case j @ Join(left, right, _, _) =>
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) =>
i.copy(right = dedupRight(left, right))
case other => other
}
// To resolve duplicate expression IDs for Join and Intersect
case j @ Join(left, right, _, _) if !j.duplicateResolved =>
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))

// When resolving `SortOrder`s in Sort based on the child, don't report errors, as
// we still have a chance to resolve them based on the grandchild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ trait CheckAnalysis {
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)

case j: BinaryNode if !j.duplicateResolved =>
case j: Join if !j.duplicateResolved =>
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
failAnalysis(
s"""
Expand All @@ -224,6 +224,16 @@ trait CheckAnalysis {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

case i: Intersect if !i.duplicateResolved =>
val conflictingAttributes = i.left.outputSet.intersect(i.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references
|in operator ${operator.simpleString}:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,10 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
* ==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
* }}}
*
* This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
* Note:
* 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
* 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
* join conditions will be incorrect.
*/
object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,6 @@ abstract class BinaryNode extends LogicalPlan {

override def children: Seq[LogicalPlan] = Seq(left, right)

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

override lazy val resolved: Boolean =
expressions.forall(_.resolved) && childrenResolved && duplicateResolved
expressions.forall(_.resolved) && childrenResolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,38 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {

final override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved
}
abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode

/**
 * Extractor for any `SetOperation` node: pattern matching on `SetOperation(left, right)`
 * yields the node's `left` and `right` child plans. It always succeeds (returns `Some`).
 */
private[sql] object SetOperation {
def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
}

/**
 * Set-intersection operator over two child plans.
 *
 * @param left  the left child plan
 * @param right the right child plan
 */
case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

// True when the output attribute sets of the two children have no attribute in common.
def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

// Output attributes are the left child's attributes, paired positionally with the right
// child's; an output attribute is nullable only if both paired attributes are nullable.
override def output: Seq[Attribute] =
left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
}

// An Intersect is only resolved if it doesn't introduce ambiguous expression ids,
// since the Optimizer will convert Intersect to Join.
// In addition to that check, both children must be resolved, produce the same number of
// output columns, and have matching data types at each position.
override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved
}

/**
 * Set-difference operator over two child plans.
 *
 * @param left  the left child plan, whose output is also this node's output
 * @param right the right child plan
 */
case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output

// Resolved once both children are resolved, they produce the same number of output columns,
// and the data types match at each position. Unlike Intersect, no check for attributes
// shared between the two children is made here.
override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}

/** Factory for constructing new `Union` nodes. */
Expand Down Expand Up @@ -172,6 +180,8 @@ case class Join(
}
}

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

// Joins are only resolved if they don't introduce ambiguous expression ids.
override lazy val resolved: Boolean = {
childrenResolved &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,13 @@ class HiveTypeCoercionSuite extends PlanTest {
assert(r2.left.isInstanceOf[Project])
assert(r2.right.isInstanceOf[Project])

// Even if we are doing self Except, we still add Project. The node Except will not be marked
// as analyzed unless their exprId are de-duplicated. Thus, the func resolveOperators called in
// WidenSetOperationTypes does not skip and return the node before applying the rule.
val r3 = wt(Except(firstTable, firstTable)).asInstanceOf[Except]
checkOutput(r3.left, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
checkOutput(r3.right, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))

// Check if a Project is added
assert(r3.left.isInstanceOf[Project])
assert(r3.right.isInstanceOf[Project])
// Check if no Project is added
assert(r3.left.isInstanceOf[LocalRelation])
assert(r3.right.isInstanceOf[LocalRelation])
}

test("WidenSetOperationTypes for union") {
Expand Down

0 comments on commit e51de8f

Please sign in to comment.