From 8c76e315bc0e89d9f049b59c82e97d06c8627025 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 27 Sep 2018 15:12:44 -0700 Subject: [PATCH 1/4] Remove unsupported predicates in parquet --- .../datasources/parquet/ParquetFilters.scala | 33 +++++++++++-------- .../parquet/ParquetFilterSuite.scala | 2 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 0c286defb9406..862a3b6a2be8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -394,7 +394,13 @@ private[parquet] class ParquetFilters( */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToParquetField = getFieldMap(schema) + createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true) + } + private def createFilterHelper( + nameToParquetField: Map[String, ParquetField], + predicate: sources.Filter, + canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = { // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { @@ -488,26 +494,25 @@ private[parquet] class ParquetFilters( .map(_(nameToParquetField(name).fieldName, value)) case sources.And(lhs, rhs) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. - for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) - } yield FilterApi.and(lhsFilter, rhsFilter) + // If the unsupported predicate is in the top level `And` condition or in the child + // `And` condition before hitting `Not` or `Or` condition, it can be safely removed. + (createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = true), + createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = true)) match { + case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter)) + case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + case _ => None + } case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilter(schema, pred).map(FilterApi.not) + createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + .map(FilterApi.not) case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 7ebb75009555a..1f7bfb6972e65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -770,7 +770,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("c", 1.5D))) } - assertResult(None) { + assertResult(Some(lt(intColumn("a"), 10: Integer))) { parquetFilters.createFilter( parquetSchema, sources.And( From 7792e99839bc5356df298431126a2a7417baff87 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 27 Sep 2018 19:15:35 -0700 Subject: [PATCH 2/4] Add a nested And test case --- .../datasources/parquet/ParquetFilterSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 1f7bfb6972e65..5aef65b9de244 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -778,6 +778,17 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.StringContains("b", "prefix"))) } + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2)))) + } + assertResult(None) { parquetFilters.createFilter( parquetSchema, From d37b2e8883425d121d1e8d1533f934747c28fff3 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 27 Sep 2018 23:33:28 -0700 Subject: [PATCH 3/4] address feedback --- .../sql/execution/datasources/parquet/ParquetFilters.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 862a3b6a2be8a..6c27fa0940578 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -496,8 +496,10 @@ private[parquet] class ParquetFilters( case sources.And(lhs, rhs) => // If the unsupported predicate is in the top level `And` condition or in the child // `And` condition before hitting `Not` or `Or` condition, it can be safely removed. - (createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = true), - createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = true)) match { + val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) + val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + + (lhsFilterOption, rhsFilterOption) match { case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter)) case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) From 9a9e47fb242afb94e7df917c852425cc0f5114e0 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 28 Sep 2018 13:34:39 -0700 Subject: [PATCH 4/4] Added more test and doc --- .../datasources/parquet/ParquetFilters.scala | 13 +- .../parquet/ParquetFilterSuite.scala | 138 +++++++++++++++++- 2 files changed, 146 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 6c27fa0940578..44a0d209e6e69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -494,8 +494,17 @@ private[parquet] class ParquetFilters( .map(_(nameToParquetField(name).fieldName, value)) case sources.And(lhs, rhs) => - // If the unsupported predicate is in the top level `And` condition or in the child - // `And` condition before hitting `Not` or `Or` condition, it can be safely removed. + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5aef65b9de244..01e41b3c5df36 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - test("SPARK-12218 Converting conjunctions into Parquet filter predicates") { + test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") { val schema = StructType(Seq( StructField("a", IntegerType, nullable = false), StructField("b", StringType, nullable = true), @@ -770,6 +770,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("c", 1.5D))) } + // Testing when `canRemoveOneSideInAnd == true` + // case sources.And(lhs, rhs) => + // ... + // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) assertResult(Some(lt(intColumn("a"), 10: Integer))) { parquetFilters.createFilter( parquetSchema, @@ -778,6 +782,122 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.StringContains("b", "prefix"))) } + // Testing when `canRemoveOneSideInAnd == true` + // case sources.And(lhs, rhs) => + // ... + // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + assertResult(Some(lt(intColumn("a"), 10: Integer))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10))) + } + + // Testing complex And conditions + assertResult(Some( + FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.And( + sources.LessThan("a", 10), + sources.StringContains("b", "prefix") + ), + sources.GreaterThan("a", 5))) + } + + // Testing complex And conditions + assertResult(Some( + FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.GreaterThan("a", 5), + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10) + ))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing when `canRemoveOneSideInAnd == false` + // case sources.And(lhs, rhs) => + // ... + // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing when `canRemoveOneSideInAnd == false` + // case sources.And(lhs, rhs) => + // ... + // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.StringContains("b", "prefix"), + sources.GreaterThan("a", 1)))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing passing `canRemoveOneSideInAnd = false` into + // case sources.And(lhs, rhs) => + // val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -789,13 +909,25 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 2)))) } + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing passing `canRemoveOneSideInAnd = false` into + // case sources.And(lhs, rhs) => + // val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) assertResult(None) { parquetFilters.createFilter( parquetSchema, sources.Not( sources.And( - sources.GreaterThan("a", 1), - sources.StringContains("b", "prefix")))) + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix"))))) } }