use parallel downloader for small files
mjurbanski-reef committed Apr 5, 2024
1 parent 294d056 commit 29696c4
Showing 3 changed files with 17 additions and 23 deletions.
17 changes: 8 additions & 9 deletions b2sdk/_internal/transfer/inbound/downloader/parallel.py
@@ -30,6 +30,12 @@


class ParallelDownloader(AbstractDownloader):
"""
Downloader using threads to download&write multiple parts of an object in parallel.
Each part is downloaded by its own thread, while all writes are done by additional dedicated thread.
This can increase performance even for a small file, as fetching & writing can be done in parallel.
"""
# situations to consider:
#
# local file start local file end
@@ -60,22 +66,15 @@ def __init__(self, min_part_size: int, max_streams: int | None = None, **kwargs)
self.max_streams = max_streams
self.min_part_size = min_part_size

def is_suitable(self, download_version: DownloadVersion, allow_seeking: bool):
if not super().is_suitable(download_version, allow_seeking):
return False
return self._get_number_of_streams(
download_version.content_length
) >= 2 and download_version.content_length >= 2 * self.min_part_size

def _get_number_of_streams(self, content_length):
def _get_number_of_streams(self, content_length: int) -> int:
num_streams = content_length // self.min_part_size
if self.max_streams is not None:
num_streams = min(num_streams, self.max_streams)
else:
max_threadpool_workers = getattr(self._thread_pool, '_max_workers', None)
if max_threadpool_workers is not None:
num_streams = min(num_streams, max_threadpool_workers)
return num_streams
return max(num_streams, 1)

def download(
self,
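Note: for illustration, here is a standalone sketch of the stream-count logic after this change. The names mirror the diff above, but the thread-pool worker fallback is simplified into an explicit max_streams parameter, and the byte sizes in the asserts are hypothetical.

    def get_number_of_streams(content_length: int, min_part_size: int, max_streams: int | None = None) -> int:
        # One stream per min_part_size bytes, optionally capped by max_streams...
        num_streams = content_length // min_part_size
        if max_streams is not None:
            num_streams = min(num_streams, max_streams)
        # ...but never fewer than one, so small files no longer have to be rejected.
        return max(num_streams, 1)

    # With the old `return num_streams`, a 1-byte object and a 100 MiB minimum part size
    # yielded 0 streams, which is why the now-removed is_suitable() rejected small files.
    assert get_number_of_streams(1, 100 * 1024 * 1024) == 1
    assert get_number_of_streams(512 * 1024 * 1024, 100 * 1024 * 1024, max_streams=4) == 4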
@@ -0,0 +1 @@
Use ParallelDownloader for small files instead of SimpleDownloader to avoid blocking on I/O.
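To illustrate why this avoids blocking on I/O, here is a minimal, self-contained sketch of the pattern the new ParallelDownloader docstring describes. This is not b2sdk's actual implementation; fetch_part, parallel_download, and the use of urllib Range requests are illustrative assumptions. Each part is fetched by its own thread while a single dedicated thread performs all writes, so network reads and disk writes overlap.

    import queue
    import threading
    import urllib.request
    from concurrent.futures import ThreadPoolExecutor


    def fetch_part(url: str, start: int, end: int) -> bytes:
        # Hypothetical helper: fetch bytes [start, end) of the remote object via an HTTP Range request.
        req = urllib.request.Request(url, headers={"Range": f"bytes={start}-{end - 1}"})
        with urllib.request.urlopen(req) as resp:
            return resp.read()


    def parallel_download(url: str, length: int, part_size: int, out_path: str) -> None:
        parts = [(start, min(start + part_size, length)) for start in range(0, length, part_size)]
        write_queue: queue.Queue = queue.Queue()

        def writer() -> None:
            # Dedicated writer thread: the only place that touches the local file.
            with open(out_path, "wb") as f:
                while (item := write_queue.get()) is not None:
                    offset, data = item
                    f.seek(offset)
                    f.write(data)

        writer_thread = threading.Thread(target=writer)
        writer_thread.start()
        try:
            # One download task per part; fetching overlaps with writing.
            with ThreadPoolExecutor(max_workers=max(len(parts), 1)) as pool:
                futures = [
                    pool.submit(lambda s=start, e=end: write_queue.put((s, fetch_part(url, s, e))))
                    for start, end in parts
                ]
                for future in futures:
                    future.result()  # surface any download error
        finally:
            write_queue.put(None)  # tell the writer there is nothing more to write
            writer_thread.join()

The real downloader additionally caps the number of streams (see _get_number_of_streams in the diff above); the sketch keeps only the threading shape.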
22 changes: 8 additions & 14 deletions test/integration/test_download.py
@@ -48,15 +48,14 @@ def test_large_file(self):
]
):

# let's check that small file downloads fail with these settings
zero = bucket.upload_bytes(b'0', 'a_single_zero')
with pytest.raises(ValueError) as exc_info:
with io.BytesIO() as io_:
bucket.download_file_by_name('a_single_zero').save(io_)
assert exc_info.value.args == ('no strategy suitable for download was found!',)
# let's check that small file downloads do not fail with these settings
small_file_version = bucket.upload_bytes(b'0', 'a_single_char')
with io.BytesIO() as io_:
bucket.download_file_by_name('a_single_char').save(io_)
assert io_.getvalue() == b'0'

f, sha1 = self._file_helper(bucket)
if zero._type() != 'large':
if small_file_version._type() != 'large':
# if we are here, that's not the production server!
assert f.download_version.content_sha1_verified # large files don't have sha1, lets not check

@@ -115,20 +114,15 @@ def test_gzip(self):
self.b2_api.download_file_by_id(file_id=file_version.id_).save_to(
str(downloaded_compressed_file)
)
with open(downloaded_compressed_file, 'rb') as dcf:
downloaded_data = dcf.read()
with open(source_file, 'rb') as sf:
source_data = sf.read()
assert downloaded_data == source_data
assert downloaded_compressed_file.read_bytes() == source_file.read_bytes()

decompressing_api, _ = authorize(
self.b2_auth_data, B2HttpApiConfig(decode_content=True)
)
decompressing_api.download_file_by_id(file_id=file_version.id_).save_to(
str(downloaded_uncompressed_file)
)
with open(downloaded_uncompressed_file, 'rb') as duf:
assert duf.read() == data_to_write
assert downloaded_uncompressed_file.read_bytes() == data_to_write


@pytest.fixture
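As a usage note, the end-to-end flow the updated small-file test exercises looks roughly like this. This is a sketch: the credentials, realm string, and bucket name are placeholders, and the b2sdk.v2 import path is an assumption; the upload and download calls mirror the test above.

    import io

    from b2sdk.v2 import B2Api, InMemoryAccountInfo

    # Credentials and bucket name below are placeholders.
    b2_api = B2Api(InMemoryAccountInfo())
    b2_api.authorize_account("production", "<application_key_id>", "<application_key>")
    bucket = b2_api.get_bucket_by_name("<bucket-name>")

    # Even a 1-byte upload can now be downloaded: ParallelDownloader no longer
    # rejects files smaller than two minimum part sizes.
    bucket.upload_bytes(b"0", "a_single_char")
    with io.BytesIO() as buffer:
        bucket.download_file_by_name("a_single_char").save(buffer)
        assert buffer.getvalue() == b"0"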
