Permalink
Browse files

[feature] stdlib: Added socket pool, fixed major problem with reconne…

…ct, made concurrency selectable.
  • Loading branch information...
1 parent 5e3ab99 commit bc98b99ac5b34419d70956d619b2e00b4d490926 @nrs135 nrs135 committed Nov 22, 2011
View
@@ -106,6 +106,10 @@ let concat m1 m2 = { Bson.buf = Buf.of_string(m1.Bson.buf.Buf.str^m2.Bson.buf.Bu
let append m1 m2 = Buf.add_buf m1.Bson.buf m2.Bson.buf
+let length m = Buf.length m.Bson.buf
+
+let clip m len = m.Bson.buf.Buf.i <- len
+
(*
struct MsgHeader {
int32 messageLength; // total message size, including this
@@ -116,6 +120,7 @@ struct MsgHeader {
*)
let mongo_buf_requestId mb = geti32 mb.Bson.buf 4
+let mongo_buf_refresh_requestId mb rid = St.lei32l mb.Bson.buf.Buf.str 4 rid
let mongo_buf_responseTo mb = geti32 mb.Bson.buf 8
let header_messageLength s = geti32 s 0
View
@@ -44,6 +44,8 @@ val import : string -> mongo_buf
val copy : mongo_buf -> mongo_buf
val concat : mongo_buf -> mongo_buf -> mongo_buf
val append : mongo_buf -> mongo_buf -> unit
+val length : mongo_buf -> int
+val clip : mongo_buf -> int -> unit
val set_header_len : mongo_buf -> int -> unit
val set_header : mongo_buf -> int32 -> int -> int -> unit
val get_buf : ?hint:int -> unit -> Buf.t
@@ -79,4 +81,5 @@ val string_of_message_str : string -> string
val string_of_message_reply : Buf.buf * int * int -> string
val mongo_buf_requestId : mongo_buf -> int
+val mongo_buf_refresh_requestId : mongo_buf -> int32 -> unit
val mongo_buf_responseTo : mongo_buf -> int
View
@@ -337,6 +337,12 @@ let concat = Mongo.concat
##register append: Mongo.mongo_buf, Mongo.mongo_buf -> void
let append = Mongo.append
+##register length: Mongo.mongo_buf -> int
+let length = Mongo.length
+
+##register clip: Mongo.mongo_buf, int -> void
+let clip = Mongo.clip
+
##register clear: Mongo.mongo_buf -> void
let clear = Mongo.clear
@@ -422,6 +428,9 @@ let is_null_cursorID cid = ServerLib.wrap_bool (cid = 0L)
##register mongo_buf_requestId : Mongo.mongo_buf -> int
let mongo_buf_requestId = Mongo.mongo_buf_requestId
+##register mongo_buf_refresh_requestId : Mongo.mongo_buf -> void
+let mongo_buf_refresh_requestId m = Mongo.mongo_buf_refresh_requestId m (nextrid())
+
##register mongo_buf_responseTo : Mongo.mongo_buf -> int
let mongo_buf_responseTo = Mongo.mongo_buf_responseTo
@@ -29,12 +29,6 @@ open OpabslgenMLRuntime
##extern-type Socket.connection = Scheduler.connection_info
-(*let temporary_hack o = wrap_opa_outcome (BslUtils.unwrap_opa_outcome o)
-
-# # register [cps-bypass] test : string, continuation(outcome(string, int)) \
- -> void
-let test (s:string) k = QmlCpsServerLib.return k (temporary_hack (BslUtils.create_outcome (`success s)))*)
-
let create_outcome outcome k =
QmlCpsServerLib.return k (wrap_opa_outcome (BslUtils.unwrap_opa_outcome (BslUtils.create_outcome outcome)))
@@ -108,3 +102,6 @@ let read_with_err_cont connection_info timeout cont =
Scheduler.read Scheduler.default connection_info ~timeout:(Time.milliseconds timeout)
~err_cont:(fun exn -> create_outcome (`failure (Printexc.to_string exn)) cont)
(fun (_,str) -> create_outcome (`success str) cont)
+
+##register conn_id : Socket.connection -> int
+let conn_id conn = conn.Scheduler.conn_id
@@ -281,14 +281,6 @@ MongoCollection = {{
partial(c:Mongo.collection('value)): Mongo.collection('value)
= {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoDriver.PartialBit) }}
- @private reply_to_result(from:string, reply_opt: option(Mongo.reply)): Mongo.result =
- match reply_opt with
- | {some=reply} ->
- (match MongoDriver.reply_document(reply,0) with
- | {some=doc} -> {success=doc}
- | {none} -> {failure={Error="{from}: no document in reply"}})
- | {none} -> {failure={Error="{from}: no reply"}}
-
/**
* Insert an OPA value into a collection.
**/
@@ -301,7 +293,7 @@ MongoCollection = {{
inserte(c:Mongo.collection('value), v:'value): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
b = Bson.opa_to_bson(v,{some=@typeval('value)})
- reply_to_result("MongoConnection.insert",MongoDriver.inserte(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
+ MongoDriver.reply_to_result("MongoConnection.insert",0,MongoDriver.inserte(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
/**
* Batch insert, you need to build the batch using the [Batch] module.
@@ -313,7 +305,8 @@ MongoCollection = {{
/** insert_batch with getlasterror **/
insert_batche(c:Mongo.collection('value), b:Mongo.batch('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
- reply_to_result("MongoConnection.insert_barch",MongoDriver.insert_batche(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
+ MongoDriver.reply_to_result("MongoConnection.insert_batch",0,
+ MongoDriver.insert_batche(c.db.mongo,c.db.insert_flags,ns,c.db.dbname,b))
/**
* Update a value in a collection.
@@ -333,7 +326,8 @@ MongoCollection = {{
/** update with getlasterror **/
updatee(c:Mongo.collection('value), select:Mongo.select('value), update:Mongo.update('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
- reply_to_result("MongoConnection.update",MongoDriver.updatee(c.db.mongo,c.db.update_flags,ns,c.db.dbname,select,update))
+ MongoDriver.reply_to_result("MongoConnection.update",0,
+ MongoDriver.updatee(c.db.mongo,c.db.update_flags,ns,c.db.dbname,select,update))
/**
* Delete values in a collection according to a select value.
@@ -345,7 +339,8 @@ MongoCollection = {{
/** delete with getlasterror **/
deletee(c:Mongo.collection('value), select:Mongo.select('value)): Mongo.result =
ns = c.db.dbname^"."^c.db.collection
- reply_to_result("MongoConnection.delete",MongoDriver.deletee(c.db.mongo,c.db.delete_flags,ns,c.db.dbname,select))
+ MongoDriver.reply_to_result("MongoConnection.delete",0,
+ MongoDriver.deletee(c.db.mongo,c.db.delete_flags,ns,c.db.dbname,select))
/**
* Return the [Bson.document] representation of a single value selected from
@@ -70,6 +70,19 @@ type Mongo.getLastErrorOptions = {
wtimeout : Bson.register(Bson.int32);
}
+type Mongo.lastError = {
+ ok : int; //- true indicates the getLastError command completed successfully. This does NOT indicate there wasn't a last error.
+ err : Bson.register(string); //- if non-null, indicates an error occurred. Value is a textual description of the error.
+ code : Bson.register(int); //- if set, indicates the error code which occurred.
+ connectionId : Bson.register(int); //- the id of the connection
+ lastOp : Bson.register(Bson.document); //- the op-id from the last operation (varies, not always just an int)
+ n : Bson.register(int);
+}
+
+type Mongo.ok = {
+ ok : int;
+}
+
type Mongo.isMaster = {
ismaster : bool;
msg : Bson.register(string);
@@ -78,6 +78,8 @@ type Mongo.param = {
name:string;
replname:option(string);
bufsize:int;
+ concurrency:Mongo.concurrency;
+ pool_max:int;
close_socket:bool;
log:bool;
seeds:list(Mongo.mongo_host);
@@ -91,12 +93,14 @@ MongoConnection = {{
@private default_seeds = ([("localhost",MongoDriver.default_port)]:list(Mongo.mongo_host))
@private init_param = ({
- name = "default";
- replname = {none};
- bufsize = 50*1024;
- close_socket = false;
- log = false;
- seeds = default_seeds;
+ name="default";
+ replname={none};
+ bufsize=50*1024;
+ concurrency={pool};
+ pool_max=2;
+ close_socket=false;
+ log=false;
+ seeds=default_seeds;
}:Mongo.param)
@private last_name = Mutable.make("default")
@@ -143,40 +147,59 @@ MongoConnection = {{
anonymous = [];
parsers = [
{CommandLine.default_parser with
- names = ["--mongoname", "-mn"]
+ names = ["--mongo-name", "--mongoname", "-mn"]
description = "Name for the MongoDB server connection"
param_doc = "<string>"
on_param(p) = parser name={Rule.consume} ->
do last_name.set(name)
{no_params = add_param((p -> { p with ~name }),p)}
},
{CommandLine.default_parser with
- names = ["--mongoreplname", "-mr"]
+ names = ["--mongo-repl-name", "--mongoreplname", "-mr"]
description = "Replica set name for the MongoDB server"
param_doc = "<string>"
on_param(p) = parser s={Rule.consume} ->
{no_params = add_param((p -> { p with replname={some=s} }),p)}
},
{CommandLine.default_parser with
- names = ["--mongobufsize", "-mb"]
+ names = ["--mongo-buf-size", "--mongobufsize", "-mb"]
description = "Hint for initial MongoDB connection buffer size"
param_doc = "<int>"
on_param(p) = parser n={Rule.natural} -> {no_params = add_param((p -> { p with bufsize = n }),p)}
},
{CommandLine.default_parser with
- names = ["--mongoclosesocket", "-mc"]
+ names = ["--mongo-concurrency", "--mongoconcurrency", "-mx"]
+ description = "Concurrency type, 'pool', 'cell' or 'singlethreaded'"
+ param_doc = "<string>"
+ on_param(p) = parser s={Rule.consume} ->
+ concurrency =
+ ((match s with
+ | "pool" -> {pool}
+ | "cell" -> {cell}
+ | "singlethreaded" -> {singlethreaded}
+ | _ -> ML.fatal("MongoConnection.get_params","Unknown Mongo concurrency string {s}",-1)):Mongo.concurrency)
+ {no_params = add_param((p -> { p with ~concurrency }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongo-socket-pool", "--mongosocketpool", "-mp"]
+ description = "Number of sockets in socket pool (>=2 enables socket pool)"
+ param_doc = "<int>"
+ on_param(p) = parser n={Rule.natural} -> {no_params = add_param((p -> { p with pool_max = n }),p)}
+ },
+ {CommandLine.default_parser with
+ names = ["--mongo-close-socket", "--mongoclosesocket", "-mc"]
description = "Maintain MongoDB server sockets in a closed state"
param_doc = "<bool>"
on_param(p) = parser b={Rule.bool} -> {no_params = add_param((p -> { p with close_socket = b }),p)}
},
{CommandLine.default_parser with
- names = ["--mongolog", "-ml"]
+ names = ["--mongo-log", "--mongolog", "-ml"]
description = "Enable MongoLog logging"
param_doc = "<bool>"
on_param(p) = parser b={Rule.bool} -> {no_params = add_param((p -> { p with log = b }),p)}
},
{CommandLine.default_parser with
- names = ["--mongoseed", "-ms"]
+ names = ["--mongo-seed", "--mongoseed", "-ms"]
description = "Add a seed to a replica set, allows multiple seeds"
param_doc = "<host>\{:<port>\}"
on_param(p) =
@@ -186,7 +209,7 @@ MongoConnection = {{
{ p with seeds=[MongoReplicaSet.mongo_host_of_string(s)|seeds] }),p)}
},
{CommandLine.default_parser with
- names = ["--mongohost", "-mh"]
+ names = ["--mongo-host", "--mongohost", "-mh"]
description = "Host name of a MongoDB server, overwrites any previous addresses for this name"
param_doc = "<host>\{:<port>\}"
on_param(p) =
@@ -195,7 +218,7 @@ MongoConnection = {{
{ p with seeds=[MongoReplicaSet.mongo_host_of_string(s)] }),p)}
},
{CommandLine.default_parser with
- names = ["--mongologtype", "-mt"]
+ names = ["--mongo-log-type", "--mongologtype", "-mt"]
description = "Type of logging: stdout, stderr, logger, none"
param_doc = "<string>"
on_param(p) = parser s={Rule.consume} ->
@@ -245,8 +268,10 @@ MongoConnection = {{
*
* Example: [openraw(name, bufsize, close_socket, log, host, port)]
**/
- openraw(name:string, bufsize:int, close_socket:bool, log:bool, addr:string, port:int): outcome(Mongo.mongodb,Mongo.failure) =
- open_(MongoDriver.open(bufsize,close_socket,addr,port,log),name)
+ openraw(name:string, bufsize:int, concurrency:Mongo.concurrency,
+ pool_max:int, close_socket:bool, log:bool, addr:string, port:int)
+ : outcome(Mongo.mongodb,Mongo.failure) =
+ open_(MongoDriver.open(bufsize,concurrency,pool_max,close_socket,addr,port,log),name)
/**
* Open a connection to a replica set starting from the given list of seeds.
@@ -257,9 +282,10 @@ MongoConnection = {{
* and then searches for the primary among the hosts. Rconnection logic
* is enabled.
**/
- replraw(name:string, bufsize:int, close_socket:bool, log:bool, seeds:list(Mongo.mongo_host))
+ replraw(name:string, bufsize:int, concurrency:Mongo.concurrency,
+ pool_max:int, close_socket:bool, log:bool, seeds:list(Mongo.mongo_host))
: outcome(Mongo.mongodb,Mongo.failure) =
- open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,close_socket,log,seeds)),name)
+ open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,concurrency,pool_max,close_socket,log,seeds)),name)
/**
* Open a connection according to the named parameters.
@@ -277,11 +303,11 @@ MongoConnection = {{
match List.find((p -> p.name == name),params.get()) with
| {some=p} ->
(match p.replname with
- | {some=rn} -> replraw(rn,p.bufsize,p.close_socket,p.log,p.seeds)
+ | {some=rn} -> replraw(rn,p.bufsize,p.concurrency,p.pool_max,p.close_socket,p.log,p.seeds)
| {none} ->
(match p.seeds with
| [] -> {failure={Error="MongoConnection.open: No host for plain connection"}}
- | [(host,port)] -> openraw(name,p.bufsize,p.close_socket,p.log,host,port)
+ | [(host,port)] -> openraw(name,p.bufsize,p.concurrency,p.pool_max,p.close_socket,p.log,host,port)
| _ -> {failure={Error="MongoConnection.open: Multiple hosts for plain connection"}}))
| {none} -> {failure={Error="MongoConnection.open: No such replica name {name}"}}
@@ -412,6 +438,11 @@ MongoConnection = {{
partial(db:Mongo.mongodb): Mongo.mongodb =
{ db with query_flags=Bitwise.lor(db.query_flags,MongoDriver.PartialBit) }
+ insert(m:Mongo.mongodb, flags:int, ns:string, documents:Bson.document): bool =
+ MongoDriver.insert(m.mongo, flags, ns, documents)
+ inserte(m:Mongo.mongodb, flags:int, ns:string, dbname:string, documents:Bson.document): option(Mongo.reply) =
+ MongoDriver.inserte(m.mongo, flags, ns, dbname, documents)
+
}}
// End of file connection.opa
Oops, something went wrong.

0 comments on commit bc98b99

Please sign in to comment.