Skip to content

Commit

Permalink
Timestamp support pushdown to parquet data source
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Jul 10, 2018
1 parent eb6e988 commit b2a9000
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 8 deletions.
Expand Up @@ -378,6 +378,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Internal boolean switch (default: true) for pushing Timestamp predicates down to Parquet.
val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED =
  buildConf("spark.sql.parquet.filterPushdown.timestamp")
    .doc(
      "If true, enables Parquet filter push-down optimization for Timestamp. This " +
        "configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled " +
        "and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
    .internal()
    .booleanConf
    .createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
Expand Down Expand Up @@ -1467,6 +1476,8 @@ class SQLConf extends Serializable with Logging {

// Current value of the PARQUET_FILTER_PUSHDOWN_DATE_ENABLED setting.
def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

// Current value of the PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED setting
// (key "spark.sql.parquet.filterPushdown.timestamp", default true).
def parquetFilterPushDownTimestamp: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED)

// Current value of the PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED setting
// (key "spark.sql.parquet.filterPushdown.string.startsWith").
def parquetFilterPushDownStringStartWith: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)

Expand Down
124 changes: 124 additions & 0 deletions sql/core/benchmarks/FilterPushdownBenchmark-results.txt
Expand Up @@ -578,3 +578,127 @@ Native ORC Vectorized 10758 / 11971 1.5 6
Native ORC Vectorized (Pushdown) 10564 / 10713 1.5 671.6 1.1X


================================================================================================
Pushdown benchmark for Timestamp
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as INT96 row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 4784 / 4956 3.3 304.2 1.0X
Parquet Vectorized (Pushdown) 4838 / 4917 3.3 307.6 1.0X
Native ORC Vectorized 3923 / 4173 4.0 249.4 1.2X
Native ORC Vectorized (Pushdown) 894 / 943 17.6 56.8 5.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 10% timestamp stored as INT96 rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 5686 / 5901 2.8 361.5 1.0X
Parquet Vectorized (Pushdown) 5555 / 5895 2.8 353.2 1.0X
Native ORC Vectorized 4844 / 4957 3.2 308.0 1.2X
Native ORC Vectorized (Pushdown) 2141 / 2230 7.3 136.1 2.7X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 50% timestamp stored as INT96 rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 9100 / 9421 1.7 578.6 1.0X
Parquet Vectorized (Pushdown) 9122 / 9496 1.7 580.0 1.0X
Native ORC Vectorized 8365 / 8874 1.9 531.9 1.1X
Native ORC Vectorized (Pushdown) 7128 / 7376 2.2 453.2 1.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 90% timestamp stored as INT96 rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 12764 / 13120 1.2 811.5 1.0X
Parquet Vectorized (Pushdown) 12656 / 13003 1.2 804.7 1.0X
Native ORC Vectorized 13096 / 13233 1.2 832.6 1.0X
Native ORC Vectorized (Pushdown) 12710 / 15611 1.2 808.1 1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as TIMESTAMP_MICROS row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 4381 / 4796 3.6 278.5 1.0X
Parquet Vectorized (Pushdown) 122 / 137 129.3 7.7 36.0X
Native ORC Vectorized 3913 / 3988 4.0 248.8 1.1X
Native ORC Vectorized (Pushdown) 905 / 945 17.4 57.6 4.8X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 10% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 5145 / 5184 3.1 327.1 1.0X
Parquet Vectorized (Pushdown) 1426 / 1519 11.0 90.7 3.6X
Native ORC Vectorized 4827 / 4901 3.3 306.9 1.1X
Native ORC Vectorized (Pushdown) 2133 / 2210 7.4 135.6 2.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 50% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 9234 / 9516 1.7 587.1 1.0X
Parquet Vectorized (Pushdown) 6752 / 7046 2.3 429.3 1.4X
Native ORC Vectorized 8418 / 8998 1.9 535.2 1.1X
Native ORC Vectorized (Pushdown) 7199 / 7314 2.2 457.7 1.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 90% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 12414 / 12458 1.3 789.2 1.0X
Parquet Vectorized (Pushdown) 12094 / 12249 1.3 768.9 1.0X
Native ORC Vectorized 12198 / 13755 1.3 775.5 1.0X
Native ORC Vectorized (Pushdown) 12205 / 12431 1.3 776.0 1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as TIMESTAMP_MILLIS row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 4369 / 4515 3.6 277.8 1.0X
Parquet Vectorized (Pushdown) 116 / 125 136.2 7.3 37.8X
Native ORC Vectorized 3965 / 4703 4.0 252.1 1.1X
Native ORC Vectorized (Pushdown) 892 / 1162 17.6 56.7 4.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 10% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 5211 / 5409 3.0 331.3 1.0X
Parquet Vectorized (Pushdown) 1427 / 1438 11.0 90.7 3.7X
Native ORC Vectorized 4719 / 4883 3.3 300.1 1.1X
Native ORC Vectorized (Pushdown) 2191 / 2228 7.2 139.3 2.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 50% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8716 / 8953 1.8 554.2 1.0X
Parquet Vectorized (Pushdown) 6632 / 6968 2.4 421.7 1.3X
Native ORC Vectorized 8376 / 9118 1.9 532.5 1.0X
Native ORC Vectorized (Pushdown) 7218 / 7609 2.2 458.9 1.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 90% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 12264 / 12452 1.3 779.7 1.0X
Parquet Vectorized (Pushdown) 11766 / 11927 1.3 748.0 1.0X
Native ORC Vectorized 12101 / 12301 1.3 769.3 1.0X
Native ORC Vectorized (Pushdown) 11983 / 12651 1.3 761.9 1.0X

