[SPARK-25306][SQL] Avoid skewed filter trees to speed up `createFilter` in ORC

## What changes were proposed in this pull request?

In both ORC data sources (the native one in `sql/core` and the Hive one in `sql/hive`), the `createFilter` function has exponential time complexity because it builds a skewed (left-deep) filter tree. This PR improves it with a new `buildTree` function that builds a balanced tree instead.
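
For illustration, here is a minimal standalone sketch of the idea. It mirrors the `buildTree` introduced by this PR; the `depth` helper and the `IsNotNull` filters are demonstration-only additions, not part of the change. Reducing the filters left-to-right with `And` (the old approach) yields a left-deep tree whose depth equals the number of filters, while splitting the list in half and recursing keeps the depth at roughly log2(n):

```scala
import org.apache.spark.sql.sources.{And, Filter, IsNotNull}

// Mirrors the buildTree added by this PR: split the filter list in half
// and recurse, producing a balanced And tree of depth O(log n).
def buildTree(filters: Seq[Filter]): Option[Filter] = filters match {
  case Seq() => None
  case Seq(filter) => Some(filter)
  case Seq(filter1, filter2) => Some(And(filter1, filter2))
  case _ =>
    val (left, right) = filters.splitAt(filters.length / 2)
    Some(And(buildTree(left).get, buildTree(right).get))
}

// Demonstration-only helper: depth of a filter tree.
def depth(filter: Filter): Int = filter match {
  case And(left, right) => 1 + math.max(depth(left), depth(right))
  case _ => 1
}

val filters: Seq[Filter] = (1 to 30).map(i => IsNotNull(s"c$i"))
val skewed = filters.reduceLeft(And)   // old approach: left-deep tree, depth 30
val balanced = buildTree(filters).get  // new approach: balanced tree, depth 6
println(s"skewed depth = ${depth(skewed)}, balanced depth = ${depth(balanced)}")
```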

**REPRODUCE**
```scala
// Create and read a 1-row table with 1000 columns
sql("set spark.sql.orc.filterPushdown=true")
val selectExpr = (1 to 1000).map(i => s"id c$i")
spark.range(1).selectExpr(selectExpr: _*).write.mode("overwrite").orc("/tmp/orc")
print(s"With 0 filters, ")
spark.time(spark.read.orc("/tmp/orc").count)

// Increase the number of filters
(20 to 30).foreach { width =>
  val whereExpr = (1 to width).map(i => s"c$i is not null").mkString(" and ")
  print(s"With $width filters, ")
  spark.time(spark.read.orc("/tmp/orc").where(whereExpr).count)
}
```

**BEFORE THIS PR**
```scala
With 0 filters, Time taken: 653 ms
With 20 filters, Time taken: 962 ms
With 21 filters, Time taken: 1282 ms
With 22 filters, Time taken: 1982 ms
With 23 filters, Time taken: 3855 ms
With 24 filters, Time taken: 6719 ms
With 25 filters, Time taken: 12669 ms
With 26 filters, Time taken: 25032 ms
With 27 filters, Time taken: 49585 ms
With 28 filters, Time taken: 98980 ms    // over 1 min 38 seconds
With 29 filters, Time taken: 198368 ms   // over 3 mins
With 30 filters, Time taken: 393744 ms   // over 6 mins
```

**AFTER THIS PR**
```scala
With 0 filters, Time taken: 774 ms
With 20 filters, Time taken: 601 ms
With 21 filters, Time taken: 399 ms
With 22 filters, Time taken: 679 ms
With 23 filters, Time taken: 363 ms
With 24 filters, Time taken: 342 ms
With 25 filters, Time taken: 336 ms
With 26 filters, Time taken: 352 ms
With 27 filters, Time taken: 322 ms
With 28 filters, Time taken: 302 ms
With 29 filters, Time taken: 307 ms
With 30 filters, Time taken: 301 ms
```

## How was this patch tested?

Passes Jenkins with the newly added test cases.

Closes #22313 from dongjoon-hyun/SPARK-25306.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun authored and cloud-fan committed Sep 5, 2018
1 parent 061bb01 commit 103f513
Showing 4 changed files with 82 additions and 21 deletions.
34 changes: 34 additions & 0 deletions sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -702,3 +702,37 @@ Parquet Vectorized (Pushdown) 11766 / 11927 1.3 7
Native ORC Vectorized 12101 / 12301 1.3 769.3 1.0X
Native ORC Vectorized (Pushdown) 11983 / 12651 1.3 761.9 1.0X


================================================================================================
Pushdown benchmark with many filters
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

Select 1 row with 1 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 158 / 182 0.0 158442969.0 1.0X
Parquet Vectorized (Pushdown) 150 / 158 0.0 149718289.0 1.1X
Native ORC Vectorized 141 / 148 0.0 141259852.0 1.1X
Native ORC Vectorized (Pushdown) 142 / 147 0.0 142016472.0 1.1X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

Select 1 row with 250 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 1013 / 1026 0.0 1013194322.0 1.0X
Parquet Vectorized (Pushdown) 1326 / 1332 0.0 1326301956.0 0.8X
Native ORC Vectorized 1005 / 1010 0.0 1005266379.0 1.0X
Native ORC Vectorized (Pushdown) 1068 / 1071 0.0 1067964993.0 0.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

Select 1 row with 500 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 3598 / 3614 0.0 3598001202.0 1.0X
Parquet Vectorized (Pushdown) 4282 / 4333 0.0 4281849770.0 0.8X
Native ORC Vectorized 3594 / 3619 0.0 3593551548.0 1.0X
Native ORC Vectorized (Pushdown) 3834 / 3840 0.0 3834240570.0 0.9X
@@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory}
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types._

/**
@@ -54,7 +55,17 @@ import org.apache.spark.sql.types._
* builder methods mentioned above can only be found in test code, where all tested filters are
* known to be convertible.
*/
private[orc] object OrcFilters {
private[sql] object OrcFilters {
private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
filters match {
case Seq() => None
case Seq(filter) => Some(filter)
case Seq(filter1, filter2) => Some(And(filter1, filter2))
case _ => // length > 2
val (left, right) = filters.splitAt(filters.length / 2)
Some(And(buildTree(left).get, buildTree(right).get))
}
}

/**
* Create ORC filter as a SearchArgument instance.
@@ -66,14 +77,14 @@ private[orc] object OrcFilters {
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
filter <- filters
_ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
_ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
} yield filter

for {
// Combines all convertible filters using `And` to produce a single conjunction
conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
conjunction <- buildTree(convertibleFilters)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
} yield builder.build()
}

@@ -127,8 +138,6 @@ private[orc] object OrcFilters {
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
def newBuilder = SearchArgumentFactory.newBuilder()

def getType(attribute: String): PredicateLeaf.Type =
getPredicateLeafType(dataTypeMap(attribute))

@@ -242,7 +242,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter

ignore("Pushdown for many distinct value case") {
withTempPath { dir =>
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
Seq(true, false).foreach { useStringForValue =>
prepareTable(dir, numRows, width, useStringForValue)
if (useStringForValue) {
@@ -259,7 +259,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
withTempPath { dir =>
val numDistinctValues = 200

withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
prepareStringDictTable(dir, numRows, numDistinctValues, width)
runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
}
@@ -268,7 +268,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter

ignore("Pushdown benchmark for StringStartsWith") {
withTempPath { dir =>
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
prepareTable(dir, numRows, width, true)
Seq(
"value like '10%'",
@@ -296,7 +296,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
monotonically_increasing_id()
}
val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = $mid").foreach { whereExpr =>
@@ -320,7 +320,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter

ignore("Pushdown benchmark for InSet -> InFilters") {
withTempPath { dir =>
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
prepareTable(dir, numRows, width, false)
Seq(5, 10, 50, 100).foreach { count =>
Seq(10, 50, 90).foreach { distribution =>
@@ -341,7 +341,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
.orderBy("value")
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
@@ -373,7 +373,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", monotonically_increasing_id().cast(TimestampType))
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
@@ -398,6 +398,24 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
}
}
}

test(s"Pushdown benchmark with many filters") {
val numRows = 1
val width = 500

withTempPath { dir =>
val columns = (1 to width).map(i => s"id c$i")
val df = spark.range(1).selectExpr(columns: _*)
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)
Seq(1, 250, 500).foreach { numFilter =>
val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
// Note: InferFiltersFromConstraints will add more filters to this given filters
filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
}
}
}
}
}

trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>
@@ -17,10 +17,12 @@

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

@@ -62,23 +64,21 @@ private[orc] object OrcFilters extends Logging {
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
filter <- filters
_ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
_ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
} yield filter

for {
// Combines all convertible filters using `And` to produce a single conjunction
conjunction <- convertibleFilters.reduceOption(And)
conjunction <- buildTree(convertibleFilters)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
} yield builder.build()
}

private def buildSearchArgument(
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
def newBuilder = SearchArgumentFactory.newBuilder()

def isSearchableType(dataType: DataType): Boolean = dataType match {
// Only the values in the Spark types below can be recognized by
// the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
