Skip to content

Commit

Permalink
performance upgrade;multithreading support
Browse files Browse the repository at this point in the history
  • Loading branch information
alfons haffmans committed Oct 7, 2010
1 parent faa96b9 commit 3a66e1f
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 11 deletions.
15 changes: 9 additions & 6 deletions cl-mongo.asd
Expand Up @@ -7,12 +7,13 @@
(asdf:defsystem cl-mongo
:name "cl-mongo"
:author "Fons Haffmans; fons.haffmans@gmail.com"
:version "0.0.1"
:version "0.7"
:licence "MIT"
:description "lisp system to interact with mongo, a non-sql db"
:description "lisp system to interact with mongodb, a non-sql db"
:depends-on (:uuid
:babel
:documentation-template
:bordeaux-threads
;;:documentation-template
:lisp-unit
:parenscript
:usocket)
Expand All @@ -39,7 +40,9 @@
(:file "protocol")
(:file "mongo")
(:file "db")
(:file "doc")
(:file "mem")
(:file "do-query")
;;(:file "doc")
(:file "map-reduce")
(:file "shell")))
(:static-file "README.md")
Expand All @@ -48,9 +51,9 @@
(asdf:defsystem cl-mongo-test
:name "cl-mongo"
:author "Fons Haffmans; fons.haffmans@gmail.com"
:version "0.0.1"
:version "0.7"
:licence "MIT"
:description "tesing cl-mongo"
:description "testing cl-mongo"
:depends-on (:cl-mongo)
:serial t
:components
Expand Down
1 change: 1 addition & 0 deletions perf/README
@@ -0,0 +1 @@
This directory contains files and little experiments I did before finalizing do-query.lisp.
54 changes: 54 additions & 0 deletions perf/attic.lisp
@@ -0,0 +1,54 @@

(defun test-it ()
(let ((nix (do-reduce #'collect)))
nil))

(defun count-elem (l r)
(+ l 1))


(defun collect (l r)
(if (listp l)
(cons r l)
(list r l)))

;;with-input-from-string (stream "This is my input via stream.")
;; (read stream))
;;THIS
;;? (with-output-to-string (stream)
;; (princ "I'm writing to memory!" stream))
;;"I'm writing to memory!"

;; (funcall func (list 12))))
;; (format t "elem ~A ~%" (length elem))))
;; (length R))
;; (push

(defvar *HT* (make-hash-table :test 'equal :size 100))

(defun print-collect (l r)
(with-output-to-string (stream l)
(format stream "~A~%" r))
l)


(defun print-it (elem)
(with-output-to-string (stream)
(print-elem elem stream)))


(defun print-it2 (val)
(with-output-to-string (stream)
(format stream "hello [~A] ~%" val)
(format stream "hello 2 ~%") ))

(defun print-it3 (val str)
(with-output-to-string (stream str)
(format stream "{==>hello [~A] ~%" val)
(format stream "==>hello 2} ~%"))
str)


(setf *S* (make-array 0 :element-type 'character :adjustable t :fill-pointer 0))

(maphash (lambda (k v) (format t "~A -> ~A~%" k (print-it elem))) *HT*)
102 changes: 102 additions & 0 deletions perf/map-doc.lisp
@@ -0,0 +1,102 @@
(defun perf.mongo-reply (array)
(labels ((header (array)
(let ((lst ()))
(push (octet-to-int32.1 array 0) lst) ; length
(push (octet-to-int32.1 array 4) lst) ; request id
(push (octet-to-int32.1 array 8) lst) ; response to
(push (octet-to-int32.1 array 12) lst) ; opcode
(push (octet-to-int32.1 array 16) lst) ; response flag
(push (octet-to-int64.1 array 20) lst) ; cursor id
(push (octet-to-int32.1 array 28) lst) ; starting from
(push (octet-to-int32.1 array 32) lst) ; returned
(nreverse lst))))
(let ((head (header array)))
(values head (list (car head) 36 (nth 7 head) array)))))


(defun perf.db.find ( collection qm &key (query (bson-encode "query" (kv nil nil)) ) (mongo nil) (options 0) (skip 0) (limit 0) (selector nil) )
(let ((mongo (or mongo (mongo)))
(doccount 0)
(next-cursor-id 0))
(labels ((query ()
(mongo-message mongo (mongo-query
(full-collection-name mongo collection) query
:limit limit
:skip skip
:selector (bson-encode-container (expand-selector selector))
:options options)))
(get-more (cursor-id)
(mongo-message mongo (mongo-get-more
(full-collection-name mongo collection)
(int64-to-octet cursor-id) :limit limit)))
(decode-it (elem)
(bson-decode (nth 0 elem) (nth 1 elem) (nth 2 elem) (nth 3 elem))))
(multiple-value-bind (header docs) (perf.mongo-reply (query) )
(incf doccount (nth 7 header))
(setf next-cursor-id (nth 5 header))
(funcall qm (decode-it docs))
(loop
(multiple-value-bind (h1 d1) (perf.mongo-reply (get-more next-cursor-id))
(incf doccount (nth 7 h1))
(setf next-cursor-id (nth 5 h1))
(funcall qm (decode-it d1))
)
(when (zerop next-cursor-id) (return nil)))))
doccount))


(defun print-elem ( elem stream &key (nd nil) )
(labels ((br+ ()
(unless nd (format stream "~%{")))
(br- ()
(unless nd (format stream "~%}~%")))

(pp-oid ( value )
(format stream "~% \"_id\" -> objectid(")
(let* ((arr (id value))
(size (length arr)))
(dotimes (index size)
(let ((x (aref arr index)))
(if (< x 10)
(format stream "0~1X" x)
(format stream "~2X" x)))))
(format stream ")"))

(pp* ( value )
(cond ( (typep value 'document) (pp-doc value) )
( (typep value 'hash-table) (pp-ht value))
( (typep value 'cons) (pp-cons value))
( (typep value 'bson-oid) (pp-oid value))
( t (format stream " ~A" value))))

(pp-cons (lst)
(progn
(format stream "[")
(dolist (el lst)
(progn
(pp* el)
(format stream ",")))
(format stream "]")))

(pp-doc ( d )
(br+)
(unless (_local d) (pp* (_id d)))
(pp* (elements d) )
(br-) )

(pp-ht ( ht )
(with-hash-table-iterator (iterator ht)
(dotimes (repeat (hash-table-count ht))
(multiple-value-bind (exists-p key value) (iterator)
(if exists-p
(progn
(format stream "~% \"~A\" -> " key)
(pp* value))))))))
(pp-doc elem)))

(defun print-str (elem)
(with-output-to-string (stream)
(print-elem elem stream)))



12 changes: 12 additions & 0 deletions perf/mem.lisp
@@ -0,0 +1,12 @@
(defun gen-clean (gen)
(let ((thr (sb-ext:generation-minimum-age-before-gc gen)))
(setf (sb-ext:generation-minimum-age-before-gc gen) 0.d0)
(sb-ext:gc :gen gen)
(setf (sb-ext:generation-minimum-age-before-gc gen) thr)))


(defun cleanup()
(dotimes (gen 7)
(progn
(format t "hello ~A~%" gen)
(gen-clean gen))))
148 changes: 148 additions & 0 deletions perf/threads.lisp
@@ -0,0 +1,148 @@
(in-package :cl-mongo)

(defun perf.mongo-reply (array)
(labels ((header (array)
(let ((lst ()))
(push (octet-to-int32.1 array 0) lst) ; length
(push (octet-to-int32.1 array 4) lst) ; request id
(push (octet-to-int32.1 array 8) lst) ; response to
(push (octet-to-int32.1 array 12) lst) ; opcode
(push (octet-to-int32.1 array 16) lst) ; response flag
(push (octet-to-int64.1 array 20) lst) ; cursor id
(push (octet-to-int32.1 array 28) lst) ; starting from
(push (octet-to-int32.1 array 32) lst) ; returned
(nreverse lst))))
(let ((head (header array)))
(values head (list (car head) 36 (nth 7 head) array)))))


(defun perf.db.find ( collection qm &key (query (bson-encode "query" (kv nil nil)) ) (mongo nil) (options 0) (skip 0) (limit 0) (selector nil) )
(let ((mongo (or mongo (mongo)))
(doccount 0)
(next-cursor-id 0))
(labels ((query ()
(mongo-message mongo (mongo-query
(full-collection-name mongo collection) query
:limit limit
:skip skip
:selector (bson-encode-container (expand-selector selector))
:options options)))
(get-more (cursor-id)
(mongo-message mongo (mongo-get-more
(full-collection-name mongo collection)
(int64-to-octet cursor-id) :limit limit)))
(decode-it (elem)
(bson-decode (nth 0 elem) (nth 1 elem) (nth 2 elem) (nth 3 elem))))
(multiple-value-bind (header docs) (perf.mongo-reply (query) )
(incf doccount (nth 7 header))
(setf next-cursor-id (nth 5 header))
(funcall qm (decode-it docs))
(loop
(multiple-value-bind (h1 d1) (perf.mongo-reply (get-more next-cursor-id))
(incf doccount (nth 7 h1))
(setf next-cursor-id (nth 5 h1))
(funcall qm (decode-it d1))
)
(when (zerop next-cursor-id) (return nil)))))
doccount))

(defvar *Q* nil)
(defvar *R* nil)
(defvar *RUNNERS* 0)


(defun mongodb-reader (coll lock cv &key (query (bson-encode "query" (kv nil nil)) ) (mongo nil) (limit 0) (selector nil) )
(lambda ()
(labels ((queue-data (item)
(push item *Q*))
(set-runners (val)
(bordeaux-threads:acquire-lock lock)
(setf *RUNNERS* val)
(bordeaux-threads:condition-notify cv)
(bordeaux-threads:release-lock lock)))
(set-runners 1)
(handler-case
(perf.db.find coll #'queue-data :query query :mongo mongo :limit limit :selector selector)
(error (c)
(format t "~% error ~A" c)))
(set-runners 0))))

(defun mongodb-reader (coll lock cv &key (query (bson-encode "query" (kv nil nil)) ) (mongo nil) (limit 0) (selector nil) )
(lambda ()
(labels ((queue-data (item)
(push item *Q*))
(set-runners (val)
(bordeaux-threads:acquire-lock lock)
(setf *RUNNERS* val)
(bordeaux-threads:condition-notify cv)
(bordeaux-threads:release-lock lock)))
(set-runners 1)
(perf.db.find coll #'queue-data :query query :mongo mongo :limit limit :selector selector)
(set-runners 0))))


(defun th-writer(coll lock cv &key (query (bson-encode "query" (kv nil nil)) ) (mongo nil) (limit 0) (selector nil) )
(bordeaux-threads:make-thread (mongodb-reader coll lock cv :query query :mongo mongo :limit limit :selector selector) :name "th-writer"))


(defun toR (fn)
(lambda (lst)
(do ()
((null lst) 'done)
(push (funcall fn (pop lst)) *R* ))))

(defun queue-reader(fn lock cv lock2 cv2)
(lambda ()
(progn
(bordeaux-threads:acquire-lock lock2)
(bordeaux-threads:acquire-lock lock)
(when ( = -1 *RUNNERS* )
(bordeaux-threads:condition-wait cv lock))
(block top
(loop
(progn
(do ()
((null *Q*) 'done)
(funcall fn (pop *Q*)))
(when (zerop *RUNNERS* )
(progn
(bordeaux-threads:release-lock lock)
(return-from top 'done)))
(bordeaux-threads:condition-wait cv lock))))
(bordeaux-threads:condition-notify cv2)
(bordeaux-threads:release-lock lock2))))


(defun th-reader (fun lock-q cv-q lock-r cv-r)
(bordeaux-threads:make-thread (queue-reader fun lock-q cv-q lock-r cv-r) :name "th-reader"))


(defun do-reduce (fn &key (initial-value nil) )
(let ((lvalue (or initial-value (pop *R*))))
(do ()
((null *R*) 'done)
(setf lvalue (funcall fn lvalue (pop *R*))))
lvalue))

(defun collect-to-hash (ht doc)
(setf (gethash (doc-id doc) ht) doc)
ht)

(defun do-query(coll &key (map-fn #'identity) (reduce-fn #'collect-to-hash) (initial-value (make-hash-table :test 'equal :size 100))
(query (bson-encode "query" (kv nil nil)) ) (mongo nil) (limit 0) (selector nil) )
(let ((lock-q (bordeaux-threads:make-lock "lock-q"))
(lock-r (bordeaux-threads:make-lock "lock-q"))
(cv-q (bordeaux-threads:make-condition-variable))
(cv-r (bordeaux-threads:make-condition-variable)))
(setf *Q* nil)
(setf *R* nil)
(setf *RUNNERS* -1)
(th-writer coll lock-q cv-q :mongo mongo :selector selector :query query :limit limit)
(bordeaux-threads:acquire-lock lock-r t)
(th-reader (toR map-fn) lock-q cv-q lock-r cv-r)
(bordeaux-threads:condition-wait cv-r lock-r)
(do-reduce reduce-fn :initial-value initial-value)))


(defun th-status ()
(bordeaux-threads:all-threads))
6 changes: 3 additions & 3 deletions src/bson-decode.lisp
Expand Up @@ -3,7 +3,7 @@
(defun ht->list.1 (ht)
(let ((lst ()))
(maphash (lambda (k v) (push v lst)) ht)
lst))
(nreverse lst)))

(defun end-of-key (start array)
(let ((eol start))
Expand Down Expand Up @@ -189,8 +189,8 @@
(incf pos)
(push (funcall container ht) lst) ))
(decf docs)
(when (= totlen pos) (return lst))
(when (zerop docs) (return lst))
(when (= totlen pos) (return (nreverse lst)))
(when (zerop docs) (return (nreverse lst)))
(go start-document)) )))


Expand Down

0 comments on commit 3a66e1f

Please sign in to comment.