Commit

Add delete option for issue Azure-Samples#78
- Add delete option analogous to rsync's --delete-before
alfpark committed Nov 8, 2015
1 parent 2f1c7ab commit 461c677
Showing 3 changed files with 90 additions and 6 deletions.
3 changes: 3 additions & 0 deletions Python/Storage/README.rst
@@ -203,6 +203,9 @@ General Notes
data if the source is not page boundary byte-aligned. This enables these
page blobs or files to be skipped during subsequent download or upload,
if the skiponmatch parameter is enabled.
- if ``--delete`` is specified in directory upload mode, any remote blobs
  that have no corresponding local file will be deleted. Deletion occurs
  prior to any transfers, analogous to rsync's ``--delete-before`` option.
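
A minimal sketch of the delete-before behavior described above, using illustrative local and remote name sets in place of blobxfer's actual directory walk and container listing:

    # illustrative inputs; in blobxfer these come from the local directory
    # walk and the remote container listing
    local_files = {'a.txt', 'b.txt'}
    remote_blobs = {'a.txt', 'obsolete.txt'}

    # delete-before: extraneous remote blobs are removed prior to uploads
    for name in sorted(remote_blobs - local_files):
        print('delete', name)    # stand-in for the actual blob deletion
    for name in sorted(local_files):
        print('upload', name)    # stand-in for the actual transfers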

Performance Notes
-----------------
65 changes: 59 additions & 6 deletions Python/Storage/blobxfer.py
@@ -773,6 +773,28 @@ def set_blob_properties(
'set_blob_properties: {}'.format(
response.status_code))

def delete_blob(
self, container_name, blob_name):
"""Deletes a blob
Parameters:
container_name - container name
blob_name - name of blob
Returns:
Nothing
Raises:
IOError if unexpected status code
"""
url = '{blobep}{container_name}/{blob_name}{saskey}'.format(
blobep=self.blobep, container_name=container_name,
blob_name=blob_name, saskey=self.saskey)
response = azure_request(
requests.delete, url=url, timeout=self.timeout)
response.raise_for_status()
if response.status_code != 202:
raise IOError(
'incorrect status code returned for delete_blob: {}'.format(
response.status_code))
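
The method above issues a plain HTTP DELETE against the blob URL (with the SAS key appended) and treats anything other than 202 Accepted as an error. A standalone sketch of the equivalent request with the requests library, using placeholder endpoint, container, blob and SAS values:

    import requests

    # placeholder URL; a real one is built from the account's blob endpoint,
    # container name, blob name and SAS key
    url = 'https://myaccount.blob.core.windows.net/mycontainer/myblob?sv=demo'
    response = requests.delete(url, timeout=30)
    # Azure Blob Storage answers a successful Delete Blob with 202 Accepted
    if response.status_code != 202:
        raise IOError('unexpected status code: {}'.format(response.status_code))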


class BlobChunkWorker(threading.Thread):
"""Chunk worker for a Blob"""
@@ -1383,24 +1405,26 @@ def as_page_blob(pageblob, autovhd, name):
return False


def get_blob_listing(blob_service, args):
def get_blob_listing(blob_service, args, metadata=True):
"""Convenience method for generating a blob listing of a container
Parameters:
blob_service - blob service
args - program arguments
metadata - include metadata
Returns:
dictionary of blob -> list [content length, content md5, enc metadata]
Raises:
Nothing
"""
marker = None
blobdict = {}
incl = 'metadata' if metadata else None
while True:
try:
result = azure_request(
blob_service.list_blobs, timeout=args.timeout,
container_name=args.container, marker=marker,
maxresults=_MAX_LISTBLOBS_RESULTS, include='metadata')
maxresults=_MAX_LISTBLOBS_RESULTS, include=incl)
except azure.common.AzureMissingResourceHttpError:
break
for blob in result:
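
get_blob_listing pages through the container with a continuation marker until the listing is exhausted. An SDK-free sketch of that pagination pattern, assuming a hypothetical list_page callable that returns (items, next_marker):

    def list_all(list_page):
        # list_page is a hypothetical callable returning (items, next_marker);
        # next_marker is falsy once there are no further pages
        items, marker = [], None
        while True:
            page, marker = list_page(marker)
            items.extend(page)
            if not marker:
                return items

    # usage with a fake two-page listing
    pages = {None: (['a', 'b'], 'p2'), 'p2': (['c'], None)}
    print(list_all(lambda m: pages[m]))  # ['a', 'b', 'c']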
@@ -1854,6 +1878,7 @@ def main():
print(' keep mismatched MD5: {}'.format(args.keepmismatchedmd5files))
print(' recursive if dir: {}'.format(args.recursive))
print(' keep root dir on up: {}'.format(args.keeprootdir))
print(' remote delete: {}'.format(args.delete))
print(' collate to: {}'.format(args.collate or 'disabled'))
print(' local overwrite: {}'.format(args.overwrite))
print(' encryption mode: {}'.format(
@@ -1876,6 +1901,7 @@ def main():
completed_blockids = {}
filemap = {}
filesizes = {}
delblobs = None
md5map = {}
filedesc = None
if xfertoazure:
@@ -1885,13 +1911,15 @@ def main():
else:
blobskipdict = {}
if os.path.isdir(args.localresource):
_remotefiles = set()
# mirror directory
if args.recursive:
for root, _, files in os.walk(args.localresource):
for dirfile in files:
fname = os.path.join(root, dirfile)
remotefname = apply_file_collation(
args, fname, apply_keeproot=True)
_remotefiles.add(remotefname)
filesize, ops, md5digest, filedesc = \
generate_xferspec_upload(
args, storage_in_queue, blobskipdict,
@@ -1912,6 +1940,7 @@ def main():
remotefname = apply_file_collation(
args, lfile if not args.keeprootdir else fname,
apply_keeproot=False)
_remotefiles.add(remotefname)
filesize, ops, md5digest, filedesc = \
generate_xferspec_upload(
args, storage_in_queue, blobskipdict,
@@ -1923,6 +1952,14 @@ def main():
filesizes[fname] = filesize
allfilesize = allfilesize + filesize
nstorageops = nstorageops + ops
# fill deletion list
if args.delete:
# get blob skip dict if it hasn't been populated
if len(blobskipdict) == 0:
blobskipdict = get_blob_listing(
blob_service, args, metadata=False)
delblobs = [x for x in blobskipdict if x not in _remotefiles]
del _remotefiles
else:
# upload single file
if not args.remoteresource:
@@ -1939,6 +1976,7 @@ def main():
filemap[args.localresource] = args.remoteresource
filesizes[args.localresource] = filesize
allfilesize = allfilesize + filesize
del blobskipdict
# create container if needed
if args.createcontainer:
try:
@@ -1997,9 +2035,19 @@ def main():
nstorageops = nstorageops + ops
if len(blobdict) > 0:
del created_dirs
del blobdict

# delete any remote blobs if specified
if xfertoazure and delblobs is not None:
print('deleting {} remote blobs'.format(len(delblobs)))
for blob in delblobs:
azure_request(
blob_service.delete_blob, timeout=args.timeout,
container_name=args.container, blob_name=blob)
print('deletion complete.')
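
The deletions above are routed through blobxfer's azure_request helper rather than calling the SDK method directly. A simplified, illustrative retry wrapper in that spirit (not the actual azure_request implementation):

    import random
    import time

    def retry_request(req, max_attempts=5, **kwargs):
        # call req(**kwargs), retrying failures with growing, jittered backoff;
        # a simplified stand-in for blobxfer's azure_request helper
        for attempt in range(1, max_attempts + 1):
            try:
                return req(**kwargs)
            except Exception:
                if attempt == max_attempts:
                    raise
                time.sleep(random.uniform(0.5, 2.0) * attempt)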

if nstorageops == 0:
print('detected no actions needed to be taken, exiting...')
print('detected no transfer actions needed to be taken, exiting...')
sys.exit(0)

if xfertoazure:
@@ -2120,7 +2168,7 @@ def main():
args.progressbar, 'xfer', progress_text, nstorageops,
done_ops, storage_start)
print('\n\n{} MiB transfered, elapsed {} sec. '
'Throughput = {} Mbit/sec'.format(
'Throughput = {} Mbit/sec\n'.format(
allfilesize / 1048576.0, endtime - storage_start,
(8.0 * allfilesize / 1048576.0) / (endtime - storage_start)))

@@ -2130,7 +2178,7 @@ def main():

# finalize files/blobs
if not xfertoazure:
print('\nperforming finalization (if applicable): {}: {}, '
print('performing finalization (if applicable): {}: {}, '
'MD5: {}'.format(
_ENCRYPTION_INTEGRITY_AUTH_ALGORITHM,
args.rsakey is not None, args.computefilemd5))
@@ -2192,6 +2240,7 @@ def main():
os.remove(tmpfilename)
print('finalization complete.')

# output final log lines
print('\nscript elapsed time: {} sec'.format(time.time() - start))
print('script end time: {}'.format(time.strftime("%Y-%m-%d %H:%M:%S")))

@@ -2238,7 +2287,7 @@ def parseargs():  # pragma: no cover
parser.set_defaults(
autovhd=False, blobep=_DEFAULT_BLOB_ENDPOINT,
chunksizebytes=_MAX_BLOB_CHUNK_SIZE_BYTES, collate=None,
computefilemd5=True, createcontainer=True,
computefilemd5=True, createcontainer=True, delete=False,
encmode=_DEFAULT_ENCRYPTION_MODE, keeprootdir=False,
managementep=_DEFAULT_MANAGEMENT_ENDPOINT,
numworkers=_DEFAULT_MAX_STORAGEACCOUNT_WORKERS, overwrite=True,
@@ -2263,6 +2312,10 @@ def parseargs():  # pragma: no cover
'--chunksizebytes', type=int,
help='maximum chunk size to transfer in bytes [{}]'.format(
_MAX_BLOB_CHUNK_SIZE_BYTES))
parser.add_argument(
'--delete', action='store_true',
help='delete extraneous remote blobs that have no corresponding '
'local file when uploading directories')
parser.add_argument(
'--download', action='store_true',
help='force transfer direction to download from Azure')
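
A self-contained sketch of how the new flag parses, using a toy parser rather than the full blobxfer argument set:

    import argparse

    parser = argparse.ArgumentParser(description='toy parser for --delete')
    parser.set_defaults(delete=False)
    parser.add_argument(
        '--delete', action='store_true',
        help='delete extraneous remote blobs with no corresponding local file')

    print(parser.parse_args([]).delete)            # False
    print(parser.parse_args(['--delete']).delete)  # True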
28 changes: 28 additions & 0 deletions Python/Storage/test/test_blobxfer.py
@@ -722,7 +722,9 @@ def test_main1(
patched_sms_saprops, patched_sms_sakeys, patched_parseargs, tmpdir):
lpath = str(tmpdir.join('test.tmp'))
args = MagicMock()
args.delete = False
args.rsakey = None
args.rsakeypassphrase = None
args.numworkers = 0
args.localresource = ''
args.storageaccount = 'blobep'
@@ -867,6 +869,8 @@ def test_main1(
'?saskey&comp=blocklist', status_code=201)
m.put('https://blobep.blobep/container' + lpath +
'?saskey&comp=block&blockid=00000000', status_code=201)
m.put('https://blobep.blobep/container' + lpath +
'?saskey&comp=metadata', status_code=200)
m.get('https://blobep.blobep/container?saskey&comp=list'
'&restype=container&maxresults=1000',
text='<?xml version="1.0" encoding="utf-8"?>'
@@ -894,6 +898,18 @@ def test_main1(
m.get('https://blobep.blobep/container/blob?saskey', content=b'012345')
blobxfer.main()

args.pageblob = False
args.autovhd = False
args.skiponmatch = False
pempath = str(tmpdir.join('rsa.pem'))
with open(pempath, 'wb') as f:
f.write(_RSAKEY.exportKey())
args.rsakey = pempath
blobxfer.main()
os.remove(pempath)

args.rsakey = None
args.skiponmatch = True
args.remoteresource = '.'
args.keepmismatchedmd5files = False
m.get('https://blobep.blobep/container?saskey&comp=list'
@@ -927,6 +943,7 @@ def test_main1(
notmp_lpath = '/'.join(lpath.strip('/').split('/')[1:])

with requests_mock.mock() as m:
args.delete = True
args.download = False
args.upload = True
args.remoteresource = None
@@ -943,6 +960,16 @@ def test_main1(
'?saskey&comp=block&blockid=00000000', status_code=200)
m.put('https://blobep.blobep/container/' + notmp_lpath +
'?saskey&comp=blocklist', status_code=201)
m.get('https://blobep.blobep/container?saskey&comp=list'
'&restype=container&maxresults=1000',
text='<?xml version="1.0" encoding="utf-8"?>'
'<EnumerationResults ContainerName="https://blobep.blobep/'
'container"><Blobs><Blob><Name>blob</Name><Properties>'
'<Content-Length>6</Content-Length><Content-MD5>md5'
'</Content-MD5><BlobType>BlockBlob</BlobType></Properties>'
'<Metadata/></Blob></Blobs></EnumerationResults>')
m.delete('https://blobep.blobep/container/blob?saskey',
status_code=202)
with pytest.raises(SystemExit):
blobxfer.main()
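
A self-contained sketch of the requests_mock pattern the test uses to answer the delete request with 202 Accepted, with an illustrative URL:

    import requests
    import requests_mock

    with requests_mock.mock() as m:
        m.delete('https://example.blob.core.windows.net/container/blob?sas',
                 status_code=202)
        r = requests.delete(
            'https://example.blob.core.windows.net/container/blob?sas')
        assert r.status_code == 202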

@@ -1013,6 +1040,7 @@ def test_main2(patched_parseargs, tmpdir):
lpath = str(tmpdir.join('test.tmp'))
args = MagicMock()
patched_parseargs.return_value = args
args.delete = False
args.rsakey = None
args.numworkers = 64
args.storageaccount = 'blobep'
