Skip to content

Commit

Permalink
made changes to run pycouchbase in multiple threads and in non-verbos…
Browse files Browse the repository at this point in the history
…e mode by default

sample usage :
./pycouchtest.py -n 192.168.1.1:8091 -i 1000 -b default -p password --threads 10

it will print out the average stats per thread:

Thread 0 - average set time 0.0233402225048 seconds , min 0.00143504142761 seconds , max 2.25945711136 seconds
stats
Thread 1 - average set time 0.0240276454936 seconds , min 0.00137400627136 seconds , max 2.26322698593 seconds

Change-Id: I6f32b96ff524acac8f4a5938e6bd606cf1b1a5bd
  • Loading branch information
farshidce committed Jul 26, 2011
1 parent 9f7e0dd commit 3d206f3
Showing 1 changed file with 126 additions and 31 deletions.
157 changes: 126 additions & 31 deletions pymembase/pycouchtest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,107 @@
#!/usr/bin/python
import random
from threading import Thread
import uuid
import time
from membaseclient import VBucketAwareMembaseClient
from optparse import OptionParser
from util import ProgressBar, StringUtil
import sys

class SharedProgressBar(object):
def __init__(self, number_of_items):
self.bar = ProgressBar(0, number_of_items, 77)
self.number_of_items = number_of_items
self.counter = 0
self.old_bar_string = ""

def update(self):
self.counter += 1
if self.old_bar_string != str(self.bar):
sys.stdout.write(str(self.bar) + '\r')
sys.stdout.flush()
self.old_bar_string = str(self.bar)
self.bar.updateAmount(self.counter)

def flush(self):
self.bar.updateAmount(self.number_of_items)
sys.stdout.write(str(self.bar) + '\r')
sys.stdout.flush()
old_bar_string = str(self.bar)


class SmartLoader(object):
def __init__(self, options, server, sharedProgressBar,thread_id):
self._options = options
self._server = server
self._thread = None
self.shut_down = False
self._stats = {"total_time": 0, "max": 0, "min": 1 * 1000 * 1000, "samples": 0}
self._bar = sharedProgressBar
self._thread_id = thread_id

def start(self):
self._thread = Thread(target=self._run)
self._thread.start()

def _run(self):
v = None
try:
options = self._options
v = VBucketAwareMembaseClient(self._server, options.bucket, options.verbose)
number_of_items = (int(options.items) / int(options.num_of_threads))
value = StringUtil.create_value("*", int(options.value_size))
for i in range(0, number_of_items):
if self.shut_down:
break
key = "{0}-{1}".format(options.key_prefix, str(uuid.uuid4())[:5])
if options.load_json:
document = "\"name\":\"pymc-{0}\"".format(key, key)
document = document + ",\"age\":{0}".format(random.randint(0, 1000))
document = "{" + document + "}"
self._profile_before()
v.set(key, 0, 0, document)
self._profile_after()
else:
self._profile_before()
v.set(key, 0, 0, value)
self._profile_after()
self._bar.update()
v.done()
v = None
except BaseException as err:
print err
print ""
if v:
v.done()

def print_stats(self):
print "stats"
msg = "Thread {0} - average set time {1} seconds , min {2} seconds , max {3} seconds"
print msg.format(self._thread_id, self._stats["total_time"] / self._stats["samples"],
self._stats["min"], self._stats["max"])

def wait(self):
self._thread.join()

def stop(self):
self.shut_down = True
if v:
v.done()

def _profile_before(self):
self.start = time.time()

def _profile_after(self):
self.end = time.time()
diff = self.end - self.start
self._stats["samples"] += 1
self._stats["total_time"] = self._stats["total_time"] + diff
if self._stats["min"] > diff:
self._stats["min"] = diff
if self._stats["max"] < diff:
self._stats["max"] = diff

if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-n", "--node", dest="node",
Expand All @@ -18,20 +114,31 @@
default="default", metavar="default")
parser.add_option("-j", "--json", dest="load_json", help="insert json data",
default=False, metavar="True")

parser.add_option("-v", "--verbose", dest="verbose", help="run in verbose mode",
default=False, metavar="False")

parser.add_option("-i", "--items", dest="items", help="number of items to be inserted",
default=100, metavar="100")

parser.add_option("--size", dest="value_size", help="value size,default is 256 byte",
default=512, metavar="100")

parser.add_option("--threads", dest="num_of_threads", help="number of threads to run load",
default=1, metavar="100")

parser.add_option("--prefix", dest="key_prefix",
help="prefix to use for memcached keys and json _ids,default is uuid string",
default=str(uuid.uuid4())[:5], metavar="pymc")

options, args = parser.parse_args()

node = options.node
#if port is not given use :8091

if not node:
parser.print_help()
exit()
#if port is not given use :8091
if node.find(":") == -1:
hostname = node
port = 8091
Expand All @@ -40,41 +147,29 @@
port = node[node.find(":") + 1:]
server = {"ip": hostname,
"port": port,
"rest_username": options.username,
"rest_password": options.password,
"username": options.username,
"password": options.password}
print server
v = None
workers = []
try:
v = VBucketAwareMembaseClient(server, options.bucket)
no_threads = options.num_of_threads
number_of_items = int(options.items)
bar = ProgressBar(0, number_of_items, 77)
old_bar_string = ""
value = StringUtil.create_value("*", options.value_size)
for i in range(0, number_of_items):
key = "{0}-{1}".format(options.key_prefix, str(uuid.uuid4())[:5])
if options.load_json:
document = "\"name\":\"pymc-{0}\"".format(key, key)
document = document + ",\"age\":{0}".format(random.randint(0, 1000))
document = "{" + document + "}"
a, b, c = v.set(key, 0, 0, document)
else:
a, b, c = v.set(key, 0, 0, value)
a, b, c = v.get(key)

bar.updateAmount(i)
if old_bar_string != str(bar):
sys.stdout.write(str(bar) + '\r')
sys.stdout.flush()
old_bar_string = str(bar)

bar.updateAmount(number_of_items)
sys.stdout.write(str(bar) + '\r')
sys.stdout.flush()
v.done()
sharedProgressBar = SharedProgressBar(number_of_items)
for i in range(0, int(no_threads)):
worker = SmartLoader(options, server, sharedProgressBar, i)
worker.start()
workers.append(worker)
for worker in workers:
worker.wait()
sharedProgressBar.flush()
for worker in workers:
worker.print_stats()
except Exception as ex:
print ex
print ""
for worker in workers:
worker.stop()
except:
print ""
if v:
v.done()
for worker in workers:
worker.stop()

0 comments on commit 3d206f3

Please sign in to comment.