#!/usr/bin/env python
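"""tablesnap: watch data directories with inotify and upload newly written
Cassandra SSTables to Amazon S3.

Each worker thread keeps its own S3 connection. Files larger than the
configured threshold are sent as multipart uploads; every object also carries
'stat' (ownership/mode) and 'md5sum' metadata, plus a companion
'<key>-listdir.json' object recording the directory contents at upload time.
"""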
import pyinotify
import boto
import boto.utils
from optparse import OptionParser
from traceback import format_exc
from threading import Thread
from Queue import Queue
import logging
import os.path
import socket
import json
import sys
import os
import pwd
import grp
import signal
import StringIO
default_log = logging.getLogger('tablesnap')
stderr = logging.StreamHandler()
stderr.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
default_log.addHandler(stderr)
if os.environ.get('TDEBUG', False):
    default_log.setLevel(logging.DEBUG)
else:
    default_log.setLevel(logging.INFO)
# Default number of writer threads
default_threads = 4
# Default retries
default_retries = 1
# S3 limit for single file upload
s3_limit = 5 * 2**30
# Max file size to upload without doing multipart in MB
max_file_size = 5120
# Default chunk size for multipart uploads in MB
default_chunk_size = 256
class UploadHandler(pyinotify.ProcessEvent):
    def my_init(self, threads=None, key=None, secret=None, bucket_name=None,
                prefix=None, name=None, max_size=None, chunk_size=None,
                log=default_log):
        self.key = key
        self.secret = secret
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.name = name or socket.getfqdn()
        self.retries = default_retries
        self.log = log

        if max_size:
            self.max_size = max_size * 2**20
        else:
            self.max_size = max_file_size * 2**20

        if chunk_size:
            self.chunk_size = chunk_size * 2**20
        else:
            self.chunk_size = None

        self.fileq = Queue()
        for i in range(int(threads)):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()
    def build_keyname(self, pathname):
        return '%s%s:%s' % (self.prefix or '', self.name, pathname)
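    # Illustrative example (hostname and path are made up): with
    # prefix='backups/' and the default FQDN name 'cass01.example.com',
    # '/var/lib/cassandra/data/ks/cf-hc-1-Data.db' is stored under the key
    # 'backups/cass01.example.com:/var/lib/cassandra/data/ks/cf-hc-1-Data.db'.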
    def add_file(self, filename):
        if filename.find('-tmp') == -1:
            self.fileq.put(filename)
    def get_bucket(self):
        # Reconnect to S3
        s3 = boto.connect_s3(self.key, self.secret)
        return s3.get_bucket(self.bucket_name)
    def worker(self):
        bucket = self.get_bucket()

        while True:
            f = self.fileq.get()
            keyname = self.build_keyname(f)
            try:
                self.upload_sstable(bucket, keyname, f)
            except:
                self.log.critical("Failed uploading %s. Aborting.\n%s" %
                                  (f, format_exc()))
                # Brute force kill self
                os.kill(os.getpid(), signal.SIGKILL)
            self.fileq.task_done()
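    # Cassandra writes SSTables under a temporary '-tmp' name and renames
    # them into place once they are complete, so watching IN_MOVED_TO (rather
    # than IN_CREATE) should see each file exactly once, fully written;
    # add_file() also skips any '-tmp' paths picked up by the startup scan.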
    def process_IN_MOVED_TO(self, event):
        self.add_file(event.pathname)
    #
    # Check if this keyname (i.e., file) has already been uploaded to
    # the S3 bucket. This will verify that not only does the keyname
    # exist, but that the MD5 sum is the same -- this protects against
    # partial or corrupt uploads.
    #
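    # Note: the comparison below prefers the 'md5sum' metadata written at
    # upload time and falls back to the ETag; for multipart uploads the S3
    # ETag is not the MD5 of the object, which is why the extra metadata
    # field is stored at all.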
    def key_exists(self, bucket, keyname, filename, stat):
        key = None
        for r in range(self.retries):
            try:
                key = bucket.get_key(keyname)
                if key == None:
                    self.log.debug('Key %s does not exist' % (keyname,))
                    return False
                else:
                    self.log.debug('Found key %s' % (keyname,))
                    break
            except:
                bucket = self.get_bucket()
                continue

        if key == None:
            self.log.critical("Failed to lookup keyname %s after %d"
                              " retries\n%s" %
                              (keyname, self.retries, format_exc()))
            raise

        if key.size != stat.st_size:
            self.log.warning('ATTENTION: your source (%s) and target (%s) '
                'sizes differ, you should take a look. As immutable files '
                'never change, one must assume the local file got corrupted '
                'and the right version is the one in S3. Will skip this file '
                'to avoid future complications' % (filename, keyname, ))
            return True
        else:
            # Compute MD5 sum of file
            try:
                fp = open(filename, "r")
            except IOError as (errno, strerror):
                if errno == 2:
                    # The file was removed, return True to skip this file.
                    return True

                self.log.critical("Failed to open file: %s (%s)\n%s" %
                                  (filename, strerror, format_exc(),))
                raise
            md5 = key.compute_md5(fp)
            fp.close()
            self.log.debug('Computed md5: %s' % (md5,))

            meta = key.get_metadata('md5sum')
            if meta:
                self.log.debug('MD5 metadata comparison: %s == %s? : %s' %
                               (md5[0], meta, (md5[0] == meta)))
                result = (md5[0] == meta)
            else:
                self.log.debug('ETag comparison: %s == %s? : %s' %
                               (md5[0], key.etag.strip('"'),
                                (md5[0] == key.etag.strip('"'))))
                result = (md5[0] == key.etag.strip('"'))
                if result:
                    self.log.debug('Setting missing md5sum metadata for %s' %
                                   (keyname,))
                    key.set_metadata('md5sum', md5[0])

            if result:
                self.log.info("Keyname %s already exists, skipping upload"
                              % (keyname))
            else:
                self.log.warning('ATTENTION: your source (%s) and target (%s) '
                    'MD5 hashes differ, you should take a look. As immutable '
                    'files never change, one must assume the local file got '
                    'corrupted and the right version is the one in S3. Will '
                    'skip this file to avoid future complications' %
                    (filename, keyname, ))
                return True

            return result
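    # Parse /proc/meminfo, whose lines look like 'MemFree:  123456 kB', and
    # treat free + buffers + page cache as the memory available for chunking.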
    def get_free_memory_in_kb(self):
        f = open('/proc/meminfo', 'r')
        memlines = f.readlines()
        f.close()
        lines = []
        for line in memlines:
            ml = line.rstrip(' kB\n').split(':')
            lines.append((ml[0], int(ml[1].strip())))
        d = dict(lines)
        return d['Cached'] + d['MemFree'] + d['Buffers']
    def split_sstable(self, filename):
        free = self.get_free_memory_in_kb() * 1024
        self.log.debug('Free memory check: %d < %d ? : %s' %
                       (free, self.chunk_size, (free < self.chunk_size)))
        if free < self.chunk_size:
            self.log.warn('Your system is low on memory, '
                          'reading in smaller chunks')
            chunk_size = free / 20
        else:
            chunk_size = self.chunk_size
        self.log.debug('Reading %s in %d byte sized chunks' %
                       (filename, chunk_size))
        f = open(filename, 'rb')
        while True:
            chunk = f.read(chunk_size)
            if chunk:
                yield StringIO.StringIO(chunk)
            else:
                break
        if f and not f.closed:
            f.close()
    def upload_sstable(self, bucket, keyname, filename, with_index=True):
        # Include the file system metadata so that we have the
        # option of using it to restore the file modes correctly.
        #
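        # The 'stat' metadata ends up as a JSON string on the key, e.g.
        # (illustrative values only):
        #   {"uid": 105, "gid": 108, "mode": 33188,
        #    "user": "cassandra", "group": "cassandra"}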
        try:
            stat = os.stat(filename)
        except OSError:
            # File removed?
            return

        if self.key_exists(bucket, keyname, filename, stat):
            return
        else:
            fp = open(filename, 'rb')
            md5 = boto.utils.compute_md5(fp)
            self.log.debug('Computed md5sum before upload is: %s' % (md5,))
            fp.close()

        def progress(sent, total):
            if sent == total:
                self.log.info('Finished uploading %s' % filename)

        try:
            dirname = os.path.dirname(filename)
            if with_index:
                json_str = json.dumps({dirname: os.listdir(dirname)})
                for r in range(self.retries):
                    try:
                        key = bucket.new_key('%s-listdir.json' % keyname)
                        key.set_contents_from_string(json_str,
                            headers={'Content-Type': 'application/json'},
                            replace=True)
                        break
                    except:
                        if r == self.retries - 1:
                            self.log.critical("Failed to upload directory "
                                              "listing.")
                            raise
                        bucket = self.get_bucket()
                        continue

            meta = {'uid': stat.st_uid,
                    'gid': stat.st_gid,
                    'mode': stat.st_mode}
            try:
                u = pwd.getpwuid(stat.st_uid)
                meta['user'] = u.pw_name
            except:
                pass
            try:
                g = grp.getgrgid(stat.st_gid)
                meta['group'] = g.gr_name
            except:
                pass

            self.log.info('Uploading %s' % filename)
            meta = json.dumps(meta)

            for r in range(self.retries):
                try:
                    self.log.debug('File size check: %s > %s ? : %s' %
                                   (stat.st_size, self.max_size,
                                    (stat.st_size > self.max_size),))
                    if stat.st_size > self.max_size:
                        self.log.info('Performing multipart upload for %s' %
                                      (filename))
                        mp = bucket.initiate_multipart_upload(keyname,
                            metadata={'stat': meta, 'md5sum': md5[0]})
                        part = 1
                        chunk = None
                        try:
                            for chunk in self.split_sstable(filename):
                                self.log.debug('Uploading part #%d '
                                               '(size: %d)' %
                                               (part, chunk.len,))
                                mp.upload_part_from_file(chunk, part)
                                chunk.close()
                                part += 1
                            part -= 1
                        except Exception as e:
                            self.log.debug(e)
                            self.log.info('Error uploading part %d' % (part,))
                            mp.cancel_upload()
                            if chunk:
                                chunk.close()
                            raise
                        self.log.debug('Uploaded %d parts, '
                                       'completing upload' % (part,))
                        mp.complete_upload()
                        progress(100, 100)
                    else:
                        self.log.debug('Performing monolithic upload')
                        key = bucket.new_key(keyname)
                        key.set_metadata('stat', meta)
                        key.set_metadata('md5sum', md5[0])
                        key.set_contents_from_filename(filename, replace=True,
                                                       cb=progress, num_cb=1,
                                                       md5=md5)
                    break
                except:
                    if not os.path.exists(filename):
                        # File was removed? Skip
                        return
                    if r == self.retries - 1:
                        self.log.critical("Failed to upload file contents.")
                        raise
                    bucket = self.get_bucket()
                    continue
        except:
            self.log.error('Error uploading %s\n%s' % (keyname, format_exc()))
            raise
def backup_file(handler, filename, filedir):
    # Queue the file for upload, skipping Cassandra's temporary '-tmp' files.
    if filename.find('-tmp') != -1:
        return
    handler.add_file(os.path.join(filedir, filename))
def backup_files(handler, paths, recurse, log=default_log):
    for path in paths:
        log.info('Backing up %s' % path)
        if recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    backup_file(handler, filename, root)
        else:
            for filename in os.listdir(path):
                backup_file(handler, filename, path)
    return 0
def main():
    parser = OptionParser(usage='%prog [options] <bucket> <path> [...]')
    parser.add_option('-k', '--aws-key', dest='aws_key', default=None)
    parser.add_option('-s', '--aws-secret', dest='aws_secret', default=None)
    parser.add_option('-r', '--recursive', action='store_true',
        dest='recursive', default=False,
        help='Recursively watch the given path(s) for new SSTables')
    parser.add_option('-a', '--auto-add', action='store_true',
        dest='auto_add', default=False,
        help='Automatically start watching new subdirectories within path(s)')
    parser.add_option('-p', '--prefix', dest='prefix', default=None,
        help='Set a string prefix for uploaded files in S3')
    parser.add_option('-t', '--threads', dest='threads',
        default=default_threads,
        help='Number of writer threads')
    parser.add_option('-n', '--name', dest='name', default=None,
        help='Use this name instead of the FQDN to identify the SSTables '
             'from this host')
    parser.add_option('--max-upload-size', dest='max_upload_size',
        default=max_file_size,
        help='Max size for files to be uploaded before doing multipart '
             '(default %dM)' % max_file_size)
    parser.add_option('--multipart-chunk-size', dest='multipart_chunk_size',
        default=default_chunk_size,
        help='Chunk size for multipart uploads (default: %dM, or a smaller '
             'chunk when free memory is low)' % (default_chunk_size,))
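    # Example invocation (bucket name and path are illustrative):
    #   tablesnap -k <AWS_KEY> -s <AWS_SECRET> -r my-backups /var/lib/cassandra/data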
    options, args = parser.parse_args()

    if len(args) < 2:
        parser.print_help()
        return -1

    bucket_name = args[0]
    paths = args[1:]

    # Check S3 credentials only. We reconnect per-thread to avoid any
    # potential thread-safety problems.
    s3 = boto.connect_s3(options.aws_key, options.aws_secret)
    s3.get_bucket(bucket_name)

    handler = UploadHandler(threads=options.threads, key=options.aws_key,
                            secret=options.aws_secret,
                            bucket_name=bucket_name,
                            prefix=options.prefix, name=options.name,
                            max_size=int(options.max_upload_size),
                            chunk_size=int(options.multipart_chunk_size))

    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, handler)
    for path in paths:
        ret = wm.add_watch(path, pyinotify.IN_MOVED_TO, rec=options.recursive,
                           auto_add=options.auto_add)
        if ret[path] == -1:
            default_log.critical('add_watch failed for %s, bailing out!' %
                                 (path))
            return 1

    backup_files(handler, paths, options.recursive)
    notifier.loop()


if __name__ == '__main__':
    sys.exit(main())