From 3030b82c89d3e45a2e361c469fbc667a1e43b854 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 12 Sep 2018 17:43:40 +0000
Subject: [PATCH] [SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unnecessary root fields

## What changes were proposed in this pull request?

Schema pruning doesn't work if a nested column is used in the where clause. For example,

```
sql("select name.first from contacts where name.first = 'David'")

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>
```

In the above query plan, the scan node reads the entire schema of the `name` column.

This issue was reported by: https://github.com/apache/spark/pull/21320#issuecomment-419290197

The cause is that we infer a root field from the expression `IsNotNull(name)`. However, such an expression doesn't actually use any nested fields of this root field, so we can ignore the unnecessary nested fields (a standalone sketch of this idea follows the patch).

## How was this patch tested?

Unit tests.

Closes #22357 from viirya/SPARK-25363.

Authored-by: Liang-Chi Hsieh
Signed-off-by: DB Tsai
---
 .../parquet/ParquetSchemaPruning.scala        | 34 ++++++--
 .../parquet/ParquetSchemaPruningSuite.scala   | 77 ++++++++++++++++---
 2 files changed, 96 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
index 6a46b5f8edc54..91080b15727d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     val projectionRootFields = projects.flatMap(getRootFields)
     val filterRootFields = filters.flatMap(getRootFields)
 
-    (projectionRootFields ++ filterRootFields).distinct
+    // Some kinds of expressions don't need to access any fields of a root field, e.g.,
+    // `IsNotNull`. For them, if there are other nested fields accessed in the query, we don't
+    // need to add the root field accesses of these expressions.
+    // For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
+    // we don't need to read nested fields of the `name` struct other than the `first` field.
+    val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
+      .distinct.partition(_.contentAccessed)
+
+    optRootFields.filter { opt =>
+      !rootFields.exists(_.field.name == opt.field.name)
+    } ++ rootFields
   }
 
   /**
@@ -156,7 +166,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
       // in the resulting schema may differ from their ordering in the logical relation's
       // original schema
       val mergedSchema = requestedRootFields
-        .map { case RootField(field, _) => StructType(Array(field)) }
+        .map { case root: RootField => StructType(Array(root.field)) }
         .reduceLeft(_ merge _)
       val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
       val mergedDataSchema =
@@ -199,6 +209,15 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
       case att: Attribute =>
         RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
       case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
+      // Root field accesses by `IsNotNull` and `IsNull` are special cases as the expressions
+      // don't actually use any nested fields. These root field accesses might be excluded later
+      // if there are any nested field accesses in the query plan.
+      case IsNotNull(SelectedField(field)) =>
+        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+      case IsNull(SelectedField(field)) =>
+        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+      case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
+        expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
       case _ =>
         expr.children.flatMap(getRootFields)
     }
@@ -250,8 +269,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
   }
 
   /**
-   * A "root" schema field (aka top-level, no-parent) and whether it was derived from
-   * an attribute or had a proper child.
+   * This represents a "root" schema field (aka top-level, no-parent). `field` is the
+   * `StructField` for the field name and data type. `derivedFromAtt` indicates whether
+   * it was derived from an attribute or had a proper child. `contentAccessed` indicates
+   * whether its content is accessed by the expressions that refer to it.
   */
-  private case class RootField(field: StructField, derivedFromAtt: Boolean)
+  private case class RootField(field: StructField, derivedFromAtt: Boolean,
+    contentAccessed: Boolean = true)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index eb99654fa78f5..7b132af4f6911 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -35,22 +35,29 @@ class ParquetSchemaPruningSuite
     with SchemaPruningTest
     with SharedSQLContext {
   case class FullName(first: String, middle: String, last: String)
+  case class Company(name: String, address: String)
+  case class Employer(id: Int, company: Company)
   case class Contact(
     id: Int,
     name: FullName,
     address: String,
     pets: Int,
     friends: Array[FullName] = Array.empty,
-    relatives: Map[String, FullName] = Map.empty)
+    relatives: Map[String, FullName] = Map.empty,
+    employer: Employer = null)
 
   val janeDoe = FullName("Jane", "X.", "Doe")
   val johnDoe = FullName("John", "Y.", "Doe")
   val susanSmith = FullName("Susan", "Z.", "Smith")
 
+  val employer = Employer(0, Company("abc", "123 Business Street"))
+  val employerWithNullCompany = Employer(1, null)
+
   private val contacts =
     Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
-      relatives = Map("brother" -> johnDoe)) ::
-    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+      relatives = Map("brother" -> johnDoe), employer = employer) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe),
+      employer = employerWithNullCompany) :: Nil
 
   case class Name(first: String, last: String)
   case class BriefContact(id: Int, name: Name, address: String)
@@ -66,13 +73,14 @@ class ParquetSchemaPruningSuite
       pets: Int,
       friends: Array[FullName] = Array(),
       relatives: Map[String, FullName] = Map(),
+      employer: Employer = null,
       p: Int)
 
   case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
 
   private val contactsWithDataPartitionColumn =
-    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
-      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+    contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
   private val briefContactsWithDataPartitionColumn =
     briefContacts.map { case BriefContact(id, name, address) =>
       BriefContactWithDataPartitionColumn(id, name, address, 2) }
@@ -155,6 +163,60 @@ class ParquetSchemaPruningSuite
       Row(null) :: Row(null) :: Nil)
   }
 
+  testSchemaPruning("select a single complex field and in where clause") {
+    val query1 = sql("select name.first from contacts where name.first = 'Jane'")
+    checkScan(query1, "struct<name:struct<first:string>>")
+    checkAnswer(query1, Row("Jane") :: Nil)
+
+    val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
+    checkScan(query2, "struct<name:struct<first:string,last:string>>")
+    checkAnswer(query2, Row("Jane", "Doe") :: Nil)
+
+    val query3 = sql("select name.first from contacts " +
+      "where employer.company.name = 'abc' and p = 1")
+    checkScan(query3, "struct<name:struct<first:string>," +
+      "employer:struct<company:struct<name:string>>>")
+    checkAnswer(query3, Row("Jane") :: Nil)
+
+    val query4 = sql("select name.first, employer.company.name from contacts " +
+      "where employer.company is not null and p = 1")
+    checkScan(query4, "struct<name:struct<first:string>," +
+      "employer:struct<company:struct<name:string>>>")
+    checkAnswer(query4, Row("Jane", "abc") :: Nil)
+  }
+
+  testSchemaPruning("select nullable complex field and having is not null predicate") {
+    val query = sql("select employer.company from contacts " +
+      "where employer is not null and p = 1")
+    checkScan(query, "struct<employer:struct<company:struct<name:string,address:string>>>")
+    checkAnswer(query, Row(Row("abc", "123 Business Street")) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and is null expression in project") {
+    val query = sql("select name.first, address is not null from contacts")
+    checkScan(query, "struct<name:struct<first:string>,address:string>")
+    checkAnswer(query.orderBy("id"),
+      Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and in clause") {
+    val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
+    checkScan(query,
+      "struct<friends:array<struct<first:string,middle:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row(Array("Z.")) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and in clause") {
+    val query =
+      sql("select relatives[\"brother\"].middle from contacts " +
+        "where relatives[\"brother\"].first = 'John'")
+    checkScan(query,
+      "struct<relatives:map<string,struct<first:string,middle:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row("Y.") :: Nil)
+  }
+
   private def testSchemaPruning(testName: String)(testThunk: => Unit) {
     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
       test(s"Spark vectorized reader - without partition data column - $testName") {
@@ -238,10 +300,7 @@ class ParquetSchemaPruningSuite
 
   testMixedCasePruning("filter with different-case column names") {
     val query = sql("select id from mixedcase where Col2.b = 2")
-    // Pruning with filters is currently unsupported. As-is, the file reader will read the id column
-    // and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment
-    // this line
-    // checkScan(query, "struct<id:int,coL2:struct<b:int>>")
+    checkScan(query, "struct<id:int,coL2:struct<b:int>>")
     checkAnswer(query.orderBy("id"), Row(1) :: Nil)
   }
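
The following is a minimal, self-contained Scala sketch of the root-field partitioning this patch introduces. It is not the patched rule itself: `SimpleField` is a stand-in for Spark's `StructField`, the `RootField` and `identifyRootFields` here are simplified copies of their namesakes in `ParquetSchemaPruning`, and the data-type strings are illustrative only.

```scala
// Simplified stand-ins for Spark's StructField and ParquetSchemaPruning.RootField.
case class SimpleField(name: String, dataType: String)
case class RootField(field: SimpleField, derivedFromAtt: Boolean, contentAccessed: Boolean = true)

object PruningSketch extends App {
  // Mirrors the new identifyRootFields logic: root fields referenced only by
  // null checks (contentAccessed = false) are kept only when no other root
  // field with the same name actually reads nested content.
  def identifyRootFields(
      projectionRootFields: Seq[RootField],
      filterRootFields: Seq[RootField]): Seq[RootField] = {
    val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
      .distinct.partition(_.contentAccessed)

    optRootFields.filter { opt =>
      !rootFields.exists(_.field.name == opt.field.name)
    } ++ rootFields
  }

  // For `select name.first from contacts where name.first = 'David'`: the
  // projection and the `name.first = 'David'` predicate contribute a pruned
  // `name` root field, while `IsNotNull(name)` contributes the full struct
  // with contentAccessed = false.
  val pruned = RootField(SimpleField("name", "struct<first:string>"), derivedFromAtt = false)
  val fromIsNotNull = RootField(
    SimpleField("name", "struct<first:string,middle:string,last:string>"),
    derivedFromAtt = false, contentAccessed = false)

  // The IsNotNull-derived root field is dropped, so only name.first is read.
  println(identifyRootFields(Seq(pruned), Seq(fromIsNotNull, pruned)))
  // List(RootField(SimpleField(name,struct<first:string>),false,true))
}
```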
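And a hedged way to observe the fix end to end, assuming a SparkSession `spark` and a Parquet-backed `contacts` temp view are already set up. The conf key `spark.sql.optimizer.nestedSchemaPruning.enabled` is the flag that gates this rule in Spark 2.4, but the plan shape noted below is an expectation, not captured output.

```scala
// Assumes `spark: SparkSession` and a Parquet-backed `contacts` temp view.
// Nested schema pruning is off by default in Spark 2.4, so enable it first.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

val df = spark.sql("select name.first from contacts where name.first = 'David'")
df.explain()
// With this patch applied, the FileScan's ReadSchema should cover only the
// accessed nested field, e.g. struct<name:struct<first:string>>, instead of
// the full name struct that the IsNotNull(name) filter previously forced.
```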