Permalink
Browse files

[enhance] stdlib: Reexported low-level functions at the MongoConnecti…

…on layer.
  • Loading branch information...
1 parent bc98b99 commit a006b891ae426dc665758147b379bc2cd20027d5 @nrs135 nrs135 committed Nov 22, 2011
Showing with 222 additions and 56 deletions.
  1. +11 −6 stdlib/apis/mongo/commands.opa
  2. +149 −23 stdlib/apis/mongo/connection.opa
  3. +7 −3 stdlib/apis/mongo/cursor.opa
  4. +55 −24 stdlib/apis/mongo/mongo.opa
@@ -429,17 +429,20 @@ MongoCommands = {{
/**
* Query the "config.shards" database, gives a list of shards.
**/
- findShards(m:Mongo.mongodb, query:Bson.document): Mongo.results = MongoCursor.find_all(m.mongo, "config.shards", query, 100)
+ findShards(m:Mongo.mongodb, query:Bson.document, limit:int): Mongo.results =
+ MongoCursor.find_all(m.mongo, "config.shards", query, {none}, {none}, limit)
/**
* Query the "config.databases" database, gives a list of shard information about databases.
**/
- findDatabases(m:Mongo.mongodb, query:Bson.document): Mongo.results = MongoCursor.find_all(m.mongo, "config.databases", query, 100)
+ findDatabases(m:Mongo.mongodb, query:Bson.document, limit:int): Mongo.results =
+ MongoCursor.find_all(m.mongo, "config.databases", query, {none}, {none}, limit)
/**
* Query the "config.locks" database, gives information about the shard balancer.
**/
- findBalancer(m:Mongo.mongodb): Mongo.results = MongoCursor.find_all(m.mongo, "config.locks", [H.str("_id","balancer")], 100)
+ findBalancer(m:Mongo.mongodb, limit:int): Mongo.results =
+ MongoCursor.find_all(m.mongo, "config.locks", [H.str("_id","balancer")], {none}, {none}, limit)
/**
* Low-level, set "config.settings" balancer value. Valid objects are "stopped" and "start/stop".
@@ -468,7 +471,8 @@ MongoCommands = {{
/**
* Query the "config.chunks" database, gives a information about shard distribution.
**/
- findChunks(m:Mongo.mongodb, query:Bson.document): Mongo.results = MongoCursor.find_all(m.mongo, "config.chunks", query, 100)
+ findChunks(m:Mongo.mongodb, query:Bson.document, limit:int): Mongo.results =
+ MongoCursor.find_all(m.mongo, "config.chunks", query, {none}, {none}, limit)
/**
* Add a shard to a database.
@@ -548,9 +552,10 @@ MongoCommands = {{
(match Bson.dot_int(doc,"remaining.dbs") with
| {some=0} | {none} -> {success=doc}//??failure
| _ ->
- (match MongoCursor.find_all(m.mongo, "config.databases", [H.str("primary",shard)], 100) with
+ (match MongoCursor.find_all(m.mongo, "config.databases",
+ [H.str("primary",shard)], {none}, {none}, 100) with
| {success=dbs} ->
- (match MongoCursor.find_all(m.mongo, "config.shards", [], 100) with
+ (match MongoCursor.find_all(m.mongo, "config.shards", [], {none}, {none}, 100) with
| {success=[]} -> {failure={Error="No shards to move primary"}}
| {success=shards} ->
do println("dbs={Bson.to_pretty_list(dbs)}\nshards={Bson.to_pretty_list(shards)}")
@@ -72,6 +72,7 @@ type Mongo.mongodb = {
update_flags: int;
delete_flags: int;
query_flags: int;
+ index_flags: int;
}
type Mongo.param = {
@@ -245,7 +246,7 @@ MongoConnection = {{
keyname="key"; valname="value"; idxname="index";
dbname="db"; collection="collection";
fields={none}; orderby={none}; limit=0; skip=0;
- insert_flags=0; update_flags=0; delete_flags=0; query_flags=0;
+ insert_flags=0; update_flags=0; delete_flags=0; query_flags=0; index_flags=0;
}
do System.at_exit( ->
if db.link_count.get() > 0
@@ -266,7 +267,7 @@ MongoConnection = {{
* give a name to the connection even though it is dissociated from the
* list of named connections.
*
- * Example: [openraw(name, bufsize, close_socket, log, host, port)]
+ * Example: [openraw(name, bufsize, concurrency, pool_max, close_socket, log, host, port)]
**/
openraw(name:string, bufsize:int, concurrency:Mongo.concurrency,
pool_max:int, close_socket:bool, log:bool, addr:string, port:int)
@@ -276,7 +277,7 @@ MongoConnection = {{
/**
* Open a connection to a replica set starting from the given list of seeds.
*
- * Example: [replraw(name, bufsize, close_socket, log, seeds)]
+ * Example: [replraw(name, bufsize, concurrency, pool_max, close_socket, log, seeds)]
*
* This routine causes a serach for the current host list among the seeds
* and then searches for the primary among the hosts. Rconnection logic
@@ -328,22 +329,6 @@ MongoConnection = {{
db
/**
- * Change the namespace built into the connection. The defaults are:
- * db="db" and collection="collection".
- **/
- //* Changing the namespace {b no longer} bumps the link count.
- namespace(db:Mongo.mongodb, dbname:string, collection:string): Mongo.mongodb =
- //do db.link_count.set(db.link_count.get()+1)
- { db with ~dbname; ~collection }
-
- /**
- * Enable/disable logging for the given connection. Only applies to
- * the connection returned.
- **/
- log(db:Mongo.mongodb, log:bool): Mongo.mongodb =
- { db with mongo={ db.mongo with ~log } }
-
- /**
* Decrement the link count on a connection and close when zero.
**/
close(db:Mongo.mongodb): void =
@@ -360,6 +345,55 @@ MongoConnection = {{
else void
else void
+ /**
+ * Change the namespace built into the connection. The defaults are:
+ * db="db" and collection="collection".
+ **/
+ //* Changing the namespace {b no longer} bumps the link count.
+ namespace(db:Mongo.mongodb, dbname:string, collection:string): Mongo.mongodb =
+ //do db.link_count.set(db.link_count.get()+1)
+ { db with ~dbname; ~collection }
+
+ /** Change the concurrency type (won't take effect until reconnect **/
+ concurrency(db:Mongo.mongodb, concurrency:Mongo.concurrency): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~concurrency } }
+
+ /** Change the pool size **/
+ pool_max(db:Mongo.mongodb, pool_max:int): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~pool_max } }
+
+ /** Change the socket close status (only for singlethreaded and cell) **/
+ close_socket(db:Mongo.mongodb, close_socket:bool): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~close_socket } }
+
+ /** Chenge the bufsize hint (only applies to newly created buffers) **/
+ bufsize(db:Mongo.mongodb, bufsize:int): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~bufsize } }
+
+ /** Enable/disable logging for the given connection. **/
+ log(db:Mongo.mongodb, log:bool): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~log } }
+
+ /** Add a seed (takes effect on reconnect) **/
+ add_seed(db:Mongo.mongodb, seed:Mongo.mongo_host): Mongo.mongodb =
+ { db with mongo={ db.mongo with seeds=seed +> db.mongo.seeds } }
+
+ /** Remove a seed **/
+ remove_seed(db:Mongo.mongodb, seed:Mongo.mongo_host): Mongo.mongodb =
+ { db with mongo={ db.mongo with seeds=List.filter((s -> s != seed),db.mongo.seeds) } }
+
+ /** Change the reconnect wait time (milliseconds) **/
+ reconnect_wait(db:Mongo.mongodb, reconnect_wait:int): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~reconnect_wait } }
+
+ /** Change the maximum number of reconnection attempts before fail **/
+ max_attempts(db:Mongo.mongodb, max_attempts:int): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~max_attempts } }
+
+ /** Change the basic communications timeout (default, 1 hour) **/
+ comms_timeout(db:Mongo.mongodb, comms_timeout:int): Mongo.mongodb =
+ { db with mongo={ db.mongo with ~comms_timeout } }
+
/** Return the name of the inbuilt db **/
dbname(m:Mongo.mongodb): string = m.dbname
@@ -438,10 +472,102 @@ MongoConnection = {{
partial(db:Mongo.mongodb): Mongo.mongodb =
{ db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.PartialBit) }
- insert(m:Mongo.mongodb, flags:int, ns:string, documents:Bson.document): bool =
- MongoDriver.insert(m.mongo, flags, ns, documents)
- inserte(m:Mongo.mongodb, flags:int, ns:string, dbname:string, documents:Bson.document): option(Mongo.reply) =
- MongoDriver.inserte(m.mongo, flags, ns, dbname, documents)
+ /** Set the "unique" flag for all [index] calls. **/
+ unique(db:Mongo.mongodb): Mongo.mongodb =
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.UniqueBit) }
+
+ /** Set the "dropdups" flag for all [index] calls. **/
+ dropDups(db:Mongo.mongodb): Mongo.mongodb =
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.DropDupsBit) }
+
+ /** Set the "background" flag for all [index] calls. **/
+ background(db:Mongo.mongodb): Mongo.mongodb =
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.BackgroundBit) }
+
+ /** Set the "sparse" flag for all [index] calls. **/
+ sparse(db:Mongo.mongodb): Mongo.mongodb =
+ { db with index_flags=Bitwise.lor(db.index_flags,MongoDriver.SparseBit) }
+
+ /** Insert document into the defined database with inbuilt flags **/
+ insert(m:Mongo.mongodb, documents:Bson.document): bool =
+ MongoDriver.insert(m.mongo, m.insert_flags, "{m.dbname}.{m.collection}", documents)
+
+ /** Insert document with getlasterror into the defined database with inbuilt flags **/
+ inserte(m:Mongo.mongodb, documents:Bson.document): option(Mongo.reply) =
+ MongoDriver.inserte(m.mongo, m.insert_flags, "{m.dbname}.{m.collection}", m.dbname, documents)
+
+ /** Insert batch of documents into the defined database with inbuilt flags **/
+ insert_batch(m:Mongo.mongodb, documents:list(Bson.document)): bool =
+ MongoDriver.insert_batch(m.mongo, m.insert_flags, "{m.dbname}.{m.collection}", documents)
+
+ /** Insert batch of documents with getlasterror into the defined database with inbuilt flags **/
+ insert_batche(m:Mongo.mongodb, documents:list(Bson.document)): option(Mongo.reply) =
+ MongoDriver.insert_batche(m.mongo, m.insert_flags, "{m.dbname}.{m.collection}", m.dbname, documents)
+
+ /** Update document in the defined database with inbuilt flags **/
+ update(m:Mongo.mongodb, selector:Bson.document, update:Bson.document): bool =
+ MongoDriver.update(m.mongo, m.update_flags, "{m.dbname}.{m.collection}", selector, update)
+
+ /** Update document with getlasterror in the defined database with inbuilt flags **/
+ updatee(m:Mongo.mongodb, selector:Bson.document, update:Bson.document): option(Mongo.reply) =
+ MongoDriver.updatee(m.mongo, m.update_flags, "{m.dbname}.{m.collection}", m.dbname, selector, update)
+
+ /** Perform a query using inbuilt parameters.
+ * The functions to handle [Mongo.reply] are in [MongoDriver].
+ **/
+ query(m:Mongo.mongodb, query:Bson.document): option(Mongo.reply) =
+ query = (match m.orderby with
+ | {some=orderby} -> [H.doc("$query",query), H.doc("$orderby",orderby)]
+ | {none} -> query)
+ MongoDriver.query(m.mongo, m.query_flags, "{m.dbname}.{m.collection}", m.skip, m.limit, query, m.fields)
+
+ /** Perform a get_more using inbuilt parameters **/
+ get_more(m:Mongo.mongodb, cursorID:Mongo.cursorID): option(Mongo.reply) =
+ MongoDriver.get_more(m.mongo, "{m.dbname}.{m.collection}", m.limit, cursorID)
+
+ /** Delete documents from the defined database with inbuilt flags **/
+ delete(m:Mongo.mongodb, selector:Bson.document): bool =
+ MongoDriver.delete(m.mongo, m.delete_flags, "{m.dbname}.{m.collection}", selector)
+
+ /** Delete documents with getlasterror from the defined database with inbuilt flags **/
+ deletee(m:Mongo.mongodb, selector:Bson.document): option(Mongo.reply) =
+ MongoDriver.deletee(m.mongo, m.delete_flags, "{m.dbname}.{m.collection}", m.dbname, selector)
+
+ /** Perform a kill_cursors operation **/
+ kill_cursors(m:Mongo.mongodb, cursors:list(Mongo.cursorID)): bool =
+ MongoDriver.kill_cursors(m.mongo, cursors)
+
+ /** Perform a kill_cursors operation with getlasterror **/
+ kill_cursorse(m:Mongo.mongodb, cursors:list(Mongo.cursorID)): option(Mongo.reply) =
+ MongoDriver.kill_cursorse(m.mongo, m.dbname, cursors)
+
+ /** Perform a msg operation **/
+ msg(m:Mongo.mongodb, msg:string): bool =
+ MongoDriver.msg(m.mongo, msg)
+
+ /** Perform a msg operation with getlasterror **/
+ msge(m:Mongo.mongodb, msg:string): option(Mongo.reply) =
+ MongoDriver.msge(m.mongo, m.dbname, msg)
+
+ /** Add an index to the inbuilt collection **/
+ create_index(m:Mongo.mongodb, key:Bson.document): bool =
+ MongoDriver.create_index(m.mongo, "{m.dbname}.{m.collection}", key, m.index_flags)
+
+ /** Add an index to the inbuilt collection with getlasterror **/
+ create_indexe(m:Mongo.mongodb, key:Bson.document): option(Mongo.reply) =
+ MongoDriver.create_indexe(m.mongo, "{m.dbname}.{m.collection}", m.dbname, key, m.index_flags)
+
+ /** Perform a query according to inbuilt parameters, return cursor **/
+ find(m:Mongo.mongodb, query:Bson.document): outcome(Mongo.cursor,Mongo.failure) =
+ MongoCursor.find(m.mongo, "{m.dbname}.{m.collection}", query, m.fields, m.orderby, m.limit, m.skip, m.query_flags)
+
+ /** Perform a query according to inbuilt parameters, return first match **/
+ find_one(m:Mongo.mongodb, query:Bson.document): Mongo.result =
+ MongoCursor.find_one(m.mongo, "{m.dbname}.{m.collection}", query, m.fields, m.orderby)
+
+ /** Perform a query according to inbuilt parameters, return up to [limit] matches **/
+ find_all(m:Mongo.mongodb, query:Bson.document): Mongo.results =
+ MongoCursor.find_all(m.mongo, "{m.dbname}.{m.collection}", query, m.fields, m.orderby, m.limit)
}}
@@ -362,9 +362,13 @@ MongoCursor = {{
*
* {b NOTE: reverses the order.}
**/
- // TODO: fields and orderby here
- find_all(m:Mongo.db, ns:string, query:Bson.document, limit:int): Mongo.results =
- cursor = set_query(set_limit(init(m,ns),limit),{some=query})
+ find_all(m:Mongo.db, ns:string, query:Bson.document,
+ fields:option(Bson.document), orderby:option(Bson.document), limit:int): Mongo.results =
+ cursor = init(m, ns)
+ cursor = set_query(cursor, {some=query})
+ cursor = set_fields(cursor, fields)
+ cursor = set_orderby(cursor, orderby)
+ cursor = set_limit(cursor, limit)
(cursor,results) =
for((cursor,{success=[]}),
((cursor,results) ->
Oops, something went wrong.

0 comments on commit a006b89

Please sign in to comment.