diff --git a/libbase/buf.ml b/libbase/buf.ml
new file mode 100644
index 00000000..65a78c30
--- /dev/null
+++ b/libbase/buf.ml
@@ -0,0 +1,118 @@
+(*
+ Copyright © 2011 MLstate
+
+ This file is part of OPA.
+
+ OPA is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License, version 3, as published by
+ the Free Software Foundation.
+
+ OPA is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+ more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with OPA. If not, see .
+*)
+
+(* Simple library, like Buffer but fixed size but can also look like String if required *)
+
+type buf = { mutable str : string; mutable i : int }
+type t = buf
+
+let empty = { str=""; i=0; }
+
+let create size = { str=String.create size; i=0; }
+
+let make size ch = { str=String.make size ch; i=size; }
+
+let resize buf size =
+ let str = String.create size in
+ let newlen = min buf.i (String.length str) in
+ if buf.i > 0 then String.unsafe_blit buf.str 0 str 0 newlen;
+ buf.str <- str;
+ buf.i <- newlen
+
+let clear buf = buf.i <- 0
+
+let reset buf = buf.str <- ""; buf.i <- 0
+
+let length buf = buf.i
+
+let real_length buf = String.length buf.str
+
+let get buf i =
+ if i < 0 || i >= buf.i then invalid_arg (Printf.sprintf "Buf.get index out of bounds %d" i);
+ String.get buf.str i
+let nth = get
+
+let unsafe_get buf i = String.unsafe_get buf.str i
+
+let set buf i ch =
+ if i < 0 || i >= buf.i then invalid_arg (Printf.sprintf "Buf.set index out of bounds %d" i);
+ String.set buf.str i ch
+
+let unsafe_set buf i ch = String.unsafe_set buf.str i ch
+
+let sub buf base len =
+ if base < 0 || base + len >= buf.i then invalid_arg (Printf.sprintf "Buf.sub index out of bounds %d %d" base len);
+ String.sub buf.str base len
+
+let add_char buf ch =
+ if String.length buf.str - buf.i < 1 then invalid_arg (Printf.sprintf "Buf.add_char %c" ch);
+ buf.str.[buf.i] <- ch;
+ buf.i <- buf.i + 1
+
+let append buf str len =
+ if String.length buf.str - buf.i < len then invalid_arg (Printf.sprintf "Buf.add_stringn %s" str);
+ String.unsafe_blit str 0 buf.str buf.i len;
+ buf.i <- buf.i + len
+
+let extend buf len =
+ if String.length buf.str - buf.i < len then invalid_arg (Printf.sprintf "Buf.extend %d" len);
+ buf.i <- buf.i + len
+
+let add_string buf str =
+ append buf str (String.length str)
+
+let add_buf buf1 buf2 =
+ append buf1 buf2.str buf2.i
+
+let of_string str = { str; i=String.length str; }
+
+let to_string buf = String.sub buf.str 0 buf.i
+let contents = to_string
+
+let spare buf = String.length buf.str - buf.i
+
+(* Test code *)
+(*
+let buf = of_string "abc";;
+let () = set buf 1 'B';;
+let ch = get buf 0;;
+let ch = get buf 1;;
+let ch = get buf 2;;
+let str = try ignore (get buf 3); "NOT OK" with Invalid_argument str -> "OK: "^str;;
+let str = try ignore (set buf 4 'x'); "NOT OK" with Invalid_argument str -> "OK: "^str;;
+let buf = create 5;;
+let () = add_char buf 'D';;
+let () = add_string buf "ef";;
+let len = length buf;;
+let rlen = real_length buf;;
+let str = to_string buf;;
+let () = resize buf 8;;
+let len = length buf;;
+let rlen = real_length buf;;
+let str = to_string buf;;
+let () = resize buf 2;;
+let len = length buf;;
+let rlen = real_length buf;;
+let str = to_string buf;;
+let str = try ignore (add_char empty 'x'); "NOT OK" with Invalid_argument str -> "OK: "^str;;
+let () = resize buf 10;;
+let () = add_string buf "fghi";;
+let str = to_string buf;;
+let str = sub buf 1 3;;
+let str = try ignore (sub buf 100 100); "NOT OK" with Invalid_argument str -> "OK: "^str;;
+*)
diff --git a/libbase/buf.mli b/libbase/buf.mli
new file mode 100644
index 00000000..ba65c930
--- /dev/null
+++ b/libbase/buf.mli
@@ -0,0 +1,57 @@
+(*
+ Copyright © 2011 MLstate
+
+ This file is part of OPA.
+
+ OPA is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License, version 3, as published by
+ the Free Software Foundation.
+
+ OPA is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+ more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with OPA. If not, see .
+*)
+
+(**
+ * Simple library, intended to have some of the properties of both
+ * String and Buffer. No automatic resize but it can be done manually.
+**)
+
+(** This type is concrete **)
+type buf = { mutable str : string; mutable i : int; }
+
+(** Common to String and Buffer **)
+type t = buf
+val length : buf -> int
+val create : int -> buf
+val sub : buf -> int -> int -> string
+
+(** Compatibility with String *)
+val make : int -> char -> buf
+val get : buf -> int -> char
+val unsafe_get : buf -> int -> char
+val set : buf -> int -> char -> unit
+val unsafe_set : buf -> int -> char -> unit
+
+(** Compatibility with Buffer **)
+val nth : buf -> int -> char
+val clear : buf -> unit
+val reset : buf -> unit
+val add_char : buf -> char -> unit
+val add_string : buf -> string -> unit
+val contents : buf -> string
+
+(** Specifics **)
+val empty : buf
+val append : buf -> string -> int -> unit
+val add_buf : buf -> buf -> unit
+val of_string : string -> buf
+val to_string : buf -> string
+val resize : buf -> int -> unit
+val extend : buf -> int -> unit
+val real_length : buf -> int
+val spare : buf -> int
diff --git a/libbase/mongo.ml b/libbase/mongo.ml
new file mode 100644
index 00000000..92cb15a9
--- /dev/null
+++ b/libbase/mongo.ml
@@ -0,0 +1,258 @@
+
+module St = Stuff.StuffString
+
+(* OP codes *)
+let _OP_REPLY = 1 (* Reply to a client request. responseTo is set *)
+let _OP_MSG = 1000 (* generic msg command followed by a string *)
+let _OP_UPDATE = 2001 (* update document *)
+let _OP_INSERT = 2002 (* insert new document *)
+let _RESERVED = 2003 (* formerly used for OP_GET_BY_OID *)
+let _OP_QUERY = 2004 (* query a collection *)
+let _OP_GET_MORE = 2005 (* Get more data from a query. See Cursors *)
+let _OP_DELETE = 2006 (* Delete documents *)
+let _OP_KILL_CURSORS = 2007 (* Tell database client is done with a cursor *)
+
+(* Flags *)
+
+(* OP_INSERT *)
+let _ContinueOnError = 0x00000001
+
+(* OP_UPDATE *)
+let _Upsert = 0x00000001
+let _MultiUpdate = 0x00000002
+
+(* OP_QUERY *)
+let _TailableCursor = 0x00000002
+let _SlaveOk = 0x00000004
+let _OplogReplay = 0x00000008
+let _NoCursorTimeout = 0x00000010
+let _AwaitData = 0x00000020
+let _Exhaust = 0x00000040
+let _Partial = 0x00000080
+
+(* OP_DELETE *)
+let _SingleRemove = 0x00000001
+
+(* OP_REPLY *)
+let _CursorNotFound = 0x00000001
+let _QueryFailure = 0x00000002
+let _ShardConfigStale = 0x00000003
+let _AwaitCapable = 0x00000004
+
+type mongo_buf = Bson.buf
+
+let add_bson m bson =
+ Buf.append m.Bson.buf bson.Bson.buf.Buf.str (Bson.Append.size bson)
+
+let get m = Buf.to_string m.Bson.buf
+
+let export m = (m.Bson.buf.Buf.str, m.Bson.buf.Buf.i)
+
+(*
+struct MsgHeader {
+ int32 messageLength; // total message size, including this
+ int32 requestID; // identifier for this message
+ int32 responseTo; // requestID from the original request (used in reponses from db)
+ int32 opCode; // request type - see table below
+}
+*)
+let set_header_len m messageLength =
+ St.lei32 m.Bson.buf.Buf.str 0 messageLength
+
+let set_header m requestId responseTo opCode =
+ St.lei32l m.Bson.buf.Buf.str 4 (if requestId = 0l then Random.int32 Int32.max_int else requestId);
+ St.lei32 m.Bson.buf.Buf.str 8 responseTo;
+ St.lei32 m.Bson.buf.Buf.str 12 opCode;
+ m.Bson.buf.Buf.i <- 16
+
+let create size =
+ if size < 16 then raise (Failure "Mongo.create: ridiculous size value");
+ let b = { Bson.buf=Buf.create size; stack=[]; finished=false; } in
+ b.Bson.buf.Buf.i <- 16;
+ b
+
+let init ?(hint=100) messageLength requestId responseTo opCode =
+ let m = create hint in
+ set_header_len m messageLength;
+ set_header m requestId responseTo opCode;
+ m
+
+let clear m = m.Bson.buf.Buf.i <- 16
+
+let reset m = Buf.reset m.Bson.buf
+
+(*struct OP_INSERT {
+ MsgHeader header; // standard message header
+ int32 flags; // bit vector - see below
+ cstring fullCollectionName; // "dbname.collectionname"
+ document* documents; // one or more documents to insert into the collection
+}*)
+
+let start_insert m rid flags ns =
+ set_header m rid 0 _OP_INSERT;
+ Stuff.add_le_int32 m.Bson.buf flags;
+ Buf.add_string m.Bson.buf ns;
+ Buf.add_char m.Bson.buf '\x00'
+
+(*struct OP_UPDATE {
+ MsgHeader header; // standard message header
+ int32 ZERO; // 0 - reserved for future use
+ cstring fullCollectionName; // "dbname.collectionname"
+ int32 flags; // bit vector. see below
+ document selector; // the query to select the document
+ document update; // specification of the update to perform
+}*)
+
+let start_update m rid flags ns =
+ set_header m rid 0 _OP_UPDATE;
+ Stuff.add_le_int32 m.Bson.buf 0;
+ Buf.add_string m.Bson.buf ns;
+ Buf.add_char m.Bson.buf '\x00';
+ Stuff.add_le_int32 m.Bson.buf flags
+
+(*struct OP_QUERY {
+ MsgHeader header; // standard message header
+ int32 flags; // bit vector of query options.
+ cstring fullCollectionName; // "dbname.collectionname"
+ int32 numberToSkip; // number of documents to skip
+ int32 numberToReturn; // number of documents to return in the first OP_REPLY batch
+ document query; // query object.
+ [ document returnFieldSelector; ] // Optional. Selector indicating the fields to return.
+}*)
+
+let start_query m rid flags ns numberToSkip numberToReturn =
+ set_header m rid 0 _OP_QUERY;
+ Stuff.add_le_int32 m.Bson.buf flags;
+ Buf.add_string m.Bson.buf ns;
+ Buf.add_char m.Bson.buf '\x00';
+ Stuff.add_le_int32 m.Bson.buf numberToSkip;
+ Stuff.add_le_int32 m.Bson.buf numberToReturn
+
+(*struct OP_GETMORE {
+ MsgHeader header; // standard message header
+ int32 ZERO; // 0 - reserved for future use
+ cstring fullCollectionName; // "dbname.collectionname"
+ int32 numberToReturn; // number of documents to return
+ int64 cursorID; // cursorID from the OP_REPLY
+}*)
+
+let start_getmore m rid ns numberToReturn cursorID =
+ set_header m rid 0 _OP_GET_MORE;
+ Stuff.add_le_int32 m.Bson.buf 0;
+ Buf.add_string m.Bson.buf ns;
+ Buf.add_char m.Bson.buf '\x00';
+ Stuff.add_le_int32 m.Bson.buf numberToReturn;
+ Stuff.add_le_int64L m.Bson.buf cursorID
+
+(*struct OP_DELETE {
+ MsgHeader header; // standard message header
+ int32 ZERO; // 0 - reserved for future use
+ cstring fullCollectionName; // "dbname.collectionname"
+ int32 flags; // bit vector - see below for details.
+ document selector; // query object. See below for details.
+}*)
+
+let start_delete m rid flags ns =
+ set_header m rid 0 _OP_DELETE;
+ Stuff.add_le_int32 m.Bson.buf 0;
+ Buf.add_string m.Bson.buf ns;
+ Buf.add_char m.Bson.buf '\x00';
+ Stuff.add_le_int32 m.Bson.buf flags
+
+(*struct OP_KILL_CURSORS {
+ MsgHeader header; // standard message header
+ int32 ZERO; // 0 - reserved for future use
+ int32 numberOfCursorIDs; // number of cursorIDs in message
+ int64* cursorIDs; // sequence of cursorIDs to close
+}*)
+let start_kill_cursors m rid clist =
+ set_header m rid 0 _OP_KILL_CURSORS;
+ Stuff.add_le_int32 m.Bson.buf 0;
+ Stuff.add_le_int32 m.Bson.buf (List.length clist);
+ List.iter (fun cursorID -> Stuff.add_le_int64L m.Bson.buf cursorID) clist
+
+(*struct OP_MSG {
+ MsgHeader header; // standard message header
+ cstring message; // message for the database
+}*)
+let start_msg m rid msg =
+ set_header m rid 0 _OP_MSG;
+ Buf.add_string m.Bson.buf msg;
+ Buf.add_char m.Bson.buf '\x00'
+
+(*struct OP_REPLY {
+ MsgHeader header; // standard message header
+ int32 responseFlags; // bit vector - see details below
+ int64 cursorID; // cursor id if client needs to do get more's
+ int32 startingFrom; // where in the cursor this reply is starting
+ int32 numberReturned; // number of documents in the reply
+ document* documents; // documents
+}*)
+
+let bson_init m =
+ m.Bson.stack <- m.Bson.buf.Buf.i :: m.Bson.stack;
+ Buf.extend m.Bson.buf 4
+
+let bson_finish m =
+ let start = List.hd m.Bson.stack in
+ m.Bson.stack <- List.tl m.Bson.stack;
+ if not m.Bson.finished
+ then (Buf.add_char m.Bson.buf '\x00';
+ St.lei32 m.Bson.buf.Buf.str start (m.Bson.buf.Buf.i-start))
+
+let finish m =
+ set_header_len m (Buf.length m.Bson.buf);
+ m.Bson.finished <- true
+
+(*
+(* Test code *)
+
+let dump ?(base=10) s =
+ let bb = Buffer.create 1024 in
+ let bh = Buffer.create 1024 in
+ let ba = Buffer.create 1024 in
+ let len = String.length s in
+ let m, n = len / base, len mod base in
+ for i = 0 to m do
+ let row = i * base in
+ for j = 0 to (if i = m then n-1 else base-1) do
+ let idx = i * base + j in
+ let code = Char.code s.[idx] in
+ Printf.bprintf bh "%02x " code;
+ Printf.bprintf ba "%c" (if code >= 32 && code < 127 then s.[idx] else '.');
+ if j = base-1 || (i = m && j = n-1)
+ then
+ (if base = 10
+ then Printf.bprintf bb "%04d %-30s %-10s\n" row (Buffer.contents bh) (Buffer.contents ba)
+ else Printf.bprintf bb "%04x %-48s %-16s\n" row (Buffer.contents bh) (Buffer.contents ba);
+ Buffer.clear bh; Buffer.clear ba)
+ done
+ done;
+ Buffer.contents bb;;
+
+let rid = 0x44495152l;;
+let flags = 1195461702;;
+
+let b = Bson.Append.init ();;
+let () = Bson.Append.oid b "_id" "OIDOIDOIDOID";;
+let () = Bson.Append.string b "name" "Joe";;
+let () = Bson.Append.int b "age" 33;;
+let () = Bson.Append.finish b;;
+
+let m1 = create 100;;
+let () = insert m1 rid flags "tutorial.persons" [b];;
+let () = print_string (dump (get m1));;
+
+let m2 = create 100;;
+let () = start_insert m2 rid flags "tutorial.persons";;
+let () = bson_init m2;;
+let () = Bson.Append.oid m2 "_id" "OIDOIDOIDOID";;
+let () = Bson.Append.string m2 "name" "Joe";;
+let () = Bson.Append.int m2 "age" 33;;
+let () = bson_finish m2;;
+let () = finish m2;;
+let () = print_string (dump (get m2));;
+let good = (get m1) = (get m2);;
+
+*)
+
diff --git a/libbase/mongo.mli b/libbase/mongo.mli
new file mode 100644
index 00000000..3a10c5cf
--- /dev/null
+++ b/libbase/mongo.mli
@@ -0,0 +1,58 @@
+
+(* OP codes *)
+val _OP_REPLY : int
+val _OP_MSG : int
+val _OP_UPDATE : int
+val _OP_INSERT : int
+val _RESERVED : int
+val _OP_QUERY : int
+val _OP_GET_MORE : int
+val _OP_DELETE : int
+val _OP_KILL_CURSORS : int
+
+(* OP_INSERT *)
+val _ContinueOnError : int
+
+(* OP_UPDATE *)
+val _Upsert : int
+val _MultiUpdate : int
+
+(* OP_QUERY *)
+val _TailableCursor : int
+val _SlaveOk : int
+val _OplogReplay : int
+val _NoCursorTimeout : int
+val _AwaitData : int
+val _Exhaust : int
+val _Partial : int
+
+(* OP_DELETE *)
+val _SingleRemove : int
+
+(* OP_REPLY *)
+val _CursorNotFound : int
+val _QueryFailure : int
+val _ShardConfigStale : int
+val _AwaitCapable : int
+
+type mongo_buf = Bson.buf
+
+val add_bson : mongo_buf -> Bson.buf -> unit
+val get : mongo_buf -> string
+val export : mongo_buf -> string * int
+val set_header_len : mongo_buf -> int -> unit
+val set_header : mongo_buf -> int32 -> int -> int -> unit
+val create : int -> mongo_buf
+val init : ?hint:int -> int -> int32 -> int -> int -> mongo_buf
+val clear : mongo_buf -> unit
+val reset : mongo_buf -> unit
+val start_insert : mongo_buf -> int32 -> int -> string -> unit
+val start_update : mongo_buf -> int32 -> int -> string -> unit
+val start_query : mongo_buf -> int32 -> int -> string -> int -> int -> unit
+val start_getmore : mongo_buf -> int32 -> string -> int -> int64 -> unit
+val start_delete : mongo_buf -> int32 -> int -> string -> unit
+val start_kill_cursors : mongo_buf -> int32 -> int64 list -> unit
+val start_msg : mongo_buf -> int32 -> string -> unit
+val bson_init : mongo_buf -> unit
+val bson_finish : mongo_buf -> unit
+val finish : mongo_buf -> unit
diff --git a/opabsl/mlbsl/bslMongo.ml b/opabsl/mlbsl/bslMongo.ml
new file mode 100644
index 00000000..ea62bd1e
--- /dev/null
+++ b/opabsl/mlbsl/bslMongo.ml
@@ -0,0 +1,123 @@
+(*
+ Copyright © 2011 MLstate
+
+ This file is part of OPA.
+
+ OPA is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License, version 3, as published by
+ the Free Software Foundation.
+
+ OPA is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+ more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with OPA. If not, see .
+*)
+##extern-type mongo_buf = Mongo.mongo_buf
+##extern-type cursorID = int64
+##extern-type reply = Buf.buf
+
+##module Mongo
+
+##register create: int -> mongo_buf
+let create = Mongo.create
+
+(* AAAAAAARGHHHHHHH!!!!!! OPA just can't equate opa_rpc_bson_bson with ServerLib.ty_record *)
+
+##register insert: mongo_buf, int, string, 'a -> void
+let insert m f ns (bson:'a) =
+ let (bson:BslBson.opa_rpc_bson_bson) = Obj.magic bson in
+ Mongo.start_insert m 0l f ns;
+ Mongo.bson_init m;
+ BslBson.Bson.serializeb bson m;
+ Mongo.bson_finish m;
+ Mongo.finish m
+
+##register update: mongo_buf, int, string, 'a, 'a -> void
+let update m flags ns selector update =
+ let (selector:BslBson.opa_rpc_bson_bson) = Obj.magic selector in
+ let (update:BslBson.opa_rpc_bson_bson) = Obj.magic update in
+ Mongo.start_update m 0l flags ns;
+ Mongo.bson_init m;
+ BslBson.Bson.serializeb selector m;
+ Mongo.bson_finish m;
+ Mongo.bson_init m;
+ BslBson.Bson.serializeb update m;
+ Mongo.bson_finish m;
+ Mongo.finish m
+
+##register query: mongo_buf, int, string, int, int, 'a, option('a) -> void
+let query m flags ns numberToSkip numberToReturn query returnFieldSelector_opt =
+ let (query:BslBson.opa_rpc_bson_bson) = Obj.magic query in
+ let (returnFieldSelector_opt:BslBson.opa_rpc_bson_bson option) = Obj.magic returnFieldSelector_opt in
+ Mongo.start_query m 0l flags ns numberToSkip numberToReturn;
+ Mongo.bson_init m;
+ BslBson.Bson.serializeb query m;
+ Mongo.bson_finish m;
+ (match returnFieldSelector_opt with
+ | Some returnFieldSelector ->
+ Mongo.bson_init m;
+ BslBson.Bson.serializeb returnFieldSelector m;
+ Mongo.bson_finish m
+ | None -> ());
+ Mongo.finish m
+
+##register get_more: mongo_buf, string, int, cursorID -> void
+let get_more m ns numberToReturn cursorID =
+ Mongo.start_getmore m 0l ns numberToReturn cursorID;
+ Mongo.finish m
+
+##register delete: mongo_buf, int, string, 'a -> void
+let delete m flags ns selector =
+ let (selector:BslBson.opa_rpc_bson_bson) = Obj.magic selector in
+ Mongo.start_delete m 0l flags ns;
+ Mongo.bson_init m;
+ BslBson.Bson.serializeb selector m;
+ Mongo.bson_finish m;
+ Mongo.finish m
+
+##register kill_cursors: mongo_buf, list(cursorID) -> void
+let kill_cursors m clist =
+ Mongo.start_kill_cursors m 0l (BslNativeLib.opa_list_to_ocaml_list (fun x -> x) clist);
+ Mongo.finish m
+
+##register msg: mongo_buf, string -> void
+let msg m msg =
+ Mongo.start_msg m 0l msg;
+ Mongo.finish m
+
+##register get: mongo_buf -> string
+let get = Mongo.get
+
+##register export: mongo_buf -> opa[tuple_2(string, int)]
+let export m =
+ let (str, i) = Mongo.export m in
+ BslNativeLib.opa_tuple_2 (ServerLib.wrap_string str, ServerLib.wrap_int i)
+
+##register clear: mongo_buf -> void
+let clear = Mongo.clear
+
+##register reset: mongo_buf -> void
+let reset = Mongo.reset
+
+module C = QmlCpsServerLib
+open C.Ops
+
+##register [cps-bypass] read_mongo : Socket.connection, int, continuation(reply) -> void
+let read_mongo conn size k =
+ let buf = Buf.create size in
+ let rec aux (len, str) =
+ Buf.append buf str len;
+ let len = Buf.length buf in
+ if len < 4
+ then Scheduler.read Scheduler.default conn aux
+ else
+ if len < Stuff.StuffString.ldi32 buf.Buf.str 0
+ then Scheduler.read Scheduler.default conn aux
+ else buf |> k
+ in
+ Scheduler.read Scheduler.default conn aux
+
+##endmodule
diff --git a/stdlib/io/mongo/mongo.opa b/stdlib/io/mongo/mongo.opa
new file mode 100644
index 00000000..eea6ed8f
--- /dev/null
+++ b/stdlib/io/mongo/mongo.opa
@@ -0,0 +1,203 @@
+/*
+ Copyright © 2011 MLstate
+
+ This file is part of OPA.
+
+ OPA is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License, version 3, as published by
+ the Free Software Foundation.
+
+ OPA is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+ more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with OPA. If not, see .
+*/
+
+import stdlib.io.socket
+import stdlib.core.{rpc.core}
+
+type mongo_buf = external
+type cursorID = external
+type reply = external
+
+type mongo = {
+ conn : Socket.connection;
+ mbuf : mongo_buf
+ }
+
+//@both <-- ??? why doesn't this work ???
+Mongo = {{
+
+ /* Flags */
+
+ /* OP_INSERT */
+ _ContinueOnError = 0x00000001
+
+ /* OP_UPDATE */
+ _Upsert = 0x00000001
+ _MultiUpdate = 0x00000002
+
+ /* OP_QUERY */
+ _TailableCursor = 0x00000002
+ _SlaveOk = 0x00000004
+ _OplogReplay = 0x00000008
+ _NoCursorTimeout = 0x00000010
+ _AwaitData = 0x00000020
+ _Exhaust = 0x00000040
+ _Partial = 0x00000080
+
+ /* OP_DELETE */
+ _SingleRemove = 0x00000001
+
+ /* OP_REPLY */
+ _CursorNotFound = 0x00000001
+ _QueryFailure = 0x00000002
+ _ShardConfigStale = 0x00000003
+ _AwaitCapable = 0x00000004
+
+ /** Allocate new buffer of given size **/
+ @private create_ = (%% BslMongo.Mongo.create %%: int -> mongo_buf)
+
+ /** Build OP_INSERT message in buffer **/
+ @private insert_ = (%% BslMongo.Mongo.insert %%: mongo_buf, int, string, 'a -> void)
+
+ /** Build OP_UPDATE message in buffer **/
+ @private update_ = (%% BslMongo.Mongo.update %%: mongo_buf, int, string, 'a, 'a -> void)
+
+ /** Build OP_QUERY message in buffer **/
+ @private query_ = (%% BslMongo.Mongo.query %%: mongo_buf, int, string, int, int, 'a, option('a) -> void)
+
+ /** Build OP_GET_MORE message in buffer **/
+ @private get_more_ = (%% BslMongo.Mongo.get_more %%: mongo_buf, string, int, cursorID -> void)
+
+ /** Build OP_DELETE message in buffer **/
+ @private delete_ = (%% BslMongo.Mongo.delete %%: mongo_buf, int, string, 'a -> void)
+
+ /** Build OP_KILL_CURSORS message in buffer **/
+ @private kill_cursors_ = (%% BslMongo.Mongo.kill_cursors %%: mongo_buf, list(cursorID) -> void)
+
+ /** Build OP_MSG message in buffer **/
+ @private msg_ = (%% BslMongo.Mongo.msg %%: mongo_buf, string -> void)
+
+ /** Copies string out of buffer. **/
+ @private get_ = (%% BslMongo.Mongo.get %%: mongo_buf -> string)
+
+ /** Access the raw string and length **/
+ @private export_ = (%% BslMongo.Mongo.export %%: mongo_buf -> (string, int))
+
+ /** Clear out any data in the buffer, leave buffer allocated **/
+ @private clear_ = (%% BslMongo.Mongo.clear %%: mongo_buf -> void)
+
+ /** Reset the buffer, unallocate storage **/
+ @private reset_ = (%% BslMongo.Mongo.reset %%: mongo_buf -> void)
+
+ /**
+ * Specialised read, read until the size equals the (little endian)
+ * 4-byte int at the start of the reply.
+ **/
+ @private read_mongo = (%% BslMongo.read_mongo %%: Socket.connection, int -> reply)
+
+ /**
+ * Create new mongo object:
+ * - Open connection to mongo server at addr:port
+ * - Allocate buffer of given size
+ **/
+ open(bufsize,addr,port): mongo =
+ { conn = Socket.connect(addr,port);
+ mbuf = create_(bufsize)
+ }
+
+ @private
+ send_no_reply(m,name): bool =
+ match export_(m.mbuf) with
+ | (str, len) ->
+ s = String.substring(0,len,str)
+ do println("{name}: s=\n{Bson.dump(10)(s)}")
+ cnt = Socket.write_len(m.conn,s,len)
+ do println("cnt={cnt} len={len}")
+ (cnt==len)
+
+ @private
+ send_with_reply(m,name): option(reply) =
+ match export_(m.mbuf) with
+ | (str, len) ->
+ s = String.substring(0,len,str)
+ do println("{name}: s=\n{Bson.dump(10)(s)}")
+ cnt = Socket.write_len(m.conn,s,len)
+ do println("cnt={cnt} len={len}")
+ if (cnt==len)
+ then {some=Socket.read_mongo(m.conn,(1024*1024))}
+ else {none}
+
+ /**
+ * Send OP_INSERT with given collection name:
+ * - no reply expected
+ * - returns a bool indicating success or failure
+ **/
+ insert(m,flags,ns,documents): bool =
+ do insert_(m.mbuf,flags,ns,documents)
+ send_no_reply(m,"insert")
+
+ /**
+ * Send OP_UPDATE with given collection name:
+ * - no reply expected
+ * - returns a bool indicating success or failure
+ **/
+ update(m,flags,ns,selector,update): bool =
+ do update_(m.mbuf,flags,ns,selector,update)
+ send_no_reply(m,"update")
+
+ /**
+ * Send OP_QUERY and get reply:
+ **/
+ /*query(m,flags,ns,numberToSkip,numberToReturn,query,returnFieldSelector_opt): bool =
+ do query_(m.mbuf,0,flags,ns,numberToSkip,numberToReturn,query,returnFieldSelector_opt)
+ send_with_reply(m,"query")*/
+
+ /**
+ * Send OP_GETMORE and get reply:
+ **/
+ /*getmore(m,ns,numberToReturn,cursorID): bool =
+ do getmore_(m.mbuf,0,ns,numberToReturn,cursorID)
+ send_with_reply(m,"getmore")*/
+
+ /**
+ * Send OP_DELETE:
+ * - no reply expected
+ * - returns a bool indicating success or failure
+ **/
+ delete(m,flags,ns,selector): bool =
+ do delete_(m.mbuf,flags,ns,selector)
+ send_no_reply(m,"delete")
+
+ /**
+ * Send OP_KILL_CURSORS:
+ * - no reply expected
+ * - returns a bool indicating success or failure
+ **/
+ kill_cursors(m,cursors): bool =
+ do kill_cursors_(m.mbuf,cursors)
+ send_no_reply(m,"kill_cursors")
+
+ /**
+ * Send OP_MSG:
+ * - no reply expected
+ * - returns a bool indicating success or failure
+ **/
+ msg(m,msg): bool =
+ do msg_(m.mbuf,msg)
+ send_no_reply(m,"msg")
+
+ /**
+ * Close mongo connection and deallocate buffer.
+ **/
+ close(m) =
+ do Socket.close(m.conn)
+ do reset_(m.mbuf)
+ void
+
+}}
+