Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

[feature] stdlib: Mongo connection parameters now read from command l…

…ine.
  • Loading branch information...
commit 4822b26e9b709aee25bf3bf68acf13c30686d42a 1 parent ee514b1
@nrs135 nrs135 authored
View
45 stdlib/apis/mongo/collection.opa
@@ -99,6 +99,16 @@ type Mongo.collection_cursor('a) = {
type Mongo.group('a) = { retval:list('a); count:int; keys:int; ok:int }
type Mongo.group_result('a) = outcome(Mongo.group('a),Mongo.failure)
+/**
+ * Return a package of pre-typed values.
+ **/
+type Mongo.pkg('value) =
+ (Mongo.collection('value),
+ {select:(Bson.document -> Mongo.select('value));
+ update:(Bson.document -> Mongo.update('value));
+ sempty:Mongo.select('value);
+ uempty:Mongo.update('value)})
+
MongoCollection = {{
@private ML = MongoLog
@@ -159,12 +169,43 @@ MongoCollection = {{
* Create a collection from a [Mongo.mongodb] connection. The type of the connection
* is remembered here and used to check the types of values returned from the MongoDB server.
* Note, however, that we clone the connection so that we will be using the same connection
- * to the server as the parent connection. For concurrent access to collections you should
- * have a fresh connection for each thread.
+ * to the server as the parent connection.
**/
create(db:Mongo.mongodb): Mongo.collection('value) = { db=MongoConnection.clone(db); ty=@typeval('value); }
/**
+ * Open a connection and create a collection on top of it. Unlike [create] the connection
+ * is encapsulated in the collection object so that when the collection is destroyed the
+ * connection is closed.
+ **/
+ open(name:string, dbname:string, collection:string): outcome(Mongo.collection('value),Mongo.failure) =
+ match MongoConnection.open(name) with
+ | {success=mongo} -> {success={ db=MongoConnection.namespace(mongo,dbname,collection); ty=@typeval('value); }}
+ | {~failure} -> {~failure}
+
+ /** Same as [open] but treat a failure to open the connection as a fatal error. **/
+ openfatal(name:string, dbname:string, collection:string): Mongo.collection('value) =
+ match open(name, dbname, collection) with
+ | {success=coll} -> coll
+ | {~failure} -> ML.fatal("MongoCollection.openfatal","Can't connect: {MongoDriver.string_of_failure(failure)}",-1)
+
+ /** Supply a set of useful values associated with a collection **/
+ makepkg(c:Mongo.collection('value)): Mongo.pkg('value) =
+ (c,
+ {select=MongoSelect.create;
+ update=MongoUpdate.create;
+ sempty=(MongoSelect.create(MongoSelectUpdate.empty()));
+ uempty=(MongoUpdate.create(MongoSelectUpdate.empty()))})
+
+ /** Same as [open] but returning pre-typed select and update functions **/
+ openpkg(name:string, dbname:string, collection:string): outcome(Mongo.pkg('value),Mongo.failure) =
+ MongoDriver.map_success(open(name,dbname,collection),makepkg)
+
+ /** Same as [openfatal] but returning pre-typed select and update functions **/
+ openpkgfatal(name:string, dbname:string, collection:string) : Mongo.pkg('value) =
+ makepkg(openfatal(name,dbname,collection))
+
+ /**
* Destroy a collection. Actually just close the cloned connection.
**/
destroy(c:Mongo.collection('value)): void = MongoConnection.close(c.db)
View
19 stdlib/apis/mongo/commands.opa
@@ -113,10 +113,10 @@ type Mongo.serverStatusType = {
opcounters : { insert : int; query : int; update : int; delete : int; getmore : int; command : int; };
asserts : { regular : int; warning : int; msg : int; user : int; rollovers : int; };
writeBacksQueued : bool;
- repl : Bson.register({ setName : string;
+ repl : Bson.register({ setName : Bson.register(string);
ismaster : bool;
- secondary : bool;
- hosts : list(string);
+ secondary : Bson.register(bool);
+ hosts : Bson.register(list(string));
primary : Bson.register(string);
});
ok : int;
@@ -323,15 +323,10 @@ MongoCommands = {{
/**
* Create a collection.
**/
- // TODO: There is no such command. See how the mongo shell does it and copy...
- /*
- createCollection(m:Mongo.mongodb, db:string, collection:string, capped:option({capped:bool; size:int;})): Mongo.result =
- match capped with
- | {some=~{capped; size}} ->
- simple_str_command_opts(m, db, "createCollection", collection, [H.bool("capped",capped), H.i32("size",size)])
- | {none} ->
- simple_str_command(m, db, "createCollection", collection)
- */
+ createCollection(m:Mongo.mongodb, dbname:string, collection:string, capped:option(bool), size:option(int)): Mongo.result =
+ opts = List.flatten([match capped with {some=tf} -> [H.bool("capped",tf)] | _ -> [],
+ match size with {some=size} -> [H.i32("size",size)] | _ -> []])
+ simple_str_command_opts(m, dbname, "create", collection, opts)
/**
* Cap a collection. Example: [convertToCapped(m, db, collection, size)]
View
215 stdlib/apis/mongo/connection.opa
@@ -30,8 +30,14 @@
* servers. To be used by higher-level modules so that only one
* connection is opened to a given server whereas several interfaces can be attached to the open connection.
*
- * Note that you have to be careful with concurrency, here. This mechanism is not intended
- * to block access to shared resources.
+ * We also handle the command line arguments here. When we call [open] we parse (one time)
+ * the command line which sets up the variable [params]. We implement a system of named
+ * connections and we build up a description of each named connection:
+ *
+ * [prog.exe -mn conn_name -mr replname -ms localhost:12345 -ms localhost:54321]
+ *
+ * We can then open connections by name: [MongoConnection.open("conn_name")], the default
+ * connection name is "default".
*
* {1 Where should I start?}
*
@@ -48,6 +54,7 @@
@abstract
type Mongo.mongodb = {
mongo: Mongo.db;
+ name: string;
bufsize: int;
addr: string;
port: int;
@@ -67,17 +74,151 @@ type Mongo.mongodb = {
query_flags: int;
}
+type Mongo.param = {
+ name:string;
+ replname:option(string);
+ bufsize:int;
+ close_socket:bool;
+ log:bool;
+ seeds:list(Mongo.mongo_host);
+}
+type Mongo.params = list(Mongo.param)
+
MongoConnection = {{
@private ML = MongoLog
+ @private default_seeds = ([("localhost",MongoDriver.default_port)]:list(Mongo.mongo_host))
+
+ @private init_param = ({
+ name = "default";
+ replname = {none};
+ bufsize = 50*1024;
+ close_socket = false;
+ log = false;
+ seeds = default_seeds;
+ }:Mongo.param)
+
+ @private last_name = Mutable.make("default")
+
+ @private params = Mutable.make(([init_param]:Mongo.params))
+
+ @private params_done = Mutable.make(false)
+
+ /**
+ * Add a named connection to the list of named connections.
+ * If this function is called {b before} the first call to [MongoConnection.open]
+ * then the command line parameters can update the value we add here. If it is
+ * called {b after} the first [open] call then we override the command line parameters
+ * and set them here.
+ **/
+ add_named_connection(p:Mongo.param): void =
+ rec add(l) =
+ match l with
+ | [] -> [p]
+ | [h|t] ->
+ if h.name == p.name
+ then [p|t]
+ else [h|add(t)]
+ params.set(add(params.get()))
+
@private
- open_(dbo:outcome(Mongo.db,Mongo.failure)): outcome(Mongo.mongodb,Mongo.failure) =
+ add_param(f,p:Mongo.params) =
+ ln = last_name.get()
+ rec updt(l) =
+ match l with
+ | [p|rest] ->
+ if p.name == ln
+ then [f(p)|rest]
+ else [p|updt(rest)]
+ | [] -> [f({ init_param with name=ln })]
+ updt(p)
+
+ @private
+ get_params = ->
+ do if not(params_done.get())
+ then params.set(CommandLine.filter({
+ title = "MongoDB connection parameters";
+ init = params.get() : Mongo.params;
+ anonymous = [];
+ parsers = [
+ {CommandLine.default_parser with
+ names = ["--mongoname", "-mn"]
+ description = "Name for the MongoDB server connection"
+ param_doc = "<string>"
+ on_param(p) = parser name={Rule.consume} ->
+ do last_name.set(name)
+ {no_params = add_param((p -> { p with ~name }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongoreplname", "-mr"]
+ description = "Replica set name for the MongoDB server"
+ param_doc = "<string>"
+ on_param(p) = parser s={Rule.consume} ->
+ {no_params = add_param((p -> { p with replname={some=s} }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongobufsize", "-mb"]
+ description = "Hint for initial MongoDB connection buffer size"
+ param_doc = "<int>"
+ on_param(p) = parser n={Rule.natural} -> {no_params = add_param((p -> { p with bufsize = n }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongoclosesocket", "-mc"]
+ description = "Maintain MongoDB server sockets in a closed state"
+ param_doc = "<bool>"
+ on_param(p) = parser b={Rule.bool} -> {no_params = add_param((p -> { p with close_socket = b }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongolog", "-ml"]
+ description = "Enable MongoLog logging"
+ param_doc = "<bool>"
+ on_param(p) = parser b={Rule.bool} -> {no_params = add_param((p -> { p with log = b }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongoseed", "-ms"]
+ description = "Add a seed to a replica set, allows multiple seeds"
+ param_doc = "<host>\{:<port>\}"
+ on_param(p) =
+ parser s={Rule.consume} ->
+ {no_params = add_param((p ->
+ seeds = if p.seeds == default_seeds then [] else p.seeds
+ { p with seeds=[MongoReplicaSet.mongo_host_of_string(s)|seeds] }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongohost", "-mh"]
+ description = "Host name of a MongoDB server, overwrites any previous addresses for this name"
+ param_doc = "<host>\{:<port>\}"
+ on_param(p) =
+ parser s={Rule.consume} ->
+ {no_params = add_param((p ->
+ { p with seeds=[MongoReplicaSet.mongo_host_of_string(s)] }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongologtype", "-mt"]
+ description = "Type of logging: stdout, stderr, logger, none"
+ param_doc = "<string>"
+ on_param(p) = parser s={Rule.consume} ->
+ logtype =
+ ((match s with
+ | "stdout" -> {stdout}
+ | "stderr" -> {stderr}
+ | "logger" -> {logger}
+ | "none" | "nomongolog" -> {nomongolog}
+ | _ -> ML.fatal("MongoConnection.get_params","Unknown Mongo log type string {s}",-1)):Mongo.logtype)
+ {no_params = add_param((p -> do MongoLog.logtype.set(logtype) p),p)}
+ },
+ ];
+ }))
+ params_done.set(true)
+
+ @private
+ open_(dbo:outcome(Mongo.db,Mongo.failure),name:string): outcome(Mongo.mongodb,Mongo.failure) =
match dbo with
| {success=mongo} ->
(match mongo.primary.get() with
| {some=(addr,port)} ->
- db = {~mongo; bufsize=mongo.bufsize; ~addr; ~port; link_count=Mutable.make(1);
+ db = {~mongo; ~name; bufsize=mongo.bufsize; ~addr; ~port; link_count=Mutable.make(1);
keyname="key"; valname="value"; idxname="index";
dbname="db"; collection="collection";
fields={none}; orderby={none}; limit=0; skip=0;
@@ -96,33 +237,61 @@ MongoConnection = {{
| {none} -> {failure={Error="MongoConnection.open: no primary"}})
| {~failure} -> {~failure}
- /** Return the name of the inbuilt db **/
- dbname(m:Mongo.mongodb): string = m.dbname
-
- /** Return the name of the inbuilt collection **/
- collection(m:Mongo.mongodb): string = m.collection
-
/**
* Open a connection to a single server. No check for primary status is
- * carried out and no reconnection is attempted.
+ * carried out and no reconnection is attempted. Note that we need to
+ * give a name to the connection even though it is dissociated from the
+ * list of named connections.
*
- * Example: [open(bufsize, host, port)]
+ * Example: [openraw(name, bufsize, close_socket, log, host, port)]
**/
- open(bufsize:int, close_socket:bool, log:bool, addr:string, port:int): outcome(Mongo.mongodb,Mongo.failure) =
- open_(MongoDriver.open(bufsize,close_socket,addr,port,log))
+ openraw(name:string, bufsize:int, close_socket:bool, log:bool, addr:string, port:int): outcome(Mongo.mongodb,Mongo.failure) =
+ open_(MongoDriver.open(bufsize,close_socket,addr,port,log),name)
/**
* Open a connection to a replica set starting from the given list of seeds.
*
- * Example: [open(name, bufsize, seeds)]
+ * Example: [replraw(name, bufsize, close_socket, log, seeds)]
*
* This routine causes a serach for the current host list among the seeds
* and then searches for the primary among the hosts. Rconnection logic
* is enabled.
**/
- repl(name:string, bufsize:int, close_socket:bool, log:bool, seeds:list(Mongo.mongo_host))
+ replraw(name:string, bufsize:int, close_socket:bool, log:bool, seeds:list(Mongo.mongo_host))
: outcome(Mongo.mongodb,Mongo.failure) =
- open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,close_socket,log,seeds)))
+ open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,close_socket,log,seeds)),name)
+
+ /**
+ * Open a connection according to the named parameters.
+ *
+ * Parameters are defined on the command line and can define any number
+ * of connections:
+ *
+ * [prog.exe -ms localhost:27017 -mn blort -mr blort -ms localhost:10001 -mc true]
+ *
+ * The [-mn] option defines the name, following options apply to the most recent
+ * name on the command line (the default name is "default").
+ **/
+ open(name:string): outcome(Mongo.mongodb,Mongo.failure) =
+ do get_params()
+ match List.find((p -> p.name == name),params.get()) with
+ | {some=p} ->
+ (match p.replname with
+ | {some=rn} -> replraw(rn,p.bufsize,p.close_socket,p.log,p.seeds)
+ | {none} ->
+ (match p.seeds with
+ | [] -> {failure={Error="MongoConnection.open: No host for plain connection"}}
+ | [(host,port)] -> openraw(name,p.bufsize,p.close_socket,p.log,host,port)
+ | _ -> {failure={Error="MongoConnection.open: Multiple hosts for plain connection"}}))
+ | {none} -> {failure={Error="MongoConnection.open: No such replica name {name}"}}
+
+ /**
+ * Open a named connection but cause a fatal error if a connection cannot be found.
+ **/
+ openfatal(name:string): Mongo.mongodb =
+ match open(name) with
+ | {success=rs} -> rs
+ | {~failure} -> ML.fatal("MongoConnection.openfatal","Can't connect: {MongoDriver.string_of_failure(failure)}",-1)
/**
* Clone a connection. We actually just bump the link count. On close
@@ -134,11 +303,11 @@ MongoConnection = {{
/**
* Change the namespace built into the connection. The defaults are:
- * db="db" and collection="collection". Changing the namespace bumps
- * the link count.
+ * db="db" and collection="collection".
**/
+ //* Changing the namespace {b no longer} bumps the link count.
namespace(db:Mongo.mongodb, dbname:string, collection:string): Mongo.mongodb =
- do db.link_count.set(db.link_count.get()+1)
+ //do db.link_count.set(db.link_count.get()+1)
{ db with ~dbname; ~collection }
/**
@@ -165,6 +334,12 @@ MongoConnection = {{
else void
else void
+ /** Return the name of the inbuilt db **/
+ dbname(m:Mongo.mongodb): string = m.dbname
+
+ /** Return the name of the inbuilt collection **/
+ collection(m:Mongo.mongodb): string = m.collection
+
/**
* Return the last error on the given connection.
**/
View
32 stdlib/apis/mongo/mongo.opa
@@ -304,7 +304,7 @@ MongoDriver = {{
| {success=doc} ->
(match (Bson.doc2opa(doc):option('a)) with
| {some=a} -> {success=a}
- | {none} -> {failure={Error="Mongo.resultToOpa: document conversion failure"}})
+ | {none} -> {failure={Error="MongoDriver.resultToOpa: document conversion failure"}})
| {~failure} -> {~failure}
/** Flag bitmasks **/
@@ -509,7 +509,7 @@ MongoDriver = {{
reconnect(from:string, m:Mongo.db): bool =
if m.depth.get() > m.max_depth
then
- do if m.log then ML.error("reconnect({from})","max depth exceeded",void)
+ do if m.log then ML.error("MongoDriver.reconnect({from})","max depth exceeded",void)
false
else
ret(tf:bool) = do m.depth.set(m.depth.get()-1) tf
@@ -522,10 +522,10 @@ MongoDriver = {{
else
(match reconnectfn(m) with
| {success=_} ->
- do if m.log then ML.info("reconnect({from})","reconnected",void)
+ do if m.log then ML.info("MongoDriver.reconnect({from})","reconnected",void)
ret(true)
| {~failure} ->
- do if m.log then ML.info("reconnect({from})","failure={string_of_failure(failure)}",void)
+ do if m.log then ML.info("MongoDriver.reconnect({from})","failure={string_of_failure(failure)}",void)
do Scheduler.wait(m.reconnect_wait)
aux(attempts+1))
aux(0)
@@ -533,6 +533,8 @@ MongoDriver = {{
ret(false)
@private
+ // Rule: for every open_socket which returns success we must call close_socket
+ // Property: we can stack opens and the socket will only close once the last is closed
open_socket(m:Mongo.db): outcome(Socket.connection,Mongo.failure) =
match m.conn.get() with
| {some=conn} ->
@@ -570,7 +572,7 @@ MongoDriver = {{
| {success=conn} ->
(str, len) = export_(mbuf)
s = String.substring(0,len,str)
- do if m.log then ML.debug("Mongo.send({name})","\n{string_of_message(s)}",void)
+ do if m.log then ML.debug("MongoDriver.send({name})","\n{string_of_message(s)}",void)
(match Socket.write_len_with_err_cont(conn,m.comms_timeout,s,len) with
| {success=cnt} ->
do if not(reply_expected) then free_(mbuf) else void
@@ -580,7 +582,7 @@ MongoDriver = {{
do close_socket(m)
false)
| {~failure} ->
- ML.error("Mongo.send({name})","{string_of_failure(failure)}",false)
+ ML.error("MongoDriver.send({name})","{string_of_failure(failure)}",false)
@private
send_no_reply(m,mbuf,name): bool = send_no_reply_(m,mbuf,name,false)
@@ -598,7 +600,7 @@ MongoDriver = {{
rrt = reply_responseTo(reply)
do reset_mailbox_(mailbox)
do free_(mbuf)
- do if m.log then ML.debug("Mongo.receive({name})","\n{string_of_message_reply(reply)}",void)
+ do if m.log then ML.debug("MongoDriver.receive({name})","\n{string_of_message_reply(reply)}",void)
do close_socket(m)
if mrid != rrt
then ML.error("MongoDriver.send_with_reply","RequestId mismatch, expected {mrid}, got {rrt}",{none})
@@ -612,7 +614,7 @@ MongoDriver = {{
do close_socket(m)
{none}
| {~failure} ->
- ML.error("Mongo.receive({name})","{string_of_failure(failure)}",{none})
+ ML.error("MongoDriver.receive({name})","{string_of_failure(failure)}",{none})
@private
send_with_error(m,mbuf,name,ns): option(Mongo.reply) =
@@ -631,7 +633,7 @@ MongoDriver = {{
rrt = reply_responseTo(reply)
do reset_mailbox_(mailbox)
do free_(mbuf)
- do if m.log then ML.debug("Mongo.send_with_error({name})","\n{string_of_message_reply(reply)}",void)
+ do if m.log then ML.debug("MongoDriver.send_with_error({name})","\n{string_of_message_reply(reply)}",void)
do close_socket(m)
if mrid != rrt
then ML.error("MongoDriver.send_with_error","RequestId mismatch, expected {mrid}, got {rrt}",{none})
@@ -645,7 +647,7 @@ MongoDriver = {{
do close_socket(m)
{none}
| {~failure} ->
- ML.error("Mongo.send_with_error({name})","{string_of_failure(failure)}",{none})
+ ML.error("MongoDriver.send_with_error({name})","{string_of_failure(failure)}",{none})
@private
sr(_, msg) =
@@ -668,7 +670,7 @@ MongoDriver = {{
| {reconnect} ->
if reconnect("send_no_reply",m)
then snd(m,mbuf,name)
- else ML.fatal("Mongo.send({name}):","comms error (Can't reconnect)",-1)
+ else ML.fatal("MongoDriver.send({name}):","comms error (Can't reconnect)",-1)
| {~sendresult} -> sendresult
| _ -> @fail
@@ -678,7 +680,7 @@ MongoDriver = {{
| {reconnect} ->
if reconnect("send_with_reply",m)
then sndrcv(m,mbuf,name)
- else ML.fatal("Mongo.receive({name}):","comms error (Can't reconnect)",-1)
+ else ML.fatal("MongoDriver.receive({name}):","comms error (Can't reconnect)",-1)
| {~sndrcvresult} -> sndrcvresult
| _ -> @fail
@@ -688,7 +690,7 @@ MongoDriver = {{
| {reconnect} ->
if reconnect("send_with_error",m)
then snderr(m,mbuf,name,ns)
- else ML.fatal("Mongo.snderr({name}):","comms error (Can't reconnect)",-1)
+ else ML.fatal("MongoDriver.snderr({name}):","comms error (Can't reconnect)",-1)
| {~snderrresult} -> snderrresult
| _ -> @fail
@@ -729,7 +731,7 @@ MongoDriver = {{
* @param port Port number for the MongoDB server.
**/
connect(m:Mongo.db, addr:string, port:int): outcome(Mongo.db,Mongo.failure) =
- do if m.log then ML.info("Mongo.connect","bufsize={m.bufsize} addr={addr} port={port} log={m.log}",void)
+ do if m.log then ML.info("MongoDriver.connect","bufsize={m.bufsize} addr={addr} port={port} log={m.log}",void)
do match m.conn.get() with | {some=conn} -> Socket.close(conn) | {none} -> void
do m.conn.set({none})
do m.primary.set({none})
@@ -749,6 +751,7 @@ MongoDriver = {{
* Convenience function, initialise and connect at the same time.
**/
open(bufsize:int, close_socket:bool, addr:string, port:int, log:bool): outcome(Mongo.db,Mongo.failure) =
+ do if log then ML.info("MongoDriver.open","{addr}:{port}",void)
connect(init(bufsize,close_socket,log),addr,port)
/**
@@ -760,6 +763,7 @@ MongoDriver = {{
close(m:Mongo.db): Mongo.db =
do match m.conn.get() with
| {some=conn} ->
+ do if m.log then ML.info("MongoDriver.close","{m.primary.get()}",void)
do stop(m)
Socket.close(conn)
| {none} ->

0 comments on commit 4822b26

Please sign in to comment.
Something went wrong with that request. Please try again.