Expand Up @@ -348,6 +348,7 @@ class ParquetFileFormat
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith

(file: PartitionedFile) => {
Expand Down Expand Up @@ -376,7 +377,7 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
.flatMap(new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownStringStartWith)
.createFilter(parquetSchema, _))
.reduceOption(FilterApi.and)
} else {
Expand Down
Expand Up @@ -17,14 +17,15 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.sql.Date
import java.lang.{Long => JLong}
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters.asScalaBufferConverter

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator}
import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
Expand All @@ -37,7 +38,10 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
private[parquet] class ParquetFilters(
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownStartWith: Boolean) {

private case class ParquetSchemaType(
originalType: OriginalType,
Expand All @@ -52,6 +56,8 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
// Keys matched by the make* partial functions in this class. The first argument is the Parquet
// original (logical) type annotation; the second is presumably the physical primitive type
// (NOTE(review): the second and third ParquetSchemaType fields are not visible here — confirm).
private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
// Timestamp columns stored as INT64 annotated with TIMESTAMP_MICROS or TIMESTAMP_MILLIS.
// These keys are only matched when `pushDownTimestamp` is true.
private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, null)
private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null)

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
Expand Down Expand Up @@ -82,6 +88,15 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[JLong]).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
}

private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
Expand All @@ -108,6 +123,15 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[JLong]).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
}

private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
Expand All @@ -129,6 +153,14 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.lt(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.lt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
Expand All @@ -150,6 +182,14 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
Expand All @@ -171,6 +211,14 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gt(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
Expand All @@ -192,6 +240,14 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

/**
Expand Down
Expand Up @@ -28,7 +28,8 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
import org.apache.spark.util.{Benchmark, Utils}

/**
Expand Down Expand Up @@ -359,6 +360,40 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
}
}
}

// Benchmarks filter push-down on a timestamp column once per Parquet output timestamp type
// (every value of ParquetOutputTimestampType), with timestamp push-down explicitly enabled.
// For each type it measures one equality predicate (`value = mid`) and three range predicates
// selecting roughly 10%, 50% and 90% of the rows.
ignore("Pushdown benchmark for Timestamp") {
  withTempPath { dir =>
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) {
      ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType =>
        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
          val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
          val df = spark.range(numRows).selectExpr(columns: _*)
            .withColumn("value", monotonically_increasing_id().cast(TimestampType))
          // NOTE(review): assumes saveAsTable registers the Parquet data as "parquetTable" —
          // confirm. The previous spelling "patquetTable" looks like a typo that would leave
          // the Parquet temp table un-dropped after each iteration.
          withTempTable("orcTable", "parquetTable") {
            saveAsTable(df, dir)

            // Point lookup: a single equality predicate on the timestamp column.
            val pointExpr = s"value = CAST($mid AS timestamp)"
            filterPushDownBenchmark(
              numRows,
              s"Select 1 timestamp stored as $fileType row ($pointExpr)",
              pointExpr)

            // Range scans: aggregate every column so the selected rows are fully read.
            val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
            Seq(10, 50, 90).foreach { percent =>
              val threshold = numRows * percent / 100
              filterPushDownBenchmark(
                numRows,
                s"Select $percent% timestamp stored as $fileType rows " +
                  s"(value < CAST($threshold AS timestamp))",
                s"value < CAST($threshold as timestamp)",
                selectExpr
              )
            }
          }
        }
      }
    }
  }
}
}

trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>
Expand Down

0 comments on commit b2a9000

Please sign in to comment.