-
Notifications
You must be signed in to change notification settings - Fork 0
/
aws-multipart-upload.py
122 lines (104 loc) · 3.39 KB
/
aws-multipart-upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from pathlib import Path
import logging
import argparse
from boto3 import Session
import requests
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class S3MultipartUploadUtil:
"""
AWS S3 Multipart Upload Uril
"""
def __init__(self, session: Session):
self.session = session
self.s3 = session.client('s3')
self.upload_id = None
self.bucket_name = None
self.key = None
def start(self, bucket_name: str, key: str):
"""
Start Multipart Upload
:param bucket_name:
:param key:
:return:
"""
self.bucket_name = bucket_name
self.key = key
res = self.s3.create_multipart_upload(Bucket=bucket_name, Key=key)
self.upload_id = res['UploadId']
logger.debug(f"Start multipart upload '{self.upload_id}'")
def create_presigned_url(self, part_no: int, expire: int=3600) -> str:
"""
Create pre-signed URL for upload part.
:param part_no:
:param expire:
:return:
"""
signed_url = self.s3.generate_presigned_url(
ClientMethod='upload_part',
Params={'Bucket': self.bucket_name,
'Key': self.key,
'UploadId': self.upload_id,
'PartNumber': part_no},
ExpiresIn=expire)
logger.debug(f"Create presigned url for upload part '{signed_url}'")
return signed_url
def complete(self, parts):
"""
Complete Multipart Uploading.
`parts` is list of dictionary below.
```
[ {'ETag': etag, 'PartNumber': 1}, {'ETag': etag, 'PartNumber': 2}, ... ]
```
you can get `ETag` from upload part response header.
:param parts: Sent part info.
:return:
"""
res = self.s3.complete_multipart_upload(
Bucket=self.bucket_name,
Key=self.key,
MultipartUpload={
'Parts': parts
},
UploadId=self.upload_id
)
logger.debug(f"Complete multipart upload '{self.upload_id}'")
logger.debug(res)
self.upload_id = None
self.bucket_name = None
self.key = None
def main():
parser = argparse.ArgumentParser()
parser.add_argument('target_file')
parser.add_argument('--bucket', required=True)
args = parser.parse_args()
target_file = Path(args.target_file)
bucket_name = args.bucket
key = target_file.name
max_size = 5 * 1024 * 1024
file_size = target_file.stat().st_size
upload_by = int(file_size / max_size) + 1
session = Session()
s3util = S3MultipartUploadUtil(session)
s3util.start(bucket_name, key)
urls = []
for part in range(1, upload_by + 1):
signed_url = s3util.create_presigned_url(part)
urls.append(signed_url)
parts = []
with target_file.open('rb') as fin:
for num, url in enumerate(urls):
part = num + 1
file_data = fin.read(max_size)
print(f"upload part {part} size={len(file_data)}")
res = requests.put(url, data=file_data)
print(res)
if res.status_code != 200:
return
etag = res.headers['ETag']
parts.append({'ETag': etag, 'PartNumber': part})
print(parts)
s3util.complete(parts)
if __name__ == '__main__':
main()