From b2c8070650868cb660dde213d98507de4eada45a Mon Sep 17 00:00:00 2001 From: Norman Scaife Date: Fri, 9 Sep 2011 18:05:05 +0200 Subject: [PATCH] [feature] mongo: New files for mongo API. --- libbase/buf.ml | 118 +++++++++++++++++ libbase/buf.mli | 57 +++++++++ libbase/mongo.ml | 258 ++++++++++++++++++++++++++++++++++++++ libbase/mongo.mli | 58 +++++++++ opabsl/mlbsl/bslMongo.ml | 123 ++++++++++++++++++ stdlib/io/mongo/mongo.opa | 203 ++++++++++++++++++++++++++++++ 6 files changed, 817 insertions(+) create mode 100644 libbase/buf.ml create mode 100644 libbase/buf.mli create mode 100644 libbase/mongo.ml create mode 100644 libbase/mongo.mli create mode 100644 opabsl/mlbsl/bslMongo.ml create mode 100644 stdlib/io/mongo/mongo.opa diff --git a/libbase/buf.ml b/libbase/buf.ml new file mode 100644 index 00000000..65a78c30 --- /dev/null +++ b/libbase/buf.ml @@ -0,0 +1,118 @@ +(* + Copyright © 2011 MLstate + + This file is part of OPA. + + OPA is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License, version 3, as published by + the Free Software Foundation. + + OPA is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + more details. + + You should have received a copy of the GNU Affero General Public License + along with OPA. If not, see . +*) + +(* Simple library, like Buffer but fixed size but can also look like String if required *) + +type buf = { mutable str : string; mutable i : int } +type t = buf + +let empty = { str=""; i=0; } + +let create size = { str=String.create size; i=0; } + +let make size ch = { str=String.make size ch; i=size; } + +let resize buf size = + let str = String.create size in + let newlen = min buf.i (String.length str) in + if buf.i > 0 then String.unsafe_blit buf.str 0 str 0 newlen; + buf.str <- str; + buf.i <- newlen + +let clear buf = buf.i <- 0 + +let reset buf = buf.str <- ""; buf.i <- 0 + +let length buf = buf.i + +let real_length buf = String.length buf.str + +let get buf i = + if i < 0 || i >= buf.i then invalid_arg (Printf.sprintf "Buf.get index out of bounds %d" i); + String.get buf.str i +let nth = get + +let unsafe_get buf i = String.unsafe_get buf.str i + +let set buf i ch = + if i < 0 || i >= buf.i then invalid_arg (Printf.sprintf "Buf.set index out of bounds %d" i); + String.set buf.str i ch + +let unsafe_set buf i ch = String.unsafe_set buf.str i ch + +let sub buf base len = + if base < 0 || base + len >= buf.i then invalid_arg (Printf.sprintf "Buf.sub index out of bounds %d %d" base len); + String.sub buf.str base len + +let add_char buf ch = + if String.length buf.str - buf.i < 1 then invalid_arg (Printf.sprintf "Buf.add_char %c" ch); + buf.str.[buf.i] <- ch; + buf.i <- buf.i + 1 + +let append buf str len = + if String.length buf.str - buf.i < len then invalid_arg (Printf.sprintf "Buf.add_stringn %s" str); + String.unsafe_blit str 0 buf.str buf.i len; + buf.i <- buf.i + len + +let extend buf len = + if String.length buf.str - buf.i < len then invalid_arg (Printf.sprintf "Buf.extend %d" len); + buf.i <- buf.i + len + +let add_string buf str = + append buf str (String.length str) + +let add_buf buf1 buf2 = + append buf1 buf2.str buf2.i + +let of_string str = { str; i=String.length str; } + +let to_string buf = String.sub buf.str 0 buf.i +let contents = to_string + +let spare buf = String.length buf.str - buf.i + +(* Test code *) +(* +let buf = of_string "abc";; +let () = set buf 1 'B';; +let ch = get buf 0;; +let ch = get buf 1;; +let ch = get buf 2;; +let str = try ignore (get buf 3); "NOT OK" with Invalid_argument str -> "OK: "^str;; +let str = try ignore (set buf 4 'x'); "NOT OK" with Invalid_argument str -> "OK: "^str;; +let buf = create 5;; +let () = add_char buf 'D';; +let () = add_string buf "ef";; +let len = length buf;; +let rlen = real_length buf;; +let str = to_string buf;; +let () = resize buf 8;; +let len = length buf;; +let rlen = real_length buf;; +let str = to_string buf;; +let () = resize buf 2;; +let len = length buf;; +let rlen = real_length buf;; +let str = to_string buf;; +let str = try ignore (add_char empty 'x'); "NOT OK" with Invalid_argument str -> "OK: "^str;; +let () = resize buf 10;; +let () = add_string buf "fghi";; +let str = to_string buf;; +let str = sub buf 1 3;; +let str = try ignore (sub buf 100 100); "NOT OK" with Invalid_argument str -> "OK: "^str;; +*) diff --git a/libbase/buf.mli b/libbase/buf.mli new file mode 100644 index 00000000..ba65c930 --- /dev/null +++ b/libbase/buf.mli @@ -0,0 +1,57 @@ +(* + Copyright © 2011 MLstate + + This file is part of OPA. + + OPA is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License, version 3, as published by + the Free Software Foundation. + + OPA is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + more details. + + You should have received a copy of the GNU Affero General Public License + along with OPA. If not, see . +*) + +(** + * Simple library, intended to have some of the properties of both + * String and Buffer. No automatic resize but it can be done manually. +**) + +(** This type is concrete **) +type buf = { mutable str : string; mutable i : int; } + +(** Common to String and Buffer **) +type t = buf +val length : buf -> int +val create : int -> buf +val sub : buf -> int -> int -> string + +(** Compatibility with String *) +val make : int -> char -> buf +val get : buf -> int -> char +val unsafe_get : buf -> int -> char +val set : buf -> int -> char -> unit +val unsafe_set : buf -> int -> char -> unit + +(** Compatibility with Buffer **) +val nth : buf -> int -> char +val clear : buf -> unit +val reset : buf -> unit +val add_char : buf -> char -> unit +val add_string : buf -> string -> unit +val contents : buf -> string + +(** Specifics **) +val empty : buf +val append : buf -> string -> int -> unit +val add_buf : buf -> buf -> unit +val of_string : string -> buf +val to_string : buf -> string +val resize : buf -> int -> unit +val extend : buf -> int -> unit +val real_length : buf -> int +val spare : buf -> int diff --git a/libbase/mongo.ml b/libbase/mongo.ml new file mode 100644 index 00000000..92cb15a9 --- /dev/null +++ b/libbase/mongo.ml @@ -0,0 +1,258 @@ + +module St = Stuff.StuffString + +(* OP codes *) +let _OP_REPLY = 1 (* Reply to a client request. responseTo is set *) +let _OP_MSG = 1000 (* generic msg command followed by a string *) +let _OP_UPDATE = 2001 (* update document *) +let _OP_INSERT = 2002 (* insert new document *) +let _RESERVED = 2003 (* formerly used for OP_GET_BY_OID *) +let _OP_QUERY = 2004 (* query a collection *) +let _OP_GET_MORE = 2005 (* Get more data from a query. See Cursors *) +let _OP_DELETE = 2006 (* Delete documents *) +let _OP_KILL_CURSORS = 2007 (* Tell database client is done with a cursor *) + +(* Flags *) + +(* OP_INSERT *) +let _ContinueOnError = 0x00000001 + +(* OP_UPDATE *) +let _Upsert = 0x00000001 +let _MultiUpdate = 0x00000002 + +(* OP_QUERY *) +let _TailableCursor = 0x00000002 +let _SlaveOk = 0x00000004 +let _OplogReplay = 0x00000008 +let _NoCursorTimeout = 0x00000010 +let _AwaitData = 0x00000020 +let _Exhaust = 0x00000040 +let _Partial = 0x00000080 + +(* OP_DELETE *) +let _SingleRemove = 0x00000001 + +(* OP_REPLY *) +let _CursorNotFound = 0x00000001 +let _QueryFailure = 0x00000002 +let _ShardConfigStale = 0x00000003 +let _AwaitCapable = 0x00000004 + +type mongo_buf = Bson.buf + +let add_bson m bson = + Buf.append m.Bson.buf bson.Bson.buf.Buf.str (Bson.Append.size bson) + +let get m = Buf.to_string m.Bson.buf + +let export m = (m.Bson.buf.Buf.str, m.Bson.buf.Buf.i) + +(* +struct MsgHeader { + int32 messageLength; // total message size, including this + int32 requestID; // identifier for this message + int32 responseTo; // requestID from the original request (used in reponses from db) + int32 opCode; // request type - see table below +} +*) +let set_header_len m messageLength = + St.lei32 m.Bson.buf.Buf.str 0 messageLength + +let set_header m requestId responseTo opCode = + St.lei32l m.Bson.buf.Buf.str 4 (if requestId = 0l then Random.int32 Int32.max_int else requestId); + St.lei32 m.Bson.buf.Buf.str 8 responseTo; + St.lei32 m.Bson.buf.Buf.str 12 opCode; + m.Bson.buf.Buf.i <- 16 + +let create size = + if size < 16 then raise (Failure "Mongo.create: ridiculous size value"); + let b = { Bson.buf=Buf.create size; stack=[]; finished=false; } in + b.Bson.buf.Buf.i <- 16; + b + +let init ?(hint=100) messageLength requestId responseTo opCode = + let m = create hint in + set_header_len m messageLength; + set_header m requestId responseTo opCode; + m + +let clear m = m.Bson.buf.Buf.i <- 16 + +let reset m = Buf.reset m.Bson.buf + +(*struct OP_INSERT { + MsgHeader header; // standard message header + int32 flags; // bit vector - see below + cstring fullCollectionName; // "dbname.collectionname" + document* documents; // one or more documents to insert into the collection +}*) + +let start_insert m rid flags ns = + set_header m rid 0 _OP_INSERT; + Stuff.add_le_int32 m.Bson.buf flags; + Buf.add_string m.Bson.buf ns; + Buf.add_char m.Bson.buf '\x00' + +(*struct OP_UPDATE { + MsgHeader header; // standard message header + int32 ZERO; // 0 - reserved for future use + cstring fullCollectionName; // "dbname.collectionname" + int32 flags; // bit vector. see below + document selector; // the query to select the document + document update; // specification of the update to perform +}*) + +let start_update m rid flags ns = + set_header m rid 0 _OP_UPDATE; + Stuff.add_le_int32 m.Bson.buf 0; + Buf.add_string m.Bson.buf ns; + Buf.add_char m.Bson.buf '\x00'; + Stuff.add_le_int32 m.Bson.buf flags + +(*struct OP_QUERY { + MsgHeader header; // standard message header + int32 flags; // bit vector of query options. + cstring fullCollectionName; // "dbname.collectionname" + int32 numberToSkip; // number of documents to skip + int32 numberToReturn; // number of documents to return in the first OP_REPLY batch + document query; // query object. + [ document returnFieldSelector; ] // Optional. Selector indicating the fields to return. +}*) + +let start_query m rid flags ns numberToSkip numberToReturn = + set_header m rid 0 _OP_QUERY; + Stuff.add_le_int32 m.Bson.buf flags; + Buf.add_string m.Bson.buf ns; + Buf.add_char m.Bson.buf '\x00'; + Stuff.add_le_int32 m.Bson.buf numberToSkip; + Stuff.add_le_int32 m.Bson.buf numberToReturn + +(*struct OP_GETMORE { + MsgHeader header; // standard message header + int32 ZERO; // 0 - reserved for future use + cstring fullCollectionName; // "dbname.collectionname" + int32 numberToReturn; // number of documents to return + int64 cursorID; // cursorID from the OP_REPLY +}*) + +let start_getmore m rid ns numberToReturn cursorID = + set_header m rid 0 _OP_GET_MORE; + Stuff.add_le_int32 m.Bson.buf 0; + Buf.add_string m.Bson.buf ns; + Buf.add_char m.Bson.buf '\x00'; + Stuff.add_le_int32 m.Bson.buf numberToReturn; + Stuff.add_le_int64L m.Bson.buf cursorID + +(*struct OP_DELETE { + MsgHeader header; // standard message header + int32 ZERO; // 0 - reserved for future use + cstring fullCollectionName; // "dbname.collectionname" + int32 flags; // bit vector - see below for details. + document selector; // query object. See below for details. +}*) + +let start_delete m rid flags ns = + set_header m rid 0 _OP_DELETE; + Stuff.add_le_int32 m.Bson.buf 0; + Buf.add_string m.Bson.buf ns; + Buf.add_char m.Bson.buf '\x00'; + Stuff.add_le_int32 m.Bson.buf flags + +(*struct OP_KILL_CURSORS { + MsgHeader header; // standard message header + int32 ZERO; // 0 - reserved for future use + int32 numberOfCursorIDs; // number of cursorIDs in message + int64* cursorIDs; // sequence of cursorIDs to close +}*) +let start_kill_cursors m rid clist = + set_header m rid 0 _OP_KILL_CURSORS; + Stuff.add_le_int32 m.Bson.buf 0; + Stuff.add_le_int32 m.Bson.buf (List.length clist); + List.iter (fun cursorID -> Stuff.add_le_int64L m.Bson.buf cursorID) clist + +(*struct OP_MSG { + MsgHeader header; // standard message header + cstring message; // message for the database +}*) +let start_msg m rid msg = + set_header m rid 0 _OP_MSG; + Buf.add_string m.Bson.buf msg; + Buf.add_char m.Bson.buf '\x00' + +(*struct OP_REPLY { + MsgHeader header; // standard message header + int32 responseFlags; // bit vector - see details below + int64 cursorID; // cursor id if client needs to do get more's + int32 startingFrom; // where in the cursor this reply is starting + int32 numberReturned; // number of documents in the reply + document* documents; // documents +}*) + +let bson_init m = + m.Bson.stack <- m.Bson.buf.Buf.i :: m.Bson.stack; + Buf.extend m.Bson.buf 4 + +let bson_finish m = + let start = List.hd m.Bson.stack in + m.Bson.stack <- List.tl m.Bson.stack; + if not m.Bson.finished + then (Buf.add_char m.Bson.buf '\x00'; + St.lei32 m.Bson.buf.Buf.str start (m.Bson.buf.Buf.i-start)) + +let finish m = + set_header_len m (Buf.length m.Bson.buf); + m.Bson.finished <- true + +(* +(* Test code *) + +let dump ?(base=10) s = + let bb = Buffer.create 1024 in + let bh = Buffer.create 1024 in + let ba = Buffer.create 1024 in + let len = String.length s in + let m, n = len / base, len mod base in + for i = 0 to m do + let row = i * base in + for j = 0 to (if i = m then n-1 else base-1) do + let idx = i * base + j in + let code = Char.code s.[idx] in + Printf.bprintf bh "%02x " code; + Printf.bprintf ba "%c" (if code >= 32 && code < 127 then s.[idx] else '.'); + if j = base-1 || (i = m && j = n-1) + then + (if base = 10 + then Printf.bprintf bb "%04d %-30s %-10s\n" row (Buffer.contents bh) (Buffer.contents ba) + else Printf.bprintf bb "%04x %-48s %-16s\n" row (Buffer.contents bh) (Buffer.contents ba); + Buffer.clear bh; Buffer.clear ba) + done + done; + Buffer.contents bb;; + +let rid = 0x44495152l;; +let flags = 1195461702;; + +let b = Bson.Append.init ();; +let () = Bson.Append.oid b "_id" "OIDOIDOIDOID";; +let () = Bson.Append.string b "name" "Joe";; +let () = Bson.Append.int b "age" 33;; +let () = Bson.Append.finish b;; + +let m1 = create 100;; +let () = insert m1 rid flags "tutorial.persons" [b];; +let () = print_string (dump (get m1));; + +let m2 = create 100;; +let () = start_insert m2 rid flags "tutorial.persons";; +let () = bson_init m2;; +let () = Bson.Append.oid m2 "_id" "OIDOIDOIDOID";; +let () = Bson.Append.string m2 "name" "Joe";; +let () = Bson.Append.int m2 "age" 33;; +let () = bson_finish m2;; +let () = finish m2;; +let () = print_string (dump (get m2));; +let good = (get m1) = (get m2);; + +*) + diff --git a/libbase/mongo.mli b/libbase/mongo.mli new file mode 100644 index 00000000..3a10c5cf --- /dev/null +++ b/libbase/mongo.mli @@ -0,0 +1,58 @@ + +(* OP codes *) +val _OP_REPLY : int +val _OP_MSG : int +val _OP_UPDATE : int +val _OP_INSERT : int +val _RESERVED : int +val _OP_QUERY : int +val _OP_GET_MORE : int +val _OP_DELETE : int +val _OP_KILL_CURSORS : int + +(* OP_INSERT *) +val _ContinueOnError : int + +(* OP_UPDATE *) +val _Upsert : int +val _MultiUpdate : int + +(* OP_QUERY *) +val _TailableCursor : int +val _SlaveOk : int +val _OplogReplay : int +val _NoCursorTimeout : int +val _AwaitData : int +val _Exhaust : int +val _Partial : int + +(* OP_DELETE *) +val _SingleRemove : int + +(* OP_REPLY *) +val _CursorNotFound : int +val _QueryFailure : int +val _ShardConfigStale : int +val _AwaitCapable : int + +type mongo_buf = Bson.buf + +val add_bson : mongo_buf -> Bson.buf -> unit +val get : mongo_buf -> string +val export : mongo_buf -> string * int +val set_header_len : mongo_buf -> int -> unit +val set_header : mongo_buf -> int32 -> int -> int -> unit +val create : int -> mongo_buf +val init : ?hint:int -> int -> int32 -> int -> int -> mongo_buf +val clear : mongo_buf -> unit +val reset : mongo_buf -> unit +val start_insert : mongo_buf -> int32 -> int -> string -> unit +val start_update : mongo_buf -> int32 -> int -> string -> unit +val start_query : mongo_buf -> int32 -> int -> string -> int -> int -> unit +val start_getmore : mongo_buf -> int32 -> string -> int -> int64 -> unit +val start_delete : mongo_buf -> int32 -> int -> string -> unit +val start_kill_cursors : mongo_buf -> int32 -> int64 list -> unit +val start_msg : mongo_buf -> int32 -> string -> unit +val bson_init : mongo_buf -> unit +val bson_finish : mongo_buf -> unit +val finish : mongo_buf -> unit diff --git a/opabsl/mlbsl/bslMongo.ml b/opabsl/mlbsl/bslMongo.ml new file mode 100644 index 00000000..ea62bd1e --- /dev/null +++ b/opabsl/mlbsl/bslMongo.ml @@ -0,0 +1,123 @@ +(* + Copyright © 2011 MLstate + + This file is part of OPA. + + OPA is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License, version 3, as published by + the Free Software Foundation. + + OPA is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + more details. + + You should have received a copy of the GNU Affero General Public License + along with OPA. If not, see . +*) +##extern-type mongo_buf = Mongo.mongo_buf +##extern-type cursorID = int64 +##extern-type reply = Buf.buf + +##module Mongo + +##register create: int -> mongo_buf +let create = Mongo.create + +(* AAAAAAARGHHHHHHH!!!!!! OPA just can't equate opa_rpc_bson_bson with ServerLib.ty_record *) + +##register insert: mongo_buf, int, string, 'a -> void +let insert m f ns (bson:'a) = + let (bson:BslBson.opa_rpc_bson_bson) = Obj.magic bson in + Mongo.start_insert m 0l f ns; + Mongo.bson_init m; + BslBson.Bson.serializeb bson m; + Mongo.bson_finish m; + Mongo.finish m + +##register update: mongo_buf, int, string, 'a, 'a -> void +let update m flags ns selector update = + let (selector:BslBson.opa_rpc_bson_bson) = Obj.magic selector in + let (update:BslBson.opa_rpc_bson_bson) = Obj.magic update in + Mongo.start_update m 0l flags ns; + Mongo.bson_init m; + BslBson.Bson.serializeb selector m; + Mongo.bson_finish m; + Mongo.bson_init m; + BslBson.Bson.serializeb update m; + Mongo.bson_finish m; + Mongo.finish m + +##register query: mongo_buf, int, string, int, int, 'a, option('a) -> void +let query m flags ns numberToSkip numberToReturn query returnFieldSelector_opt = + let (query:BslBson.opa_rpc_bson_bson) = Obj.magic query in + let (returnFieldSelector_opt:BslBson.opa_rpc_bson_bson option) = Obj.magic returnFieldSelector_opt in + Mongo.start_query m 0l flags ns numberToSkip numberToReturn; + Mongo.bson_init m; + BslBson.Bson.serializeb query m; + Mongo.bson_finish m; + (match returnFieldSelector_opt with + | Some returnFieldSelector -> + Mongo.bson_init m; + BslBson.Bson.serializeb returnFieldSelector m; + Mongo.bson_finish m + | None -> ()); + Mongo.finish m + +##register get_more: mongo_buf, string, int, cursorID -> void +let get_more m ns numberToReturn cursorID = + Mongo.start_getmore m 0l ns numberToReturn cursorID; + Mongo.finish m + +##register delete: mongo_buf, int, string, 'a -> void +let delete m flags ns selector = + let (selector:BslBson.opa_rpc_bson_bson) = Obj.magic selector in + Mongo.start_delete m 0l flags ns; + Mongo.bson_init m; + BslBson.Bson.serializeb selector m; + Mongo.bson_finish m; + Mongo.finish m + +##register kill_cursors: mongo_buf, list(cursorID) -> void +let kill_cursors m clist = + Mongo.start_kill_cursors m 0l (BslNativeLib.opa_list_to_ocaml_list (fun x -> x) clist); + Mongo.finish m + +##register msg: mongo_buf, string -> void +let msg m msg = + Mongo.start_msg m 0l msg; + Mongo.finish m + +##register get: mongo_buf -> string +let get = Mongo.get + +##register export: mongo_buf -> opa[tuple_2(string, int)] +let export m = + let (str, i) = Mongo.export m in + BslNativeLib.opa_tuple_2 (ServerLib.wrap_string str, ServerLib.wrap_int i) + +##register clear: mongo_buf -> void +let clear = Mongo.clear + +##register reset: mongo_buf -> void +let reset = Mongo.reset + +module C = QmlCpsServerLib +open C.Ops + +##register [cps-bypass] read_mongo : Socket.connection, int, continuation(reply) -> void +let read_mongo conn size k = + let buf = Buf.create size in + let rec aux (len, str) = + Buf.append buf str len; + let len = Buf.length buf in + if len < 4 + then Scheduler.read Scheduler.default conn aux + else + if len < Stuff.StuffString.ldi32 buf.Buf.str 0 + then Scheduler.read Scheduler.default conn aux + else buf |> k + in + Scheduler.read Scheduler.default conn aux + +##endmodule diff --git a/stdlib/io/mongo/mongo.opa b/stdlib/io/mongo/mongo.opa new file mode 100644 index 00000000..eea6ed8f --- /dev/null +++ b/stdlib/io/mongo/mongo.opa @@ -0,0 +1,203 @@ +/* + Copyright © 2011 MLstate + + This file is part of OPA. + + OPA is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License, version 3, as published by + the Free Software Foundation. + + OPA is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + more details. + + You should have received a copy of the GNU Affero General Public License + along with OPA. If not, see . +*/ + +import stdlib.io.socket +import stdlib.core.{rpc.core} + +type mongo_buf = external +type cursorID = external +type reply = external + +type mongo = { + conn : Socket.connection; + mbuf : mongo_buf + } + +//@both <-- ??? why doesn't this work ??? +Mongo = {{ + + /* Flags */ + + /* OP_INSERT */ + _ContinueOnError = 0x00000001 + + /* OP_UPDATE */ + _Upsert = 0x00000001 + _MultiUpdate = 0x00000002 + + /* OP_QUERY */ + _TailableCursor = 0x00000002 + _SlaveOk = 0x00000004 + _OplogReplay = 0x00000008 + _NoCursorTimeout = 0x00000010 + _AwaitData = 0x00000020 + _Exhaust = 0x00000040 + _Partial = 0x00000080 + + /* OP_DELETE */ + _SingleRemove = 0x00000001 + + /* OP_REPLY */ + _CursorNotFound = 0x00000001 + _QueryFailure = 0x00000002 + _ShardConfigStale = 0x00000003 + _AwaitCapable = 0x00000004 + + /** Allocate new buffer of given size **/ + @private create_ = (%% BslMongo.Mongo.create %%: int -> mongo_buf) + + /** Build OP_INSERT message in buffer **/ + @private insert_ = (%% BslMongo.Mongo.insert %%: mongo_buf, int, string, 'a -> void) + + /** Build OP_UPDATE message in buffer **/ + @private update_ = (%% BslMongo.Mongo.update %%: mongo_buf, int, string, 'a, 'a -> void) + + /** Build OP_QUERY message in buffer **/ + @private query_ = (%% BslMongo.Mongo.query %%: mongo_buf, int, string, int, int, 'a, option('a) -> void) + + /** Build OP_GET_MORE message in buffer **/ + @private get_more_ = (%% BslMongo.Mongo.get_more %%: mongo_buf, string, int, cursorID -> void) + + /** Build OP_DELETE message in buffer **/ + @private delete_ = (%% BslMongo.Mongo.delete %%: mongo_buf, int, string, 'a -> void) + + /** Build OP_KILL_CURSORS message in buffer **/ + @private kill_cursors_ = (%% BslMongo.Mongo.kill_cursors %%: mongo_buf, list(cursorID) -> void) + + /** Build OP_MSG message in buffer **/ + @private msg_ = (%% BslMongo.Mongo.msg %%: mongo_buf, string -> void) + + /** Copies string out of buffer. **/ + @private get_ = (%% BslMongo.Mongo.get %%: mongo_buf -> string) + + /** Access the raw string and length **/ + @private export_ = (%% BslMongo.Mongo.export %%: mongo_buf -> (string, int)) + + /** Clear out any data in the buffer, leave buffer allocated **/ + @private clear_ = (%% BslMongo.Mongo.clear %%: mongo_buf -> void) + + /** Reset the buffer, unallocate storage **/ + @private reset_ = (%% BslMongo.Mongo.reset %%: mongo_buf -> void) + + /** + * Specialised read, read until the size equals the (little endian) + * 4-byte int at the start of the reply. + **/ + @private read_mongo = (%% BslMongo.read_mongo %%: Socket.connection, int -> reply) + + /** + * Create new mongo object: + * - Open connection to mongo server at addr:port + * - Allocate buffer of given size + **/ + open(bufsize,addr,port): mongo = + { conn = Socket.connect(addr,port); + mbuf = create_(bufsize) + } + + @private + send_no_reply(m,name): bool = + match export_(m.mbuf) with + | (str, len) -> + s = String.substring(0,len,str) + do println("{name}: s=\n{Bson.dump(10)(s)}") + cnt = Socket.write_len(m.conn,s,len) + do println("cnt={cnt} len={len}") + (cnt==len) + + @private + send_with_reply(m,name): option(reply) = + match export_(m.mbuf) with + | (str, len) -> + s = String.substring(0,len,str) + do println("{name}: s=\n{Bson.dump(10)(s)}") + cnt = Socket.write_len(m.conn,s,len) + do println("cnt={cnt} len={len}") + if (cnt==len) + then {some=Socket.read_mongo(m.conn,(1024*1024))} + else {none} + + /** + * Send OP_INSERT with given collection name: + * - no reply expected + * - returns a bool indicating success or failure + **/ + insert(m,flags,ns,documents): bool = + do insert_(m.mbuf,flags,ns,documents) + send_no_reply(m,"insert") + + /** + * Send OP_UPDATE with given collection name: + * - no reply expected + * - returns a bool indicating success or failure + **/ + update(m,flags,ns,selector,update): bool = + do update_(m.mbuf,flags,ns,selector,update) + send_no_reply(m,"update") + + /** + * Send OP_QUERY and get reply: + **/ + /*query(m,flags,ns,numberToSkip,numberToReturn,query,returnFieldSelector_opt): bool = + do query_(m.mbuf,0,flags,ns,numberToSkip,numberToReturn,query,returnFieldSelector_opt) + send_with_reply(m,"query")*/ + + /** + * Send OP_GETMORE and get reply: + **/ + /*getmore(m,ns,numberToReturn,cursorID): bool = + do getmore_(m.mbuf,0,ns,numberToReturn,cursorID) + send_with_reply(m,"getmore")*/ + + /** + * Send OP_DELETE: + * - no reply expected + * - returns a bool indicating success or failure + **/ + delete(m,flags,ns,selector): bool = + do delete_(m.mbuf,flags,ns,selector) + send_no_reply(m,"delete") + + /** + * Send OP_KILL_CURSORS: + * - no reply expected + * - returns a bool indicating success or failure + **/ + kill_cursors(m,cursors): bool = + do kill_cursors_(m.mbuf,cursors) + send_no_reply(m,"kill_cursors") + + /** + * Send OP_MSG: + * - no reply expected + * - returns a bool indicating success or failure + **/ + msg(m,msg): bool = + do msg_(m.mbuf,msg) + send_no_reply(m,"msg") + + /** + * Close mongo connection and deallocate buffer. + **/ + close(m) = + do Socket.close(m.conn) + do reset_(m.mbuf) + void + +}} +