Merge pull request #1 from yudongtan/master

s3mysqldump change
commit 6192b6e348e0fded19317d62d8c9f474b3b7dfdb (2 parents: c0aa93a + 102ccbb)
authored by @jblomo
README.rst (21 lines changed)
@@ -0,0 +1,21 @@
+s3mysqldump
+===========
+
+**s3mysqldump** is a tool to dump MySQL tables to S3, so they can be consumed by Elastic MapReduce, etc.
+
+Installation
+============
+
+From source::
+
+    python setup.py install
+
+A Simple Example
+================
+
+The following command dumps the 'user' table in the 'db' database to the S3 bucket s3://emr-storage/. 'my.cnf' specifies MySQL parameters. 'boto.cfg' is the configuration file for the S3 connection, which specifies things like AWS credentials.
+
+``s3mysqldump -v --force -m my.cnf -s -b boto.cfg db user s3://emr-storage/user.sql``
+
+
+
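s3_uri_format may also be a strftime() format string, with %D expanding to the database name and %T to the table name (see the option parser description in s3mysqldump.py below). A usage sketch along those lines, assuming the same my.cnf and bucket as above:

``s3mysqldump -v -m my.cnf db user 's3://emr-storage/%Y/%m/%d/%T.sql'``

Using %T creates one key per table, so daily runs land each table's dump under its own dated path.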
s3mysqldump.py (106 lines changed)
@@ -43,6 +43,8 @@
DEFAULT_MYSQLDUMP_BIN = 'mysqldump'
SINGLE_ROW_FORMAT_OPTS = [
+    # --skip-opt causes an out-of-memory error on 5.0.91, so we list the
+    # options explicitly instead
+    # '--skip-opt',
'--compact',
'--complete-insert',
'--default_character_set=utf8',
@@ -52,13 +54,12 @@
'--quick',
'--skip-lock-tables',
'--skip-extended-insert',
- # '--skip-opt', removing all opt options causes out of memory error on 5.0.91
]
S3_URI_RE = re.compile(r'^s3n?://(.*?)/(.*)$')
-S3_MAX_PUT_SIZE = 4*1024*1024*1024 # actually 5G, but 4 to be safe
+S3_MAX_PUT_SIZE = 4 * 1024 * 1024 * 1024 # actually 5G, but 4 to be safe
# match directives in a strftime format string (e.g. '%Y-%m-%d')
# for fully correct handling of percent literals (e.g. don't match %T in %%T)
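For reference, a minimal sketch (not project code) of what the S3_URI_RE pattern above accepts; parse_s3_uri, further down in this file, raises ValueError('Invalid S3 URI: ...') when the match fails:

    import re

    S3_URI_RE = re.compile(r'^s3n?://(.*?)/(.*)$')

    match = S3_URI_RE.match('s3://emr-storage/2011/06/01/user.sql')
    bucket_name, key_name = match.groups()
    # bucket_name == 'emr-storage'
    # key_name == '2011/06/01/user.sql'
    # the non-greedy (.*?) stops at the first '/', so path components stay
    # in the key; Hadoop-style s3n:// URIs match as well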
@@ -68,7 +69,8 @@
def main(args):
"""Run the mysqldump utility.
- :param list args: alternate command line arguments (normally we read from ``sys.argv[:1]``)
+ :param list args: alternate command line arguments (normally we read from
+        ``sys.argv[1:]``)
"""
databases, tables, s3_uri_format, options = parse_args(args)
@@ -190,7 +192,8 @@ def replacer(match):
def parse_args(args):
"""Parse command-line arguments
- :param list args: alternate command line arguments (normally we read from ``sys.argv[:1]``)
+ :param list args: alternate command line arguments (normally we read from
+ ``sys.argv[1:]``)
:return: *database*, *tables*, *s3_uri*, *options*
"""
@@ -229,7 +232,8 @@ def parse_args(args):
parser.error('If you use %T, you must specify one or more tables')
if has_database_field(s3_uri_format) and not databases:
- parser.error('If you use %D, you must specify one or more databases (use %d for day of month)')
+ parser.error('If you use %D, you must specify one or more databases'
+ ' (use %d for day of month)')
return databases, tables, s3_uri_format, options
@@ -237,8 +241,11 @@ def parse_args(args):
def connect_s3(boto_cfg=None, **kwargs):
"""Make a connection to S3 using :py:mod:`boto` and return it.
- :param string boto_cfg: Optional path to boto.cfg file to read credentials from
- :param kwargs: Optional additional keyword args to pass to :py:func:`boto.connect_s3`. Keyword args set to ``None`` will be filtered out (so we can use boto's defaults).
+ :param string boto_cfg: Optional path to boto.cfg file to read credentials
+ from
+ :param kwargs: Optional additional keyword args to pass to
+ :py:func:`boto.connect_s3`. Keyword args set to ``None``
+ will be filtered out (so we can use boto's defaults).
"""
if boto_cfg:
configs = boto.pyami.config.Config(path=boto_cfg)
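The None-filtering the docstring describes can be sketched as follows (an assumption about the implementation, which the hunk truncates, written in the project's Python 2 style); only keyword arguments that were explicitly set get forwarded, so boto can fall back to its own defaults for the rest:

    # hypothetical sketch of the tail of connect_s3()
    kwargs = dict((k, v) for k, v in kwargs.iteritems() if v is not None)
    return boto.connect_s3(**kwargs)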
@@ -267,12 +274,14 @@ def make_s3_key(s3_conn, s3_uri):
else:
return bucket.new_key(key_name)
+
def sleeping_callback(t):
"""Return a callback function that sleeps for t seconds"""
- return lambda _,__: time.sleep(t)
+ return lambda _, __: time.sleep(t)
+
+S3_ATTEMPTS = 4 # number of times to retry failed uploads
+S3_THROTTLE = 60 # number of times to throttle during upload
-S3_ATTEMPTS = 4 # number of times to retry failed uploads
-S3_THROTTLE = 60 # number of times to throttle during upload
def upload_multipart(s3_key, large_file):
"""Split up a large_file into chunks suitable for multipart upload, then
@@ -280,7 +289,8 @@ def upload_multipart(s3_key, large_file):
split_dir = tempfile.mkdtemp(prefix='s3mysqldump-split-')
split_prefix = "%s/part-" % split_dir
- args = ['split', "--line-bytes=%u" % S3_MAX_PUT_SIZE, '--suffix-length=5', '-d', large_file, split_prefix]
+ args = ['split', "--line-bytes=%u" % S3_MAX_PUT_SIZE, '--suffix-length=5',
+ '-d', large_file, split_prefix]
log.debug(' '.join(pipes.quote(arg) for arg in args))
subprocess.check_call(args)
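With S3_MAX_PUT_SIZE substituted in, the generated command looks roughly like this (hypothetical paths):

    split --line-bytes=4294967296 --suffix-length=5 -d \
        /tmp/dump.sql /tmp/s3mysqldump-split-XXXXXX/part-

--line-bytes keeps each chunk at or below the 4 GiB cap without ever breaking a line (and therefore an INSERT statement) across chunks, and -d with --suffix-length=5 yields numerically ordered part files: part-00000, part-00001, and so on.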
@@ -290,12 +300,18 @@ def upload_multipart(s3_key, large_file):
with open(filename, 'rb') as file:
for t in xrange(S3_ATTEMPTS):
try:
- mp.upload_part_from_file(file, part+1, cb=sleeping_callback(t), num_cb=S3_THROTTLE) # counting starts at 1
- log.debug('Part %s uploaded to %r' % (part+1, s3_key))
+ mp.upload_part_from_file(
+ file,
+ part + 1,
+ cb=sleeping_callback(t),
+ num_cb=S3_THROTTLE
+ ) # counting starts at 1
+ log.debug('Part %s uploaded to %r' % (part + 1, s3_key))
break
except socket.error as e:
- log.warn('Part %s, upload attempt %s/%s: upload_part_from_file raised %r' %
- (part+1, t, S3_ATTEMPTS, e))
+                    log.warn('Part %s, upload attempt %s/%s: '
+                             'upload_part_from_file raised %r' %
+                             (part + 1, t + 1, S3_ATTEMPTS, e))
else:
raise socket.error("Upload failed")
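The for/else wrapping these attempts is Python's retry idiom: the else clause runs only when the loop finishes without hitting break, i.e. when every one of the S3_ATTEMPTS tries raised. In isolation (hypothetical do_upload function, Python 2 like the project):

    import socket

    def with_retries(do_upload, attempts):
        for t in xrange(attempts):
            try:
                do_upload()
                break  # success; the else clause below is skipped
            except socket.error:
                continue  # the real code logs here, then retries
        else:
            raise socket.error("Upload failed")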
@@ -303,11 +319,18 @@ def upload_multipart(s3_key, large_file):
shutil.rmtree(split_dir, True)
+
def upload_singlepart(s3_key, filename):
- """Upload a normal sized file. Retry with sleeping callbacks when throttled by S3."""
+ """Upload a normal sized file. Retry with sleeping callbacks when
+ throttled by S3.
+ """
for t in xrange(S3_ATTEMPTS):
try:
- s3_key.set_contents_from_filename(filename, cb=sleeping_callback(t), num_cb=S3_THROTTLE)
+ s3_key.set_contents_from_filename(
+ filename,
+ cb=sleeping_callback(t),
+ num_cb=S3_THROTTLE
+ )
break
except socket.error as e:
log.warn('Upload attempt %s/%s: set_contents_from_file raised %r' %
@@ -318,9 +341,9 @@ def upload_singlepart(s3_key, filename):
def make_option_parser():
usage = '%prog [options] db_name [tbl_name ...] s3_uri_format'
- description = ('Dump one or more MySQL tables to S3.' +
- ' s3_uri_format may be a strftime() format string, e.g.' +
- ' s3://foo/%Y/%m/%d/, for daily (or hourly) dumps. You can '
+ description = ('Dump one or more MySQL tables to S3.'
+ ' s3_uri_format may be a strftime() format string, e.g.'
+ ' s3://foo/%Y/%m/%d/, for daily (or hourly) dumps. You can'
' also use %D for database name and %T for table name. '
' Using %T will create one key per table.')
option_parser = optparse.OptionParser(usage=usage, description=description)
@@ -369,11 +392,14 @@ def make_option_parser():
help='Dump tables from one database (the default).')
option_parser.add_option(
'--s3-endpoint', dest='s3_endpoint', default=None,
- help='alternate S3 endpoint to connect to (e.g. us-west-1.elasticmapreduce.amazonaws.com).')
+ help=('alternate S3 endpoint to connect to (e.g.'
+ ' us-west-1.elasticmapreduce.amazonaws.com).'))
option_parser.add_option(
'-s', '--single-row-format', dest='single_row_format', default=False,
action='store_true',
- help='Output single-row INSERT statements, and turn off locking, for easy data processing. Equivalent to -M "%s"' % ' '.join(SINGLE_ROW_FORMAT_OPTS))
+ help=('Output single-row INSERT statements, and turn off locking, for'
+ ' easy data processing. Equivalent to -M "%s"'
+ % ' '.join(SINGLE_ROW_FORMAT_OPTS)))
option_parser.add_option(
'--utc', dest='utc', default=False, action='store_true',
help='Use UTC rather than local time to process s3_uri_format')
@@ -400,7 +426,8 @@ def parse_s3_uri(uri):
raise ValueError('Invalid S3 URI: %s' % uri)
-def log_to_stream(name=None, stream=None, format=None, level=None, debug=False):
+def log_to_stream(name=None, stream=None, format=None, level=None,
+ debug=False):
"""Set up logging.
:type name: str
@@ -411,7 +438,8 @@ def log_to_stream(name=None, stream=None, format=None, level=None, debug=False):
:param format: log message format (default is '%(message)s')
:param level: log level to use
:type debug: bool
- :param debug: quick way of setting the log level; if true, use ``logging.DEBUG``; otherwise use ``logging.INFO``
+ :param debug: quick way of setting the log level; if true, use
+ ``logging.DEBUG``; otherwise use ``logging.INFO``
"""
if level is None:
level = logging.DEBUG if debug else logging.INFO
@@ -441,23 +469,39 @@ def parse_opts(list_of_opts):
return results
-def mysqldump_to_file(file, databases=None, tables=None, mysqldump_bin=None, my_cnf=None, extra_opts=None, single_row_format=False):
+def mysqldump_to_file(file, databases=None, tables=None, mysqldump_bin=None,
+ my_cnf=None, extra_opts=None, single_row_format=False):
"""Run mysqldump on a single table and dump it to a file
:param string file: file object to dump to
- :param databases: sequence of MySQL database names, or ``None`` for all databases
- :param tables: sequences of MySQL table names, or ``None`` for all tables. If you specify tables, there must be exactly one database name, due to limitations of :command:`mysqldump`
+ :param databases: sequence of MySQL database names, or ``None`` for all
+ databases
+    :param tables: sequence of MySQL table names, or ``None`` for all tables.
+ If you specify tables, there must be exactly one database
+ name, due to limitations of :command:`mysqldump`
:param string mysqldump_bin: alternate path to mysqldump binary
- :param string my_cnf: alternate path to my.cnf file containing MySQL credentials. If not set, this function will also try to read the environment variable :envvar:`MY_CNF`.
- :param extra_opts: a list of additional arguments to pass to mysqldump (e.g. hostname, port, and credentials).
- :param single_row_format: Output single-row INSERT statements, and turn off locking, for easy data processing.. Passes ``--compact --complete-insert --default_character_set=utf8 --hex-blob --no-create-db --no-create-info --quick --skip-lock-tables --skip-extended-insert`` to :command:`mysqldump`. Note this also turns off table locking. You can override any of this with *extra_opts*.
+ :param string my_cnf: alternate path to my.cnf file containing MySQL
+ credentials. If not set, this function will also try
+ to read the environment variable :envvar:`MY_CNF`.
+ :param extra_opts: a list of additional arguments to pass to mysqldump
+ (e.g. hostname, port, and credentials).
+ :param single_row_format: Output single-row INSERT statements, and turn off
+                              locking, for easy data processing. Passes
+ ``--compact --complete-insert
+ --default_character_set=utf8 --hex-blob
+ --no-create-db --no-create-info --quick
+ --skip-lock-tables --skip-extended-insert`` to
+ :command:`mysqldump`. Note this also turns off
+ table locking. You can override any of this with
+ *extra_opts*.
If you dump multiple databases in single-row format, you will still get one
``USE`` statement per database; :command:`mysqldump` doesn't have a way to
turn this off.
"""
if tables and (not databases or len(databases) != 1):
- raise ValueError('If you specify tables you must specify exactly one database')
+ raise ValueError(
+ 'If you specify tables you must specify exactly one database')
args = []
args.append(mysqldump_bin or DEFAULT_MYSQLDUMP_BIN)
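Assembled for a single-row-format dump of one table, the command line comes out roughly as follows (a sketch assuming the default mysqldump binary and no my.cnf; compare the '--tables -- foo bar' argument strings asserted in the tests below):

    mysqldump --compact --complete-insert --default_character_set=utf8 \
        --hex-blob --no-create-db --no-create-info --quick \
        --skip-lock-tables --skip-extended-insert --tables -- db user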
tests/mockboto.py (10 lines changed)
@@ -24,6 +24,7 @@
import boto.exception
+
### S3 ###
def add_mock_s3_data(mock_s3_fs, data):
@@ -108,8 +109,9 @@ def __init__(self, bucket=None, name=None):
self.bucket = bucket
self.name = name
- def set_contents_from_file(self, f):
+ def set_contents_from_filename(self, filename, cb=None, num_cb=0):
mock_s3_fs = self.bucket.connection.mock_s3_fs
- f.seek(0)
- contents = f.read()
- mock_s3_fs[self.bucket.name][self.name] = contents
+        with open(filename) as f:
+            contents = f.read()
+        mock_s3_fs[self.bucket.name][self.name] = contents
tests/s3mysqldump_tests.py (21 lines changed)
@@ -24,6 +24,7 @@
from tests.mockboto import MockS3Connection
from tests.mockboto import add_mock_s3_data
+
class MockS3AndMysqldumpTestCase(TestCase):
@setup
@@ -42,7 +43,7 @@ def wrap_make_option_parser(self):
"""
self.times_help_printed = 0
self.parser_errors = []
-
+
real_make_option_parser = s3mysqldump.make_option_parser
def fake_print_help():
@@ -51,7 +52,7 @@ def fake_print_help():
def fake_error(msg):
self.parser_errors.append(msg)
sys.exit(1)
-
+
def wrapper():
parser = real_make_option_parser()
parser.print_help = fake_print_help
@@ -78,7 +79,7 @@ def mock_boto_connect_s3(*args, **kwargs):
if 'aws_access_key_id' in kwargs:
self.aws_access_key_id = kwargs['aws_access_key_id']
if 'aws_secret_access_key' in kwargs:
- self.aws_access_key_id = kwargs['aws_access_key_id']
+ self.aws_secret_access_key = kwargs['aws_secret_access_key']
return MockS3Connection(*args, **kwargs)
@@ -170,7 +171,7 @@ def test_one_table(self):
def test_percent_T_on_one_table(self):
s3mysqldump.main(['foo', 'bar', 's3://walrus/%T.sql'])
self.check_s3('walrus', 'bar.sql', '--tables -- foo bar')
-
+
def test_percent_D_and_T_on_one_table(self):
s3mysqldump.main(['foo', 'bar', 's3://walrus/%D/%T.sql'])
self.check_s3('walrus', 'foo/bar.sql', '--tables -- foo bar')
@@ -186,18 +187,18 @@ def test_percent_T_on_many_tables(self):
self.check_s3('walrus', 'bar.sql', '--tables -- foo bar')
self.check_s3('walrus', 'baz.sql', '--tables -- foo baz')
self.check_s3('walrus', 'qux.sql', '--tables -- foo qux')
-
+
def test_percent_D_and_T_on_many_tables(self):
s3mysqldump.main(['foo', 'bar', 'baz', 'qux',
's3://walrus/%D/%T.sql'])
self.check_s3('walrus', 'foo/bar.sql', '--tables -- foo bar')
self.check_s3('walrus', 'foo/baz.sql', '--tables -- foo baz')
self.check_s3('walrus', 'foo/qux.sql', '--tables -- foo qux')
-
+
def test_one_database(self):
s3mysqldump.main(['-B', 'foo', 's3://walrus/foo.sql'])
self.check_s3('walrus', 'foo.sql', '--databases -- foo')
-
+
def test_percent_D_with_one_database(self):
s3mysqldump.main(['-B', 'foo', 's3://walrus/%D.sql'])
self.check_s3('walrus', 'foo.sql', '--databases -- foo')
@@ -294,12 +295,6 @@ def test_with_boto_cfg(self):
s3mysqldump.main(['-b', self.boto_cfg, 'foo', 's3://walrus/foo.sql'])
assert_equal(self.aws_access_key_id, '12345678910')
assert_equal(self.aws_secret_access_key, 'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
-
-
-
-
-
-
if __name__ == '__main__':