Conversation

@sadikovi
Contributor

@sadikovi sadikovi commented Aug 5, 2022

What changes were proposed in this pull request?

This PR fixes a correctness issue in the Parquet DSv1 FileFormat when the projection does not contain columns referenced in pushed filters. This typically happens when partition columns and data columns overlap.

This could result in an empty result when in fact there were records matching the predicate, as can be seen in the provided example.

The problem is especially visible with `count()` and `show()` reporting different results; for example, `show()` would return one or more records while `count()` would return 0.

In Parquet, when a predicate is provided and the column index is enabled, we would try to filter row ranges to figure out what the count should be. Unfortunately, if the projection is empty or does not include the filter columns, any checks on those columns fail and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter.
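For illustration, a minimal repro sketch along these lines (hypothetical paths; the data/partition column overlap mirrors the layout discussed in the review below):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// A data file carrying column COL0 that overlaps (case-insensitively) with
// the partition column col0 encoded in the directory name.
val base = "/tmp/spark-39833-repro"
Seq(0).toDF("COL0").coalesce(1).write.mode("overwrite").parquet(s"$base/col0=0")

val df = spark.read.parquet(base).filter("col0 = 0")
df.show()           // returns the matching row
println(df.count()) // without the mitigation, can print 0 (RowRanges.EMPTY)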

Note that this is a mitigation rather than a complete fix; the actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170.

The fix is not required in DSv2, where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`.

Why are the changes needed?

Fixes a correctness issue in Parquet DSv1 when the projection does not reference the columns in pushed-down filters, or when the required schema is empty.

Downside: the Parquet column index filter would be disabled if it has not been explicitly enabled, which could affect performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix.

@github-actions github-actions bot added the SQL label Aug 5, 2022
@sadikovi sadikovi changed the title [SPARK-39833][SQL] Fix a Parquet incorrect count issue when requiredSchema is empty and column index is enabled [SPARK-39833][SQL] Fix Parquet incorrect count issue when requiredSchema is empty and column index is enabled Aug 5, 2022
// See PARQUET-2170.
// Disable column index optimisation when required schema is empty so we get the correct
// row count from parquet-mr.
if (requiredSchema.isEmpty) {
  hadoopConf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
}
Member

Do we need a similar fix for DSv2?

@sadikovi (Contributor Author) Aug 5, 2022

No, this is not required for DSv2.

The test works in DSv2 due to another inconsistency: Parquet DSv2 filters out the column in the `readDataSchema()` method because the partition column and the data column match under case-insensitive resolution. The final schema becomes empty, resulting in an empty list of filters and thus the correct number of records. It is rather a performance inefficiency in DSv2, as the entire file will be scanned; however, the result will be correct.
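For reference, a hedged sketch of the overlap removal that `readDataSchema()` performs (simplified and illustrative, not the actual Spark source; nested fields are ignored):

import java.util.Locale
import org.apache.spark.sql.types.StructType

// Drop data-schema fields that collide with partition columns, honouring
// the session's case sensitivity.
def readDataSchema(dataSchema: StructType, partitionSchema: StructType,
    caseSensitive: Boolean): StructType = {
  def norm(name: String): String =
    if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
  val partitionNames = partitionSchema.fields.map(f => norm(f.name)).toSet
  StructType(dataSchema.fields.filterNot(f => partitionNames.contains(norm(f.name))))
}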

@sunchao
Member

sunchao commented Aug 5, 2022

@sadikovi in the example you gave:

root/
  col0=0/
    part-0001.parquet (schema: COL0)

what's the content in part-0001.parquet? I wonder why we need to push down partition filters to Parquet, given that we'll not materialize the partition values in the Parquet files. What are the filters pushed to Parquet in this example?

@AmplabJenkins

Can one of the admins verify this patch?

@sadikovi
Contributor Author

sadikovi commented Aug 7, 2022

The content is just one column COL with one value 0. You would still want to filter out row groups that don't match the predicate, otherwise it could be a performance regression. The filters are evaluated in the example, see https://issues.apache.org/jira/browse/PARQUET-2170.

@sadikovi
Contributor Author

sadikovi commented Aug 7, 2022

To be honest, I am still thinking about the best way to mitigate the problem.
For now, I have decided to fix it in a similar way to what DSv2 does.

@sadikovi sadikovi changed the title [SPARK-39833][SQL] Fix Parquet incorrect count issue when requiredSchema is empty and column index is enabled [SPARK-39833][SQL] Fix Parquet incorrect count issue when requiredSchema is empty and column index is enabled in DSv1 Aug 7, 2022
@sadikovi sadikovi changed the title [SPARK-39833][SQL] Fix Parquet incorrect count issue when requiredSchema is empty and column index is enabled in DSv1 [SPARK-39833][SQL] Remove partition columns from data schema in the case of overlapping columns to fix Parquet DSv1 incorrect count issue Aug 8, 2022
@sadikovi
Contributor Author

sadikovi commented Aug 8, 2022

@cloud-fan Can you review? This change modifies a test that was added in SPARK-22356 that you authored. Thanks.

// The DESC TABLE should report same schema as table scan.
assert(sql("desc t").select("col_name")
-  .as[String].collect().mkString(",").contains("i,p,j"))
+  .as[String].collect().mkString(",").contains("i,j,p"))
Contributor


this is a behavior change (query schema change) that is hard to accept.

Contributor


@sadikovi do you know how it is caused? It's unclear from the code changes in this PR.

Contributor Author


Partition columns are always appended to the schema. In the case of overlapping columns, we now remove all of the partition columns from the data schema and append them afterwards. This does not change the result but changes the column order in the output.

Essentially:
data schema: `i, p, j`; partition schema: `p`. We remove `p` from the data schema and append the partition column at the end: `i, j, p`.
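For illustration, a hedged sketch of that reshuffle (hypothetical, not the PR's code):

import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("i", IntegerType), StructField("p", IntegerType), StructField("j", IntegerType)))
val partitionSchema = StructType(Seq(StructField("p", IntegerType)))

val partNames = partitionSchema.fieldNames.toSet
val fullSchema = StructType(
  dataSchema.fields.filterNot(f => partNames.contains(f.name)) ++ partitionSchema.fields)
// fullSchema.fieldNames: Array(i, j, p) -- the partition column moves to the end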

Previously we would keep the partition column as part of the data schema and insert partition values into it, which is IMHO a bit confusing. This change also makes DSv1 consistent with DSv2, which already works this way.

@sadikovi (Contributor Author) Aug 9, 2022

@cloud-fan My initial fix was this:

// See PARQUET-2170.
// Disable column index optimisation when required schema is empty so we get the correct
// row count from parquet-mr.
if (requiredSchema.isEmpty) {
  hadoopConf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
}

I can revert to this code if you prefer.

Contributor


I'd prefer the surgical fix. Making it consistent with file source v2 does not justify a breaking change.

Contributor Author


Yes, makes perfect sense. Let me update the PR.

This reverts commit e69609c.
This reverts commit 43e52d0.
@sadikovi sadikovi changed the title [SPARK-39833][SQL] Remove partition columns from data schema in the case of overlapping columns to fix Parquet DSv1 incorrect count issue [SPARK-39833][SQL] Fix a rare correctness issue with count() in the case of overlapping partition and data columns in Parquet DSv1 Aug 11, 2022
@sadikovi
Contributor Author

@sunchao @cloud-fan Would you be able to take another look?

I have kept the original "patch". It is essentially a band-aid until the Parquet ticket is fixed. I cannot think of a better, less intrusive way to fix the problem. Let me know if you have any questions about it, I will be happy to clarify. Thanks.

@sadikovi sadikovi requested review from cloud-fan and sunchao August 12, 2022 03:27
@sunchao (Member) left a comment

This fix looks safe to me. I'm curious though: does this issue only happen when the filters pushed down to Parquet reference columns that do not exist in the Parquet file schema (e.g., a partition column in Spark when case-insensitive)?

@sadikovi
Contributor Author

Not exactly: the filter actually references columns that exist in the file. It is the projection that matters here, apparently.

Here is what they have in the javadoc:

   * @param paths
   *          the paths of the columns used in the actual projection; a column not being part of the projection will be
   *          handled as containing {@code null} values only even if the column has values written in the file

https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L80

I am not very familiar with the implementation, but per the javadoc above, a filter column that is missing from the projection is handled as null-only, so the predicate can never match and empty row ranges are returned. I think the library should be returning all rows instead of empty rows.

@cloud-fan
Contributor

@sadikovi which Spark version is the first to have this bug?

@sadikovi
Contributor Author

Oh, column index filtering was originally implemented in PARQUET-1201, which shipped in parquet-mr 1.11.0. Again, this is a rare case; maybe we don't do anything about it, or merge the fix into master only.

I don't know if there is a better way to fix the problem other than fixing it in parquet-mr.

}
}

test("SPARK-39833: count() with pushed filters from Parquet files") {
Member

What if we change this test to:

  test("SPARK-39833-2: count() with pushed filters from Parquet files") {
    withTempPath { path =>
      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
      Seq((0, "a")).toDF("COL", "b").coalesce(1).write.save(p)
      val df = spark.read.parquet(path.getCanonicalPath)
      checkAnswer(df.filter("col = 0"), Seq(Row(0, "a")))
      assert(df.filter("col = 0").select('b).collect().toSeq == Row("a") :: Nil)
    }
  }

it seems checking whether requestedSchema is empty is not sufficient, since it can be non-empty while the pushed filters reference columns that do not exist in it.

Contributor Author


Yes, the provided test fails. It appears the problem is widespread due to this parquet-mr bug. I would have to change the code to check the pushed-down filters against the requested schema instead of the full one, but that might introduce a performance issue. A rough sketch of such a check is below.
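A rough, hedged sketch of that check (illustrative only, not code from this PR):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Column index filtering is only safe when every column referenced by the
// pushed filters is also part of the required (projected) schema.
def disableColumnIndexIfUnsafe(
    hadoopConf: Configuration,
    pushedFilters: Seq[Filter],
    requiredSchema: StructType): Unit = {
  val filterRefs = pushedFilters.flatMap(_.references).toSet
  val projected = requiredSchema.fieldNames.toSet
  if (!filterRefs.subsetOf(projected)) {
    hadoopConf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
  }
}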

I think the safest for now is to disable column indexes altogether. I will take a look at what could be done.
We can also merge this PR and I can follow up on the fix later.

Member


... I would have to change the code to check filters that are pushed down against the requested schema instead of a full one but then it might introduce a performance issue.

Yes, I think this would be the approach, but why might it introduce performance issues?

Contributor Author


There would be no predicate pushdown in this case, so reads could potentially be slower.

@sadikovi (Contributor Author) Aug 18, 2022

I think we may need to disable the column index altogether until we figure out the proper fix. I verified that `hadoopConf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)` makes both tests pass.

Contributor Author


Actually, there seems to be another bug in DSv2 which I will fix later:

checkAnswer(df.filter("col = 0"), Seq(Row(0, "a")))

fails in DSv2 due to a different column order.

Let's do one problem at a time.

@sadikovi
Contributor Author

Apologies, I did not have time to debug this yet. I will do that tomorrow.

@sadikovi sadikovi changed the title [SPARK-39833][SQL] Fix a rare correctness issue with count() in the case of overlapping partition and data columns in Parquet DSv1 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns Aug 18, 2022
@sadikovi
Contributor Author

I decided to disable the column index altogether until I have a better fix or the Parquet bug is fixed. I also moved the tests to ParquetQueryV1 as one of them fails in DSv2 due to another bug in projection.

@cloud-fan @sunchao Can you review this PR?
I just think adding a check of the required schema against the filter columns could be error-prone, especially when nested fields are involved. It seems to me it is easier to disable the column index by default, which users can still enable manually (see the sketch below).
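A hedged sketch of that default-off behaviour (the option key is the value of `ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED`; the per-read opt-in assumes DSv1 merges read options into the Hadoop conf):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetInputFormat

// Only force-disable the optimisation when the user has not set it explicitly.
def disableColumnIndexByDefault(hadoopConf: Configuration): Unit = {
  if (hadoopConf.get(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED) == null) {
    hadoopConf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
  }
}

// Users who still want the optimisation could opt back in per read:
// spark.read.option("parquet.filter.columnindex.enabled", "true").parquet(path)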

I am also open to other suggestions.

@sadikovi
Contributor Author

I suggest we merge the PR with this fix and I will follow up on a more permanent resolution, maybe fixing it in Parquet-mr. I am also thinking that we may need to backport it to 3.3, although this would be up to the committers.

@sunchao
Member

sunchao commented Aug 18, 2022

I think it's fine to disable it temporarily, but I'd prefer a fix in Spark itself so that we can backport it to 3.3 without relying on a Parquet release and bumping the version there. I can also take a look at the approach of checking filters against the required schema.

Could you open a JIRA tracking the permanent fix in Spark, and mark it as a blocker for the 3.4.0 release?

@sadikovi
Contributor Author

I will take a look at how to fix it in Spark; I have not had enough time to work on this problem yet.
I would like to mention that the fix should be merged into 3.3, 3.4, and potentially earlier releases where the bug occurs, as it is a correctness issue. It is also unclear at this point whether it is related to partition columns; it could potentially be reproducible with a simple predicate pushdown and projection.

@HyukjinKwon (Member) left a comment

LGTM, yeah let's disable it for now since it's a correctness bug anyway.

@sadikovi
Contributor Author

sadikovi commented Aug 19, 2022

@sunchao Can you help me find a workaround in Spark for this if we want to fix it as part of this PR? Otherwise, I can take a look in a follow-up. Thanks.

@sunchao
Member

sunchao commented Aug 19, 2022

@sadikovi yes, I can also take a look at this next week. I'm fine either way: what do you think @cloud-fan @HyukjinKwon, should we merge this PR as it is (via disabling the column index) first, and work on a fix separately?

@HyukjinKwon
Member

Yeah, let's just get this in first.

@HyukjinKwon
Member

Merged to master, branch-3.3, and branch-3.2.

HyukjinKwon pushed a commit that referenced this pull request Aug 21, 2022
…ectness issue in the case of overlapping partition and data columns

Closes #37419 from sadikovi/SPARK-39833.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit cde71aa)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Aug 21, 2022
…ectness issue in the case of overlapping partition and data columns

Closes #37419 from sadikovi/SPARK-39833.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit cde71aa)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@sadikovi
Contributor Author

Thank you for merging the PR. I have opened the follow-up ticket https://issues.apache.org/jira/browse/SPARK-40169 to fix this properly. I will sync with @sunchao separately on this; I am sure we will be able to come up with a proper way to fix it!

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…ectness issue in the case of overlapping partition and data columns

Closes apache#37419 from sadikovi/SPARK-39833.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit cde71aa)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>