Skip to content
Browse files

[feature] stdlib: Arranged closed-socket mode for Mongo.

  • Loading branch information...
1 parent 86efce2 commit ee514b1a4cb36c99d829ba1eb17de3baace50f4a @nrs135 nrs135 committed Nov 16, 2011
Showing with 98 additions and 57 deletions.
  1. +5 −4 stdlib/apis/mongo/connection.opa
  2. +89 −49 stdlib/apis/mongo/mongo.opa
  3. +4 −4 stdlib/apis/mongo/replset.opa
View
9 stdlib/apis/mongo/connection.opa
@@ -108,8 +108,8 @@ MongoConnection = {{
*
* Example: [open(bufsize, host, port)]
**/
- open(bufsize:int, addr:string, port:int): outcome(Mongo.mongodb,Mongo.failure) =
- open_(MongoDriver.open(bufsize,addr,port,false))
+ open(bufsize:int, close_socket:bool, log:bool, addr:string, port:int): outcome(Mongo.mongodb,Mongo.failure) =
+ open_(MongoDriver.open(bufsize,close_socket,addr,port,log))
/**
* Open a connection to a replica set starting from the given list of seeds.
@@ -120,8 +120,9 @@ MongoConnection = {{
* and then searches for the primary among the hosts. Rconnection logic
* is enabled.
**/
- repl(name:string, bufsize:int, seeds:list(Mongo.mongo_host)): outcome(Mongo.mongodb,Mongo.failure) =
- open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,false,seeds)))
+ repl(name:string, bufsize:int, close_socket:bool, log:bool, seeds:list(Mongo.mongo_host))
+ : outcome(Mongo.mongodb,Mongo.failure) =
+ open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,close_socket,log,seeds)))
/**
* Clone a connection. We actually just bump the link count. On close
View
138 stdlib/apis/mongo/mongo.opa
@@ -62,6 +62,8 @@ type Mongo.mongo_host = (string, int)
type Mongo.db = {
conn : Mutable.t(option(Socket.connection));
conncell : Cell.cell(Mongo.sr,Mongo.srr);
+ close_socket : bool;
+ socket_link : Mutable.t(int);
primary : Mutable.t(option(Mongo.mongo_host));
bufsize : int;
log : bool;
@@ -531,28 +533,63 @@ MongoDriver = {{
ret(false)
@private
- send_no_reply_(m,mbuf,name,reply_expected): bool =
+ open_socket(m:Mongo.db): outcome(Socket.connection,Mongo.failure) =
match m.conn.get() with
| {some=conn} ->
+ do m.socket_link.set(m.socket_link.get()+1)
+ {success=conn}
+ | {none} ->
+ (match m.primary.get() with
+ | {some=(host,port)} ->
+ (match Socket.connect_with_err_cont(host,port) with
+ | {success=conn} ->
+ do m.conn.set({some=conn})
+ do m.socket_link.set(m.socket_link.get()+1)
+ {success=conn}
+ | {failure=str} ->
+ {failure={Error="MongoDriver.open_socket: Got exception {str}"}})
+ | {none} -> {failure={Error="MongoDriver.open_socket: no primary"}})
+
+ @private
+ close_socket(m:Mongo.db): void =
+ do m.socket_link.set(m.socket_link.get()-1)
+ if m.close_socket
+ then
+ if m.socket_link.get() <= 0
+ then
+ do match m.conn.get() with
+ | {some=conn} ->
+ do Socket.close(conn)
+ m.conn.set({none})
+ | {none} -> void
+ m.socket_link.set(0)
+
+ @private
+ send_no_reply_(m,mbuf,name,reply_expected): bool =
+ match open_socket(m) with
+ | {success=conn} ->
(str, len) = export_(mbuf)
s = String.substring(0,len,str)
do if m.log then ML.debug("Mongo.send({name})","\n{string_of_message(s)}",void)
(match Socket.write_len_with_err_cont(conn,m.comms_timeout,s,len) with
| {success=cnt} ->
do if not(reply_expected) then free_(mbuf) else void
+ do close_socket(m)
(cnt==len)
- | {failure=_} -> false)
- | {none} ->
- ML.error("Mongo.send({name})","Attempt to write to unopened connection",false)
+ | {failure=_} ->
+ do close_socket(m)
+ false)
+ | {~failure} ->
+ ML.error("Mongo.send({name})","{string_of_failure(failure)}",false)
@private
send_no_reply(m,mbuf,name): bool = send_no_reply_(m,mbuf,name,false)
@private
send_with_reply(m,mbuf,name): option(Mongo.reply) =
mrid = mongo_buf_requestId(mbuf)
- match m.conn.get() with
- | {some=conn} ->
+ match open_socket(m) with
+ | {success=conn} ->
if send_no_reply_(m,mbuf,name,true)
then
mailbox = new_mailbox_(m.bufsize)
@@ -562,16 +599,20 @@ MongoDriver = {{
do reset_mailbox_(mailbox)
do free_(mbuf)
do if m.log then ML.debug("Mongo.receive({name})","\n{string_of_message_reply(reply)}",void)
+ do close_socket(m)
if mrid != rrt
then ML.error("MongoDriver.send_with_reply","RequestId mismatch, expected {mrid}, got {rrt}",{none})
else {some=reply}
| {~failure} ->
do if m.log then ML.info("send_with_reply","failure={failure}",void)
do reset_mailbox_(mailbox)
+ do close_socket(m)
{none})
- else {none}
- | {none} ->
- ML.error("Mongo.receive({name})","Attempt to write to unopened connection",{none})
+ else
+ do close_socket(m)
+ {none}
+ | {~failure} ->
+ ML.error("Mongo.receive({name})","{string_of_failure(failure)}",{none})
@private
send_with_error(m,mbuf,name,ns): option(Mongo.reply) =
@@ -580,8 +621,8 @@ MongoDriver = {{
mrid = mongo_buf_requestId(mbuf2)
do append_(mbuf,mbuf2)
do free_(mbuf2)
- match m.conn.get() with
- | {some=conn} ->
+ match open_socket(m) with
+ | {success=conn} ->
if send_no_reply_(m,mbuf,name,true)
then
mailbox = new_mailbox_(m.bufsize)
@@ -591,44 +632,33 @@ MongoDriver = {{
do reset_mailbox_(mailbox)
do free_(mbuf)
do if m.log then ML.debug("Mongo.send_with_error({name})","\n{string_of_message_reply(reply)}",void)
+ do close_socket(m)
if mrid != rrt
then ML.error("MongoDriver.send_with_error","RequestId mismatch, expected {mrid}, got {rrt}",{none})
else {some=reply}
| {~failure} ->
do if m.log then ML.info("send_with_error","failure={failure}",void)
do reset_mailbox_(mailbox)
+ do close_socket(m)
{none})
- else {none}
- | {none} ->
- ML.error("Mongo.send_with_error({name})","Attempt to write to unopened connection",{none})
+ else
+ do close_socket(m)
+ {none}
+ | {~failure} ->
+ ML.error("Mongo.send_with_error({name})","{string_of_failure(failure)}",{none})
@private
sr(_, msg) =
match msg with
| {send=(m,mbuf,name)} ->
- (match m.conn.get() with
- | {some=_conn} ->
- sr = send_no_reply(m,mbuf,name)
- {return=if sr then {sendresult=sr} else {reconnect}; instruction={unchanged}}
- | {none} ->
- do ML.error("Mongo.send","Unopened connection",void)
- {return={sendresult=false}; instruction={unchanged}})
+ sr = send_no_reply(m,mbuf,name)
+ {return=if sr then {sendresult=sr} else {reconnect}; instruction={unchanged}}
| {sendrecv=(m,mbuf,name)} ->
- (match m.conn.get() with
- | {some=_conn} ->
- swr = send_with_reply(m,mbuf,name)
- {return=if Option.is_some(swr) then {sndrcvresult=swr} else {reconnect}; instruction={unchanged}}
- | {none} ->
- do ML.error("Mongo.sendrecv","Unopened connection",void)
- {return={sndrcvresult={none}}; instruction={unchanged}})
+ swr = send_with_reply(m,mbuf,name)
+ {return=if Option.is_some(swr) then {sndrcvresult=swr} else {reconnect}; instruction={unchanged}}
| {senderror=(m,mbuf,name,ns)} ->
- (match m.conn.get() with
- | {some=_conn} ->
- swe = send_with_error(m,mbuf,name,ns)
- {return=if Option.is_some(swe) then {snderrresult=swe} else {reconnect}; instruction={unchanged}}
- | {none} ->
- do ML.error("Mongo.senderror","Unopened connection",void)
- {return={snderrresult={none}}; instruction={unchanged}})
+ swe = send_with_error(m,mbuf,name,ns)
+ {return=if Option.is_some(swe) then {snderrresult=swe} else {reconnect}; instruction={unchanged}}
| {stop} ->
{return={stopresult}; instruction={stop}}
@@ -676,11 +706,11 @@ MongoDriver = {{
* @param bufsize A hint to the driver for the initial buffer size.
* @param log Whether to enable logging for the driver.
**/
- init(bufsize:int, log:bool): Mongo.db =
+ init(bufsize:int, close_socket:bool, log:bool): Mongo.db =
conn = Mutable.make({none})
{ ~conn;
conncell=(Cell.make(conn, sr):Cell.cell(Mongo.sr,Mongo.srr));
- ~bufsize; ~log;
+ ~bufsize; ~close_socket; ~log; socket_link=Mutable.make(0);
seeds=[]; hosts=Mutable.make([]); name="";
primary=Mutable.make({none}); reconnect=Mutable.make({none});
reconnect_wait=2000; max_attempts=30; comms_timeout=3600000;
@@ -703,27 +733,37 @@ MongoDriver = {{
do match m.conn.get() with | {some=conn} -> Socket.close(conn) | {none} -> void
do m.conn.set({none})
do m.primary.set({none})
- match Socket.connect_with_err_cont(addr,port) with
- | {success=conn} ->
- do m.conn.set({some=conn})
- do m.primary.set({some=(addr,port)})
- {success=m}
- | {failure=str} -> {failure={Error="Got exception {str}"}}
+ if m.close_socket
+ then
+ do m.primary.set({some=(addr,port)})
+ {success=m}
+ else
+ match Socket.connect_with_err_cont(addr,port) with
+ | {success=conn} ->
+ do m.conn.set({some=conn})
+ do m.primary.set({some=(addr,port)})
+ {success=m}
+ | {failure=str} -> {failure={Error="Got exception {str}"}}
/**
* Convenience function, initialise and connect at the same time.
**/
- open(bufsize:int, addr:string, port:int, log:bool): outcome(Mongo.db,Mongo.failure) =
- connect(init(bufsize,log),addr,port)
+ open(bufsize:int, close_socket:bool, addr:string, port:int, log:bool): outcome(Mongo.db,Mongo.failure) =
+ connect(init(bufsize,close_socket,log),addr,port)
/**
* Close mongo connection.
**/
+ /* Note: this may happen asynchronously with other tasks so we
+ * have to be careful how we access the connection value.
+ */
close(m:Mongo.db): Mongo.db =
- do if Option.is_some(m.conn.get())
- then
- do stop(m)
- Socket.close(Option.get(m.conn.get()))
+ do match m.conn.get() with
+ | {some=conn} ->
+ do stop(m)
+ Socket.close(conn)
+ | {none} ->
+ void
do m.conn.set({none})
do m.primary.set({none})
m
View
8 stdlib/apis/mongo/replset.opa
@@ -150,15 +150,15 @@ MongoReplicaSet = {{
/**
* Initialize a [Mongo.db] connection using the given list of seeds.
**/
- init(name:string, bufsize:int, log:bool, seeds:list(Mongo.mongo_host)): Mongo.db =
- m = MongoDriver.init(bufsize, log)
+ init(name:string, bufsize:int, close_socket:bool, log:bool, seeds:list(Mongo.mongo_host)): Mongo.db =
+ m = MongoDriver.init(bufsize, close_socket, log)
{m with ~seeds; hosts=Mutable.make([]); ~name}
/**
* Initialize a [Mongo.db] connection using a single seed.
**/
- init_single(name:string, bufsize:int, log:bool, seed:Mongo.mongo_host): Mongo.db =
- init(name,bufsize,log,[seed])
+ init_single(name:string, bufsize:int, close_socket:bool, log:bool, seed:Mongo.mongo_host): Mongo.db =
+ init(name,bufsize,close_socket,log,[seed])
/**
* Generate a [Mongo.mongo_host] value from a string: "host:port".

0 comments on commit ee514b1

Please sign in to comment.
Something went wrong with that request. Please try again.