
[SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema #37881

Closed
wants to merge 2 commits

Conversation

sunchao
Member

@sunchao sunchao commented Sep 14, 2022

What changes were proposed in this pull request?

Currently in the Parquet V1 read path, Spark pushes down data filters even if they have no reference in the Parquet read schema. This can cause correctness issues as described in SPARK-39833.

The root cause appears to be that in the V1 path, we first use `AttributeReference` equality to remove partition columns from the data columns, and then use `AttributeSet` equality to find the filters that reference only data columns.
The two steps are inconsistent when the case-sensitivity check is false.

Take the following scenario as an example:

  • data column: `[COL, a]`
  • partition column: `[col]`
  • filter: `col > 10`

With `AttributeReference` equality, `COL` is not considered equal to `col` (their names differ), so the filtered data column set is still `[COL, a]`. However, when calculating the filters that reference only data columns, `COL` is considered equal to `col`. Consequently, the filter `col > 10`, when checked against `[COL, a]`, is treated as referencing a data column, and is pushed down to Parquet as a data filter.

On the Parquet side, since `col` doesn't exist in the file schema (it only has `COL`), Parquet will return an incorrect number of rows when the column index is enabled. See PARQUET-2170 for more detail.

In general, when data columns overlap with partition columns and case sensitivity is false, partition filters are not filtered out before we calculate the filters that reference only data columns, which is incorrect.
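The inconsistency can be sketched with a simplified, hypothetical model — a plain case class standing in for Spark's attribute machinery, where case-insensitive resolution has already assigned `COL` and `col` the same expression ID (`Attr` and `PushdownSketch` are illustrative names, not Spark's classes):

```scala
// Hypothetical stand-in for a Spark attribute: a name plus an expression ID.
// Under case-insensitive analysis, COL and col resolve to the SAME exprId.
case class Attr(name: String, exprId: Long)

object PushdownSketch {
  val dataColumns      = Seq(Attr("COL", 1L), Attr("a", 2L))
  val partitionColumns = Seq(Attr("col", 1L)) // same exprId as COL

  // Step 1 (buggy): case-class equality also compares names, so COL != col
  // and the partition column is NOT removed from the data columns.
  val dataColumnsWithoutPartitionCols: Seq[Attr] =
    dataColumns.filterNot(partitionColumns.contains)

  // Step 2: reference checking goes by exprId only, so the filter on `col`
  // (exprId 1) looks like it references a data column.
  val filterRef = Attr("col", 1L) // the reference inside `col > 10`
  val treatedAsDataFilter: Boolean =
    dataColumnsWithoutPartitionCols.map(_.exprId).contains(filterRef.exprId)

  def main(args: Array[String]): Unit = {
    assert(dataColumnsWithoutPartitionCols.map(_.name) == Seq("COL", "a"))
    assert(treatedAsDataFilter) // `col > 10` is wrongly pushed down
  }
}
```

Running the sketch shows step 1 leaving `COL` in place and step 2 then matching the partition filter's exprId against it.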

Why are the changes needed?

This fixes the correctness bug described in SPARK-39833.

Does this PR introduce any user-facing change?

No

How was this patch tested?

There are existing test cases for this issue from SPARK-39833. This PR also modifies them to cover the scenarios where case sensitivity is on or off.

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. (Pending CIs). Thank you, @sunchao .

cc @viirya and @huaxingao

@github-actions github-actions bot added the SQL label Sep 14, 2022
Member

@viirya viirya left a comment


Hmm, do we have any test case for this situation?

@sunchao
Member Author

sunchao commented Sep 14, 2022

@viirya please see test cases added in SPARK-39833.

cc @sadikovi @cloud-fan @HyukjinKwon too.

@@ -186,10 +186,10 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {

// Partition keys are not available in the statistics of the files.
// `dataColumns` might have partition columns, we need to filter them out.
-    val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
+    val dataColumnsWithoutPartitionCols = AttributeSet(dataColumns) -- partitionColumns
Contributor


I am not sure this is correct. Can you elaborate?

Contributor

@sadikovi sadikovi Sep 15, 2022


I am not an expert in `AttributeSet`, so it would be good if you could explain how it makes this work so I can reference it in the future 😄.

Member Author


@sadikovi np. I explained a bit in the PR description. Let me add more details.

There are two steps when calculating data filters for the V1 file source:

  1. compute `dataColumnsWithoutPartitionCols`
  2. call `extractPredicatesWithinOutputSet` with `dataColumnsWithoutPartitionCols` on the filters, to obtain the data-only filters that are supposed to be pushed down to data sources

In the first step, the equality check is done via `AttributeReference.equals`, which compares the attribute name, among other things.

In the second step, however, equality is checked via `AttributeEquals.equals`, which only compares the expression ID.

This inconsistency poses an issue when the case-sensitivity check is false (the default behavior). For the example in the PR description:

  • data column: `[COL, a]`
  • partition column: `[col]`
  • filter: `col > 10`

The expression IDs for the data column `COL` and the partition column `col` are the same because of case insensitivity. In the first step above, however, `COL` and `col` are not considered equal, so `dataColumnsWithoutPartitionCols` is still `[COL, a]`. In the second step, the data filters are calculated using `AttributeEquals.equals`, and `COL` is treated as a data column. As a result, the filter `col > 10` is considered a data filter and pushed down.

In general, when data columns overlap with partition columns and case sensitivity is false, the first step does not filter out partition columns, so they are still used in the second step. This is incorrect.
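Under the fix, step 1 switches to the same exprId-based (`AttributeSet`-style) equality as step 2, so the shadowed column is removed. A minimal sketch with a hypothetical `Attr` case class (name plus expression ID — illustrative only, not Spark's actual classes):

```scala
// Hypothetical attribute: under case-insensitive analysis, data column COL
// and partition column col share exprId 1.
case class Attr(name: String, exprId: Long)

object FixSketch {
  val dataColumns      = Seq(Attr("COL", 1L), Attr("a", 2L))
  val partitionColumns = Seq(Attr("col", 1L))

  // The fix: make step 1 compare by exprId, consistent with step 2.
  val partitionExprIds: Set[Long] = partitionColumns.map(_.exprId).toSet
  val dataColumnsWithoutPartitionCols: Seq[Attr] =
    dataColumns.filterNot(a => partitionExprIds.contains(a.exprId))

  def main(args: Array[String]): Unit = {
    // COL is now removed along with col, so only `a` remains ...
    assert(dataColumnsWithoutPartitionCols.map(_.name) == Seq("a"))
    // ... and the filter col > 10 (exprId 1) no longer matches any data
    // column: it is treated as a partition filter, not pushed to Parquet.
    assert(!dataColumnsWithoutPartitionCols.map(_.exprId).contains(1L))
  }
}
```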

Contributor


Thanks!

@cloud-fan
Contributor

This seems like a corner case where data columns and partition columns overlap (assuming you didn't set the case-sensitivity flag to true).

When data columns and partition columns overlap, Spark reads the actual values from the partition columns and ignores the overlapping data columns; see `HadoopFsRelation.schema`. That said, in your example the filter `col > 10` should be a partition filter, not a data filter.

val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
Contributor


I think the fix should be `dataColumns.filterNot(partitionSet.contains)`

Member Author


I think semantically both are the same. In

`AttributeSet(dataColumns) -- partitionColumns`

`partitionColumns` is first wrapped into an `AttributeSet` and then compared with `AttributeSet(dataColumns)`.

Your version does require one less line of change though :)
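Both spellings can be sketched with a toy exprId-keyed set standing in for `AttributeSet` (hypothetical `Attr` and `toIdSet` helpers, not Spark's implementation):

```scala
// Hypothetical attribute: COL (data) and col (partition) share exprId 1.
case class Attr(name: String, exprId: Long)

object EquivalenceSketch {
  val dataColumns      = Seq(Attr("COL", 1L), Attr("a", 2L))
  val partitionColumns = Seq(Attr("col", 1L))

  // Toy AttributeSet: membership and difference are decided by exprId only.
  def toIdSet(attrs: Seq[Attr]): Set[Long] = attrs.map(_.exprId).toSet

  // Spelling 1: AttributeSet(dataColumns) -- partitionColumns
  val viaSetDifference: Set[Long] =
    toIdSet(dataColumns) -- toIdSet(partitionColumns)

  // Spelling 2: dataColumns.filterNot(partitionSet.contains)
  val partitionSet: Set[Long] = toIdSet(partitionColumns)
  val viaFilterNot: Seq[Attr] =
    dataColumns.filterNot(a => partitionSet.contains(a.exprId))

  def main(args: Array[String]): Unit = {
    // Both leave only `a` (exprId 2): the exprId comparison is what matters.
    assert(viaSetDifference == Set(2L))
    assert(viaFilterNot.map(_.exprId).toSet == viaSetDifference)
  }
}
```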

Contributor

@sadikovi sadikovi left a comment


Thanks for fixing the issue!

Contributor

@LuciferYang LuciferYang left a comment


+1, LGTM

@sunchao sunchao closed this in 4e0fea2 Sep 16, 2022
sunchao added a commit that referenced this pull request Sep 16, 2022
…o data schema


Closes #37881 from sunchao/SPARK-40169.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Chao Sun <sunchao@apple.com>
@sunchao
Member Author

sunchao commented Sep 16, 2022

Thanks! Merged to master/branch-3.3/branch-3.2 (the test failure is unrelated).

sunchao added a commit that referenced this pull request Sep 16, 2022
…o data schema

@dongjoon-hyun
Member

Thank you, @sunchao and all!

LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request Sep 20, 2022
…o data schema

sunchao added a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…o data schema
