Issue reading parquet using pyarrow #59

Closed
andersbogsnes opened this issue May 25, 2020 · 15 comments

@andersbogsnes
Contributor

andersbogsnes commented May 25, 2020

When trying to read parquet files using Dask==2.15.0 and adlfs==0.3.0 I got exceptions that I didn't have before.

I boiled it down to the following example, using Azurite:

>>> import dask.dataframe as dd
>>> import pandas as pd
>>> from azure.storage.blob import BlobServiceClient
>>> conn_str = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey" \
...            "=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr" \
...            "/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
>>> client: BlobServiceClient = BlobServiceClient.from_connection_string(conn_str)
>>> client.create_container("test")
>>> df = pd.DataFrame(
...     {
...         "col1": [1, 2, 3, 4],
...         "col2": [2, 4, 6, 8],
...         "index_key": [1, 1, 2, 2],
...         "partition_key": [1, 1, 2, 2],
...     }
... )
>>> STORAGE_OPTIONS = {"account_name": "devstoreaccount1", "connection_string": conn_str}
>>> dask_dataframe = dd.from_pandas(df, npartitions=1)
>>> dask_dataframe.to_parquet(
...     "abfs://test/test_group",
...     storage_options=STORAGE_OPTIONS,
...     engine="pyarrow",
... )
>>> data_out = dd.read_parquet("abfs://test/test_group",
...                            engine="pyarrow",
...                            storage_options=STORAGE_OPTIONS)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/home/anders/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-0/201.7223.92/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/home/anders/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-0/201.7223.92/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/home/anders/.config/JetBrains/PyCharm2020.1/scratches/scratch.py", line 34, in <module>
    data_out = dd.read_parquet("abfs://test/test_group",
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 225, in read_parquet
    meta, statistics, parts = engine.read_metadata(
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 267, in read_metadata
    parts, dataset = _determine_dataset_parts(
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 198, in _determine_dataset_parts
    dataset = pq.ParquetDataset(paths, filesystem=fs, **dataset_kwargs)
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/pyarrow/parquet.py", line 1171, in __init__
    self.metadata_path) = _make_manifest(
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/pyarrow/parquet.py", line 1367, in _make_manifest
    raise OSError('Passed non-file path: {}'
OSError: Passed non-file path: test/test_group

Using fastparquet instead avoids the problem, but last I tried, fastparquet didn't handle my filters properly, so I would prefer to be able to use pyarrow.

@martindurant
Member

This exception seems to follow checks for fs.isdir(path) and fs.isfile(path). Do you have a chance to check whether the behaviour of those functions has changed for your path with the newer version of adlfs?
Given that the attempt to read immediately follows the write, can you try eliminating any caching effects by running (I think):

import adlfs
adlfs.AzureDatalakeFileSystem.clear_instance_cache()

@andersbogsnes
Contributor Author

andersbogsnes commented May 25, 2020

Sure!
Clearing cache had no effect - I assume you mean AzureBlobFileSystem, though neither has an effect.

I've had some issues with ls - continuing the snippet from above:

>>> import adlfs
>>> fs = adlfs.AzureBlobFileSystem(**STORAGE_OPTIONS)
>>> fs.ls("test")
Traceback (most recent call last):
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/adlfs/core.py", line 518, in ls
    elif len(blobs) == 1 and blobs[0]["blob_type"] == "BlockBlob":
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/azure/storage/blob/_shared/models.py", line 191, in __getitem__
    return self.__dict__[key]
KeyError: 'blob_type'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/adlfs/core.py", line 534, in ls
    raise FileNotFoundError(f"File {path} does not exist!!")
FileNotFoundError: File  does not exist!!

But there is data in the container:

>>> fs.ls("test/test_group")
['test/test_group/partition_key=1/', 'test/test_group/partition_key=2/', 'test/test_group/_common_metadata', 'test/test_group/_metadata', 'test/test_group/part.0.parquet']

Previous behaviour (just verified by rolling back)

>>> fs.ls("test")
['test/test_group/']

Interestingly, both fs.isdir and fs.isfile return False

>>> fs.isfile("test/test_group")
False
>>> fs.isdir("test/test_group")
False

Previous behaviour

>>> fs.isdir("test/test_group")
True
>>> fs.isfile("test/test_group")
False

Which probably is not good 😄

@hayesgb
Collaborator

hayesgb commented May 25, 2020

Do you observe this behavior with adlfs<0.3.0? The update to 0.3 migrates to Azure storage v12 from v2.0. It would be helpful to know if it's related to this change.

@andersbogsnes
Contributor Author

@hayesgb You're fast 😄 Just updated my reply with equivalent behaviour from v0.2.4

@hayesgb
Collaborator

hayesgb commented May 25, 2020

LOL. I just sat down at my computer and saw this. I just added a branch "isfile_tests" that verifies that files and directories in the top-level directory are identified properly. These tests pass, and can be found in test_core.py:

def test_isdir(storage):
    fs = adlfs.AzureBlobFileSystem(
        account_name=storage.account_name, connection_string=CONN_STR
    )
    assert fs.isdir("data") is True
    assert fs.isdir("data/root") is True
    assert fs.isdir("data/top_file.txt") is False


def test_isfile(storage):
    fs = adlfs.AzureBlobFileSystem(
        account_name=storage.account_name, connection_string=CONN_STR
    )
    assert fs.isfile("data") is False
    assert fs.isfile("data/root") is False
    assert fs.isfile("data/top_file.txt") is True

Two questions: 1) Any chance you can add a failing example? 2) Can you share the versions of pyarrow, fastparquet, and fsspec you are using now vs. what was working previously?

@andersbogsnes
Contributor Author

andersbogsnes commented May 25, 2020

Sure - a "pure" AzureBlobFileSystem (essentially copying your test) works fine - it seems the issue is with the way dask.dataframe.to_parquet writes the data when using the abfs backend.

So a complete failing example would be (again, using Azurite):

Versions:
adlfs=0.3.0
pandas=1.0.3
azure-storage-blob=12.3.1
dask=2.16.0
fsspec=0.7.4
pyarrow=0.17.1
fastparquet=0.4.0

import dask.dataframe as dd
import pandas as pd
import adlfs
from azure.storage.blob import BlobServiceClient

conn_str = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey" \
           "=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr" \
           "/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"

STORAGE_OPTIONS = {"account_name": "devstoreaccount1",
                   "connection_string": conn_str}

client: BlobServiceClient = BlobServiceClient.from_connection_string(conn_str)
container_client = client.create_container("test")

df = pd.DataFrame(
    {
        "col1": [1, 2, 3, 4],
        "col2": [2, 4, 6, 8],
        "index_key": [1, 1, 2, 2],
        "partition_key": [1, 1, 2, 2],
    }
)

dask_dataframe = dd.from_pandas(df, npartitions=1)

dask_dataframe.to_parquet(
    "abfs://test/test_group",
    storage_options=STORAGE_OPTIONS,
    engine="pyarrow",
)

fs = adlfs.AzureBlobFileSystem(**STORAGE_OPTIONS)
fs.ls("test")

Traceback (most recent call last):
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/adlfs/core.py", line 518, in ls
    elif len(blobs) == 1 and blobs[0]["blob_type"] == "BlockBlob":
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/azure/storage/blob/_shared/models.py", line 191, in __getitem__
    return self.__dict__[key]
KeyError: 'blob_type'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/home/anders/.pyenv/versions/feature_env/lib/python3.8/site-packages/adlfs/core.py", line 534, in ls
    raise FileNotFoundError(f"File {path} does not exist!!")
FileNotFoundError: File  does not exist!!

Should this be a Dask issue instead? Seems like to_parquet is writing files in an unexpected manner, which causes errors when trying to read them again.

@martindurant
Member

Can you tell whether files have indeed been created in the blob container?

@andersbogsnes
Contributor Author

andersbogsnes commented May 25, 2020

Yes, I can do the following:

>>> fs.ls("test/test_group")
['test/test_group/_common_metadata', 'test/test_group/_metadata', 'test/test_group/part.0.parquet']

I have also "manually" confirmed by checking with the BlobServiceClient directly.

@martindurant
Member

If ls("test/test_group") works, but ls("test") does not, because it apparently tries to list the root/empty path, then the problem does lie here. Why dask should need to list the parent directory is another matter.
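
The empty path in the error message (`File  does not exist!!`) is consistent with the first path component being treated as the container name and the remainder as the path inside it. A minimal sketch of that split, assuming this is roughly what happens internally; `split_container_path` is a hypothetical name used for illustration, not the adlfs function:

```python
def split_container_path(path):
    """Split 'container/some/blob' into ('container', 'some/blob').

    Hypothetical illustration: the first component is taken as the
    container, everything after it as the path inside the container.
    """
    path = path.strip("/")
    container, _, remainder = path.partition("/")
    return container, remainder


for p in ("test", "test/test_group", "test/test_group/part.0.parquet"):
    print(repr(p), "->", split_container_path(p))
```

Under this assumption, listing `"test"` means listing the container root, so the path that ends up in the error message is the empty string.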

@andersbogsnes
Contributor Author

andersbogsnes commented May 25, 2020

I think the issue arises because dask calls fs.isdir("test/test_group"), which now returns False. That is new behaviour.

Digging into the adlfs/fsspec code, this is probably because fs.info("test/test_group") also raises the same error as fs.ls("test"). That makes sense, since fs.info delegates to AbstractFileSystem, which calls ls: https://github.com/intake/filesystem_spec/blob/master/fsspec/spec.py#L538

What I don't understand is why ls is getting screwed up by dask writing a parquet file to it...
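
The chain described above can be modelled in a few lines. This is a simplified model, not the fsspec source: `ToyFS`, its `broken_paths` argument, and the listing dict are invented for illustration. It assumes, as in the linked `spec.py`, that `info` lists the parent directory and that `isdir` turns a lookup failure into `False`.

```python
class ToyFS:
    """Toy filesystem whose info() is built on ls(), as described above."""

    def __init__(self, listings, broken_paths=()):
        self.listings = listings               # directory -> list of entry dicts
        self.broken_paths = set(broken_paths)  # directories whose ls() fails

    def ls(self, path):
        if path in self.broken_paths or path not in self.listings:
            raise FileNotFoundError(f"File {path} does not exist!!")
        return self.listings[path]

    def info(self, path):
        # Look the entry up in the listing of its parent directory.
        parent = path.rsplit("/", 1)[0] if "/" in path else ""
        for entry in self.ls(parent):
            if entry["name"].rstrip("/") == path:
                return entry
        raise FileNotFoundError(path)

    def isdir(self, path):
        try:
            return self.info(path)["type"] == "directory"
        except OSError:  # FileNotFoundError is a subclass of OSError
            return False


listings = {"test": [{"name": "test/test_group/", "type": "directory"}]}

healthy = ToyFS(listings)
broken = ToyFS(listings, broken_paths=["test"])

print(healthy.isdir("test/test_group"))
print(broken.isdir("test/test_group"))
```

In this model a failure in `ls("test")` never surfaces as an exception from `isdir`; it only shows up as `False`, which matches the behaviour reported earlier in the thread.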

@andersbogsnes
Contributor Author

Sorry to resurrect, but just wanted to know if this example was reproducible or if I'm the only one having the problem?

@hayesgb
Collaborator

hayesgb commented Jun 14, 2020

I'm looking into it.

@hayesgb
Collaborator

hayesgb commented Jun 14, 2020

I believe I have a fix for this. In some instances, Azure Blob Filesystem will return an ItemPaged iterator instead of a BlobPrefix. The scenario you were seeing appears to be one of those instances, so it wasn't being picked up. I'm going to do a push to master. Any chance you can take a look and see if it fixes your issue?
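
One way to picture the kind of handling described here, where a listing yields more than one kind of object: this is a hypothetical sketch using plain dicts and lists as stand-ins for the Azure SDK types, and it is not the change that was pushed to master. Entries without a `blob_type` key are treated as directory prefixes, and nested pages are flattened.

```python
def flatten_listing(items):
    """Yield (name, kind) pairs from a listing that may contain nested pages.

    Stand-ins: a dict models a blob or prefix entry; a list models a
    nested page of further entries.
    """
    for item in items:
        if isinstance(item, dict):
            # .get avoids the KeyError seen when 'blob_type' is absent.
            kind = "file" if item.get("blob_type") == "BlockBlob" else "directory"
            yield item["name"], kind
        else:
            # Anything else is assumed to be an iterable page of entries.
            yield from flatten_listing(item)


listing = [
    {"name": "test/test_group/"},
    [{"name": "test/test_group/_metadata", "blob_type": "BlockBlob"}],
]
print(list(flatten_listing(listing)))
```

The point of the sketch is only that the lookup of `blob_type` must tolerate entries that do not carry it, which is where the original traceback failed.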

@andersbogsnes
Contributor Author

@hayesgb That's great news! My test case is working fine now, looks like you found the issue 👍

@hayesgb
Collaborator

hayesgb commented Jun 15, 2020

Sounds great. I've released this in 0.3.1.

hayesgb closed this as completed Jun 15, 2020