Permalink
Browse files

[enhance] stdlib: Split MongoDriver into MongoCommon, made MongoDrive…

…r private.
  • Loading branch information...
1 parent a006b89 commit 1aec0f57f14c2d51c52c629792bb52cc8791eabe @nrs135 nrs135 committed Nov 23, 2011
View
34 stdlib/apis/mongo/collection.opa
@@ -187,7 +187,7 @@ MongoCollection = {{
openfatal(name:string, dbname:string, collection:string): Mongo.collection('value) =
match open(name, dbname, collection) with
| {success=coll} -> coll
- | {~failure} -> ML.fatal("MongoCollection.openfatal","Can't connect: {MongoDriver.string_of_failure(failure)}",-1)
+ | {~failure} -> ML.fatal("MongoCollection.openfatal","Can't connect: {MongoCommon.string_of_failure(failure)}",-1)
/** Supply a set of useful values associated with a collection **/
makepkg(c:Mongo.collection('value)): Mongo.pkg('value) =
@@ -199,7 +199,7 @@ MongoCollection = {{
/** Same as [open] but returning pre-typed select and update functions **/
openpkg(name:string, dbname:string, collection:string): outcome(Mongo.pkg('value),Mongo.failure) =
- MongoDriver.map_success(open(name,dbname,collection),makepkg)
+ MongoCommon.map_success(open(name,dbname,collection),makepkg)
/** Same as [openfatal] but returning pre-typed select and update functions **/
openpkgfatal(name:string, dbname:string, collection:string) : Mongo.pkg('value) =
@@ -239,47 +239,47 @@ MongoCollection = {{
/** Set the "continueOnError" flag for all [insert] calls. **/
continueOnError(c:Mongo.collection('value)): Mongo.collection('value) =
- {c with db={ c.db with insert_flags=Bitwise.lor(c.db.insert_flags,MongoDriver.ContinueOnErrorBit) }}
+ {c with db={ c.db with insert_flags=Bitwise.lor(c.db.insert_flags,MongoCommon.ContinueOnErrorBit) }}
/** Set the "Upsert" flag for all [update] calls. **/
upsert(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with update_flags=Bitwise.lor(c.db.update_flags,MongoDriver.UpsertBit) }}
+ = {c with db={ c.db with update_flags=Bitwise.lor(c.db.update_flags,MongoCommon.UpsertBit) }}
/** Set the "multiUpdate" flag for all [update] calls. **/
multiUpdate(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with update_flags=Bitwise.lor(c.db.update_flags,MongoDriver.MultiUpdateBit) }}
+ = {c with db={ c.db with update_flags=Bitwise.lor(c.db.update_flags,MongoCommon.MultiUpdateBit) }}
/** Set the "singleRemove" flag for all [delete] calls. **/
singleRemove(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with delete_flags=Bitwise.lor(c.db.delete_flags,MongoDriver.SingleRemoveBit) }}
+ = {c with db={ c.db with delete_flags=Bitwise.lor(c.db.delete_flags,MongoCommon.SingleRemoveBit) }}
/** Set the "tailableCursor" flag for all [query] calls. **/
tailableCursor(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.TailableCursorBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.TailableCursorBit) }}
/** Set the "slaveOk" flag for all [query] calls. **/
slaveOk(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.SlaveOkBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.SlaveOkBit) }}
/** Set the "oplogReplay" flag for all [query] calls. **/
oplogReplay(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.OplogReplayBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.OplogReplayBit) }}
/** Set the "noCursorTimeout" flag for all [query] calls. **/
noCursorTimeout(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.NoCursorTimeoutBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.NoCursorTimeoutBit) }}
/** Set the "awaitData" flag for all [query] calls. **/
awaitData(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.AwaitDataBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.AwaitDataBit) }}
/** Set the "exhaust" flag for all [query] calls. **/
exhaust(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.ExhaustBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.ExhaustBit) }}
/** Set the "partial" flag for all [query] calls. **/
partial(c:Mongo.collection('value)): Mongo.collection('value)
- = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.PartialBit) }}
+ = {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.PartialBit) }}
/**
* Insert an OPA value into a collection.
@@ -293,7 +293,7 @@ MongoCollection = {{
inserte(c:Mongo.collection('value), v:'value): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
b = Bson.opa_to_bson(v,{some=@typeval('value)})
- MongoDriver.reply_to_result("MongoConnection.insert",0,MongoDriver.inserte(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
+ MongoCommon.reply_to_result("MongoConnection.insert",0,MongoDriver.inserte(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
/**
* Batch insert, you need to build the batch using the [Batch] module.
@@ -305,7 +305,7 @@ MongoCollection = {{
/** insert_batch with getlasterror **/
insert_batche(c:Mongo.collection('value), b:Mongo.batch('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
- MongoDriver.reply_to_result("MongoConnection.insert_batch",0,
+ MongoCommon.reply_to_result("MongoConnection.insert_batch",0,
MongoDriver.insert_batche(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
/**
@@ -326,7 +326,7 @@ MongoCollection = {{
/** update with getlasterror **/
updatee(c:Mongo.collection('value), select:Mongo.select('value), update:Mongo.update('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
- MongoDriver.reply_to_result("MongoConnection.update",0,
+ MongoCommon.reply_to_result("MongoConnection.update",0,
MongoDriver.updatee(c.db.mongo,c.db.update_flags,ns,c.db.dbname,select,update))
/**
@@ -339,7 +339,7 @@ MongoCollection = {{
/** delete with getlasterror **/
deletee(c:Mongo.collection('value), select:Mongo.select('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
- MongoDriver.reply_to_result("MongoConnection.delete",0,
+ MongoCommon.reply_to_result("MongoConnection.delete",0,
MongoDriver.deletee(c.db.mongo,c.db.delete_flags,ns,c.db.dbname,select))
/**
View
24 stdlib/apis/mongo/commands.opa
@@ -51,7 +51,7 @@
* MongoDB server commands.
*
* If we just want a single value out of the result,
- * then it is probably more efficient to use the [MongoDriver.dotresult_type] functions.
+ * then it is probably more efficient to use the [MongoCommon.dotresult_type] functions.
* If we want to manipulate the whole thing in OPA, however, we can just
* [Bosn.doc2opa] the result and cast to these types.
*
@@ -180,7 +180,7 @@ MongoCommands = {{
**/
run_command_ll(m:Mongo.db, ns:string, command:Bson.document): Mongo.result =
match MongoCursor.find_one(m, ns^".$cmd", command, {none}, {none}) with
- | {success=bson} -> MongoDriver.check_ok(bson)
+ | {success=bson} -> MongoCommon.check_ok(bson)
| {~failure} -> {~failure}
run_command(m:Mongo.mongodb, ns:string, command:Bson.document): Mongo.result =
@@ -219,7 +219,7 @@ MongoCommands = {{
dbToOpa(m:Mongo.mongodb, dbname:string, command:string): outcome('a,Mongo.failure) =
match simple_int_command(m,dbname,command,1) with
| {success=doc} ->
- (match MongoDriver.result_to_opa({success=doc}) with
+ (match MongoCommon.result_to_opa({success=doc}) with
| {some=ism} -> {success=ism}
| {none} -> {failure={Error="Mongo.{command}: invalid document from db {dbname} ({Bson.to_pretty(doc)})"}})
| {~failure} -> {~failure}
@@ -292,7 +292,7 @@ MongoCommands = {{
* Return the last error from database.
**/
getLastError(m:Mongo.mongodb, db:string): Mongo.result = simple_int_command(m, db, "getlasterror", 1)
- getLastErrorOpa(m:Mongo.mongodb, db:string): Mongo.error = MongoDriver.error_of_result(getLastError(m, db))
+ getLastErrorOpa(m:Mongo.mongodb, db:string): Mongo.error = MongoCommon.error_of_result(getLastError(m, db))
/**
* Return the last error from database, with full options.
@@ -303,7 +303,7 @@ MongoCommands = {{
simple_int_command_opts(m, db, "getlasterror", 1,
[H.bool("fsync",fsync), H.bool("j",j), H.i32("w",w), H.i32("wtimeout",wtimeout)])
getLastErrorFullOpa(m:Mongo.mongodb, db:string, fsync:bool, j:bool, w:int, wtimeout:int): Mongo.error =
- MongoDriver.error_of_result(getLastErrorFull(m, db, fsync, j, w, wtimeout))
+ MongoCommon.error_of_result(getLastErrorFull(m, db, fsync, j, w, wtimeout))
/**
* Reset database error status.
@@ -331,7 +331,7 @@ MongoCommands = {{
collStats(m:Mongo.mongodb, db:string, collection:string): Mongo.result =
simple_str_command(m, db, "collStats", collection)
collStatsOpa(m:Mongo.mongodb, db:string, collection:string): outcome(Mongo.collStatsType,Mongo.failure) =
- MongoDriver.resultToOpa(collStats(m, db, collection))
+ MongoCommon.resultToOpa(collStats(m, db, collection))
/**
* Create a collection.
@@ -448,7 +448,7 @@ MongoCommands = {{
* Low-level, set "config.settings" balancer value. Valid objects are "stopped" and "start/stop".
**/
setBalancer(m:Mongo.mongodb, param:Bson.document): bool =
- MongoDriver.update(m.mongo,MongoDriver.UpsertBit,"config.settings",[H.str("_id","balancer")],[H.doc("$set",param)])
+ MongoDriver.update(m.mongo,MongoCommon.UpsertBit,"config.settings",[H.str("_id","balancer")],[H.doc("$set",param)])
/**
* Update the balancer settings, [true]=stopped
@@ -466,7 +466,7 @@ MongoCommands = {{
* Set chunksize in MB.
**/
setChunkSize(m:Mongo.mongodb, size:int): bool =
- MongoDriver.update(m.mongo,MongoDriver.UpsertBit,"config.settings",[H.str("_id","chunksize")],[H.doc("$set",[H.i32("value",size)])])
+ MongoDriver.update(m.mongo,MongoCommon.UpsertBit,"config.settings",[H.str("_id","chunksize")],[H.doc("$set",[H.i32("value",size)])])
/**
* Query the "config.chunks" database, gives a information about shard distribution.
@@ -693,23 +693,23 @@ MongoCommands = {{
/** Same as [findAndModify] but convert to OPA type **/
findAndModifyOpa(m,dbname,collection,query,update_opt,new_opt,remove_opt,sort_opt): outcome('a,Mongo.failure) =
- MongoDriver.resultToOpa(findAndModify(m,dbname,collection,query,update_opt,new_opt,remove_opt,sort_opt))
+ MongoCommon.resultToOpa(findAndModify(m,dbname,collection,query,update_opt,new_opt,remove_opt,sort_opt))
/** Update-specific version of the [findAndModify] command **/
findAndUpdate(m,dbname,collection,query,update,new_opt,sort_opt) =
fAM(m,dbname,collection,query,{some=update},new_opt,{none},sort_opt)
/** Same as [findAndUpdate] but convert to OPA type **/
findAndUpdateOpa(m,dbname,collection,query,update,new_opt,sort_opt) =
- MongoDriver.resultToOpa(findAndUpdate(m,dbname,collection,query,update,new_opt,sort_opt))
+ MongoCommon.resultToOpa(findAndUpdate(m,dbname,collection,query,update,new_opt,sort_opt))
/** Remove-specific version of the [findAndModify] command **/
findAndRemove(m,dbname,collection,query,remove,new_opt,sort_opt) =
fAM(m,dbname,collection,query,{none},new_opt,{some=remove},sort_opt)
/** Same as [findAndRemove] but convert to OPA type **/
findAndRemoveOpa(m,dbname,collection,query,remove,new_opt,sort_opt) =
- MongoDriver.resultToOpa(findAndRemove(m,dbname,collection,query,remove,new_opt,sort_opt))
+ MongoCommon.resultToOpa(findAndRemove(m,dbname,collection,query,remove,new_opt,sort_opt))
@private pass_digest(user:string, pass:string): string = Crypto.Hash.md5("{user}:mongo:{pass}")
@@ -721,7 +721,7 @@ MongoCommands = {{
digest = pass_digest(user,pass)
bselector = [H.str("user",user)]
bupdate = [H.doc("$set",[H.str("pwd",digest)])]
- MongoDriver.update(m.mongo,MongoDriver.UpsertBit,(db^".system.users"),bselector,bupdate)
+ MongoDriver.update(m.mongo,MongoCommon.UpsertBit,(db^".system.users"),bselector,bupdate)
/**
* Authenticate a user for the given database.
View
455 stdlib/apis/mongo/common.opa
@@ -0,0 +1,455 @@
+/*
+ Copyright © 2011 MLstate
+
+ This file is part of OPA.
+
+ OPA is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License, version 3, as published by
+ the Free Software Foundation.
+
+ OPA is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+ more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with OPA. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/**
+ * MongoDB binding for OPA.
+ *
+ * @destination public
+ * @stabilization work in progress
+ **/
+
+/**
+ * {1 About this module}
+ *
+ * Module [MongoCommon] has common routines for handling responses from MongoDB driver functions.
+ *
+ * This module also has routines for dealing with tags and for accessing the [Mongo.reply] external type.
+ *
+ * {1 Where should I start?}
+ *
+ * {1 What if I need more?}
+ *
+ **/
+
+/** Type for a connection, host name and port **/
+type Mongo.mongo_host = (string, int)
+
+/** Type of concurrency handling **/
+type Mongo.concurrency = {pool} / {cell} / {singlethreaded}
+
+/**
+ * Mongo driver failure status.
+ * Either a failure document returned by the MongoDB server,
+ * an error string generated by the driver or [Incomplete]
+ * which signals that expected fields were missing from a reply.
+ * The [OK] value is for a connection which has been initialised but never used.
+ **/
+type Mongo.failure =
+ {OK}
+ / {Error : string}
+ / {DocError : Bson.document}
+ / {Incomplete}
+ / {NotFound}
+
+/**
+ * Mongo success status, just a document.
+ **/
+type Mongo.success = Bson.document
+type Mongo.successes = list(Bson.document)
+
+/**
+ * A Mongo driver result value is either a valid document
+ * or a [Mongo.failure] value.
+ **/
+type Mongo.result = outcome(Mongo.success, Mongo.failure)
+type Mongo.results = outcome(Mongo.successes, Mongo.failure)
+
+/**
+ * A Mongo error is either an error value which is an OPA
+ * value containing the error information from a [Bson.document]
+ * or a [Mongo.failure] value.
+ **/
+type Mongo.error = outcome(Bson.error, Mongo.failure)
+
+/* Flag tags */
+
+/** OP_INSERT **/
+type Mongo.insert_tag =
+ {ContinueOnError}
+
+/** OP_UPDATE **/
+type Mongo.update_tag =
+ {Upsert} /
+ {MultiUpdate}
+
+/** OP_QUERY **/
+type Mongo.query_tag =
+ {TailableCursor} /
+ {SlaveOk} /
+ {OplogReplay} /
+ {NoCursorTimeout} /
+ {AwaitData} /
+ {Exhaust} /
+ {Partial}
+
+/** OP_DELETE **/
+type Mongo.delete_tag =
+ {SingleRemove}
+
+/** OP_REPLY **/
+type Mongo.reply_tag =
+ {CursorNotFound} /
+ {QueryFailure} /
+ {ShardConfigStale} /
+ {AwaitCapable}
+
+/** Tags for indices **/
+type Mongo.index_tag =
+ {Unique} /
+ {DropDups} /
+ {Background} /
+ {Sparse}
+
+/**
+ * We wrap the tags so that we can tell if it is an insert tag,
+ * query tag etc. We don't want to send SingleRemove to an update.
+ **/
+type Mongo.mongo_tag =
+ {itag:Mongo.insert_tag} /
+ {utag:Mongo.update_tag} /
+ {qtag:Mongo.query_tag} /
+ {dtag:Mongo.delete_tag} /
+ {rtag:Mongo.reply_tag} /
+ {xtag:Mongo.index_tag}
+
+@server_private
+MongoCommon = {{
+
+ @private ML = MongoLog
+ @private H = Bson.Abbrevs
+
+ /** Generate a driver error message outcome **/
+ failErr(msg:string): outcome('a,Mongo.failure) = {failure={Error=msg}}
+
+ /** Map either a success function or a failure function over an outcome **/
+ map_outcome(outcome:outcome('s,'f), sfn:'s->'t, ffn:'f->'g): outcome('t,'g) =
+ match outcome with
+ | {~success} -> {success=sfn(success)}
+ | {~failure} -> {failure=ffn(failure)}
+
+ /** Map a function over a success, leave failures untouched **/
+ map_success(outcome, sfn) = map_outcome(outcome, sfn, (f -> f))
+
+ /** Map a function over a failure, leave successes untouched **/
+ map_failure(outcome, ffn) = map_outcome(outcome, (s -> s), ffn)
+
+ /** Map a pair of convergent functions over an outcome **/
+ outcome_map(outcome:outcome('s,'f), sfn:'s->'r, ffn:'f->'r): 'r =
+ match outcome with
+ | {~success} -> sfn(success)
+ | {~failure} -> ffn(failure)
+
+ /** Same as [outcome_map] but coerced to string **/
+ string_of_outcome = (outcome_map:outcome('s,'f), ('s->string), ('f->string) -> string)
+
+ /** Turn a result into a [Mongo.error] value **/
+ error_of_result(result:Mongo.result): Mongo.error = map_success(result, Bson.error_of_document)
+
+ /** Make a readable string out of a [Mongo.error] value **/
+ string_of_error(error:Mongo.error): string = outcome_map(error, Bson.string_of_error, string_of_failure)
+
+ /** String representation of a [Mongo.failure] value **/
+ string_of_failure(failure:Mongo.failure): string =
+ match failure with
+ | {Error=str} -> str
+ | {DocError=doc} -> Bson.string_of_doc_error(doc)
+ | {OK} -> "Ok"
+ | {Incomplete} -> "Incomplete"
+ | {NotFound} -> "NotFound"
+
+ /** Make an error report string out of a [Mongo.result] value, will print "<ok>" if no error. **/
+ string_of_result(result:Mongo.result): string = outcome_map(result, Bson.string_of_doc_error, string_of_failure)
+
+ /** Same for a list of results. **/
+ string_of_results(results:Mongo.results): string =
+ outcome_map(results, (l -> List.list_to_string(Bson.string_of_doc_error,l)), string_of_failure)
+
+ /** Similar to [string_of_result] but the success value is user-defined. **/
+ string_of_value_or_failure(result:outcome('a,Mongo.failure), success_to_str:'a->string): string =
+ string_of_outcome(result, success_to_str, (failure -> "\{failure={string_of_failure(failure)}\}"))
+
+ /** Either pretty-print the document or generate a failure string. **/
+ pretty_of_result(result:Mongo.result): string = string_of_value_or_failure(result,Bson.to_pretty)
+
+ /** Same as [pretty_of_result] but for a list of results. **/
+ pretty_of_results(results:Mongo.results): string =
+ string_of_value_or_failure(results,(l -> List.list_to_string(Bson.to_pretty,l)))
+
+ /** Predicate for error status of a [Mongo.result] value. **/
+ is_error(result:Mongo.result): bool = outcome_map(result, Bson.is_error, (_ -> true))
+
+ /** Predicate for error status of a [Mongo.result] value. **/
+ isError(error:Mongo.error): bool = outcome_map(error, Bson.isError, (_ -> true))
+
+ /**
+ * Validate a BSON document by turning it into a [Mongo.result] value.
+ * If [ok] is not 1 or there is an [errmsg] value then turn it into a [Mongo.failure] value.
+ * Note that if there is no "ok" element then we assume success.
+ **/
+ check_ok(bson:Bson.document): Mongo.result =
+ match Bson.find_int(bson,"ok") with
+ | {some=ok} ->
+ if ok == 1
+ then {success=bson}
+ else
+ (match Bson.find_string(bson,"errmsg") with
+ | {some=errmsg} -> failErr(errmsg)
+ | _ -> failErr("ok:{ok}"))
+ | _ -> {success=bson}
+
+ /**
+ * Outcome-wrapped versions of Bson.find_xxx etc.
+ **/
+ @private
+ result_(result:Mongo.result, key:string, find:(Bson.document, string -> option('a))): option('a) =
+ match result with
+ | {success=doc} -> find(doc,key)
+ | {failure=_} -> {none}
+
+ result_bool(result:Mongo.result,key:string): option(bool) = result_(result, key, Bson.find_bool)
+ result_int(result:Mongo.result,key:string): option(int) = result_(result, key, Bson.find_int)
+ result_float(result:Mongo.result,key:string): option(float) = result_(result, key, Bson.find_float)
+ result_string(result:Mongo.result,key:string): option(string) = result_(result, key, Bson.find_string)
+ result_doc(result:Mongo.result,key:string): option(Bson.document) = result_(result, key, Bson.find_doc)
+
+ /**
+ * Same as outcome-wrapped versions but allowing dot notation.
+ **/
+ @private
+ dotresult_(result:Mongo.result,key:string,find:(Bson.document, string -> option('a))): option('a) =
+ match result with
+ | {success=doc} -> Bson.find_dot(doc,key,find)
+ | {failure=_} -> {none}
+
+ dotresult_bool(result:Mongo.result,key:string): option(bool) = dotresult_(result, key, Bson.find_bool)
+ dotresult_int(result:Mongo.result,key:string): option(int) = dotresult_(result, key, Bson.find_int)
+ dotresult_float(result:Mongo.result,key:string): option(float) = dotresult_(result, key, Bson.find_float)
+ dotresult_string(result:Mongo.result,key:string): option(string) = dotresult_(result, key, Bson.find_string)
+ dotresult_doc(result:Mongo.result,key:string): option(Bson.document) = dotresult_(result, key, Bson.find_doc)
+
+ /**
+ * If a result is success then return an OPA value from the
+ * document using [Bson.doc2opa]. Must be cast at point of call.
+ **/
+ result_to_opa(result:Mongo.result): option('a) =
+ match result with
+ | {success=doc} -> (Bson.doc2opa(doc):option('a))
+ | {failure=_} -> {none}
+
+ /**
+ * Same as [result_to_opa] but returning an outcome instead of an option.
+ **/
+ resultToOpa(result:Mongo.result): outcome('a,Mongo.failure) =
+ match result with
+ | {success=doc} ->
+ (match (Bson.doc2opa(doc):option('a)) with
+ | {some=a} -> {success=a}
+ | {none} -> failErr("MongoDriver.resultToOpa: document conversion failure"))
+ | {~failure} -> {~failure}
+
+ /** Flag bitmasks **/
+
+ /* OP_INSERT */
+ ContinueOnErrorBit = 0x00000001
+
+ /* OP_UPDATE */
+ UpsertBit = 0x00000001
+ MultiUpdateBit = 0x00000002
+
+ /* OP_QUERY */
+ TailableCursorBit = 0x00000002
+ SlaveOkBit = 0x00000004
+ OplogReplayBit = 0x00000008
+ NoCursorTimeoutBit = 0x00000010
+ AwaitDataBit = 0x00000020
+ ExhaustBit = 0x00000040
+ PartialBit = 0x00000080
+
+ /* OP_DELETE */
+ SingleRemoveBit = 0x00000001
+
+ /* OP_REPLY */
+ CursorNotFoundBit = 0x00000001
+ QueryFailureBit = 0x00000002
+ ShardConfigStaleBit = 0x00000004
+ AwaitCapableBit = 0x00000008
+
+ /* Flags used by the index routines. */
+ UniqueBit = 0x00000001
+ DropDupsBit = 0x00000002
+ BackgroundBit = 0x00000004
+ SparseBit = 0x00000008
+
+ /**
+ * [flag_of_tag]: Turn a list of tags into a bit-wise flag suitable
+ * for sending to MongoDB. We have an extra layer of types to allow
+ * forcing of tags to belong to a particular operation.
+ **/
+ flag_of_tag(tag:Mongo.mongo_tag): int =
+ match tag with
+ /* OP_INSERT */
+ | {itag={ContinueOnError}} -> ContinueOnErrorBit
+
+ /* OP_UPDATE */
+ | {utag={Upsert}} -> UpsertBit
+ | {utag={MultiUpdate}} -> MultiUpdateBit
+
+ /* OP_QUERY */
+ | {qtag={TailableCursor}} -> TailableCursorBit
+ | {qtag={SlaveOk}} -> SlaveOkBit
+ | {qtag={OplogReplay}} -> OplogReplayBit
+ | {qtag={NoCursorTimeout}} -> NoCursorTimeoutBit
+ | {qtag={AwaitData}} -> AwaitDataBit
+ | {qtag={Exhaust}} -> ExhaustBit
+ | {qtag={Partial}} -> PartialBit
+
+ /* OP_DELETE */
+ | {dtag={SingleRemove}} -> SingleRemoveBit
+
+ /* OP_REPLY */
+ | {rtag={CursorNotFound}} -> CursorNotFoundBit
+ | {rtag={QueryFailure}} -> QueryFailureBit
+ | {rtag={ShardConfigStale}} -> ShardConfigStaleBit
+ | {rtag={AwaitCapable}} -> AwaitCapableBit
+
+ /* Index tags */
+ | {xtag={Unique}} -> UniqueBit
+ | {xtag={DropDups}} -> DropDupsBit
+ | {xtag={Background}} -> BackgroundBit
+ | {xtag={Sparse}} -> SparseBit
+
+ /**
+ * Turn a list of tags into a single MongoDB-compatible int.
+ **/
+ flags(tags:list(Mongo.mongo_tag)): int =
+ List.fold_left((flag, tag -> Bitwise.land(flag,flag_of_tag(tag))),0,tags)
+
+ /**
+ * Extract the tags from a given bit-wise flag. These are specific
+ * to each operation, you need to know which operation the flag was for/from
+ * before you can give meaning to the bits.
+ **/
+ insert_tags(flag:int): list(Mongo.mongo_tag) =
+ if Bitwise.land(flag,ContinueOnErrorBit) != 0 then [{itag={ContinueOnError}}] else []
+
+ update_tags(flag:int): list(Mongo.mongo_tag) =
+ tags = if Bitwise.land(flag,UpsertBit) != 0 then [{utag={Upsert}}] else []
+ if Bitwise.land(flag,MultiUpdateBit) != 0 then [{utag={MultiUpdate}}|tags] else tags
+
+ query_tags(flag:int): list(Mongo.mongo_tag) =
+ tags = if Bitwise.land(flag,TailableCursorBit) != 0 then [{qtag={TailableCursor}}] else []
+ tags = if Bitwise.land(flag,SlaveOkBit) != 0 then [{qtag={SlaveOk}}|tags] else tags
+ tags = if Bitwise.land(flag,OplogReplayBit) != 0 then [{qtag={OplogReplay}}|tags] else tags
+ tags = if Bitwise.land(flag,NoCursorTimeoutBit) != 0 then [{qtag={NoCursorTimeout}}|tags] else tags
+ tags = if Bitwise.land(flag,AwaitDataBit) != 0 then [{qtag={AwaitData}}|tags] else tags
+ tags = if Bitwise.land(flag,ExhaustBit) != 0 then [{qtag={Exhaust}}|tags] else tags
+ if Bitwise.land(flag,PartialBit) != 0 then [{qtag={Partial}}|tags] else tags
+
+ delete_tags(flag:int): list(Mongo.mongo_tag) =
+ if Bitwise.land(flag,SingleRemoveBit) != 0 then [{dtag={SingleRemove}}] else []
+
+ reply_tags(flag:int): list(Mongo.mongo_tag) =
+ tags = if Bitwise.land(flag,CursorNotFoundBit) != 0 then [{rtag={CursorNotFound}}] else []
+ tags = if Bitwise.land(flag,QueryFailureBit) != 0 then [{rtag={QueryFailure}}|tags] else tags
+ tags = if Bitwise.land(flag,ShardConfigStaleBit) != 0 then [{rtag={ShardConfigStale}}|tags] else tags
+ if Bitwise.land(flag,AwaitCapableBit) != 0 then [{rtag={AwaitCapable}}|tags] else tags
+
+ index_tags(flag:int): list(Mongo.mongo_tag) =
+ tags = if Bitwise.land(flag,UniqueBit) != 0 then [{xtag={Unique}}] else []
+ tags = if Bitwise.land(flag,DropDupsBit) != 0 then [{xtag={DropDups}}|tags] else tags
+ tags = if Bitwise.land(flag,BackgroundBit) != 0 then [{xtag={Background}}|tags] else tags
+ if Bitwise.land(flag,SparseBit) != 0 then [{xtag={Sparse}}|tags] else tags
+
+ /**
+ * A string representation of a [Mongo.mongo_tag] value.
+ **/
+ string_of_tag(tag:Mongo.mongo_tag): string =
+ match tag with
+ | {itag={ContinueOnError}} -> "ContinueOnError"
+ | {utag={Upsert}} -> "Upsert"
+ | {utag={MultiUpdate}} -> "MultiUpdate"
+ | {qtag={TailableCursor}} -> "TailableCursor"
+ | {qtag={SlaveOk}} -> "SlaveOk"
+ | {qtag={OplogReplay}} -> "OplogReplay"
+ | {qtag={NoCursorTimeout}} -> "NoCursorTimeout"
+ | {qtag={AwaitData}} -> "AwaitData"
+ | {qtag={Exhaust}} -> "Exhaust"
+ | {qtag={Partial}} -> "Partial"
+ | {dtag={SingleRemove}} -> "SingleRemove"
+ | {rtag={CursorNotFound}} -> "CursorNotFound"
+ | {rtag={QueryFailure}} -> "QueryFailure"
+ | {rtag={ShardConfigStale}} -> "ShardConfigStale"
+ | {rtag={AwaitCapable}} -> "AwaitCapable"
+ | {xtag={Unique}} -> "Unique"
+ | {xtag={DropDups}} -> "DropDups"
+ | {xtag={Background}} -> "Background"
+ | {xtag={Sparse}} -> "Sparse"
+
+ /** String of a list of tags. **/
+ string_of_tags(tags:list(Mongo.mongo_tag)): string = List.list_to_string(string_of_tag,tags)
+
+ /** Access components of the reply value **/
+ reply_messageLength = (%% BslMongo.Mongo.reply_messageLength %% : Mongo.reply -> int)
+ reply_requestId = (%% BslMongo.Mongo.reply_requestId %% : Mongo.reply -> int)
+ reply_responseTo = (%% BslMongo.Mongo.reply_responseTo %% : Mongo.reply -> int)
+ reply_opCode = (%% BslMongo.Mongo.reply_opCode %% : Mongo.reply -> int)
+ reply_responseFlags = (%% BslMongo.Mongo.reply_responseFlags %% : Mongo.reply -> int)
+ reply_cursorID = (%% BslMongo.Mongo.reply_cursorID %% : Mongo.reply -> Mongo.cursorID)
+ reply_startingFrom = (%% BslMongo.Mongo.reply_startingFrom %% : Mongo.reply -> int)
+ reply_numberReturned = (%% BslMongo.Mongo.reply_numberReturned %% : Mongo.reply -> int)
+
+ /** Return the n'th document attached to the reply **/
+ reply_document = (%% BslMongo.Mongo.reply_document %% : Mongo.reply, int -> option(Bson.document))
+
+ /** Debug routine, export the internal representation of the reply **/
+ export_reply = (%% BslMongo.Mongo.export_reply %%: Mongo.reply -> string)
+
+ /** Null cursor value **/
+ null_cursorID = (%% BslMongo.Mongo.null_cursorID %% : void -> Mongo.cursorID)
+
+ /** Return a string representation of a cursor (it's an int64) **/
+ string_of_cursorID = (%% BslMongo.Mongo.string_of_cursorID %% : Mongo.cursorID -> string)
+
+ /** Predicate for end of query, when the cursorID is returned as zero **/
+ is_null_cursorID = (%% BslMongo.Mongo.is_null_cursorID %% : Mongo.cursorID -> bool)
+
+ /**
+ * Extract a document from a reply.
+ *
+ * Example: [reply_to_result(from, n, reply_opt)]
+ *
+ * @param from A string included in failure messages.
+ * @param n The number of the document required (base 0).
+ * @param reply_opt The optional reply (this is what most query operations return).
+ **/
+ reply_to_result(from:string, n:int, reply_opt: option(Mongo.reply)): Mongo.result =
+ match reply_opt with
+ | {some=reply} ->
+ numberReturned = reply_numberReturned(reply)
+ if numberReturned <= n
+ then failErr("{from}: insufficient number of documents {numberReturned} vs {n}")
+ else
+ (match reply_document(reply,n) with
+ | {some=doc} -> {success=doc}
+ | {none} -> failErr("{from}: no document in reply"))
+ | {none} -> failErr("{from}: no reply")
+
+}}
+
+//End of file common.opa
+
View
37 stdlib/apis/mongo/connection.opa
@@ -90,6 +90,7 @@ type Mongo.params = list(Mongo.param)
MongoConnection = {{
@private ML = MongoLog
+ @private H = Bson.Abbrevs
@private default_seeds = ([("localhost",MongoDriver.default_port)]:list(Mongo.mongo_host))
@@ -318,7 +319,7 @@ MongoConnection = {{
openfatal(name:string): Mongo.mongodb =
match open(name) with
| {success=rs} -> rs
- | {~failure} -> ML.fatal("MongoConnection.openfatal","Can't connect: {MongoDriver.string_of_failure(failure)}",-1)
+ | {~failure} -> ML.fatal("MongoConnection.openfatal","Can't connect: {MongoCommon.string_of_failure(failure)}",-1)
/**
* Clone a connection. We actually just bump the link count. On close
@@ -411,9 +412,9 @@ MongoConnection = {{
**/
err(db:Mongo.mongodb, msg:string): bool =
err = MongoCommands.getLastError(db, db.dbname)
- status = MongoDriver.is_error(err)
+ status = MongoCommon.is_error(err)
do if db.mongo.log && status
- then ML.error("MongoConnection.err({db.dbname}.{db.collection})","msg={msg} err={MongoDriver.string_of_result(err)}",void)
+ then ML.error("MongoConnection.err({db.dbname}.{db.collection})","msg={msg} err={MongoCommon.string_of_result(err)}",void)
status
/** Set the "skip" number on the given connection. **/
@@ -430,63 +431,63 @@ MongoConnection = {{
/** Set the "continueOnError" flag for all [insert] calls. **/
continueOnError(db:Mongo.mongodb): Mongo.mongodb =
- { db with insert_flags=Bitwise.lor(db.insert_flags,MongoDriver.ContinueOnErrorBit) }
+ { db with insert_flags=Bitwise.lor(db.insert_flags,MongoCommon.ContinueOnErrorBit) }
/** Set the "Upsert" flag for all [update] calls. **/
upsert(db:Mongo.mongodb): Mongo.mongodb =
- { db with update_flags=Bitwise.lor(db.update_flags,MongoDriver.UpsertBit) }
+ { db with update_flags=Bitwise.lor(db.update_flags,MongoCommon.UpsertBit) }
/** Set the "multiUpdate" flag for all [update] calls. **/
multiUpdate(db:Mongo.mongodb): Mongo.mongodb =
- { db with update_flags=Bitwise.lor(db.update_flags,MongoDriver.MultiUpdateBit) }
+ { db with update_flags=Bitwise.lor(db.update_flags,MongoCommon.MultiUpdateBit) }
/** Set the "singleRemove" flag for all [delete] calls. **/
singleRemove(db:Mongo.mongodb): Mongo.mongodb =
- { db with delete_flags=Bitwise.lor(db.delete_flags,MongoDriver.SingleRemoveBit) }
+ { db with delete_flags=Bitwise.lor(db.delete_flags,MongoCommon.SingleRemoveBit) }
/** Set the "tailableCursor" flag for all [query] calls. **/
tailableCursor(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.TailableCursorBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.TailableCursorBit) }
/** Set the "slaveOk" flag for all [query] calls. **/
slaveOk(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.SlaveOkBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.SlaveOkBit) }
/** Set the "oplogReplay" flag for all [query] calls. **/
oplogReplay(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.OplogReplayBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.OplogReplayBit) }
/** Set the "noCursorTimeout" flag for all [query] calls. **/
noCursorTimeout(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.NoCursorTimeoutBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.NoCursorTimeoutBit) }
/** Set the "awaitData" flag for all [query] calls. **/
awaitData(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.AwaitDataBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.AwaitDataBit) }
/** Set the "exhaust" flag for all [query] calls. **/
exhaust(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.ExhaustBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.ExhaustBit) }
/** Set the "partial" flag for all [query] calls. **/
partial(db:Mongo.mongodb): Mongo.mongodb =
- { db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.PartialBit) }
+ { db with query_flags=Bitwise.lor(db.query_flags,MongoCommon.PartialBit) }
/** Set the "unique" flag for all [index] calls. **/
unique(db:Mongo.mongodb): Mongo.mongodb =
- { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.UniqueBit) }
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoCommon.UniqueBit) }
/** Set the "dropdups" flag for all [index] calls. **/
dropDups(db:Mongo.mongodb): Mongo.mongodb =
- { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.DropDupsBit) }
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoCommon.DropDupsBit) }
/** Set the "background" flag for all [index] calls. **/
background(db:Mongo.mongodb): Mongo.mongodb =
- { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.BackgroundBit) }
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoCommon.BackgroundBit) }
/** Set the "sparse" flag for all [index] calls. **/
sparse(db:Mongo.mongodb): Mongo.mongodb =
- { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.SparseBit) }
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoCommon.SparseBit) }
/** Insert document into the defined database with inbuilt flags **/
insert(m:Mongo.mongodb, documents:Bson.document): bool =
View
26 stdlib/apis/mongo/cursor.opa
@@ -92,7 +92,7 @@ MongoCursor = {{
fields = {none};
orderby = {none};
query_sent = {false};
- cid = MongoDriver.null_cursorID(void);
+ cid = MongoCommon.null_cursorID(void);
reply = {none};
returned = 0;
current = 0;
@@ -113,7 +113,7 @@ MongoCursor = {{
set_fields(c:Mongo.cursor, fields:option(Bson.document)): Mongo.cursor = { c with ~fields }
set_orderby(c:Mongo.cursor, orderby:option(Bson.document)): Mongo.cursor = { c with ~orderby }
- tailable(c:Mongo.cursor): Mongo.cursor = { c with flags=Bitwise.lor(c.flags, MongoDriver.TailableCursorBit) }
+ tailable(c:Mongo.cursor): Mongo.cursor = { c with flags=Bitwise.lor(c.flags, MongoCommon.TailableCursorBit) }
@private
set_error(c:Mongo.cursor, error:Mongo.failure): Mongo.cursor = { c with ~error; killed={true} }
@@ -122,12 +122,12 @@ MongoCursor = {{
reply(c:Mongo.cursor, reply_opt:option(Mongo.reply), name:string, query_sent:bool): Mongo.cursor =
match reply_opt with
| {some=reply} ->
- cursorID = MongoDriver.reply_cursorID(reply)
+ cursorID = MongoCommon.reply_cursorID(reply)
{ c with
cid = cursorID;
reply = {some=reply};
~query_sent;
- returned = MongoDriver.reply_numberReturned(reply);
+ returned = MongoCommon.reply_numberReturned(reply);
current = 0;
doc = error_document("Uninitialised document",-1);
}
@@ -157,7 +157,7 @@ MongoCursor = {{
* Perform an OP_GETMORE call, if a valid cursor ID exists in the cursor.
**/
get_more(c:Mongo.cursor): Mongo.cursor =
- if not(c.killed) && not(MongoDriver.is_null_cursorID(c.cid))
+ if not(c.killed) && not(MongoCommon.is_null_cursorID(c.cid))
then reply(c,MongoDriver.get_more(c.mongo, c.ns, c.limit, c.cid),"get_more",c.query_sent)
else set_error(c,{Error="MongoCursor.get_more: attempt to get more with dead cursor"})
@@ -173,7 +173,7 @@ MongoCursor = {{
else
match c.reply with
| {some=reply} ->
- (match MongoDriver.reply_document(reply,n) with
+ (match MongoCommon.reply_document(reply,n) with
| {some=doc} -> {success=doc}
| {none} -> {failure={Error="MongoCursor.document: no document"}})
| {none} -> {failure={Error="MongoCursor.document: no reply"}}
@@ -187,7 +187,7 @@ MongoCursor = {{
rec aux(n:int) =
if n >= c.returned
then []
- else (match MongoDriver.reply_document(reply,n) with
+ else (match MongoCommon.reply_document(reply,n) with
| {some=doc} -> (doc +> (aux(n+1)))
| {none} -> (aux(n+1)))
{success=aux(0)}
@@ -199,7 +199,7 @@ MongoCursor = {{
error={Error="reset"};
doc=error_document("Dead cursor",-1);
killed={true};
- cid=MongoDriver.null_cursorID(void)
+ cid=MongoCommon.null_cursorID(void)
}
/**
@@ -209,7 +209,7 @@ MongoCursor = {{
* ID still exists in the cursor.
**/
reset(c:Mongo.cursor): Mongo.cursor =
- if not(MongoDriver.is_null_cursorID(c.cid))
+ if not(MongoCommon.is_null_cursorID(c.cid))
then
if MongoDriver.kill_cursors(c.mongo, [c.cid])
then destroy(c)
@@ -240,20 +240,20 @@ MongoCursor = {{
// TODO: tailable cursors
if c.returned <= 0
then
- tags = MongoDriver.reply_tags(MongoDriver.reply_responseFlags(Option.get(c.reply)))
+ tags = MongoCommon.reply_tags(MongoCommon.reply_responseFlags(Option.get(c.reply)))
tags = List.filter((tag -> tag != {rtag={AwaitCapable}}),tags)
if List.is_empty(tags)
then set_error(c,{NotFound})
- else set_error(c,{Error="MongoCursor.next: no data returned tags={MongoDriver.string_of_tags(tags)}"})
+ else set_error(c,{Error="MongoCursor.next: no data returned tags={MongoCommon.string_of_tags(tags)}"})
else
if c.current >= c.returned
then
- if MongoDriver.is_null_cursorID(c.cid)
+ if MongoCommon.is_null_cursorID(c.cid)
then set_error({c with doc = error_document("Read past end of data",-1)},{Error="MongoCursor.next: end of data"})
else next(get_more(c))
else {c with
current=c.current+1;
- doc=(match MongoDriver.reply_document(Option.get(c.reply),c.current) with
+ doc=(match MongoCommon.reply_document(Option.get(c.reply),c.current) with
| {some=doc} -> doc
| {none} -> error_document("Reply parse error",-1))}
View
483 stdlib/apis/mongo/mongo.opa
@@ -49,12 +49,6 @@ type Mongo.cursorID = external
type Mongo.mailbox = external
type Mongo.reply = external
-/** Type for a connection, host name and port **/
-type Mongo.mongo_host = (string, int)
-
-/** Type of concurrency handling **/
-type Mongo.concurrency = {pool} / {cell} / {singlethreaded}
-
/**
* Main connection type.
* Stores the socket connection plus other parameters such as
@@ -65,7 +59,7 @@ type Mongo.concurrency = {pool} / {cell} / {singlethreaded}
type Mongo.db = {
concurrency : Mongo.concurrency;
conn : Mutable.t(option(Socket.connection));
- conncell : Cell.cell(Mongo.sr,Mongo.srr);
+ conncell : option(Cell.cell(Mongo.sr,Mongo.srr));
pool : Mutable.t(option(SocketPool.t));
pool_max : int;
close_socket : bool;
@@ -85,383 +79,32 @@ type Mongo.db = {
}
/** Outgoing Cell messages **/
+@private
type Mongo.sr =
{send:(Mongo.db,Mongo.mongo_buf,string)} // Send and forget
/ {sendrecv:(Mongo.db,Mongo.mongo_buf,string)} // Send and expect reply
/ {senderror:(Mongo.db,Mongo.mongo_buf,string,string)} // Send and call getlasterror
/ {stop} // Stop the cell
/** Incoming Cell messages **/
+@private
type Mongo.srr =
{sendresult:bool}
/ {sndrcvresult:option(Mongo.reply)}
/ {snderrresult:option(Mongo.reply)}
/ {stopresult}
/ {reconnect}
-/**
- * Mongo driver failure status.
- * Either a failure document returned by the MongoDB server,
- * an error string generated by the driver or [Incomplete]
- * which signals that expected fields were missing from a reply.
- **/
-type Mongo.failure =
- {OK}
- / {Error : string}
- / {DocError : Bson.document}
- / {Incomplete}
- / {NotFound}
-
-/**
- * Mongo success status, just a document.
- **/
-type Mongo.success = Bson.document
-type Mongo.successes = list(Bson.document)
-
-/**
- * A Mongo driver result value is either a valid document
- * or a [Mongo.failure] value.
- **/
-type Mongo.result = outcome(Mongo.success, Mongo.failure)
-type Mongo.results = outcome(Mongo.successes, Mongo.failure)
-
-/**
- * A Mongo error is either an error value which is an OPA
- * value containing the error information from a [Bson.document]
- * or a [Mongo.failure] value.
- **/
-type Mongo.error = outcome(Bson.error, Mongo.failure)
-
-/* Flag tags */
-
-/** OP_INSERT **/
-type Mongo.insert_tag =
- {ContinueOnError}
-
-/** OP_UPDATE **/
-type Mongo.update_tag =
- {Upsert} /
- {MultiUpdate}
-
-/** OP_QUERY **/
-type Mongo.query_tag =
- {TailableCursor} /
- {SlaveOk} /
- {OplogReplay} /
- {NoCursorTimeout} /
- {AwaitData} /
- {Exhaust} /
- {Partial}
-
-/** OP_DELETE **/
-type Mongo.delete_tag =
- {SingleRemove}
-
-/** OP_REPLY **/
-type Mongo.reply_tag =
- {CursorNotFound} /
- {QueryFailure} /
- {ShardConfigStale} /
- {AwaitCapable}
-
-/** Tags for indices **/
-type Mongo.index_tag =
- {Unique} /
- {DropDups} /
- {Background} /
- {Sparse}
-
-/**
- * We wrap the tags so that we can tell if it is an insert tag,
- * query tag etc. We don't want to send SingleRemove to an update.
- **/
-type Mongo.mongo_tag =
- {itag:Mongo.insert_tag} /
- {utag:Mongo.update_tag} /
- {qtag:Mongo.query_tag} /
- {dtag:Mongo.delete_tag} /
- {rtag:Mongo.reply_tag} /
- {xtag:Mongo.index_tag}
-
-@server_private
+@private
MongoDriver = {{
@private ML = MongoLog
@private H = Bson.Abbrevs
+ @private C = MongoCommon
/** The MongoDB default port number **/
default_port = 27017
- /**
- * Some routines for manipulating outcomes from Mongo commands.
- **/
-
- /** Generate a driver error message outcome **/
- failErr(msg) = {failure={Error=msg}}
-
- /**
- * Some code to handle outcomes.
- **/
- map_outcome(outcome:outcome('s,'f), sfn:'s->'t, ffn:'f->'g): outcome('t,'g) =
- match outcome with
- | {~success} -> {success=sfn(success)}
- | {~failure} -> {failure=ffn(failure)}
-
- map_success(outcome, sfn) = map_outcome(outcome, sfn, (f -> f))
- map_failure(outcome, ffn) = map_outcome(outcome, (s -> s), ffn)
-
- outcome_map(outcome:outcome('s,'f), sfn:'s->'r, ffn:'f->'r): 'r =
- match outcome with
- | {~success} -> sfn(success)
- | {~failure} -> ffn(failure)
-
- string_of_outcome = (outcome_map:outcome('s,'f), ('s->string), ('f->string) -> string)
-
- /** Turn a result into a [Mongo.error] value **/
- error_of_result(result:Mongo.result): Mongo.error = map_success(result, Bson.error_of_document)
-
- /** Make a readable string out of a [Mongo.error] value **/
- string_of_error(error:Mongo.error): string = outcome_map(error, Bson.string_of_error, string_of_failure)
-
- /** String representation of a [Mongo.failure] value **/
- string_of_failure(failure:Mongo.failure): string =
- match failure with
- | {Error=str} -> str
- | {DocError=doc} -> Bson.string_of_doc_error(doc)
- | {OK} -> "Ok"
- | {Incomplete} -> "Incomplete"
- | {NotFound} -> "NotFound"
-
- /** Make an error report string out of a [Mongo.result] value, will print "<ok>" if no error. **/
- string_of_result(result:Mongo.result): string = outcome_map(result, Bson.string_of_doc_error, string_of_failure)
-
- /** Same for a list of results. **/
- string_of_results(results:Mongo.results): string =
- outcome_map(results, (l -> List.list_to_string(Bson.string_of_doc_error,l)), string_of_failure)
-
- /** Similar to [string_of_result] but the success value is user-defined. **/
- string_of_value_or_failure(result:outcome('a,Mongo.failure), success_to_str:'a->string): string =
- string_of_outcome(result, success_to_str, (failure -> "\{failure={string_of_failure(failure)}\}"))
-
- /** Either pretty-print the document or generate a failure string. **/
- pretty_of_result(result:Mongo.result): string = string_of_value_or_failure(result,Bson.to_pretty)
-
- /** Same as [pretty_of_result] but for a list of results. **/
- pretty_of_results(results:Mongo.results): string =
- string_of_value_or_failure(results,(l -> List.list_to_string(Bson.to_pretty,l)))
-
- /** Predicate for error status of a [Mongo.result] value. **/
- is_error(result:Mongo.result): bool = outcome_map(result, Bson.is_error, (_ -> true))
-
- /** Predicate for error status of a [Mongo.result] value. **/
- isError(error:Mongo.error): bool = outcome_map(error, Bson.isError, (_ -> true))
-
- /**
- * Validate a BSON document by turning it into a [Mongo.result] value.
- * If [ok] is non-zero or there is an [errmsg] value then turn it into a [Mongo.failure] value.
- **/
- check_ok(bson:Bson.document): Mongo.result =
- match Bson.find_int(bson,"ok") with
- | {some=ok} ->
- if ok == 1
- then {success=bson}
- else
- (match Bson.find_string(bson,"errmsg") with
- | {some=errmsg} -> failErr(errmsg)
- | _ -> failErr("ok:{ok}"))
- | _ -> {success=bson}
-
- /**
- * Outcome-wrapped versions of Bson.find_xxx etc.
- **/
- @private
- result_(result:Mongo.result,key:string,find:(Bson.document, string -> option('a))): option('a) =
- match result with
- | {success=doc} -> find(doc,key)
- | {failure=_} -> {none}
-
- result_bool(result:Mongo.result,key:string): option(bool) = result_(result, key, Bson.find_bool)
- result_int(result:Mongo.result,key:string): option(int) = result_(result, key, Bson.find_int)
- result_float(result:Mongo.result,key:string): option(float) = result_(result, key, Bson.find_float)
- result_string(result:Mongo.result,key:string): option(string) = result_(result, key, Bson.find_string)
- result_doc(result:Mongo.result,key:string): option(Bson.document) = result_(result, key, Bson.find_doc)
-
- /**
- * Same as outcome-wrapped versions but allowing dot notation.
- **/
- @private
- dotresult_(result:Mongo.result,key:string,find:(Bson.document, string -> option('a))): option('a) =
- match result with
- | {success=doc} -> Bson.find_dot(doc,key,find)
- | {failure=_} -> {none}
-
- dotresult_bool(result:Mongo.result,key:string): option(bool) = dotresult_(result, key, Bson.find_bool)
- dotresult_int(result:Mongo.result,key:string): option(int) = dotresult_(result, key, Bson.find_int)
- dotresult_float(result:Mongo.result,key:string): option(float) = dotresult_(result, key, Bson.find_float)
- dotresult_string(result:Mongo.result,key:string): option(string) = dotresult_(result, key, Bson.find_string)
- dotresult_doc(result:Mongo.result,key:string): option(Bson.document) = dotresult_(result, key, Bson.find_doc)
-
- /**
- * If a result is success then return an OPA value from the
- * document using [Bson.doc2opa]. Must be cast at point of call.
- **/
- result_to_opa(result:Mongo.result): option('a) =
- match result with
- | {success=doc} -> (Bson.doc2opa(doc):option('a))
- | {failure=_} -> {none}
-
- /**
- * Same as [result_to_opa] but returning an outcome instead of an option.
- **/
- resultToOpa(result:Mongo.result): outcome('a,Mongo.failure) =
- match result with
- | {success=doc} ->
- (match (Bson.doc2opa(doc):option('a)) with
- | {some=a} -> {success=a}
- | {none} -> failErr("MongoDriver.resultToOpa: document conversion failure"))
- | {~failure} -> {~failure}
-
- /** Flag bitmasks **/
-
- /* OP_INSERT */
- ContinueOnErrorBit = 0x00000001
-
- /* OP_UPDATE */
- UpsertBit = 0x00000001
- MultiUpdateBit = 0x00000002
-
- /* OP_QUERY */
- TailableCursorBit = 0x00000002
- SlaveOkBit = 0x00000004
- OplogReplayBit = 0x00000008
- NoCursorTimeoutBit = 0x00000010
- AwaitDataBit = 0x00000020
- ExhaustBit = 0x00000040
- PartialBit = 0x00000080
-
- /* OP_DELETE */
- SingleRemoveBit = 0x00000001
-
- /* OP_REPLY */
- CursorNotFoundBit = 0x00000001
- QueryFailureBit = 0x00000002
- ShardConfigStaleBit = 0x00000004
- AwaitCapableBit = 0x00000008
-
- /* Flags used by the index routines. */
- UniqueBit = 0x00000001
- DropDupsBit = 0x00000002
- BackgroundBit = 0x00000004
- SparseBit = 0x00000008
-
- /**
- * [flag_of_tag]: Turn a list of tags into a bit-wise flag suitable
- * for sending to MongoDB. We have an extra layer of types to allow
- * forcing of tags to belong to a particular operation.
- **/
- flag_of_tag(tag:Mongo.mongo_tag): int =
- match tag with
- /* OP_INSERT */
- | {itag={ContinueOnError}} -> ContinueOnErrorBit
-
- /* OP_UPDATE */
- | {utag={Upsert}} -> UpsertBit
- | {utag={MultiUpdate}} -> MultiUpdateBit
-
- /* OP_QUERY */
- | {qtag={TailableCursor}} -> TailableCursorBit
- | {qtag={SlaveOk}} -> SlaveOkBit
- | {qtag={OplogReplay}} -> OplogReplayBit
- | {qtag={NoCursorTimeout}} -> NoCursorTimeoutBit
- | {qtag={AwaitData}} -> AwaitDataBit
- | {qtag={Exhaust}} -> ExhaustBit
- | {qtag={Partial}} -> PartialBit
-
- /* OP_DELETE */
- | {dtag={SingleRemove}} -> SingleRemoveBit
-
- /* OP_REPLY */
- | {rtag={CursorNotFound}} -> CursorNotFoundBit
- | {rtag={QueryFailure}} -> QueryFailureBit
- | {rtag={ShardConfigStale}} -> ShardConfigStaleBit
- | {rtag={AwaitCapable}} -> AwaitCapableBit
-
- /* Index tags */
- | {xtag={Unique}} -> UniqueBit
- | {xtag={DropDups}} -> DropDupsBit
- | {xtag={Background}} -> BackgroundBit
- | {xtag={Sparse}} -> SparseBit
-
- /**
- * Turn a list of tags into a single MongoDB-compatible int.
- **/
- flags(tags:list(Mongo.mongo_tag)): int =
- List.fold_left((flag, tag -> Bitwise.land(flag,flag_of_tag(tag))),0,tags)
-
- /**
- * Extract the tags from a given bit-wise flag. These are specific
- * to each operation, you need to know which operation the flag was for/from
- * before you can give meaning to the bits.
- **/
- insert_tags(flag:int): list(Mongo.mongo_tag) =
- if Bitwise.land(flag,ContinueOnErrorBit) != 0 then [{itag={ContinueOnError}}] else []
-
- update_tags(flag:int): list(Mongo.mongo_tag) =
- tags = if Bitwise.land(flag,UpsertBit) != 0 then [{utag={Upsert}}] else []
- if Bitwise.land(flag,MultiUpdateBit) != 0 then [{utag={MultiUpdate}}|tags] else tags
-
- query_tags(flag:int): list(Mongo.mongo_tag) =
- tags = if Bitwise.land(flag,TailableCursorBit) != 0 then [{qtag={TailableCursor}}] else []
- tags = if Bitwise.land(flag,SlaveOkBit) != 0 then [{qtag={SlaveOk}}|tags] else tags
- tags = if Bitwise.land(flag,OplogReplayBit) != 0 then [{qtag={OplogReplay}}|tags] else tags
- tags = if Bitwise.land(flag,NoCursorTimeoutBit) != 0 then [{qtag={NoCursorTimeout}}|tags] else tags
- tags = if Bitwise.land(flag,AwaitDataBit) != 0 then [{qtag={AwaitData}}|tags] else tags
- tags = if Bitwise.land(flag,ExhaustBit) != 0 then [{qtag={Exhaust}}|tags] else tags
- if Bitwise.land(flag,PartialBit) != 0 then [{qtag={Partial}}|tags] else tags
-
- delete_tags(flag:int): list(Mongo.mongo_tag) =
- if Bitwise.land(flag,SingleRemoveBit) != 0 then [{dtag={SingleRemove}}] else []
-
- reply_tags(flag:int): list(Mongo.mongo_tag) =
- tags = if Bitwise.land(flag,CursorNotFoundBit) != 0 then [{rtag={CursorNotFound}}] else []
- tags = if Bitwise.land(flag,QueryFailureBit) != 0 then [{rtag={QueryFailure}}|tags] else tags
- tags = if Bitwise.land(flag,ShardConfigStaleBit) != 0 then [{rtag={ShardConfigStale}}|tags] else tags
- if Bitwise.land(flag,AwaitCapableBit) != 0 then [{rtag={AwaitCapable}}|tags] else tags
-
- index_tags(flag:int): list(Mongo.mongo_tag) =
- tags = if Bitwise.land(flag,UniqueBit) != 0 then [{xtag={Unique}}] else []
- tags = if Bitwise.land(flag,DropDupsBit) != 0 then [{xtag={DropDups}}|tags] else tags
- tags = if Bitwise.land(flag,BackgroundBit) != 0 then [{xtag={Background}}|tags] else tags
- if Bitwise.land(flag,SparseBit) != 0 then [{xtag={Sparse}}|tags] else tags
-
- /**
- * A string representation of a [Mongo.mongo_tag] value.
- **/
- string_of_tag(tag:Mongo.mongo_tag): string =
- match tag with
- | {itag={ContinueOnError}} -> "ContinueOnError"
- | {utag={Upsert}} -> "Upsert"
- | {utag={MultiUpdate}} -> "MultiUpdate"
- | {qtag={TailableCursor}} -> "TailableCursor"
- | {qtag={SlaveOk}} -> "SlaveOk"
- | {qtag={OplogReplay}} -> "OplogReplay"
- | {qtag={NoCursorTimeout}} -> "NoCursorTimeout"
- | {qtag={AwaitData}} -> "AwaitData"
- | {qtag={Exhaust}} -> "Exhaust"
- | {qtag={Partial}} -> "Partial"
- | {dtag={SingleRemove}} -> "SingleRemove"
- | {rtag={CursorNotFound}} -> "CursorNotFound"
- | {rtag={QueryFailure}} -> "QueryFailure"
- | {rtag={ShardConfigStale}} -> "ShardConfigStale"
- | {rtag={AwaitCapable}} -> "AwaitCapable"
- | {xtag={Unique}} -> "Unique"
- | {xtag={DropDups}} -> "DropDups"
- | {xtag={Background}} -> "Background"
- | {xtag={Sparse}} -> "Sparse"
-
- /** String of a list of tags. **/
- string_of_tags(tags:list(Mongo.mongo_tag)): string = List.list_to_string(string_of_tag,tags)
-
/* Allocate new buffer of given size */
@private create_ = (%% BslMongo.Mongo.create %%: int -> Mongo.mongo_buf)
@@ -570,7 +213,7 @@ MongoDriver = {{
do if m.log then ML.info("MongoDriver.reconnect({from})","reconnected",void)
ret(true)
| {~failure} ->
- do if m.log then ML.info("MongoDriver.reconnect({from})","failure={string_of_failure(failure)}",void)
+ do if m.log then ML.info("MongoDriver.reconnect({from})","failure={C.string_of_failure(failure)}",void)
do Scheduler.wait(m.reconnect_wait)
aux(attempts+1))
aux(0)
@@ -594,8 +237,8 @@ MongoDriver = {{
do m.socket_link.set(m.socket_link.get()+1)
{success=conn}
| {failure=str} ->
- failErr("MongoDriver.open_socket: Got exception {str}"))
- | {none} -> failErr("MongoDriver.open_socket: no primary"))
+ C.failErr("MongoDriver.open_socket: Got exception {str}"))
+ | {none} -> C.failErr("MongoDriver.open_socket: no primary"))
@private
close_socket(m:Mongo.db): void =
@@ -627,7 +270,7 @@ MongoDriver = {{
do close_socket(m)
false)
| {~failure} ->
- ML.error("MongoDriver.send({name})","{string_of_failure(failure)}",false)
+ ML.error("MongoDriver.send({name})","{C.string_of_failure(failure)}",false)
@private
send_no_reply(m,mbuf,name): bool = send_no_reply_(m,mbuf,name,false)
@@ -642,7 +285,7 @@ MongoDriver = {{
mailbox = new_mailbox_(m.bufsize)
(match read_mongo_(conn,m.comms_timeout,mailbox) with
| {success=reply} ->
- rrt = reply_responseTo(reply)
+ rrt = C.reply_responseTo(reply)
do reset_mailbox_(mailbox)
do free_(mbuf)
do if m.log then ML.debug("MongoDriver.receive({name})","\n{string_of_message_reply(reply)}",void)
@@ -659,7 +302,7 @@ MongoDriver = {{
do close_socket(m)
{none}
| {~failure} ->
- ML.error("MongoDriver.receive({name})","{string_of_failure(failure)}",{none})
+ ML.error("MongoDriver.receive({name})","{C.string_of_failure(failure)}",{none})
@private
send_with_error(m,mbuf,name,ns): option(Mongo.reply) =
@@ -677,7 +320,7 @@ MongoDriver = {{
mailbox = new_mailbox_(m.bufsize)
(match read_mongo_(conn,m.comms_timeout,mailbox) with
| {success=reply} ->
- rrt = reply_responseTo(reply)
+ rrt = C.reply_responseTo(reply)
do reset_mailbox_(mailbox)
do free_(mbuf)
do if m.log then ML.debug("MongoDriver.send_with_error({name})","\n{string_of_message_reply(reply)}",void)
@@ -698,7 +341,7 @@ MongoDriver = {{
do close_socket(m)
{none}
| {~failure} ->
- ML.error("MongoDriver.send_with_error({name})","{string_of_failure(failure)}",{none})
+ ML.error("MongoDriver.send_with_error({name})","{C.string_of_failure(failure)}",{none})
@private
@@ -733,7 +376,7 @@ MongoDriver = {{
do m.conn.set({none})
result
| {~failure} ->
- do if m.log then ML.error("MongoDriver.srpool","Can't get pool {string_of_failure(failure)}",void)
+ do if m.log then ML.error("MongoDriver.srpool","Can't get pool {C.string_of_failure(failure)}",void)
{reconnect})
| {none} ->
do if m.log then ML.error("MongoDriver.srpool","No pool",void)
@@ -758,7 +401,7 @@ MongoDriver = {{
srr =
(match m.concurrency with
| {pool} -> srpool(m,{send=((m,mbuf,name))})
- | {cell} -> (Cell.call(m.conncell,({send=((m,mbuf,name))}:Mongo.sr)):Mongo.srr)
+ | {cell} -> (Cell.call(Option.get(m.conncell),({send=((m,mbuf,name))}:Mongo.sr)):Mongo.srr)
| {singlethreaded} -> sr_snr(m,mbuf,name))
match srr with
| {reconnect} -> recon()
@@ -776,7 +419,7 @@ MongoDriver = {{
srr =
(match m.concurrency with
| {pool} -> srpool(m,{sendrecv=((m,mbuf,name))})
- | {cell} -> (Cell.call(m.conncell,({sendrecv=((m,mbuf,name))}:Mongo.sr)):Mongo.srr)
+ | {cell} -> (Cell.call(Option.get(m.conncell),({sendrecv=((m,mbuf,name))}:Mongo.sr)):Mongo.srr)
| {singlethreaded} -> sr_swr(m,mbuf,name))
match srr with
| {reconnect} -> recon()
@@ -794,7 +437,7 @@ MongoDriver = {{
srr =
(match m.concurrency with
| {pool} -> srpool(m,{senderror=((m,mbuf,name,ns))})
- | {cell} -> (Cell.call(m.conncell,({senderror=((m,mbuf,name,ns))}:Mongo.sr)):Mongo.srr)
+ | {cell} -> (Cell.call(Option.get(m.conncell),({senderror=((m,mbuf,name,ns))}:Mongo.sr)):Mongo.srr)
| {singlethreaded} -> sr_swe(m,mbuf,name,ns))
match srr with
| {reconnect} -> recon()
@@ -803,22 +446,23 @@ MongoDriver = {{
@private
stop(m) =
- match Cell.call(m.conncell,({stop}:Mongo.sr)):Mongo.srr with
+ match Cell.call(Option.get(m.conncell),({stop}:Mongo.sr)):Mongo.srr with
| {stopresult} -> void
| _ -> @fail
/**
* Due to the number of parameters we have a separate [init] routine
* from [connect]. This feature is mostly used by replica set connection
* and re-connection.
- * Example: [init(bufsize, log)]
+ * Example: [init(bufsize, concurrency, pool_max, close_socket, log)]
* @param bufsize A hint to the driver for the initial buffer size.
* @param log Whether to enable logging for the driver.
**/
init(bufsize:int, concurrency:Mongo.concurrency, pool_max:int, close_socket:bool, log:bool): Mongo.db =
conn = Mutable.make({none})
{ ~concurrency;
- ~conn; conncell=(Cell.make(conn, sr):Cell.cell(Mongo.sr,Mongo.srr));
+ ~conn;
+ conncell=if concurrency == {cell} then {some=(Cell.make(conn, sr):Cell.cell(Mongo.sr,Mongo.srr))} else {none};
pool=Mutable.make({none}); ~pool_max;
~bufsize; ~close_socket; ~log; socket_link=Mutable.make(0);
seeds=[]; hosts=Mutable.make([]); name="";
@@ -857,7 +501,7 @@ MongoDriver = {{
do m.primary.set({some=(addr,port)})
{success=m})
else
- failErr("MongoDriver.connect: Pool concurrency requires pool_max > 1 (actually: {m.pool_max})")
+ C.failErr("MongoDriver.connect: Pool concurrency requires pool_max > 1 (actually: {m.pool_max})")
| {cell} | {singlethreaded} ->
if m.close_socket
then
@@ -869,7 +513,7 @@ MongoDriver = {{
do m.conn.set({some=conn})
do m.primary.set({some=(addr,port)})
{success=m}
- | {failure=str} -> failErr("Got exception {str}")
+ | {failure=str} -> C.failErr("Got exception {str}")
/**
* Convenience function, initialise and connect at the same time.
@@ -886,17 +530,20 @@ MongoDriver = {{
* have to be careful how we access the connection value.
*/
close(m:Mongo.db): Mongo.db =
- do match m.pool.get() with
- | {some=pool} ->
- do if m.log then ML.info("MongoDriver.close","{m.primary.get()}",void)
- SocketPool.stop(pool)
- | {none} -> void
- do match m.conn.get() with
- | {some=conn} ->
- do if m.log then ML.info("MongoDriver.close","{m.primary.get()}",void)
- do stop(m)
- Socket.close(conn)
- | {none} -> void
+ do match m.concurrency with
+ | {pool} ->
+ (match m.pool.get() with
+ | {some=pool} ->
+ do if m.log then ML.info("MongoDriver.close","{m.primary.get()}",void)
+ SocketPool.stop(pool)
+ | {none} -> void)
+ | {cell} | {singlethreaded} ->
+ (match m.conn.get() with
+ | {some=conn} ->
+ do if m.log then ML.info("MongoDriver.close","{m.primary.get()}",void)
+ do if m.concurrency != {singlethreaded} then stop(m)
+ Socket.close(conn)
+ | {none} -> void)
do m.conn.set({none})
do m.primary.set({none})
m
@@ -942,7 +589,7 @@ MongoDriver = {{
* [insertf]: same as [insert] but using tags instead of bit-wise flags.
**/
insertf(m:Mongo.db, tags:list(Mongo.insert_tag), ns:string, documents:Bson.document): bool =
- flags = flags(List.map((t -> {itag=t}),tags))
+ flags = C.flags(List.map((t -> {itag=t}),tags))
insert(m,flags,ns,documents)
/**
@@ -964,7 +611,7 @@ MongoDriver = {{
* [insert_batchf]: same as [insert_batch] but using tags instead of bit-wise flags.
**/
insert_batchf(m:Mongo.db, tags:list(Mongo.insert_tag), ns:string, documents:list(Bson.document)): bool =
- flags = flags(List.map((t -> {itag=t}),tags))
+ flags = C.flags(List.map((t -> {itag=t}),tags))
insert_batch(m,flags,ns,documents)
/**
@@ -988,7 +635,7 @@ MongoDriver = {{
* [updatef]: same as [update] but using tags instead of bit-wise flags.
**/
updatef(m:Mongo.db, tags:list(Mongo.update_tag), ns:string, selector:Bson.document, update_doc:Bson.document): bool =
- flags = flags(List.map((t -> {utag=t}),tags))
+ flags = C.flags(List.map((t -> {utag=t}),tags))
update(m,flags,ns,selector,update_doc)
/**
@@ -1008,7 +655,7 @@ MongoDriver = {{
**/
queryf(m:Mongo.db, tags:list(Mongo.query_tag), ns:string, numberToSkip:int, numberToReturn:int,
query_doc:Bson.document, returnFieldSelector_opt:option(Bson.document)): option(Mongo.reply) =
- flags = flags(List.map((t -> {qtag=t}),tags))
+ flags = C.flags(List.map((t -> {qtag=t}),tags))
query(m,flags,ns,numberToSkip,numberToReturn,query_doc,returnFieldSelector_opt)
/**
@@ -1042,7 +689,7 @@ MongoDriver = {{
* [deletef]: same as [delete] but using tags instead of bit-wise flags.
**/
deletef(m:Mongo.db, tags:list(Mongo.delete_tag), ns:string, selector:Bson.document): bool =
- flags = flags(List.map((t -> {dtag=t}),tags))
+ flags = C.flags(List.map((t -> {dtag=t}),tags))
delete(m,flags,ns,selector)
/**
@@ -1075,50 +722,12 @@ MongoDriver = {{
do msg_(mbuf,msg)
snderr(m,mbuf,"msg",dbname)
- /** Access components of the reply value **/
- reply_messageLength = (%% BslMongo.Mongo.reply_messageLength %% : Mongo.reply -> int)
- reply_requestId = (%% BslMongo.Mongo.reply_requestId %% : Mongo.reply -> int)
- reply_responseTo = (%% BslMongo.Mongo.reply_responseTo %% : Mongo.reply -> int)
- reply_opCode = (%% BslMongo.Mongo.reply_opCode %% : Mongo.reply -> int)
- reply_responseFlags = (%% BslMongo.Mongo.reply_responseFlags %% : Mongo.reply -> int)
- reply_cursorID = (%% BslMongo.Mongo.reply_cursorID %% : Mongo.reply -> Mongo.cursorID)
- reply_startingFrom = (%% BslMongo.Mongo.reply_startingFrom %% : Mongo.reply -> int)
- reply_numberReturned = (%% BslMongo.Mongo.reply_numberReturned %% : Mongo.reply -> int)
-
- /** Return the n'th document attached to the reply **/
- reply_document = (%% BslMongo.Mongo.reply_document %% : Mongo.reply, int -> option(Bson.document))
-
- /** Debug routine, export the internal representation of the reply **/
- export_reply = (%% BslMongo.Mongo.export_reply %%: Mongo.reply -> string)
-
- /** Null cursor value **/
- null_cursorID = (%% BslMongo.Mongo.null_cursorID %% : void -> Mongo.cursorID)
-
- /** Return a string representation of a cursor (it's an int64) **/
- string_of_cursorID = (%% BslMongo.Mongo.string_of_cursorID %% : Mongo.cursorID -> string)
-
- /** Predicate for end of query, when the cursorID is returned as zero **/
- is_null_cursorID = (%% BslMongo.Mongo.is_null_cursorID %% : Mongo.cursorID -> bool)
-
- reply_to_result(from:string, n:int, reply_opt: option(Mongo.reply)): Mongo.result =
- match reply_opt with
- | {some=reply} ->
- numberReturned = reply_numberReturned(reply)
- if numberReturned <= n
- then failErr("{from}: insufficient number of documents {numberReturned} vs {n}")
- else
- (match MongoDriver.reply_document(reply,n) with
- | {some=doc} -> {success=doc}
- | {none} -> failErr("{from}: no document in reply"))
- | {none} -> failErr("{from}: no reply")
-
@private get_index_opts(options:int): Bson.document =
- List.flatten([(if Bitwise.land(options,UniqueBit) != 0 then [H.bool("unique",true)] else []),
- (if Bitwise.land(options,DropDupsBit) != 0 then [H.bool("dropDups",true)] else []),
- (if Bitwise.land(options,BackgroundBit) != 0 then [H.bool("background",true)] else []),
- (if Bitwise.land(options,SparseBit) != 0 then [H.bool("sparse",true)] else [])])
+ List.flatten([(if Bitwise.land(options,C.UniqueBit) != 0 then [H.bool("unique",true)] else []),
+ (if Bitwise.land(options,C.DropDupsBit) != 0 then [H.bool("dropDups",true)] else []),
+ (if Bitwise.land(options,C.BackgroundBit) != 0 then [H.bool("background",true)] else []),
+ (if Bitwise.land(options,C.SparseBit) != 0 then [H.bool("sparse",true)] else [])])
- // TODO: create_indexe with getlasterror
@private create_index_(ns:string, key:Bson.document, opts:Bson.document) =
keys = Bson.keys(key)
name = "_"^(String.concat("",keys))
View
2 stdlib/apis/mongo/replset.opa
@@ -175,7 +175,7 @@ MongoReplicaSet = {{
adminCommandOpaLL(m:Mongo.db, cmd:string): outcome('a,Mongo.failure) =
match MongoCommands.simple_int_command_ll(m,"admin",cmd,1) with
| {success=doc} ->
- (match MongoDriver.result_to_opa({success=doc}) with
+ (match MongoCommon.result_to_opa({success=doc}) with
| {some=a} -> {success=a}
| {none} -> {failure={Error="MongoReplicaSet.adminCommandOpaLL: invalid document from db admin ({Bson.to_pretty(doc)})"}})
| {~failure} -> {~failure}
View
2 stdlib/apis/mongo/utils.opa
@@ -52,7 +52,7 @@ MongoUtils = {{
(match Bson.find_string(success, "err") with
| {some=""} | {none} -> true
| {some=err} -> ML.error("{msg}","{err}",false))
- | {~failure} -> ML.error("{msg}","fatal error {MongoDriver.string_of_failure(failure)}",false))
+ | {~failure} -> ML.error("{msg}","fatal error {MongoCommon.string_of_failure(failure)}",false))
safe_insert(c,v) = safe_(c,((c,v) -> MongoCollection.insert(c,v)),(c,v),"Collection.insert")
safe_insert_batch(c,b) = safe_(c,((c,b) -> MongoCollection.insert_batch(c,b)),(c,b),"Collection.insert_batch")

0 comments on commit 1aec0f5

Please sign in to comment.