From a55ad54a820f095e5116df05979f169c0fe8e0cf Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 29 Dec 2015 13:34:53 +0900 Subject: [PATCH 1/6] Implement unhandled filters for Parquet --- .../datasources/parquet/ParquetFilters.scala | 31 ++++++++++++++--- .../datasources/parquet/ParquetRelation.scala | 33 ++++++++++++++----- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 883013bf1bfc..dc9207c05598 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.Serializable +import scala.collection.mutable.ArrayBuffer + import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate._ import org.apache.parquet.io.api.Binary @@ -207,12 +209,31 @@ private[sql] object ParquetFilters { */ } + /** + * Return referenced columns in [[sources.Filter]]. + */ + def referencedColumns(schema: StructType, predicate: sources.Filter): Array[String] = { + val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap + val referencedColumns = ArrayBuffer.empty[String] + def getDataTypeOf(name: String): DataType = { + referencedColumns += name + dataTypeOf(name) + } + createParquetFilter(getDataTypeOf, predicate) + referencedColumns.distinct.toArray + } + /** * Converts data sources filters to Parquet filter predicates. */ def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = { val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap + createParquetFilter(dataTypeOf, predicate) + } + private def createParquetFilter( + dataTypeOf: String => DataType, + predicate: sources.Filter): Option[FilterPredicate] = { relaxParquetValidTypeMap // NOTE: @@ -265,18 +286,18 @@ private[sql] object ParquetFilters { // Pushing one side of AND down is only safe to do at the top level. // You can see ParquetRelation's initializeLocalJobFunc method as an example. 
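 // For example, if this `And` sat beneath a `Not` and only `lhs` of `And(lhs, rhs)` were
 // convertible, pushing `lhs` alone would turn `NOT (lhs AND rhs)` into `NOT lhs`, wrongly
 // dropping rows where `lhs` holds but `rhs` does not. That is why both legs must convert here.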
for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createParquetFilter(dataTypeOf, lhs) + rhsFilter <- createParquetFilter(dataTypeOf, rhs) } yield FilterApi.and(lhsFilter, rhsFilter) case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createParquetFilter(dataTypeOf, lhs) + rhsFilter <- createParquetFilter(dataTypeOf, rhs) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilter(schema, pred).map(FilterApi.not) + createParquetFilter(dataTypeOf, pred).map(FilterApi.not) case _ => None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 1af2a394f399..6c70de7636b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -133,6 +133,11 @@ private[sql] class ParquetRelation( .map(_.toBoolean) .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) + // When merging schemas is enabled and the column of the given filter does not exist, + // Parquet emits an exception which is an issue of Parquet (PARQUET-389). + private val safeParquetFilterPushDown = + sqlContext.conf.parquetFilterPushDown && !shouldMergeSchemas + private val mergeRespectSummaries = sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) @@ -288,20 +293,23 @@ private[sql] class ParquetRelation( } } + override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { + if (safeParquetFilterPushDown) { + filters.filter(ParquetFilters.createFilter(dataSchema, _).isEmpty) + } else { + filters + } + } + override def buildInternalScan( requiredColumns: Array[String], filters: Array[Filter], inputFiles: Array[FileStatus], broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA) - val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown + val parquetFilterPushDown = safeParquetFilterPushDown val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - - // When merging schemas is enabled and the column of the given filter does not exist, - // Parquet emits an exception which is an issue of Parquet (PARQUET-389). - val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown - // Parquet row group size. We will use this value as the value for // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value // of these flags are smaller than the parquet row group size. @@ -315,7 +323,7 @@ private[sql] class ParquetRelation( dataSchema, parquetBlockSize, useMetadataCache, - safeParquetFilterPushDown, + parquetFilterPushDown, assumeBinaryIsString, assumeInt96IsTimestamp) _ @@ -568,6 +576,15 @@ private[sql] object ParquetRelation extends Logging { conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) // Try to push down filters when filter push-down is enabled. + val safeRequiredColumns = if (parquetFilterPushDown) { + val referencedColumns = filters + // Collects all columns referenced in Parquet filter predicates. 
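+ // These columns are merged into the requested schema below, so that a pushed-down
+ // predicate can still be evaluated even when it touches a column that is not part
+ // of requiredColumns.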
+ .flatMap(filter => ParquetFilters.referencedColumns(dataSchema, filter)) + (requiredColumns ++ referencedColumns).distinct + } else { + requiredColumns + } + if (parquetFilterPushDown) { filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be @@ -579,7 +596,7 @@ private[sql] object ParquetRelation extends Logging { } conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { - val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) + val requestedSchema = StructType(safeRequiredColumns.map(dataSchema(_))) CatalystSchemaConverter.checkFieldNames(requestedSchema).json }) From cf331a453c3f99ee40ee5ca6f5029dadee3d07f6 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 29 Dec 2015 13:36:42 +0900 Subject: [PATCH 2/6] Correct existing tests --- .../datasources/parquet/ParquetFilterSuite.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 045425f282ad..283d9a6132b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import org.apache.parquet.filter2.predicate.Operators._ import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} +import org.apache.spark.sql.execution.PhysicalRDD import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -75,7 +76,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(f.getClass === filterClass) } } - checker(stripSparkFilter(query), expected) + // Check if SparkPlan Filter is removed and this plan only has PhysicalRDD. + val executedPlan = query.queryExecution.executedPlan + assert(executedPlan.isInstanceOf[PhysicalRDD]) + checker(query, expected) } } } @@ -325,7 +329,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") { import testImplicits._ - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { withTempPath { dir => @@ -353,11 +356,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) val df = sqlContext.read.parquet(path).filter("a = 2") + // Check if SparkPlan Filter is removed and this plan only has PhysicalRDD. + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.isInstanceOf[PhysicalRDD]) // The result should be single row. // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. - assert(stripSparkFilter(df).count == 1) + assert(df.count == 1) } } } From 8c376af467b939d3687e24f7d0e1ccf4e0ea5fca Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 29 Dec 2015 15:38:23 +0900 Subject: [PATCH 3/6] Wrap Spark-filter when unsafe row RecordReader is enabled. 
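The UnsafeRowParquetRecordReader cannot evaluate pushed filters row by row, so while it is
enabled the relation keeps reporting every filter as unhandled and Spark leaves its own
Filter operator on top of the scan. Roughly, for a relation with push-down enabled and
schema merging off (a sketch, not code from this patch; `relation` stands for the
ParquetRelation instance and `a` for any convertible column):

    import org.apache.spark.sql.sources.{EqualTo, Filter}

    val pushed: Array[Filter] = Array(EqualTo("a", 2))
    relation.unhandledFilters(pushed)
    // reader enabled:  returns the filters unchanged, so the Spark-side Filter is kept
    // reader disabled: returns Array.empty, and the predicate is evaluated by Parquet itself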
--- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +- .../execution/datasources/parquet/ParquetRelation.scala | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 3d819262859f..d01162f64e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -326,7 +326,7 @@ private[spark] object SQLConf { val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf( key = "spark.sql.parquet.enableUnsafeRowRecordReader", defaultValue = Some(true), - doc = "Enables using the custom ParquetUnsafeRowRecordReader.") + doc = "Enables using the custom UnsafeRowParquetRecordReader.") val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown", defaultValue = Some(false), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 6c70de7636b9..f5befdcf59b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -294,7 +294,12 @@ private[sql] class ParquetRelation( } override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { - if (safeParquetFilterPushDown) { + // The unsafe row RecordReader does not support row by row filtering so for this case + // it should wrap this with Spark-side filtering. + val enableUnsafeRowParquetReader = + sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean + val shouldHandleFilters = safeParquetFilterPushDown && !enableUnsafeRowParquetReader + if (shouldHandleFilters) { filters.filter(ParquetFilters.createFilter(dataSchema, _).isEmpty) } else { filters From 2ad81822100229318d1d80796edb92916a10d820 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 29 Dec 2015 16:20:43 +0900 Subject: [PATCH 4/6] Remove the use of stripSparkFilter for newly added tests. --- .../parquet/ParquetFilterSuite.scala | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index cf3d6a86fcba..628546e6cb88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -469,30 +469,37 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11164: test the parquet filter in") { import testImplicits._ - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table1" - (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path) + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + withTempPath { dir => + def checkPlan(df: DataFrame): Unit = { + // Check if SparkPlan Filter is removed and this plan only has PhysicalRDD. 
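+ // If unhandledFilters had returned the predicate, the planner would keep a Filter
+ // operator above the scan and the assertion below would fail.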
+ val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.isInstanceOf[PhysicalRDD]) + } - // When a filter is pushed to Parquet, Parquet can apply it to every row. - // So, we can check the number of rows returned from the Parquet - // to make sure our filter pushdown work. - val df = sqlContext.read.parquet(path).where("b in (0,2)") - assert(stripSparkFilter(df).count == 3) + val path = s"${dir.getCanonicalPath}/table1" + (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path) - val df1 = sqlContext.read.parquet(path).where("not (b in (1))") - assert(stripSparkFilter(df1).count == 3) + val df = sqlContext.read.parquet(path).where("b in (0,2)") + checkPlan(df) + assert(df.count == 3) - val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)") - assert(stripSparkFilter(df2).count == 2) + val df1 = sqlContext.read.parquet(path).where("not (b in (1))") + checkPlan(df1) + assert(df1.count == 3) - val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)") - assert(stripSparkFilter(df3).count == 4) + val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)") + checkPlan(df2) + assert(df2.count == 2) - val df4 = sqlContext.read.parquet(path).where("not (a <= 2)") - assert(stripSparkFilter(df4).count == 3) - } + val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)") + checkPlan(df3) + assert(df3.count == 4) + + val df4 = sqlContext.read.parquet(path).where("not (a <= 2)") + checkPlan(df4) + assert(df4.count == 3) } } } From 67d45332333d97a5ac648b2ebc688c83b6f88e72 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 26 Feb 2016 11:12:48 +0900 Subject: [PATCH 5/6] Handling WholeStageCodegen --- .../parquet/ParquetFilterSuite.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 3d13e9d65850..1057f142b05d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.{PhysicalRDD, WholeStageCodegen} import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -80,9 +80,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(f.getClass === filterClass) } } - // Check if SparkPlan Filter is removed and this plan only has PhysicalRDD. 
- val executedPlan = query.queryExecution.executedPlan - assert(executedPlan.isInstanceOf[PhysicalRDD]) + + checkPlan(query) checker(query, expected) } } @@ -112,6 +111,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, expected) } + private def checkPlan(df: DataFrame): Unit = { + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.isInstanceOf[WholeStageCodegen]) + // Check if SparkPlan Filter is removed and this plan only has `PhysicalRDD`. + val childPlan = executedPlan.asInstanceOf[WholeStageCodegen].plan + assert(childPlan.isInstanceOf[PhysicalRDD]) + } + private def checkBinaryFilterPredicate (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte]) (implicit df: DataFrame): Unit = { @@ -454,9 +461,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) val df = sqlContext.read.parquet(path).filter("a = 2") - // Check if SparkPlan Filter is removed and this plan only has PhysicalRDD. - val executedPlan = df.queryExecution.executedPlan - assert(executedPlan.isInstanceOf[PhysicalRDD]) + // Check if SparkPlan Filter is removed and this plan only has `PhysicalRDD`. + checkPlan(df) // The result should be single row. // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet @@ -527,12 +533,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { withTempPath { dir => - def checkPlan(df: DataFrame): Unit = { - // Check if SparkPlan Filter is removed and this plan only has PhysicalRDD. 
- val executedPlan = df.queryExecution.executedPlan - assert(executedPlan.isInstanceOf[PhysicalRDD]) - } - val path = s"${dir.getCanonicalPath}/table1" (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path) From 0e149dad44d1787f30122795961a97fd44c6761c Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 26 Feb 2016 11:16:01 +0900 Subject: [PATCH 6/6] Update indentations --- .../sql/execution/datasources/parquet/ParquetFilterSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 1057f142b05d..c3f797fc2bf5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -80,7 +80,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(f.getClass === filterClass) } } - checkPlan(query) checker(query, expected) } @@ -381,6 +380,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") { import testImplicits._ + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { withTempPath { dir => @@ -454,6 +454,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // The unsafe row RecordReader does not support row by row filtering so run it with it disabled. test("SPARK-11661 Still pushdown filters returned by unhandledFilters") { import testImplicits._ + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { withTempPath { dir =>
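A quick way to eyeball the end result outside the test suite (a sketch only: the path and
column name are made up, and it assumes the two configurations exercised in the tests above):

    import org.apache.spark.sql.internal.SQLConf

    sqlContext.setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
    sqlContext.setConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key, "false")

    val df = sqlContext.read.parquet("/tmp/table1").filter("a = 2")
    // With the predicate fully handled by Parquet, the executed plan should be a
    // WholeStageCodegen-wrapped PhysicalRDD scan with no separate Filter node.
    df.explain()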