Error when using pd.read_parquet with threading on #213

Open
mtrbean opened this issue Aug 6, 2019 · 13 comments

@mtrbean (Contributor) commented Aug 6, 2019

Since updating to the latest version, I randomly get errors like this:

Traceback (most recent call last):
  File "print_parquet_metadata.py", line 28, in <module>
    df = pd.read_parquet(args.file, use_threads=True)
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pandas/io/parquet.py", line 282, in read_parquet
    return impl.read(path, columns=columns, **kwargs)
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pandas/io/parquet.py", line 129, in read
    **kwargs).to_pandas()
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pyarrow/parquet.py", line 1216, in read_table
    use_pandas_metadata=use_pandas_metadata)
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pyarrow/parquet.py", line 216, in read
    use_threads=use_threads)
  File "pyarrow/_parquet.pyx", line 1086, in pyarrow._parquet.ParquetReader.read_all
  File "pyarrow/error.pxi", line 87, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unexpected end of stream: Page was smaller (26292) than expected (59582)

I'm pretty sure that the file is not corrupted because sometimes it can be read successfully, and it can also be read when the file is local.

It is also very hard to reproduce consistently, but reading the file in a loop reveals a pattern:

This succeeds 30 times in a row without a problem:

f = "s3://<bucket>/<parquet_file>"
for i in range(30):
    pd.read_parquet(f, use_threads=False)

This fails sometimes on the first iteration, sometimes on the second, and has never reached iteration 5:

f = "s3://<bucket>/<parquet_file>"
for i in range(30):
    pd.read_parquet(f, use_threads=True)

I suspect it has something to do with random access from multiple threads, with the cache not handling it correctly.

Finally, here is the debug log, which I hope will help:

DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): mybucket.s3.us-west-2.amazonaws.com
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /?list-type=2&prefix=&delimiter=%2F&encoding-type=url HTTP/1.1" 200 None
DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /?list-type=2&prefix=temp%2F&delimiter=%2F&encoding-type=url HTTP/1.1" 200 None
DEBUG:fsspec:<File-like object S3FileSystem, mybucket/temp/data.parquet> read: 18606060 - 18671596
DEBUG:s3fs.core:Fetch: mybucket/temp/data.parquet, 18606060-23914476
DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /temp/data.parquet HTTP/1.1" 206 65536
DEBUG:fsspec:<File-like object S3FileSystem, mybucket/temp/data.parquet> read: 18176581 - 18670182
DEBUG:s3fs.core:Fetch: mybucket/temp/data.parquet, 18176581-18606060
DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /temp/data.parquet HTTP/1.1" 206 429479
DEBUG:fsspec:<File-like object S3FileSystem, mybucket/temp/data.parquet> read: 4 - 7237571
DEBUG:s3fs.core:Fetch: mybucket/temp/data.parquet, 4-12480451
DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /temp/data.parquet HTTP/1.1" 206 12480447
DEBUG:fsspec:<File-like object S3FileSystem, mybucket/temp/data.parquet> read: 8274755 - 18176458
DEBUG:s3fs.core:Fetch: mybucket/temp/data.parquet, 8274755-10485764
DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /temp/data.parquet HTTP/1.1" 206 2211009
DEBUG:fsspec:<File-like object S3FileSystem, mybucket/temp/data.parquet> read: 7237648 - 8274676
DEBUG:s3fs.core:Fetch: mybucket/temp/data.parquet, 7237648-8274755
DEBUG:urllib3.util.retry:Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0, status=None)
DEBUG:urllib3.connectionpool:https://mybucket.s3.us-west-2.amazonaws.com:443 "GET /temp/data.parquet HTTP/1.1" 206 1037107
Traceback (most recent call last):
  File "print_parquet_metadata.py", line 28, in <module>
    df = pd.read_parquet(args.file, use_threads=True)
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pandas/io/parquet.py", line 282, in read_parquet
    return impl.read(path, columns=columns, **kwargs)
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pandas/io/parquet.py", line 129, in read
    **kwargs).to_pandas()
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pyarrow/parquet.py", line 1216, in read_table
    use_pandas_metadata=use_pandas_metadata)
  File "/Users/mtrbean/.pyenv/versions/stats3/lib/python3.6/site-packages/pyarrow/parquet.py", line 216, in read
    use_threads=use_threads)
  File "pyarrow/_parquet.pyx", line 1086, in pyarrow._parquet.ParquetReader.read_all
  File "pyarrow/error.pxi", line 87, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unexpected end of stream: Page was smaller (26292) than expected (59582)
@martindurant (Member)

Please post/link this to pyarrow too. s3fs does not do anything particular with threads: the S3File objects are independent instances, but S3FileSystem is a singleton (for given parameters) with shared state.
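
For illustration, a minimal sketch of what "singleton with shared state" versus "independent file instances" means here, assuming fsspec's instance caching (behaviour may vary by version):

import s3fs

fs_a = s3fs.S3FileSystem()
fs_b = s3fs.S3FileSystem()
assert fs_a is fs_b  # same parameters -> same cached filesystem instance, shared across threads

f1 = fs_a.open("mybucket/temp/data.parquet", "rb")
f2 = fs_b.open("mybucket/temp/data.parquet", "rb")
assert f1 is not f2  # each open() returns an independent file object with its own read cache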

@mtrbean (Contributor, Author) commented Aug 6, 2019

I don't think it's a problem with pyarrow per se, as it's reading regions that do not overlap, and reading a local file always succeeds with threading turned on or off. Since I think there is only one S3File instance, I suspect the issue is the cache not being thread safe (apologies if I misunderstood anything):

If you read the lines starting with DEBUG:fsspec in the debug log above, the parquet file is read in 5 chunks. The first chunk is 65536 bytes of metadata at 18606060 - 18671596 (at the end of the file), and then the following blocks are read in parallel when threading is turned on:

  • 4 - 7237571
  • 7237648 - 8274676
  • 8274755 - 18176458
  • 18176581 - 18670182

However s3fs.core:Fetch is pulling

  • (metadata) 18606060 - 23914476 <- beyond end of file but harmless
  • 4 - 12480451
  • 7237648 - 8274755
  • 8274755 - 10485764 <- much less than the requested range
  • 18176581 - 18606060 <- also less than the requested range, but the missing range 18606060 - 18670182 is already covered by the first metadata fetch (18606060 - 18671596)

which seems to suggest that s3fs.core:Fetch is confused about what is in the cache and what is not. The range 10485765 - 18176458 was never downloaded.
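
To illustrate the suspicion above, here is a toy sketch (not s3fs's actual code) of how a read-ahead cache with shared, unlocked state can hand back the wrong bytes when two threads interleave between the bounds check and the slice:

class ToyReadAheadCache:
    """Toy read-ahead cache: start/end/cache are shared, unlocked state."""

    def __init__(self, fetcher, blocksize):
        self.fetcher = fetcher      # fetcher(start, end) -> bytes from the remote store
        self.blocksize = blocksize
        self.start = self.end = 0
        self.cache = b""

    def read(self, start, end):
        if self.start <= start and end <= self.end:
            # Another thread may refill the cache right here, replacing
            # self.cache and self.start, so the slice below can come from
            # a block covering a completely different part of the file.
            return self.cache[start - self.start:end - self.start]
        self.cache = self.fetcher(start, end + self.blocksize)
        self.start, self.end = start, start + len(self.cache)
        return self.cache[:end - start]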

@mtrbean (Contributor, Author) commented Aug 7, 2019

I currently work around it by turning off threading in pyarrow, but I think the better fix is to turn off caching. What is the best way to turn off fsspec’s caching? @martindurant

@martindurant (Member)

Yeah, I didn't at first realise that the same file instance was being shared across threads - that's not how I normally think about things from a Dask perspective.

What is the best way to turn off fsspec’s caching?

Yes, that's the conclusion I was coming to also. You can use open(.., cache_type='none') to make a file instance and pass that instead of the URL. There is no way to change the default (except by monkey-patching).
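
A rough sketch of that workaround, reusing the placeholder path from above (whether pd.read_parquet accepts an open file directly depends on the pandas/pyarrow versions):

import pandas as pd
import s3fs

fs = s3fs.S3FileSystem()
with fs.open("s3://<bucket>/<parquet_file>", "rb", cache_type="none") as f:  # no read-ahead cache
    df = pd.read_parquet(f, use_threads=True)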

@mtrbean (Contributor, Author) commented Aug 7, 2019

Should we allow setting a default cache_type on the AbstractFileSystem (as an argument to __init__)? I'm happy to submit a PR to fsspec. I'm also happy to implement a thread-safe cache.
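
Hypothetical usage under that proposal (the keyword name here is just a sketch, not an existing argument):

import s3fs

fs = s3fs.S3FileSystem(default_cache_type="none")   # hypothetical constructor argument
f = fs.open("s3://<bucket>/<parquet_file>", "rb")   # files would inherit the default cache type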

@martindurant (Member)

Both of those sound reasonable.

Note on a thread-safe cache: I would still want the file instance to be pickleable, so presumably instances reconstituted by pickle would always have empty caches (unless they happen to be in the same thread?).

@mtrbean (Contributor, Author) commented Aug 8, 2019

I decided to implement default cache_type in s3fs (PR #214).

However, while working on the cache I found that MMapCache is currently not pickleable. I guess that needs to be fixed as well?

@martindurant (Member)

Good point.

fsspec/filesystem_spec#99 does this, but with cloudpickle only. It would be good not to have to require it; you might have a better idea.

@mtrbean (Contributor, Author) commented Aug 8, 2019

If you don’t mind losing the cache contents, you can try implementing __getstate__ and __setstate__.
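
A minimal sketch of that idea on a toy stand-in class (not fsspec's real file classes), where pickling simply drops the cached bytes:

import pickle

class CachedFile:
    """Toy stand-in for a file object that holds a block cache."""

    def __init__(self, path):
        self.path = path
        self.cache = bytearray(b"cached bytes")   # stands in for MMapCache contents

    def __getstate__(self):
        state = self.__dict__.copy()
        state["cache"] = None                     # drop cache contents when pickling
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)               # reconstituted instance starts with an empty cache

restored = pickle.loads(pickle.dumps(CachedFile("mybucket/temp/data.parquet")))
assert restored.cache is None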

@martindurant (Member)

I think I'll park it there and see what people think. If you have a nice implementation, I would be happy to see it, and likewise one for a thread-safe bytes cache. I don't know when I would get to this.

@martindurant (Member)

You know, I may even back away from saying that files should be pickleable, since we do have OpenFile after all, specifically for passing around something pickleable that can be turned into a real file object in a with block.
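
For example (a sketch; exact behaviour depends on the fsspec version), the OpenFile is the lightweight, picklable handle, and the real file object only exists inside the with block:

import fsspec
import pandas as pd

of = fsspec.open("s3://<bucket>/<parquet_file>", "rb")  # OpenFile: cheap to pickle and pass around
with of as f:                                           # entering the context creates the actual file object
    df = pd.read_parquet(f)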

@maximerihouey

Hello, do you have any updates on this issue?

@martindurant (Member)

Apparently boto3 sessions are not thread-safe, but we have not come across this before, so maybe previously they accidentally were. Certainly, s3fs instances get shared, but this is not new behaviour. I do not have a good idea of how to produce thread-local boto sessions.
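
One possible shape for that, just a sketch and not something s3fs does today: keep one boto3 Session per thread via threading.local, since Session objects are not safe to share across threads.

import threading

import boto3

_local = threading.local()

def get_s3_client():
    # Lazily create one boto3 Session (and client) per thread.
    if not hasattr(_local, "session"):
        _local.session = boto3.session.Session()
    return _local.session.client("s3")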
