diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
index 2215ed91e2018..a75a15c99328a 100644
--- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
+++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -702,3 +702,37 @@ Parquet Vectorized (Pushdown)                 11766 / 11927          1.3         7
 Native ORC Vectorized                         12101 / 12301          1.3         769.3       1.0X
 Native ORC Vectorized (Pushdown)              11983 / 12651          1.3         761.9       1.0X
+
+================================================================================================
+Pushdown benchmark with many filters
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 1 row with 1 filters:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                             158 /  182          0.0   158442969.0       1.0X
+Parquet Vectorized (Pushdown)                  150 /  158          0.0   149718289.0       1.1X
+Native ORC Vectorized                          141 /  148          0.0   141259852.0       1.1X
+Native ORC Vectorized (Pushdown)               142 /  147          0.0   142016472.0       1.1X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 1 row with 250 filters:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            1013 / 1026          0.0  1013194322.0       1.0X
+Parquet Vectorized (Pushdown)                 1326 / 1332          0.0  1326301956.0       0.8X
+Native ORC Vectorized                         1005 / 1010          0.0  1005266379.0       1.0X
+Native ORC Vectorized (Pushdown)              1068 / 1071          0.0  1067964993.0       0.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 1 row with 500 filters:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            3598 / 3614          0.0  3598001202.0       1.0X
+Parquet Vectorized (Pushdown)                 4282 / 4333          0.0  4281849770.0       0.8X
+Native ORC Vectorized                         3594 / 3619          0.0  3593551548.0       1.0X
+Native ORC Vectorized (Pushdown)              3834 / 3840          0.0  3834240570.0       0.9X
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index c4c3b3053a3b1..dbafc468c6c40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,11 +17,12 @@
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory}
+import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
+import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{And, Filter}
 import org.apache.spark.sql.types._
 
@@ -54,7 +55,17 @@ import org.apache.spark.sql.types._
  * builder methods mentioned above can only be found in test code, where all tested filters are
  * known to be convertible.
  */
-private[orc] object OrcFilters {
+private[sql] object OrcFilters {
+  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
+    filters match {
+      case Seq() => None
+      case Seq(filter) => Some(filter)
+      case Seq(filter1, filter2) => Some(And(filter1, filter2))
+      case _ => // length > 2
+        val (left, right) = filters.splitAt(filters.length / 2)
+        Some(And(buildTree(left).get, buildTree(right).get))
+    }
+  }
 
   /**
    * Create ORC filter as a SearchArgument instance.
@@ -66,14 +77,14 @@ private[orc] object OrcFilters {
     // collect all convertible ones to build the final `SearchArgument`.
     val convertibleFilters = for {
       filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
+      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
     } yield filter
 
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
+      conjunction <- buildTree(convertibleFilters)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
+      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
     } yield builder.build()
   }
@@ -127,8 +138,6 @@ private[orc] object OrcFilters {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def newBuilder = SearchArgumentFactory.newBuilder()
-
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
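The key change above is replacing `convertibleFilters.reduceOption(And)`, which folds the convertible filters into a left-deep chain of `And` nodes (so the recursion in `buildSearchArgument` goes one level deeper per filter), with `buildTree`, which splits the sequence in half at each step and produces a balanced conjunction of logarithmic depth. A minimal, self-contained sketch of that difference, using hypothetical stand-in types rather than the real `org.apache.spark.sql.sources` classes:

```scala
// Sketch only (hypothetical Pred/And types): compares the shape of the conjunction
// produced by reduceOption(And) with the balanced tree produced by buildTree.
object AndTreeSketch {
  sealed trait Filter
  case class Pred(name: String) extends Filter                // stand-in for a leaf predicate
  case class And(left: Filter, right: Filter) extends Filter  // stand-in for sources.And

  // Same shape as the patch: split the filters in half and recurse on each half.
  def buildTree(filters: Seq[Filter]): Option[Filter] = filters match {
    case Seq() => None
    case Seq(f) => Some(f)
    case Seq(f1, f2) => Some(And(f1, f2))
    case _ =>
      val (left, right) = filters.splitAt(filters.length / 2)
      Some(And(buildTree(left).get, buildTree(right).get))
  }

  // Depth of the resulting tree, i.e. how deep a recursive converter must go.
  def depth(f: Filter): Int = f match {
    case And(l, r) => 1 + math.max(depth(l), depth(r))
    case _ => 1
  }

  def main(args: Array[String]): Unit = {
    val filters = (1 to 500).map(i => Pred(s"c$i = 0"))
    val leftDeep = filters.reduceOption(And(_, _)).get  // what the old code built
    val balanced = buildTree(filters).get                // what the patch builds
    println(s"left-deep depth = ${depth(leftDeep)}")     // ~500
    println(s"balanced depth  = ${depth(balanced)}")     // ~10
  }
}
```

With 500 filters the left-deep chain is about 500 `And` nodes deep, while the balanced tree stays around 10 levels, which keeps the recursion in `buildSearchArgument` shallow when the single conjunction is converted.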
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index bdb60b44750c7..41087f1a97174 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -242,7 +242,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   ignore("Pushdown for many distinct value case") {
     withTempPath { dir =>
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         Seq(true, false).foreach { useStringForValue =>
           prepareTable(dir, numRows, width, useStringForValue)
           if (useStringForValue) {
@@ -259,7 +259,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
     withTempPath { dir =>
       val numDistinctValues = 200
 
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         prepareStringDictTable(dir, numRows, numDistinctValues, width)
         runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
       }
@@ -268,7 +268,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   ignore("Pushdown benchmark for StringStartsWith") {
     withTempPath { dir =>
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         prepareTable(dir, numRows, width, true)
         Seq(
           "value like '10%'",
@@ -296,7 +296,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
           monotonically_increasing_id()
         }
         val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
-        withTempTable("orcTable", "patquetTable") {
+        withTempTable("orcTable", "parquetTable") {
           saveAsTable(df, dir)
 
           Seq(s"value = $mid").foreach { whereExpr =>
@@ -320,7 +320,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   ignore("Pushdown benchmark for InSet -> InFilters") {
     withTempPath { dir =>
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
         prepareTable(dir, numRows, width, false)
         Seq(5, 10, 50, 100).foreach { count =>
           Seq(10, 50, 90).foreach { distribution =>
@@ -341,7 +341,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       val df = spark.range(numRows).selectExpr(columns: _*)
        .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
        .orderBy("value")
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
        saveAsTable(df, dir)
 
        Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
@@ -373,7 +373,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
      val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
      val df = spark.range(numRows).selectExpr(columns: _*)
        .withColumn("value", monotonically_increasing_id().cast(TimestampType))
-      withTempTable("orcTable", "patquetTable") {
+      withTempTable("orcTable", "parquetTable") {
        saveAsTable(df, dir)
 
        Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
@@ -398,6 +398,24 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       }
     }
   }
+
+  test(s"Pushdown benchmark with many filters") {
+    val numRows = 1
+    val width = 500
+
+    withTempPath { dir =>
+      val columns = (1 to width).map(i => s"id c$i")
+      val df = spark.range(1).selectExpr(columns: _*)
+      withTempTable("orcTable", "parquetTable") {
+        saveAsTable(df, dir)
+        Seq(1, 250, 500).foreach { numFilter =>
+          val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
+          // Note: InferFiltersFromConstraints will add more filters to this given filters
+          filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
+        }
+      }
+    }
+  }
 }
 
 trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>
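The new benchmark drives `filterPushDownBenchmark` with a flat conjunction over the generated columns. A small illustration of the `whereExpr` string it builds, assuming `numFilter = 3` (the `c1..cN` column names come from the test's schema; the rest is illustration only):

```scala
// Illustration only: the WHERE clause the new test generates for numFilter = 3.
val numFilter = 3
val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
// whereExpr == "c1 = 0 and c2 = 0 and c3 = 0"
// As the in-test comment notes, the optimizer rule InferFiltersFromConstraints may
// derive additional predicates (e.g. IsNotNull) before pushdown, so the number of
// filters reaching the ORC/Parquet readers can exceed numFilter.
```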
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index d9efd0cb457cd..aee9cb58a031e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -17,10 +17,12 @@
 package org.apache.spark.sql.hive.orc
 
-import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -62,14 +64,14 @@ private[orc] object OrcFilters extends Logging {
     // collect all convertible ones to build the final `SearchArgument`.
     val convertibleFilters = for {
       filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
+      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
     } yield filter
 
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- convertibleFilters.reduceOption(And)
+      conjunction <- buildTree(convertibleFilters)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
+      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
     } yield builder.build()
   }
@@ -77,8 +79,6 @@ private[orc] object OrcFilters extends Logging {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def newBuilder = SearchArgumentFactory.newBuilder()
-
     def isSearchableType(dataType: DataType): Boolean = dataType match {
       // Only the values in the Spark types below can be recognized by
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
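Since the Hive-path `OrcFilters` now reuses `buildTree` from `org.apache.spark.sql.execution.datasources.orc.OrcFilters`, both code paths build the same balanced conjunction. A quick sanity sketch of its behavior on small inputs follows; because `buildTree` is `private[sql]`, a snippet like this would have to live under the `org.apache.spark.sql` package (for example in a test), and the `EqualTo` predicates are arbitrary examples:

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
import org.apache.spark.sql.sources.{EqualTo, Filter}

// Four arbitrary example predicates on columns c1..c4.
val filters: Seq[Filter] = (1 to 4).map(i => EqualTo(s"c$i", 0))

buildTree(Nil)               // None
buildTree(filters.take(1))   // Some(EqualTo(c1,0))
buildTree(filters.take(2))   // Some(And(EqualTo(c1,0), EqualTo(c2,0)))
buildTree(filters)           // Some(And(And(EqualTo(c1,0), EqualTo(c2,0)),
                             //          And(EqualTo(c3,0), EqualTo(c4,0))))
```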