Permalink
Browse files

is_closed. mongo: replica sets & pools

  • Loading branch information...
1 parent 7206361 commit b8020706f09e8f0f4e25bb0844628b53ffc7c343 Tony Hannan committed May 16, 2011
Showing with 95 additions and 36 deletions.
  1. +38 −3 src/mongo.erl
  2. +5 −1 src/mongo_connect.erl
  3. +5 −1 src/mongo_cursor.erl
  4. +24 −16 src/mongo_replset.erl
  5. +18 −14 src/mongodb_tests.erl
  6. +5 −1 src/mvar.erl
View
@@ -4,7 +4,10 @@
-export_type ([maybe/1]).
-export_type ([host/0, connection/0]).
--export ([connect/1, disconnect/1]).
+-export ([connect/1, disconnect/1, connection_factory/1]).
+
+-export_type ([replset/0, rs_connection/0]).
+-export ([rs_connect/1, rs_primary/1, rs_secondary_ok/1, rs_disconnect/1, rs_connection_factory/1]).
-export_type ([action/1, db/0, write_mode/0, read_mode/0, failure/0]).
-export ([do/5]).
@@ -30,10 +33,11 @@
-type reason() :: any().
+% Server %
+
-type host() :: mongo_connect:host().
% Hostname or ip address with or without port. Port defaults to 27017 when missing.
% Eg. "localhost" or {"localhost", 27017}
-
-type connection() :: mongo_connect:connection().
-spec connect (host()) -> {ok, connection()} | {error, reason()}. % IO
@@ -44,6 +48,37 @@ connect (Host) -> mongo_connect:connect (Host).
% Close connection to server
disconnect (Conn) -> mongo_connect:close (Conn).
+-spec connection_factory (host()) -> pool:factory(connection()).
+% Factory for use with a connection pool. See pool module.
+connection_factory (Host) -> {Host, fun connect/1, fun disconnect/1, fun mongo_connect:is_closed/1}.
+
+% Replica Set %
+
+-type replset() :: mongo_replset:replset().
+-type rs_connection() :: mongo_replset:rs_connection().
+
+-spec rs_connect (replset()) -> rs_connection(). % IO
+% Create new cache of connections to replica set members starting with seed members. No connection attempted until rs_primary or rs_secondary_ok called.
+rs_connect (Replset) -> mongo_replset:connect (Replset).
+
+-spec rs_primary (rs_connection()) -> {ok, connection()} | {error, reason()}. % IO
+% Return connection to current primary in replica set
+rs_primary (ReplsetConn) -> mongo_replset:primary (ReplsetConn).
+
+-spec rs_secondary_ok (rs_connection()) -> {ok, connection()} | {error, reason()}. % IO
+% Return connection to a current secondary in replica set or primary if none
+rs_secondary_ok (ReplsetConn) -> mongo_replset:secondary_ok (ReplsetConn).
+
+-spec rs_disconnect (rs_connection()) -> ok. % IO
+% Close cache of replset connections
+rs_disconnect (ReplsetConn) -> mongo_replset:close (ReplsetConn).
+
+-spec rs_connection_factory (replset()) -> pool:factory(rs_connection()).
+% Factory for use with a rs_connection pool. See pool module.
+rs_connection_factory (Replset) -> {Replset, fun (RS) -> RC = rs_connect (RS), {ok, RC} end, fun rs_disconnect/1, fun mongo_replset:is_closed/1}.
+
+% Action %
+
-type action(A) :: fun (() -> A).
% An Action does IO, reads process dict {mongo_action_context, #context{}}, and throws failure()
@@ -154,7 +189,7 @@ delete_one (Coll, Selector) ->
-type read_mode() :: master | slave_ok.
% Every query inside an action() will use this mode.
% master = Server must be master/primary so reads are consistent (read latest writes).
-% slave_ok = Server may be slave/secondary so reads are may not be consistent (may read stale data). But the slaves will eventually get the latest writes, so technically this is called eventually-consistent.
+% slave_ok = Server may be slave/secondary so reads may not be consistent (may read stale data). Slaves will eventually get the latest writes, so technically this is called eventually-consistent.
slave_ok (#context {read_mode = slave_ok}) -> true;
slave_ok (#context {read_mode = master}) -> false.
@@ -4,7 +4,7 @@
-export_type ([host/0, connection/0, dbconnection/0, failure/0]).
-export ([host_port/1, read_host/1, show_host/1]).
--export ([connect/1, reconnect/1, close/1, conn_host/1]).
+-export ([connect/1, reconnect/1, conn_host/1, close/1, is_closed/1]).
-export ([call/3, send/2]). % for mongo_query and mongo_cursor
@@ -62,6 +62,10 @@ conn_host ({Host, _VSocket}) -> Host.
% Close connection.
close ({_Host, VSocket}) -> mvar:terminate (VSocket).
+-spec is_closed (connection()) -> boolean(). % IO
+% Has connection been closed?
+is_closed ({_, VSocket}) -> mvar:is_terminated (VSocket).
+
-type dbconnection() :: {mongo_protocol:db(), connection()}.
-type failure() :: {connection_failure, connection(), reason()}.
@@ -5,7 +5,7 @@
-export_type ([maybe/1]).
-export_type ([cursor/0, expired/0]).
--export ([next/1, rest/1, close/1]). % API
+-export ([next/1, rest/1, close/1, is_closed/1]). % API
-export ([cursor/4]). % for mongo_query
-include ("mongo_protocol.hrl").
@@ -30,6 +30,10 @@ cursor (DbConn, Collection, BatchSize, Batch) ->
% Close cursor
close (Cursor) -> mvar:terminate (Cursor).
+-spec is_closed (cursor()) -> boolean(). % IO
+% Is cursor closed
+is_closed (Cursor) -> mvar:is_terminated (Cursor).
+
-spec rest (cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure()
% Return remaining documents in query result
rest (Cursor) -> case next (Cursor) of
View
@@ -1,11 +1,8 @@
%% Get connection to appropriate server in a replica set
-module (mongo_replset).
--export_type ([replset/0, replset_connection/0]).
--export ([connect/1, primary/1, secondary_ok/1]). % API
-
--export ([connect_member/2, fetch_member_info/1]).
--export ([until_success/2]).
+-export_type ([replset/0, rs_connection/0]).
+-export ([connect/1, primary/1, secondary_ok/1, close/1, is_closed/1]). % API
-type maybe(A) :: {A} | {}.
-type err_or(A) :: {ok, A} | {error, reason()}.
@@ -32,30 +29,30 @@ rotate (N, List) ->
-type host() :: mongo_connect:host().
-type connection() :: mongo_connect:connection().
--type replset() :: {replset_name(), [host()]}.
+-type replset() :: {rs_name(), [host()]}.
% Identify replset. Hosts is just seed list, not necessarily all hosts in replica set
--type replset_name() :: bson:utf8().
+-type rs_name() :: bson:utf8().
--spec connect (replset()) -> replset_connection(). % IO
+-spec connect (replset()) -> rs_connection(). % IO
% Create new cache of connections to replica set members starting with seed members. No connection attempted until primary or secondary_ok called.
connect ({ReplName, Hosts}) ->
Dict = dict:from_list (lists:map (fun (Host) -> {mongo_connect:host_port (Host), {}} end, Hosts)),
{ReplName, mvar:new (Dict)}.
--opaque replset_connection() :: {replset_name(), mvar:mvar(connections())}.
+-opaque rs_connection() :: {rs_name(), mvar:mvar(connections())}.
% Maintains set of connections to some if not all of the replica set members
-type connections() :: dict:dictionary (host(), maybe(connection())).
% All hosts listed in last member_info fetched are keys in dict. Value is {} if no attempt to connect to that host yet
--spec primary (replset_connection()) -> err_or(connection()). % IO
+-spec primary (rs_connection()) -> err_or(connection()). % IO
% Return connection to current primary in replica set
primary (ReplConn) -> try
MemberInfo = fetch_member_info (ReplConn),
primary_conn (2, ReplConn, MemberInfo)
of Conn -> {ok, Conn}
catch Reason -> {error, Reason} end.
--spec secondary_ok (replset_connection()) -> err_or([connection()]). % IO
+-spec secondary_ok (rs_connection()) -> err_or(connection()). % IO
% Return connection to a current secondary in replica set or primary if none
secondary_ok (ReplConn) -> try
{_Conn, Info} = fetch_member_info (ReplConn),
@@ -65,12 +62,23 @@ secondary_ok (ReplConn) -> try
of Conn -> {ok, Conn}
catch Reason -> {error, Reason} end.
+-spec close (rs_connection()) -> ok. % IO
+% Close replset connection
+close ({_, VConns}) ->
+ CloseConn = fun (_, MCon, _) -> case MCon of {Con} -> mongo_connect:close (Con); {} -> ok end end,
+ mvar:with (VConns, fun (Dict) -> dict:fold (CloseConn, ok, Dict) end),
+ mvar:terminate (VConns).
+
+-spec is_closed (rs_connection()) -> boolean(). % IO
+% Has replset connection been closed?
+is_closed ({_, VConns}) -> mvar:is_terminated (VConns).
+
% EIO = IO that may throw error of any type
-type member_info() :: {connection(), bson:document()}.
% Result of isMaster query on a server connnection. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false
--spec primary_conn (integer(), replset_connection(), member_info()) -> connection(). % EIO
+-spec primary_conn (integer(), rs_connection(), member_info()) -> connection(). % EIO
% Return connection to primary designated in member_info. Only chase primary pointer N times.
primary_conn (0, _ReplConn, MemberInfo) -> throw ({false_primary, MemberInfo});
primary_conn (Tries, ReplConn, {Conn, Info}) -> case bson:at (ismaster, Info) of
@@ -81,15 +89,15 @@ primary_conn (Tries, ReplConn, {Conn, Info}) -> case bson:at (ismaster, Info) of
primary_conn (Tries - 1, ReplConn, MemberInfo);
{} -> throw ({no_primary, {Conn, Info}}) end end.
--spec secondary_ok_conn (replset_connection(), [host()]) -> connection(). % EIO
+-spec secondary_ok_conn (rs_connection(), [host()]) -> connection(). % EIO
% Return connection to a live secondaries in replica set, or primary if none
secondary_ok_conn (ReplConn, Hosts) -> try
until_success (Hosts, fun (Host) ->
{Conn, Info} = connect_member (ReplConn, Host),
case bson:at (secondary, Info) of true -> Conn; false -> throw (not_secondary) end end)
catch _ -> primary_conn (2, ReplConn, fetch_member_info (ReplConn)) end.
--spec fetch_member_info (replset_connection()) -> member_info(). % EIO
+-spec fetch_member_info (rs_connection()) -> member_info(). % EIO
% Retrieve isMaster info from a current known member in replica set. Update known list of members from fetched info.
% TODO: close connections dropped from Dict
fetch_member_info ({ReplName, VConns}) ->
@@ -101,8 +109,8 @@ fetch_member_info ({ReplName, VConns}) ->
dict:from_list (lists:map (MapHost, NewHosts)) end),
{Conn, Info}.
--spec connect_member (replset_connection(), host()) -> member_info(). % EIO
-% Connect to host and verify membership. Cache connection in replset_connection
+-spec connect_member (rs_connection(), host()) -> member_info(). % EIO
+% Connect to host and verify membership. Cache connection in rs_connection
connect_member ({ReplName, VConns}, Host) -> mvar:modify (VConns, fun (Dict) ->
Conn = case dict:find (Host, Dict) of
{ok, {Con}} -> Con;
View
@@ -16,8 +16,8 @@ test1() -> eunit:test ({setup,
fun var_finalize_test/0,
fun app_test/0,
fun connect_test/0,
- fun mongo_test/0
-%% fun pool_test/0
+ fun mongo_test/0,
+ fun pool_test/0
]}).
test2() -> eunit:test ({setup,
@@ -69,8 +69,10 @@ connect_test() ->
Doc1X = bson:update (text, <<"world!!">>, Doc1),
Cursor = mongo_query:find (DbConn, #'query' {collection = foo, selector = {}}),
[Doc0, Doc1X] = mongo_cursor:rest (Cursor),
+ true = mongo_cursor:is_closed (Cursor),
#reply {cursornotfound = true} = mongo_connect:call (DbConn, [], #getmore {collection = foo, cursorid = 2938725639}),
- mongo_connect:close (Conn).
+ mongo_connect:close (Conn),
+ true = mongo_connect:is_closed (Conn).
% Mongod server must be running on 127.0.0.1:27017
mongo_test() ->
@@ -98,20 +100,22 @@ mongo_test() ->
mongo:disconnect (Conn).
% Mongod server must be running on 127.0.0.1:27017
-%% pool_test() ->
-%% Pool = mongo_pool:new (2, {localhost, 27017}),
-%% Conn = mongo_pool:connect (Pool),
-%% Do = fun () ->
-%% mongo:do (safe, master, Conn, admin, fun () -> mongo:command ({listDatabases, 1}) end) end,
-%% {ok, Doc} = Do(),
-%% {_} = bson:lookup (databases, Doc),
-%% mongo_pool:close (Pool),
-%% {'EXIT', {noproc, _}} = (catch Do()).
+pool_test() ->
+ Pool = pool:new (mongo:connection_factory ({"127.0.0.1", 27017}), 2),
+ Do = fun (Conn) -> mongo:do (safe, master, Conn, admin, fun () -> mongo:command ({listDatabases, 1}) end) end,
+ lists:foreach (fun (_) ->
+ {ok, Conn} = pool:get (Pool),
+ {ok, Doc} = Do (Conn),
+ {_} = bson:lookup (databases, Doc) end,
+ lists:seq (1,8)),
+ pool:close (Pool),
+ true = pool:is_closed (Pool).
% Replica set named "rs1" must be running on 127.0.0.1:27017 & 27018
replset_test() -> % TODO: change from connect_test
RS0 = mongo_replset:connect ({<<"rs0">>,["127.0.0.1"]}),
{error, [{not_member, _, _} | _]} = mongo_replset:primary (RS0),
+ mongo_replset:close (RS0),
RS1 = mongo_replset:connect ({<<"rs1">>,["127.0.0.1"]}),
{ok, Conn} = mongo_replset:primary (RS1),
DbConn = {test, Conn},
@@ -125,9 +129,9 @@ replset_test() -> % TODO: change from connect_test
Doc1X = bson:update (text, <<"world!!">>, Doc1),
Cursor = mongo_query:find (DbConn, #'query' {collection = foo, selector = {}}),
[Doc0, Doc1X] = mongo_cursor:rest (Cursor),
- mongo_connect:close (Conn),
{ok, Conn2} = mongo_replset:secondary_ok (RS1),
DbConn2 = {test, Conn2},
Cursor2 = mongo_query:find (DbConn2, #'query' {collection = foo, selector = {}, slaveok = true}),
[Doc0, Doc1X] = mongo_cursor:rest (Cursor2),
- mongo_connect:close (Conn2).
+ mongo_replset:close (RS1),
+ true = mongo_replset:is_closed (RS1).
View
@@ -4,7 +4,7 @@
-export_type ([mvar/1]).
-export ([create/2, new/2, new/1]).
-export ([modify/2, modify_/2, with/2, read/1, write/2]).
--export ([terminate/1]).
+-export ([terminate/1, is_terminated/1]).
-behaviour (gen_server).
-export ([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@@ -61,6 +61,10 @@ write (Var, Value) -> modify (Var, fun (A) -> {Value, A} end).
% Terminate mvar. Its finalizer will be executed. Future accesses to this mvar will fail, although repeated termination is fine.
terminate (Var) -> catch gen_server:call (Var, stop), ok.
+-spec is_terminated (mvar(_)) -> boolean(). % IO
+% Has mvar been terminated?
+is_terminated (Var) -> not is_process_alive (Var).
+
% gen_server callbacks %
-type state(A) :: {A, finalizer(A)}.

0 comments on commit b802070

Please sign in to comment.