[SPARK-17213][SQL] Disable Parquet filter push-down for string and binary columns due to PARQUET-686

This PR targets both master and branch-2.1.

## What changes were proposed in this pull request?

Due to PARQUET-686, Parquet doesn't compare strings correctly when evaluating pushed-down filters over string columns. This PR disables filter push-down for both string and binary columns to work around the issue. Binary columns are also affected because some Parquet data models (like Hive) may store string columns as plain Parquet `binary` instead of `binary (UTF8)`.
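To make the failure mode concrete: correct UTF-8 lexicographic order compares bytes as unsigned values, but the comparison behind PARQUET-686 treats them as signed Java bytes, so any byte at or above 0x80 (i.e., every non-ASCII character) sorts *before* ASCII, and the min/max statistics Parquet consults during push-down come out wrong. Below is a minimal, self-contained sketch of the two orderings; it is illustrative only, not Parquet's actual code.

```scala
import java.nio.charset.StandardCharsets

object SignedVsUnsignedOrder {
  // Byte-wise comparison, parameterized by how a single byte pair is compared.
  private def compareBytes(a: Array[Byte], b: Array[Byte])(cmp: (Byte, Byte) => Int): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = cmp(a(i), b(i))
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  // Buggy order: bytes compared as signed Java bytes (the behavior behind PARQUET-686).
  def signed(a: Array[Byte], b: Array[Byte]): Int =
    compareBytes(a, b)((x, y) => java.lang.Byte.compare(x, y))

  // Correct UTF-8 lexicographic order: bytes compared as unsigned values.
  def unsigned(a: Array[Byte], b: Array[Byte]): Int =
    compareBytes(a, b)((x, y) => (x & 0xFF) - (y & 0xFF))

  def main(args: Array[String]): Unit = {
    val a = "a".getBytes(StandardCharsets.UTF_8) // 0x61
    val e = "é".getBytes(StandardCharsets.UTF_8) // 0xC3 0xA9; 0xC3 is negative as a signed byte
    println(signed(e, a))   // < 0: buggy order claims "é" < "a", so "name > 'a'" can wrongly skip the row
    println(unsigned(e, a)) // > 0: correct order has "é" > "a"
  }
}
```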

## How was this patch tested?

New test case added in `ParquetFilterSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #16106 from liancheng/spark-17213-bad-string-ppd.
liancheng authored and rxin committed Dec 2, 2016
1 parent c82f16c commit ca63916
Showing 2 changed files with 47 additions and 3 deletions.
ParquetFilters.scala
@@ -40,6 +40,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -49,6 +52,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -62,6 +66,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -70,6 +77,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -81,13 +89,17 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
           Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -99,13 +111,17 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
           Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -117,13 +133,17 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
           Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -135,13 +155,17 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
           Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   /**
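For readers skimming the diff above: the reason commenting out these cases disables push-down is that each `make*` helper is a `PartialFunction[DataType, ...]`, and the filter builder looks data types up via `lift`, so a type with no matching case yields `None` and Spark simply evaluates the predicate itself. Below is a minimal sketch of that dispatch pattern; it is an illustration, not the real `ParquetFilters` code, and `buildFilter` is a made-up name.

```scala
import org.apache.spark.sql.types._

object PushDownDispatchSketch {
  // Stand-in for Parquet's FilterPredicate; the real code builds predicate objects.
  type Predicate = String

  // Data types with no case here are silently left out of push-down.
  val makeEq: PartialFunction[DataType, Predicate] = {
    case IntegerType => "eq(intColumn)"
    case DoubleType  => "eq(doubleColumn)"
    // StringType / BinaryType cases intentionally absent, as in this patch.
  }

  // `lift` turns the partial function into a total DataType => Option[Predicate].
  def buildFilter(dt: DataType): Option[Predicate] = makeEq.lift(dt)

  def main(args: Array[String]): Unit = {
    println(buildFilter(IntegerType)) // Some(eq(intColumn)): pushed down to Parquet
    println(buildFilter(StringType))  // None: Spark filters the rows itself
  }
}
```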
ParquetFilterSuite.scala
@@ -47,7 +47,6 @@ import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
  * data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
-
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
@@ -230,7 +229,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("filter pushdown - string") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -258,7 +258,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("filter pushdown - binary") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
     }
@@ -558,4 +559,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-17213: Broken Parquet filter push-down for string columns") {
+    withTempPath { dir =>
+      import testImplicits._
+
+      val path = dir.getCanonicalPath
+      // scalastyle:off nonascii
+      Seq("a", "é").toDF("name").write.parquet(path)
+      // scalastyle:on nonascii
+
+      assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+      assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
+
+      // scalastyle:off nonascii
+      assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
+      assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
+      // scalastyle:on nonascii
+    }
+  }
 }
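A side note for users on releases without this patch: Parquet push-down can also be disabled wholesale through the `spark.sql.parquet.filterPushdown` session conf, at the cost of losing push-down for every data type rather than just strings and binaries. A hypothetical spark-shell session illustrating that coarser workaround (`/tmp/users` is a made-up path):

```scala
// Coarser user-side workaround: turn off Parquet push-down entirely, so all
// predicates are evaluated by Spark rather than against Parquet statistics.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

// String predicates now return correct counts even over files whose min/max
// statistics were written with the buggy signed byte order.
spark.read.parquet("/tmp/users").where("name > 'a'").count()
```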
