
ARROW-16413: [Python] Certain dataset APIs hang with a python filesystem #13033

Closed

Conversation

jorisvandenbossche
Member

No description provided.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@lidavidm lidavidm left a comment (Member)

Hmm, not sure what's going on with the CI failures - they seem unrelated.

python/pyarrow/tests/test_dataset.py
CExpression c_partition_expression = partition_expression.unwrap()

with nogil:
    c_result = self.format.MakeFragment(
Member Author

I wasn't able to write a test that needs this change for make_fragment. But I suppose it also doesn't hurt to add the with nogil?

Member

If MakeFragment is self-contained and doesn't call into arbitrary IO code then it shouldn't be necessary.

Member Author

Indeed, it seems that MakeFragment just takes the input path/source and filesystem and puts those in a FileFragment; for example, it doesn't actually check the file for the schema (that is only done the first time the schema is accessed, in ReadPhysicalSchema, and that part in Cython correctly releases the GIL).

@pitrou
Member

pitrou commented May 3, 2022

Did you find out where the code was hanging previously?

pq.write_table(table, out)

# read using fsspec filesystem
import s3fs
Member

Does this test get skipped if s3fs is not installed?

Member Author

Good catch, I need to add some skips for this.
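A minimal sketch of what such a skip could look like (assuming the test only needs the optional packages at import time):

import pytest

# skip the whole test when the optional dependencies are not installed
s3fs = pytest.importorskip("s3fs")
fsspec = pytest.importorskip("fsspec")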

def _create_parquet_dataset_simple(root_path, filesystem=None):
    """
    Creates a simple (flat files, no nested partitioning) Parquet dataset
    """
    metadata_collector = []
Member

If metadata_collector is an empty list then metadata.append_row_groups is never called above? Am I reading this wrong?

Member Author

This list gets passed to (and populated by) write_to_dataset a few lines below. It's not the greatest API, but that is how it currently needs to be done.
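For context, a small sketch of that pattern (simplified; the table and paths are hypothetical): write_to_dataset appends one FileMetaData per written file to the list, which is then combined into the _metadata sidecar file.

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"f1": list(range(10)), "part": ["a", "b"] * 5})

metadata_collector = []
# write_to_dataset appends the FileMetaData of each written file to the list
pq.write_to_dataset(table, "dataset_root",
                    metadata_collector=metadata_collector)
# combine the collected metadata into a single _metadata file
pq.write_metadata(table.schema, "dataset_root/_metadata",
                  metadata_collector=metadata_collector)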

Comment on lines 3074 to 3145
metadata_path = str(root_path / '_metadata')
metadata_path = str(root_path) + '/_metadata'
Member

This should be the same thing?

Member Author

For S3, we are not using a pathlib.Path anymore, so the / version doesn't work. Maybe I should use os.path.join instead of hardcoding the / to make it more robust, though.

Member

Ah, no, if it's an abstract path then it needs / indeed.
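A short illustration of the distinction discussed here (the paths are hypothetical):

import pathlib

# local filesystem: root_path is a pathlib.Path, so '/' builds a path object
root_path = pathlib.Path("/tmp/test_parquet_dataset")
metadata_path = str(root_path / "_metadata")

# S3 / abstract filesystems: root_path is a plain string, and abstract paths
# always use '/' as separator, so plain concatenation is the right choice
# (os.path.join would insert '\\' on Windows)
root_path = "mybucket/test_parquet_dataset"
metadata_path = root_path + "/_metadata"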

metadata_path, table = _create_parquet_dataset_simple(root_path, fs)

# read using fsspec filesystem
import s3fs
Member

Same question re: skipping.

@kszucs
Member

kszucs commented May 3, 2022

The issue was caused by the GIL being held?

@lidavidm
Member

lidavidm commented May 3, 2022

Basically: the main thread in Python would call Inspect() without releasing the GIL; the Parquet Inspect() implementation would kick off a file read in a background thread and block, waiting for the future to complete. The background thread would try to call into Python to complete the read but would get stuck acquiring the GIL since the main thread was still holding it.

Short term we can release the GIL, longer term it might be nice if we could avoid a background thread for synchronous situations like this. (Without having to duplicate all code paths.)
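To illustrate the shape of the deadlock, here is a small pure-Python sketch (not pyarrow code; the lock stands in for the GIL and all names are made up):

import threading
from concurrent.futures import Future, TimeoutError

gil = threading.Lock()

def start_background_read():
    fut = Future()

    def read():
        with gil:  # the IO thread needs the "GIL" to call into the Python filesystem
            fut.set_result(b"parquet footer bytes")

    threading.Thread(target=read, daemon=True).start()
    return fut

# Holding the lock while blocking on the future reproduces the hang:
with gil:
    try:
        start_background_read().result(timeout=1)
    except TimeoutError:
        print("deadlocked: the background thread can never acquire the lock")

# Releasing the lock before blocking (the effect of 'with nogil:') lets it finish:
print(start_background_read().result(timeout=1))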

@pytest.mark.parquet
@pytest.mark.s3
def test_file_format_inspect_fsspec(s3_filesystem):
    # https://issues.apache.org/jira/browse/ARROW-16413
Member

Can we simply use an fsspec local filesystem to avoid the S3 scaffolding?

Member Author

I was just trying exactly the same :)

Member Author

So it does work (I checked that the test hangs with a local fs as well, before applying the fix), with the caveat that I need to manually construct the PyFileSystem with FSSpecHandler, because if you pass an actual fsspec local filesystem, we internally convert it into an Arrow native local filesystem:

arrow/python/pyarrow/fs.py

Lines 109 to 113 in 7a0f00c

if isinstance(filesystem, fsspec.AbstractFileSystem):
    if type(filesystem).__name__ == 'LocalFileSystem':
        # In case its a simple LocalFileSystem, use native arrow one
        return LocalFileSystem(use_mmap=use_mmap)
    return PyFileSystem(FSSpecHandler(filesystem))
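A short sketch of that workaround (assuming fsspec is installed): wrap the fsspec local filesystem explicitly so pyarrow does not swap in its native LocalFileSystem.

import fsspec
from pyarrow import fs

fsspec_fs = fsspec.filesystem("file")
# bypass the automatic conversion by constructing the PyFileSystem ourselves
filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))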

Member Author

That's actually also the reason that our nightly integration tests with dask didn't catch this: we were only running the main parquet tests, which don't use S3 but only a local filesystem.

Member

Hmm, actually, we can perhaps even take fsspec out of the equation and instead use PyFileSystem(ProxyHandler(LocalFileSystem()))?

Member Author

That's something I tried before, and apparently it isn't sufficient to trigger the hang (I was expecting that just needing to call into Python would be enough).

@jorisvandenbossche
Member Author

@github-actions crossbow submit test-conda-python-3.9-dask-latest test-conda-python-3.9-dask-master

@github-actions

github-actions bot commented May 3, 2022

Revision: e9c308e

Submitted crossbow builds: ursacomputing/crossbow @ actions-1997

Task                               Status
test-conda-python-3.9-dask-latest  Github Actions
test-conda-python-3.9-dask-master  Github Actions

@jorisvandenbossche
Member Author

jorisvandenbossche commented May 3, 2022

I browsed through our dataset Cython code, and I think there are no other remaining cases where there is filesystem interaction without releasing the GIL.
I was only slightly unsure about:

def to_reader(self):
    """Consume this scanner as a RecordBatchReader.

    Returns
    -------
    RecordBatchReader
    """
    cdef RecordBatchReader reader
    reader = RecordBatchReader.__new__(RecordBatchReader)
    reader.reader = GetResultValue(self.scanner.ToRecordBatchReader())
    return reader

But my understanding is that this constructor itself doesn't read anything yet; that only happens when consuming the reader (read_next_batch, which does release the GIL).

@jorisvandenbossche
Member Author

@github-actions crossbow submit test-conda-python-3.9-dask-latest test-conda-python-3.9-dask-master

@github-actions

github-actions bot commented May 3, 2022

Revision: cd24003

Submitted crossbow builds: ursacomputing/crossbow @ actions-2000

Task                               Status
test-conda-python-3.9-dask-latest  Github Actions
test-conda-python-3.9-dask-master  Github Actions

@jorisvandenbossche
Member Author

Hmm, the AppVeyor failure is actually not unrelated at the moment:

____________________ test_parquet_dataset_factory_fsspec _____________________
tempdir = WindowsPath('C:/Users/appveyor/AppData/Local/Temp/1/pytest-of-appveyor/pytest-0/test_parquet_dataset_factory_f0')
    @pytest.mark.parquet
    def test_parquet_dataset_factory_fsspec(tempdir):
        # https://issues.apache.org/jira/browse/ARROW-16413
        fsspec = pytest.importorskip("fsspec")
    
        # create dataset with pyarrow
        root_path = tempdir / "test_parquet_dataset"
        metadata_path, table = _create_parquet_dataset_simple(root_path)
    
        # read using fsspec filesystem
        fsspec_fs = fsspec.filesystem("file")
        # manually creating a PyFileSystem, because passing the local fsspec
        # filesystem would internally be converted to native LocalFileSystem
        filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))
        dataset = ds.parquet_dataset(metadata_path, filesystem=filesystem)
        assert dataset.schema.equals(table.schema)
        assert len(dataset.files) == 4
>       result = dataset.to_table()
pyarrow\tests\test_dataset.py:3140: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pyarrow\_dataset.pyx:304: in pyarrow._dataset.Dataset.to_table
    return self.scanner(**kwargs).to_table()
pyarrow\_dataset.pyx:2549: in pyarrow._dataset.Scanner.to_table
    return pyarrow_wrap_table(GetResultValue(result))
pyarrow\error.pxi:144: in pyarrow.lib.pyarrow_internal_check_status
    return check_status(status)
pyarrow\_fs.pyx:1190: in pyarrow._fs._cb_open_input_file
    stream = handler.open_input_file(frombytes(path))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pyarrow.fs.FSSpecHandler object at 0x000002CA4A397D70>
path = 'f07c005726c84dc69f13ce79116d3304-0.parquet'
    def open_input_file(self, path):
        from pyarrow import PythonFile
    
        if not self.fs.isfile(path):
>           raise FileNotFoundError(path)
E           FileNotFoundError: f07c005726c84dc69f13ce79116d3304-0.parquet
pyarrow\fs.py:400: FileNotFoundError

@jorisvandenbossche
Member Author

To get this finalized:

  • The test_parquet_dataset_factory_fsspec test is failing on Windows (AppVeyor). It seems that the file listing of the dataset uses file paths relative to the root of the dataset folder (where the _metadata file lives), so reading the files fails with a FileNotFoundError (those files live in some temporary directory, and relative paths don't resolve there).
    When I test this locally, I properly get absolute paths (also when using fsspec), and with S3 instead of a local filesystem (earlier on this PR) the Windows tests were passing.
    We are already normalizing the metadata path inside ds.parquet_dataset(..) (and inside FileFromRowGroup in C++, where we combine the paths stored in the _metadata file with the root path), so my assumption is that this is some issue with the fsspec filesystem on Windows.
    So maybe we can skip this test on Windows for now (see the sketch after this list) and open a follow-up JIRA to investigate it further (and potentially an upstream issue)? In practice users shouldn't run into this failure, as internally we translate a local fsspec filesystem to a native one.
  • The dask integration tests are still failing because of the test_s3.py tests I added (the moto server timed out). I ensured locally that those now pass, so this can probably also be handled as a follow-up.
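A minimal sketch of such a Windows skip (the exact marker placement and reason wording are assumptions):

import sys

import pytest


@pytest.mark.skipif(
    sys.platform == "win32",
    reason="fsspec filesystem yields relative dataset paths on Windows "
           "(see follow-up JIRA)")
@pytest.mark.parquet
def test_parquet_dataset_factory_fsspec(tempdir):
    ...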

@jorisvandenbossche
Member Author

Although, based on this comment (another parquet_dataset-related test, one that also uses a PyFileSystem, but wrapping our own LocalFileSystem to add some logging in the middle), it seems we have had similar issues in the past, and it might not necessarily be related to fsspec:

# FIXME(bkietz) on Windows this results in FileNotFoundErrors.
# but actually scanning does open files
# with assert_opens([f.path for f in fragments]):
# dataset.to_table()

@kszucs
Member

kszucs commented May 3, 2022

@jorisvandenbossche what is the status of this PR? Do we expect the crossbow builds to pass?

@jorisvandenbossche
Member Author

Yes, they should pass now (I reverted the changes that caused the failures above; see the second bullet point in the summary at #13033 (comment)). But I will trigger them again to be sure.

@jorisvandenbossche
Member Author

@github-actions crossbow submit test-conda-python-3.9-dask-latest test-conda-python-3.9-dask-master

@jorisvandenbossche
Member Author

And if we are fine with skipping the test on Windows for now (see #13033 (comment)), I think this is ready to go.

@github-actions

github-actions bot commented May 3, 2022

Revision: 1bca56e

Submitted crossbow builds: ursacomputing/crossbow @ actions-2001

Task                               Status
test-conda-python-3.9-dask-latest  Github Actions
test-conda-python-3.9-dask-master  Github Actions

@lidavidm
Member

lidavidm commented May 3, 2022

Should we file follow-up issues for the Windows issues? If there's already a TODO in the code, it would be good to link it to a JIRA for future investigation.

@jorisvandenbossche
Member Author

Yes, I will open follow-up JIRAs

@kszucs kszucs left a comment (Member)

Thanks Joris!

@kszucs kszucs closed this in d897716 May 3, 2022
kszucs pushed a commit that referenced this pull request May 3, 2022
Closes #13033 from jorisvandenbossche/ARROW-16413

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
@jorisvandenbossche jorisvandenbossche deleted the ARROW-16413 branch May 4, 2022 13:44
@jorisvandenbossche
Member Author

As follow-ups, I opened:

  • [ARROW-16458] [Python] Run S3 tests in the nightly dask integration build
  • [ARROW-16460] [Python] Some dataset tests using PyFileSystem are failing on Windows

@ursabot

ursabot commented May 7, 2022

Benchmark runs are scheduled for baseline = 26f2d87 and contender = d897716. d897716 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.27% ⬆️0.0%] test-mac-arm
[Finished ⬇️0.36% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.2% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] d8977165 ec2-t3-xlarge-us-east-2
[Finished] d8977165 test-mac-arm
[Finished] d8977165 ursa-i9-9960x
[Finished] d8977165 ursa-thinkcentre-m75q
[Finished] 26f2d877 ec2-t3-xlarge-us-east-2
[Finished] 26f2d877 test-mac-arm
[Finished] 26f2d877 ursa-i9-9960x
[Finished] 26f2d877 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot

ursabot commented May 7, 2022

['Python', 'R'] benchmarks have high level of regressions.
ursa-i9-9960x
