forked from s3tools/s3cmd
-
Notifications
You must be signed in to change notification settings - Fork 2
/
MultiPart.py
131 lines (111 loc) · 4.32 KB
/
MultiPart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
## Amazon S3 Multipart upload support
## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
## License: GPL Version 2
from Queue import Queue
from threading import Thread
from logging import debug, info, warning, error
from Utils import getTextFromXml
class Worker(Thread):
    """
    Daemon thread that forever pulls (func, args, kwargs) tasks off a
    shared queue and executes them.
    """
    def __init__(self, tasks):
        super(Worker, self).__init__()
        self.tasks = tasks
        # Daemon so a still-running worker never blocks interpreter exit.
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception:
                # A failing task must not kill this worker, and task_done()
                # must still run -- otherwise Queue.join() (used by
                # ThreadPool.wait_completion()) would deadlock forever.
                error("Worker thread raised an exception while executing a task", exc_info = True)
            finally:
                self.tasks.task_done()
class ThreadPool(object):
    """
    Fixed-size pool of worker threads consuming tasks from one shared queue.
    """
    def __init__(self, num_threads):
        # Bounded queue: once every slot is taken, add_task() blocks the
        # producer until a worker frees one up.
        self.tasks = Queue(num_threads)
        spawned = 0
        while spawned < num_threads:
            Worker(self.tasks)
            spawned += 1

    def add_task(self, func, *args, **kargs):
        """
        Queue up func(*args, **kargs) for execution by some worker.
        """
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """
        Block until every queued task has been fully processed.
        """
        self.tasks.join()
class MultiPartUpload(object):
    """
    Drive an S3 multipart upload of a single file.

    Flow: initiate_multipart_upload() -> upload_all_parts() ->
    complete_multipart_upload().
    """
    MIN_CHUNK_SIZE = 5242880     # 5MB - S3's minimum part size (all but the last part)
    MAX_CHUNK_SIZE = 5368709120  # 5GB - S3's maximum part size
    MAX_CHUNKS = 100             # once reached, the remainder of the file goes in one final part
    MAX_FILE_SIZE = 42949672960  # NOTE(review): labelled "5TB" upstream but this value is 40GiB -- confirm intent (unused in this file)

    def __init__(self, s3, file, uri):
        self.s3 = s3             # S3 connection providing create_request()/send_request()
        self.file = file         # open file object to be uploaded
        self.uri = uri           # destination object URI
        self.upload_id = None    # set by initiate_multipart_upload()
        self.parts = {}          # part number -> ETag, filled in by upload_part()

    def initiate_multipart_upload(self):
        """
        Begin a multipart upload and remember the UploadId returned by S3.
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html
        """
        request = self.s3.create_request("OBJECT_POST", uri = self.uri, extra = "?uploads")
        response = self.s3.send_request(request)
        data = response["data"]
        s3, key, upload_id = getTextFromXml(data, "Bucket"), getTextFromXml(data, "Key"), getTextFromXml(data, "UploadId")
        self.upload_id = upload_id
        return s3, key, upload_id

    def upload_all_parts(self, num_threads, chunk_size):
        """
        Read the file in chunk_size pieces and upload each piece as one
        part, fanning the uploads out over num_threads worker threads.
        Populates self.parts (part number -> ETag).

        Raises RuntimeError if initiate_multipart_upload() was not called first.
        """
        if not self.upload_id:
            raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
        # Clamp the part size into the range S3 accepts.
        chunk_size = min(self.MAX_CHUNK_SIZE, max(self.MIN_CHUNK_SIZE, chunk_size))
        part_no = 1
        pool = ThreadPool(num_threads)
        while True:
            if part_no == self.MAX_CHUNKS:
                # Last allowed part: slurp everything that remains into it.
                data = self.file.read(-1)
            else:
                data = self.file.read(chunk_size)
            if not data:
                break
            pool.add_task(self.upload_part, data, part_no)
            part_no += 1
        # part_no is one past the last queued part, hence the -1 (the
        # original logged part_no itself, over-counting the tasks by one).
        debug("Thread pool with %i threads and %i tasks awaiting completion." % (num_threads, part_no - 1))
        pool.wait_completion()

    def upload_part(self, data, id):
        """
        Upload one file chunk as part number `id` and record its ETag.
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
        """
        # TODO implement Content-MD5
        content_length = str(len(data))
        debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, content_length))
        headers = { "Content-Length": content_length }
        query_string = "?partNumber=%i&uploadId=%s" % (id, self.upload_id)
        request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
        response = self.s3.send_request(request, body = data)
        self.parts[id] = response["headers"]["etag"]

    def complete_multipart_upload(self):
        """
        Finish a multipart upload by sending S3 the full part-number/ETag list.
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
        """
        parts_xml = []
        part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
        # S3 requires the parts in ascending PartNumber order; a bare
        # dict.items() iteration gives arbitrary order and can fail the call.
        for part_no, etag in sorted(self.parts.items()):
            parts_xml.append(part_xml % (part_no, etag))
        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))
        # str() for consistency with upload_part()'s Content-Length handling.
        headers = { "Content-Length": str(len(body)) }
        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
        response = self.s3.send_request(request, body = body)
        return response
# vim:et:ts=4:sts=4:ai