-
Notifications
You must be signed in to change notification settings - Fork 214
/
chunked_uploader.py
173 lines (151 loc) · 6.77 KB
/
chunked_uploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import hashlib
from typing import TYPE_CHECKING, IO, Optional
from boxsdk.exception import BoxException
if TYPE_CHECKING:
from boxsdk.object.upload_session import UploadSession
from boxsdk.object.file import File
class ChunkedUploader:
    """
    Uploads a file-like object part by part through an upload session and commits the result.

    A running SHA-1 of the content is maintained while parts are read from the stream, and the part that is
    currently being uploaded is remembered so that :meth:`resume` can pick up after a failed upload.
    """

    def __init__(self, upload_session: 'UploadSession', content_stream: IO, file_size: int):
        """
        The initializer for the :class:`ChunkedUploader`

        :param upload_session:
            The upload session used to upload the parts and to commit the file.
        :param content_stream:
            The file-like object to upload.
        :param file_size:
            The total size of the file for the chunked upload.
        """
        self._upload_session = upload_session
        self._content_stream = content_stream
        self._file_size = file_size
        # Uploaded part records in upload order; handed to commit().
        self._part_array = []
        # Running digest over every chunk read from the stream; its digest is handed to commit().
        self._sha1 = hashlib.sha1()
        # Uploaded part records keyed by offset, used to skip parts that are already uploaded.
        self._part_definitions = {}
        # The part that has been read (and hashed) but whose upload has not completed yet.
        self._inflight_part = None
        self._is_aborted = False

    def start(self) -> Optional['File']:
        """
        Starts the process of chunk uploading a file. Should return file. If commit was not processed will return None.
        You can call ChunkedUploader.resume to retry committing upload.

        :returns:
            An uploaded :class:`File` or None if session was not processed
        :raises:
            :class:`BoxException` if the upload has been aborted.
        """
        if self._is_aborted:
            raise BoxException('The upload has been previously aborted. Please retry upload with a new upload session.')
        self._upload()
        content_sha1 = self._sha1.digest()
        return self._upload_session.commit(content_sha1=content_sha1, parts=self._part_array)

    def resume(self) -> Optional['File']:
        """
        Resumes the process of chunk uploading a file from where upload failed.
        Should return file. If commit was not processed will return None.
        You can call ChunkedUploader.resume to retry committing upload.

        :returns:
            An uploaded :class:`File` or None if session was not processed
        :raises:
            :class:`BoxException` if the upload has been aborted.
        """
        if self._is_aborted:
            raise BoxException('The upload has been previously aborted. Please retry upload with a new upload session.')
        parts = self._upload_session.get_parts()
        self._part_array = []
        # Construct a part array that is the first consecutive run of uploaded parts up to an inflight part so resume
        # has a previous state to start from for in process uploads and cross process uploads.
        # Construct a part definition to be used later to determine if a part has been uploaded by offset.
        for part in parts:
            if self._inflight_part and part['offset'] <= self._inflight_part.offset:
                self._part_array.append(part)
            if self._inflight_part and part['offset'] == self._inflight_part.offset:
                # The inflight part did reach the session; its chunk was already hashed when it was read.
                self._inflight_part = None
            self._part_definitions[part['offset']] = part
        self._upload()
        content_sha1 = self._sha1.digest()
        return self._upload_session.commit(content_sha1=content_sha1, parts=self._part_array)

    def abort(self) -> bool:
        """
        Abort an upload session, cancelling the upload and removing any parts that have already been uploaded.

        :returns:
            A boolean indication success of the upload abort.
        """
        self._content_stream = None
        self._part_array = []
        self._inflight_part = None
        self._is_aborted = True
        return self._upload_session.abort()

    def _upload(self) -> None:
        """
        Utility function for looping through all parts of the upload session and uploading them.
        """
        while len(self._part_array) < self._upload_session.total_parts:
            next_part = self._inflight_part
            if next_part is None:
                # No part is pending: read the next one from the stream. The chunk is hashed exactly once, here,
                # so that a part retried after a failed upload is not fed into the digest a second time.
                next_part = self._get_next_part()
                self._sha1.update(next_part.chunk)
                self._inflight_part = next_part
            # Retrieve the uploaded part if the part has already been uploaded. If not upload the current part.
            # If the upload raises, the part stays inflight so that a later resume retries it.
            uploaded_part = self._part_definitions.get(next_part.offset) or next_part.upload()
            self._inflight_part = None
            # Record that the part has been uploaded.
            self._part_array.append(uploaded_part)
            self._part_definitions[next_part.offset] = uploaded_part

    def _get_next_part(self) -> 'InflightPart':
        """
        Retrieves the next :class:`InflightPart` that needs to be uploaded

        :returns:
            The :class:`InflightPart` object to be uploaded next.
        """
        part_size = self._upload_session.part_size
        # The next part starts right after the parts recorded so far.
        offset = len(self._part_array) * part_size
        pieces = []
        copied_length = 0
        while copied_length < part_size:
            bytes_read = self._content_stream.read(part_size - copied_length)
            if bytes_read is None:
                # stream returns none when no bytes are ready currently but there are
                # potentially more bytes in the stream to be read.
                continue
            if not bytes_read:
                # stream is exhausted.
                break
            pieces.append(bytes_read)
            copied_length += len(bytes_read)
        # Join once instead of growing a bytes object on every read.
        chunk = b''.join(pieces)
        return InflightPart(offset, chunk, self._upload_session, self._file_size)
class InflightPart:
    def __init__(self, offset: int, chunk: bytes, upload_session: 'UploadSession', total_size: int):
        """
        Build an :class:`InflightPart`: one chunk of the file together with what is needed to upload it.

        :param offset:
            The position of this part within the file being uploaded.
        :param chunk:
            The bytes of this part.
        :param upload_session:
            The :class:`UploadSession` through which the part is uploaded.
        :param total_size:
            The total size of the file that is being chunk uploaded.
        """
        self._upload_session = upload_session
        self._total_size = total_size
        self._chunk = chunk
        self._offset = offset

    @property
    def offset(self) -> int:
        """
        The offset of this :class:`InflightPart`.
        """
        return self._offset

    @property
    def chunk(self) -> bytes:
        """
        The chunk of bytes held by this :class:`InflightPart`.
        """
        return self._chunk

    def upload(self) -> dict:
        """
        Upload this :class:`InflightPart` through its upload session.

        :returns:
            The uploaded part record, as returned by the upload session.
        """
        session = self._upload_session
        payload = self.chunk
        position = self.offset
        return session.upload_part_bytes(part_bytes=payload, offset=position, total_size=self._total_size)