Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 192 lines (161 sloc) 7.22 KB
#!/usr/bin/env python
# -*- python -*-
import pump
import pump_transfer
import pump_json
import util_cli as util
import sys
import time
import os
import os.path
import shutil
import simplejson as json
from optparse import OptionParser
from couchbase import client
from couchbase.rest_client import RestConnection, RestHelper
class DocLoader(pump_transfer.Transfer):
def parse_args(self, argv):
usage = "usage: %prog [options] <directory>|zipfile\n\n" + \
"Example: %prog -u Administrator -p password -n 127.0.0.1:8091 " + \
"-b mybucket -s 100 gamesim-sample.zip"
parser = OptionParser(usage)
username = os.environ.get('BUCKET_USERNAME', "")
password = os.environ.get('BUCKET_PASSWORD', "")
parser.add_option('-u', dest='username', default=username,
help='Username', metavar='Administrator')
parser.add_option('-p', dest='password', default=password,
help='Password', metavar='password')
parser.add_option('-b', dest='bucket',
help='Bucket', metavar='mybucket')
parser.add_option('-n', dest='node', default='127.0.0.1:8091',
help='Node address', metavar='127.0.0.1:8091')
parser.add_option('-s', dest='ram_quota', default=100, type='int',
help='RAM quota in MB', metavar=100)
parser.add_option("-v", dest='verbose', action="count")
self.options, self.args = parser.parse_args(argv[1:])
if not self.args or not self.options.bucket:
parser.print_help()
sys.exit(1)
# check if the uploaded file exists
if not os.path.exists(self.args[0]):
sys.stderr.write("Invalid path: %s\n" % self.args[0])
sys.exit(1)
def opt_construct(self, argv):
sink_opts = {"node" : "http://"}
common_opts = {"bucket" : ["-B", None],
"username" : ["-u", None],
"password" : ["-p", None],
}
count_opts = {"verbose" : ["-v", None]}
# parse options and arguments
self.parse_args(argv)
gen_str = "json://" + self.args[0]
sink_str = ""
for key in sink_opts.iterkeys():
val = getattr(self.options, key, None)
if val:
sink_str += sink_opts[key] + val
for key in common_opts.iterkeys():
val = getattr(self.options, key, None)
if val:
common_opts[key][1] = str(val)
for key in count_opts.iterkeys():
val = getattr(self.options, key, None)
if val:
count_opts[key][1] = int(val)
return gen_str, sink_str, common_opts, count_opts
def init_bucket(self):
host, port= util.hostport(self.options.node)
server_info = {'ip': host,
'port': port,
'username': self.options.username,
'password': self.options.password}
self.rest = RestConnection(server_info)
timeout_in_seconds = 120
if self.options.password:
uri = "http://%s:%s/nodes/self" % (server_info["ip"], server_info["port"])
status, content = self.rest._http_request(uri)
quotaUnused = -1
if status:
try:
json_parsed = json.loads(content)
quotaTotal = json_parsed["storageTotals"]["ram"]["quotaTotal"]
quotaUnused = quotaTotal - json_parsed["storageTotals"]["ram"]["quotaUsed"]
except:
pass
quotaUnused = quotaUnused / 1024.0
if quotaUnused > 0 and quotaUnused < self.options.ram_quota:
sys.stderr.write("RAM quota specified is too large to be provisioned into this cluster\n")
sys.stderr.write("Available RAM quota: %d, requested: %d\n" %\
(quotaUnused, self.options.ram_quota))
sys.exit(1)
if not RestHelper(self.rest).bucket_exists(self.options.bucket):
self.rest.create_bucket(bucket=self.options.bucket,
ramQuotaMB=self.options.ram_quota,
authType='sasl')
start = time.time()
# Make sure the bucket exists before querying its status
bucket_exist = False
while (time.time() - start) <= timeout_in_seconds and not bucket_exist:
bucket_exist = RestHelper(self.rest).bucket_exists(self.options.bucket)
if bucket_exist:
break
else:
sys.stderr.write(".")
time.sleep(2)
if not bucket_exist:
sys.stderr.write("Fail to create bucket '%s' within %s seconds\n" %\
(self.options.bucket, timeout_in_seconds))
sys.exit(1)
self.rest = RestConnection(server_info)
#Query status for all bucket nodes
uri = "http://%s:%s/pools/default/buckets/%s" % \
(server_info["ip"], server_info["port"], self.options.bucket)
all_node_ready = False
start = time.time()
while (time.time() - start) <= timeout_in_seconds and not all_node_ready:
status, content = self.rest._http_request(uri)
try:
json_data = json.loads(content)
all_node_ready = True
for node in json_data["nodes"]:
if node["status"] != "healthy":
all_node_ready = False
break
if not all_node_ready:
sys.stderr.write(".")
time.sleep(2)
except Exception, err:
print "Exception:", err
break
if not all_node_ready:
sys.stderr.write("\nNode status is not ready after creating bucket '%s' within %s seconds" %\
(self.options.bucket, timeout_in_seconds))
sys.exit(1)
else:
print "bucket creation is successful"
def find_handlers(self, opts, source, sink):
return pump_json.JSONSource, pump.PumpingStation.find_handler(opts, sink, pump_transfer.SINKS)
def main(self, argv):
src, sink, common_opts, count_opts = self.opt_construct(argv)
local_args = [argv[0]]
local_args.append(src)
local_args.append(sink)
for v in common_opts.itervalues():
local_args.append(v[0])
local_args.append(v[1])
for v in count_opts.itervalues():
if v[1] is not None:
for i in range(v[1]):
local_args.append(v[0])
# create new bucket if it doesn't exist
self.init_bucket()
#use cbtransfer to upload documents
pump_transfer.Transfer.main(self, local_args)
if __name__ == '__main__':
if os.name == 'nt':
mydir = os.path.dirname(sys.argv[0])
bin_dir = os.path.join(mydir, '..')
path = [mydir, bin_dir, os.environ['PATH']]
os.environ['PATH'] = ';'.join(path)
pump_transfer.exit_handler(DocLoader().main(sys.argv))
Jump to Line
Something went wrong with that request. Please try again.