3 changes: 3 additions & 0 deletions azure-storage-blob/ChangeLog.md
@@ -2,6 +2,9 @@

> See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.

## Version XX.XX.XX:
- Fixed a design flaw where the get_blob_to_* methods buffered the entire blob when max_connections was set to 1.

## Version 1.3.0:

- Support for 2018-03-28 REST version. Please see our REST API documentation and blog for information about the related added features.
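A minimal usage sketch of the change described above, assuming the 1.x azure-storage-blob SDK (account, container, and blob names below are placeholders): with this fix, max_connections=1 streams the blob to the destination chunk by chunk instead of buffering the whole blob in memory first.

```python
from azure.storage.blob import BlockBlobService

# Placeholder credentials; substitute a real account.
service = BlockBlobService(account_name='<account>', account_key='<key>')

# With max_connections=1 the blob is now written to the stream chunk by
# chunk rather than being materialized in memory before the write.
with open('large_blob.bin', 'wb') as destination:
    service.get_blob_to_stream('mycontainer', 'myblob', destination,
                               max_connections=1)
```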
93 changes: 72 additions & 21 deletions azure-storage-blob/azure/storage/blob/_download_chunking.py
@@ -5,18 +5,16 @@
# --------------------------------------------------------------------------
import threading

from azure.storage.common._error import _ERROR_NO_SINGLE_THREAD_CHUNKING


def _download_blob_chunks(blob_service, container_name, blob_name, snapshot,
download_size, block_size, progress, start_range, end_range,
stream, max_connections, progress_callback, validate_content,
lease_id, if_modified_since, if_unmodified_since, if_match,
if_none_match, timeout, operation_context):
if max_connections <= 1:
raise ValueError(_ERROR_NO_SINGLE_THREAD_CHUNKING.format('blob'))

downloader = _BlobChunkDownloader(
downloader_class = _ParallelBlobChunkDownloader if max_connections > 1 else _SequentialBlobChunkDownloader

downloader = downloader_class(
blob_service,
container_name,
blob_name,
@@ -38,35 +36,42 @@ def _download_blob_chunks(blob_service, container_name, blob_name, snapshot,
operation_context,
)

import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
result = list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
else:
for chunk in downloader.get_chunk_offsets():
downloader.process_chunk(chunk)


class _BlobChunkDownloader(object):
def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
chunk_size, progress, start_range, end_range, stream,
progress_callback, validate_content, lease_id, if_modified_since,
if_unmodified_since, if_match, if_none_match, timeout, operation_context):
# identifiers for the blob
self.blob_service = blob_service
self.container_name = container_name
self.blob_name = blob_name
self.snapshot = snapshot

# information on the download range/chunk size
self.chunk_size = chunk_size
self.download_size = download_size
self.start_index = start_range
self.blob_end = end_range

# the destination that we will write to
self.stream = stream
self.stream_start = stream.tell()
self.stream_lock = threading.Lock()

# progress related
self.progress_callback = progress_callback
self.progress_total = progress
self.progress_lock = threading.Lock()

# parameters for each get blob operation
self.timeout = timeout
self.operation_context = operation_context

self.validate_content = validate_content
self.lease_id = lease_id
self.if_modified_since = if_modified_since
@@ -92,17 +97,13 @@ def process_chunk(self, chunk_start):
self._write_to_stream(chunk_data, chunk_start)
self._update_progress(length)

# should be provided by the subclass
def _update_progress(self, length):
if self.progress_callback is not None:
with self.progress_lock:
self.progress_total += length
total = self.progress_total
self.progress_callback(total, self.download_size)
pass

# should be provided by the subclass
def _write_to_stream(self, chunk_data, chunk_start):
with self.stream_lock:
self.stream.seek(self.stream_start + (chunk_start - self.start_index))
self.stream.write(chunk_data)
pass

def _download_chunk(self, chunk_start, chunk_end):
response = self.blob_service._get_blob(
@@ -125,3 +126,53 @@ def _download_chunk(self, chunk_start, chunk_end):
# that subsequent downloads are to an unmodified blob
self.if_match = response.properties.etag
return response


class _ParallelBlobChunkDownloader(_BlobChunkDownloader):
def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
chunk_size, progress, start_range, end_range, stream,
progress_callback, validate_content, lease_id, if_modified_since,
if_unmodified_since, if_match, if_none_match, timeout, operation_context):

super(_ParallelBlobChunkDownloader, self).__init__(blob_service, container_name, blob_name, snapshot,
download_size,
chunk_size, progress, start_range, end_range, stream,
progress_callback, validate_content, lease_id,
if_modified_since,
if_unmodified_since, if_match, if_none_match, timeout,
operation_context)

# for a parallel download, the stream is always seekable, so we note down the current position
# in order to seek to the right place when out-of-order chunks come in
self.stream_start = stream.tell()

# since parallel operations are going on
# it is essential to protect the writing and progress reporting operations
self.stream_lock = threading.Lock()
self.progress_lock = threading.Lock()

def _update_progress(self, length):
if self.progress_callback is not None:
with self.progress_lock:
self.progress_total += length
total_so_far = self.progress_total
self.progress_callback(total_so_far, self.download_size)

def _write_to_stream(self, chunk_data, chunk_start):
with self.stream_lock:
self.stream.seek(self.stream_start + (chunk_start - self.start_index))
self.stream.write(chunk_data)


class _SequentialBlobChunkDownloader(_BlobChunkDownloader):
def __init__(self, *args):
super(_SequentialBlobChunkDownloader, self).__init__(*args)

def _update_progress(self, length):
if self.progress_callback is not None:
self.progress_total += length
self.progress_callback(self.progress_total, self.download_size)

def _write_to_stream(self, chunk_data, chunk_start):
# chunk_start is ignored in the case of sequential download since we cannot seek the destination stream
self.stream.write(chunk_data)
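
Both the executor.map(...) call and the sequential loop above iterate over downloader.get_chunk_offsets(), which this diff leaves untouched. A minimal sketch of what that helper presumably does, assuming it simply walks the requested range in fixed-size steps:

```python
# Hypothetical reconstruction of the unchanged helper on _BlobChunkDownloader:
# yield the starting offset of each chunk within the requested range.
def get_chunk_offsets(self):
    index = self.start_index
    while index < self.blob_end:
        yield index
        index += self.chunk_size
```

Because offsets are produced in ascending order, the sequential downloader can write each chunk as it arrives without seeking, while the parallel downloader must seek under stream_lock since chunks may finish out of order.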
124 changes: 55 additions & 69 deletions azure-storage-blob/azure/storage/blob/baseblobservice.py
@@ -1972,50 +1972,63 @@ def get_blob_to_stream(
if end_range is not None:
_validate_not_none("start_range", start_range)

# If the user explicitly sets max_connections to 1, do a single shot download
if max_connections == 1:
# the stream must be seekable if parallel download is required
if max_connections > 1:
if sys.version_info >= (3,) and not stream.seekable():
raise ValueError(_ERROR_PARALLEL_NOT_SEEKABLE)
else:
try:
stream.seek(stream.tell())
except (NotImplementedError, AttributeError):
raise ValueError(_ERROR_PARALLEL_NOT_SEEKABLE)

# The service only provides transactional MD5s for chunks under 4MB.
# If validate_content is on, get only self.MAX_CHUNK_GET_SIZE for the first
# chunk so a transactional MD5 can be retrieved.
first_get_size = self.MAX_SINGLE_GET_SIZE if not validate_content else self.MAX_CHUNK_GET_SIZE

initial_request_start = start_range if start_range is not None else 0

if end_range is not None and end_range - start_range < first_get_size:
initial_request_end = end_range
else:
initial_request_end = initial_request_start + first_get_size - 1

# Send a context object to make sure we always retry to the initial location
operation_context = _OperationContext(location_lock=True)
try:
blob = self._get_blob(container_name,
blob_name,
snapshot,
start_range=start_range,
end_range=end_range,
start_range=initial_request_start,
end_range=initial_request_end,
validate_content=validate_content,
lease_id=lease_id,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since,
if_match=if_match,
if_none_match=if_none_match,
timeout=timeout)
timeout=timeout,
_context=operation_context)

# Set the download size
download_size = blob.properties.content_length

# If max_connections is greater than 1, do the first get to establish the
# size of the blob and get the first segment of data
else:
if sys.version_info >= (3,) and not stream.seekable():
raise ValueError(_ERROR_PARALLEL_NOT_SEEKABLE)

# The service only provides transactional MD5s for chunks under 4MB.
# If validate_content is on, get only self.MAX_CHUNK_GET_SIZE for the first
# chunk so a transactional MD5 can be retrieved.
first_get_size = self.MAX_SINGLE_GET_SIZE if not validate_content else self.MAX_CHUNK_GET_SIZE

initial_request_start = start_range if start_range is not None else 0

if end_range is not None and end_range - start_range < first_get_size:
initial_request_end = end_range
# Parse the total blob size and adjust the download size if ranges
# were specified
blob_size = _parse_length_from_content_range(blob.properties.content_range)
if end_range is not None:
# Use the end_range unless it is over the end of the blob
download_size = min(blob_size, end_range - start_range + 1)
elif start_range is not None:
download_size = blob_size - start_range
else:
initial_request_end = initial_request_start + first_get_size - 1

# Send a context object to make sure we always retry to the initial location
operation_context = _OperationContext(location_lock=True)
try:
download_size = blob_size
except AzureHttpError as ex:
if start_range is None and ex.status_code == 416:
# Get range will fail on an empty blob. If the user did not
# request a range, do a regular get request in order to get
# any properties.
blob = self._get_blob(container_name,
blob_name,
snapshot,
start_range=initial_request_start,
end_range=initial_request_end,
validate_content=validate_content,
lease_id=lease_id,
if_modified_since=if_modified_since,
@@ -2025,51 +2038,24 @@ def get_blob_to_stream(
timeout=timeout,
_context=operation_context)

# Parse the total blob size and adjust the download size if ranges
# were specified
blob_size = _parse_length_from_content_range(blob.properties.content_range)
if end_range is not None:
# Use the end_range unless it is over the end of the blob
download_size = min(blob_size, end_range - start_range + 1)
elif start_range is not None:
download_size = blob_size - start_range
else:
download_size = blob_size
except AzureHttpError as ex:
if start_range is None and ex.status_code == 416:
# Get range will fail on an empty blob. If the user did not
# request a range, do a regular get request in order to get
# any properties.
blob = self._get_blob(container_name,
blob_name,
snapshot,
validate_content=validate_content,
lease_id=lease_id,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since,
if_match=if_match,
if_none_match=if_none_match,
timeout=timeout,
_context=operation_context)

# Set the download size to empty
download_size = 0
else:
raise ex

# Mark the first progress chunk. If the blob is small or this is a single
# shot download, this is the only call
if progress_callback:
progress_callback(blob.properties.content_length, download_size)

# Write the content to the user stream
# Clear blob content since output has been written to user stream
if blob.content is not None:
stream.write(blob.content)
blob.content = None

# If the blob is small or single shot download was used, the download is
# complete at this point. If blob size is large, use parallel download.
# If the blob is small, the download is complete at this point.
# If blob size is large, download the rest of the blob in chunks.
if blob.properties.content_length != download_size:
# Lock on the etag. This can be overridden by the user by specifying '*'
if_match = if_match if if_match is not None else blob.properties.etag
@@ -2102,14 +2088,14 @@ def get_blob_to_stream(
operation_context
)

# Set the content length to the download size instead of the size of
# the last range
blob.properties.content_length = download_size

# Overwrite the content range to the user requested range
blob.properties.content_range = 'bytes {0}-{1}/{2}'.format(start_range, end_range, blob_size)

# Overwrite the content MD5 as it is the MD5 for the last range instead
# of the stored MD5
# TODO: Set to the stored MD5 when the service returns this
blob.properties.content_md5 = None
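The range arithmetic in the first-request path above derives the total blob size from the Content-Range header of the initial response. A worked sketch of that math; parse_total_length below is an illustration standing in for _parse_length_from_content_range, and the byte values are made up:

```python
def parse_total_length(content_range):
    # 'bytes 0-4194303/52428800' -> 52428800: the value after the final '/'
    # is the full size of the blob, not the size of the returned range.
    return int(content_range.split('/', 1)[1])

blob_size = parse_total_length('bytes 0-4194303/52428800')    # 52428800 (50 MiB)

start_range, end_range = 0, 104857600                          # requested range
# Use end_range unless it runs past the end of the blob.
download_size = min(blob_size, end_range - start_range + 1)    # 52428800
```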
4 changes: 0 additions & 4 deletions azure-storage-common/azure/storage/common/_error.py
@@ -47,10 +47,6 @@ def _to_str(value):
_ERROR_VALUE_NONE = '{0} should not be None.'
_ERROR_VALUE_NONE_OR_EMPTY = '{0} should not be None or empty.'
_ERROR_VALUE_NEGATIVE = '{0} should not be negative.'
_ERROR_NO_SINGLE_THREAD_CHUNKING = \
'To use {0} chunk downloader more than 1 thread must be ' + \
'used since get_{0}_to_bytes should be called for single threaded ' + \
'{0} downloads.'
_ERROR_START_END_NEEDED_FOR_MD5 = \
'Both end_range and start_range need to be specified ' + \
'for getting content MD5.'
3 changes: 3 additions & 0 deletions azure-storage-file/ChangeLog.md
@@ -2,6 +2,9 @@

> See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.

## Version XX.XX.XX:
- Fixed a design flaw where the get_file_to_* methods buffered the entire file when max_connections was set to 1.

## Version 1.3.0:

- Support for 2018-03-28 REST version. Please see our REST API documentation and blog for information about the related added features.
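The azure-storage-file entry mirrors the blob fix; a corresponding usage sketch, assuming the 1.x azure-storage-file SDK (share and file names are placeholders):

```python
from azure.storage.file import FileService

service = FileService(account_name='<account>', account_key='<key>')

# With max_connections=1 the file is streamed to the destination chunk by
# chunk instead of being buffered in memory first.
with open('large_file.bin', 'wb') as destination:
    service.get_file_to_stream('myshare', None, 'myfile', destination,
                               max_connections=1)
```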