From 8a0e8b0f290826d6453739a4039665e9e9806471 Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Mon, 27 Mar 2017 17:47:23 -0700 Subject: [PATCH 1/3] fix. --- .../statsEstimation/FilterEstimation.scala | 29 ++++++++++++++--- .../FilterEstimationSuite.scala | 32 +++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index b10785b05d6c7..f61034bdc83e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -24,6 +24,7 @@ import scala.math.BigDecimal.RoundingMode import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Statistics} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ @@ -134,13 +135,13 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo */ def calculateSingleCondition(condition: Expression, update: Boolean): Option[Double] = { condition match { - // For evaluateBinary method, we assume the literal on the right side of an operator. - // So we will change the order if not. 
+ case l: Literal => + evaluateLiteral(l) // EqualTo/EqualNullSafe does not care about the order - case op @ Equality(ar: Attribute, l: Literal) => + case Equality(ar: Attribute, l: Literal) => evaluateEquality(ar, l, update) - case op @ Equality(l: Literal, ar: Attribute) => + case Equality(l: Literal, ar: Attribute) => evaluateEquality(ar, l, update) case op @ LessThan(ar: Attribute, l: Literal) => @@ -336,6 +337,26 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo } + /** + * Returns a percentage of rows meeting a Literal expression. + * This method evaluates all the possible literal cases in Filter. + * + * FalseLiteral and TrueLiteral should be eliminated by optimizer, but null literal might be added + * by optimizer rule NullPropagation. For safety, we handle all the cases here. + * + * @param literal a literal value (or constant) + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateLiteral(literal: Literal): Option[Double] = { + literal match { + case Literal(null, _) => Some(0.0) + case FalseLiteral => Some(0.0) + case TrueLiteral => Some(1.0) + // Ideally, we should not hit the following branch + case _ => None + } + } + /** * Returns a percentage of rows meeting "IN" operator expression. * This method evaluates the equality predicate for all data types. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 4691913c8c986..3933d6b48728c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.statsEstimation import java.sql.Date import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ import org.apache.spark.sql.types._ @@ -75,6 +76,27 @@ class FilterEstimationSuite extends StatsEstimationTestBase { attrDouble -> colStatDouble, attrString -> colStatString)) + test("true") { + validateEstimatedStats( + Filter(TrueLiteral, childStatsTestPlan(Seq(attrInt), 10L)), + Seq(attrInt -> colStatInt), + expectedRowCount = 10) + } + + test("false") { + validateEstimatedStats( + Filter(FalseLiteral, childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + + test("null") { + validateEstimatedStats( + Filter(Literal(null, IntegerType), childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + test("cint = 2") { validateEstimatedStats( Filter(EqualTo(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)), @@ -162,6 +184,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 10) } + test("cint IS NOT NULL && null") { + // 'cint < null' will be optimized to 'cint IS NOT NULL && null'. + // More similar cases can be found in the Optimizer NullPropagation. 
+ val condition = And(IsNotNull(attrInt), Literal(null, IntegerType)) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + test("cint > 3 AND cint <= 6") { val condition = And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6))) validateEstimatedStats( From 1775ca73e9b2918fdc212fbe517f2e743f048368 Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Mon, 27 Mar 2017 18:38:14 -0700 Subject: [PATCH 2/3] revert --- .../plans/logical/statsEstimation/FilterEstimation.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index d7b64f65d98f9..3554f91617f26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -138,6 +138,9 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo case l: Literal => evaluateLiteral(l) + // For evaluateBinary method, we assume the literal is on the right side of the operator. + // So we will change the order if not. + // EqualTo/EqualNullSafe does not care about the order case Equality(ar: Attribute, l: Literal) => evaluateEquality(ar, l, update) From 8ec57f394f3ab0c1fc92545c50e028b7867bb064 Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Wed, 29 Mar 2017 00:23:33 -0700 Subject: [PATCH 3/3] fix.
--- .../statsEstimation/FilterEstimation.scala | 11 ++++ .../FilterEstimationSuite.scala | 55 +++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 3554f91617f26..b32374c5742ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -105,12 +105,23 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(1.0) Some(percent1 + percent2 - (percent1 * percent2)) + // Not-operator pushdown case Not(And(cond1, cond2)) => calculateFilterSelectivity(Or(Not(cond1), Not(cond2)), update = false) + // Not-operator pushdown case Not(Or(cond1, cond2)) => calculateFilterSelectivity(And(Not(cond1), Not(cond2)), update = false) + // Collapse two consecutive Not operators which could be generated after Not-operator pushdown + case Not(Not(cond)) => + calculateFilterSelectivity(cond, update = false) + + // The foldable Not has been processed in the ConstantFolding rule + // This is a top-down traversal. The Not could be pushed down by the above two cases. 
+ case Not(l @ Literal(null, _)) => + calculateSingleCondition(l, update = false) + case Not(cond) => calculateFilterSelectivity(cond, update = false) match { case Some(percent) => Some(1.0 - percent) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 2962c26375f84..1966c96c05294 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -98,6 +98,60 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 0) } + test("Not(null)") { + validateEstimatedStats( + Filter(Not(Literal(null, IntegerType)), childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + + test("Not(Not(null))") { + validateEstimatedStats( + Filter(Not(Not(Literal(null, IntegerType))), childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cint < 3 AND null") { + val condition = And(LessThan(attrInt, Literal(3)), Literal(null, IntegerType)) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cint < 3 OR null") { + val condition = Or(LessThan(attrInt, Literal(3)), Literal(null, IntegerType)) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)), + Seq(attrInt -> colStatInt), + expectedRowCount = 3) + } + + test("Not(cint < 3 AND null)") { + val condition = Not(And(LessThan(attrInt, Literal(3)), Literal(null, IntegerType))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)), + Seq(attrInt -> colStatInt), + expectedRowCount = 8) + } + + test("Not(cint < 3 OR null)") { +
val condition = Not(Or(LessThan(attrInt, Literal(3)), Literal(null, IntegerType))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)), + Nil, + expectedRowCount = 0) + } + + test("Not(cint < 3 AND Not(null))") { + val condition = Not(And(LessThan(attrInt, Literal(3)), Not(Literal(null, IntegerType)))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)), + Seq(attrInt -> colStatInt), + expectedRowCount = 8) + } + test("cint = 2") { validateEstimatedStats( Filter(EqualTo(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)),