Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial couch-spill test command.

The ops/second are not believable, since responses aren't counted
correctly.
  • Loading branch information...
commit d0385392a7a15dc89102e405b7734ff8708b70e7 1 parent 0666b93
@steveyen steveyen authored
Showing with 126 additions and 4 deletions.
  1. +117 −0 couch-spill.py
  2. +9 −4 proxy.py
View
117 couch-spill.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+import sys
+import copy
+import math
+import time
+import socket
+import threading
+import mcsoda
+
+class Reader(threading.Thread):
+ def __init__(self, store, src):
+ self.store = store
+ self.src = src
+ threading.Thread.__init__(self)
+
+ def run(self):
+ print "reading"
+ try:
+ while True:
+ data = self.src.recv(4096)
+ print "reading", data
+ if not data:
+ break
+ print(data)
+ except:
+ pass
+
+ try:
+ self.src.shutdown(socket.SHUT_RD)
+ except:
+ pass
+
+
+# Stream some mcsoda onto a couch for performance testing.
+#
+class StoreCouch(mcsoda.Store):
+
+ def connect(self, target, user, pswd, cfg, cur):
+ self.cfg = cfg
+ self.cur = cur
+ self.target = target
+ self.host_port = (target + ":5984").split(':')[0:2]
+ self.host_port[1] = int(self.host_port[1])
+ self.queue = []
+ self.ops = 0
+ self.seq = 1
+ self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.skt.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ self.skt.connect(tuple(self.host_port))
+ self.skt_reader = Reader(self, self.skt)
+ self.skt_reader.daemon = True
+ self.skt_reader.start()
+
+ def gen_doc(self, key_num, key_str, min_value_size, json=True, cache=None):
+ # Always json and never cache.
+ #
+ suffix_ex = '"_rev":"%s-0286dbb6323b61e7f0be3ba5d1633985",' % (self.seq,)
+
+ # suffix_ex += '"_revisions":{"start":1,"ids":["0286dbb6323b61e7f0be3ba5d1633985"]},'
+
+ self.seq = self.seq + 1
+
+ return mcsoda.gen_doc_string(key_num, key_str, min_value_size,
+ self.cfg['suffix'][min_value_size],
+ True, cache=None, key_name="_id",
+ suffix_ex=suffix_ex,
+ whitespace=False)
+
+ def command(self, c):
+ self.queue.append(c)
+ if len(self.queue) > (self.cur.get('batch') or \
+ self.cfg.get('batch', 100)):
+ self.flush()
+ return True
+ return False
+
+ def flush(self):
+ h = "POST /default/_bulk_docs HTTP/1.1\r\n" \
+ "X-Couch-Full-Commit: false\r\n" \
+ "Content-Type: application/json\r\n" \
+ "Accept: application/json\r\n" \
+ "Host: %s:%s\r\n" % (self.host_port[0], self.host_port[1])
+ n = 0
+ a = [ '{"new_edits":false,"docs":[' ]
+ for c in self.queue:
+ cmd, key_num, key_str, data, expiration = c
+ buf = self.command_send(cmd, key_num, key_str, data, expiration)
+ if buf:
+ if n > 0:
+ a.append(',')
+ a.append(buf)
+ n = n + 1
+ a.append("]}")
+
+ if n > 0:
+ body = ''.join(a)
+ full = h + "Content-Length: " + str(len(body)) + "\r\n\r\n" + body
+ self.skt.send(full)
+
+ self.ops += len(self.queue)
+ self.queue = []
+
+ def command_send(self, cmd, key_num, key_str, data, expiration):
+ return data
+
+ def command_recv(self, cmd, key_num, key_str, data, expiration):
+ pass
+
+
+if __name__ == "__main__":
+ if sys.argv[1].find("http") != 0:
+ raise Exception("usage: %s http://HOST:5984 ..." % (sys.argv[0],))
+
+ argv = (' '.join(sys.argv) + ' doc-gen=0').split(' ')
+
+ mcsoda.main(argv, protocol="http", stores=[StoreCouch()])
View
13 proxy.py
@@ -16,20 +16,23 @@
DEST = ("www.google.com", 80)
class Pump(threading.Thread):
- def __init__(self, tag, src, dst, sub=[], end=False):
+ def __init__(self, tag, src, dst, sub=[], end=False, capture=True):
self.tag = tag
self.logj = "\n" + tag + ": "
self.src = src
self.dst = dst
self.sub = sub
self.end = end
+ self.capture = capture
threading.Thread.__init__(self)
def log(self, msg):
print self.tag, self.logj.join(msg.split("\n"))
def run(self):
- f = open("out/" + self.tag + ".out", 'w')
+ f = None
+ if self.capture:
+ f = open("out/" + self.tag + ".out", 'w')
try:
while True:
@@ -39,7 +42,8 @@ def run(self):
for patt, repl in self.sub:
data = re.sub(patt, repl, data)
self.log(data)
- f.write(data)
+ if f:
+ f.write(data)
self.dst.send(data)
except:
pass
@@ -52,7 +56,8 @@ def run(self):
except:
pass
- f.close()
+ if f:
+ f.close()
def run(port, dest):
sub = [("Host: (127\\.0\\.0\\.1|localhost):%s" % (port,),
Please sign in to comment.
Something went wrong with that request. Please try again.