put_file: support concurrent multipart uploads with max_concurrency #848

Merged: 2 commits into fsspec:main on Feb 27, 2024

Conversation

@pmrowla (Contributor) commented on Feb 12, 2024:

  • Adds max_concurrency parameter which can be used to increase the concurrency for multipart uploads during S3FileSystem._put_file() (behaves the same as max_concurrency for uploads in adlfs)
    • max_concurrency can be set at the fs instance level or as a parameter passed to _put_file() (see the usage sketch after this list)
    • When set, this will stack multiplicatively with fs.put(batch_size=...), so you will end up with a maximum of max_concurrency * batch_size parts being transferred at once
  • This does not affect downloads/_get_file().
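
A minimal usage sketch (bucket and file names are placeholders; the specific max_concurrency and batch_size values are only illustrative):

import s3fs

# Instance-level setting: applies to every multipart upload on this filesystem
fs = s3fs.S3FileSystem(max_concurrency=8)

# Per-call override for a single transfer
fs.put_file(
    "large-file.bin",
    "s3://my-bucket/large-file.bin",
    max_concurrency=16,
)

# Combined with batched puts, up to max_concurrency * batch_size parts
# (here 16 * 4 = 64) may be in flight at once
fs.put("local-dir/", "s3://my-bucket/dir/", recursive=True, batch_size=4, max_concurrency=16)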

@pmrowla (Contributor, Author) commented on Feb 12, 2024:

For reference, I live in Seoul and am physically close to the ap-northeast-2 datacenter. When testing against a bucket located in ap-northeast-2, a single put_file() maxes out my 1Gbps connection regardless of concurrency (i.e. both s3fs main and this PR with any concurrency value hit something like 900+ Mbps upload).

However, for any other region, increasing the concurrency makes a noticeable difference for me. Using us-west-2 as an example, I get a maximum of ~160Mbps per connection, but increasing the concurrency will allow me to get closer to maxing out my available bandwidth.

Given this script and a bucket in us-west-2:

import s3fs
from fsspec.callbacks import TqdmCallback

# BUCKET (the target bucket name) is assumed to be defined elsewhere
s3 = s3fs.S3FileSystem(cache_regions=True)
for i in (None, 32):
    s3.put_file(
        "ubuntu-22.04.2-live-server-arm64.iso",
        f"s3://{BUCKET}/s3fs-test/ubuntu-22.04.2-live-server-arm64.iso",
        callback=TqdmCallback(tqdm_kwargs={"desc": f"put file max_concurrency={i}", "unit": "B", "unit_scale": True}),
        max_concurrency=i,
    )
$ python test.py
put file max_concurrency=None: 100%|███████████████████████████| 1.94G/1.94G [01:53<00:00, 17.1MB/s]
put file max_concurrency=32: 100%|█████████████████████████████| 1.94G/1.94G [00:31<00:00, 62.1MB/s]

For reference, aws s3 cp takes ~40s to transfer the same file to the same bucket:

$ time aws s3 cp ubuntu-22.04.2-live-server-arm64.iso s3://{BUCKET}/s3fs-test/ubuntu-22.04.2-live-server-arm64.iso
upload: ./ubuntu-22.04.2-live-server-arm64.iso to s3://{BUCKET}/s3fs-test/ubuntu-22.04.2-live-server-arm64.iso
aws s3 cp ubuntu-22.04.2-live-server-arm64.iso   7.53s user 4.03s system 26% cpu 43.334 total

@martindurant (Member) commented:

> However, for any other region, increasing the concurrency makes a noticeable difference for me. Using us-west-2 as an example, I get a maximum of ~160Mbps per connection, but increasing the concurrency will allow me to max out the full 1Gbps bandwidth.

Any idea why this happens? The upload should in theory saturate the bandwidth whether on a single call for the whole massive file (after one count of latency) or on many concurrent calls (that all wait ~1 count of latency at the same time).

@pmrowla (Contributor, Author) commented on Feb 12, 2024:

> Any idea why this happens? The upload should in theory saturate the bandwidth whether on a single call for the whole massive file (after one count of latency) or on many concurrent calls (that all wait ~1 count of latency at the same time).

I'm not sure, and as I mentioned I do get the max theoretical bandwidth when I'm guaranteed to have good routing to the datacenter, whether or not it's a concurrent upload.

But we have seen users with the same issues with other storage providers as well, so this isn't limited to AWS; see the related adlfs/azure issues:
fsspec/adlfs#420
iterative/dvc-azure#54 (comment)

It's probably worth noting that boto3/s3transfer also perform multipart uploads in concurrent threads rather than sequentially.
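
For comparison, a rough sketch of the equivalent knob in boto3/s3transfer (bucket and key names are placeholders):

import boto3
from boto3.s3.transfer import TransferConfig

# s3transfer uploads parts from a thread pool; max_concurrency sets its size
config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # switch to multipart above 8 MiB
    multipart_chunksize=8 * 1024 * 1024,  # size of each uploaded part
    max_concurrency=10,                   # parts uploaded in parallel
)

s3 = boto3.client("s3")
s3.upload_file(
    "ubuntu-22.04.2-live-server-arm64.iso",
    "my-bucket",
    "s3fs-test/ubuntu-22.04.2-live-server-arm64.iso",
    Config=config,
)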

@martindurant (Member) commented:

> I do get the max theoretical bandwidth when I'm guaranteed to have good routing to the datacenter,

OK, let's assume there is some component of per-packet latency too, then. Perhaps it's an SSL thing. Thanks for digging.

@martindurant (Member) left a review:

I have a couple of thoughts on naming, and when/if we can apply the same strategy to the other high-bandwidth operations.

The only substantial comment is about the batching strategy. It may not matter, and we can keep this approach for now, since it already produces an improvement.

Review comment on s3fs/core.py (outdated):

@@ -1201,6 +1206,54 @@ async def _put_file(
        self.invalidate_cache(rpath)
        rpath = self._parent(rpath)

    async def _upload_part_concurrent(

Can we please rename this to indicate it is uploading from a file, as opposed to bytes. Or can it be generalised to support pipe() too?

Review comment on s3fs/core.py (outdated):

        while True:
            chunks = []
            for i in range(max_concurrency):
                chunk = f0.read(chunksize)

Somewhere we need a caveat, that increasing concurrency will lead to high memory use.
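
To give a rough sense of scale for that caveat (an illustrative calculation; the ~50 MiB part size is an assumed default, not taken from this diff):

# Each in-flight part is read into memory before it is sent, so peak buffering
# per file is roughly max_concurrency * chunksize
chunksize = 50 * 2**20   # assumed ~50 MiB part size
max_concurrency = 8
print(f"~{max_concurrency * chunksize / 2**20:.0f} MiB buffered per file")  # ~400 MiB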

Review comment on s3fs/core.py (outdated), lines 239 to 243:
If given, the maximum number of concurrent transfers to use for a
multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
the result will be a maximum of ``max_concurrency * batch_size`` concurrent
transfers.

Suggested change:
The maximum number of concurrent transfers to use per file for multipart upload (``put()``) operations. Defaults to 1 (sequential).
When used in conjunction with ``S3FileSystem.put(batch_size=...)``
the maximum number of simultaneous connections is ``max_concurrency * batch_size``.
We may extend this parameter to affect ``pipe()``, ``cat()`` and ``get()``.

Review comment on s3fs/core.py (outdated):

    ):
        max_concurrency = max_concurrency or self.max_concurrency
        if max_concurrency is None or max_concurrency < 1:
            max_concurrency = 1

Why not max_concurrency=1 as the default in __init__?

@martindurant merged commit caf15c8 into fsspec:main on Feb 27, 2024 (21 checks passed).
@pmrowla deleted the put-file-concurrent branch on February 28, 2024.
ryan-williams added a commit to hudcostreets/nj-crashes that referenced this pull request on Mar 17, 2024:
fsspec/s3fs#848 added a `max_concurrency` kwarg, released in s3fs 2024.3,
which seems to break something in `dvc pull`: https://github.com/hudcostreets/nj-crashes/actions/runs/8316430240/job/22755877957#step:11:31