Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

CBD-87: added gen workload source

Change-Id: I0a40531cdb2b5077319adec82f884f07e1762043
Reviewed-on: http://review.couchbase.org/19010
Reviewed-by: Pavel Paulau <pavel.paulau@gmail.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information...
commit 58c09c3e503144aa5fb26e48aafce462f3a65187 1 parent f1bccb9
@steveyen steveyen authored
Showing with 132 additions and 1 deletion.
  1. +1 −0  Makefile.am
  2. +128 −0 pump_gen.py
  3. +2 −0  pump_transfer.py
  4. +1 −1  t/pump_test.py
View
1  Makefile.am
@@ -20,6 +20,7 @@ nobase_pythonlib_DATA= \
pump_bson.py \
pump_cb.py \
pump_csv.py \
+ pump_gen.py \
pump_mbf.py \
pump_mc.py \
pump_sfd.py \
View
128 pump_gen.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+
+import memcacheConstants
+import pump
+
+
+class GenSource(pump.Source):
+ """Generates simple SET/GET workload, useful for basic testing.
+ Examples:
+ ./cbtransfer gen: http://10.3.121.192:8091
+ ./cbtransfer gen:max-items=50000,min-value-size=10,exit-after-creates=1,\
+ prefix=steve1-,ratio-sets=1.0 \
+ http://10.3.121.192:8091 -B my-other-bucket --threads=10
+ """
+ def __init__(self, opts, spec, source_bucket, source_node,
+ source_map, sink_map, ctl, cur):
+ super(GenSource, self).__init__(opts, spec, source_bucket, source_node,
+ source_map, sink_map, ctl, cur)
+ self.done = False
+ self.body = None
+ self.cur_ops = source_map['cfg']['cur-ops']
+ self.cur_gets = source_map['cfg']['cur-gets']
+ self.cur_sets = source_map['cfg']['cur-sets']
+ self.cur_items = source_map['cfg']['cur-items']
+
+ @staticmethod
+ def can_handle(opts, spec):
+ """The gen spec follows gen:[key=value[,key=value]] format."""
+ return spec.startswith("gen:")
+
+ @staticmethod
+ def check(opts, spec):
+ rv, cfg = GenSource.parse_spec(opts, spec)
+ if rv != 0:
+ return rv, None
+ return 0, {'cfg': cfg,
+ 'spec': spec,
+ 'buckets': [{'name': 'default',
+ 'nodes': [{'hostname': 'N/A-' + str(i)}
+ for i in range(opts.threads)]}]}
+
+ @staticmethod
+ def parse_spec(opts, spec):
+ """Parse the comma-separated key=value configuration from the gen spec.
+ Names and semantics were inspired from subset of mcsoda parameters."""
+ cfg = {'cur-ops': 0,
+ 'cur-gets': 0,
+ 'cur-sets': 0,
+ 'cur-items': 0,
+ 'exit-after-creates': 0,
+ 'max-items': 10000,
+ 'min-value-size': 10,
+ 'prefix': "",
+ 'ratio-sets': 0.05}
+ for kv in spec[len("gen:"):].split(','):
+ if kv:
+ k = kv.split('=')[0].strip()
+ v = kv.split('=')[1].strip()
+ try:
+ if k in cfg:
+ cfg[k] = type(cfg[k])(v)
+ else:
+ return "error: unknown workload gen parameter: %s" % (k), None
+ except ValueError:
+ return "error: could not parse value from: %s" % (kv), None
+ return 0, cfg
+
+ @staticmethod
+ def provide_design(opts, source_spec, source_bucket, source_map):
+ """No design from a GenSource."""
+ return 0, None
+
+ def provide_batch(self):
+ """Provides a batch of messages, with GET/SET ratios and keys
+ controlled by a mcsoda-inspired approach, but simpler."""
+ if self.done:
+ return 0, None
+
+ cfg = self.source_map['cfg']
+ prefix = cfg['prefix']
+ max_items = cfg['max-items']
+ ratio_sets = cfg['ratio-sets']
+ exit_after_creates = cfg['exit-after-creates']
+ if not self.body:
+ self.body = "0" * cfg['min-value-size']
+
+ batch = pump.Batch(self)
+
+ batch_max_size = self.opts.extra['batch_max_size']
+ batch_max_bytes = self.opts.extra['batch_max_bytes']
+
+ vbucket_id = 0x0000ffff
+ cas, exp, flg = 0, 0, 0
+
+ while (batch.size() < batch_max_size and
+ batch.bytes < batch_max_bytes):
+ if ratio_sets >= float(self.cur_sets) / float(self.cur_ops or 1):
+ self.cur_sets = self.cur_sets + 1
+ cmd = memcacheConstants.CMD_TAP_MUTATION
+ if self.cur_items < max_items:
+ key = self.cur_items
+ self.cur_items = self.cur_items + 1
+ else:
+ key = self.cur_sets % self.cur_items
+ else:
+ self.cur_gets = self.cur_gets + 1
+ cmd = memcacheConstants.CMD_GET
+ key = self.cur_gets % self.cur_items
+ self.cur_ops = self.cur_ops + 1
+
+ msg = (cmd, vbucket_id, prefix + str(key), flg, exp, cas, '', self.body)
+ batch.append(msg, len(self.body))
+
+ if exit_after_creates and self.cur_items >= max_items:
+ self.done = True
+ return 0, batch
+
+ if batch.size() <= 0:
+ return 0, None
+ return 0, batch
+
+ @staticmethod
+ def total_msgs(opts, source_bucket, source_node, source_map):
+ """Returns max-items only if exit-after-creates was specified.
+ Else, total msgs is unknown as GenSource does not stop generating."""
+ if source_map['cfg']['exit-after-creates']:
+ return 0, source_map['cfg']['max-items']
+ return 0, None
View
2  pump_transfer.py
@@ -20,6 +20,7 @@
import pump_bfd
import pump_csv
import pump_cb
+import pump_gen
import pump_mbf
import pump_mc
import pump_tap
@@ -291,6 +292,7 @@ def opt_extra_help(extra_defaults):
SOURCES = [pump_bfd.BFDSource,
pump_csv.CSVSource,
+ pump_gen.GenSource,
pump_mbf.MBFSource,
pump_tap.TAPDumpSource,
pump.StdInSource]
View
2  t/pump_test.py
@@ -316,7 +316,7 @@ def test_find_handlers(self):
extra_sources = extra_sources + 1
except ImportError:
pass
- self.assertEqual(5 + extra_sources, len(pump_transfer.SOURCES))
+ self.assertEqual(6 + extra_sources, len(pump_transfer.SOURCES))
self.assertTrue(5 + extra_sinks, len(pump_transfer.SINKS))
self.assertEqual(pump_tap.TAPDumpSource,
Please sign in to comment.
Something went wrong with that request. Please try again.