Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

executable file 195 lines (166 sloc) 6.664 kB
#!/usr/bin/env python
# Copyright (c) 2009 Chris Moyer http://kopertop.blogspot.com/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
#
# Tools to dump and recover an SDB domain
#
VERSION = "%prog version 1.0"
import boto
import time
from boto import sdb
# Allow support for JSON
try:
import simplejson as json
except:
try:
import json
except:
json = False
def choice_input(options, default=None, title=None):
"""
Choice input
"""
if title == None:
title = "Please choose"
print title
objects = []
for n, obj in enumerate(options):
print "%s: %s" % (n, obj)
objects.append(obj)
choice = int(raw_input(">>> "))
try:
choice = objects[choice]
except:
choice = default
return choice
def confirm(message="Are you sure?"):
choice = raw_input("%s [yN] " % message)
return choice and len(choice) > 0 and choice[0].lower() == "y"
def dump_db(domain, file_name, use_json=False):
"""
Dump SDB domain to file
"""
f = open(file_name, "w")
if use_json:
for item in domain:
data = {"name": item.name, "attributes": item}
print >> f, json.dumps(data)
else:
doc = domain.to_xml(f)
def empty_db(domain):
"""
Remove all entries from domain
"""
for item in domain:
item.delete()
def load_db(domain, file, use_json=False):
"""
Load a domain from a file, this doesn't overwrite any existing
data in the file so if you want to do a full recovery and restore
you need to call empty_db before calling this
:param domain: The SDB Domain object to load to
:param file: The File to load the DB from
"""
if use_json:
for line in file.readlines():
if line:
data = json.loads(line)
item = domain.new_item(data['name'])
item.update(data['attributes'])
item.save()
else:
domain.from_xml(file)
def create_db(domain_name, region_name):
"""Create a new DB
:param domain: Name of the domain to create
:type domain: str
"""
sdb = boto.sdb.connect_to_region(region_name)
return sdb.create_domain(domain_name)
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser(version=VERSION, usage="Usage: %prog [--dump|--load|--empty|--list|-l] [options]")
# Commands
parser.add_option("--dump", help="Dump domain to file", dest="dump", default=False, action="store_true")
parser.add_option("--load", help="Load domain contents from file", dest="load", default=False, action="store_true")
parser.add_option("--empty", help="Empty all contents of domain", dest="empty", default=False, action="store_true")
parser.add_option("-l", "--list", help="List All domains", dest="list", default=False, action="store_true")
parser.add_option("-c", "--create", help="Create domain", dest="create", default=False, action="store_true")
parser.add_option("-a", "--all-domains", help="Operate on all domains", action="store_true", default=False, dest="all_domains")
if json:
parser.add_option("-j", "--use-json", help="Load/Store as JSON instead of XML", action="store_true", default=False, dest="json")
parser.add_option("-d", "--domain", help="Do functions on domain (may be more then one)", action="append", dest="domains")
parser.add_option("-f", "--file", help="Input/Output file we're operating on", dest="file_name")
parser.add_option("-r", "--region", help="Region (e.g. us-east-1[default] or eu-west-1)", default="us-east-1", dest="region_name")
(options, args) = parser.parse_args()
if options.create:
for domain_name in options.domains:
create_db(domain_name, options.region_name)
exit()
sdb = boto.sdb.connect_to_region(options.region_name)
if options.list:
for db in sdb.get_all_domains():
print db
exit()
if not options.dump and not options.load and not options.empty:
parser.print_help()
exit()
#
# Setup
#
if options.domains:
domains = []
for domain_name in options.domains:
domains.append(sdb.get_domain(domain_name))
elif options.all_domains:
domains = sdb.get_all_domains()
else:
domains = [choice_input(options=sdb.get_all_domains(), title="No domain specified, please choose one")]
#
# Execute the commands
#
stime = time.time()
if options.empty:
if confirm("WARNING!!! Are you sure you want to empty the following domains?: %s" % domains):
stime = time.time()
for domain in domains:
print "--------> Emptying %s <--------" % domain.name
empty_db(domain)
else:
print "Canceling operations"
exit()
if options.dump:
for domain in domains:
print "--------> Dumping %s <---------" % domain.name
if options.file_name:
file_name = options.file_name
else:
file_name = "%s.db" % domain.name
dump_db(domain, file_name, options.json)
if options.load:
for domain in domains:
print "---------> Loading %s <----------" % domain.name
if options.file_name:
file_name = options.file_name
else:
file_name = "%s.db" % domain.name
load_db(domain, open(file_name, "rb"), options.json)
total_time = round(time.time() - stime, 2)
print "--------> Finished in %s <--------" % total_time
Jump to Line
Something went wrong with that request. Please try again.