
Upgrade to DataFusion 14.0.0 #903

Merged (21 commits), Nov 15, 2022
Conversation

andygrove (Contributor) commented Nov 3, 2022

Changes in this PR:

  • Use DataFusion 14.0.0
  • Added a copy of the filter_push_down rule from DataFusion 13.0.0, because changes in the DataFusion 14.0.0 version cause regressions for us. We should revert to using DataFusion's version at some point; I filed [ENH] Use filter_push_down rule from DataFusion #908 for this.

ayushdg (Collaborator) commented Nov 3, 2022

Some of the failures here come from the fact that with DataFusion 14.0 the generated plans are slightly different, introducing an extra ddf.a=1 projection step before the filters are applied:

SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1

# dask-sql main (datafusion rev)
Projection: parquet_ddf.a, parquet_ddf.b, parquet_ddf.c, parquet_ddf.d
  Filter: parquet_ddf.b > Int64(5) AND parquet_ddf.b < Int64(10) OR parquet_ddf.a = Int64(1)
    TableScan: parquet_ddf projection=[a, b, c, d]

# df 14.0
Projection: parquet_ddf.a, parquet_ddf.b, parquet_ddf.c, parquet_ddf.d
  Filter: (parquet_ddf.b > Int64(5) OR parquet_ddf.a = Int64(1)Int64(1)parquet_ddf.a) AND (parquet_ddf.b < Int64(10) OR parquet_ddf.a = Int64(1)Int64(1)parquet_ddf.a)
    Projection: parquet_ddf.a = Int64(1) AS parquet_ddf.a = Int64(1)Int64(1)parquet_ddf.a, parquet_ddf.a, parquet_ddf.b, parquet_ddf.c, parquet_ddf.d
      TableScan: parquet_ddf projection=[a, b, c, d]

In this case it's safe to push the filter down to the IO, since the df.a=1 op is selecting the same value as the filter. In a more arbitrary case, if we happened to have df.a=other_val, it would not be safe to push the df.a=1 filter down to the IO.

The way Dask handles this today is not by looking at the value, but by generally allowing a subset of operations (irrespective of the values involved) to appear between the IO and filter stages when pushing predicates down.
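
As a purely illustrative sketch (not dask's or dask-sql's actual implementation, and with a made-up allow-list), the kind of check described above could look roughly like this:

# Sketch only: push predicates into the IO layer only if every layer
# sitting between the IO and the filter is one of a small set of
# allow-listed operation types, regardless of the values involved.
from dask.blockwise import Blockwise
from dask.layers import DataFrameIOLayer

# Hypothetical allow-list of "safe" intermediate layer types.
ALLOWED_INTERMEDIATE_LAYERS = (Blockwise,)

def safe_to_push_down(ddf) -> bool:
    """Return True if predicates can safely be pushed into the IO layer."""
    for name, layer in ddf.dask.layers.items():
        if isinstance(layer, DataFrameIOLayer):
            continue  # the IO layer itself is where the filters would land
        if not isinstance(layer, ALLOWED_INTERMEDIATE_LAYERS):
            return False
    return True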

ayushdg (Collaborator) commented Nov 4, 2022

Looking into this a bit more, the step that introduces this additional projection after the table scan comes from the common_sub_expression_eliminate rule. I'm not sure this is really a case where a = 1 is a common sub-expression, so the rule might be being applied erroneously.

Also, while looking through this I realized that the filter_push_down optimizer rule might be able to push these filters down to the table scan for us, allowing us to pass them into the IO. I don't recall if there was a specific reason this rule was not added at the time, but it might be worth exploring re-adding it.

cc: @andygrove

andygrove (Contributor, Author) commented:

Thanks @ayushdg. I am going to work on this today. I have updated this PR to use the official 14.0.0 release of DataFusion now.

andygrove marked this pull request as ready for review on November 8, 2022 at 17:06.
codecov-commenter commented Nov 8, 2022

Codecov Report

Merging #903 (b9dfc08) into main (c7017a7) will decrease coverage by 2.18%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##             main     #903      +/-   ##
==========================================
- Coverage   75.18%   72.99%   -2.19%     
==========================================
  Files          73       73              
  Lines        3985     3985              
  Branches      713      713              
==========================================
- Hits         2996     2909      -87     
- Misses        829      912      +83     
- Partials      160      164       +4     
Impacted Files Coverage Δ
dask_sql/physical/rel/logical/join.py 80.67% <ø> (ø)
dask_sql/physical/utils/filter.py 77.84% <ø> (ø)
dask_sql/input_utils/hive.py 18.25% <0.00%> (-81.75%) ⬇️
dask_sql/physical/rex/core/literal.py 60.95% <0.00%> (+2.85%) ⬆️
dask_sql/physical/rel/logical/filter.py 84.84% <0.00%> (+3.03%) ⬆️
dask_sql/_version.py 34.74% <0.00%> (+3.38%) ⬆️


andygrove (Contributor, Author) commented:

I don't understand this failure with Python 3.8 / mac:

FAILED tests/integration/test_fugue.py::test_fsql - AssertionError: DataFrame are different

DataFrame shape mismatch
[left]:  (0, 1)
[right]: (1, 1)

@ayushdg any ideas?

ayushdg (Collaborator) commented Nov 8, 2022

> I don't understand this failure with Python 3.8 / mac:
>
> FAILED tests/integration/test_fugue.py::test_fsql - AssertionError: DataFrame are different
>
> DataFrame shape mismatch
> [left]:  (0, 1)
> [right]: (1, 1)
>
> @ayushdg any ideas?

This is a known flaky test that appears occasionally on macOS and Windows with Python 3.8.

It should be safe to ignore for now.

ayushdg (Collaborator) commented Nov 8, 2022

A bunch of GPU tests seem to be failing though. @charlesbluca, could you take a look when you get the chance?

charlesbluca (Collaborator) commented Nov 9, 2022

Yeah, I can take a look into this.

EDIT:

At first glance, it looks like all the failures are query regressions; I will dig into them individually, but I opened #911 to track adding a CI check that makes changes to the logical plan more prominent, so it's easier to pinpoint where the regressions are coming from.

charlesbluca (Collaborator) commented:

From a quick glance, it looks like q4, q11, and q74 are failing because we are trying to concatenate a dask-cudf dataframe and a CPU Dask dataframe; the underlying cause is that the new optimizations mean we are beginning to use the EmptyRelation plugin more prominently, and it always creates a CPU dataframe:

return DataContainer(
    dd.from_pandas(pd.DataFrame(data, columns=col_names), npartitions=1),
    ColumnContainer(col_names),
)
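
As an illustration only (not the fix adopted in this PR), a GPU-aware variant of that construction could look roughly like this, assuming a hypothetical gpu flag indicating whether the context expects GPU dataframes:

import dask.dataframe as dd
import pandas as pd

def empty_relation_frame(data, col_names, gpu=False):
    # Hypothetical helper: build the empty-relation dataframe on GPU when
    # requested, so a downstream concat does not mix CPU and GPU partitions.
    pdf = pd.DataFrame(data, columns=col_names)
    if gpu:
        import cudf
        import dask_cudf

        return dask_cudf.from_cudf(cudf.from_pandas(pdf), npartitions=1)
    return dd.from_pandas(pdf, npartitions=1)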

q33, q56, q60, and q83 all seem to be failing when attempting to regenerate the HLG with predicate pushdown, which is a little harder to diagnose; I will look into those queries more closely.

@@ -91,7 +91,7 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame:
     try:
         return dsk.layers[name]._regenerate_collection(
             dsk,
-            new_kwargs={io_layer: {"filters": filters}},
+            new_kwargs={io_layer: {"filters": filters, "index": False}},
Review comment on this diff (Collaborator):

Looks like the issues with predicate pushdown were stemming from read_parquet automatically setting an index by default, which this kwarg override should prevent.

Chatting with @rjzamora, we agreed that this shouldn't be the default behavior, so we may be able to remove this override later on when changes are made upstream.
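
For illustration only (the path and predicate below are made up), the behavior this override relies on is essentially:

import dask.dataframe as dd

# With the default index handling, read_parquet may add an index-setting
# step to the graph; passing index=False keeps the IO layer "flat", which
# is what the kwarg override above depends on when regenerating the
# collection with filters attached.
ddf = dd.read_parquet("path/to/parquet_ddf", index=False)
filtered = ddf[((ddf.b > 5) & (ddf.b < 10)) | (ddf.a == 1)]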

charlesbluca (Collaborator) left a review:

Thanks @andygrove! We can iterate on the optimizer changes once #908 and #914 are resolved.
