s3put s3multiput: Merge multiput functionality into s3put

1 parent c06d56f commit 5c0c353001c66ecda4fb02bfb37a8058b9fe46c4 @glance- committed Oct 18, 2012
Showing with 138 additions and 391 deletions.
  1. +0 −378 bin/s3multiput
  2. +136 −11 bin/s3put
  3. +2 −2 setup.py
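
After this merge, multipart uploads are requested through the ordinary s3put interface via a new --multipart flag, which is only offered when the optional filechunkio dependency can be imported. An illustrative invocation (the bucket name and paths are placeholders, not taken from the commit):

    s3put -b example-bucket --multipart -p /home/user/data/ /home/user/data

Non-empty files are then split into parts and uploaded in parallel; zero-byte files, and any run without --multipart, fall back to the existing single-part code path.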
378 bin/s3multiput
@@ -1,378 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2006,2007,2008 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-#
-import getopt
-import sys
-import os
-import boto
-
-try:
- # multipart portions copyright Fabian Topfstedt
- # https://gist.github.com/924094
-
- import math
- import mimetypes
- from multiprocessing import Pool
- from boto.s3.connection import S3Connection
- from filechunkio import FileChunkIO
- multipart_capable = True
- usage_flag_multipart_capable = """ [--multipart]"""
- usage_string_multipart_capable = """
- multipart - Upload files as multiple parts. This needs filechunkio."""
-except ImportError:
- multipart_capable = False
- usage_flag_multipart_capable = ""
- usage_string_multipart_capable = ""
-
-
-usage_string = """
-SYNOPSIS
- s3put [-a/--access_key <access_key>] [-s/--secret_key <secret_key>]
- -b/--bucket <bucket_name> [-c/--callback <num_cb>]
- [-d/--debug <debug_level>] [-i/--ignore <ignore_dirs>]
- [-n/--no_op] [-p/--prefix <prefix>] [-k/--key_prefix <key_prefix>]
- [-q/--quiet] [-g/--grant grant] [-w/--no_overwrite] [-r/--reduced]
- [--header] """ + usage_string_multipart_capable + """ path
-
- Where
- access_key - Your AWS Access Key ID. If not supplied, boto will
- use the value of the environment variable
- AWS_ACCESS_KEY_ID
- secret_key - Your AWS Secret Access Key. If not supplied, boto
- will use the value of the environment variable
- AWS_SECRET_ACCESS_KEY
- bucket_name - The name of the S3 bucket the file(s) should be
- copied to.
- path - A path to a directory or file that represents the items
- to be uploaded. If the path points to an individual file,
- that file will be uploaded to the specified bucket. If the
- path points to a directory, s3_it will recursively traverse
- the directory and upload all files to the specified bucket.
- debug_level - 0 means no debug output (default), 1 means normal
- debug output from boto, and 2 means boto debug output
- plus request/response output from httplib
- ignore_dirs - a comma-separated list of directory names that will
- be ignored and not uploaded to S3.
- num_cb - The number of progress callbacks to display. The default
- is zero which means no callbacks. If you supplied a value
- of "-c 10" for example, the progress callback would be
- called 10 times for each file transferred.
- prefix - A file path prefix that will be stripped from the full
- path of the file when determining the key name in S3.
- For example, if the full path of a file is:
- /home/foo/bar/fie.baz
- and the prefix is specified as "-p /home/foo/" the
- resulting key name in S3 will be:
- /bar/fie.baz
- The prefix must end in a trailing separator and if it
- does not then one will be added.
- key_prefix - A prefix to be added to the S3 key name, after any
- stripping of the file path is done based on the
- "-p/--prefix" option.
- reduced - Use Reduced Redundancy storage
- grant - A canned ACL policy that will be granted on each file
- transferred to S3. The value of provided must be one
- of the "canned" ACL policies supported by S3:
- private|public-read|public-read-write|authenticated-read
- no_overwrite - No files will be overwritten on S3, if the file/key
- exists on s3 it will be kept. This is useful for
- resuming interrupted transfers. Note this is not a
- sync, even if the file has been updated locally if
- the key exists on s3 the file on s3 will not be
- updated.
- header - key=value paris of extra header(s) to pass along in the
- request""" + usage_string_multipart_capable + """
-
-
- If the -n option is provided, no files will be transferred to S3 but
- informational messages will be printed about what would happen.
-"""
-
-
-def usage():
- print usage_string
- sys.exit()
-
-
-def submit_cb(bytes_so_far, total_bytes):
- print '%d bytes transferred / %d bytes total' % (bytes_so_far, total_bytes)
-
-
-def get_key_name(fullpath, prefix, key_prefix):
- if fullpath.startswith(prefix):
- key_name = fullpath[len(prefix):]
- else:
- key_name = fullpath
- l = key_name.split(os.sep)
- return key_prefix + '/'.join(l)
-
-
-def _upload_part(bucketname, aws_key, aws_secret, multipart_id, part_num,
- source_path, offset, bytes, debug, cb, num_cb,
- amount_of_retries=10):
- """
- Uploads a part with retries.
- """
- if debug == 1:
- print "_upload_part(%s, %s, %s)" % (source_path, offset, bytes)
-
- def _upload(retries_left=amount_of_retries):
- try:
- if debug == 1:
- print 'Start uploading part #%d ...' % part_num
- conn = S3Connection(aws_key, aws_secret)
- conn.debug = debug
- bucket = conn.get_bucket(bucketname)
- for mp in bucket.get_all_multipart_uploads():
- if mp.id == multipart_id:
- with FileChunkIO(source_path, 'r', offset=offset,
- bytes=bytes) as fp:
- mp.upload_part_from_file(fp=fp, part_num=part_num,
- cb=cb, num_cb=num_cb)
- break
- except Exception, exc:
- if retries_left:
- _upload(retries_left=retries_left - 1)
- else:
- print 'Failed uploading part #%d' % part_num
- raise exc
- else:
- if debug == 1:
- print '... Uploaded part #%d' % part_num
-
- _upload()
-
-
-def multipart_upload(bucketname, aws_key, aws_secret, source_path, keyname,
- reduced, debug, cb, num_cb, acl='private', headers={},
- guess_mimetype=True, parallel_processes=4):
- """
- Parallel multipart upload.
- """
- conn = S3Connection(aws_key, aws_secret)
- conn.debug = debug
- bucket = conn.get_bucket(bucketname)
-
- if guess_mimetype:
- mtype = mimetypes.guess_type(keyname)[0] or 'application/octet-stream'
- headers.update({'Content-Type': mtype})
-
- mp = bucket.initiate_multipart_upload(keyname, headers=headers,
- reduced_redundancy=reduced)
-
- source_size = os.stat(source_path).st_size
- bytes_per_chunk = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
- 5242880)
- chunk_amount = int(math.ceil(source_size / float(bytes_per_chunk)))
-
- pool = Pool(processes=parallel_processes)
- for i in range(chunk_amount):
- offset = i * bytes_per_chunk
- remaining_bytes = source_size - offset
- bytes = min([bytes_per_chunk, remaining_bytes])
- part_num = i + 1
- pool.apply_async(_upload_part, [bucketname, aws_key, aws_secret, mp.id,
- part_num, source_path, offset, bytes,
- debug, cb, num_cb])
- pool.close()
- pool.join()
-
- if len(mp.get_all_parts()) == chunk_amount:
- mp.complete_upload()
- key = bucket.get_key(keyname)
- key.set_acl(acl)
- else:
- mp.cancel_upload()
-
-
-def singlepart_upload(bucket, key_name, fullpath, *kargs, **kwargs):
- """
- Single upload.
- """
- k = bucket.new_key(key_name)
- k.set_contents_from_filename(fullpath, *kargs, **kwargs)
-
-
-def expand_path(path):
- path = os.path.expanduser(path)
- path = os.path.expandvars(path)
- return os.path.abspath(path)
-
-
-def main():
-
- # default values
- aws_access_key_id = None
- aws_secret_access_key = None
- bucket_name = ''
- ignore_dirs = []
- debug = 0
- cb = None
- num_cb = 0
- quiet = False
- no_op = False
- prefix = '/'
- key_prefix = ''
- grant = None
- no_overwrite = False
- reduced = False
- headers = {}
- multipart_requested = False
-
- try:
- opts, args = getopt.getopt(
- sys.argv[1:], 'a:b:c::d:g:hi:k:np:qs:wr',
- ['access_key=', 'bucket=', 'callback=', 'debug=', 'help', 'grant=',
- 'ignore=', 'key_prefix=', 'no_op', 'prefix=', 'quiet',
- 'secret_key=', 'no_overwrite', 'reduced', 'header=', 'multipart'])
- except:
- usage()
-
- # parse opts
- for o, a in opts:
- if o in ('-h', '--help'):
- usage()
- if o in ('-a', '--access_key'):
- aws_access_key_id = a
- if o in ('-b', '--bucket'):
- bucket_name = a
- if o in ('-c', '--callback'):
- num_cb = int(a)
- cb = submit_cb
- if o in ('-d', '--debug'):
- debug = int(a)
- if o in ('-g', '--grant'):
- grant = a
- if o in ('-i', '--ignore'):
- ignore_dirs = a.split(',')
- if o in ('-n', '--no_op'):
- no_op = True
- if o in ('-w', '--no_overwrite'):
- no_overwrite = True
- if o in ('-p', '--prefix'):
- prefix = a
- if prefix[-1] != os.sep:
- prefix = prefix + os.sep
- prefix = expand_path(prefix)
- if o in ('-k', '--key_prefix'):
- key_prefix = a
- if o in ('-q', '--quiet'):
- quiet = True
- if o in ('-s', '--secret_key'):
- aws_secret_access_key = a
- if o in ('-r', '--reduced'):
- reduced = True
- if o in ('--header'):
- (k, v) = a.split("=")
- headers[k] = v
- if o in ('--multipart'):
- if multipart_capable:
- multipart_requested = True
- else:
- print "multipart upload requested but not capable"
- sys.exit()
-
- if len(args) != 1:
- usage()
-
- path = expand_path(args[0])
-
- if not bucket_name:
- print "bucket name is required!"
- usage()
-
- c = boto.connect_s3(aws_access_key_id=aws_access_key_id,
- aws_secret_access_key=aws_secret_access_key)
- c.debug = debug
- b = c.get_bucket(bucket_name)
-
- # upload a directory of files recursively
- if os.path.isdir(path):
- if no_overwrite:
- if not quiet:
- print 'Getting list of existing keys to check against'
- keys = []
- for key in b.list(get_key_name(path, prefix, key_prefix)):
- keys.append(key.name)
- for root, dirs, files in os.walk(path):
- for ignore in ignore_dirs:
- if ignore in dirs:
- dirs.remove(ignore)
- for path in files:
- if path.startswith("."):
- continue
- fullpath = os.path.join(root, path)
- key_name = get_key_name(fullpath, prefix, key_prefix)
- copy_file = True
- if no_overwrite:
- if key_name in keys:
- copy_file = False
- if not quiet:
- print 'Skipping %s as it exists in s3' % path
-
- if copy_file:
- if not quiet:
- print 'Copying %s to %s/%s' % (path, bucket_name, key_name)
-
- if not no_op:
- # 0-byte files don't work and also don't need multipart upload
- if os.stat(fullpath).st_size != 0 and \
- multipart_capable and multipart_requested:
- multipart_upload(bucket_name, aws_access_key_id,
- aws_secret_access_key, fullpath, key_name,
- reduced, debug, cb, num_cb, grant or 'private',
- headers)
- else:
- singlepart_upload(b, key_name, fullpath, cb=cb, num_cb=num_cb,
- policy=grant, reduced_redundancy=reduced,
- headers=headers)
-
- # upload a single file
- elif os.path.isfile(path):
- fullpath = os.path.abspath(path)
- key_name = get_key_name(fullpath, prefix, key_prefix)
- copy_file = True
- if no_overwrite:
- if b.get_key(key_name):
- copy_file = False
- if not quiet:
- print 'Skipping %s as it exists in s3' % path
-
- if copy_file:
- if not quiet:
- print 'Copying %s to %s/%s' % (path, bucket_name, key_name)
-
- if not no_op:
- # 0-byte files don't work and also don't need multipart upload
- if os.stat(fullpath).st_size != 0 and \
- multipart_capable and multipart_requested:
- multipart_upload(bucket_name, aws_access_key_id,
- aws_secret_access_key, fullpath, key_name,
- reduced, debug, cb, num_cb, grant or 'private',
- headers=headers)
- else:
- singlepart_upload(b, key_name, fullpath, cb=cb, num_cb=num_cb,
- policy=grant, reduced_redundancy=reduced,
- headers=headers)
-
-if __name__ == "__main__":
- main()
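
The multipart helpers removed here reappear essentially unchanged in bin/s3put below. Note that the part size is not fixed: multipart_upload scales it with the square root of the source file size, subject to a 5,242,880-byte (5 MiB) floor, which is S3's minimum part size. A minimal sketch of that calculation for a hypothetical 1 GiB file:

    import math

    MIN_CHUNK = 5242880                  # 5 MiB floor used by multipart_upload
    source_size = 1024 ** 3              # hypothetical 1 GiB source file

    # The part size grows with sqrt(file size), keeping the part count modest
    # even for large uploads.
    bytes_per_chunk = max(int(math.sqrt(MIN_CHUNK) * math.sqrt(source_size)),
                          MIN_CHUNK)
    chunk_amount = int(math.ceil(source_size / float(bytes_per_chunk)))

    print bytes_per_chunk, chunk_amount  # roughly 75 MB per part, 15 parts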
147 bin/s3put
@@ -25,6 +25,24 @@ import sys
import os
import boto
+try:
+ # multipart portions copyright Fabian Topfstedt
+ # https://gist.github.com/924094
+
+ import math
+ import mimetypes
+ from multiprocessing import Pool
+ from boto.s3.connection import S3Connection
+ from filechunkio import FileChunkIO
+ multipart_capable = True
+ usage_flag_multipart_capable = """ [--multipart]"""
+ usage_string_multipart_capable = """
+ multipart - Upload files as multiple parts. This needs filechunkio."""
+except ImportError:
+ multipart_capable = False
+ usage_flag_multipart_capable = ""
+ usage_string_multipart_capable = ""
+
usage_string = """
SYNOPSIS
@@ -33,7 +51,7 @@ SYNOPSIS
[-d/--debug <debug_level>] [-i/--ignore <ignore_dirs>]
[-n/--no_op] [-p/--prefix <prefix>] [-k/--key_prefix <key_prefix>]
[-q/--quiet] [-g/--grant grant] [-w/--no_overwrite] [-r/--reduced]
- [--header] path
+ [--header] """ + usage_string_multipart_capable + """ path
Where
access_key - Your AWS Access Key ID. If not supplied, boto will
@@ -82,7 +100,7 @@ SYNOPSIS
the key exists on s3 the file on s3 will not be
updated.
header - key=value paris of extra header(s) to pass along in the
- request
+ request""" + usage_string_multipart_capable + """
If the -n option is provided, no files will be transferred to S3 but
@@ -108,6 +126,92 @@ def get_key_name(fullpath, prefix, key_prefix):
return key_prefix + '/'.join(l)
+def _upload_part(bucketname, aws_key, aws_secret, multipart_id, part_num,
+ source_path, offset, bytes, debug, cb, num_cb,
+ amount_of_retries=10):
+ """
+ Uploads a part with retries.
+ """
+ if debug == 1:
+ print "_upload_part(%s, %s, %s)" % (source_path, offset, bytes)
+
+ def _upload(retries_left=amount_of_retries):
+ try:
+ if debug == 1:
+ print 'Start uploading part #%d ...' % part_num
+ conn = S3Connection(aws_key, aws_secret)
+ conn.debug = debug
+ bucket = conn.get_bucket(bucketname)
+ for mp in bucket.get_all_multipart_uploads():
+ if mp.id == multipart_id:
+ with FileChunkIO(source_path, 'r', offset=offset,
+ bytes=bytes) as fp:
+ mp.upload_part_from_file(fp=fp, part_num=part_num,
+ cb=cb, num_cb=num_cb)
+ break
+ except Exception, exc:
+ if retries_left:
+ _upload(retries_left=retries_left - 1)
+ else:
+ print 'Failed uploading part #%d' % part_num
+ raise exc
+ else:
+ if debug == 1:
+ print '... Uploaded part #%d' % part_num
+
+ _upload()
+
+
+def multipart_upload(bucketname, aws_key, aws_secret, source_path, keyname,
+ reduced, debug, cb, num_cb, acl='private', headers={},
+ guess_mimetype=True, parallel_processes=4):
+ """
+ Parallel multipart upload.
+ """
+ conn = S3Connection(aws_key, aws_secret)
+ conn.debug = debug
+ bucket = conn.get_bucket(bucketname)
+
+ if guess_mimetype:
+ mtype = mimetypes.guess_type(keyname)[0] or 'application/octet-stream'
+ headers.update({'Content-Type': mtype})
+
+ mp = bucket.initiate_multipart_upload(keyname, headers=headers,
+ reduced_redundancy=reduced)
+
+ source_size = os.stat(source_path).st_size
+ bytes_per_chunk = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
+ 5242880)
+ chunk_amount = int(math.ceil(source_size / float(bytes_per_chunk)))
+
+ pool = Pool(processes=parallel_processes)
+ for i in range(chunk_amount):
+ offset = i * bytes_per_chunk
+ remaining_bytes = source_size - offset
+ bytes = min([bytes_per_chunk, remaining_bytes])
+ part_num = i + 1
+ pool.apply_async(_upload_part, [bucketname, aws_key, aws_secret, mp.id,
+ part_num, source_path, offset, bytes,
+ debug, cb, num_cb])
+ pool.close()
+ pool.join()
+
+ if len(mp.get_all_parts()) == chunk_amount:
+ mp.complete_upload()
+ key = bucket.get_key(keyname)
+ key.set_acl(acl)
+ else:
+ mp.cancel_upload()
+
+
+def singlepart_upload(bucket, key_name, fullpath, *kargs, **kwargs):
+ """
+ Single upload.
+ """
+ k = bucket.new_key(key_name)
+ k.set_contents_from_filename(fullpath, *kargs, **kwargs)
+
+
def expand_path(path):
path = os.path.expanduser(path)
path = os.path.expandvars(path)
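
Each part is uploaded by a separate multiprocessing.Pool worker, so _upload_part opens its own S3Connection and reads its slice of the source file through FileChunkIO rather than sharing a file handle with the parent process. A minimal sketch of the windowed read FileChunkIO provides (the file name and offsets below are made up for illustration):

    from filechunkio import FileChunkIO

    source_path = 'backup.tar'   # hypothetical local file
    offset = 5242880             # start of part #2
    bytes = 5242880              # length of this part

    # Only the requested byte range is visible through fp, so several workers
    # can read different windows of the same file concurrently.
    with FileChunkIO(source_path, 'r', offset=offset, bytes=bytes) as fp:
        data = fp.read()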
@@ -132,13 +236,14 @@ def main():
no_overwrite = False
reduced = False
headers = {}
+ multipart_requested = False
try:
opts, args = getopt.getopt(
sys.argv[1:], 'a:b:c::d:g:hi:k:np:qs:wr',
['access_key=', 'bucket=', 'callback=', 'debug=', 'help', 'grant=',
'ignore=', 'key_prefix=', 'no_op', 'prefix=', 'quiet',
- 'secret_key=', 'no_overwrite', 'reduced', 'header='])
+ 'secret_key=', 'no_overwrite', 'reduced', 'header=', 'multipart'])
except:
usage()
@@ -179,6 +284,12 @@ def main():
if o in ('--header'):
(k, v) = a.split("=")
headers[k] = v
+ if o in ('--multipart'):
+ if multipart_capable:
+ multipart_requested = True
+ else:
+ print "multipart upload requested but not capable"
+ sys.exit()
if len(args) != 1:
usage()
@@ -223,11 +334,17 @@ def main():
print 'Copying %s to %s/%s' % (path, bucket_name, key_name)
if not no_op:
- k = b.new_key(key_name)
- k.set_contents_from_filename(fullpath, cb=cb, num_cb=num_cb,
- policy=grant, reduced_redundancy=reduced,
- headers=headers)
- total += 1
+ # 0-byte files don't work and also don't need multipart upload
+ if os.stat(fullpath).st_size != 0 and \
+ multipart_capable and multipart_requested:
+ multipart_upload(bucket_name, aws_access_key_id,
+ aws_secret_access_key, fullpath, key_name,
+ reduced, debug, cb, num_cb, grant or 'private',
+ headers)
+ else:
+ singlepart_upload(b, key_name, fullpath, cb=cb, num_cb=num_cb,
+ policy=grant, reduced_redundancy=reduced,
+ headers=headers)
# upload a single file
elif os.path.isfile(path):
@@ -245,9 +362,17 @@ def main():
print 'Copying %s to %s/%s' % (path, bucket_name, key_name)
if not no_op:
- k = b.new_key(key_name)
- k.set_contents_from_filename(fullpath, cb=cb, num_cb=num_cb, policy=grant,
- reduced_redundancy=reduced, headers=headers)
+ # 0-byte files don't work and also don't need multipart upload
+ if os.stat(fullpath).st_size != 0 and \
+ multipart_capable and multipart_requested:
+ multipart_upload(bucket_name, aws_access_key_id,
+ aws_secret_access_key, fullpath, key_name,
+ reduced, debug, cb, num_cb, grant or 'private',
+ headers=headers)
+ else:
+ singlepart_upload(b, key_name, fullpath, cb=cb, num_cb=num_cb,
+ policy=grant, reduced_redundancy=reduced,
+ headers=headers)
if __name__ == "__main__":
main()
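
The key-name derivation feeding both upload paths is untouched by this commit. For reference, a small sketch of how get_key_name (see the listing above) combines --prefix stripping with --key_prefix, using hypothetical paths on a POSIX system where os.sep is '/':

    import os

    def get_key_name(fullpath, prefix, key_prefix):
        # Same logic as bin/s3put: strip the local prefix, then rejoin with '/'.
        if fullpath.startswith(prefix):
            key_name = fullpath[len(prefix):]
        else:
            key_name = fullpath
        return key_prefix + '/'.join(key_name.split(os.sep))

    # Equivalent of "-p /home/foo/ -k backups/"; paths are illustrative only.
    print get_key_name('/home/foo/bar/fie.baz', '/home/foo/', 'backups/')
    # -> backups/bar/fie.baz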
4 setup.py
@@ -55,8 +55,8 @@ def readme():
"bin/s3put", "bin/fetch_file", "bin/launch_instance",
"bin/list_instances", "bin/taskadmin", "bin/kill_instance",
"bin/bundle_image", "bin/pyami_sendmail", "bin/lss3",
- "bin/cq", "bin/route53", "bin/s3multiput", "bin/cwutil",
- "bin/instance_events", "bin/asadmin", "bin/glacier"],
+ "bin/cq", "bin/route53", "bin/cwutil", "bin/instance_events",
+ "bin/asadmin", "bin/glacier"],
url = "https://github.com/boto/boto/",
packages = ["boto", "boto.sqs", "boto.s3", "boto.gs", "boto.file",
"boto.ec2", "boto.ec2.cloudwatch", "boto.ec2.autoscale",
