Skip to content
This repository has been archived by the owner on Jan 5, 2023. It is now read-only.

Commit

Permalink
upload throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Blomo committed Apr 10, 2012
1 parent 6589e03 commit c0aa93a
Showing 1 changed file with 31 additions and 4 deletions.
35 changes: 31 additions & 4 deletions s3mysqldump.py
Expand Up @@ -31,6 +31,7 @@
import sys
import tempfile
import time
import socket

import boto
import boto.pyami.config
Expand Down Expand Up @@ -107,8 +108,7 @@ def mysqldump_to_s3(s3_uri, databases=None, tables=None):
upload_multipart(s3_key, file.name)
else:
log.debug('Upload to %r' % s3_key)
s3_key.set_contents_from_file(file)

upload_singlepart(s3_key, file.name)
log.debug(' Done in %.1fs' % (time.time() - start))

# output to separate files, if specified by %T and %D
Expand Down Expand Up @@ -267,27 +267,54 @@ def make_s3_key(s3_conn, s3_uri):
else:
return bucket.new_key(key_name)

def sleeping_callback(t):
    """Build a boto progress callback that sleeps for *t* seconds.

    boto calls progress callbacks with (bytes_transmitted, total_bytes);
    both arguments are ignored here -- the callback exists purely to
    throttle an upload by pausing between chunks.
    """
    def _throttle(bytes_transmitted, total_bytes):
        time.sleep(t)
    return _throttle

S3_ATTEMPTS = 4 # max tries per upload (part or whole file) before giving up
S3_THROTTLE = 60 # num_cb for boto: progress callbacks (each sleeps) per upload

def upload_multipart(s3_key, large_file):
    """Split large_file into chunks no bigger than S3_MAX_PUT_SIZE, then
    upload each chunk as one part of an S3 multipart upload.

    Each part is tried up to S3_ATTEMPTS times; attempt t throttles the
    upload by sleeping t seconds at each of S3_THROTTLE progress callbacks,
    so retries get progressively slower/gentler.

    :param s3_key: boto S3 key to upload to
    :param large_file: path of the file to split and upload

    Raises socket.error if any part still fails after S3_ATTEMPTS tries.
    """
    split_dir = tempfile.mkdtemp(prefix='s3mysqldump-split-')
    split_prefix = "%s/part-" % split_dir

    # -d gives numeric suffixes so sorted() below orders parts correctly
    args = ['split', "--line-bytes=%u" % S3_MAX_PUT_SIZE, '--suffix-length=5', '-d', large_file, split_prefix]
    log.debug(' '.join(pipes.quote(arg) for arg in args))
    subprocess.check_call(args)

    mp = s3_key.bucket.initiate_multipart_upload(s3_key.name)
    log.debug('Multipart upload to %r' % s3_key)
    for part, filename in enumerate(sorted(glob.glob(split_prefix + '*'))):
        with open(filename, 'rb') as file:
            for t in xrange(S3_ATTEMPTS):
                try:
                    # rewind: a failed attempt may have consumed part of the file
                    file.seek(0)
                    # part numbering starts at 1
                    mp.upload_part_from_file(file, part + 1, cb=sleeping_callback(t), num_cb=S3_THROTTLE)
                    log.debug('Part %s uploaded to %r' % (part + 1, s3_key))
                    break
                except socket.error as e:
                    # t + 1 so the log reads as a 1-based attempt count
                    log.warn('Part %s, upload attempt %s/%s: upload_part_from_file raised %r' %
                             (part + 1, t + 1, S3_ATTEMPTS, e))
            else:
                raise socket.error("Upload failed")

    mp.complete_upload()

    shutil.rmtree(split_dir, True)

def upload_singlepart(s3_key, filename):
    """Upload a normal sized file to s3_key in a single PUT.

    Tries up to S3_ATTEMPTS times; attempt t throttles the upload by
    sleeping t seconds at each of S3_THROTTLE progress callbacks, so
    retries get progressively slower/gentler.

    :param s3_key: boto S3 key to upload to
    :param filename: path of the file to upload

    Raises socket.error if the upload still fails after S3_ATTEMPTS tries.
    """
    for t in xrange(S3_ATTEMPTS):
        try:
            s3_key.set_contents_from_filename(filename, cb=sleeping_callback(t), num_cb=S3_THROTTLE)
            break
        except socket.error as e:
            # t + 1 so the log reads as a 1-based attempt count; name the
            # method actually called (set_contents_from_filename)
            log.warn('Upload attempt %s/%s: set_contents_from_filename raised %r' %
                     (t + 1, S3_ATTEMPTS, e))
    else:
        raise socket.error("Upload failed")


def make_option_parser():
usage = '%prog [options] db_name [tbl_name ...] s3_uri_format'
Expand Down

0 comments on commit c0aa93a

Please sign in to comment.