
Read Performance for Large Files #677

Open · sneha5gsm opened this issue Dec 23, 2022 · 13 comments

@sneha5gsm commented Dec 23, 2022

Reading a large pickle file from S3
S3 file size: 12.5 GB

Approach 1:
Reading the file using pandas' read_pickle function and passing the S3 URI as input. Pandas internally uses s3fs to read from S3.
pd.read_pickle({s3_uri})
Time taken: ~16.5 min (990 sec)

Approach 2:
Getting the file with boto3 and passing the streaming body directly to pandas' read_pickle function:

import boto3
import pandas as pd

s3 = boto3.resource("s3")
s3_bucket_name = {bucket_name}
filename = {file_name}
file_data = pd.read_pickle(s3.Bucket(s3_bucket_name).Object(filename).get()["Body"])

Time taken: ~3 min (180 sec)

Why the HUGE difference?
I ran a few experiments by changing the default_block_size and default_cache_type:
pd.read_pickle({s3_uri}, storage_options={"default_block_size": {block_size}, "default_cache_type": {cache_type}})

s3fs defines the default_block_size as 5 MB and the default_cache_type as bytes.

The experiments suggest that changing the default_cache_type to readahead would give a good read-performance improvement. Let me know your thoughts. I would also like to know why bytes was chosen as the default cache_type for s3fs.
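For concreteness, a run using the best-performing 32 GB settings from the table below would look roughly like this (a sketch only; the URI is a placeholder, and the keys map to the default_block_size / default_cache_type arguments accepted by s3fs's S3FileSystem):

```python
import pandas as pd

# Sketch: placeholder URI; 20 MB blocks + readahead caching was the fastest
# combination on the 32 GB-RAM instance in the table below.
obj = pd.read_pickle(
    "s3://my-bucket/large-file.pkl",
    storage_options={
        "default_block_size": 20 * 2**20,
        "default_cache_type": "readahead",
    },
)
```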

The following table outlines the experiments:
Note: The experiments weren't performed multiple times with the same parameters for most of the configurations, so the read times can vary by a few seconds.

| S3 File Size | Block Size | Cache Type | Total Read Time | RAM on instance |
| --- | --- | --- | --- | --- |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | bytes | 993 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*2 (10 MB) | bytes | 696 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*4 (20 MB) | bytes | 540 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*8 (40 MB) | bytes | 536 sec | 32 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | readahead | 586 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*2 (10 MB) | readahead | 430 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*4 (20 MB) | readahead | 349 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*8 (40 MB) | readahead | 466 sec | 32 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | all | Out of memory | 32 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | all | 185 sec | 64 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | mmap | 656 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*8 (40 MB) | mmap | 269 sec | 32 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | block | 617 sec | 32 GB |
| 12.5 GB | (5\*2\*\*20)\*8 (40 MB) | block | 272 sec | 32 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) | first | too long, stopped experiment | 32 GB |
| 12.5 GB | 5\*2\*\*20 (5 MB, s3fs default) / 40 MB | parts | too long, stopped experiment | 32 GB |

Thanks!

Edit: Fixed typo in code

@martindurant (Member)

I would support changing the default cache to "readahead". This is what gcsfs has, and "bytes" is the default here only because at the time of introduction, that was the only one available and we never changed it. I would also support increasing the default block size from 5MB to something like 15MB or more, here and all across fsspec backends. I am more hesitant about this change, as it will worsen performance on random access reading. However, bandwidths have increased a lot in the time s3fs has existed.

Unfortunately, s3fs has no way to know whether the file object it is creating will be read entirely from beginning to end as here, or something else. So the options exist, but picking a default that is best for all situations is not possible.

mrocklin added a commit that referenced this issue Dec 30, 2022
@mrocklin (Collaborator)

This seems like an excellent change? I don't know the internals of this library any longer, but I've gone ahead and made the bytes -> readahead change recommended by both @sneha5gsm's analysis and @martindurant's comment above.

> I would also support increasing the default block size from 5MB to something like 15MB or more, here and all across fsspec backends. I am more hesitant about this change, as it will worsen performance on random access reading. However, bandwidths have increased a lot in the time s3fs has existed.

@martindurant do you have thoughts on how to make this decision? Perhaps some mild benchmarking might be in order? I suspect that if you were to lay out the benchmark that you wanted to get done then you or perhaps even someone else would be willing to do that work.

Also, kudos to @sneha5gsm for the excellent analysis work here.

@mrocklin (Collaborator)

Oh, sorry, PR in #678. But really, anyone else could probably do a better job here. I'm just doing the minimal job in the hope that it's trivial to press the green button 🙂

@martindurant (Member)

> thoughts on how to make this decision

It seems reasonable to check the environment variables AWS_EXECUTION_ENV ("am I running in AWS"), which is set for at least EC2, Fargate and Lambda, and AWS_REGION ("where am I running by default"). Being in AWS would suggest bigger blocks, since IO will be much faster - not that we would know the networking capabilities of the instance, of course!
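A minimal sketch of that heuristic (the helper name and the 5 MB / 15 MB values are illustrative, not anything s3fs currently does):

```python
import os

def pick_default_block_size(small=5 * 2**20, large=15 * 2**20):
    # AWS_EXECUTION_ENV is set on at least EC2 (some environments), Fargate and
    # Lambda; AWS_REGION hints at where we are running by default.
    in_aws = bool(os.environ.get("AWS_EXECUTION_ENV") or os.environ.get("AWS_REGION"))
    # Prefer bigger blocks when we are (probably) inside AWS and IO is fast.
    return large if in_aws else small
```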

Alternatively, we could adaptively do timing during the first operations and set the blocksize on the basis of the apparent latency/throughput. This is probably tricky to do; for instance, the initial connection (per region) will need some TLS handshaking and some behind-the-scenes auth flow.

As to benchmarks, well, it very much depends on what you are doing. For streaming (as in this thread), the bigger the block the better, up to memory limits.

mrocklin added a commit that referenced this issue Dec 30, 2022
@mrocklin (Collaborator)

> As to benchmarks, well, it very much depends on what you are doing. For streaming (as in this thread), the bigger the block the better, up to memory limits.

My thought was that maybe it made sense to come up with a few workloads that we think are representative of how we see s3fs being used today (reading Parquet, reading Zarr, pandas read functions, other stuff I don't know about) and then see how those workloads are affected. This might help us make this decision weighted by common user behaviors rather than theory, which could be useful.

Dynamically changing around cache sizes based on "am I on AWS" or not sounds grand too.

mrocklin added a commit that referenced this issue Dec 30, 2022
@mrocklin (Collaborator)

Alternatively, should Dask/Pandas/Zarr be setting certain defaults in their use of fsspec-style libraries? If 30% benefits are easily accessible then this seems like a big win to me (many workloads are entirely bound by object store bandwidth).

cc @rabernat for Xarray

martindurant pushed a commit that referenced this issue Dec 31, 2022
@rabernat commented Jan 1, 2023

I don't have any data to inform this choice. Martin, could you explain how these settings affect the common s3fs -> Zarr -> Xarray workflow? My guess would be that they are irrelevant, since we are generally fetching entire objects (whole chunks) at a time. But maybe kerchunk is affected?

@martindurant (Member)

Zarr is absolutely not affected, since it uses cat to get whole files and doesn't use the file-like API. Kerchunk access is also unaffected, since cat_ranges only reads the specific ranges given and doesn't make file-like objects.

HDF5 over fsspec is another matter. Kerchunk usually uses the "first" caching strategy there, because there are typically many small reads of metadata scattered throughout the file, with big gaps between them, but recurrent trips back into the base metadata at the start of the file. That's apparently best when scanning metadata, but probably not when actually reading the data.
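For illustration, selecting that strategy on the file-like API is a per-open() choice; a rough sketch with s3fs (the bucket/key and block size are placeholders, not from this thread):

```python
import s3fs

fs = s3fs.S3FileSystem()  # credentials/config omitted

# "first" keeps the initial block of the file cached, which suits HDF5
# metadata scanning: many small scattered reads, with repeated trips back
# into the header at the start of the file.
with fs.open(
    "my-bucket/data.h5",
    mode="rb",
    cache_type="first",
    block_size=4 * 2**20,  # how much of the file start to keep cached
) as f:
    magic = f.read(8)  # later seeks back into the header hit the cache
```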

@mrocklin (Collaborator) commented Jan 1, 2023

How about Parquet reading?

@martindurant (Member)

> How about Parquet reading?

It's not simple! I can lay out some points:

  • The metadata read is almost always achieved in a single fetch with little extra data, and at most two fetches (you know exactly how much data you missed from the first fetch).
  • The data read can vary from full streaming (all columns of all row groups) to sparse. The size of the chunks versus the size of the gaps depends on the nature of the particular dataset. Readahead helps when the chunks are big and the gaps small, and hurts when the gaps are big (you just read extra bytes but don't use them). In many cases the "none" cache is best, since chunk starts and stops are known exactly.
  • Both fastparquet and pyarrow do use the file-like API.
  • @rjzamora's blog post (benchmarked on a single column out of 30: sparse) showed that fsspec.parquet does a really good job of prospectively fetching all the pieces we know we will need, and creating a file-like object from them to give to fastparquet/pyarrow (see the sketch below).
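For readers who haven't seen that helper, a rough sketch of using fsspec.parquet to pre-fetch only the byte ranges a column selection needs (the path, column name and engine choice are placeholders):

```python
import fsspec.parquet
import pandas as pd

# Pre-fetches the footer plus the byte ranges needed for the selected column,
# then exposes them through an in-memory file-like object.
with fsspec.parquet.open_parquet_file(
    "s3://my-bucket/dataset/part-0.parquet",  # placeholder path
    columns=["my_column"],                    # placeholder column name
    engine="fastparquet",                     # or "pyarrow"
) as f:
    df = pd.read_parquet(f, columns=["my_column"], engine="fastparquet")
```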

@stevengillard

Interesting thread!

Looking at the code, BytesCache is actually a read-ahead cache with some additional smarts for semi-random reads, so it would be interesting to understand why there's such a difference in performance, and whether it can be fixed. I suggest enabling both s3fs and fsspec debug logging to see what is going on.
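For anyone reproducing this, a sketch of turning on that logging with the standard logging module (the logger names "s3fs" and "fsspec" are the ones the two libraries use):

```python
import logging

# Log low-level S3 calls and cache activity.
logging.basicConfig(level=logging.INFO)
logging.getLogger("s3fs").setLevel(logging.DEBUG)
logging.getLogger("fsspec").setLevel(logging.DEBUG)
```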

I've also done some recent experiments trying to improve the performance and efficiency of s3fs when reading NetCDF4 data from S3. This is some simple processing of 12 months / 5.5GB ERA5 NetCDF data on a Dask cluster with 12 workers.

In this test I found that reducing the s3fs block size down to 512 KB from the default 5 MB yielded a ~35% performance improvement. This is because the underlying chunks are quite small (around 100-200 KB compressed) and are not all sequential, so by default s3fs makes 5 MB+ range requests for every chunk but only a fraction of the data is used. The same data is later requested again, which ends up being very inefficient, with 10x more data read than is stored. The smaller block size improved both efficiency and performance, with 512 KB around the sweet spot in the trade-off between number of requests and data volume.
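Roughly, applying that block size looks like the following (the paths, glob pattern and h5netcdf engine choice are illustrative, not copied from the actual benchmark):

```python
import s3fs
import xarray as xr

fs = s3fs.S3FileSystem()

# Open the NetCDF4 objects through s3fs with a small 512 KB block size, since
# the compressed HDF5 chunks are only ~100-200 KB and not read sequentially.
files = [
    fs.open(path, mode="rb", block_size=512 * 1024, cache_type="bytes")
    for path in fs.glob("my-bucket/era5/2020-*.nc")  # placeholder prefix
]
ds = xr.open_mfdataset(files, engine="h5netcdf", combine="by_coords")
```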

Here's a summary of the results from my testing, with results for readahead and 15MB block size thrown in. I also compared with Kerchunk and Zarr.

| Test | Block Size | Total time (seconds) | S3 requests | GB read from S3 |
| --- | --- | --- | --- | --- |
| bytes cache | 5 MB (default) | 88.9 | 11,690 | 59.5 |
| bytes cache | 512 KB | 58.3 | 17,172 | 10.4 |
| bytes cache | 15 MB | 207.59 | 13,093 | 196.7 |
| readahead cache | 5 MB (default) | 74.93 | 11,676 | 59.3 |
| Kerchunk | Not set (same as NetCDF chunk, ~150 KB) | 66.2 | 43,920 | 5.5 |
| Zarr data | Not set (same as Zarr chunk, ~5 MB) | 21.0 | 1,272 | 6.2 |

readahead performed better but S3 read efficiency was about the same as the default - which is also interesting.

You are certainly right to be cautious about increasing the block size to 15MB 😃 It made performance a lot worse in this test, although it would no doubt provide a boost where data is being read sequentially. I think it really is a case-by-case situation, depending on the format and layout of the data, how it is read, and how other components like Xarray and Dask (and pickle in this issue) make and distribute read requests.

One thing I think would be really useful is to improve the logging and/or metrics collected around S3 requests made by s3fs. I added an extra logging line to s3fs/core.py _call_s3 that provided some additional information like the request time and content length in my tests:

        s3ret = await _error_wrapper(
            method, kwargs=additional_kwargs, retries=self.retries
        )
        # Added for these tests: log the raw S3 response (e.g. ContentLength for GETs)
        logger.debug("S3 RESULT: %s - %s", method.__name__, s3ret)
        return s3ret

No doubt there are more elegant ways of doing this - I'd love to be able to see this data in the dask dashboard 😃

Also happy to share more details on these results if there is interest, I've been meaning to write them up!

@martindurant (Member)

I think I might have mentioned before, but the caching technique used for scanning HDF5 files with kerchunk is "first", i.e., keep hold of the header block which has a lot of the metadata in it, but otherwise don't cache.

It is not surprising that storing the data into bigger contiguous blocks with zarr gives a good performance boost. Of course, it will harm random access times a bit.

It is interesting that you include kerchunk in the ways to access the data, and that it fares similarly to HDF5 for the various options. You might be aware that referenceFS has the ability to merge simultaneous read calls if they are close enough together, but the usefulness of this depends on the access pattern. Also, if you are using xarray/dask, you should set the dask partition size to a decently big multiple of the base chunk size using the chunks= parameter; it will make things faster (a sketch follows below).
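As a hedged sketch of that last point (the file name, dimension name and chunk sizes are made up; the idea is that the dask chunk is a large multiple of the stored chunk):

```python
import xarray as xr

# If the stored chunking is ~24 steps along "time", asking dask for partitions
# of 240 (10 stored chunks) means fewer, larger reads per task.
ds = xr.open_dataset("era5-subset.nc", engine="h5netcdf", chunks={"time": 240})
```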

Extra logging is a reasonable thing to add. The catch-all suggested seems like it would produce a very large amount of output, though, so I'm not sure.

> I've been meaning to write them up!

A blog post or even a paper on this topic would, I think, be very useful for the community. Data access patterns and caching strategy are not things that get talked about much, but they make a big difference. For instance, the cost of the extra bytes in a readahead cache is not typically mentioned (it may be zero if you are in the same cloud centre).

@SnehaGhantasalaTR commented Feb 13, 2023


@stevengillard
Did you try the combination of the readahead cache and a block_size of 512 KB?

EDIT: Typo
