Add upload to page blob support
alfpark committed Sep 1, 2017
1 parent 5c3c093 commit 8c68441
Showing 14 changed files with 200 additions and 21 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,23 @@

## [Unreleased]

### Added
- `upload` from `stdin` to page blob support. An optional
`--stdin-as-page-blob-size` parameter has been added; please see the current
limitations doc for more information.
- `upload` from `stdin` `--rename` support
- `synccopy` single object `--rename` support

### Changed
- AppVeyor integration
- PyPI releases automatically generated for tags
- PyInstaller-based releases uploaded to GitHub for Windows and Linux

### Fixed
- YAML config merge with CLI options when YAML options not present
- `synccopy` invocation without YAML config
- Test failures on Windows

## [1.0.0b1] - 2017-08-28
### Added
- Cross-mode synchronous copy support
2 changes: 1 addition & 1 deletion README.md
@@ -30,7 +30,7 @@ copies for Block blobs)
* Support all Azure Blob types and Azure Files for both upload and download
* Advanced skip options for rsync-like operations
* Store/restore POSIX filemode and uid/gid
* Support reading/pipe from `stdin`
* Support reading/piping from `stdin`, including page blob destinations
* Support reading from blob snapshots for downloading and synchronous copy
* Configurable one-shot block upload support
* Configurable chunk size for both upload and download
1 change: 1 addition & 0 deletions blobxfer/models/options.py
@@ -76,6 +76,7 @@
'recursive',
'rename',
'rsa_public_key',
'stdin_as_page_blob_size',
'store_file_properties',
'strip_components',
'vectored_io',
47 changes: 40 additions & 7 deletions blobxfer/models/upload.py
@@ -214,7 +214,11 @@ def can_rename(self):
:rtype: bool
:return: if rename possible
"""
return len(self._paths) == 1 and self._paths[0].is_file()
return len(self._paths) == 1 and (
self._paths[0].is_file() or
blobxfer.models.upload.LocalSourcePath.is_stdin(
str(self._paths[0]))
)

@staticmethod
def is_stdin(path):
@@ -349,15 +353,20 @@ def __init__(self, lpath, ase, uid, options, resume_mgr):
self._chunk_num = 0
self._next_integrity_chunk = 0
self._finalized = False
self._needs_resize = False
self._meta_lock = threading.Lock()
self._hasher_lock = threading.Lock()
self._resume_mgr = resume_mgr
if resume_mgr and self.local_path.use_stdin:
logger.warning('ignoring resume option for stdin source')
self._resume_mgr = None
else:
self._resume_mgr = resume_mgr
self._ase = ase
self._store_file_attr = options.store_file_properties.attributes
self.current_iv = None
self._initialize_encryption(options)
# calculate the total number of ops required for transfer
self._compute_remote_size()
self._compute_remote_size(options)
self._adjust_chunk_size(options)
self._total_chunks = self._compute_total_chunks(self._chunk_size)
self._outstanding_ops = self._total_chunks
@@ -501,6 +510,16 @@ def requires_set_file_properties_md5(self):
return (not self.entity.is_encrypted and self.must_compute_md5 and
self.remote_is_file)

def requires_resize(self):
# type: (Descriptor) -> tuple
"""Remote destination requires a resize operation
:param Descriptor self: this
:rtype: tuple
:return: blob requires a resize, length
"""
with self._meta_lock:
return (self._needs_resize, self._offset)

def complete_offset_upload(self, chunk_num):
# type: (Descriptor, int) -> None
"""Complete the upload for the offset
@@ -573,15 +592,23 @@ def _initialize_encryption(self, options):
self.current_iv = em.content_encryption_iv
self._ase.encryption_metadata = em

def _compute_remote_size(self):
# type: (Descriptor, int) -> None
def _compute_remote_size(self, options):
# type: (Descriptor, blobxfer.models.options.Upload) -> None
"""Compute total remote file size
:param Descriptor self: this
:param blobxfer.models.options.Upload options: upload options
"""
size = self.local_path.size
if size > 0:
if (self._ase.mode == blobxfer.models.azure.StorageModes.Page and
self.local_path.use_stdin):
if options.stdin_as_page_blob_size == 0:
allocatesize = _MAX_PAGE_BLOB_SIZE
self._needs_resize = True
else:
allocatesize = options.stdin_as_page_blob_size
elif size > 0:
if self._ase.is_encrypted:
# cipher_len_without_iv = (clear_len / aes_bs + 1) * aes_bs
allocatesize = (size // self._AES_BLOCKSIZE + 1) * \
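
The encrypted-size branch above always rounds up to the next full AES block,
since PKCS7 padding adds at least one byte. A quick standalone check of that
arithmetic, assuming the usual 16-byte AES block size:

```python
AES_BLOCKSIZE = 16  # AES block size in bytes

def padded_ciphertext_length(clear_len):
    # PKCS7 always adds 1..16 bytes of padding, so the ciphertext
    # (excluding the IV) is the clear length rounded up to the next
    # full AES block
    return (clear_len // AES_BLOCKSIZE + 1) * AES_BLOCKSIZE

assert padded_ciphertext_length(100) == 112
assert padded_ciphertext_length(112) == 128  # exact multiples still gain a block
```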
@@ -678,7 +705,9 @@ def _compute_total_chunks(self, chunk_size):
chunks = int(math.ceil(self._ase.size / chunk_size))
except ZeroDivisionError:
chunks = 1
if self.local_path.use_stdin and chunks == 0:
# for stdin, override and use 1 chunk to start; this will change
# dynamically as data is read
if self.local_path.use_stdin:
chunks = 1
if (self._ase.mode != blobxfer.models.azure.StorageModes.Page and
chunks > 50000):
@@ -877,12 +906,16 @@ def read_data(self, offsets):
data = blobxfer.STDIN.read(self._chunk_size)
if not data:
with self._meta_lock:
self._offset -= offsets.num_bytes
self._ase.size -= offsets.num_bytes
self._total_chunks -= 1
self._chunk_num -= 1
self._outstanding_ops -= 1
else:
num_bytes = len(data)
with self._meta_lock:
self._offset -= offsets.num_bytes
self._ase.size -= offsets.num_bytes
newoffset = Offsets(
chunk_num=self._chunk_num - 1,
num_bytes=num_bytes,
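
The `read_data` change above sizes each `stdin` chunk to the bytes actually
returned and rolls back the pre-scheduled accounting when the stream ends,
since the total length is unknown until EOF. A simplified, self-contained
sketch of that read pattern (hypothetical helper, outside blobxfer's worker
and offset bookkeeping):

```python
import sys

def stdin_chunks(chunk_size=4 * 1024 * 1024):
    # read stdin in fixed-size requests, yielding (offset, data) pairs
    # sized to whatever the stream actually returned; stop at EOF
    offset = 0
    while True:
        data = sys.stdin.buffer.read(chunk_size)
        if not data:
            break
        yield offset, data
        offset += len(data)
```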
14 changes: 14 additions & 0 deletions blobxfer/operations/azure/blob/page.py
@@ -102,3 +102,17 @@ def put_page(ase, page_start, page_end, data, timeout=None):
end_range=page_end,
validate_content=False, # integrity is enforced with HTTPS
timeout=timeout) # noqa


def resize_blob(ase, size, timeout=None):
# type: (blobxfer.models.azure.StorageEntity, int, int) -> None
"""Resizes a page blob
:param blobxfer.models.azure.StorageEntity ase: Azure StorageEntity
:param int size: content length
:param int timeout: timeout
"""
ase.client.resize_blob(
container_name=ase.container,
blob_name=ase.name,
content_length=blobxfer.util.page_align_content_length(size),
timeout=timeout) # noqa
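
`resize_blob` defers to `blobxfer.util.page_align_content_length` because page
blob lengths must be a multiple of the 512-byte page size. That helper is not
part of this diff; a minimal sketch of what it plausibly does:

```python
PAGE_SIZE = 512  # Azure page blobs are addressed in 512-byte pages

def page_align_content_length(length):
    # round the length up to the next 512-byte page boundary
    remainder = length % PAGE_SIZE
    if remainder != 0:
        return length + (PAGE_SIZE - remainder)
    return length
```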
24 changes: 20 additions & 4 deletions blobxfer/operations/upload.py
@@ -628,13 +628,29 @@ def _set_blob_metadata(self, ud, metadata):
for ase in ud.entity.replica_targets:
blobxfer.operations.azure.blob.set_blob_metadata(ase, metadata)

def _resize_blob(self, ud, size):
# type: (Uploader, blobxfer.models.upload.Descriptor, int) -> None
"""Resize page blob
:param Uploader self: this
:param blobxfer.models.upload.Descriptor ud: upload descriptor
:param int size: content length
"""
blobxfer.operations.azure.blob.page.resize_blob(ud.entity, size)
if blobxfer.util.is_not_empty(ud.entity.replica_targets):
for ase in ud.entity.replica_targets:
blobxfer.operations.azure.blob.page.resize_blob(ase, size)

def _finalize_nonblock_blob(self, ud, metadata):
# type: (Uploader, blobxfer.models.upload.Descriptor, dict) -> None
"""Finalize Non-Block blob
:param Uploader self: this
:param blobxfer.models.upload.Descriptor ud: upload descriptor
:param dict metadata: metadata dict
"""
# resize page blobs to final size if required
needs_resize, final_size = ud.requires_resize()
if needs_resize:
self._resize_blob(ud, final_size)
# set md5 page blob property if required
if ud.requires_non_encrypted_md5_put:
self._set_blob_md5(ud)
@@ -1030,7 +1046,7 @@ def _run(self):
skipped_size = 0
approx_total_bytes = 0
# iterate through source paths to upload
dupes = set()
seen = set()
for src in self._spec.sources.files():
# create a destination array for the source
dest = [
@@ -1040,11 +1056,11 @@
for action, lp, ase in self._vectorize_and_bind(src, dest):
dest_id = blobxfer.operations.upload.Uploader.\
create_destination_id(ase._client, ase.container, ase.name)
if dest_id in dupes:
if dest_id in seen:
raise RuntimeError(
'duplicate destination entity detected: {}/{}'.format(
ase._client.primary_endpoint, ase.path))
dupes.add(dest_id)
seen.add(dest_id)
if self._spec.options.delete_extraneous_destination:
self._delete_exclude.add(dest_id)
if action == UploadAction.Skip:
@@ -1064,7 +1080,7 @@
self._pre_md5_skip_on_check(lp, ase)
elif action == UploadAction.Upload:
self._add_to_upload_queue(lp, ase, uid)
del dupes
del seen
# set remote files processed
with self._md5_meta_lock:
self._all_files_processed = True
15 changes: 15 additions & 0 deletions cli/cli.py
@@ -609,6 +609,20 @@ def callback(ctx, param, value):
callback=callback)(f)


def _stdin_as_page_blob_size_option(f):
def callback(ctx, param, value):
clictx = ctx.ensure_object(CliContext)
clictx.cli_options['stdin_as_page_blob_size'] = value
return value
return click.option(
'--stdin-as-page-blob-size',
expose_value=False,
type=int,
default=None,
help='Size of page blob with input from stdin [0]',
callback=callback)(f)


def _strip_components_option(f):
def callback(ctx, param, value):
clictx = ctx.ensure_object(CliContext)
@@ -708,6 +722,7 @@ def callback(ctx, param, value):
def upload_options(f):
f = _stripe_chunk_size_bytes_option(f)
f = _strip_components_option(f)
f = _stdin_as_page_blob_size_option(f)
f = _skip_on_md5_match_option(f)
f = _skip_on_lmt_ge_option(f)
f = _skip_on_filesize_match_option(f)
5 changes: 5 additions & 0 deletions cli/settings.py
@@ -97,6 +97,8 @@ def add_cli_options(cli_options, action):
'lmt_ge': cli_options.get('skip_on_lmt_ge'),
'md5_match': cli_options.get('skip_on_md5_match'),
},
'stdin_as_page_blob_size': cli_options.get(
'stdin_as_page_blob_size'),
'store_file_properties': {
'attributes': cli_options.get('file_attributes'),
'md5': cli_options.get('file_md5'),
@@ -609,6 +611,9 @@ def create_upload_specifications(cli_options, config):
cli_options, sfp, 'md5', name_cli='file_md5',
default=False),
),
stdin_as_page_blob_size=_merge_setting(
cli_options, conf['options'], 'stdin_as_page_blob_size',
default=0),
strip_components=_merge_setting(
cli_options, conf['options'], 'strip_components',
default=0),
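
`_merge_setting` itself is unchanged by this commit: it resolves each option
by preferring the CLI value, then the YAML config value, then the supplied
default, which is how `stdin_as_page_blob_size` falls back to `0` when neither
source sets it. A simplified sketch of that precedence (the real helper
accepts additional keyword arguments):

```python
def _merge_setting(cli_options, conf, name, name_cli=None, default=None):
    # CLI-supplied value wins; otherwise fall back to the YAML config
    # value, and finally to the given default (simplified illustration)
    value = cli_options.get(name_cli or name)
    if value is None:
        value = conf.get(name)
    return value if value is not None else default
```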
2 changes: 2 additions & 0 deletions docs/10-cli-usage.md
@@ -193,6 +193,8 @@
* `--rename` renames a single file to the target destination or source path.
This can only be used when transferring a single source file to a destination
and can be used with any command.
* `--stdin-as-page-blob-size` allows a page blob size to be set if known
beforehand when using `stdin` as a source and the destination is a page blob.
Specifying the size up front avoids allocating a maximum-sized page blob that
must be resized once the upload completes.
* `--strip-components N` will strip the leading `N` components from the
local file path on upload. The default is `0`.

4 changes: 4 additions & 0 deletions docs/20-yaml-configuration.md
@@ -165,6 +165,7 @@ upload:
filesize_match: false
lmt_ge: false
md5_match: true
stdin_as_page_blob_size: 0
store_file_properties:
attributes: true
md5: true
@@ -201,6 +202,9 @@ upload:
* `lmt_ge` skip if remote file has a last modified time greater than or
equal to the local file
* `md5_match` skip if MD5 match
* `stdin_as_page_blob_size` is the page blob size to preallocate if the
amount of data to be streamed from `stdin` is known beforehand and the
`mode` is `page`
* `store_file_properties` stores the following file properties if enabled
* `attributes` will store POSIX file mode and ownership
* `md5` will store the MD5 of the file
14 changes: 13 additions & 1 deletion docs/99-current-limitations.md
@@ -5,7 +5,6 @@ Please read this section carefully for any current known limitations to
### Client-side Encryption
* Client-side encryption is currently only available for block blobs and
Azure Files.
* `stdin` sources cannot be encrypted.
* Azure KeyVault key references are currently not supported.

### Platform-specific
@@ -16,6 +15,19 @@ Azure Files.
SHA256 object cannot be pickled.
* Append blobs currently cannot be resumed for upload.

### `stdin` Limitations
* `stdin` uploads without the `--stdin-as-page-blob-size` parameter will
allocate a maximum-sized page blob, which is then resized once the `stdin`
source completes. If the upload fails, the blob will remain maximum-sized
and will be charged as such; no cleanup is performed on failure.
* `stdin` sources cannot be resumed.
* `stdin` sources cannot be encrypted.
* `stdin` sources cannot be stripe vectorized for upload.
* For optimal performance, `--chunk-size-bytes` should match the "chunk size"
that is being written to `stdin`. For example, if you are using `dd`, set its
block size (`bs`) parameter to the same value as the `--chunk-size-bytes`
parameter (e.g., `bs=4194304` with `--chunk-size-bytes 4194304`).

### General Azure File Limitations
* Please see [this article](https://msdn.microsoft.com/en-us/library/azure/dn744326.aspx)
for more information.