Parquet engine-core refactoring #4995

Merged: 62 commits into dask:master on Jul 19, 2019

Conversation


@rjzamora rjzamora commented Jun 24, 2019

This PR is intended to supersede PR #4336, which was originally driven mostly by @mrocklin and @martindurant (I'm opening a new PR just to refresh/reorganize the discussion a bit). There are still 18 tests failing in test_parquet.py (all for fastparquet), but there should be enough progress for healthy discussion/feedback.

The primary goal here is to refactor the parquet interface into distinct core and engine code that will be easier to maintain in the future. The idea is to pull all engine-agnostic code into dask/dask/dataframe/io/parquet/core.py, and then isolate engine specific code in dask/dask/dataframe/io/parquet/arrow.py and dask/dask/dataframe/io/parquet/fastparquet.py (for pyarrow and fastparquet, respectively).

Engine calls during write phase (core.to_parquet):

  1. engine.initialize_write: Preparation for writing/appending
  2. engine.write_partition: Write operation for each partition
  3. engine.write_metadata: Metadata-write operation

Engine calls during read phase (core.read_parquet):

  1. engine.read_metadata: Read metadata and file statistics
  2. engine.read_partition: Read operation for each partition
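
To make the division of labor concrete, here is a minimal sketch of what an engine class could look like. The five method names come from the lists above; the signatures, return values, and class structure are illustrative assumptions on my part, not the exact code in this PR:

```python
class Engine:
    """Illustrative engine interface; pyarrow and fastparquet would
    each provide their own implementation of these five methods."""

    @staticmethod
    def read_metadata(fs, paths, categories=None, index=None,
                      gather_statistics=None, **kwargs):
        # Return (meta, statistics, parts): an empty pandas DataFrame
        # describing the schema, optional per-piece statistics, and the
        # list of pieces for read_partition to consume.
        raise NotImplementedError

    @staticmethod
    def read_partition(fs, piece, columns, index, **kwargs):
        # Read a single piece into a pandas DataFrame.
        raise NotImplementedError

    @staticmethod
    def initialize_write(df, fs, path, append=False, **kwargs):
        # Prepare for writing/appending (create directories, check that
        # an appended schema matches the existing dataset, etc.).
        raise NotImplementedError

    @staticmethod
    def write_partition(df, path, fs, filename, **kwargs):
        # Write a single partition out to one file.
        raise NotImplementedError

    @staticmethod
    def write_metadata(parts, meta, fs, path, **kwargs):
        # Write shared metadata (e.g. a _metadata file) once all
        # partitions have been written.
        raise NotImplementedError
```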

Another goal of PR #4336 was to limit the responsibilities of engine-specific code. I tried to do this without losing any significant/useful features. For example, I decided to retain index detection in files with pandas metadata. For simplicity, the index is always preserved by resetting it before writing (unless write_index is explicitly set to False). The name(s) of the original index column(s) are then passed to the engines, which are expected to use the pandas metadata to store the index column(s) (and to read them back during read_metadata).
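
As a rough illustration of that index handling (a sketch of the idea, not the PR's exact code; the column name is made up):

```python
import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3]},
                  index=pd.Index([10, 20, 30], name="myindex"))

# Preserve the index by resetting it into an ordinary column before
# writing (skipped when write_index=False).
index_cols = [df.index.name]   # e.g. ["myindex"]
df = df.reset_index()          # "myindex" is now a regular column

# index_cols is then handed to the engine, which records it in the
# pandas metadata so read_metadata can restore the index on read.
```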

Any and all feedback is welcome :)

  • Tests added / passed
  • Passes flake8 dask

@martindurant

Do you have an idea of any changes you need from fastparquet? Or can it all be worked out here if I were to go through the code?

@martindurant

@mrocklin, happy to close #4336? I have not yet looked through any of the code here, so I have no comment on whether it is better.

@rjzamora (Author)

> Do you have an idea of any changes you need from fastparquet? Or can it all be worked out here if I were to go through the code?

I'm actually not sure about this, but hopefully I will have a better idea tomorrow. You are certainly welcome to dig through the code and make changes (or let me know what changes you suggest)

> @mrocklin, happy to close #4336? I have not yet looked through any of the code here, so I have no comment on whether it is better.

I'm realizing that I did not properly track the earlier changes from #4336 for this PR (due to a bit of a rebasing blunder). So, it would also be fine with me if @mrocklin thinks we should move these changes back over there.

@mrocklin

I don't care about which PR we use. I would use whichever is more likely to finish faster :)

@rjzamora (Author)

Update: We are down to 7 failing tests. Here is a grep for FAILED in the pytest output:

```
95:dask/dataframe/io/tests/test_parquet.py::test_roundtrip[fastparquet-df3-write_kwargs3-read_kwargs3] FAILED
145:dask/dataframe/io/tests/test_parquet.py::test_partition_on[pyarrow-fastparquet] FAILED
150:dask/dataframe/io/tests/test_parquet.py::test_divisions_read_with_filters FAILED
151:dask/dataframe/io/tests/test_parquet.py::test_divisions_are_known_read_with_filters FAILED
157:dask/dataframe/io/tests/test_parquet.py::test_timestamp96 FAILED
180:dask/dataframe/io/tests/test_parquet.py::test_writing_parquet_with_unknown_kwargs[fastparquet] FAILED
189:dask/dataframe/io/tests/test_parquet.py::test_passing_parquetfile FAILED
696:=== 7 failed, 172 passed, 20 xfailed, 5 xpassed, 1 warnings in 36.54 seconds ===
```

```diff
@@ -357,7 +362,9 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None,
         fs, fs_token = get_fs(protocol, options)
         paths = expand_paths_if_needed(paths, mode, num, fs, name_function)

-    elif isinstance(urlpath, (str, unicode)) or hasattr(urlpath, 'name'):
+    elif (isinstance(urlpath, (str, unicode)) or
```
Member:

We have a new stringify_path helper in dask/bytes/utils.py. Does that suffice here?
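
For readers following along, the helper essentially normalizes path-like objects to plain strings; a rough sketch of the idea (not necessarily the exact implementation in dask/bytes/utils.py):

```python
import pathlib

def stringify_path(filepath):
    # Convert path-like objects (e.g. pathlib.Path) to plain strings,
    # passing anything else through unchanged.
    if isinstance(filepath, str):
        return filepath
    if hasattr(filepath, "__fspath__"):       # os.PathLike protocol
        return filepath.__fspath__()
    if isinstance(filepath, pathlib.PurePath):
        return str(filepath)
    return filepath
```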

```diff
@@ -37,6 +37,8 @@ def infer_storage_options(urlpath, inherit_storage_options=None):
     "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
     "url_query": "q=1", "extra": "value"}
     """
+    urlpath = str(urlpath)  # re, urllib don't support pathlib.Path objects
```
Member:

stringify_path, though maybe the callers should be doing this.


```diff
@@ -243,6 +243,9 @@ class _Frame(DaskMethodsMixin, OperatorMethodMixin):
     Values along which we partition our blocks on the index
     """
     def __init__(self, dsk, name, meta, divisions):
+        if len(divisions) < 2:  # no partitions
```
Member:

Not saying this is wrong, but it's a bit surprising to see this in a Parquet refactor. Do you recall what prompted it?

Member (Author):

These lines were added in PR#4336 to address cases where an empty dataframe/partition is read in (like test_parquet.py::test_empty). Not sure if there is a more appropriate fix.

Member:

You mean a dataframe without any partitions, I think. Yes, that could happen with parquet.
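
For context on the check itself: divisions stores the partition boundary values along the index, so a collection with n partitions carries n + 1 entries, and fewer than two entries means there are no partitions at all. A small illustration with made-up values:

```python
# A collection with 3 partitions has 4 division boundaries:
divisions = (0, 10, 20, 30)        # partition i spans divisions[i]..divisions[i+1]
npartitions = len(divisions) - 1   # -> 3

# So len(divisions) < 2 implies zero partitions, e.g. an empty
# parquet dataset read back in (the case the check above guards).
```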

@TomAugspurger

Ahh, was this open when we ran black over the codebase? Sorry about that.

@rjzamora (Author)

@TomAugspurger Not a problem - I can certainly understand the advantages of using black moving forward :)

@rjzamora (Author)

@mrocklin @martindurant - Should we allow read_parquet to accept an existing ParquetFile (which the current parquet API supports)? This is certainly doable, but I just wanted to confirm that this case is actually used/desired.

If "yes" (which I am expecting), do we want to address test_passing_parquetfile as is (in which the actual parquet directory is removed), or does a simple solution like this one suffice?

@martindurant

> Should we allow read_parquet to accept an existing ParquetFile

I think this is unnecessary now that we have multiple engines and a stronger desire to structure the code well. Anything that can be opened with ParquetFile(.., open_with=) should be openable using a URL alone in the code here.
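
To illustrate the point (a hedged sketch; the bucket path and the s3fs usage are made up for the example), anything expressible as the first form should be expressible as the second:

```python
import dask.dataframe as dd
import s3fs
from fastparquet import ParquetFile

# Old-style: construct the ParquetFile yourself, supplying open_with
s3 = s3fs.S3FileSystem()
pf = ParquetFile("mybucket/data.parq", open_with=s3.open)

# With the refactor: hand read_parquet the URL and let it resolve
# the filesystem internally
df = dd.read_parquet("s3://mybucket/data.parq")
```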

@rjzamora (Author)

Update: All test_parquet.py tests are passing. However, (1) the latest master branch of pyarrow is required, and (2) not all "new" capabilities are tested for pyarrow.

@mrocklin

> Update: All test_parquet.py tests are passing

Woot!

@TomAugspurger

@martindurant will you have a chance to look through this PR?

@martindurant

Will look today. Was waiting for the new commits to stop coming.

@rjzamora (Author)

Thanks @TomAugspurger and @martindurant - Since the refactor is only "working" for pyarrow 0.13.1, my next step (likely this afternoon) will be to add backward compatibility. Code review is still welcome/appreciated at any time of course.

@martindurant

@mrocklin , do you think we'll need the compatibility? I imagine it may take more effort than it's worth, and we already disallow 0.13.0.

@TomAugspurger

TomAugspurger commented Jun 28, 2019 via email

@martindurant

I'd be ok with that

@martindurant left a comment

First pass: I looked over everything except the tests and the fastparquet module.


```python
import pyarrow as pa
import pyarrow.parquet as pq
from ....delayed import delayed
from ....bytes.core import get_pyarrow_filesystem
```
Member:

Forewarning: when we switch to fsspec, all filesystem implementations will already be subclasses of pyarrow's (if installed)

```python
    fs, paths, categories=None, index=None, gather_statistics=None, **kwargs
):

    # In pyarrow, the physical storage field names may differ from
```
Member:

In pandas-derived parquet (also true for fastparquet)


```python
def apply_filters(parts, statistics, filters):
    """ Apply filters onto parts/statistics pairs
```

Member:

(copied from fastparquet; not sure where in Dask this is documented)
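
For anyone unfamiliar with the feature: these are the (column, op, value) predicates that read_parquet accepts through its filters= keyword to prune row-groups using their statistics. A small usage sketch (the file path and column name are hypothetical):

```python
import dask.dataframe as dd

# Only load pieces whose statistics admit rows with x > 5; predicates
# in the list are AND-ed together.
df = dd.read_parquet("tmp/data.parquet", filters=[("x", ">", 5)])
```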

```python
    index_names = user_index
    if set(column_names).intersection(index_names):
        raise ValueError(
            "Specified index and column names must not " "intersect"
```
Member:

Same type of thoughts. Which copy gets triggered? Probably not both.

```python
    ]

    # Need to reconcile storage and real names. These will differ for
    # pyarrow, which uses __index_level_d__ for the storage name of indexes.
```
Member:

This should also work for fastparquet - to be tested

@rjzamora (Author)

Hmm, the new test_s3.py failures may have something to do with the s3fs conda package being updated to 0.3.0 just hours ago...

@martindurant

Possibly, but all tests are passing in #5064, which is also using the same version. Your help diagnosing would be greatly appreciated.

@martindurant

Actually, if #5064 is to be merged soon, then the best action would be to merge/rebase from it (which did affect the old parquet code slightly).

@rjzamora (Author) commented Jul 18, 2019

> Actually, if #5064 is to be merged soon, then the best action would be to merge/rebase from it (which did affect the old parquet code slightly).

@martindurant This sounds good to me - merging in your fsspec branch seems to fix the failing tests on my local machine. If all goes well, we should be able to merge this after #5064.

@rjzamora rjzamora changed the title [WIP] Parquet engine-core refactoring Parquet engine-core refactoring Jul 18, 2019
@martindurant

Sorry, the merge wasn't quite clean (probably because it was merged).

@rjzamora (Author)

No problem - I can clean this up tonight.

@martindurant

I think the new failures are fixed in #5056

@rjzamora (Author)

> I think the new failures are fixed in #5056

That is what I'm hoping :) - I am assuming I should wait for that to go through before bothering to rebase/merge here again?

@martindurant

I expect yes. @TomAugspurger ?

@martindurant

A hearty +1 from me, as soon as fixes are merged in

@martindurant

#5056 was merged

@rjzamora (Author)

Travis tests are all passing, but that pesky partd problem is still there in appveyor :/

@martindurant

... here we go

@martindurant martindurant merged commit a53d45e into dask:master Jul 19, 2019
@rjzamora (Author)

Thanks @martindurant!
