#!/usr/bin/env python
import glob
import logging
import os
import Queue
import re
import json
import struct
import threading
import couchstore
import couchbaseConstants
import pump
from collections import defaultdict
SFD_SCHEME = "couchstore-files://"
SFD_VBUCKETS = 1024
SFD_REV_META = ">QIIBB" # cas, exp, flg, flex_meta, dtype
SFD_REV_SEQ = ">Q"
SFD_DB_SEQ = ">Q"
SFD_RE = "^([0-9]+)\\.couch\\.([0-9]+)$"
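# SFD_REV_META describes the big-endian revMeta blob: an 8-byte cas, 4-byte exp,
# 4-byte flg, then one byte each for flex_meta and dtype; SFD_REV_SEQ and
# SFD_DB_SEQ are single big-endian 8-byte sequence numbers.
# SFD_RE matches per-vbucket filenames such as "123.couch.3": group(1) is the
# vbucket_id and group(2) is the compaction/revision number; the file with the
# largest revision number is the latest copy of that vbucket.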
# TODO: (1) SFDSource - total_msgs.
# TODO: (1) SFDSink - ensure right user for bucket_dir.
# TODO: (1) SFDSink - ensure right user for couchstore file.
class SFDSource(pump.Source):
"""Reads couchstore files from a couchbase server data directory."""
def __init__(self, opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur):
super(SFDSource, self).__init__(opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur)
self.done = False
self.queue = None
@staticmethod
def can_handle(opts, spec):
return spec.startswith(SFD_SCHEME)
@staticmethod
def check_base(opts, spec):
# Skip immediate superclass Source.check_base(),
# since SFDSource can handle different vbucket states.
return pump.EndPoint.check_base(opts, spec)
@staticmethod
def check(opts, spec):
rv, d = data_dir(spec)
if rv != 0:
            return rv, None
buckets = []
for bucket_dir in sorted(glob.glob(d + "/*/")):
if not glob.glob(bucket_dir + "/*.couch.*"):
continue
bucket_name = os.path.basename(os.path.dirname(bucket_dir))
if not bucket_name:
return "error: bucket_name too short: " + bucket_dir, None
rv, v = SFDSource.vbucket_states(opts, spec, bucket_dir)
if rv != 0:
return rv, None
buckets.append({'name': bucket_name,
'nodes': [{'hostname': 'N/A',
'vbucket_states': v}]})
if not buckets:
return "error: no bucket subdirectories at: " + d, None
return 0, {'spec': spec, 'buckets': buckets}
@staticmethod
def vbucket_states(opts, spec, bucket_dir):
"""Reads all the latest couchstore files in a directory, and returns
map of state string (e.g., 'active') to map of vbucket_id to doc."""
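        # Shape of the returned map, with parsed vbstate docs as leaves, e.g.:
        #   {'active': {0: {...}, 1: {...}}, 'replica': {512: {...}}}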
vbucket_states = defaultdict(dict)
for f in latest_couch_files(bucket_dir):
vbucket_id = int(re.match(SFD_RE, os.path.basename(f)).group(1))
try:
store = couchstore.CouchStore(f, 'r')
try:
doc_str = store.localDocs['_local/vbstate']
if doc_str:
doc = json.loads(doc_str)
state = doc.get('state', None)
if state:
vbucket_states[state][vbucket_id] = doc
else:
return "error: missing vbucket_state from: %s" \
% (f), None
except Exception, e:
return ("error: could not read _local/vbstate from: %s" +
"; exception: %s") % (f, e), None
store.close()
except Exception, e:
return ("error: could not read couchstore file: %s" +
"; exception: %s") % (f, e), None
if vbucket_states:
return 0, vbucket_states
return "error: no vbucket_states in files: %s" % (bucket_dir), None
@staticmethod
def provide_design(opts, source_spec, source_bucket, source_map):
rv, d = data_dir(source_spec)
if rv != 0:
return rv, None
bucket_dir = d + '/' + source_bucket['name']
if not os.path.isdir(bucket_dir):
return 0, None
rv, store, store_path = \
open_latest_store(bucket_dir,
"master.couch.*",
"^(master)\\.couch\\.([0-9]+)$",
"master.couch.0",
mode='r')
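        # mode='r': if no master.couch.* file exists yet, open_latest_store()
        # returns (0, None, None) and we simply report no design docs.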
if rv != 0 or not store:
return rv, None
rows = []
for doc_info in store.changesSince(0):
if not doc_info.deleted:
try:
doc_contents = doc_info.getContents(options=couchstore.CouchStore.DECOMPRESS)
except Exception, e:
return ("error: could not read design doc: %s" +
"; source_spec: %s; exception: %s") % \
(doc_info.id, source_spec, e), None
try:
doc = json.loads(doc_contents)
except ValueError, e:
return ("error: could not parse design doc: %s" +
"; source_spec: %s; exception: %s") % \
(doc_info.id, source_spec, e), None
doc['id'] = doc.get('id', doc_info.id)
doc['_rev'] = doc.get('_rev', doc_info.revSequence)
rows.append({'id': doc_info.id, 'doc': doc})
store.close()
return 0, json.dumps(rows)
def provide_batch(self):
if self.done:
return 0, None
if not self.queue:
name = "c" + threading.currentThread().getName()[1:]
self.queue = Queue.Queue(2)
self.thread = threading.Thread(target=self.loader, name=name)
self.thread.daemon = True
self.thread.start()
rv, batch = self.queue.get()
self.queue.task_done()
if rv != 0 or batch is None:
self.done = True
return rv, batch
def loader(self):
rv, d = data_dir(self.spec)
if rv != 0:
self.queue.put((rv, None))
return
source_vbucket_state = \
getattr(self.opts, 'source_vbucket_state', 'active')
source_nodes = self.source_bucket['nodes']
if len(source_nodes) != 1:
self.queue.put(("error: expected 1 node in source_bucket: %s"
% (self.source_bucket['name']), None))
return
vbucket_states = source_nodes[0].get('vbucket_states', None)
if not vbucket_states:
self.queue.put(("error: missing vbucket_states in source_bucket: %s"
% (self.source_bucket['name']), None))
return
vbuckets = vbucket_states.get(source_vbucket_state, None)
if vbuckets is None: # Empty dict is valid.
self.queue.put(("error: missing vbuckets in source_bucket: %s"
% (self.source_bucket['name']), None))
return
batch_max_size = self.opts.extra['batch_max_size']
batch_max_bytes = self.opts.extra['batch_max_bytes']
store = None
vbucket_id = None
        # Level of indirection since we can't use the Python 3 nonlocal statement.
abatch = [pump.Batch(self)]
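        # change_callback() below appends to abatch[0] and swaps in a fresh
        # pump.Batch when size/byte limits are hit; a one-element list is used
        # because Python 2 closures cannot rebind an enclosing local name.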
def change_callback(doc_info):
if doc_info:
key = doc_info.id
if self.skip(key, vbucket_id):
return
if doc_info.deleted:
cmd = couchbaseConstants.CMD_TAP_DELETE
val = ''
else:
cmd = couchbaseConstants.CMD_TAP_MUTATION
val = doc_info.getContents(options=couchstore.CouchStore.DECOMPRESS)
try:
cas, exp, flg, flex_meta, dtype = struct.unpack(SFD_REV_META, doc_info.revMeta)
meta = doc_info.revSequence
seqno = doc_info.sequence
nmeta = 0
msg = (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, 0)
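                    # This tuple layout (cmd, vbucket_id, key, flg, exp, cas,
                    # meta, val, seqno, dtype, nmeta, conf_res) is what
                    # SFDSink.run() unpacks for each message.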
abatch[0].append(msg, len(val))
except Exception, e:
self.queue.put(("error: could not read couchstore file due to unsupported file format version;"
" exception: %s"% e, None))
return
if (abatch[0].size() >= batch_max_size or
abatch[0].bytes >= batch_max_bytes):
self.queue.put((0, abatch[0]))
abatch[0] = pump.Batch(self)
for f in latest_couch_files(d + '/' + self.source_bucket['name']):
vbucket_id = int(re.match(SFD_RE, os.path.basename(f)).group(1))
            if vbucket_id not in vbuckets:
continue
try:
store = couchstore.CouchStore(f, 'r')
store.forEachChange(0, change_callback)
store.close()
except Exception, e:
                # MB-12270: Some files may be deleted due to compaction. We can
                # safely ignore them and move on to the next file.
pass
if abatch[0].size():
self.queue.put((0, abatch[0]))
self.queue.put((0, None))
class SFDSink(pump.Sink):
"""Sink for couchstore in couchbase server/file/directory layout."""
def __init__(self, opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur):
super(SFDSink, self).__init__(opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur)
self.rehash = opts.extra.get("rehash", 0)
self.init_worker(SFDSink.run)
@staticmethod
def run(self):
destination_vbucket_state = \
getattr(self.opts, 'destination_vbucket_state', 'active')
vbucket_states = self.source_node.get('vbucket_states', {})
while not self.ctl['stop']:
batch, future = self.pull_next_batch()
if not batch:
return self.future_done(future, 0)
vbuckets = batch.group_by_vbucket_id(SFD_VBUCKETS, self.rehash)
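            # Messages are grouped by destination vbucket so each per-vbucket
            # couchstore file is opened, written, and committed once per batch.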
for vbucket_id, msgs in vbuckets.iteritems():
checkpoint_id = 0
max_deleted_seqno = 0
rv, store, store_path = self.open_store(vbucket_id)
if rv != 0:
return self.future_done(future, rv)
bulk_keys = []
bulk_vals = []
for i, msg in enumerate(msgs):
cmd, _vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, conf_res = msg
if self.skip(key, vbucket_id):
continue
d = couchstore.DocumentInfo(str(key))
flex_meta = 1
d.revMeta = str(struct.pack(SFD_REV_META, cas, exp, flg, flex_meta, dtype))
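                    # 'meta' (the rev sequence from the source) is normalized to
                    # exactly 8 bytes (truncate long values, left-pad short ones
                    # with NUL bytes) so SFD_REV_SEQ can unpack it.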
if meta:
if len(meta) > 8:
meta = meta[0:8]
if len(meta) < 8:
meta = ('\x00\x00\x00\x00\x00\x00\x00\x00' + meta)[-8:]
d.revSequence, = struct.unpack(SFD_REV_SEQ, meta)
else:
d.revSequence = 1
if seqno:
d.sequence = int(seqno)
if cmd == couchbaseConstants.CMD_TAP_MUTATION:
v = str(val)
try:
if (re.match('^\\s*{', v) and
json.loads(v) is not None):
d.contentType = couchstore.DocumentInfo.IS_JSON
except ValueError:
pass # NON_JSON is already the default contentType.
elif cmd == couchbaseConstants.CMD_TAP_DELETE:
v = None
else:
self.future_done(future,
"error: SFDSink bad cmd: " + str(cmd))
store.close()
return
bulk_keys.append(d)
bulk_vals.append(v)
try:
if bulk_keys and bulk_vals:
vm = vbucket_states.get(destination_vbucket_state, None)
if vm:
vi = vm.get(vbucket_id, None)
if vi:
c = int(vi.get("checkpoint_id", checkpoint_id))
checkpoint_id = max(checkpoint_id, c)
m = int(vi.get("max_deleted_seqno", max_deleted_seqno))
max_deleted_seqno = max(max_deleted_seqno, m)
rv = self.save_vbucket_state(store, vbucket_id,
destination_vbucket_state,
checkpoint_id,
max_deleted_seqno)
if rv != 0:
self.future_done(future, rv)
store.close()
return
store.saveMultiple(bulk_keys, bulk_vals,
options=couchstore.CouchStore.COMPRESS)
store.commit()
store.close()
except Exception, e:
self.future_done(future,
"error: could not save couchstore data"
"; vbucket_id: %s; store_path: %s"
"; exception: %s"
% (vbucket_id, store_path, e))
return
self.future_done(future, 0) # No return to keep looping.
def save_vbucket_state(self, store, vbucket_id,
state, checkpoint_id, max_deleted_seqno):
doc = json.dumps({'state': state,
'checkpoint_id': str(checkpoint_id),
'max_deleted_seqno': str(max_deleted_seqno)})
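        # checkpoint_id and max_deleted_seqno are stored as strings; run()
        # converts them back with int() when merging an existing vbstate doc.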
try:
store.localDocs['_local/vbstate'] = doc
except Exception, e:
return "error: save_vbucket_state() failed: " + str(e)
return 0
@staticmethod
def can_handle(opts, spec):
return spec.startswith(SFD_SCHEME)
@staticmethod
def check_base(opts, spec):
        if getattr(opts, "destination_operation", None) is not None:
return ("error: --destination-operation" +
" is not supported by this destination: %s") % (spec)
# Skip immediate superclass Sink.check_base(),
# since SFDSink can handle different vbucket states.
return pump.EndPoint.check_base(opts, spec)
@staticmethod
def check(opts, spec, source_map):
# TODO: (2) SFDSink - check disk space.
rv, dir = data_dir(spec)
if rv != 0:
            return rv, None
if not os.path.isdir(dir):
return "error: not a directory: " + dir, None
if not os.access(dir, os.W_OK):
return "error: directory is not writable: " + dir, None
return 0, None
@staticmethod
def consume_design(opts, sink_spec, sink_map,
source_bucket, source_map, source_design):
if not source_design:
return 0
try:
sd = json.loads(source_design)
except ValueError, e:
return "error: could not parse source_design: " + source_design
rv, d = data_dir(sink_spec)
if rv != 0:
return rv
bucket_dir = d + '/' + source_bucket['name']
if not os.path.isdir(bucket_dir):
os.mkdir(bucket_dir)
rv, store, store_path = \
open_latest_store(bucket_dir,
"master.couch.*",
"^(master)\\.couch\\.([0-9]+)$",
"master.couch.1")
if rv != 0:
return rv
bulk_keys = []
bulk_vals = []
if sd:
for row in sd['rows']:
logging.debug("design_doc row: " + str(row))
d = couchstore.DocumentInfo(str(row['id']))
if '_rev' in row['doc']:
d.revMeta = str(row['doc']['_rev'])
del row['doc']['_rev']
d.contentType = couchstore.DocumentInfo.IS_JSON
bulk_keys.append(d)
bulk_vals.append(json.dumps(row['doc']))
if bulk_keys and bulk_vals:
store.saveMultiple(bulk_keys, bulk_vals) # TODO: Compress ddocs?
store.commit()
store.close()
return 0
def consume_batch_async(self, batch):
return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))
def open_store(self, vbucket_id):
# data_dir => /opt/couchbase/var/lib/couchbase/data/
# bucket_dir => default/
# store_path => VBUCKET_ID.couch.COMPACTION_NUM
if vbucket_id >= SFD_VBUCKETS:
return "error: vbucket_id too large: %s" % (vbucket_id), None, None
rv, bucket_dir = self.find_bucket_dir()
if rv != 0:
return rv, None, None
return open_latest_store(bucket_dir, "%s.couch.*" % (vbucket_id), SFD_RE,
str(vbucket_id) + ".couch.1", mode='c')
def find_bucket_dir(self):
rv, d = data_dir(self.spec)
if rv != 0:
return rv, None
bucket_dir = d + '/' + self.source_bucket['name']
if not os.path.isdir(bucket_dir):
try:
os.mkdir(bucket_dir)
except OSError, e:
return ("error: could not create bucket_dir: %s; exception: %s"
% (bucket_dir, e)), None
return 0, bucket_dir
def open_latest_store(bucket_dir, glob_pattern, filter_re, default_name, mode='c'):
store_paths = latest_couch_files(bucket_dir,
glob_pattern=glob_pattern,
filter_re=filter_re)
if not store_paths:
if mode == 'r':
return 0, None, None
store_paths = [bucket_dir + '/' + default_name]
if len(store_paths) != 1:
return ("error: no single, latest couchstore file: %s" +
"; found: %s") % (glob_pattern, store_paths), None, None
try:
return 0, couchstore.CouchStore(str(store_paths[0]), mode), store_paths[0]
except Exception, e:
return ("error: could not open couchstore file: %s" +
"; exception: %s") % (store_paths[0], e), None, None
def latest_couch_files(bucket_dir, glob_pattern='*.couch.*', filter_re=SFD_RE):
"""Given directory of *.couch.VER files, returns files with largest VER suffixes."""
files = glob.glob(bucket_dir + '/' + glob_pattern)
files = [f for f in files if re.match(filter_re, os.path.basename(f))]
matches = [(re.match(filter_re, os.path.basename(f)), f) for f in files]
latest = {}
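    # latest maps each filename prefix (a vbucket_id or 'master') to the
    # (highest VER, path) pair seen so far.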
for match, file in matches:
top, _ = latest.get(match.group(1), (-1, None))
cur = int(match.group(2))
if cur > top:
latest[match.group(1)] = (cur, file)
return sorted([file for top, file in latest.values()])
def data_dir(spec):
if not spec.startswith(SFD_SCHEME):
return "error: wrong scheme in spec: " + spec, None
dir = spec[len(SFD_SCHEME):]
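    # e.g., "couchstore-files:///opt/couchbase/var/lib/couchbase/data/"
    # yields "/opt/couchbase/var/lib/couchbase/data" after normpath().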
if dir:
return 0, os.path.normpath(dir)
else:
return "error: missing dir in spec: " + spec, None