From 297395effef8df279f11545d803b051cc2234c6e Mon Sep 17 00:00:00 2001
From: Mykhailo Shtelma
Date: Mon, 26 Mar 2018 15:09:39 +0200
Subject: [PATCH 1/6] During evaluation of IN conditions, if the source table
 is empty, a division by zero can occur. To fix this, a check was added.

---
 .../plans/logical/statsEstimation/FilterEstimation.scala   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 0538c9d88584b..724aa1d5c8b65 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -427,7 +427,11 @@ case class FilterEstimation(plan: Filter) extends Logging {

     // return the filter selectivity. Without advanced statistics such as histograms,
     // we have to assume uniform distribution.
-    Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
+    if (ndv.toDouble != 0) {
+      Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
+    } else {
+      Some(0.0)
+    }
   }

   /**

From d634ddaec88d0511334ec6c021255094f697b31d Mon Sep 17 00:00:00 2001
From: Mykhailo Shtelma
Date: Tue, 3 Apr 2018 17:44:33 +0200
Subject: [PATCH 2/6] Added a test case for the following situation: during
 evaluation of IN conditions, if the source table is empty, a division by
 zero can occur. To fix this, a check was added.

---
 .../statsEstimation/FilterEstimationSuite.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 43440d51dede6..b1ca37195cc3a 100755
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -357,6 +357,17 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
       expectedRowCount = 3)
   }

+  test("evaluateInSet with all zeros") {
+    validateEstimatedStats(
+      Filter(InSet(attrString, Set(3, 4, 5)),
+        StatsTestPlan(Seq(attrString), 10,
+          AttributeMap(Seq(attrString ->
+            ColumnStat(distinctCount = Some(0), min = Some(0), max = Some(0),
+              nullCount = Some(0), avgLen = Some(0), maxLen = Some(0)))))),
+      Seq(attrString -> ColumnStat(distinctCount = Some(0))),
+      expectedRowCount = 0)
+  }
+
   test("cint NOT IN (3, 4, 5)") {
     validateEstimatedStats(
       Filter(Not(InSet(attrInt, Set(3, 4, 5))), childStatsTestPlan(Seq(attrInt), 10L)),
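Note on PATCH 1/2: evaluateInSet derives the IN-list selectivity as newNdv / ndv,
and ndv is 0 for an empty relation, hence the guard. As a minimal standalone
sketch of the guarded computation (hypothetical helper name and signature; in
Spark this logic lives inside FilterEstimation.evaluateInSet):

    def inSetSelectivity(newNdv: BigInt, ndv: BigInt): Option[Double] = {
      if (ndv.toDouble != 0) {
        // Without histograms we assume a uniform distribution, so the fraction
        // of matching rows equals the fraction of matching distinct values.
        Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
      } else {
        // ndv == 0 means the relation is empty: the predicate selects nothing.
        Some(0.0)
      }
    }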
From 74b6ebdc2cd8a91944cc6159946f560ba7212a6a Mon Sep 17 00:00:00 2001
From: Mykhailo Shtelma
Date: Thu, 12 Apr 2018 14:00:55 +0200
Subject: [PATCH 3/6] If an empty DataFrame (empty because some conditions in
 the parent query were not satisfied) is queried and CBO is turned on, wrong
 statistics are used, which leads to a ClassCastException in
 FilterEstimation.evaluateInSet

---
 .../statsEstimation/FilterEstimation.scala      | 37 ++++++------
 .../apache/spark/sql/execution/CBOSuite.scala   | 58 +++++++++++++++++++
 2 files changed, 77 insertions(+), 18 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 724aa1d5c8b65..0f3c024b6220e 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -395,27 +395,28 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // use [min, max] to filter the original hSet
     dataType match {
       case _: NumericType | BooleanType | DateType | TimestampType =>
-        val statsInterval =
-          ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
-        val validQuerySet = hSet.filter { v =>
-          v != null && statsInterval.contains(Literal(v, dataType))
-        }
+        if (colStat.min.isDefined && colStat.max.isDefined) {
+          val statsInterval =
+            ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
+          val validQuerySet = hSet.filter { v =>
+            v != null && statsInterval.contains(Literal(v, dataType))
+          }

-        if (validQuerySet.isEmpty) {
-          return Some(0.0)
-        }
+          if (validQuerySet.isEmpty) {
+            return Some(0.0)
+          }

-        val newMax = validQuerySet.maxBy(EstimationUtils.toDouble(_, dataType))
-        val newMin = validQuerySet.minBy(EstimationUtils.toDouble(_, dataType))
-        // newNdv should not be greater than the old ndv. For example, column has only 2 values
-        // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
-        newNdv = ndv.min(BigInt(validQuerySet.size))
-        if (update) {
-          val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
-            max = Some(newMax), nullCount = Some(0))
-          colStatsMap.update(attr, newStats)
+          val newMax = validQuerySet.maxBy(EstimationUtils.toDouble(_, dataType))
+          val newMin = validQuerySet.minBy(EstimationUtils.toDouble(_, dataType))
+          // newNdv should not be greater than the old ndv. For example, column has only 2 values
+          // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
+          newNdv = ndv.min(BigInt(validQuerySet.size))
+          if (update) {
+            val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+              max = Some(newMax), nullCount = Some(0))
+            colStatsMap.update(attr, newStats)
+          }
         }
-
       // We assume the whole set since there is no min/max information for String/Binary type
       case StringType | BinaryType =>
         newNdv = ndv.min(BigInt(hSet.size))
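Note on the cast above: when a column has no min/max statistics (as for an empty
relation), ValueInterval does not yield a numeric interval, so the unconditional
asInstanceOf[NumericValueInterval] fails at planning time. A toy model of that
failure mode (illustrative class names only, not Spark's actual interval
hierarchy):

    sealed trait Interval
    case object UnknownInterval extends Interval                  // no min/max available
    case class NumericInterval(lo: Double, hi: Double) extends Interval

    def makeInterval(min: Option[Double], max: Option[Double]): Interval =
      (min, max) match {
        case (Some(lo), Some(hi)) => NumericInterval(lo, hi)
        case _                    => UnknownInterval
      }

    // Before the fix, the estimator did the equivalent of:
    //   makeInterval(None, None).asInstanceOf[NumericInterval]  // ClassCastException
    // The fix checks colStat.min.isDefined && colStat.max.isDefined first.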
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala
new file mode 100644
index 0000000000000..bdc1610cb7c74
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{QueryTest, SaveMode}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class CBOSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  test("Simple queries must be working, if CBO is turned on") {
+    withSQLConf(("spark.sql.cbo.enabled", "true")) {
+      withTable("TBL1", "TBL") {
+        import org.apache.spark.sql.functions._
+        val df = spark.range(1000L).select('id,
+          'id * 2 as "FLD1",
+          'id * 12 as "FLD2",
+          lit("aaa") + 'id as "fld3")
+        df.write
+          .mode(SaveMode.Overwrite)
+          .bucketBy(10, "id", "FLD1", "FLD2")
+          .sortBy("id", "FLD1", "FLD2")
+          .saveAsTable("TBL")
+        spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS ")
+        spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3")
+        val df2 = spark.sql(
+          """
+            SELECT t1.id, t1.fld1, t1.fld2, t1.fld3
+            FROM tbl t1
+            JOIN tbl t2 on t1.id=t2.id
+            WHERE t1.fld3 IN (-123.23,321.23)
+          """.stripMargin)
+        df2.createTempView("TBL2")
+        val df3 = spark.sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ")
+        assertResult(0, "") {
+          df3.count()
+        }
+      }
+    }
+
+  }
+
+}
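Note: CBOSuite above reproduces the crash end to end. A condensed spark-shell
sketch of the same scenario (assumes a running `spark` session; simplified from
the test, so table and column names here are illustrative):

    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.range(1000).selectExpr("id", "id * 2 AS fld1", "concat('aaa', id) AS fld3")
      .write.saveAsTable("tbl")
    spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS id, fld1, fld3")
    // This predicate matches no rows, so the optimizer estimates the child as
    // empty: distinctCount 0 and no min/max in the propagated column stats.
    spark.sql("SELECT * FROM tbl WHERE fld3 IN (-123.23, 321.23)").createTempView("tbl2")
    // Planning the next query runs FilterEstimation.evaluateInSet over those
    // empty stats; without this patch it fails with a ClassCastException.
    spark.sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe')").count()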
From 0faa789a2e040c90c8add1ba93bd8618b1988d8a Mon Sep 17 00:00:00 2001
From: smikesh
Date: Fri, 13 Apr 2018 17:06:25 +0200
Subject: [PATCH 4/6] 1) CBOSuite was moved to StatisticsCollectionSuite.
 2) Reduced the number of changed lines in FilterEstimation.evaluateInSet.

---
 .../statsEstimation/FilterEstimation.scala      | 47 ++++++-------
 .../FilterEstimationSuite.scala                 |  4 +-
 .../spark/sql/StatisticsCollectionSuite.scala   | 30 ++++++++++
 .../apache/spark/sql/execution/CBOSuite.scala   | 58 -------------------
 4 files changed, 55 insertions(+), 84 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 0f3c024b6220e..263c9ba60d145 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -392,31 +392,34 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val dataType = attr.dataType
     var newNdv = ndv

+    if (ndv.toDouble == 0 || colStat.min.isEmpty || colStat.max.isEmpty) {
+      return Some(0.0)
+    }
+
     // use [min, max] to filter the original hSet
     dataType match {
       case _: NumericType | BooleanType | DateType | TimestampType =>
-        if (colStat.min.isDefined && colStat.max.isDefined) {
-          val statsInterval =
-            ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
-          val validQuerySet = hSet.filter { v =>
-            v != null && statsInterval.contains(Literal(v, dataType))
-          }
+        val statsInterval =
+          ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
+        val validQuerySet = hSet.filter { v =>
+          v != null && statsInterval.contains(Literal(v, dataType))
+        }

-          if (validQuerySet.isEmpty) {
-            return Some(0.0)
-          }
+        if (validQuerySet.isEmpty) {
+          return Some(0.0)
+        }

-          val newMax = validQuerySet.maxBy(EstimationUtils.toDouble(_, dataType))
-          val newMin = validQuerySet.minBy(EstimationUtils.toDouble(_, dataType))
-          // newNdv should not be greater than the old ndv. For example, column has only 2 values
-          // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
-          newNdv = ndv.min(BigInt(validQuerySet.size))
-          if (update) {
-            val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
-              max = Some(newMax), nullCount = Some(0))
-            colStatsMap.update(attr, newStats)
-          }
+        val newMax = validQuerySet.maxBy(EstimationUtils.toDouble(_, dataType))
+        val newMin = validQuerySet.minBy(EstimationUtils.toDouble(_, dataType))
+        // newNdv should not be greater than the old ndv. For example, column has only 2 values
+        // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
+        newNdv = ndv.min(BigInt(validQuerySet.size))
+        if (update) {
+          val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+            max = Some(newMax), nullCount = Some(0))
+          colStatsMap.update(attr, newStats)
         }
+
       // We assume the whole set since there is no min/max information for String/Binary type
       case StringType | BinaryType =>
         newNdv = ndv.min(BigInt(hSet.size))
@@ -428,11 +431,7 @@ case class FilterEstimation(plan: Filter) extends Logging {

     // return the filter selectivity. Without advanced statistics such as histograms,
     // we have to assume uniform distribution.
-    if (ndv.toDouble != 0) {
-      Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
-    } else {
-      Some(0.0)
-    }
+    Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
   }

   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index b1ca37195cc3a..16cb5d032cf57 100755
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -360,9 +360,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("evaluateInSet with all zeros") {
     validateEstimatedStats(
       Filter(InSet(attrString, Set(3, 4, 5)),
-        StatsTestPlan(Seq(attrString), 10,
+        StatsTestPlan(Seq(attrString), 0,
           AttributeMap(Seq(attrString ->
-            ColumnStat(distinctCount = Some(0), min = Some(0), max = Some(0),
+            ColumnStat(distinctCount = Some(0), min = None, max = None,
               nullCount = Some(0), avgLen = Some(0), maxLen = Some(0)))))),
       Seq(attrString -> ColumnStat(distinctCount = Some(0))),
       expectedRowCount = 0)
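Note: this commit replaces the nested isDefined checks from PATCH 3 with a
single early return, which also covers the ndv == 0 division-by-zero case from
PATCH 1 and lets the match statement keep its original indentation. The
resulting control flow, as a standalone sketch (hypothetical signature,
simplified from the real method):

    def inSetSelectivity(ndv: BigInt, min: Option[Any], max: Option[Any],
        hSetSize: Int): Option[Double] = {
      // Empty relation or unknown value range: the predicate selects nothing.
      if (ndv.toDouble == 0 || min.isEmpty || max.isEmpty) {
        return Some(0.0)
      }
      // Happy path: at most hSetSize distinct values can match.
      val newNdv = ndv.min(BigInt(hSetSize))
      Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
    }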
"fld3") + df.write + .mode(SaveMode.Overwrite) + .bucketBy(10, "id", "FLD1", "FLD2") + .sortBy("id", "FLD1", "FLD2") + .saveAsTable("TBL") + spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS ") + spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3") + val df2 = spark.sql( + """ + SELECT t1.id, t1.fld1, t1.fld2, t1.fld3 + FROM tbl t1 + JOIN tbl t2 on t1.id=t2.id + WHERE t1.fld3 IN (-123.23,321.23) + """.stripMargin) + df2.createTempView("TBL2") + spark.sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ").explain() + } + } + + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala deleted file mode 100644 index bdc1610cb7c74..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.spark.sql.{QueryTest, SaveMode} -import org.apache.spark.sql.test.SharedSparkSession - -class CBOSuite extends QueryTest with SharedSparkSession { - - import testImplicits._ - - test("Simple queries must be working, if CBO is turned on") { - withSQLConf(("spark.sql.cbo.enabled", "true")) { - withTable("TBL1", "TBL") { - import org.apache.spark.sql.functions._ - val df = spark.range(1000L).select('id, - 'id * 2 as "FLD1", - 'id * 12 as "FLD2", - lit("aaa") + 'id as "fld3") - df.write - .mode(SaveMode.Overwrite) - .bucketBy(10, "id", "FLD1", "FLD2") - .sortBy("id", "FLD1", "FLD2") - .saveAsTable("TBL") - spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS ") - spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3") - val df2 = spark.sql( - """ - SELECT t1.id, t1.fld1, t1.fld2, t1.fld3 - FROM tbl t1 - JOIN tbl t2 on t1.id=t2.id - WHERE t1.fld3 IN (-123.23,321.23) - """.stripMargin) - df2.createTempView("TBL2") - val df3 = spark.sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ") - assertResult(0, "") { - df3.count() - } - } - } - - } - -} From 8d2148814e52a2db1e14592c91467013565c310a Mon Sep 17 00:00:00 2001 From: Mykhailo Shtelma Date: Sat, 21 Apr 2018 13:14:12 +0200 Subject: [PATCH 5/6] minor fixes --- .../apache/spark/sql/StatisticsCollectionSuite.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 877746beb79f9..08cc1295f1d05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -384,7 +384,7 @@ class 
From 8d2148814e52a2db1e14592c91467013565c310a Mon Sep 17 00:00:00 2001
From: Mykhailo Shtelma
Date: Sat, 21 Apr 2018 13:14:12 +0200
Subject: [PATCH 5/6] minor fixes

---
 .../apache/spark/sql/StatisticsCollectionSuite.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 877746beb79f9..08cc1295f1d05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -384,7 +384,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
   }

   test("Simple queries must be working, if CBO is turned on") {
-    withSQLConf(("spark.sql.cbo.enabled", "true")) {
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
       withTable("TBL1", "TBL") {
         import org.apache.spark.sql.functions._
         val df = spark.range(1000L).select('id,
@@ -396,8 +396,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
           .bucketBy(10, "id", "FLD1", "FLD2")
           .sortBy("id", "FLD1", "FLD2")
           .saveAsTable("TBL")
-        spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS ")
-        spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3")
+        sql("ANALYZE TABLE TBL COMPUTE STATISTICS ")
+        sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3")
         val df2 = spark.sql(
           """
             SELECT t1.id, t1.fld1, t1.fld2, t1.fld3
@@ -406,10 +406,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
             WHERE t1.fld3 IN (-123.23,321.23)
           """.stripMargin)
         df2.createTempView("TBL2")
-        spark.sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ").explain()
+        sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ").explain()
       }
     }
-
   }
-
 }

From 8369cbcd5eab3686c78365e1b1f906a3e8136731 Mon Sep 17 00:00:00 2001
From: Mykhailo Shtelma
Date: Sun, 22 Apr 2018 00:14:56 +0200
Subject: [PATCH 6/6] minor fixes

---
 .../apache/spark/sql/StatisticsCollectionSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 08cc1295f1d05..b91712f4cc25d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -400,13 +400,13 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
         sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3")
         val df2 = spark.sql(
           """
-            SELECT t1.id, t1.fld1, t1.fld2, t1.fld3
-            FROM tbl t1
-            JOIN tbl t2 on t1.id=t2.id
-            WHERE t1.fld3 IN (-123.23,321.23)
+            |SELECT t1.id, t1.fld1, t1.fld2, t1.fld3
+            |FROM tbl t1
+            |JOIN tbl t2 on t1.id=t2.id
+            |WHERE t1.fld3 IN (-123.23,321.23)
           """.stripMargin)
         df2.createTempView("TBL2")
-        sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ").explain()
+        sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ").queryExecution.executedPlan
       }
     }
   }
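Note on the final cleanup: stripMargin only strips leading whitespace up to and
including a '|' margin character, so PATCH 6 adds the margins that were missing
since PATCH 3; and .queryExecution.executedPlan forces the query through
analysis, optimization (where FilterEstimation runs under CBO), and physical
planning without printing to the console as explain() does or launching a job
as count() would. A small sketch of the margin behavior:

    val query =
      """
        |SELECT t1.id
        |FROM tbl t1
      """.stripMargin
    // query == "\nSELECT t1.id\nFROM tbl t1\n      " — the indentation before
    // each '|' is removed; without the margins it would stay embedded in the
    // SQL string (harmless to the parser, but the intent is less explicit).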