
[BUG] Fix reading partition key columns in DeltaLake #2118

Merged: 8 commits into main from jay/parquet-delta-pushdown on Apr 15, 2024

Conversation

@jaychia (Contributor) commented Apr 14, 2024

Fixes pushdowns for column selection on partition keys in DeltaLake and Iceberg.

When table formats such as Iceberg and Delta Lake store the data for a partition column, they strip that column from the actual Parquet data files they write out. When we then select only specific columns, our Parquet reader fails because it cannot find those columns in the file.

NOTE: Iceberg seems to do this only for identity-transformed partition columns.
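
A minimal repro sketch of the failure mode (the table path and column names are placeholders, not taken from this PR):

```python
import daft

# Placeholder path; assume a Delta Lake table partitioned on "date".
df = daft.read_deltalake("s3://bucket/events")

# Selecting a partition key pushes a column projection down to the Parquet
# reader, which previously errored because the "date" column lives in table
# metadata rather than in the physical Parquet files.
df.select("date", "value").collect()
```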

Follow-on issue for selection of only the partition keys: #2129

@github-actions bot added the "bug: Something isn't working" label on Apr 14, 2024
codecov bot commented Apr 15, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 85.15%. Comparing base (0fe3df7) to head (f05a535).

❗ Current head f05a535 differs from pull request most recent head 4b1ef7d. Consider uploading reports for the commit 4b1ef7d to get more accurate results

Additional details and impacted files


```
@@           Coverage Diff           @@
##             main    #2118   +/-   ##
=======================================
  Coverage   85.15%   85.15%
=======================================
  Files          68       68
  Lines        7356     7356
=======================================
  Hits         6264     6264
  Misses       1092     1092
```

```diff
 .pushdowns
 .columns
 .as_ref()
-.map(|v| v.iter().map(|s| s.as_ref()).collect::<Vec<&str>>());
+.map(|v| v.iter().map(|s| s.as_str()).collect::<Vec<&str>>());
 let file_column_names =
```
Member:

You likely need to apply this logic to the eager code path as well, e.g. if you passed in a predicate.

Contributor Author:

This should be fixed in this section: https://github.com/Eventual-Inc/Daft/pull/2118/files#diff-1d03b58906d515cc349a8a414595727e787d60af7adb3ab9fb3db3558c9e7e27R789-R808

Unfortunately our code is a little janky right now, with two main codepaths:

  1. MicroPartition::from_scan_task either eagerly produces loaded micropartitions or lazily produces unloaded micropartitions using read_parquet_into_micropartition
  2. Lazy micropartitions then load data using materialize_scan_task

So any logic often has to be duplicated across those two functions (read_parquet_into_micropartition and materialize_scan_task). Perhaps there is some opportunity to consolidate this by having read_parquet_into_micropartition call materialize_scan_task under the hood, but that might be a refactor for another PR?
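
A toy sketch of that consolidation idea (Python pseudocode; the real code is Rust in micropartition.rs, and every name here is a hypothetical stand-in):

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class MicroPartition:
    loaded: Optional[list] = None                  # eagerly loaded batches
    loader: Optional[Callable[[], list]] = None    # lazy materializer

def materialize_scan_task(mp: MicroPartition) -> list:
    # Lazy path: pushdown/virtual-column handling would live here, once.
    return mp.loaded if mp.loaded is not None else mp.loader()

def read_parquet_into_micropartition(read_fn: Callable[[], list],
                                     eager: bool) -> MicroPartition:
    # Eager path: rather than duplicating the handling logic, build the
    # lazy form and immediately materialize it through the shared function.
    mp = MicroPartition(loader=read_fn)
    return MicroPartition(loaded=materialize_scan_task(mp)) if eager else mp
```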

Member:

I think a simpler way would be to just wrap read_parquet_bulk in this crate

Contributor Author:

Could you elaborate? Both read_parquet_into_micropartition and materialize_scan_task do currently wrap read_parquet_bulk.

Contributor:

I think that @samster25 was talking about creating a read_parquet_bulk_with_virtual (or similarly named) utility in this micropartition.rs module that does all of the virtual column + pushdown handling before calling read_parquet_bulk, shared (called) by both the eager and lazy paths, so we don't have to duplicate this handling logic.

However, that would require duplicating the read_parquet_bulk API, which is pretty large, and we also wouldn't have coverage for other file types (e.g. if we added Avro or ORC support to Iceberg reads). @samster25 am I misunderstanding your suggestion?
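
For illustration, a minimal Python sketch of the shape such a wrapper could take (the real code is Rust and read_parquet_bulk has a much larger signature; all names below are hypothetical):

```python
import pyarrow as pa
import pyarrow.parquet as pq

def read_parquet_with_virtual_columns(path: str, columns: list[str],
                                      partition_values: dict) -> pa.Table:
    # Split the projection into columns physically present in the file and
    # virtual (partition) columns materialized from table metadata.
    file_cols = [c for c in columns if c not in partition_values]
    # Note: if every requested column is virtual, file_cols is empty and a
    # plain file read no longer works -- that is follow-on issue #2129.
    table = pq.read_table(path, columns=file_cols)
    for name in columns:
        if name in partition_values:
            value = partition_values[name]
            table = table.append_column(name, pa.array([value] * table.num_rows))
    return table.select(columns)  # restore the requested column order
```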

Contributor:

We do already have a shared _read_parquet_into_loaded_micropartition utility for read_parquet_into_micropartition, maybe we should refactor materialize_scan_task to call out to similar helpers for each file format.

Member:

> I think that @samster25 was talking about creating a read_parquet_bulk_with_virtual (or similarly named) utility in this micropartition.rs module that does all of the virtual column + pushdown handling before calling read_parquet_bulk, shared (called) by both the eager and lazy paths, so we don't have to duplicate this handling logic.

Yeah exactly!

```diff
@@ -82,3 +82,40 @@ def test_daft_iceberg_table_renamed_column_pushdown_collect_correct(local_iceberg
     iceberg_pandas = tab.scan().to_arrow().to_pandas()
     iceberg_pandas = iceberg_pandas[["idx_renamed"]]
     assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
+
+
+@pytest.mark.integration()
```
Member:

add 2 tests:

  • select only the partition column with filter on it
  • select only the partition column with filter on another column

Contributor Author:

Added the two requested tests:

  1. This one fails, because reading just the partition column still fails after this PR (we don't have a great way of "reading [] columns from a file" at the moment).
  2. This one passes.
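
For reference, a hedged sketch of what those two tests might look like (the fixture, table identifier, and column names are assumptions modeled on the existing tests in this module, not code from the PR; assert_df_equals is the comparison helper already used in this file):

```python
import daft
import pytest


@pytest.mark.integration()
def test_select_only_partition_col_with_filter_on_it(local_iceberg_catalog):
    # Reported above as still failing at the time of this PR (see #2129):
    # only the partition column is needed, so the file-column set is empty.
    tab = local_iceberg_catalog.load_table("default.test_partitioned_by_identity")
    df = daft.read_iceberg(tab)
    df = df.where(df["idx"] > 1).select("idx")
    daft_pandas = df.to_pandas()
    iceberg_pandas = tab.scan().to_arrow().to_pandas()
    iceberg_pandas = iceberg_pandas[iceberg_pandas["idx"] > 1][["idx"]]
    assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])


@pytest.mark.integration()
def test_select_only_partition_col_with_filter_on_other(local_iceberg_catalog):
    # Passes: the filter column must be read from the file, so the
    # projection pushed to the Parquet reader is non-empty.
    tab = local_iceberg_catalog.load_table("default.test_partitioned_by_identity")
    df = daft.read_iceberg(tab)
    df = df.where(df["value"] > 0).select("idx")
    daft_pandas = df.to_pandas()
    iceberg_pandas = tab.scan().to_arrow().to_pandas()
    iceberg_pandas = iceberg_pandas[iceberg_pandas["value"] > 0][["idx"]]
    assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
```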

@jaychia merged commit a05dfb5 into main on Apr 15, 2024
29 checks passed
@jaychia deleted the jay/parquet-delta-pushdown branch on April 15, 2024 at 22:59