3 changes: 3 additions & 0 deletions .github/ISSUE_TEMPLATE.md
@@ -1,6 +1,9 @@
### Which service(blob, file, queue) does this issue concern?


+### Which version of the SDK was used? Please provide the output of `pip freeze`.


### What problem was encountered?

4 changes: 4 additions & 0 deletions azure-storage-blob/ChangeLog.md
@@ -2,6 +2,10 @@

> See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.

+## Version 1.3.1:
+- Fixed a design flaw where the get_blob_to_* methods buffered the entire blob when max_connections was set to 1.
+- Added support for access conditions on the append_blob_from_* methods.
+
## Version 1.3.0:

- Support for 2018-03-28 REST version. Please see our REST API documentation and blog for information about the related added features.
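The first 1.3.1 entry is directly visible from the public client: `get_blob_to_*` with `max_connections=1` now streams chunk by chunk instead of buffering the whole blob. A minimal sketch (editorial illustration, not part of the PR; account, container, and blob names are placeholders):

```python
from azure.storage.blob import BlockBlobService

# Placeholder credentials and names; substitute your own.
service = BlockBlobService(account_name='myaccount', account_key='mykey')

# Before this fix, max_connections=1 read the entire blob into memory before
# writing it out; with 1.3.1 each chunk is written as soon as it arrives.
service.get_blob_to_path('mycontainer', 'large-blob.bin', 'large-blob.bin',
                         max_connections=1)
```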
2 changes: 1 addition & 1 deletion azure-storage-blob/azure/storage/blob/_constants.py
@@ -5,7 +5,7 @@
# --------------------------------------------------------------------------

__author__ = 'Microsoft Corp. <ptvshelp@microsoft.com>'
-__version__ = '1.3.0'
+__version__ = '1.3.1'

# x-ms-version for storage service.
X_MS_VERSION = '2018-03-28'
Expand Down
93 changes: 72 additions & 21 deletions azure-storage-blob/azure/storage/blob/_download_chunking.py
@@ -5,18 +5,16 @@
# --------------------------------------------------------------------------
import threading

-from azure.storage.common._error import _ERROR_NO_SINGLE_THREAD_CHUNKING


def _download_blob_chunks(blob_service, container_name, blob_name, snapshot,
                          download_size, block_size, progress, start_range, end_range,
                          stream, max_connections, progress_callback, validate_content,
                          lease_id, if_modified_since, if_unmodified_since, if_match,
                          if_none_match, timeout, operation_context):
-    if max_connections <= 1:
-        raise ValueError(_ERROR_NO_SINGLE_THREAD_CHUNKING.format('blob'))
-
-    downloader = _BlobChunkDownloader(
+    downloader_class = _ParallelBlobChunkDownloader if max_connections > 1 else _SequentialBlobChunkDownloader

+    downloader = downloader_class(
        blob_service,
        container_name,
        blob_name,
@@ -38,35 +36,42 @@ def _download_blob_chunks(blob_service, container_name, blob_name, snapshot,
        operation_context,
    )

-    import concurrent.futures
-    executor = concurrent.futures.ThreadPoolExecutor(max_connections)
-    result = list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
+    if max_connections > 1:
+        import concurrent.futures
+        executor = concurrent.futures.ThreadPoolExecutor(max_connections)
+        list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
+    else:
+        for chunk in downloader.get_chunk_offsets():
+            downloader.process_chunk(chunk)


class _BlobChunkDownloader(object):
    def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
                 chunk_size, progress, start_range, end_range, stream,
                 progress_callback, validate_content, lease_id, if_modified_since,
                 if_unmodified_since, if_match, if_none_match, timeout, operation_context):
+        # identifiers for the blob
        self.blob_service = blob_service
        self.container_name = container_name
        self.blob_name = blob_name
        self.snapshot = snapshot
-        self.chunk_size = chunk_size

+        # information on the download range/chunk size
+        self.chunk_size = chunk_size
        self.download_size = download_size
        self.start_index = start_range
        self.blob_end = end_range

+        # the destination that we will write to
        self.stream = stream
-        self.stream_start = stream.tell()
-        self.stream_lock = threading.Lock()

+        # progress related
        self.progress_callback = progress_callback
        self.progress_total = progress
-        self.progress_lock = threading.Lock()

+        # parameters for each get blob operation
        self.timeout = timeout
        self.operation_context = operation_context

        self.validate_content = validate_content
        self.lease_id = lease_id
        self.if_modified_since = if_modified_since
@@ -92,17 +97,13 @@ def process_chunk(self, chunk_start):
            self._write_to_stream(chunk_data, chunk_start)
            self._update_progress(length)

+    # should be provided by the subclass
    def _update_progress(self, length):
-        if self.progress_callback is not None:
-            with self.progress_lock:
-                self.progress_total += length
-                total = self.progress_total
-            self.progress_callback(total, self.download_size)
+        pass

+    # should be provided by the subclass
    def _write_to_stream(self, chunk_data, chunk_start):
-        with self.stream_lock:
-            self.stream.seek(self.stream_start + (chunk_start - self.start_index))
-            self.stream.write(chunk_data)
+        pass

    def _download_chunk(self, chunk_start, chunk_end):
        response = self.blob_service._get_blob(
@@ -125,3 +126,53 @@ def _download_chunk(self, chunk_start, chunk_end):
        # that subsequent downloads are to an unmodified blob
        self.if_match = response.properties.etag
        return response


+class _ParallelBlobChunkDownloader(_BlobChunkDownloader):
+    def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
+                 chunk_size, progress, start_range, end_range, stream,
+                 progress_callback, validate_content, lease_id, if_modified_since,
+                 if_unmodified_since, if_match, if_none_match, timeout, operation_context):

+        super(_ParallelBlobChunkDownloader, self).__init__(blob_service, container_name, blob_name, snapshot,
+                                                           download_size,
+                                                           chunk_size, progress, start_range, end_range, stream,
+                                                           progress_callback, validate_content, lease_id,
+                                                           if_modified_since,
+                                                           if_unmodified_since, if_match, if_none_match, timeout,
+                                                           operation_context)
+
+        # for a parallel download, the stream is always seekable, so we note down the current position
+        # in order to seek to the right place when out-of-order chunks come in
+        self.stream_start = stream.tell()
+
+        # since parallel operations are going on,
+        # it is essential to protect the writing and progress reporting operations
+        self.stream_lock = threading.Lock()
+        self.progress_lock = threading.Lock()
+
+    def _update_progress(self, length):
+        if self.progress_callback is not None:
+            with self.progress_lock:
+                self.progress_total += length
+                total_so_far = self.progress_total
+            self.progress_callback(total_so_far, self.download_size)
+
+    def _write_to_stream(self, chunk_data, chunk_start):
+        with self.stream_lock:
+            self.stream.seek(self.stream_start + (chunk_start - self.start_index))
+            self.stream.write(chunk_data)
+
+
+class _SequentialBlobChunkDownloader(_BlobChunkDownloader):
+    def __init__(self, *args):
+        super(_SequentialBlobChunkDownloader, self).__init__(*args)
+
+    def _update_progress(self, length):
+        if self.progress_callback is not None:
+            self.progress_total += length
+            self.progress_callback(self.progress_total, self.download_size)
+
+    def _write_to_stream(self, chunk_data, chunk_start):
+        # chunk_start is ignored in the case of sequential download, since we cannot seek the destination stream
+        self.stream.write(chunk_data)
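Because `_SequentialBlobChunkDownloader._write_to_stream` only ever calls `stream.write` and never seeks, single-threaded downloads should now also work against non-seekable destinations. A hedged sketch of that use (editorial illustration; names are placeholders, and `sys.stdout.buffer` stands in for any pipe-like stream):

```python
import sys

from azure.storage.blob import BlockBlobService

# Placeholder credentials and names; substitute your own.
service = BlockBlobService(account_name='myaccount', account_key='mykey')

# With max_connections=1 the sequential downloader writes chunks strictly in
# order, so the destination stream never needs to support seek().
service.get_blob_to_stream('mycontainer', 'large-blob.bin', sys.stdout.buffer,
                           max_connections=1)
```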
27 changes: 19 additions & 8 deletions azure-storage-blob/azure/storage/blob/_upload_chunking.py
@@ -4,29 +4,31 @@
# license information.
# --------------------------------------------------------------------------
from io import (BytesIO, IOBase, SEEK_CUR, SEEK_END, SEEK_SET, UnsupportedOperation)
-from math import ceil
from threading import Lock

+from math import ceil

from azure.storage.common._common_conversion import _encode_base64
from azure.storage.common._error import _ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM
from azure.storage.common._serialization import (
    url_quote,
    _get_data_bytes_only,
    _len_plus
)
+from ._constants import (
+    _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+)
from ._encryption import (
    _get_blob_encryptor_and_padder,
)
from .models import BlobBlock
-from ._constants import (
-    _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
-)


def _upload_blob_chunks(blob_service, container_name, blob_name,
                        blob_size, block_size, stream, max_connections,
                        progress_callback, validate_content, lease_id, uploader_class,
-                        maxsize_condition=None, if_match=None, timeout=None,
+                        maxsize_condition=None, if_modified_since=None, if_unmodified_since=None, if_match=None,
+                        if_none_match=None, timeout=None,
                        content_encryption_key=None, initialization_vector=None, resource_properties=None):
    encryptor, padder = _get_blob_encryptor_and_padder(content_encryption_key, initialization_vector,
                                                       uploader_class is not _PageBlobChunkUploader)
@@ -49,9 +51,14 @@ def _upload_blob_chunks(blob_service, container_name, blob_name,
    uploader.maxsize_condition = maxsize_condition

-    # ETag matching does not work with parallelism as a ranged upload may start
-    # before the previous finishes and provides an etag
-    uploader.if_match = if_match if not max_connections > 1 else None
+    # Access conditions do not work with parallelism
+    if max_connections > 1:
+        uploader.if_match = uploader.if_none_match = uploader.if_modified_since = uploader.if_unmodified_since = None
+    else:
+        uploader.if_match = if_match
+        uploader.if_none_match = if_none_match
+        uploader.if_modified_since = if_modified_since
+        uploader.if_unmodified_since = if_unmodified_since

    if progress_callback is not None:
        progress_callback(0, blob_size)
@@ -322,6 +329,10 @@ def _upload_chunk(self, chunk_offset, chunk_data):
            lease_id=self.lease_id,
            maxsize_condition=self.maxsize_condition,
            timeout=self.timeout,
+            if_modified_since=self.if_modified_since,
+            if_unmodified_since=self.if_unmodified_since,
+            if_match=self.if_match,
+            if_none_match=self.if_none_match
        )

        self.current_length = resp.append_offset
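With the `if_*` parameters now threaded through `_upload_blob_chunks` into `append_block`, the `append_blob_from_*` methods can enforce optimistic concurrency, per the second changelog entry. A minimal sketch (editorial illustration; credentials and names are placeholders):

```python
from azure.storage.blob import AppendBlobService

# Placeholder credentials and names; substitute your own.
service = AppendBlobService(account_name='myaccount', account_key='mykey')

# Create an append blob and capture its current ETag.
service.create_blob('mycontainer', 'events.log')
etag = service.get_blob_properties('mycontainer', 'events.log').properties.etag

# The append succeeds only while the blob still matches the captured ETag;
# a concurrent writer triggers a precondition-failed error instead.
service.append_blob_from_text('mycontainer', 'events.log', 'hello\n',
                              if_match=etag)
```

Note the guard in `_upload_blob_chunks` above: when `max_connections > 1`, all four conditions are reset to `None`, since ranged uploads can complete out of order and each one changes the ETag, so the conditions are only honored on the sequential path.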