Permalink
Browse files

CBD-87: try set-with-meta during restore

For memcached/couchbase destinations, if the source was providing meta
values, first try using SET-WITH-META/ADD-WITH-META/DELETE-WITH-META
commands.  If the destination replies with unknown-command errors, try
instead using regular SET/ADD/DELETE commands.

Future unit tests can force the required behavior with the try-xwm
(XXX-WITH-META) extra -x option.

Change-Id: Ib76f1e857cbed5d9d81f48f8026df21bfeb295c6
Reviewed-on: http://review.couchbase.org/18263
Reviewed-by: Ronnie Sun <ronnie@couchbase.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information...
1 parent 876a3d3 commit 50c77a188e3aaa95ff695a1d06883fd692b5e25a @steveyen steveyen committed Jul 11, 2012
Showing with 52 additions and 24 deletions.
  1. +3 −0 memcacheConstants.py
  2. +0 −4 pump_cb.py
  3. +47 −16 pump_mc.py
  4. +2 −1 pump_transfer.py
  5. +0 −3 t/pump_test.py
View
@@ -66,6 +66,9 @@
CMD_ADD_WITH_META = 0xa4
CMD_ADDQ_WITH_META = 0xa5
+CMD_DELETE_WITH_META = 0xa8
+CMD_DELETEQ_WITH_META = 0xa9
+
# Replication
CMD_TAP_CONNECT = 0x40
CMD_TAP_MUTATION = 0x41
View
@@ -51,10 +51,6 @@ def scatter_gather(self, mconns, batch):
return 0, retry_batch
- def translate_cas(self, cas):
- # TODO: (1) CBSink - use cas with SET-WITH-META.
- return 0
-
@staticmethod
def can_handle(opts, spec):
return (spec.startswith("http://") or
View
@@ -11,6 +11,20 @@
import memcacheConstants
import pump
+OP_MAP = {
+ 'get': memcacheConstants.CMD_GET,
+ 'set': memcacheConstants.CMD_SET,
+ 'add': memcacheConstants.CMD_ADD,
+ 'delete': memcacheConstants.CMD_DELETE,
+ }
+
+OP_MAP_WITH_META = {
+ 'get': memcacheConstants.CMD_GET,
+ 'set': memcacheConstants.CMD_SET_WITH_META,
+ 'add': memcacheConstants.CMD_ADD_WITH_META,
+ 'delete': memcacheConstants.CMD_DELETE_WITH_META
+ }
+
class MCSink(pump.Sink):
"""Dumb client sink using binary memcached protocol.
Used when moxi or memcached is destination."""
@@ -19,6 +33,11 @@ def __init__(self, opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur):
super(MCSink, self).__init__(opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur)
+
+ self.op_map = OP_MAP
+ if opts.extra.get("try_xwm", 1):
+ self.op_map = OP_MAP_WITH_META
+
self.init_worker(MCSink.run)
def close(self):
@@ -110,7 +129,7 @@ def send_msgs(self, conn, msgs, operation, vbucket_id=None):
if self.skip(key, vbucket_id_msg):
continue
- rv, cmd = self.translate_cmd(cmd, operation)
+ rv, cmd = self.translate_cmd(cmd, operation, meta)
if rv != 0:
return rv
@@ -121,7 +140,7 @@ def send_msgs(self, conn, msgs, operation, vbucket_id=None):
rv, req = self.cmd_request(cmd, vbucket_id_msg, key, val,
ctypes.c_uint32(flg).value,
- exp, self.translate_cas(cas), meta, i)
+ exp, cas, meta, i)
if rv != 0:
return rv
@@ -172,6 +191,16 @@ def recv_msgs(self, conn, msgs, vbucket_id=None):
% (vbucket_id_msg, key, self.spec,
conn.host, conn.port))
return msg, None
+ elif r_status == mmecacheConstants.ERR_UNKNOWN_COMMAND:
+ if self.op_map == OP_MAP:
+ if not retry:
+ return "error: unknown command: %s" % (r_cmd), None
+ else:
+ if not retry:
+ logging.warn("destination does not take XXX-WITH-META"
+ " commands; will use META-less commands")
+ self.op_map = OP_MAP
+ retry = True
else:
return "error: MCSink MC error: " + str(r_status), None
@@ -181,26 +210,24 @@ def recv_msgs(self, conn, msgs, vbucket_id=None):
return 0, retry
- def translate_cmd(self, cmd, op):
+ def translate_cmd(self, cmd, op, meta):
+ if len(str(meta)) <= 0:
+ # The source gave no meta, so use regular commands.
+ self.op_map = OP_MAP
+
if cmd == memcacheConstants.CMD_TAP_MUTATION:
- if op == 'set':
- return 0, memcacheConstants.CMD_SET
- if op == 'add':
- return 0, memcacheConstants.CMD_ADD
- if op == 'get':
- return 0, memcacheConstants.CMD_GET
+ m = self.op_map.get(op, None)
+ if m:
+ return 0, m
return "error: MCSink.translate_cmd, unsupported op: " + op, None
if cmd == memcacheConstants.CMD_TAP_DELETE:
if op == 'get':
return 0, memcacheConstants.CMD_NOOP
- return 0, memcacheConstants.CMD_DELETE
+ return 0, self.op_map['delete']
return "error: MCSink - unknown cmd: %s, op: %s" % (cmd, op), None
- def translate_cas(self, cas):
- return 0 # Cannot force CAS using memcached SET/ADD commands.
-
def append_req(self, m, req):
hdr, ext, key, val = req
m.append(hdr)
@@ -266,8 +293,12 @@ def connect_mc(host, port, user, pswd):
return 0, mc
def cmd_request(self, cmd, vbucket_id, key, val, flg, exp, cas, meta, opaque):
- if (cmd == memcacheConstants.CMD_SET or
- cmd == memcacheConstants.CMD_ADD):
+ if (cmd == memcacheConstants.CMD_SET_WITH_META or
+ cmd == memcacheConstants.CMD_ADD_WITH_META or
+ cmd == memcacheConstants.CMD_DELETE_WITH_META):
+ ext = struct.pack(">IIQ", flg, exp, cas) + str(meta)
+ elif (cmd == memcacheConstants.CMD_SET or
+ cmd == memcacheConstants.CMD_ADD):
ext = struct.pack(memcacheConstants.SET_PKT_FMT, flg, exp)
elif (cmd == memcacheConstants.CMD_DELETE or
cmd == memcacheConstants.CMD_GET or
@@ -276,7 +307,7 @@ def cmd_request(self, cmd, vbucket_id, key, val, flg, exp, cas, meta, opaque):
else:
return "error: MCSink - unknown cmd for request: " + str(cmd), None
- hdr = self.cmd_header(cmd, vbucket_id, key, val, ext, cas, opaque)
+ hdr = self.cmd_header(cmd, vbucket_id, key, val, ext, 0, opaque)
return 0, (hdr, ext, key, val)
def cmd_header(self, cmd, vbucket_id, key, val, ext, cas, opaque,
View
@@ -156,7 +156,8 @@ def opt_extra_defaults(self):
"max_retry": (10, "max # of sequential retries"),
"report": (5, "# batches before updating progress bar"),
"report_full": (2000, "# batches before emitting progress info"),
- "recv_min_bytes": (4096, "amount of bytes for every recv() call")
+ "recv_min_bytes": (4096, "amount of bytes for every recv() call"),
+ "try_xwm": (1, "1 to first try XXX-WITH-META commands")
}
def find_handlers(self, opts, source, sink):
View
@@ -1695,10 +1695,7 @@ def gen_backup(self,
def flatten_msgs_per_node(self, msgs_per_node):
flattened = sum(msgs_per_node, [])
- # Zero out the CAS value, since we currently use SET/ADD.
- # TODO: (1) revisit CAS once we use SET_WITH_META/ADD_WITH_META.
arr = []
-
for msg in flattened:
cmd_tap, vbucket_id, key, val, flg, exp, cas, meta = msg
arr.append((cmd_tap, vbucket_id, key, val, flg, exp, 0, meta))

0 comments on commit 50c77a1

Please sign in to comment.