BREAKING CHANGE: implement S3 multipart upload with presigned URLs
paulmueller committed Apr 10, 2024
1 parent 3bb70e8 commit 2d87a1f
Showing 5 changed files with 313 additions and 139 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
@@ -1,3 +1,7 @@
0.10.0
- BREAKING CHANGE: presigned URLs for uploading resources now work
  differently. For small files (< 1 GiB), a single upload URL for
  PUT is created. For larger files, a multipart upload is started
  (see the migration sketch after this changelog excerpt).
0.9.1
- feat: add convenience method `get_s3_attributes_for_artifact`
0.9.0
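
Since this is a breaking change, callers of the old
`create_presigned_upload_url` API have to adapt. A minimal sketch of
the migration, assuming `path` points to the file being uploaded::

    # before (0.9.x): presigned POST with form fields
    # psurl, fields = create_presigned_upload_url(bucket_name, object_name)

    # after (0.10.0): one or more presigned PUT URLs, plus a presigned
    # completion URL for multipart uploads (None for small files)
    upload_urls, complete_url = create_presigned_upload_urls(
        bucket_name, object_name, file_size=path.stat().st_size)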
184 changes: 128 additions & 56 deletions dcor_shared/s3.py
@@ -6,6 +6,7 @@
import json
import pathlib
import time
from typing import List, Tuple
import warnings

import boto3
@@ -45,86 +46,157 @@ def compute_checksum(bucket_name, object_name, max_size=None):
return s3_sha256


def create_presigned_upload_urls(
bucket_name: str,
object_name: str,
file_size: int,
        expiration: int = 86400) -> Tuple[List[str], str | None]:
"""Create presigned URLs for uploading an object to S3
Parameters
----------
bucket_name: str
Name of the bucket
object_name: str
Name of the object
file_size: int
        Size of the file in bytes; used to determine the number of
        multipart upload parts, with a hard-coded part size of at
        most 1 GiB. Reasons for choosing 1 GiB:
        - Using the default value of 8 or 16 MiB leads to a lot of
          parts for a 20 GiB file (2560 parts at 8 MiB)
        - The maximum allowed part size in S3 is 5 GiB
        - Somebody on the internet said that Amazon does not actually
          concatenate the parts into a new, larger file, but keeps
          the parts stored and adds a layer in-between. This has
          performance implications when using GET with ranges to
          access partial data.
        - I was originally thinking about 100 MiB, but having the
          number of parts in the ETag as an estimate of the file
          size in GiB seemed like a good idea.
expiration: int
        Time in seconds for the presigned URLs to remain valid
Returns
-------
    upload_urls: list
        List of the presigned URLs required for the upload: a single
        PUT URL for small files, or one URL per part for multipart
        uploads.
    complete_url: str or None
        Presigned URL required to finalize a multipart upload
        (`None` if no multipart upload is necessary). For more
        information, see
        `https://boto3.amazonaws.com/v1/documentation/api/latest/
        reference/services/s3/client/complete_multipart_upload.html
        #complete-multipart-upload`_ or the example below
Notes
-----
    By default, due to the access design in DCOR, the S3 URL of the
    uploaded resource is private. If you would like to make the resource
    publicly accessible, you can call the method :func:`make_object_public`,
    which will add the `public=true` tag to the object.
Example
-------
To upload a file with :mod:`requests`::
import pathlib
import requests
        # input file
        path = pathlib.Path("/path/to/file")
        bucket_name = "circle-19082eua8sdj82"
        object_name = "resources/18/39/01923102983"
        # obtain the presigned upload URLs (and, for multipart
        # uploads, the presigned URL for completing the upload)
        upload_urls, complete_url = create_presigned_upload_urls(
            bucket_name, object_name, path.stat().st_size)
with path.open("rb") as fd:
if len(upload_urls) > 1:
# Multipart upload
# Determine the part size for multipart upload
num_parts = len(upload_urls)
file_size = path.stat().st_size
if file_size % num_parts == 0:
part_size = file_size // num_parts
else:
part_size = file_size // num_parts + 1
# Upload each part
etags = []
for psurl in upload_urls:
respi = requests.put(psurl,
data=fd.read(part_size),
timeout=3,
)
                    etag_part = respi.headers.get(
                        "ETag", "").strip("'").strip('"')
etags.append(etag_part)
# Finish the multipart upload
c_xml = "<CompleteMultipartUpload>\n"
for ii, etag in enumerate(etags):
c_xml += (f" <Part>\n"
+ f" <PartNumber>{ii + 1}</PartNumber>\n"
+ f" <ETag>{etag}</ETag>\n"
+ f" </Part>\n"
)
c_xml += "</CompleteMultipartUpload>"
resp = requests.post(
complete_url,
data=c_xml,
timeout=3,
)
                etag_full = resp.headers.get(
                    "ETag", "").strip("'").strip('"')
            else:
                # Single file upload
                resp = requests.put(upload_urls[0],
                                    data=fd,
                                    timeout=3)
                etag_full = resp.headers.get(
                    "ETag", "").strip("'").strip('"')
"""
require_bucket(bucket_name)
s3_client, _, _ = get_s3()
    gb = 1024**3
    # ceiling division: number of (at most) 1 GiB parts for this file
    if file_size % gb == 0:
        num_parts = file_size // gb
    else:
        num_parts = file_size // gb + 1

upload_urls = []
    if num_parts <= 1:
        # The file is uploaded with one PUT request; no multipart
        # upload is needed (num_parts is 0 for an empty file).
psurl = s3_client.generate_presigned_url(
"put_object",
Params={'Bucket': bucket_name,
'Key': object_name,
},
ExpiresIn=expiration,
HttpMethod='PUT',
)
upload_urls.append(psurl)
complete_url = None
    else:
        # Initiate the multipart upload only now that we know it is
        # needed (avoids orphaned multipart uploads for small files)
        mpu = s3_client.create_multipart_upload(Bucket=bucket_name,
                                                Key=object_name)
        upload_id = mpu["UploadId"]
        # Set up one presigned upload URL per part
        for ii in range(num_parts):
psurl = s3_client.generate_presigned_url(
"upload_part",
Params={'Bucket': bucket_name,
'Key': object_name,
'UploadId': upload_id,
'PartNumber': ii+1
},
ExpiresIn=expiration,
HttpMethod='PUT',
)
upload_urls.append(psurl)
# Set up the complete upload URL
complete_url = s3_client.generate_presigned_url(
"complete_multipart_upload",
Params={'Bucket': bucket_name,
'Key': object_name,
'UploadId': upload_id,
},
ExpiresIn=expiration,
HttpMethod='POST',
)

return upload_urls, complete_url


def create_presigned_url(
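The docstring's remark about the ETag relies on the S3 convention
that a multipart ETag has the form `<md5-of-part-md5s>-<num_parts>`;
with 1 GiB parts, the part count approximates the file size in GiB.
A minimal sketch of such an estimate (hypothetical helper, not part
of this commit)::

    def estimate_size_gib(etag: str) -> int:
        """Estimate an object's size in GiB from its S3 ETag.

        Multipart ETags look like "<md5-of-part-md5s>-<num_parts>";
        with 1 GiB parts, num_parts approximates the size in GiB.
        A plain-MD5 ETag means a single-part upload (at most 1 GiB).
        """
        etag = etag.strip('"')
        if "-" in etag:
            return int(etag.rsplit("-", 1)[1])
        return 1  # single-part upload: at most 1 GiB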
97 changes: 66 additions & 31 deletions dcor_shared/testing.py
@@ -3,15 +3,14 @@
from io import BytesIO
import numbers
import pathlib
from typing import Dict, List
import uuid

import ckan.authz
import ckan.tests.factories as factories
import ckan.tests.helpers as helpers
from ckan.tests.pytest_ckan.fixtures import FakeFileStorage
import requests

import pytest

@@ -251,36 +250,72 @@ def synchronous_enqueue_job(job_func, args=None, kwargs=None, title=None,
job_func(*args, **kwargs)


def upload_presigned_to_s3(
path: str | pathlib.Path,
upload_urls: List[str],
complete_url: str | None):
"""Helper function for uploading data to S3
This is how DCOR-Aid would be uploading things.
Parameters
----------
path: str or pathlib.Path
file to upload
    upload_urls: list
        List of the presigned URLs required for the upload: a single
        PUT URL for small files, or one URL per part for multipart
        uploads.
    complete_url: str or None
        Presigned URL required to finalize a multipart upload
        (`None` if no multipart upload is necessary). For more
        information, see
        `https://boto3.amazonaws.com/v1/documentation/api/latest/
        reference/services/s3/client/complete_multipart_upload.html
        #complete-multipart-upload`_ or the example in
        :func:`dcor_shared.s3.create_presigned_upload_urls`
"""
path = pathlib.Path(path)
with path.open("rb") as fd:
if len(upload_urls) > 1:
# Multipart upload
# Determine the part size for multipart upload
num_parts = len(upload_urls)
file_size = path.stat().st_size
if file_size % num_parts == 0:
part_size = file_size // num_parts
else:
part_size = file_size // num_parts + 1
# Upload each part
etags = []
for psurl in upload_urls:
respi = requests.put(psurl,
data=fd.read(part_size),
timeout=3,
)
etag_part = respi.headers.get("ETag", "").strip("'").strip('"')
etags.append(etag_part)
# Finish the multipart upload
c_xml = "<CompleteMultipartUpload>\n"
for ii, etag in enumerate(etags):
c_xml += (f" <Part>\n"
+ f" <PartNumber>{ii + 1}</PartNumber>\n"
+ f" <ETag>{etag}</ETag>\n"
+ f" </Part>\n"
)
c_xml += "</CompleteMultipartUpload>"
resp = requests.post(
complete_url,
data=c_xml,
timeout=3,
)
etag_full = resp.headers.get("ETag", "").strip("'").strip('"')
else:
# Single file upload
resp = requests.put(upload_urls[0],
data=fd,
timeout=3)
etag_full = resp.headers.get("ETag", "").strip("'").strip('"')
if not etag_full:
raise ValueError(
f"Upload failed with {hrep.status_code}: {hrep.reason}")
f"Upload failed with {resp.status_code}: {resp.reason} "
f"({resp.headers})")
return etag_full
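
A minimal sketch of how the two helpers play together in a test
(bucket and object names are placeholders)::

    import pathlib

    from dcor_shared.s3 import create_presigned_upload_urls
    from dcor_shared.testing import upload_presigned_to_s3

    path = pathlib.Path("/path/to/file")
    upload_urls, complete_url = create_presigned_upload_urls(
        bucket_name="circle-19082eua8sdj82",
        object_name="resources/18/39/01923102983",
        file_size=path.stat().st_size)
    etag = upload_presigned_to_s3(path, upload_urls, complete_url)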
