Permalink
Browse files

[fix] stdlib: Fixed some problems with reconnect in MongoDB.

  • Loading branch information...
1 parent 644f674 commit ad5eb18d543c40aea87850461249a93562d93584 @nrs135 nrs135 committed with OpaOnWindowsNow Jan 27, 2012
Showing with 56 additions and 21 deletions.
  1. +19 −13 libbase/mongo.ml
  2. +3 −0 stdlib/apis/mongo/bson.opa
  3. +13 −0 stdlib/apis/mongo/common.opa
  4. +20 −8 stdlib/apis/mongo/mongo.opa
  5. +1 −0 stdlib/apis/mongo/replset.opa
View
@@ -419,19 +419,25 @@ let reply_document_pos (b,s,l) n =
in
aux 0 (s+32)
-let string_of_message_buf msg =
- (string_of_MsgHeader msg)^
- (match header_opCode msg with
- | c when c = _OP_REPLY -> string_of_reply msg
- | c when c = _OP_MSG -> string_of_msg msg
- | c when c = _OP_UPDATE -> string_of_update msg
- | c when c = _OP_INSERT -> string_of_insert msg
- | c when c = _RESERVED -> " reserved"
- | c when c = _OP_QUERY -> string_of_query msg
- | c when c = _OP_GET_MORE -> string_of_get_more msg
- | c when c = _OP_DELETE -> string_of_delete msg
- | c when c = _OP_KILL_CURSORS -> string_of_kill_cursors msg
- | c -> Printf.sprintf " unknown (%d)" c)
+let rec string_of_message_buf msg =
+ let len = header_messageLength msg in
+ let str1 = (string_of_MsgHeader msg)^
+ (match header_opCode msg with
+ | c when c = _OP_REPLY -> string_of_reply msg
+ | c when c = _OP_MSG -> string_of_msg msg
+ | c when c = _OP_UPDATE -> string_of_update msg
+ | c when c = _OP_INSERT -> string_of_insert msg
+ | c when c = _RESERVED -> " reserved"
+ | c when c = _OP_QUERY -> string_of_query msg
+ | c when c = _OP_GET_MORE -> string_of_get_more msg
+ | c when c = _OP_DELETE -> string_of_delete msg
+ | c when c = _OP_KILL_CURSORS -> string_of_kill_cursors msg
+ | c -> Printf.sprintf " unknown (%d)" c)
+ in
+ let blen = Buf.length msg in
+ if blen <= len
+ then str1
+ else str1^"\n"^(string_of_message_buf (Buf.of_string (Buf.sub msg len (blen - len))))
let string_of_message_str str = string_of_message_buf (Buf.of_string str)
@@ -539,6 +539,9 @@ Bson = {{
(match find_int(doc, "errno") with {some=errno} -> errno != 0 | {none} -> false) ||
(match find_string(doc, "errmsg") with {some=errmsg} -> errmsg != "" | {none} -> false)
+ is_not_master(doc:Bson.document): bool =
+ match find_string(doc, "err") with {some=err} -> err == "not master" | {none} -> false
+
/**
* Same as is_error but for a [Mongo.error] type.
**/
@@ -218,6 +218,7 @@ MongoCommon = {{
/** Predicate for error status of a [Mongo.result] value. **/
is_error(result:Mongo.result): bool = outcome_map(result, Bson.is_error, (_ -> true))
+ is_not_master(result:Mongo.result): bool = outcome_map(result, Bson.is_not_master, (_ -> false))
/** Predicate for error status of a [Mongo.result] value. **/
isError(error:Mongo.error): bool = outcome_map(error, Bson.isError, (_ -> true))
@@ -472,6 +473,18 @@ MongoCommon = {{
| {none} -> failErr("{from}: no document in reply"))
| {none} -> failErr("{from}: no reply")
+ reply_is_not_master(reply_opt: option(Mongo.reply)): bool =
+ match reply_opt with
+ | {some=reply} ->
+ numberReturned = reply_numberReturned(reply)
+ if numberReturned != 1
+ then false
+ else
+ (match reply_document(reply,0) with
+ | {some=doc} -> Bson.is_not_master(doc)
+ | {none} -> false)
+ | {none} -> false
+
/**
* Extract all documents from a reply.
*
@@ -234,6 +234,14 @@ MongoDriver = {{
| {stopresult} -> void
| _ -> @fail
+/*
+ @private
+ print_mbuf(txt,mbuf) =
+ (str, len) = export_(mbuf)
+ s = String.substring(0,len,str)
+ ML.debug(txt,"\n{string_of_message(s)}",void)
+*/
+
@private
send_no_reply_(m,mbuf,name,reply_expected): bool =
match m.conn with
@@ -324,12 +332,12 @@ MongoDriver = {{
@private
sr_swr(m,mbuf,name) =
swr = send_with_reply(m,mbuf,name)
- if Option.is_some(swr) then {sndrcvresult=swr} else {reconnect}
+ if Option.is_some(swr) && not(MongoCommon.reply_is_not_master(swr)) then {sndrcvresult=swr} else {reconnect}
@private
sr_swe(m,mbuf,name,ns) =
swe = send_with_error(m,mbuf,name,ns)
- if Option.is_some(swe) then {snderrresult=swe} else {reconnect}
+ if Option.is_some(swe) && not(MongoCommon.reply_is_not_master(swe)) then {snderrresult=swe} else {reconnect}
@private
srpool(m:Mongo.db,msg:Mongo.sr): Mongo.srr =
@@ -351,10 +359,11 @@ MongoDriver = {{
@private
snd(m,mbuf,name) =
recon() =
+ mbuf2 = copy_(mbuf)
if reconnect("send_no_reply",m)
then
- do mongo_buf_refresh_requestId(mbuf) // Probably not necessary but we don't want unnecessary confusion
- snd(m,mbuf,name)
+ do mongo_buf_refresh_requestId(mbuf2) // Probably not necessary but we don't want unnecessary confusion
+ snd(m,mbuf2,name)
else ML.fatal("MongoDriver.snd({name}):","comms error (Can't reconnect)",-1)
srr = srpool(m,{send=((m,mbuf,name))})
match srr with
@@ -365,10 +374,11 @@ MongoDriver = {{
@private
sndrcv(m,mbuf,name) =
recon() =
+ mbuf2 = copy_(mbuf)
if reconnect("send_with_reply",m)
then
- do mongo_buf_refresh_requestId(mbuf)
- sndrcv(m,mbuf,name)
+ do mongo_buf_refresh_requestId(mbuf2)
+ sndrcv(m,mbuf2,name)
else ML.fatal("MongoDriver.sndrcv({name}):","comms error (Can't reconnect)",-1)
srr = srpool(m,{sendrecv=((m,mbuf,name))})
match srr with
@@ -379,10 +389,11 @@ MongoDriver = {{
@private
snderr(m,mbuf,name,ns) =
recon() =
+ mbuf2 = copy_(mbuf)
if reconnect("send_with_error",m)
then
- do mongo_buf_refresh_requestId(mbuf)
- snderr(m,mbuf,name,ns)
+ do mongo_buf_refresh_requestId(mbuf2)
+ snderr(m,mbuf2,name,ns)
else ML.fatal("MongoDriver.snderr({name}):","comms error (Can't reconnect)",-1)
srr = srpool(m,{senderror=((m,mbuf,name,ns))})
match srr with
@@ -437,6 +448,7 @@ MongoDriver = {{
do if m.log then ML.info("MongoDriver.force_reconnect","{SocketPool.gethost(m.pool)}",void)
if not(reconnect("force_reconnect",m))
then ML.fatal("MongoDriver.force_reconnect:","comms error (Can't reconnect)",-1)
+ else void
else
ML.warning("MongoDriver.force_reconnect","connection is not reconnectable",void)
@@ -44,6 +44,7 @@ type Mongo.replSetGetStatus =
set : string;
date : Date.date;
myState : Bson.int32;
+ syncingTo : Bson.register(string);
members : list({_id : Bson.register(int);
name : string;
self : Bson.register(bool);

0 comments on commit ad5eb18

Please sign in to comment.