Permalink
Browse files

[feature] stdlib: Added buffer reuse to Mongo.

  • Loading branch information...
1 parent 9e4e412 commit 2cc8f8d3347f1b86a0464de775db218ad4bd2602 @nrs135 nrs135 committed Oct 18, 2011
View
@@ -18,6 +18,8 @@
<debugTracer.ml*> : with_mlstate_debug
<filePos.ml> : with_mlstate_debug
<file_mimetype.ml>: with_mlstate_debug
+<bson.ml>: with_mlstate_debug
+<mongo.ml>: with_mlstate_debug
<endian.ml>: with_mlstate_debug
View
@@ -16,6 +16,8 @@
along with OPA. If not, see <http://www.gnu.org/licenses/>.
*)
+#<Debugvar:MONGO_DEBUG>
+
module type S_sig =
sig
type t
@@ -103,6 +103,7 @@ let lambda_correct = var "lambda_correct"
let lambda_debug = var "lambda_debug"
let low_level_db_log = var "low_level_db_log"
let mimetype_debug = var "mimetype_debug"
+let mongo_debug = var "mongo_debug"
let no_access_log = var "no_access_log"
let no_database_upgrade = var "no_database_upgrade"
let no_flood_prevention = var "no_flood_prevention"
@@ -605,6 +605,12 @@ val low_level_db_log : debug_var
val mimetype_debug : debug_var
(**
+ {b MLSTATE_MONGO_DEBUG}
+ display the debug for MongoDB
+*)
+val mongo_debug : debug_var
+
+(**
{b MLSTATE_NO_ACCESS_LOG}
*)
val no_access_log : debug_var
View
@@ -1,3 +1,22 @@
+(*
+ Copyright © 2011 MLstate
+
+ This file is part of OPA.
+
+ OPA is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License, version 3, as published by
+ the Free Software Foundation.
+
+ OPA is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+ more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with OPA. If not, see <http://www.gnu.org/licenses/>.
+*)
+
+#<Debugvar:MONGO_DEBUG>
module St = Stuff.StuffString
@@ -65,9 +84,23 @@ let set_header m requestId responseTo opCode =
St.lei32 m.Bson.buf.Buf.str 12 opCode;
m.Bson.buf.Buf.i <- 16
+let buflst = ref ([]:Buf.t list)
+let bufcnt = ref 0
+let buflog = ref (fun str -> Printf.eprintf "%s\n%!" str)
+
+let get_buf ?(hint=4096) () =
+ match !buflst with
+ | [] -> (#<If$minlevel 2>!buflog (Printf.sprintf "get_buf(%d): new" !bufcnt)#<End>; Buf.create hint)
+ | b::t -> (#<If$minlevel 2>!buflog (Printf.sprintf "get_buf(%d): old" !bufcnt)#<End>; buflst := t; decr bufcnt; Buf.clear b; b)
+
+let free_buf b =
+ if Buf.length b <= (10*1024*1024)
+ then (#<If$minlevel 2>!buflog (Printf.sprintf "free_buf(%d): return" !bufcnt)#<End>; buflst := b::(!buflst); incr bufcnt)
+ else (#<If$minlevel 2>!buflog (Printf.sprintf "free_buf(%d): reset" !bufcnt)#<End>; Buf.reset b)
+
let create size =
if size < 16 then raise (Failure "Mongo.create: ridiculous size value");
- let b = { Bson.buf=Buf.create size; stack=[]; finished=false; } in
+ let b = { Bson.buf=get_buf ~hint:size (); stack=[]; finished=false; } in
b.Bson.buf.Buf.i <- 16;
b
@@ -81,6 +114,8 @@ let clear m = m.Bson.buf.Buf.i <- 16
let reset m = Buf.reset m.Bson.buf
+let free m = free_buf m.Bson.buf
+
(*struct OP_INSERT {
MsgHeader header; // standard message header
int32 flags; // bit vector - see below
View
@@ -42,10 +42,13 @@ val get : mongo_buf -> string
val export : mongo_buf -> string * int
val set_header_len : mongo_buf -> int -> unit
val set_header : mongo_buf -> int32 -> int -> int -> unit
+val get_buf : ?hint:int -> unit -> Buf.t
+val free_buf : Buf.t -> unit
val create : int -> mongo_buf
val init : ?hint:int -> int -> int32 -> int -> int -> mongo_buf
val clear : mongo_buf -> unit
val reset : mongo_buf -> unit
+val free : mongo_buf -> unit
val start_insert : mongo_buf -> int32 -> int -> string -> unit
val start_update : mongo_buf -> int32 -> int -> string -> unit
val start_query : mongo_buf -> int32 -> int -> string -> int -> int -> unit
View
@@ -329,11 +329,14 @@ let clear = Mongo.clear
##register reset: mongo_buf -> void
let reset = Mongo.reset
+##register free: mongo_buf -> void
+let free = Mongo.free
+
##register new_mailbox: int -> mailbox
-let new_mailbox size = (Buf.create size, ref 0)
+let new_mailbox size = (Mongo.get_buf ~hint:size (), ref 0)
##register reset_mailbox: mailbox -> void
-let reset_mailbox (b,_) = Buf.reset b
+let reset_mailbox (b,_) = Mongo.free_buf b
##register [cps-bypass] read_mongo : Socket.connection, mailbox, continuation(reply) -> void
let read_mongo conn mailbox k =
@@ -854,6 +854,9 @@ Mongo = {{
/* Reset the buffer, unallocate storage */
@private reset_ = (%% BslMongo.Mongo.reset %%: mongo_buf -> void)
+ /* Free the buffer, return buffer for later use */
+ @private free_ = (%% BslMongo.Mongo.free %%: mongo_buf -> void)
+
/* Mailbox so we can use the streaming parser */
@private new_mailbox_ = (%% BslMongo.Mongo.new_mailbox %%: int -> mailbox)
@private reset_mailbox_ = (%% BslMongo.Mongo.reset_mailbox %%: mailbox -> void)
@@ -871,6 +874,7 @@ Mongo = {{
s = String.substring(0,len,str)
//do println("{name}: s=\n{Bson.dump(16,s)}")
cnt = Socket.write_len(m.conn,s,len)
+ do free_(m.mbuf)
(cnt==len)
@private
@@ -886,6 +890,7 @@ Mongo = {{
* - Primitive error handling in case of mongo server malfunction
**/
open(bufsize:int, addr:string, port:int): Mongo.db =
+ //do println("Mongo.open")
err_cont = Continuation.make((s:string ->
do prerrln("Mongo.open: exn={s}")
System.exit(-1)))
@@ -901,6 +906,7 @@ Mongo = {{
* We need this to create a new buffer for each cursor.
**/
copy(m:Mongo.db): Mongo.db =
+ //do println("Mongo.copy")
{ conn=m.conn;
mbuf = create_(m.bufsize);
mailbox = new_mailbox_(m.bufsize);
@@ -913,6 +919,7 @@ Mongo = {{
* - returns a bool indicating success or failure
**/
insert(m:Mongo.db, flags:int, ns:string, documents:Bson.document): bool =
+ //do println("Mongo.insert")
m = { m with mbuf = create_(m.bufsize) }
do insert_(m.mbuf,flags,ns,documents)
send_no_reply(m,"insert")
@@ -930,6 +937,7 @@ Mongo = {{
* - returns a bool indicating success or failure
**/
insert_batch(m:Mongo.db, flags:int, ns:string, documents:list(Bson.document)): bool =
+ //do println("Mongo.insert_batch")
m = { m with mbuf = create_(m.bufsize) }
do insert_batch_(m.mbuf,flags,ns,documents)
send_no_reply(m,"insert")
@@ -947,6 +955,7 @@ Mongo = {{
* - returns a bool indicating success or failure
**/
update(m:Mongo.db, flags:int, ns:string, selector:Bson.document, update:Bson.document): bool =
+ //do println("Mongo.update")
m = { m with mbuf = create_(m.bufsize) }
do update_(m.mbuf,flags,ns,selector,update)
send_no_reply(m,"update")
@@ -963,6 +972,7 @@ Mongo = {{
**/
query(m:Mongo.db, flags:int, ns:string, numberToSkip:int, numberToReturn:int,
query:Bson.document, returnFieldSelector_opt:option(Bson.document)): option(reply) =
+ //do println("Mongo.query")
m = { m with mbuf = create_(m.bufsize) }
do query_(m.mbuf,flags,ns,numberToSkip,numberToReturn,query,returnFieldSelector_opt)
send_with_reply(m,"query")
@@ -979,6 +989,7 @@ Mongo = {{
* Send OP_GETMORE and get reply:
**/
get_more(m:Mongo.db, ns:string, numberToReturn:int, cursorID:cursorID): option(reply) =
+ //do println("Mongo.get_more")
m = { m with mbuf = create_(m.bufsize) }
do get_more_(m.mbuf,ns,numberToReturn,cursorID)
send_with_reply(m,"getmore")
@@ -989,6 +1000,7 @@ Mongo = {{
* - returns a bool indicating success or failure
**/
delete(m:Mongo.db, flags:int, ns:string, selector:Bson.document): bool =
+ //do println("Mongo.delete")
m = { m with mbuf = create_(m.bufsize) }
do delete_(m.mbuf,flags,ns,selector)
send_no_reply(m,"delete")
@@ -1006,6 +1018,7 @@ Mongo = {{
* - returns a bool indicating success or failure
**/
kill_cursors(m:Mongo.db, cursors:list(cursorID)): bool =
+ //do println("Mongo.kill_cursors")
m = { m with mbuf = create_(m.bufsize) }
do kill_cursors_(m.mbuf,cursors)
send_no_reply(m,"kill_cursors")
@@ -1024,16 +1037,18 @@ Mongo = {{
* Close mongo connection and deallocate buffer.
**/
close(m:Mongo.db) =
+ //do println("Mongo.close")
do Socket.close(m.conn)
- do reset_(m.mbuf)
+ do free_(m.mbuf)
do reset_mailbox_(m.mailbox)
void
/**
* Close mongo copy, deallocate buffer but leave connection open.
**/
close_copy(m:Mongo.db) =
- do reset_(m.mbuf)
+ //do println("Mongo.close_copy")
+ do free_(m.mbuf)
do reset_mailbox_(m.mailbox)
void
@@ -1370,12 +1385,24 @@ Cursor = {{
run_command(m, ns, [H.i32(cmd,arg)])
/**
+ * Same as simple integer command but with options, eg. [{ "getlasterror" : 1, w : 3, wtimeout : 10000 }]
+ **/
+ simple_int_command_opts(m:Mongo.db, ns:string, cmd:string, arg:int, opts:Bson.document): Mongo.result =
+ run_command(m, ns, List.flatten([[H.i32(cmd,arg)],opts]))
+
+ /**
* Perform a simple integer command, eg. [{ drop : "collection" }]
**/
simple_str_command(m:Mongo.db, ns:string, cmd:string, arg:string): Mongo.result =
run_command(m, ns, [H.str(cmd,arg)])
/**
+ * Perform a simple integer command, eg. [{ drop : "collection" }]
+ **/
+ simple_str_command_opts(m:Mongo.db, ns:string, cmd:string, arg:string, opts:Bson.document): Mongo.result =
+ run_command(m, ns, List.flatten([[H.str(cmd,arg)],opts]))
+
+ /**
* Predicate for connection alive. Peforms an admin "ping" command.
**/
check_connection(m:Mongo.db): outcome(bool,Mongo.failure) =
@@ -1402,6 +1429,13 @@ Cursor = {{
simple_int_command(m, db, "getlasterror", 1)
/**
+ * Return the last error from database, with full options.
+ **/
+ last_error_full(m:Mongo.db, db:string, fsync:bool, j:bool, w:int, wtimeout:int): Mongo.result =
+ simple_int_command_opts(m, db, "getlasterror", 1,
+ [H.bool("fsync",fsync), H.bool("j",j), H.i32("w",w), H.i32("wtimeout",wtimeout)])
+
+ /**
* Reset database error status.
**/
reset_error(m:Mongo.db, db:string): Mongo.result =

0 comments on commit 2cc8f8d

Please sign in to comment.