From ab7ea3734777f378c7b2d809595f52a4bf8b4ce2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Mar 2016 06:11:38 +0000 Subject: [PATCH 1/4] init import. --- .../plans/logical/basicOperators.scala | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 09ea3fea6a694..b0502d7f91630 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -279,9 +279,27 @@ case class Join( case LeftSemi => left.constraints case LeftOuter => - left.constraints + // For left outer join, the constraints of right side are effective if the referred + // outputs in right side are not null. + val rightConstraints = right.constraints.map { constraint => + val relatedOuputs = constraint.references.filter(right.outputSet.contains) + val notNulls = relatedOuputs.map { o => + IsNotNull(o) + }.reduce(And) + And(notNulls, constraint) + } + left.constraints.union(ExpressionSet(rightConstraints)) case RightOuter => - right.constraints + // For right outer join, the constraints of left side are effective if the referred + // outputs in left side are not null. + val leftConstraints = left.constraints.map { constraint => + val relatedOuputs = constraint.references.filter(left.outputSet.contains) + val notNulls = relatedOuputs.map { o => + IsNotNull(o) + }.reduce(And) + And(notNulls, constraint) + } + right.constraints.union(ExpressionSet(leftConstraints)) case FullOuter => Set.empty[Expression] } From deae036525bc58e6c81d41a8cadfa8b33f7f9d74 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Mar 2016 06:36:24 +0000 Subject: [PATCH 2/4] Fix. --- .../catalyst/plans/logical/basicOperators.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index b0502d7f91630..000e3496ee804 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -280,24 +280,24 @@ case class Join( left.constraints case LeftOuter => // For left outer join, the constraints of right side are effective if the referred - // outputs in right side are not null. + // outputs in right side are not null. Otherwise, the referred outputs are all nulls. val rightConstraints = right.constraints.map { constraint => val relatedOuputs = constraint.references.filter(right.outputSet.contains) - val notNulls = relatedOuputs.map { o => - IsNotNull(o) + val isNulls = relatedOuputs.map { o => + IsNull(o) }.reduce(And) - And(notNulls, constraint) + Or(notNulls, constraint) } left.constraints.union(ExpressionSet(rightConstraints)) case RightOuter => // For right outer join, the constraints of left side are effective if the referred - // outputs in left side are not null. + // outputs in left side are not null. Otherwise, the referred outputs are all nulls. val leftConstraints = left.constraints.map { constraint => val relatedOuputs = constraint.references.filter(left.outputSet.contains) - val notNulls = relatedOuputs.map { o => - IsNotNull(o) + val isNulls = relatedOuputs.map { o => + IsNull(o) }.reduce(And) - And(notNulls, constraint) + Or(notNulls, constraint) } right.constraints.union(ExpressionSet(leftConstraints)) case FullOuter => From 1aa85500725a4a4d5a55583cc400d7b7d4171c37 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Mar 2016 07:57:22 +0000 Subject: [PATCH 3/4] Add constraints to outer join. --- .../plans/logical/basicOperators.scala | 38 +++++++++++-------- .../plans/ConstraintPropagationSuite.scala | 17 +++++++-- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 000e3496ee804..ebacf6a7106d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -265,6 +265,24 @@ case class Join( } } + private def getRightConstraintsForOuter = right.constraints.collect { + case constraint: Expression if !constraint.isInstanceOf[IsNotNull] => + val relatedOuputs = constraint.references.filter(right.outputSet.contains) + val isNulls = relatedOuputs.map { o => + IsNull(o) + }.reduce(And) + Or(isNulls, constraint) + } + + private def getLeftConstraintsForOuter = left.constraints.collect { + case constraint: Expression if !constraint.isInstanceOf[IsNotNull] => + val relatedOuputs = constraint.references.filter(left.outputSet.contains) + val isNulls = relatedOuputs.map { o => + IsNull(o) + }.reduce(And) + Or(isNulls, constraint) + } + override protected def validConstraints: Set[Expression] = { joinType match { case Inner if condition.isDefined => @@ -281,27 +299,17 @@ case class Join( case LeftOuter => // For left outer join, the constraints of right side are effective if the referred // outputs in right side are not null. Otherwise, the referred outputs are all nulls. - val rightConstraints = right.constraints.map { constraint => - val relatedOuputs = constraint.references.filter(right.outputSet.contains) - val isNulls = relatedOuputs.map { o => - IsNull(o) - }.reduce(And) - Or(notNulls, constraint) - } + val rightConstraints = getRightConstraintsForOuter left.constraints.union(ExpressionSet(rightConstraints)) case RightOuter => // For right outer join, the constraints of left side are effective if the referred // outputs in left side are not null. Otherwise, the referred outputs are all nulls. - val leftConstraints = left.constraints.map { constraint => - val relatedOuputs = constraint.references.filter(left.outputSet.contains) - val isNulls = relatedOuputs.map { o => - IsNull(o) - }.reduce(And) - Or(notNulls, constraint) - } + val leftConstraints = getLeftConstraintsForOuter right.constraints.union(ExpressionSet(leftConstraints)) case FullOuter => - Set.empty[Expression] + val rightConstraints = getRightConstraintsForOuter + val leftConstraints = getLeftConstraintsForOuter + ExpressionSet(leftConstraints).union(ExpressionSet(rightConstraints)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index a9375a740daac..75e2ab754cb3a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -183,7 +183,9 @@ class ConstraintPropagationSuite extends SparkFunSuite { .join(tr2.where('d.attr < 100), LeftOuter, Some("tr1.a".attr === "tr2.a".attr)) .analyze.constraints, ExpressionSet(Seq(tr1.resolveQuoted("a", caseInsensitiveResolution).get > 10, - IsNotNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get)))) + IsNotNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get), + Or(IsNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get), + tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100)))) } test("propagating constraints in right-outer join") { @@ -194,15 +196,22 @@ class ConstraintPropagationSuite extends SparkFunSuite { .join(tr2.where('d.attr < 100), RightOuter, Some("tr1.a".attr === "tr2.a".attr)) .analyze.constraints, ExpressionSet(Seq(tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100, - IsNotNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get)))) + IsNotNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get), + Or(IsNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get), + tr1.resolveQuoted("a", caseInsensitiveResolution).get > 10)))) } test("propagating constraints in full-outer join") { val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) - assert(tr1.where('a.attr > 10) + verifyConstraints(tr1.where('a.attr > 10) .join(tr2.where('d.attr < 100), FullOuter, Some("tr1.a".attr === "tr2.a".attr)) - .analyze.constraints.isEmpty) + .analyze.constraints, + ExpressionSet(Seq( + Or(IsNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get), + tr1.resolveQuoted("a",caseInsensitiveResolution).get > 10), + Or(IsNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get), + tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100)))) } test("infer additional constraints in filters") { From 6379d8bf800c3f9af40875a9ac11f34da23860ee Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Mar 2016 08:31:26 +0000 Subject: [PATCH 4/4] Fix scala style. --- .../spark/sql/catalyst/plans/ConstraintPropagationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 75e2ab754cb3a..ce712b79f7b69 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -209,7 +209,7 @@ class ConstraintPropagationSuite extends SparkFunSuite { .analyze.constraints, ExpressionSet(Seq( Or(IsNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get), - tr1.resolveQuoted("a",caseInsensitiveResolution).get > 10), + tr1.resolveQuoted("a", caseInsensitiveResolution).get > 10), Or(IsNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get), tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100)))) }