Skip to content

Commit

Permalink
[SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unn…
Browse files Browse the repository at this point in the history
…ecessary root fields

## What changes were proposed in this pull request?

Schema pruning doesn't work if nested column is used in where clause.

For example,
```
sql("select name.first from contacts where name.first = 'David'")

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [],
    PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>
```

In above query plan, the scan node reads the entire schema of `name` column.

This issue is reported by:
#21320 (comment)

The cause is that we infer a root field from expression `IsNotNull(name)`. However, for such expression, we don't really use the nested fields of this root field, so we can ignore the unnecessary nested fields.

## How was this patch tested?

Unit tests.

Closes #22357 from viirya/SPARK-25363.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
  • Loading branch information
viirya authored and dbtsai committed Sep 12, 2018
1 parent 2f42239 commit 3030b82
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
val projectionRootFields = projects.flatMap(getRootFields)
val filterRootFields = filters.flatMap(getRootFields)

(projectionRootFields ++ filterRootFields).distinct
// Kind of expressions don't need to access any fields of a root fields, e.g., `IsNotNull`.
// For them, if there are any nested fields accessed in the query, we don't need to add root
// field access of above expressions.
// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
// we don't need to read nested fields of `name` struct other than `first` field.
val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
.distinct.partition(_.contentAccessed)

optRootFields.filter { opt =>
!rootFields.exists(_.field.name == opt.field.name)
} ++ rootFields
}

/**
Expand Down Expand Up @@ -156,7 +166,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// in the resulting schema may differ from their ordering in the logical relation's
// original schema
val mergedSchema = requestedRootFields
.map { case RootField(field, _) => StructType(Array(field)) }
.map { case root: RootField => StructType(Array(root.field)) }
.reduceLeft(_ merge _)
val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
val mergedDataSchema =
Expand Down Expand Up @@ -199,6 +209,15 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
case att: Attribute =>
RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
// Root field accesses by `IsNotNull` and `IsNull` are special cases as the expressions
// don't actually use any nested fields. These root field accesses might be excluded later
// if there are any nested fields accesses in the query plan.
case IsNotNull(SelectedField(field)) =>
RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
case IsNull(SelectedField(field)) =>
RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
case _ =>
expr.children.flatMap(getRootFields)
}
Expand Down Expand Up @@ -250,8 +269,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
}

/**
* A "root" schema field (aka top-level, no-parent) and whether it was derived from
* an attribute or had a proper child.
* This represents a "root" schema field (aka top-level, no-parent). `field` is the
* `StructField` for field name and datatype. `derivedFromAtt` indicates whether it
* was derived from an attribute or had a proper child. `contentAccessed` means whether
* it was accessed with its content by the expressions refer it.
*/
private case class RootField(field: StructField, derivedFromAtt: Boolean)
private case class RootField(field: StructField, derivedFromAtt: Boolean,
contentAccessed: Boolean = true)
}
Expand Up @@ -35,22 +35,29 @@ class ParquetSchemaPruningSuite
with SchemaPruningTest
with SharedSQLContext {
case class FullName(first: String, middle: String, last: String)
case class Company(name: String, address: String)
case class Employer(id: Int, company: Company)
case class Contact(
id: Int,
name: FullName,
address: String,
pets: Int,
friends: Array[FullName] = Array.empty,
relatives: Map[String, FullName] = Map.empty)
relatives: Map[String, FullName] = Map.empty,
employer: Employer = null)

val janeDoe = FullName("Jane", "X.", "Doe")
val johnDoe = FullName("John", "Y.", "Doe")
val susanSmith = FullName("Susan", "Z.", "Smith")

val employer = Employer(0, Company("abc", "123 Business Street"))
val employerWithNullCompany = Employer(1, null)

private val contacts =
Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
relatives = Map("brother" -> johnDoe)) ::
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
relatives = Map("brother" -> johnDoe), employer = employer) ::
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe),
employer = employerWithNullCompany) :: Nil

case class Name(first: String, last: String)
case class BriefContact(id: Int, name: Name, address: String)
Expand All @@ -66,13 +73,14 @@ class ParquetSchemaPruningSuite
pets: Int,
friends: Array[FullName] = Array(),
relatives: Map[String, FullName] = Map(),
employer: Employer = null,
p: Int)

case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)

private val contactsWithDataPartitionColumn =
contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
private val briefContactsWithDataPartitionColumn =
briefContacts.map { case BriefContact(id, name, address) =>
BriefContactWithDataPartitionColumn(id, name, address, 2) }
Expand Down Expand Up @@ -155,6 +163,60 @@ class ParquetSchemaPruningSuite
Row(null) :: Row(null) :: Nil)
}

testSchemaPruning("select a single complex field and in where clause") {
val query1 = sql("select name.first from contacts where name.first = 'Jane'")
checkScan(query1, "struct<name:struct<first:string>>")
checkAnswer(query1, Row("Jane") :: Nil)

val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
checkScan(query2, "struct<name:struct<first:string,last:string>>")
checkAnswer(query2, Row("Jane", "Doe") :: Nil)

val query3 = sql("select name.first from contacts " +
"where employer.company.name = 'abc' and p = 1")
checkScan(query3, "struct<name:struct<first:string>," +
"employer:struct<company:struct<name:string>>>")
checkAnswer(query3, Row("Jane") :: Nil)

val query4 = sql("select name.first, employer.company.name from contacts " +
"where employer.company is not null and p = 1")
checkScan(query4, "struct<name:struct<first:string>," +
"employer:struct<company:struct<name:string>>>")
checkAnswer(query4, Row("Jane", "abc") :: Nil)
}

testSchemaPruning("select nullable complex field and having is not null predicate") {
val query = sql("select employer.company from contacts " +
"where employer is not null and p = 1")
checkScan(query, "struct<employer:struct<company:struct<name:string,address:string>>>")
checkAnswer(query, Row(Row("abc", "123 Business Street")) :: Row(null) :: Nil)
}

testSchemaPruning("select a single complex field and is null expression in project") {
val query = sql("select name.first, address is not null from contacts")
checkScan(query, "struct<name:struct<first:string>,address:string>")
checkAnswer(query.orderBy("id"),
Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) :: Nil)
}

testSchemaPruning("select a single complex field array and in clause") {
val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
checkScan(query,
"struct<friends:array<struct<first:string,middle:string>>>")
checkAnswer(query.orderBy("id"),
Row(Array("Z.")) :: Nil)
}

testSchemaPruning("select a single complex field from a map entry and in clause") {
val query =
sql("select relatives[\"brother\"].middle from contacts " +
"where relatives[\"brother\"].first = 'John'")
checkScan(query,
"struct<relatives:map<string,struct<first:string,middle:string>>>")
checkAnswer(query.orderBy("id"),
Row("Y.") :: Nil)
}

private def testSchemaPruning(testName: String)(testThunk: => Unit) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
test(s"Spark vectorized reader - without partition data column - $testName") {
Expand Down Expand Up @@ -238,10 +300,7 @@ class ParquetSchemaPruningSuite

testMixedCasePruning("filter with different-case column names") {
val query = sql("select id from mixedcase where Col2.b = 2")
// Pruning with filters is currently unsupported. As-is, the file reader will read the id column
// and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment
// this line
// checkScan(query, "struct<id:int,coL2:struct<B:int>>")
checkScan(query, "struct<id:int,coL2:struct<B:int>>")
checkAnswer(query.orderBy("id"), Row(1) :: Nil)
}

Expand Down

0 comments on commit 3030b82

Please sign in to comment.