Deadlock in the interaction between pyarrow.filesystem.S3FSWrapper and s3fs.core.S3FileSystem #365

Closed
dmcguire81 opened this issue Sep 10, 2020 · 22 comments


@dmcguire81

dmcguire81 commented Sep 10, 2020

What happened: Some interaction between s3fs, pyarrow, and petastorm causes deadlock

What you expected to happen: s3fs to be threadsafe, if pyarrow is using it that way

Minimal Complete Verifiable Example:

import pyarrow.parquet as pq
from petastorm.fs_utils import get_filesystem_and_path_or_paths, normalize_dir_url

dataset_url = 's3://<redacted>'

# Repeat basic steps that make_reader or make_batch_reader normally does
dataset_url = normalize_dir_url(dataset_url)
fs, path = get_filesystem_and_path_or_paths(dataset_url)

# Finished in seconds
dataset = pq.ParquetDataset(path, filesystem=fs, metadata_nthreads=1)
# Hung all night
dataset = pq.ParquetDataset(path, filesystem=fs, metadata_nthreads=10)

# Their code
>>> type(fs)
<class 'pyarrow.filesystem.S3FSWrapper'>
# Your code
>>> type(fs.fs)
<class 's3fs.core.S3FileSystem'>

Anything else we need to know?:

If your code is not threadsafe, that would appear to be news to pyarrow. Also reported to Petastorm. Will be reported to PyArrow.

Environment:

  • s3fs version: 0.4.2
  • Python version: 3.7.8
  • Operating System: Mac OS 10.15.6
  • Install method (conda, pip, source): pip install s3fs==0.4.2
@dmcguire81 dmcguire81 changed the title https://github.com/uber/petastorm/issues/590 Deadlock in the interaction between pyarrow.filesystem.S3FSWrapper and s3fs.core.S3FileSystem Sep 11, 2020
@martindurant
Member

This is the same as #350, which has been fixed upstream in aiobotocore. I believe upgrading to aiobotocore 1.1.1 should solve it - but you can also set the AWS_DEFAULT_REGION environment variable as a workaround.
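
For reference, a minimal sketch of that environment-variable workaround (the region value here is only an example, and any credentials setup is assumed to already be in place):

import os

# The variable must be set before the S3FileSystem (and its underlying botocore client) is created.
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"  # example region

from s3fs import S3FileSystem

fs = S3FileSystem()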

@dmcguire81
Author

That didn't help. Did you try to reproduce this?

$ pip freeze | grep s3fs
s3fs==0.4.2
$ pip freeze | grep aiobotocore
aiobotocore==1.1.1
$ AWS_DEFAULT_REGION=us-east-1 python
Python 3.7.8 (default, Jul 27 2020, 17:21:35)
[Clang 11.0.3 (clang-1103.0.32.59)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow.parquet as pq
>>> from petastorm.fs_utils import get_filesystem_and_path_or_paths, normalize_dir_url
>>>
>>> dataset_url = "s3://<redacted>"
>>>
>>> dataset_url = normalize_dir_url(dataset_url)
>>> fs, path = get_filesystem_and_path_or_paths(dataset_url)
>>>
>>> # still waiting
>>> dataset = pq.ParquetDataset(path, filesystem=fs, metadata_nthreads=10)

@martindurant
Member

I do not have petastorm...

Are you saying that if fs is a pyarrow variant, it fails, but if it is the S3FileSystem directly, it works?

If so, I would suggest asking petastorm not to use the pyarrow wrappers (or indeed try to parse URLs) but just use fsspec directly. Also, you might try with fsspec/s3fs master, which changed what happens to coroutines in some situations.
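
As a rough sketch of the "use fsspec directly" suggestion (the bucket and prefix are placeholders; a later comment in this thread tries essentially this configuration, which still hangs once metadata_nthreads > 1):

import fsspec
import pyarrow.parquet as pq

# Build the filesystem through fsspec rather than pyarrow's wrapper classes.
fs = fsspec.filesystem("s3")
dataset = pq.ParquetDataset("our-bucket/series/of/prefixes", filesystem=fs)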

cc @jorisvandenbossche

@dmcguire81
Author

Alright, I'll try to work up a repro case that doesn't include petastorm.

@dmcguire81
Author

dmcguire81 commented Sep 11, 2020

Here's what I got, based on the versions that were (likely) installed when the error was first encountered. These are from my notes when I patched it by disabling multithreaded metadata discovery in our fork of petastorm, so I haven't had a chance to see if newer versions are working, or if a simpler case would reproduce it. Notably, this uses filters, and it's unclear if that's the difference. Unfortunately, you'll need to find your own hive-partitioned dataset to apply filters on, but it's pretty trivial to construct with only awscli, if you know the format; I've changed the partition names and values, but kept the structure.

Setup

pip install pyarrow==0.17.1
pip install s3fs==0.4.2

Test Case

import pyarrow.parquet as pq
from s3fs import S3FileSystem
from pyarrow.filesystem import S3FSWrapper

fs = S3FSWrapper(S3FileSystem())
dataset_url = "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar"
filters = [[('partition3', '=', 'baz')]]

# Slow, but does actually make meaningful progress
dataset = pq.ParquetDataset(dataset_url, filesystem=fs, filters=filters, metadata_nthreads=1)
# Seems to be deadlocked, given that connections to S3 are all in `CLOSE_WAIT`, indefinitely
dataset = pq.ParquetDataset(dataset_url, filesystem=fs, filters=filters, metadata_nthreads=10)

Analysis

How do I know it's deadlocked? Well, as mentioned, the first time I reproduced it, I let it run overnight (8 hours) to be sure it wasn't just incredibly slow. Upon further investigation, it's done with all meaningful work (according to lsof) almost immediately, then just sits with the S3 connection in CLOSE_WAIT, indefinitely:

$ lsof -p 88248 | grep s3
python3.7 88248 dmcguire    5u     IPv4 0xa1009c7791331a23      0t0                 TCP 192.168.1.114:58325->s3-1-w.amazonaws.com:https (CLOSE_WAIT)

@dmcguire81
Author

This is updated to use the latest pyarrow, and not to do the single-threaded metadata discovery first, just in case there's some kind of strange caching I don't understand.

Setup

pip install pyarrow==1.0.1
pip install s3fs==0.4.2

Test Case

import pyarrow.parquet as pq
from s3fs import S3FileSystem
from pyarrow.filesystem import S3FSWrapper

fs = S3FSWrapper(S3FileSystem())
dataset_url = "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar"
filters = [[('partition3', '=', 'baz')]]

# Just go straight to multithreaded metadata discovery, just to be sure
dataset = pq.ParquetDataset(dataset_url, filesystem=fs, filters=filters, metadata_nthreads=10)

Analysis

Again, it's done with all meaningful work (according to lsof) almost immediately, then just sits with the S3 connections in CLOSE_WAIT, indefinitely:

$ lsof -p 88827 | grep s3
python3.7 88827 dmcguire    5u     IPv4 0xa1009c77bb142ee3      0t0                 TCP 192.168.1.114:58543->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire    6u     IPv4 0xa1009c77be0fca23      0t0                 TCP 192.168.1.114:58551->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire    7u     IPv4 0xa1009c77bebaba23      0t0                 TCP 192.168.1.114:58552->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire    8u     IPv4 0xa1009c77bec84663      0t0                 TCP 192.168.1.114:58545->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire    9u     IPv4 0xa1009c772ef692a3      0t0                 TCP 192.168.1.114:58544->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire   10u     IPv4 0xa1009c772ef69c83      0t0                 TCP 192.168.1.114:58546->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire   11u     IPv4 0xa1009c77bebab043      0t0                 TCP 192.168.1.114:58548->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire   13u     IPv4 0xa1009c77bebaa663      0t0                 TCP 192.168.1.114:58547->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire   14u     IPv4 0xa1009c77beba9c83      0t0                 TCP 192.168.1.114:58549->s3-1-w.amazonaws.com:https (CLOSE_WAIT)
python3.7 88827 dmcguire   15u     IPv4 0xa1009c77beba7ee3      0t0                 TCP 192.168.1.114:58550->s3-1-w.amazonaws.com:https (CLOSE_WAIT)

@dmcguire81
Author

@martindurant aiobotocore does not appear to be installed by s3fs==0.4.2 or either version of pyarrow, so I'm not sure how you can say that this is the same issue:

$ pip install pyarrow==0.17.1
$ pip install s3fs==0.4.2
$ pip freeze | grep aiobotocore # no hits

and

$ pip install --upgrade pyarrow==1.0.1
$ pip freeze | grep aiobotocore # *still* no hits

It might be related to my defect report against the later versions, which do seem to install aiobotocore, so maybe you just read them both and responded to the wrong one. I'll jump over there and see if that's a relevant fix.

@dmcguire81
Author

Alright, I was able to confirm that this has to do with the filters argument to pyarrow.parquet.ParquetDataset, since leaving it out allows the code to complete:

import pyarrow.parquet as pq
from s3fs import S3FileSystem
from pyarrow.filesystem import S3FSWrapper

fs = S3FSWrapper(S3FileSystem())
dataset_url = "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar/partition3=baz"

# This completes as soon as all the `ESTABLISHED` connections turn to `CLOSE_WAIT`
dataset = pq.ParquetDataset(dataset_url, filesystem=fs, metadata_nthreads=10)

@martindurant
Member

Sorry about the aiobotocore red herring - that seemed likely to cause deadlocking, as it is async stuff.

Quick questions:

  • do things work if fs = S3FileSystem() (i.e., no wrapper) ?
  • is there any effect from the metadata_nthreads parameter, particularly if it is 1?

@dmcguire81
Author

Sorry about the aiobotocore red herring - that seemed likely to cause deadlocking, as it is async stuff.

Quick questions:

* do things work if `fs = S3FileSystem()` (i.e., no wrapper) ?

I don't think that will work, because pyarrow.parquet.ParquetDataset expects an instance of pyarrow.FileSystem, but I'll give it a shot and get back to you.

* is there any effect from the `metadata_nthreads` parameter, particularly if it is 1?

Yes, it's definitely affected by this. Everything single-threaded seems to work (if very, very slowly). I can watch the effective progress, as I mentioned, by looking for ESTABLISHED connections to S3, to differentiate between slow and (likely) deadlocked.

Also, the filters distinction above was misleading; it actually comes down to the depth of traversal that pyarrow.filesystem.S3FSWrapper attempts. The first test case above uses two hard-coded partitions in the path (partition1=foo/partition2=bar) and one filter (('partition3', '=', 'baz')), whereas the second uses three hard-coded partitions in the path (partition1=foo/partition2=bar/partition3=baz) and no filters, just so the listing would be constrained. Comparing apples to apples with two hard-coded partitions, with or without filters, the result is deadlock. This appears to be because, with only two partitions specified, the manifest being built needs to crawl two additional layers of partitions in the path, and fails; if three are specified, so that it only needs to crawl one, it succeeds. I'll have the details of the repro test case momentarily.

@dmcguire81
Author

Summary

Crawling hierarchical partitions (depth >= 2) with pyarrow.parquet.ParquetDataset leads to deadlock with multithreaded metadata discovery.

Test Cases

Multithreaded
import pyarrow.parquet as pq
from s3fs import S3FileSystem
from pyarrow.filesystem import S3FSWrapper

fs = S3FSWrapper(S3FileSystem())
# This hard-codes 2 out of the 4 total partitions in the path (leaving 2 to be crawled)
dataset_url = "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar"

dataset = pq.ParquetDataset(dataset_url, filesystem=fs, metadata_nthreads=100)

Ran for an hour trying to crawl ~41K objects unsuccessfully.

Single-threaded
import pyarrow.parquet as pq
from s3fs import S3FileSystem
from pyarrow.filesystem import S3FSWrapper

fs = S3FSWrapper(S3FileSystem())
# This hard-codes 2 out of the 4 total partitions in the path (leaving 2 to be crawled)
dataset_url = "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar"

dataset = pq.ParquetDataset(dataset_url, filesystem=fs, metadata_nthreads=1)

Appears to still be making meaningful progress (I'll update with the results after it finishes, or time out at an hour).

Reference Stats

Count of objects:

$ aws s3 ls --recursive s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar | wc -l
   41004

Baseline time to crawl:

$ time aws s3 ls --recursive s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar
...

real    0m20.734s
user    0m7.292s
sys     0m0.325s

Diagnostics

When killing the hung repro test case, here's the stack trace of where it's waiting:

^CTraceback (most recent call last):
  File "repro.py", line 9, in <module>
    dataset = pq.ParquetDataset(dataset_url, filesystem=fs, metadata_nthreads=100)
  File "./env/lib/python3.7/site-packages/pyarrow/parquet.py", line 1170, in __init__
    open_file_func=partial(_open_dataset_file, self._metadata)
  File "./env/lib/python3.7/site-packages/pyarrow/parquet.py", line 1348, in _make_manifest
    metadata_nthreads=metadata_nthreads)
  File "./env/lib/python3.7/site-packages/pyarrow/parquet.py", line 927, in __init__
    self._visit_level(0, self.dirpath, [])
  File "./env/lib/python3.7/site-packages/pyarrow/parquet.py", line 968, in _visit_level
    self._visit_directories(level, filtered_directories, part_keys)
  File "./env/lib/python3.7/site-packages/pyarrow/parquet.py", line 998, in _visit_directories
    futures.wait(futures_list)
  File "./.pyenv/versions/3.7.8/lib/python3.7/concurrent/futures/_base.py", line 301, in wait
    waiter.event.wait(timeout)
  File "./.pyenv/versions/3.7.8/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "./.pyenv/versions/3.7.8/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()

That corresponds to this line in the 1.0.x maintenance branch of the pyarrow GitHub repo, but, as I mentioned, it's unclear how to follow up with them. Maybe your CC of @jorisvandenbossche will help draw attention to it.

@dmcguire81
Author

dmcguire81 commented Sep 11, 2020

Quick questions:

* do things work if `fs = S3FileSystem()` (i.e., no wrapper) ?

I don't think that will work, because pyarrow.parquet.ParquetDataset expects an instance of pyarrow.FileSystem, but I'll give it a shot and get back to you.

You were right that they are compatible, but that configuration exhibits the same symptoms as the wrapped filesystem, above, where the number of ESTABLISHED connections quickly drops to 0 and the crawling does not appear to be making meaningful progress. Here's the (edited) code, for reference:

import pyarrow.parquet as pq
from s3fs import S3FileSystem

fs = S3FileSystem()
# This hard-codes 2 out of the 4 total partitions in the path (leaving 2 to be crawled)
dataset_url = "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar"

dataset = pq.ParquetDataset(dataset_url, filesystem=fs, metadata_nthreads=100)

This exhibited the same behavior:

$ time python repro.py
^CTraceback (most recent call last):
...

real	57m6.142s
user	0m3.385s
sys	0m1.274s

@martindurant
Member

I wonder if this is still a problem with the async/aiobotocore-based latest version. That ought to be friendly to multithreading. I think that for the original botocore version you are using, we may simply need to say that it isn't threadsafe, and using threads will kill it above some threshold.

In Dask, we no longer crawl all the files in a dataset by default, partly because of this kind of problem (such crawling would not be done in parallel on workers). In the absence of a global _metadata file, it seems best to assume the schema (you probably do have a _common_metadata file) and not bother collecting min/max values for all columns and all files.
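
A rough sketch of that Dask-side approach, as of the Dask releases of that era (the gather_statistics flag and the example path are illustrative, not taken from this thread):

import dask.dataframe as dd

# Take the schema from _common_metadata (or the first file) and skip collecting
# per-file, per-column min/max statistics, so no full crawl of the dataset is needed.
df = dd.read_parquet(
    "s3://our-bucket/series/of/prefixes/partition1=foo/partition2=bar",
    gather_statistics=False,
)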

By the way, tests with the new async version shows up to 100x speedup for concurrent small reads from many (>1000) files on S3. Of course, pyarrow isn't plumbed to use that.

@dmcguire81
Author

I have issues using the latest version. See #366.

@jorisvandenbossche

jorisvandenbossche commented Sep 14, 2020

I'm having trouble navigating the bug-reporting process for Apache Arrow, if you're able to pass this on to them.

@dmcguire81 see https://arrow.apache.org/community/ for several avenues to reach out / open issues. But, I think it is fine to first discuss further here, to try to diagnose the issue, before opening another issue at the Arrow side.

Using the S3FileSystem directly or wrapped in the S3FSWrapper should indeed not matter much (EDIT: although it does override the walk method, apparently causing #366), as in the end the S3FileSystem gets used by pyarrow in both cases (the wrapper is applied automatically to fsspec filesystems that are not subclasses of pyarrow.filesystem.FileSystem, but nowadays S3FileSystem is already such a subclass).

(but indeed, petastorm should not use this S3FSWrapper directly)

That corresponds to this line in the 1.0.x maintanence branch of the pyarrow GitHub repo

So it hangs when discovering all the directories (to build up the partition structure) and when doing that with a threadpool. It seems that that code only calls the walk method of the filesystem.
Based on the code of https://github.com/apache/arrow/blob/5435b005fbef32c4ad828ddbec95d73294473c7c/python/pyarrow/parquet.py#L939-L998, it might be possible to make a smaller reproducer using a threadpool and filesystem walk to see if this is related to pyarrow or not.
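
A minimal sketch of the kind of reproducer being suggested: mimic pyarrow's _visit_level/_visit_directories pattern (each level submits its subdirectories to a shared, bounded pool and then blocks waiting on those futures) using only the standard library and a local directory tree, removing pyarrow and s3fs from the picture entirely. The visit helper and the "dataset" path are placeholders:

import os
from concurrent import futures

pool = futures.ThreadPoolExecutor(max_workers=5)

def visit(path):
    # List the immediate subdirectories, as a filesystem walk would.
    subdirs = [os.path.join(path, d) for d in os.listdir(path)
               if os.path.isdir(os.path.join(path, d))]
    # Submit one child task per subdirectory to the *same* bounded pool...
    futures_list = [pool.submit(visit, d) for d in subdirs]
    # ...and block this worker until the children finish, as pyarrow's
    # _visit_directories does with futures.wait.
    futures.wait(futures_list)

visit("dataset")  # hangs whenever the tree has more non-leaf directories than workers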

@martindurant
Member

it might be possible to make a smaller reproducer using a threadpool and filesystem walk

This is probably the way to go, agreed. I don't think I have the time this week.

@dmcguire81
Author

@jorisvandenbossche and @martindurant thanks for the feedback. I'll see if I can follow well enough to create the reproducer.

@dmcguire81
Author

Pulling in @selitvin to help investigate overlap with petastorm tests timing out.

@dmcguire81
Author

dmcguire81 commented Sep 17, 2020

@martindurant good news (for you): I have a repro test case that is 100% pyarrow, so it looks like s3fs is not involved.

@jorisvandenbossche how should I follow up with this, based on pyarrow.filesystem.LocalFileSystem?

import pyarrow.parquet as pq
import pyarrow.filesystem as fs

class LoggingLocalFileSystem(fs.LocalFileSystem):
    def walk(self, path):
        print(path)
        return super().walk(path)

fs = LoggingLocalFileSystem()
dataset_url = "dataset"

# Viewing the File System *directories* as a tree, one thread is required for every non-leaf node,
# in order to avoid deadlock

# 1) dataset
# 2) dataset/foo=1
# 3) dataset/foo=1/bar=2
# 4) dataset/foo=1/bar=2/baz=0
# 5) dataset/foo=1/bar=2/baz=1
# 6) dataset/foo=1/bar=2/baz=2
# *) dataset/foo=1/bar=2/baz=0/qux=false
# *) dataset/foo=1/bar=2/baz=1/qux=false
# *) dataset/foo=1/bar=2/baz=1/qux=true
# *) dataset/foo=1/bar=2/baz=0/qux=true
# *) dataset/foo=1/bar=2/baz=2/qux=false
# *) dataset/foo=1/bar=2/baz=2/qux=true

# This completes
threads = 6
dataset = pq.ParquetDataset(dataset_url, filesystem=fs, validate_schema=False, metadata_nthreads=threads)
print(len(dataset.pieces))

# This hangs indefinitely
threads = 5
dataset = pq.ParquetDataset(dataset_url, filesystem=fs, validate_schema=False, metadata_nthreads=threads)
print(len(dataset.pieces))
$ python repro.py 
dataset
dataset/foo=1
dataset/foo=1/bar=2
dataset/foo=1/bar=2/baz=0
dataset/foo=1/bar=2/baz=1
dataset/foo=1/bar=2/baz=2
dataset/foo=1/bar=2/baz=0/qux=false
dataset/foo=1/bar=2/baz=0/qux=true
dataset/foo=1/bar=2/baz=1/qux=false
dataset/foo=1/bar=2/baz=1/qux=true
dataset/foo=1/bar=2/baz=2/qux=false
dataset/foo=1/bar=2/baz=2/qux=true
6
dataset
dataset/foo=1
dataset/foo=1/bar=2
dataset/foo=1/bar=2/baz=0
dataset/foo=1/bar=2/baz=1
dataset/foo=1/bar=2/baz=2
^C
...
KeyboardInterrupt
^C
...
KeyboardInterrupt

NOTE: this also happens with the un-decorated LocalFileSystem, and when omitting the filesystem argument.

@dmcguire81
Author

Reported as ARROW-10029.

@dmcguire81
Author

@martindurant thanks for help and sorry for the confusion!

@jorisvandenbossche

Cool, thanks for further looking into it and figuring it out @dmcguire81 !
