[SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema

### What changes were proposed in this pull request?

Currently, in the Parquet V1 read path, Spark will push down data filters even if they have no reference in the Parquet read schema. This can cause correctness issues, as described in [SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).

The root cause appears to be that in the V1 path, we first use `AttributeReference` equality to filter partition columns out of the data columns, and then use `AttributeSet` equality to pick out the filters that reference only data columns. The two steps are inconsistent when the case-sensitivity check is disabled.

Take the following scenario as example:
- data column: `[COL, a]`
- partition column: `[col]`
- filter: `col > 10`

With `AttributeReference` equality, `COL` is not considered equal to `col` (their names differ), so the data column set with partition columns removed is still `[COL, a]`. However, when computing the filters that reference only data columns, `COL` **is considered equal** to `col`. Consequently, the filter `col > 10`, when checked against `[COL, a]`, is treated as referencing data columns and is pushed down to Parquet as a data filter.
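
To make the asymmetry concrete, here is a minimal sketch (illustration only, not part of this patch), assuming that case-insensitive resolution has left the data column `COL` and the partition column `col` sharing the same expression ID while keeping different names: `Seq.contains` relies on `AttributeReference.equals`, which compares names, whereas `AttributeSet.contains` compares only expression IDs.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.IntegerType

// Hypothetical attributes: same exprId, different names, as can happen when a
// data column and a partition column overlap under case-insensitive resolution.
val dataCol = AttributeReference("COL", IntegerType)()
val partCol = dataCol.withName("col")

// Step 1 (before this fix): Seq membership uses AttributeReference.equals,
// which compares names, so COL is NOT removed as a partition column.
Seq(partCol).contains(dataCol)                 // false

// Step 2: AttributeSet membership compares only exprIds, so a filter on `col`
// IS treated as referencing the data column COL and gets pushed down.
AttributeSet(partCol :: Nil).contains(dataCol) // true
```

The fix makes step 1 use the same `AttributeSet` semantics (`partitionSet.contains`), so the overlapping column is removed consistently in both steps.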

On the Parquet side, since `col` doesn't exist in the file schema (which only has `COL`), Parquet will return an incorrect number of rows when the column index feature is enabled. See [PARQUET-2170](https://issues.apache.org/jira/browse/PARQUET-2170) for more detail.

In general, whenever data columns overlap with partition columns and case sensitivity is disabled, partition filters are not filtered out before we compute the filters that reference only data columns, which is incorrect.
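
For builds that have neither this fix nor the earlier column-index guard, a hedged workaround sketch is to disable Parquet column-index filtering via the same `parquet.filter.columnindex.enabled` key behind `ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED` that the now-removed guard in `ParquetFileFormat` set (see the diff below); `spark` is assumed to be an active `SparkSession`:

```scala
// Workaround sketch (illustration only): turn off Parquet column-index filtering
// so a pushed filter referencing a column absent from the file schema cannot
// change the number of rows returned.
spark.sparkContext.hadoopConfiguration
  .setBoolean("parquet.filter.columnindex.enabled", false)
```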

### Why are the changes needed?

This fixes the correctness bug described in [SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

There are existing test cases for this issue from [SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833). This PR also modifies them to cover both the case-sensitive and case-insensitive scenarios.

Closes #37881 from sunchao/SPARK-40169.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Chao Sun <sunchao@apple.com>
sunchao committed Sep 16, 2022
1 parent d71b180 commit 4e0fea2
Showing 3 changed files with 25 additions and 20 deletions.
@@ -186,7 +186,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {

// Partition keys are not available in the statistics of the files.
// `dataColumns` might have partition columns, we need to filter them out.
- val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
+ val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionSet.contains)
val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
if (f.references.intersect(partitionSet).nonEmpty) {
extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
@@ -227,11 +227,6 @@ class ParquetFileFormat
SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetTimestampNTZEnabled)

- // See PARQUET-2170.
- // Disable column index optimisation when required schema does not have columns that appear in
- // pushed filters to avoid getting incorrect results.
- hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
-
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

@@ -1161,24 +1161,34 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
}

test("SPARK-39833: pushed filters with count()") {
- withTempPath { path =>
- val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
- Seq(0).toDF("COL").coalesce(1).write.save(p)
- val df = spark.read.parquet(path.getCanonicalPath)
- checkAnswer(df.filter("col = 0"), Seq(Row(0)))
- assert(df.filter("col = 0").count() == 1, "col")
- assert(df.filter("COL = 0").count() == 1, "COL")
+ Seq(true, false).foreach { caseSensitive =>
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+ withTempPath { path =>
+ val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+ Seq(0).toDF("COL").coalesce(1).write.save(p)
+ val df = spark.read.parquet(path.getCanonicalPath)
+ val expected = if (caseSensitive) Seq(Row(0, 0)) else Seq(Row(0))
+ checkAnswer(df.filter("col = 0"), expected)
+ assert(df.filter("col = 0").count() == 1, "col")
+ assert(df.filter("COL = 0").count() == 1, "COL")
+ }
+ }
}
}

test("SPARK-39833: pushed filters with project without filter columns") {
- withTempPath { path =>
- val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
- Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
- val df = spark.read.parquet(path.getCanonicalPath)
- checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
- assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
- assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
+ Seq(true, false).foreach { caseSensitive =>
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+ withTempPath { path =>
+ val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+ Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
+ val df = spark.read.parquet(path.getCanonicalPath)
+ val expected = if (caseSensitive) Seq(Row(0, 1, 0)) else Seq(Row(0, 1))
+ checkAnswer(df.filter("col = 0"), expected)
+ assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
+ assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
+ }
+ }
}
}
}
