From 7980cd1d3f190d00633499dd3a80612d4aca1389 Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Fri, 24 Mar 2017 13:31:35 -0700 Subject: [PATCH 01/11] filter estimation on two columns of same table --- .../statsEstimation/FilterEstimation.scala | 142 +++++++++++++++++- .../FilterEstimationSuite.scala | 111 +++++++++++++- 2 files changed, 243 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index b32374c5742ef..ab6bbb2cc0464 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -201,6 +201,21 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo case IsNotNull(ar: Attribute) if plan.child.isInstanceOf[LeafNode] => evaluateNullCheck(ar, isNull = false, update) + case op @ Equality(attrLeft: Attribute, attrRight: Attribute) => + evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update) + + case op @ LessThan(attrLeft: Attribute, attrRight: Attribute) => + evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update) + + case op @ LessThanOrEqual(attrLeft: Attribute, attrRight: Attribute) => + evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update) + + case op @ GreaterThan(attrLeft: Attribute, attrRight: Attribute) => + evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update) + + case op @ GreaterThanOrEqual(attrLeft: Attribute, attrRight: Attribute) => + evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update) + case _ => // TODO: it's difficult to support string operators without advanced statistics. // Hence, these string operators Like(_, _) | Contains(_, _) | StartsWith(_, _) @@ -257,7 +272,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo /** * Returns a percentage of rows meeting a binary comparison expression. * - * @param op a binary comparison operator uch as =, <, <=, >, >= + * @param op a binary comparison operator such as =, <, <=, >, >= * @param attr an Attribute (or a column) * @param literal a literal value (or constant) * @param update a boolean flag to specify if we need to update ColumnStat of a given column @@ -448,7 +463,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo * Returns a percentage of rows meeting a binary comparison expression. * This method evaluate expression for Numeric/Date/Timestamp/Boolean columns. * - * @param op a binary comparison operator uch as =, <, <=, >, >= + * @param op a binary comparison operator such as =, <, <=, >, >= * @param attr an Attribute (or a column) * @param literal a literal value (or constant) * @param update a boolean flag to specify if we need to update ColumnStat of a given column @@ -550,8 +565,131 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + + if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None + } + if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None + } + + attrLeft.dataType match { + case StringType | BinaryType => + // TODO: It is difficult to support other binary comparisons for String/Binary + // type without min/max and advanced statistics like histogram. + logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) + return None + case _ => + } + + val colStatLeft = colStatsMap(attrLeft) + val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] + val maxLeft = BigDecimal(statsRangeLeft.max) + val minLeft = BigDecimal(statsRangeLeft.min) + val ndvLeft = BigDecimal(colStatLeft.distinctCount) + + val colStatRight = colStatsMap(attrRight) + val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] + val maxRight = BigDecimal(statsRangeRight.max) + val minRight = BigDecimal(statsRangeRight.min) + val ndvRight = BigDecimal(colStatRight.distinctCount) + + // determine the overlapping degree between predicate range and column's range + val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: EqualTo => + ((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: LessThan => + (minLeft >= maxRight, maxLeft <= minRight) + case _: LessThanOrEqual => + (minLeft >= maxRight, maxLeft <= minRight) + case _: GreaterThan => + (maxLeft <= minRight, minLeft >= maxRight) + case _: GreaterThanOrEqual => + (maxLeft < minRight, minLeft > maxRight) + } + + var percent = BigDecimal(1.0) + if (noOverlap) { + percent = 0.0 + } else if (completeOverlap) { + percent = 1.0 + } else { + // For partial overlap, we use an empirical value 1/3 as suggested by the book + // "Database Systems, the complete book". + percent = 1.0/3.0 + + if (update) { + // Need to adjust new min/max after the filter condition is applied + + val ndv = BigDecimal(colStatLeft.distinctCount) + var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() + if (newNdv < 1) newNdv = 1 + var newMaxLeft = colStatLeft.max + var newMinLeft = colStatLeft.min + var newMaxRight = colStatRight.max + var newMinRight = colStatRight.min + + op match { + case _: EqualTo => + // need to set new min to the larger min value, and + // set the new max to the smaller max value. + if (minLeft < minRight) newMinLeft = colStatRight.min + else newMinRight = colStatLeft.min + if (maxLeft < maxRight) newMaxRight = colStatLeft.max + else newMaxLeft = colStatRight.max + + case _: GreaterThan | _: GreaterThanOrEqual => + // the left side should be greater than the right side. + // If not, we need to adjust it to narrow the range. + if (minLeft < minRight) newMinLeft = colStatRight.min + if (maxLeft < maxRight) newMaxRight = colStatLeft.max + + case _: LessThan | _: LessThanOrEqual => + // the left side should be less than the right side. + // If not, we need to adjust it to narrow the range. + if (minLeft > minRight) newMinRight = colStatLeft.min + if (maxLeft > maxRight) newMaxLeft = colStatRight.max + } + + val newStatsLeft = colStatLeft.copy(distinctCount = newNdv, min = newMinLeft, + max = newMaxLeft, nullCount = 0) + colStatsMap(attrLeft) = newStatsLeft + val newStatsRight = colStatRight.copy(distinctCount = newNdv, min = newMinRight, + max = newMaxRight, nullCount = 0) + colStatsMap(attrRight) = newStatsRight + } + } + + Some(percent.toDouble) + } + } + class ColumnStatsMap { private val baseMap: mutable.Map[ExprId, (Attribute, ColumnStat)] = mutable.HashMap.empty diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 1966c96c05294..d04d9ac1aad3f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -33,49 +33,66 @@ import org.apache.spark.sql.types._ class FilterEstimationSuite extends StatsEstimationTestBase { // Suppose our test table has 10 rows and 6 columns. - // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + // column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 val attrInt = AttributeReference("cint", IntegerType)() val colStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), nullCount = 0, avgLen = 4, maxLen = 4) - // only 2 values + // column cbool has only 2 distinct values val attrBool = AttributeReference("cbool", BooleanType)() val colStatBool = ColumnStat(distinctCount = 2, min = Some(false), max = Some(true), nullCount = 0, avgLen = 1, maxLen = 1) - // Second column cdate has 10 values from 2017-01-01 through 2017-01-10. + // column cdate has 10 values from 2017-01-01 through 2017-01-10. val dMin = Date.valueOf("2017-01-01") val dMax = Date.valueOf("2017-01-10") val attrDate = AttributeReference("cdate", DateType)() val colStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax), nullCount = 0, avgLen = 4, maxLen = 4) - // Fourth column cdecimal has 4 values from 0.20 through 0.80 at increment of 0.20. + // column cdecimal has 4 values from 0.20 through 0.80 at increment of 0.20. val decMin = new java.math.BigDecimal("0.200000000000000000") val decMax = new java.math.BigDecimal("0.800000000000000000") val attrDecimal = AttributeReference("cdecimal", DecimalType(18, 18))() val colStatDecimal = ColumnStat(distinctCount = 4, min = Some(decMin), max = Some(decMax), nullCount = 0, avgLen = 8, maxLen = 8) - // Fifth column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0 + // column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0 val attrDouble = AttributeReference("cdouble", DoubleType)() val colStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), max = Some(10.0), nullCount = 0, avgLen = 8, maxLen = 8) - // Sixth column cstring has 10 String values: + // column cstring has 10 String values: // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9" val attrString = AttributeReference("cstring", StringType)() val colStatString = ColumnStat(distinctCount = 10, min = None, max = None, nullCount = 0, avgLen = 2, maxLen = 2) + // column cint2 has values: 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 + // Hence, distinctCount:10, min:7, max:16, nullCount:0, avgLen:4, maxLen:4 + // This column is created to test "cint < cint2 + val attrInt2 = AttributeReference("cint2", IntegerType)() + val colStatInt2 = ColumnStat(distinctCount = 10, min = Some(7), max = Some(16), + nullCount = 0, avgLen = 4, maxLen = 4) + + // column cint3 has values: 30, 31, 32, 33, 34, 35, 36, 37, 38, 39 + // Hence, distinctCount:10, min:30, max:39, nullCount:0, avgLen:4, maxLen:4 + // This column is created to test "cint = cint3 without overlap at all. + val attrInt3 = AttributeReference("cint3", IntegerType)() + val colStatInt3 = ColumnStat(distinctCount = 10, min = Some(30), max = Some(39), + nullCount = 0, avgLen = 4, maxLen = 4) + val attributeMap = AttributeMap(Seq( attrInt -> colStatInt, attrBool -> colStatBool, attrDate -> colStatDate, attrDecimal -> colStatDecimal, attrDouble -> colStatDouble, - attrString -> colStatString)) + attrString -> colStatString, + attrInt2 -> colStatInt2, + attrInt3 -> colStatInt3 + )) test("true") { validateEstimatedStats( @@ -450,6 +467,69 @@ class FilterEstimationSuite extends StatsEstimationTestBase { } } + test("cint = cint2") { + validateEstimatedStats( + Filter(EqualTo(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt2 -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 4) + } + + test("cint > cint2") { + validateEstimatedStats( + Filter(GreaterThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt2 -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 4) + } + + test("cint < cint2") { + validateEstimatedStats( + Filter(LessThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt2 -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(16), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 4) + } + + test("cint = cint3") { + // no records qualify due to no overlap + validateEstimatedStats( + Filter(EqualTo(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 0, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt3 -> ColumnStat(distinctCount = 0, min = Some(30), max = Some(39), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 0) + } + + test("cint < cint3") { + // all table records qualify. + validateEstimatedStats( + Filter(LessThan(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt3 -> ColumnStat(distinctCount = 10, min = Some(30), max = Some(39), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 10) + } + + test("cint > cint3") { + // no records qualify due to no overlap + validateEstimatedStats( + Filter(GreaterThan(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 0, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt3 -> ColumnStat(distinctCount = 4, min = Some(30), max = Some(39), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 0) + } + private def childStatsTestPlan(outList: Seq[Attribute], tableRowCount: BigInt): StatsTestPlan = { StatsTestPlan( outputList = outList, @@ -491,7 +571,22 @@ class FilterEstimationSuite extends StatsEstimationTestBase { sizeInBytes = getOutputSize(filter.output, expectedRowCount, expectedAttributeMap), rowCount = Some(expectedRowCount), attributeStats = expectedAttributeMap) - assert(filter.stats(conf) == expectedStats) + + val filterStats = filter.stats(conf) + assert(filterStats.sizeInBytes == expectedStats.sizeInBytes) + assert(filterStats.rowCount == expectedStats.rowCount) + val rowCountValue = filterStats.rowCount.getOrElse(0) + // check the output column stats if the row count is > 0. + // When row count is 0, the output is set to empty. + if (rowCountValue != 0) { + // Need to check attributeStats one by one because we may have multiple output columns. + // Due to update operation, the output columns may be in different order. + expectedColStats.foreach { kv => + val filterColumnStat = filterStats.attributeStats.get(kv._1).get + assert(filterColumnStat == kv._2) + } + } + // assert(filter.stats(conf) == expectedStats) } } } From 150108ea52b3a921e686cd0babb9e1a7fb5a1a5a Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Mon, 27 Mar 2017 17:03:41 -0700 Subject: [PATCH 02/11] revise ndv for both tables in predicate --- .../logical/statsEstimation/FilterEstimation.scala | 14 +++++++++----- .../statsEstimation/FilterEstimationSuite.scala | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index ab6bbb2cc0464..31d92c554263c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -645,9 +645,13 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo if (update) { // Need to adjust new min/max after the filter condition is applied - val ndv = BigDecimal(colStatLeft.distinctCount) - var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() - if (newNdv < 1) newNdv = 1 + val ndvLeft = BigDecimal(colStatLeft.distinctCount) + var newNdvLeft = (ndvLeft * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() + if (newNdvLeft < 1) newNdvLeft = 1 + val ndvRight = BigDecimal(colStatLeft.distinctCount) + var newNdvRight = (ndvRight * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() + if (newNdvRight < 1) newNdvRight = 1 + var newMaxLeft = colStatLeft.max var newMinLeft = colStatLeft.min var newMaxRight = colStatRight.max @@ -675,10 +679,10 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo if (maxLeft > maxRight) newMaxLeft = colStatRight.max } - val newStatsLeft = colStatLeft.copy(distinctCount = newNdv, min = newMinLeft, + val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft, max = newMaxLeft, nullCount = 0) colStatsMap(attrLeft) = newStatsLeft - val newStatsRight = colStatRight.copy(distinctCount = newNdv, min = newMinRight, + val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight, max = newMaxRight, nullCount = 0) colStatsMap(attrRight) = newStatsRight } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index d04d9ac1aad3f..9a6affec152b1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -586,7 +586,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { assert(filterColumnStat == kv._2) } } - // assert(filter.stats(conf) == expectedStats) } } + } From 9bfd35ddffba869fa050a2d44e3443288878ff2c Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Tue, 28 Mar 2017 19:48:06 -0700 Subject: [PATCH 03/11] handle EqualNullSafe separately --- .../plans/logical/statsEstimation/FilterEstimation.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 31d92c554263c..84c8555b2bebc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -630,6 +630,9 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo (maxLeft <= minRight, minLeft >= maxRight) case _: GreaterThanOrEqual => (maxLeft < minRight, minLeft > maxRight) + case _: EqualNullSafe => + // For null-safe equality, we set it to partial overlap + (false, false) } var percent = BigDecimal(1.0) @@ -680,10 +683,10 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo } val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft, - max = newMaxLeft, nullCount = 0) + max = newMaxLeft) colStatsMap(attrLeft) = newStatsLeft val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight, - max = newMaxRight, nullCount = 0) + max = newMaxRight) colStatsMap(attrRight) = newStatsRight } } From 426c8f376a552a46c91c6270e6e1572e71a4d92a Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Wed, 29 Mar 2017 11:56:07 -0700 Subject: [PATCH 04/11] revise boundary conditions in evaluating range overlap --- .../statsEstimation/FilterEstimation.scala | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 84c8555b2bebc..e5702727ca2d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -575,7 +575,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo * @param op a binary comparison operator such as =, <, <=, >, >= * @param attrLeft the left Attribute (or a column) * @param attrRight the right Attribute (or a column) - * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * @param update a boolean flag to specify if we need to update ColumnStat of the given columns * for subsequent conditions * @return an optional double value to show the percentage of rows meeting a given condition */ @@ -619,20 +619,25 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // determine the overlapping degree between predicate range and column's range val (noOverlap: Boolean, completeOverlap: Boolean) = op match { - case _: EqualTo => - ((maxLeft < minRight) || (maxRight < minLeft), - (minLeft == minRight) && (maxLeft == maxRight)) case _: LessThan => - (minLeft >= maxRight, maxLeft <= minRight) + (minLeft >= maxRight, maxLeft < minRight) case _: LessThanOrEqual => - (minLeft >= maxRight, maxLeft <= minRight) + (minLeft > maxRight, maxLeft <= minRight) case _: GreaterThan => - (maxLeft <= minRight, minLeft >= maxRight) + (maxLeft <= minRight, minLeft > maxRight) case _: GreaterThanOrEqual => - (maxLeft < minRight, minLeft > maxRight) + (maxLeft < minRight, minLeft >= maxRight) + case _: EqualTo => + ((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) case _: EqualNullSafe => - // For null-safe equality, we set it to partial overlap - (false, false) + // For null-safe equality, we use a very restrictive condition to evaluate its overlap. + // If null values exists, we set it to partial overlap. + (((maxLeft < minRight) || (maxRight < minLeft)) + && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0, + ((minLeft == minRight) && (maxLeft == maxRight)) + && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0 + ) } var percent = BigDecimal(1.0) From 8faf7901e727a911105413b45124f974b35d0521 Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Wed, 29 Mar 2017 16:35:33 -0700 Subject: [PATCH 05/11] handled EqualNullSafe if stats update is needed --- .../statsEstimation/FilterEstimation.scala | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index e5702727ca2d3..7c6c0ea913d18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -666,13 +666,11 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo var newMinRight = colStatRight.min op match { - case _: EqualTo => - // need to set new min to the larger min value, and - // set the new max to the smaller max value. - if (minLeft < minRight) newMinLeft = colStatRight.min - else newMinRight = colStatLeft.min - if (maxLeft < maxRight) newMaxRight = colStatLeft.max - else newMaxLeft = colStatRight.max + case _: LessThan | _: LessThanOrEqual => + // the left side should be less than the right side. + // If not, we need to adjust it to narrow the range. + if (minLeft > minRight) newMinRight = colStatLeft.min + if (maxLeft > maxRight) newMaxLeft = colStatRight.max case _: GreaterThan | _: GreaterThanOrEqual => // the left side should be greater than the right side. @@ -680,11 +678,13 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo if (minLeft < minRight) newMinLeft = colStatRight.min if (maxLeft < maxRight) newMaxRight = colStatLeft.max - case _: LessThan | _: LessThanOrEqual => - // the left side should be less than the right side. - // If not, we need to adjust it to narrow the range. - if (minLeft > minRight) newMinRight = colStatLeft.min - if (maxLeft > maxRight) newMaxLeft = colStatRight.max + case _: EqualTo | _: EqualNullSafe => + // need to set new min to the larger min value, and + // set the new max to the smaller max value. + if (minLeft < minRight) newMinLeft = colStatRight.min + else newMinRight = colStatLeft.min + if (maxLeft < maxRight) newMaxRight = colStatLeft.max + else newMaxLeft = colStatRight.max } val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft, @@ -701,7 +701,6 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo } - class ColumnStatsMap { private val baseMap: mutable.Map[ExprId, (Attribute, ColumnStat)] = mutable.HashMap.empty From e2699e4f3d63ce2a1284de2f5c48820bd4682fc8 Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Thu, 30 Mar 2017 17:30:39 -0700 Subject: [PATCH 06/11] set expectedColStats to Nil when there is no overlap --- .../statsEstimation/FilterEstimationSuite.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 9a6affec152b1..a48fbbf7d97e7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -499,12 +499,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase { test("cint = cint3") { // no records qualify due to no overlap + val emptyColStats = Seq[(Attribute, ColumnStat)]() validateEstimatedStats( Filter(EqualTo(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)), - Seq(attrInt -> ColumnStat(distinctCount = 0, min = Some(1), max = Some(10), - nullCount = 0, avgLen = 4, maxLen = 4), - attrInt3 -> ColumnStat(distinctCount = 0, min = Some(30), max = Some(39), - nullCount = 0, avgLen = 4, maxLen = 4)), + Nil, // set to empty expectedRowCount = 0) } @@ -523,10 +521,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { // no records qualify due to no overlap validateEstimatedStats( Filter(GreaterThan(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)), - Seq(attrInt -> ColumnStat(distinctCount = 0, min = Some(1), max = Some(10), - nullCount = 0, avgLen = 4, maxLen = 4), - attrInt3 -> ColumnStat(distinctCount = 4, min = Some(30), max = Some(39), - nullCount = 0, avgLen = 4, maxLen = 4)), + Nil, // set to empty expectedRowCount = 0) } From b760bf8c9d0c72b131664d1a1dc3f44fc5031376 Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Fri, 31 Mar 2017 18:06:24 -0700 Subject: [PATCH 07/11] added a couple of complete-overlap test cases --- .../statsEstimation/FilterEstimation.scala | 96 +++++++++++++++++-- .../FilterEstimationSuite.scala | 35 ++++++- 2 files changed, 122 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 7c6c0ea913d18..f44f5ff6f925c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -572,7 +572,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo * Note that, if column-1 and column-2 belong to different tables, then it is a join * operator's work, NOT a filter operator's work. * - * @param op a binary comparison operator such as =, <, <=, >, >= + * @param op a binary comparison operator, including =, <=>, <, <=, >, >= * @param attrLeft the left Attribute (or a column) * @param attrRight the right Attribute (or a column) * @param update a boolean flag to specify if we need to update ColumnStat of the given columns @@ -608,25 +608,49 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo .asInstanceOf[NumericRange] val maxLeft = BigDecimal(statsRangeLeft.max) val minLeft = BigDecimal(statsRangeLeft.min) - val ndvLeft = BigDecimal(colStatLeft.distinctCount) val colStatRight = colStatsMap(attrRight) val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) .asInstanceOf[NumericRange] val maxRight = BigDecimal(statsRangeRight.max) val minRight = BigDecimal(statsRangeRight.min) - val ndvRight = BigDecimal(colStatRight.distinctCount) // determine the overlapping degree between predicate range and column's range val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + // Left < Right or Left <= Right + // - no overlap: + // minRight maxRight minLeft maxLeft + // 0 ------+------------------+------------+-------------+-------> + // - complete overlap: + // minLeft maxLeft minRight maxRight + // 0 ------+------------------+------------+-------------+-------> case _: LessThan => (minLeft >= maxRight, maxLeft < minRight) case _: LessThanOrEqual => (minLeft > maxRight, maxLeft <= minRight) + + // Left > Right or Left >= Right + // - no overlap: + // minLeft maxLeft minRight maxRight + // 0 ------+------------------+------------+-------------+-------> + // - complete overlap: + // minRight maxRight minLeft maxLeft + // 0 ------+------------------+------------+-------------+-------> case _: GreaterThan => (maxLeft <= minRight, minLeft > maxRight) case _: GreaterThanOrEqual => (maxLeft < minRight, minLeft >= maxRight) + + // Left = Right or Left <=> Right + // - no overlap: + // minLeft maxLeft minRight maxRight + // 0 ------+------------------+------------+-------------+-------> + // minRight maxRight minLeft maxLeft + // 0 ------+------------------+------------+-------------+-------> + // - complete overlap: + // minLeft maxLeft + // minRight maxRight + // 0 ------+------------------+-------> case _: EqualTo => ((maxLeft < minRight) || (maxRight < minLeft), (minLeft == minRight) && (maxLeft == maxRight)) @@ -648,7 +672,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo } else { // For partial overlap, we use an empirical value 1/3 as suggested by the book // "Database Systems, the complete book". - percent = 1.0/3.0 + percent = 1.0 / 3.0 if (update) { // Need to adjust new min/max after the filter condition is applied @@ -669,22 +693,78 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo case _: LessThan | _: LessThanOrEqual => // the left side should be less than the right side. // If not, we need to adjust it to narrow the range. + // Left < Right or Left <= Right + // minRight < minLeft + // 0 ------+******************+-------> + // filtered ^ + // | + // newMinRight + // + // maxRight < maxLeft + // 0 ------+******************+-------> + // ^ filtered + // | + // newMaxLeft if (minLeft > minRight) newMinRight = colStatLeft.min if (maxLeft > maxRight) newMaxLeft = colStatRight.max case _: GreaterThan | _: GreaterThanOrEqual => // the left side should be greater than the right side. // If not, we need to adjust it to narrow the range. + // Left > Right or Left >= Right + // minLeft < minRight + // 0 ------+******************+-------> + // filtered ^ + // | + // newMinLeft + // + // maxLeft < maxRight + // 0 ------+******************+-------> + // ^ filtered + // | + // newMaxRight if (minLeft < minRight) newMinLeft = colStatRight.min if (maxLeft < maxRight) newMaxRight = colStatLeft.max case _: EqualTo | _: EqualNullSafe => // need to set new min to the larger min value, and // set the new max to the smaller max value. - if (minLeft < minRight) newMinLeft = colStatRight.min - else newMinRight = colStatLeft.min - if (maxLeft < maxRight) newMaxRight = colStatLeft.max - else newMaxLeft = colStatRight.max + // Left = Right or Left <=> Right + // minLeft < minRight + // 0 ------+******************+-------> + // filtered ^ + // | + // newMinLeft + // + // minRight <= minLeft + // 0 ------+******************+-------> + // filtered ^ + // | + // newMinRight + // + // maxLeft < maxRight + // 0 ------+******************+-------> + // ^ filtered + // | + // newMaxRight + // + // maxRight <= maxLeft + // 0 ------+******************+-------> + // ^ filtered + // | + // newMaxLeft + + + if (minLeft < minRight) { + newMinLeft = colStatRight.min + } else { + newMinRight = colStatLeft.min + } + if (maxLeft < maxRight) { + newMaxRight = colStatLeft.max + } else { + newMaxLeft = colStatRight.max + } } val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index a48fbbf7d97e7..338e6971eb402 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -83,6 +83,13 @@ class FilterEstimationSuite extends StatsEstimationTestBase { val colStatInt3 = ColumnStat(distinctCount = 10, min = Some(30), max = Some(39), nullCount = 0, avgLen = 4, maxLen = 4) + // column cint4 has values in the range from 1 to 10 + // distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 + // This column is created to test complete overlap + val attrInt4 = AttributeReference("cint4", IntegerType)() + val colStatInt4 = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4) + val attributeMap = AttributeMap(Seq( attrInt -> colStatInt, attrBool -> colStatBool, @@ -91,7 +98,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase { attrDouble -> colStatDouble, attrString -> colStatString, attrInt2 -> colStatInt2, - attrInt3 -> colStatInt3 + attrInt3 -> colStatInt3, + attrInt4 -> colStatInt4 )) test("true") { @@ -468,6 +476,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { } test("cint = cint2") { + // partial overlap case validateEstimatedStats( Filter(EqualTo(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)), Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10), @@ -478,6 +487,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { } test("cint > cint2") { + // partial overlap case validateEstimatedStats( Filter(GreaterThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)), Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10), @@ -488,6 +498,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { } test("cint < cint2") { + // partial overlap case validateEstimatedStats( Filter(LessThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)), Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10), @@ -497,6 +508,28 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 4) } + test("cint = cint4") { + // complete overlap case + validateEstimatedStats( + Filter(EqualTo(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt4 -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 10) + } + + test("cint < cint4") { + // partial overlap case + validateEstimatedStats( + Filter(LessThan(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)), + Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + attrInt4 -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 4) + } + test("cint = cint3") { // no records qualify due to no overlap val emptyColStats = Seq[(Attribute, ColumnStat)]() From 64d796e32a31bebf85873d43fd3e87a088c975ef Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Sat, 1 Apr 2017 13:48:57 -0700 Subject: [PATCH 08/11] need to consider null for complete overlap case --- .../statsEstimation/FilterEstimation.scala | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index f44f5ff6f925c..2593ebb9cfd0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -620,40 +620,45 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // Left < Right or Left <= Right // - no overlap: // minRight maxRight minLeft maxLeft - // 0 ------+------------------+------------+-------------+-------> - // - complete overlap: + // --------+------------------+------------+-------------+-------> + // - complete overlap: (If null values exists, we set it to partial overlap.) // minLeft maxLeft minRight maxRight - // 0 ------+------------------+------------+-------------+-------> + // --------+------------------+------------+-------------+-------> case _: LessThan => - (minLeft >= maxRight, maxLeft < minRight) + (minLeft >= maxRight, + maxLeft < minRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) case _: LessThanOrEqual => - (minLeft > maxRight, maxLeft <= minRight) + (minLeft > maxRight, + maxLeft <= minRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) // Left > Right or Left >= Right // - no overlap: // minLeft maxLeft minRight maxRight - // 0 ------+------------------+------------+-------------+-------> - // - complete overlap: + // --------+------------------+------------+-------------+-------> + // - complete overlap: (If null values exists, we set it to partial overlap.) // minRight maxRight minLeft maxLeft - // 0 ------+------------------+------------+-------------+-------> + // --------+------------------+------------+-------------+-------> case _: GreaterThan => - (maxLeft <= minRight, minLeft > maxRight) + (maxLeft <= minRight, + minLeft > maxRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) case _: GreaterThanOrEqual => - (maxLeft < minRight, minLeft >= maxRight) + (maxLeft < minRight, + minLeft >= maxRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) // Left = Right or Left <=> Right // - no overlap: // minLeft maxLeft minRight maxRight - // 0 ------+------------------+------------+-------------+-------> + // --------+------------------+------------+-------------+-------> // minRight maxRight minLeft maxLeft - // 0 ------+------------------+------------+-------------+-------> + // --------+------------------+------------+-------------+-------> // - complete overlap: // minLeft maxLeft // minRight maxRight - // 0 ------+------------------+-------> + // --------+------------------+-------> case _: EqualTo => ((maxLeft < minRight) || (maxRight < minLeft), - (minLeft == minRight) && (maxLeft == maxRight)) + (minLeft == minRight) && (maxLeft == maxRight) && colStatLeft.nullCount == 0 + && colStatRight.nullCount == 0) case _: EqualNullSafe => // For null-safe equality, we use a very restrictive condition to evaluate its overlap. // If null values exists, we set it to partial overlap. @@ -695,13 +700,13 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // If not, we need to adjust it to narrow the range. // Left < Right or Left <= Right // minRight < minLeft - // 0 ------+******************+-------> + // --------+******************+-------> // filtered ^ // | // newMinRight // // maxRight < maxLeft - // 0 ------+******************+-------> + // --------+******************+-------> // ^ filtered // | // newMaxLeft @@ -713,13 +718,13 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // If not, we need to adjust it to narrow the range. // Left > Right or Left >= Right // minLeft < minRight - // 0 ------+******************+-------> + // --------+******************+-------> // filtered ^ // | // newMinLeft // // maxLeft < maxRight - // 0 ------+******************+-------> + // --------+******************+-------> // ^ filtered // | // newMaxRight @@ -731,25 +736,25 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // set the new max to the smaller max value. // Left = Right or Left <=> Right // minLeft < minRight - // 0 ------+******************+-------> + // --------+******************+-------> // filtered ^ // | // newMinLeft // // minRight <= minLeft - // 0 ------+******************+-------> + // --------+******************+-------> // filtered ^ // | // newMinRight // // maxLeft < maxRight - // 0 ------+******************+-------> + // --------+******************+-------> // ^ filtered // | // newMaxRight // // maxRight <= maxLeft - // 0 ------+******************+-------> + // --------+******************+-------> // ^ filtered // | // newMaxLeft From 8bc1be23f7136d071ae887c48f8db715807d374a Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Sun, 2 Apr 2017 16:24:32 -0700 Subject: [PATCH 09/11] use allNotNull Boolean condition --- .../statsEstimation/FilterEstimation.scala | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 2593ebb9cfd0a..0181fb89aa470 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -616,6 +616,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo val minRight = BigDecimal(statsRangeRight.min) // determine the overlapping degree between predicate range and column's range + val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0) val (noOverlap: Boolean, completeOverlap: Boolean) = op match { // Left < Right or Left <= Right // - no overlap: @@ -625,11 +626,9 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // minLeft maxLeft minRight maxRight // --------+------------------+------------+-------------+-------> case _: LessThan => - (minLeft >= maxRight, - maxLeft < minRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) + (minLeft >= maxRight, (maxLeft < minRight) && allNotNull) case _: LessThanOrEqual => - (minLeft > maxRight, - maxLeft <= minRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) + (minLeft > maxRight, (maxLeft <= minRight) && allNotNull) // Left > Right or Left >= Right // - no overlap: @@ -639,11 +638,9 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // minRight maxRight minLeft maxLeft // --------+------------------+------------+-------------+-------> case _: GreaterThan => - (maxLeft <= minRight, - minLeft > maxRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) + (maxLeft <= minRight, (minLeft > maxRight) && allNotNull) case _: GreaterThanOrEqual => - (maxLeft < minRight, - minLeft >= maxRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0) + (maxLeft < minRight, (minLeft >= maxRight) && allNotNull) // Left = Right or Left <=> Right // - no overlap: @@ -657,15 +654,13 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // --------+------------------+-------> case _: EqualTo => ((maxLeft < minRight) || (maxRight < minLeft), - (minLeft == minRight) && (maxLeft == maxRight) && colStatLeft.nullCount == 0 - && colStatRight.nullCount == 0) + (minLeft == minRight) && (maxLeft == maxRight) && allNotNull + ) case _: EqualNullSafe => // For null-safe equality, we use a very restrictive condition to evaluate its overlap. // If null values exists, we set it to partial overlap. - (((maxLeft < minRight) || (maxRight < minLeft)) - && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0, - ((minLeft == minRight) && (maxLeft == maxRight)) - && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0 + (((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull, + (minLeft == minRight) && (maxLeft == maxRight) && allNotNull ) } From 881096c090944e1559616f744d67411f57d984b1 Mon Sep 17 00:00:00 2001 From: "U-CHINA\\r00754707" Date: Sun, 2 Apr 2017 21:56:02 -0700 Subject: [PATCH 10/11] added condition to check distinctCount for EqualTo operator --- .../plans/logical/statsEstimation/FilterEstimation.scala | 3 ++- .../sql/catalyst/statsEstimation/FilterEstimationSuite.scala | 0 2 files changed, 2 insertions(+), 1 deletion(-) mode change 100644 => 100755 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala mode change 100644 => 100755 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala old mode 100644 new mode 100755 index 0181fb89aa470..7f6f9cbb77f03 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -655,6 +655,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo case _: EqualTo => ((maxLeft < minRight) || (maxRight < minLeft), (minLeft == minRight) && (maxLeft == maxRight) && allNotNull + && (colStatLeft.distinctCount == colStatRight.distinctCount) ) case _: EqualNullSafe => // For null-safe equality, we use a very restrictive condition to evaluate its overlap. @@ -680,7 +681,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo val ndvLeft = BigDecimal(colStatLeft.distinctCount) var newNdvLeft = (ndvLeft * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() if (newNdvLeft < 1) newNdvLeft = 1 - val ndvRight = BigDecimal(colStatLeft.distinctCount) + val ndvRight = BigDecimal(colStatRight.distinctCount) var newNdvRight = (ndvRight * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() if (newNdvRight < 1) newNdvRight = 1 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala old mode 100644 new mode 100755 From 3f3d30d2f943f79cee8e5ab4ddf642f16104627a Mon Sep 17 00:00:00 2001 From: Ron Hu Date: Mon, 3 Apr 2017 10:34:37 -0700 Subject: [PATCH 11/11] check if expectedColStats.size == filterStats.attributeStats.size --- .../statsEstimation/FilterEstimation.scala | 21 +++++++++---------- .../FilterEstimationSuite.scala | 1 + 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 7f6f9cbb77f03..03c76cd41d816 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -662,6 +662,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // If null values exists, we set it to partial overlap. (((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull, (minLeft == minRight) && (maxLeft == maxRight) && allNotNull + && (colStatLeft.distinctCount == colStatRight.distinctCount) ) } @@ -754,18 +755,16 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo // ^ filtered // | // newMaxLeft - - if (minLeft < minRight) { - newMinLeft = colStatRight.min - } else { - newMinRight = colStatLeft.min - } - if (maxLeft < maxRight) { - newMaxRight = colStatLeft.max - } else { - newMaxLeft = colStatRight.max - } + newMinLeft = colStatRight.min + } else { + newMinRight = colStatLeft.min + } + if (maxLeft < maxRight) { + newMaxRight = colStatLeft.max + } else { + newMaxLeft = colStatRight.max + } } val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 338e6971eb402..cffb0d8739287 100755 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -609,6 +609,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { if (rowCountValue != 0) { // Need to check attributeStats one by one because we may have multiple output columns. // Due to update operation, the output columns may be in different order. + assert(expectedColStats.size == filterStats.attributeStats.size) expectedColStats.foreach { kv => val filterColumnStat = filterStats.attributeStats.get(kv._1).get assert(filterColumnStat == kv._2)