Skip to content

Commit

Permalink
[feature] stdlib: Added send_with_error, send command, piggyback getl…
Browse files Browse the repository at this point in the history
…asterror query.
  • Loading branch information
nrs135 committed Nov 24, 2011
1 parent 5d41d02 commit 86efce2
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 6 deletions.
4 changes: 4 additions & 0 deletions libbase/mongo.ml
Expand Up @@ -102,6 +102,10 @@ let import s = { Bson.buf = Buf.of_string s; stack = []; finished = true }

let copy m = { Bson.buf = Buf.copy m.Bson.buf; stack = m.Bson.stack; finished = m.Bson.finished }

let concat m1 m2 = { Bson.buf = Buf.of_string(m1.Bson.buf.Buf.str^m2.Bson.buf.Buf.str); stack = []; finished = true }

let append m1 m2 = Buf.add_buf m1.Bson.buf m2.Bson.buf

(*
struct MsgHeader {
int32 messageLength; // total message size, including this
Expand Down
2 changes: 2 additions & 0 deletions libbase/mongo.mli
Expand Up @@ -42,6 +42,8 @@ val get : mongo_buf -> string
val export : mongo_buf -> string * int
val import : string -> mongo_buf
val copy : mongo_buf -> mongo_buf
val concat : mongo_buf -> mongo_buf -> mongo_buf
val append : mongo_buf -> mongo_buf -> unit
val set_header_len : mongo_buf -> int -> unit
val set_header : mongo_buf -> int32 -> int -> int -> unit
val get_buf : ?hint:int -> unit -> Buf.t
Expand Down
6 changes: 6 additions & 0 deletions opabsl/mlbsl/bslMongo.ml
Expand Up @@ -331,6 +331,12 @@ let import = Mongo.import
##register copy: Mongo.mongo_buf -> Mongo.mongo_buf
let copy = Mongo.copy

##register concat: Mongo.mongo_buf, Mongo.mongo_buf -> Mongo.mongo_buf
let concat = Mongo.concat

##register append: Mongo.mongo_buf, Mongo.mongo_buf -> void
let append = Mongo.append

##register clear: Mongo.mongo_buf -> void
let clear = Mongo.clear

Expand Down
13 changes: 11 additions & 2 deletions stdlib/apis/mongo/bson.opa
Expand Up @@ -503,12 +503,21 @@ Bson = {{
/**
* Decide if a document contains an error or not.
**/
isError(doc:Bson.document): bool =
(match find_int(doc,"ok") with {some=ok} -> ok != 0 | {none} -> false) ||
is_error(doc:Bson.document): bool =
(match find_int(doc,"ok") with {some=ok} -> ok != 1 | {none} -> false) ||
(match find_string(doc, "err") with {some=err} -> err != "" | {none} -> false) ||
(match find_int(doc, "code") with {some=code} -> code != 0 | {none} -> false) ||
(match find_string(doc, "errmsg") with {some=errmsg} -> errmsg != "" | {none} -> false)
/**
* Same as is_error but for a [Mongo.error] type.
**/
isError(err:Bson.error): bool =
(match err.ok with {present=ok} -> ok != 1 | {absent} -> false) ||
(match err.err with {present=err} -> err != "" | {absent} -> false) ||
(match err.code with {present=code} -> code != 0 | {absent} -> false) ||
(match err.errmsg with {present=errmsg} -> errmsg != "" | {absent} -> false)
/**
* Same as [string_of_doc] but using an OPA type.
**/
Expand Down
29 changes: 29 additions & 0 deletions stdlib/apis/mongo/collection.opa
Expand Up @@ -240,6 +240,14 @@ MongoCollection = {{
partial(c:Mongo.collection('value)): Mongo.collection('value)
= {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.PartialBit) }}

@private reply_to_result(from:string, reply_opt: option(Mongo.reply)): Mongo.result =
match reply_opt with
| {some=reply} ->
(match MongoDriver.reply_document(reply,0) with
| {some=doc} -> {success=doc}
| {none} -> {failure={Error="{from}: no document in reply"}})
| {none} -> {failure={Error="{from}: no reply"}}
/**
* Insert an OPA value into a collection.
**/
Expand All @@ -248,13 +256,24 @@ MongoCollection = {{
b = Bson.opa_to_bson(v,{some=@typeval('value)})
MongoDriver.insert(c.db.mongo,c.db.insert_flags,ns,b)
/** insert with getlasterror **/
inserte(c:Mongo.collection('value), v:'value): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
b = Bson.opa_to_bson(v,{some=@typeval('value)})
reply_to_result("MongoConnection.insert",MongoDriver.inserte(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
/**
* Batch insert, you need to build the batch using the [Batch] module.
**/
insert_batch(c:Mongo.collection('value), b:Mongo.batch('value)): bool =
ns = c.db.dbname^"."^c.db.collection
MongoDriver.insert_batch(c.db.mongo,c.db.insert_flags,ns,b)
/** insert_batch with getlasterror **/
insert_batche(c:Mongo.collection('value), b:Mongo.batch('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
reply_to_result("MongoConnection.insert_barch",MongoDriver.insert_batche(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
/**
* Update a value in a collection.
*
Expand All @@ -270,13 +289,23 @@ MongoCollection = {{
ns = c.db.dbname^"."^c.db.collection
MongoDriver.update(c.db.mongo,c.db.update_flags,ns,select,update)
/** update with getlasterror **/
updatee(c:Mongo.collection('value), select:Mongo.select('value), update:Mongo.update('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
reply_to_result("MongoConnection.update",MongoDriver.updatee(c.db.mongo,c.db.update_flags,ns,c.db.dbname,select,update))
/**
* Delete values in a collection according to a select value.
**/
delete(c:Mongo.collection('value), select:Mongo.select('value)): bool =
ns = c.db.dbname^"."^c.db.collection
MongoDriver.delete(c.db.mongo,c.db.delete_flags,ns,select)
/** delete with getlasterror **/
deletee(c:Mongo.collection('value), select:Mongo.select('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
reply_to_result("MongoConnection.delete",MongoDriver.deletee(c.db.mongo,c.db.delete_flags,ns,c.db.dbname,select))
/**
* Return the [Bson.document] representation of a single value selected from
* a collection. This might facilitate more efficient handling of values
Expand Down
1 change: 1 addition & 0 deletions stdlib/apis/mongo/commands.opa
Expand Up @@ -61,6 +61,7 @@
* Note also that some of the results have floating elements which we
* map to the Bson.register type.
**/
//TODO: missing commands [currentOp, killOp]

type Mongo.getLastErrorOptions = {
fsync : Bson.register(bool);
Expand Down
2 changes: 1 addition & 1 deletion stdlib/apis/mongo/connection.opa
Expand Up @@ -175,7 +175,7 @@ MongoConnection = {{
**/
err(db:Mongo.mongodb, msg:string): bool =
err = MongoCommands.getLastError(db, db.dbname)
status = MongoDriver.isError(err)
status = MongoDriver.is_error(err)
do if db.mongo.log && status
then ML.error("MongoConnection.err({db.dbname}.{db.collection})","msg={msg} err={MongoDriver.string_of_result(err)}",void)
status
Expand Down
112 changes: 109 additions & 3 deletions stdlib/apis/mongo/mongo.opa
Expand Up @@ -76,8 +76,20 @@ type Mongo.db = {
max_depth : int;
}

type Mongo.sr = {send:(Mongo.db,Mongo.mongo_buf,string)} / {sendrecv:(Mongo.db,Mongo.mongo_buf,string)} / {stop}
type Mongo.srr = {sendresult:bool} / {sndrcvresult:option(Mongo.reply)} / {stopresult} / {reconnect}
/** Outgoing Cell messages **/
type Mongo.sr =
{send:(Mongo.db,Mongo.mongo_buf,string)} // Send and forget
/ {sendrecv:(Mongo.db,Mongo.mongo_buf,string)} // Send and expect reply
/ {senderror:(Mongo.db,Mongo.mongo_buf,string,string)} // Send and call getlasterror
/ {stop} // Stop the cell

/** Incoming Cell messages **/
type Mongo.srr =
{sendresult:bool}
/ {sndrcvresult:option(Mongo.reply)}
/ {snderrresult:option(Mongo.reply)}
/ {stopresult}
/ {reconnect}

/**
* Mongo driver failure status.
Expand Down Expand Up @@ -223,7 +235,10 @@ MongoDriver = {{
string_of_value_or_failure(results,(l -> List.list_to_string(Bson.to_pretty,l)))

/** Predicate for error status of a [Mongo.result] value. **/
isError(result:Mongo.result): bool = outcome_map(result, Bson.isError, (_ -> true))
is_error(result:Mongo.result): bool = outcome_map(result, Bson.is_error, (_ -> true))

/** Predicate for error status of a [Mongo.result] value. **/
isError(error:Mongo.error): bool = outcome_map(error, Bson.isError, (_ -> true))

/**
* Validate a BSON document by turning it into a [Mongo.result] value.
Expand Down Expand Up @@ -448,6 +463,12 @@ MongoDriver = {{
/* Make a copy of a buffer */
@private copy_ = (%% BslMongo.Mongo.copy %%: Mongo.mongo_buf -> Mongo.mongo_buf)
/* Concatenate two buffers */
@private concat_ = (%% BslMongo.Mongo.concat %%: Mongo.mongo_buf, Mongo.mongo_buf -> Mongo.mongo_buf)
/* Append two buffers */
@private append_ = (%% BslMongo.Mongo.append %%: Mongo.mongo_buf, Mongo.mongo_buf -> void)
/* Clear out any data in the buffer, leave buffer allocated */
@private clear_ = (%% BslMongo.Mongo.clear %%: Mongo.mongo_buf -> void)
Expand Down Expand Up @@ -552,6 +573,35 @@ MongoDriver = {{
| {none} ->
ML.error("Mongo.receive({name})","Attempt to write to unopened connection",{none})
@private
send_with_error(m,mbuf,name,ns): option(Mongo.reply) =
mbuf2 = create_(m.bufsize)
do query_(mbuf2,0,ns^".$cmd",0,1,[H.i32("getlasterror",1)],{none})
mrid = mongo_buf_requestId(mbuf2)
do append_(mbuf,mbuf2)
do free_(mbuf2)
match m.conn.get() with
| {some=conn} ->
if send_no_reply_(m,mbuf,name,true)
then
mailbox = new_mailbox_(m.bufsize)
(match read_mongo_(conn,m.comms_timeout,mailbox) with
| {success=reply} ->
rrt = reply_responseTo(reply)
do reset_mailbox_(mailbox)
do free_(mbuf)
do if m.log then ML.debug("Mongo.send_with_error({name})","\n{string_of_message_reply(reply)}",void)
if mrid != rrt
then ML.error("MongoDriver.send_with_error","RequestId mismatch, expected {mrid}, got {rrt}",{none})
else {some=reply}
| {~failure} ->
do if m.log then ML.info("send_with_error","failure={failure}",void)
do reset_mailbox_(mailbox)
{none})
else {none}
| {none} ->
ML.error("Mongo.send_with_error({name})","Attempt to write to unopened connection",{none})
@private
sr(_, msg) =
match msg with
Expand All @@ -571,6 +621,14 @@ MongoDriver = {{
| {none} ->
do ML.error("Mongo.sendrecv","Unopened connection",void)
{return={sndrcvresult={none}}; instruction={unchanged}})
| {senderror=(m,mbuf,name,ns)} ->
(match m.conn.get() with
| {some=_conn} ->
swe = send_with_error(m,mbuf,name,ns)
{return=if Option.is_some(swe) then {snderrresult=swe} else {reconnect}; instruction={unchanged}}
| {none} ->
do ML.error("Mongo.senderror","Unopened connection",void)
{return={snderrresult={none}}; instruction={unchanged}})
| {stop} ->
{return={stopresult}; instruction={stop}}
Expand All @@ -594,6 +652,16 @@ MongoDriver = {{
| {~sndrcvresult} -> sndrcvresult
| _ -> @fail
@private
snderr(m,mbuf,name,ns) =
match Cell.call(m.conncell,({senderror=(m,mbuf,name,ns)}:Mongo.sr)):Mongo.srr with
| {reconnect} ->
if reconnect("send_with_error",m)
then snderr(m,mbuf,name,ns)
else ML.fatal("Mongo.snderr({name}):","comms error (Can't reconnect)",-1)
| {~snderrresult} -> snderrresult
| _ -> @fail
@private
stop(m) =
match Cell.call(m.conncell,({stop}:Mongo.sr)):Mongo.srr with
Expand Down Expand Up @@ -683,6 +751,14 @@ MongoDriver = {{
do insert_(mbuf,flags,ns,documents)
snd(m,mbuf,"insert")
/**
* Same as insert but piggyback a getlasterror command.
**/
inserte(m:Mongo.db, flags:int, ns:string, dbname:string, documents:Bson.document): option(Mongo.reply) =
mbuf = create_(m.bufsize)
do insert_(mbuf,flags,ns,documents)
snderr(m,mbuf,"insert",dbname)
/**
* [insertf]: same as [insert] but using tags instead of bit-wise flags.
**/
Expand All @@ -699,6 +775,12 @@ MongoDriver = {{
do insert_batch_(mbuf,flags,ns,documents)
snd(m,mbuf,"insert")
/** insert_batch with added getlasterror query **/
insert_batche(m:Mongo.db, flags:int, ns:string, dbname:string, documents:list(Bson.document)): option(Mongo.reply) =
mbuf = create_(m.bufsize)
do insert_batch_(mbuf,flags,ns,documents)
snderr(m,mbuf,"insert",dbname)
/**
* [insert_batchf]: same as [insert_batch] but using tags instead of bit-wise flags.
**/
Expand All @@ -717,6 +799,12 @@ MongoDriver = {{
do update_(mbuf,flags,ns,selector,update)
snd(m,mbuf,"update")
/** update with added getlasterror query **/
updatee(m:Mongo.db, flags:int, ns:string, dbname:string, selector:Bson.document, update:Bson.document): option(Mongo.reply) =
mbuf = create_(m.bufsize)
do update_(mbuf,flags,ns,selector,update)
snderr(m,mbuf,"update",dbname)
/**
* [updatef]: same as [update] but using tags instead of bit-wise flags.
**/
Expand Down Expand Up @@ -765,6 +853,12 @@ MongoDriver = {{
do delete_(mbuf,flags,ns,selector)
snd(m,mbuf,"delete")
/** delete with added getlasterror query **/
deletee(m:Mongo.db, flags:int, ns:string, dbname:string, selector:Bson.document): option(Mongo.reply) =
mbuf = create_(m.bufsize)
do delete_(mbuf,flags,ns,selector)
snderr(m,mbuf,"delete",dbname)
/**
* [deletef]: same as [delete] but using tags instead of bit-wise flags.
**/
Expand All @@ -781,6 +875,12 @@ MongoDriver = {{
do kill_cursors_(mbuf,cursors)
snd(m,mbuf,"kill_cursors")
/** kill_cursors with added getlasterror query **/
kill_cursorse(m:Mongo.db, dbname:string, cursors:list(Mongo.cursorID)): option(Mongo.reply) =
mbuf = create_(m.bufsize)
do kill_cursors_(mbuf,cursors)
snderr(m,mbuf,"kill_cursors",dbname)
/**
* Send OP_MSG.
* @return a bool indicating whether the message was successfully sent or not.
Expand All @@ -790,6 +890,12 @@ MongoDriver = {{
do msg_(mbuf,msg)
snd(m,mbuf,"msg")
/** kill_cursors with added getlasterror query **/
msge(m:Mongo.db, dbname:string, msg:string): option(Mongo.reply) =
mbuf = create_(m.bufsize)
do msg_(mbuf,msg)
snderr(m,mbuf,"msg",dbname)
/** Access components of the reply value **/
reply_messageLength = (%% BslMongo.Mongo.reply_messageLength %% : Mongo.reply -> int)
reply_requestId = (%% BslMongo.Mongo.reply_requestId %% : Mongo.reply -> int)
Expand Down

0 comments on commit 86efce2

Please sign in to comment.