put_file: support concurrent multipart uploads with max_concurrency (#…
pmrowla committed Feb 27, 2024
1 parent 74f4d95 commit caf15c8
Showing 1 changed file with 76 additions and 19 deletions.
95 changes: 76 additions & 19 deletions s3fs/core.py
@@ -235,6 +235,14 @@ class S3FileSystem(AsyncFileSystem):
session : aiobotocore AioSession object to be used for all connections.
This session will be used in place of creating a new session inside S3FileSystem.
For example: aiobotocore.session.AioSession(profile='test_user')
max_concurrency : int (1)
The maximum number of concurrent transfers to use per file for multipart
upload (``put()``) operations. Defaults to 1 (sequential). When used in
conjunction with ``S3FileSystem.put(batch_size=...)`` the maximum number of
simultaneous connections is ``max_concurrency * batch_size``. We may extend
this parameter to affect ``pipe()``, ``cat()`` and ``get()``. Increasing this
value will result in higher memory usage during multipart upload operations (by
``max_concurrency * chunksize`` bytes per file).
The following parameters are passed on to fsspec:
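
For illustration, a minimal usage sketch of the new option (bucket and file names are placeholders, not part of this commit). With max_concurrency=4 each large file is still split into chunksize-sized parts, but up to four parts are in flight at once; combining it with put(batch_size=...) multiplies the open connections as described above:

from s3fs import S3FileSystem

# Hypothetical example: up to 4 concurrent part uploads per file.
fs = S3FileSystem(max_concurrency=4)

# Two files transferred at a time, each with up to 4 concurrent part uploads:
# at most 4 * 2 = 8 simultaneous connections, and roughly
# max_concurrency * chunksize (4 * 50 MiB) of buffered data per file.
fs.put(["large-a.bin", "large-b.bin"], "my-bucket/data/", batch_size=2)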
@@ -282,6 +290,7 @@ def __init__(
cache_regions=False,
asynchronous=False,
loop=None,
max_concurrency=1,
**kwargs,
):
if key and username:
@@ -319,6 +328,9 @@ def __init__(
self.cache_regions = cache_regions
self._s3 = None
self.session = session
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")
self.max_concurrency = max_concurrency

@property
def s3(self):
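
And the guard added above in action (a hypothetical snippet, not part of the diff):

from s3fs import S3FileSystem

S3FileSystem(max_concurrency=1)  # the default: parts are uploaded sequentially
S3FileSystem(max_concurrency=0)  # raises ValueError: max_concurrency must be >= 1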
@@ -1140,7 +1152,13 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
self.invalidate_cache(path)

async def _put_file(
self, lpath, rpath, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, **kwargs
self,
lpath,
rpath,
callback=_DEFAULT_CALLBACK,
chunksize=50 * 2**20,
max_concurrency=None,
**kwargs,
):
bucket, key, _ = self.split_path(rpath)
if os.path.isdir(lpath):
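
Because _put_file now takes max_concurrency=None and falls back to the instance-wide value (see _upload_file_part_concurrent below), the setting can also be overridden for a single call through the sync wrapper that fsspec generates for this method. A hedged sketch, with placeholder paths:

from s3fs import S3FileSystem

fs = S3FileSystem(max_concurrency=1)  # instance default: sequential part uploads

# This one large transfer opts in to 8 concurrent part uploads; other
# operations on fs keep the sequential default.
fs.put_file("backup.tar", "my-bucket/backups/backup.tar", max_concurrency=8)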
@@ -1169,24 +1187,15 @@ async def _put_file(
mpu = await self._call_s3(
"create_multipart_upload", Bucket=bucket, Key=key, **kwargs
)

out = []
while True:
chunk = f0.read(chunksize)
if not chunk:
break
out.append(
await self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=len(out) + 1,
UploadId=mpu["UploadId"],
Body=chunk,
Key=key,
)
)
callback.relative_update(len(chunk))

out = await self._upload_file_part_concurrent(
bucket,
key,
mpu,
f0,
callback=callback,
chunksize=chunksize,
max_concurrency=max_concurrency,
)
parts = [
{"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
]
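
The collapsed lines that follow are not reproduced here. For context, this is roughly how a parts list like the one built above feeds S3's CompleteMultipartUpload; the sketch uses plain boto3 calls and placeholder names rather than s3fs's internal _call_s3, so treat it as an outline of the protocol, not of this file:

import boto3

s3 = boto3.client("s3")
bucket, key = "my-bucket", "big.bin"

mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
try:
    parts = []
    with open("big.bin", "rb") as f:
        part_number = 1
        while chunk := f.read(50 * 2**20):
            resp = s3.upload_part(
                Bucket=bucket,
                Key=key,
                PartNumber=part_number,
                UploadId=mpu["UploadId"],
                Body=chunk,
            )
            # Each part's ETag must be echoed back when completing the upload.
            parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
            part_number += 1
    s3.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=mpu["UploadId"],
        MultipartUpload={"Parts": parts},
    )
except Exception:
    # Abandon the upload so incomplete parts do not accumulate in the bucket.
    s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"])
    raise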
@@ -1201,6 +1210,54 @@ async def _put_file(
self.invalidate_cache(rpath)
rpath = self._parent(rpath)

async def _upload_file_part_concurrent(
self,
bucket,
key,
mpu,
f0,
callback=_DEFAULT_CALLBACK,
chunksize=50 * 2**20,
max_concurrency=None,
):
max_concurrency = max_concurrency or self.max_concurrency
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")

async def _upload_chunk(chunk, part_number):
result = await self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part_number,
UploadId=mpu["UploadId"],
Body=chunk,
Key=key,
)
callback.relative_update(len(chunk))
return result

out = []
while True:
chunks = []
for i in range(max_concurrency):
chunk = f0.read(chunksize)
if chunk:
chunks.append(chunk)
if not chunks:
break
if len(chunks) > 1:
out.extend(
await asyncio.gather(
*[
_upload_chunk(chunk, len(out) + i)
for i, chunk in enumerate(chunks, 1)
]
)
)
else:
out.append(await _upload_chunk(chunks[0], len(out) + 1))
return out

async def _get_file(
self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None
):
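
To isolate the batching pattern used in _upload_file_part_concurrent above (read at most max_concurrency chunks, upload them together with asyncio.gather, repeat until the file is exhausted), here is a self-contained sketch with a stubbed upload coroutine. It is not part of the commit, but it shows why buffered memory stays near max_concurrency * chunksize per file and how part numbers remain contiguous across batches:

import asyncio
import io

async def upload_parts_concurrently(f, upload_part, chunksize, max_concurrency):
    """Upload a file object in numbered parts, max_concurrency parts at a time."""
    results = []
    while True:
        # Read the next batch; this is the memory high-water mark
        # (at most max_concurrency * chunksize bytes held at once).
        chunks = []
        for _ in range(max_concurrency):
            chunk = f.read(chunksize)
            if chunk:
                chunks.append(chunk)
        if not chunks:
            break
        # Part numbers continue from the parts already uploaded.
        results.extend(
            await asyncio.gather(
                *(
                    upload_part(chunk, len(results) + i)
                    for i, chunk in enumerate(chunks, 1)
                )
            )
        )
    return results

async def _demo():
    async def fake_upload(chunk, part_number):
        # Stand-in for S3's upload_part; returns a fake ETag.
        await asyncio.sleep(0)
        return {"PartNumber": part_number, "ETag": f"etag-{part_number}"}

    data = io.BytesIO(b"x" * 1000)
    # 1000 bytes in 256-byte chunks -> parts 1-3 in one batch, part 4 alone.
    print(await upload_parts_concurrently(data, fake_upload, 256, 3))

asyncio.run(_demo())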
