Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszsocha2 committed Feb 28, 2023
1 parent ca79251 commit 7401cb1
Showing 1 changed file with 32 additions and 67 deletions.
99 changes: 32 additions & 67 deletions boxsdk/util/chunked_uploader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import hashlib
import concurrent.futures
from time import sleep
from queue import Empty, Queue
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from threading import Lock
from typing import IO, TYPE_CHECKING, Optional
from typing import IO, TYPE_CHECKING, Optional, List

from boxsdk.exception import BoxException
from boxsdk.config import Client
Expand Down Expand Up @@ -36,11 +34,9 @@ def __init__(self, upload_session: 'UploadSession', content_stream: IO[bytes], f
self._part_definitions = {}
self._is_aborted = False
self._lock = Lock()
self._inflight_part = {}
self._exception = None
self._inflight_parts = {}
self._chunk_index = 0
self._upload_queue = None
self._thread_count = Client.CHUNK_UPLOAD_THREADS
self._executor = ThreadPoolExecutor(max_workers=Client.CHUNK_UPLOAD_THREADS)

def start(self) -> Optional['File']:
"""
Expand All @@ -52,11 +48,9 @@ def start(self) -> Optional['File']:
"""
if self._is_aborted:
raise BoxException('The upload has been previously aborted. Please retry upload with a new upload session.')
# Create a list of parts to upload
self._upload_queue = Queue()
for _ in range(self._upload_session.total_parts):
self._upload_queue.put(True)
self._upload()

futures = [self._executor.submit(self._upload_part) for _ in range(self._upload_session.total_parts)]
self._upload(futures)
return self._commit_and_erase_stream_reference_when_succeed()

def resume(self) -> Optional['File']:
Expand All @@ -70,19 +64,18 @@ def resume(self) -> Optional['File']:
"""
if self._is_aborted:
raise BoxException('The upload has been previously aborted. Please retry upload with a new upload session.')
self._upload_queue = Queue()

parts = self._upload_session.get_parts()
self._part_array = []
for part in parts:
self._part_array.append(part)
self._part_definitions[part['offset']] = part
for item in self._inflight_part:
self._upload_queue.put(self._inflight_part[item])

for _ in range(self._upload_session.total_parts - self._chunk_index - len(self._inflight_part)):
self._upload_queue.put(True)
self._inflight_part = {}
self._upload()
futures = [self._executor.submit(lambda item=part: self._upload_part(item)) for part in self._inflight_parts]
for _ in range(self._upload_session.total_parts - self._chunk_index - len(self._inflight_parts)):
futures.append(self._executor.submit(self._upload_part))

self._upload(futures)
return self._commit_and_erase_stream_reference_when_succeed()

def abort(self) -> bool:
Expand All @@ -94,61 +87,33 @@ def abort(self) -> bool:
"""
self._content_stream = None
self._part_array = []
self._inflight_part = None
self._inflight_parts = {}
self._is_aborted = True
return self._upload_session.abort()

def _upload(self) -> None:
def _upload(self, futures: List[Future]) -> None:
"""
Utility function for looping through all parts of the upload session and uploading them.
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=self._thread_count) as executor:
futures = []
try:
while True:
item = self._upload_queue.get(False)
futures.append(executor.submit(self._upload_part, item))
except Empty:
pass

for future in futures:
future.result()
self._upload_queue.task_done()

# Raise any exception that occurred during upload
if self._exception:
ex = self._exception
self._exception = None
raise ex
# Wait for all parts to be uploaded
self._upload_queue.join()
for future in as_completed(futures):
future.result()

self._part_array = sorted(self._part_array, key=lambda part: part['offset'])

def _upload_part(self, task):
next_part = None
# pylint:disable=broad-except
try:
# Exit if an exception has occurred in another thread.
if self._exception:
return
if isinstance(task, InflightPart):
next_part = task
else:
with self._lock:
next_part = self._get_next_part()
sleep(0.001)
self._sha1.update(next_part.chunk)
self._inflight_part[next_part.offset] = next_part
if not self._part_definitions.get(next_part.offset):
# Record that the part has been uploaded.
uploaded_part = next_part.upload()
self._part_array.append(uploaded_part)
self._part_definitions[next_part.offset] = uploaded_part
del self._inflight_part[next_part.offset]
except Empty:
return
except Exception as exc:
self._exception = exc
def _upload_part(self, task=None):
if isinstance(task, InflightPart):
next_part = task
else:
with self._lock:
next_part = self._get_next_part()
self._sha1.update(next_part.chunk)

self._inflight_parts[next_part.offset] = next_part
if not self._part_definitions.get(next_part.offset):
uploaded_part = next_part.upload()
self._part_array.append(uploaded_part)
self._part_definitions[next_part.offset] = uploaded_part
del self._inflight_parts[next_part.offset]

def _get_next_part(self) -> 'InflightPart':
"""
Expand Down

0 comments on commit 7401cb1

Please sign in to comment.