Permalink
Browse files

connnection send/call catches closed connection as well as network fa…

…ilures
  • Loading branch information...
1 parent aa1741b commit 445ebaf0fb8248f24262b65f3656ddd9c80a42a8 Tony Hannan committed May 18, 2011
Showing with 51 additions and 36 deletions.
  1. +3 −10 src/mongo_connect.erl
  2. +38 −17 src/mongo_replset.erl
  3. +10 −9 src/mongodb_tests.erl
View
@@ -4,7 +4,7 @@
-export_type ([host/0, connection/0, dbconnection/0, failure/0]).
-export ([host_port/1, read_host/1, show_host/1]).
--export ([connect/1, reconnect/1, conn_host/1, close/1, is_closed/1]).
+-export ([connect/1, conn_host/1, close/1, is_closed/1]).
-export ([call/3, send/2]). % for mongo_query and mongo_cursor
@@ -48,13 +48,6 @@ connect (Host) -> try mvar:create (fun () -> tcp_connect (host_port (Host)) end,
of VSocket -> {ok, {connection, host_port (Host), VSocket}}
catch Reason -> {error, Reason} end.
--spec reconnect (connection()) -> ok | {error, reason()}. % IO
-% Close current socket and create a new socket connected to same server. Error if fails
-reconnect ({connection, Host, VSocket}) -> try
- mvar:modify_ (VSocket, fun (Socket) -> gen_tcp:close (Socket), tcp_connect (host_port (Host)) end)
- of ok -> ok
- catch Reason -> {error, Reason} end.
-
-spec conn_host (connection()) -> host().
% Host this is connected to
conn_host ({connection, Host, _VSocket}) -> Host.
@@ -83,15 +76,15 @@ call ({Db, Conn = {connection, _Host, VSocket}}, Notices, Request) ->
of ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin),
Reply % ^ ResponseTo must match RequestId
- catch Reason -> close (Conn), throw ({connection_failure, Conn, Reason}) end.
+ catch _:Reason -> close (Conn), throw ({connection_failure, Conn, Reason}) end.
-spec send (dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure()
% Asynchronous send (no reply). Don't know if send succeeded. Exclusive access to the connection during send.
send ({Db, Conn = {connection, _Host, VSocket}}, Notices) ->
{NoticesBin, _} = messages_binary (Db, Notices),
Send = fun (Socket) -> tcp_send (Socket, NoticesBin) end,
try mvar:with (VSocket, Send)
- catch Reason -> close (Conn), throw ({connection_failure, Conn, Reason}) end.
+ catch _:Reason -> close (Conn), throw ({connection_failure, Conn, Reason}) end.
-spec messages_binary (mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}.
% Binary representation of messages
View
@@ -100,27 +100,48 @@ secondary_ok_conn (ReplConn, Hosts) -> try
-spec fetch_member_info (rs_connection()) -> member_info(). % EIO
% Retrieve isMaster info from a current known member in replica set. Update known list of members from fetched info.
-% TODO: close connections dropped from Dict
fetch_member_info (ReplConn = {rs_connection, _ReplName, VConns}) ->
- OldHosts = dict:fetch_keys (mvar:read (VConns)),
- {Conn, Info} = until_success (OldHosts, fun (Host) -> connect_member (ReplConn, Host) end),
- NewHosts = lists:map (fun mongo_connect:read_host/1, bson:at (hosts, Info)),
+ OldHosts_ = dict:fetch_keys (mvar:read (VConns)),
+ {Conn, Info} = until_success (OldHosts_, fun (Host) -> connect_member (ReplConn, Host) end),
+ OldHosts = sets:from_list (OldHosts_),
+ NewHosts = sets:from_list (lists:map (fun mongo_connect:read_host/1, bson:at (hosts, Info))),
mvar:modify_ (VConns, fun (Dict) ->
- MapHost = fun (Host) -> {Host, case dict:find (Host, Dict) of {ok, MConn} -> MConn; error -> {} end} end,
- dict:from_list (lists:map (MapHost, NewHosts)) end),
+ Dict1 = sets:fold (fun remove_host/2, Dict, sets:subtract (OldHosts, NewHosts)),
+ Dict2 = sets:fold (fun add_host/2, Dict1, sets:subtract (NewHosts, OldHosts)),
+ Dict2 end),
{Conn, Info}.
+add_host (Host, Dict) -> dict:store (Host, {}, Dict).
+
+remove_host (Host, Dict) ->
+ MConn = dict:fetch (Host, Dict),
+ Dict1 = dict:erase (Host, Dict),
+ case MConn of {Conn} -> mongo_connect:close (Conn); {} -> ok end,
+ Dict1.
+
-spec connect_member (rs_connection(), host()) -> member_info(). % EIO
% Connect to host and verify membership. Cache connection in rs_connection
-connect_member ({rs_connection, ReplName, VConns}, Host) -> mvar:modify (VConns, fun (Dict) ->
- Conn = case dict:find (Host, Dict) of
- {ok, {Con}} -> Con;
- _ -> case mongo_connect:connect (Host) of
- {ok, Con} -> Con;
- {error, Reason} -> throw ({cant_connect, Reason}) end end,
- Info = try mongo_query:command ({admin, Conn}, {isMaster, 1}, true) catch _ ->
- case mongo_connect:reconnect (Conn) of ok -> ok; {error, Reas} -> throw ({cant_connect, Reas}) end,
- mongo_query:command ({admin, Conn}, {isMaster, 1}, true) end,
+connect_member ({rs_connection, ReplName, VConns}, Host) ->
+ Conn = get_connection (VConns, Host),
+ Info = try get_member_info (Conn) catch _ ->
+ mongo_connect:close (Conn),
+ Conn1 = get_connection (VConns, Host),
+ get_member_info (Conn1) end,
case bson:at (setName, Info) of
- ReplName -> {dict:store (Host, {Conn}, Dict), {Conn, Info}};
- _ -> mongo_connect:close (Conn), throw ({not_member, Host, Info}) end end).
+ ReplName -> {Conn, Info};
+ _ ->
+ mongo_connect:close (Conn),
+ throw ({not_member, ReplName, Host, Info}) end.
+
+get_connection (VConns, Host) -> mvar:modify (VConns, fun (Dict) ->
+ case dict:find (Host, Dict) of
+ {ok, {Conn}} -> case mongo_connect:is_closed (Conn) of
+ false -> {Dict, Conn};
+ true -> new_connection (Dict, Host) end;
+ _ -> new_connection (Dict, Host) end end).
+
+new_connection (Dict, Host) -> case mongo_connect:connect (Host) of
+ {ok, Conn} -> {dict:store (Host, {Conn}, Dict), Conn};
+ {error, Reason} -> throw ({cant_connect, Reason}) end.
+
+get_member_info (Conn) -> mongo_query:command ({admin, Conn}, {isMaster, 1}, true).
View
@@ -6,9 +6,9 @@
-include_lib("eunit/include/eunit.hrl").
-include ("mongo_protocol.hrl").
--export ([test1/0, test2/0]).
+-export ([test/0, test_rs/0]).
-test1() -> eunit:test ({setup,
+test() -> eunit:test ({setup,
fun () -> application:start (mongodb),
io:format (user, "~n** Make sure mongod is running on 127.0.0.1:27017 **~n~n", []) end,
fun (_) -> application:stop (mongodb) end,
@@ -20,7 +20,7 @@ test1() -> eunit:test ({setup,
fun pool_test/0
]}).
-test2() -> eunit:test ({setup,
+test_rs() -> eunit:test ({setup,
fun () -> application:start (mongodb),
io:format (user, "~n** Make sure replica set is running on 127.0.0.1:27017 & 27018 **~n~n", []) end,
fun (_) -> application:stop (mongodb) end,
@@ -112,12 +112,12 @@ pool_test() ->
pool:close (Pool),
true = pool:is_closed (Pool).
-% Replica set named "rs1" must be running on 127.0.0.1:27017 & 27018
+% Replica set named "rs1" must be running on localhost:27017 & 27018
replset_test() -> % TODO: change from connect_test
- RS0 = mongo_replset:connect ({<<"rs0">>,["127.0.0.1"]}),
- {error, [{not_member, _, _} | _]} = mongo_replset:primary (RS0),
+ RS0 = mongo_replset:connect ({<<"rs0">>,[localhost]}),
+ {error, [{not_member, _, _, _} | _]} = mongo_replset:primary (RS0),
mongo_replset:close (RS0),
- RS1 = mongo_replset:connect ({<<"rs1">>,["127.0.0.1"]}),
+ RS1 = mongo_replset:connect ({<<"rs1">>,[localhost]}),
{ok, Conn} = mongo_replset:primary (RS1),
DbConn = {test, Conn},
Res = mongo_query:write (DbConn, #delete {collection = foo, selector = {}}, {}),
@@ -137,9 +137,9 @@ replset_test() -> % TODO: change from connect_test
mongo_replset:close (RS1),
true = mongo_replset:is_closed (RS1).
-% Replica set named "rs1" must be running on 127.0.0.1:27017 & 27018
+% Replica set named "rs1" must be running on localhost:27017 & 27018
mongo_rs_test() ->
- RsConn = mongo:rs_connect ({<<"rs0">>,["127.0.0.1"]}),
+ RsConn = mongo:rs_connect ({<<"rs1">>,[localhost]}),
{ok, {Teams1, Ids1}} = mongo:do (safe, master, RsConn, baseball, fun () ->
mongo:delete (team, {}),
Teams0 = [
@@ -150,6 +150,7 @@ mongo_rs_test() ->
Ids0 = mongo:insert_all (team, Teams0),
{Teams0, Ids0}
end),
+ timer:sleep (200),
mongo:do (safe, slave_ok, RsConn, baseball, fun () ->
4 = mongo:count (team, {}),
Teams = lists:zipwith (fun (Id, Team) -> bson:append ({'_id', Id}, Team) end, Ids1, Teams1),

0 comments on commit 445ebaf

Please sign in to comment.