
Ensure filepath exists in to_parquet #8057

Merged: 1 commit merged into dask:main from the to-parquet-overwrite branch on Aug 24, 2021

Conversation

jrbourbeau
Member

This is a possible solution for #8056. Though, per my comment in that issue:

    This makes me think there's a mismatch between how fsspec filesystems behave

we may want a more upstream solution in s3fs and/or fsspec (cc @martindurant). Also, since I've only been able to trigger this issue with remote S3 files so far, any ideas on how we might add a test are welcome.
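
For reference, a rough sketch of the idea (not the exact diff here, and the helper name is made up for illustration): before writing, make sure the resolved output directory exists on the target filesystem.

import fsspec

def ensure_output_path(fs: fsspec.AbstractFileSystem, path: str) -> None:
    # Create the output directory if it is missing; exist_ok=True avoids
    # failing when it already exists.
    if not fs.isdir(path):
        fs.mkdirs(path, exist_ok=True)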

@jrbourbeau
Member Author

Interestingly, dask/dataframe/io/tests/test_parquet.py::test_create_metadata_file[pyarrow-dataset-fastparquet-a] is failing with a DataFrame shape mismatch in one CI build (full traceback below). Looking at the test itself, though, I don't think it's related to the changes here, as that test doesn't specify overwrite=True.

Full traceback:
___________ test_create_metadata_file[pyarrow-dataset-fastparquet-a] ___________
[gw0] linux -- Python 3.8.10 /usr/share/miniconda3/envs/test-environment/bin/python

tmpdir = '/tmp/pytest-of-runner/pytest-0/popen-gw0/test_create_metadata_file_pyar1'
write_engine = 'pyarrow-dataset', read_engine = 'fastparquet'
partition_on = 'a'

    @PYARROW_MARK
    @pytest.mark.parametrize("partition_on", [None, "a"])
    @write_read_engines()
    def test_create_metadata_file(tmpdir, write_engine, read_engine, partition_on):
        tmpdir = str(tmpdir)

        # Write ddf without a _metadata file
        df1 = pd.DataFrame({"b": range(100), "a": ["A", "B", "C", "D"] * 25})
        df1.index.name = "myindex"
        ddf1 = dd.from_pandas(df1, npartitions=10)
        ddf1.to_parquet(
            tmpdir,
            write_metadata_file=False,
            partition_on=partition_on,
            engine=write_engine,
        )

        # Add global _metadata file
        if partition_on:
            fns = glob.glob(os.path.join(tmpdir, partition_on + "=*/*.parquet"))
        else:
            fns = glob.glob(os.path.join(tmpdir, "*.parquet"))
        dd.io.parquet.create_metadata_file(
            fns,
            engine="pyarrow",
            split_every=3,  # Force tree reduction
        )

        # Check that we can now read the ddf
        # with the _metadata file present
        ddf2 = dd.read_parquet(
            tmpdir,
            gather_statistics=True,
            split_row_groups=False,
            engine=read_engine,
            index="myindex",  # python-3.6 CI
        )
        if partition_on:
            ddf1 = df1.sort_values("b")
            ddf2 = ddf2.compute().sort_values("b")
            ddf2.a = ddf2.a.astype("object")
>       assert_eq(ddf1, ddf2)

dask/dataframe/io/tests/test_parquet.py:3384:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

a =           b  a
myindex
0         0  A
1         1  B
2         2  C
3         3  D
4         4  A
...      .. ..
95       95  D
96       96  A
97       97  B
98       98  C
99       99  D

[100 rows x 2 columns]
b =           b  a
myindex
0         0  A
1         1  B
2         2  C
3         3  D
4         4  A
...      .. ..
95       95  D
96       96  A
97       97  B
98       98  C
99       99  D

[99 rows x 2 columns]
check_names = True, check_dtype = True, check_divisions = True
check_index = True, kwargs = {}

    def assert_eq(
        a,
        b,
        check_names=True,
        check_dtype=True,
        check_divisions=True,
        check_index=True,
        **kwargs,
    ):
        if check_divisions:
            assert_divisions(a)
            assert_divisions(b)
            if hasattr(a, "divisions") and hasattr(b, "divisions"):
                at = type(np.asarray(a.divisions).tolist()[0])  # numpy to python
                bt = type(np.asarray(b.divisions).tolist()[0])  # scalar conversion
                assert at == bt, (at, bt)
        assert_sane_keynames(a)
        assert_sane_keynames(b)
        a = _check_dask(a, check_names=check_names, check_dtypes=check_dtype)
        b = _check_dask(b, check_names=check_names, check_dtypes=check_dtype)
        if not check_index:
            a = a.reset_index(drop=True)
            b = b.reset_index(drop=True)
        if hasattr(a, "to_pandas"):
            a = a.to_pandas()
        if hasattr(b, "to_pandas"):
            b = b.to_pandas()
        if isinstance(a, pd.DataFrame):
            a = _maybe_sort(a)
            b = _maybe_sort(b)
>           tm.assert_frame_equal(a, b, check_dtype=check_dtype, **kwargs)
E           AssertionError: DataFrame are different
E
E           DataFrame shape mismatch
E           [left]:  (100, 2)
E           [right]: (99, 2)

dask/dataframe/utils.py:555: AssertionError

@martindurant
Member

It lost one row from somewhere in the middle of the dataframe??

@jrbourbeau
Member Author

Yeah, that's what it seems like. Since this is unrelated to the changes in this PR, I opened #8062 as a separate issue to track the test failure.

Do you have any thoughts on why s3fs is behaving differently from the local filesystem? Should we expect isdir to raise an error or return False when it encounters a nonexistent path?

@jrbourbeau
Member Author

Perhaps we should try to follow os.path and return False:

In [1]: import os

In [2]: os.path.isdir("this/does/not/exist/")
Out[2]: False
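
For comparison, I'd expect fsspec's local filesystem to line up with os.path here (untested sketch, continuing the session above):

In [3]: import fsspec

In [4]: fsspec.filesystem("file").isdir("this/does/not/exist/")  # expect False, mirroring os.path.isdir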

@martindurant
Member

isdir should be False where a path is a file or does not exist. It may raise if the bucket cannot be listed at all because of permissions (or because the bucket doesn't exist, which looks the same). Both exists and isdir have custom implementations in s3fs, so both are worth checking.

I can also imagine that, because of a bug, you might get different behaviour depending on whether the directory was previously listed and cached.
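
Something like the following could help rule the listing cache in or out when reproducing (just a sketch; the bucket and prefix are placeholders):

import s3fs

fs = s3fs.S3FileSystem()
path = "my-bucket/output"  # placeholder path

fs.ls("my-bucket")     # warm the directory-listing cache
warm = fs.isdir(path)

fs.invalidate_cache()  # drop cached listings
cold = fs.isdir(path)

print(warm, cold)  # a mismatch would point at the caching behaviour described above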

@jrbourbeau
Member Author

In light of #8056 (comment), I'd like to merge this PR. @martindurant, do you have any objections, or is there an alternative you would prefer?

@martindurant
Member

Adding the extra check shouldn't hurt, so feel free to include it. However, I'll revisit the logic in s3fs, possibly today, and fix it based on the pure-s3fs reproducer you made.

@jrbourbeau
Member Author

That sounds great

@jrbourbeau merged commit ab69fe1 into dask:main on Aug 24, 2021
@jrbourbeau deleted the to-parquet-overwrite branch on August 24, 2021 15:28