Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

replica sets

  • Loading branch information...
commit 1b66e5fbdfda87f66fe6aa4919bf714ae5c62e2c 1 parent 5837b7b
@SergejJurecko authored
Showing with 82 additions and 28 deletions.
  1. +9 −3 README.rdoc
  2. +8 −1 src/erlmongo.hrl
  3. +65 −24 src/mongodb.erl
View
12 README.rdoc
@@ -24,16 +24,22 @@ If your usage pattern is read heavy and want to use slaves for reading, connect
Always use an atom for naming connections. The connection process will register itself on that name.
Runtime connection API:
-mongodb:singleServer/1
-mongodb:singleServer/2
+mongodb:singleServer/1,2
mongodb:replicaPairs/3
+mongodb:replicaSets/2
mongodb:masterSlave/3
mongodb:connect/1
-Connection info is saved in an application variable (erlmongo,connections) and updated with every call to: singleServer, masterSlave, replicaPairs, masterMaster and deleteConnection. If mongodb process crashes for some reason, connections will be restarted. Also if the application itself is stopped and started. You can add a connections variable to erlmongo.app file:
+Connection info is saved in an application variable (erlmongo,connections) and updated with every call to: singleServer, masterSlave, replicaPairs, masterMaster, replicaSets, and deleteConnection. If mongodb process crashes for some reason, connections will be restarted. Also if the application itself is stopped and started. You can add a connections variable to erlmongo.app file:
% def is name of connection, types can be: masterSlave,masterMaster or replicaPairs. This is how singleServer info is saved:
{env, [{def,{masterSlave,{"localhost",27017},{"localhost",27017}}}]},
+= Replica sets
+ % List of servers does not have to be the entire list of the replica set.
+ % Erlmongo will read the primary server from them and connect to it (even if not in the list).
+ mongodb:replicaSets(repl,["127.0.0.1:30000","127.0.0.1:30001"]).
+ mongodb:connect(repl).
+
= Examples
make
erl
View
9 src/erlmongo.hrl
@@ -44,7 +44,14 @@ loop_fields(Tuple, [Field|T], DefRec, Props, N) ->
end;
loop_fields(Tuple, [], _, _, _) ->
Tuple.
-
+
+-ifdef(DEBUG).
+-define(DBG(Format, Args), io:format("L(~p:~p:~p:~p) : "++Format++"~n", [time(),self(),?MODULE,?LINE]++Args)).
+-define(DBG0(Format), io:format("L(~p:~p:~p:~p) : "++Format++"~n", [time(),self(),?MODULE,?LINE])).
+-else.
+-define(DBG(F,A),[]).
+-define(DBG0(F),[]).
+-endif.
% mongo
-define(QUER_OPT_NONE, 0).
View
89 src/mongodb.erl
@@ -2,12 +2,13 @@
-export([deser_prop/1,reload/0, print_info/0, start/0, stop/0, init/1, handle_call/3,
handle_cast/2, handle_info/2, terminate/2, code_change/3]).
% API
--export([connect/1, connect/2, is_connected/1,deleteConnection/1, singleServer/2, singleServer/1, masterSlave/3,masterMaster/3, replicaPairs/3,
- datetime_to_now/1]).
+-export([connect/1, connect/2, is_connected/1,deleteConnection/1, singleServer/2, singleServer/1,
+ masterSlave/3,masterMaster/3, replicaPairs/3, datetime_to_now/1,replicas/2]).
% Internal
-export([exec_cursor/3, exec_delete/3, exec_cmd/3, exec_insert/3, exec_find/3, exec_update/3, exec_getmore/3,
encoderec/1, encode_findrec/1, encoderec_selector/2, gen_keyname/2, gen_prop_keyname/2, rec/0, recoffset/1, recfields/1,
decoderec/2, encode/1, decode/1, ensureIndex/3, clearIndexCache/0, create_id/0, startgfs/1, dec2hex/2, hex2dec/2]).
+% -define(DEBUG,true).
-include_lib("erlmongo.hrl").
% -compile(export_all).
@@ -105,7 +106,10 @@ replicaPairs(Pool,Addr1,Addr2) ->
[IP1,Port1] = string:tokens(Addr1,":"),
[IP2,Port2] = string:tokens(Addr2,":"),
gen_server:cast(?MODULE, {conninfo,Pool, {replicaPairs, {IP1,Port1}, {IP2,Port2}}}).
-
+% Takes a list of "Address:Port"
+replicaSets(Pool,L) ->
+ LT = [list_to_tuple(string:tokens(S,":")) || S <- L],
+ gen_server:cast(?MODULE,{conninfo,Pool,{replicaSets,LT}}).
datetime_to_now(Loctime) ->
Secs = calendar:datetime_to_gregorian_seconds(Loctime) - 719528 * 24 * 60 * 60,
{Secs div 1000000, Secs rem 1000000,0}.
@@ -283,6 +287,7 @@ handle_cast({clear_indexcache}, P) ->
ets:delete_all_objects(P#mngd.indexes),
{noreply, P};
handle_cast({conninfo, Pool, Info}, P) ->
+ ?DBG("conninfo ~p", [{Pool,Info}]),
case get(Pool) of
undefined ->
put(Pool,#conn{conninfo = Info});
@@ -302,6 +307,7 @@ handle_cast({conninfo, Pool, Info}, P) ->
handle_cast({start_connection, Pool}, P) ->
handle_cast({start_connection,Pool,undefined}, P);
handle_cast({start_connection, Pool, CB}, P) ->
+ ?DBG("start_connection ~p ~p", [Pool, get(Pool)]),
case get(Pool) of
undefined ->
true;
@@ -364,6 +370,15 @@ start_connection(Name, #conn{conninfo = {replicaPairs, {A1,P1},{A2,P2}}} = P) -
Timer = ctimer(P#conn.timer)
end,
put(Name,P#conn{timer = Timer});
+start_connection(Name,#conn{conninfo = {replicaSets,L}} = P) ->
+ case P#conn.pid of
+ undefined ->
+ Timer = P#conn.timer,
+ [startcon(Name,undefined,ifmaster,A,Po) || {A,Po} <- L];
+ _ ->
+ Timer = ctimer(P#conn.timer)
+ end,
+ put(Name,P#conn{timer = Timer});
start_connection(_,_) ->
true.
@@ -393,6 +408,7 @@ conn_callback(P) ->
end.
handle_info({conn_established, Pool, readwrite, ConnProc}, P) ->
+ ?DBG("conn_established ~p", [Pool]),
case get(Pool) of
undefined ->
true;
@@ -403,8 +419,10 @@ handle_info({conn_established, Pool, readwrite, ConnProc}, P) ->
end,
{noreply, P};
handle_info({reconnect, Pool}, P) ->
+ ?DBG("reconnect ~p ~p", [Pool,get(Pool)]),
handle_cast({start_connection, Pool}, P);
handle_info({'EXIT', PID,W}, P) ->
+ ?DBG("conndied ~p ~p", [PID,get(get(PID))]),
% io:format("condied ~p~n", [{PID,_W}]),
case get(PID) of
undefined ->
@@ -431,25 +449,47 @@ handle_info({save_connections}, P) ->
{noreply, P};
handle_info({query_result, Src, <<_:20/binary, Res/binary>>}, P) ->
PI = get(get(Src)),
+ ?DBG("query_result ~p~n~p~n", [get(Src), PI]),
% io:format("~p~n", ["RES"]),
- try mongodb:decode(Res) of
- [[{<<"ismaster">>, 1}|_]] when element(1,PI#conn.conninfo) == replicaPairs, PI#conn.pid == undefined ->
- link(Src),
- % io:format("~p, registering ~p~n", ["Foundmaster", registered()]),
- conn_callback(PI#conn.cb),
- put(get(Src),PI#conn{pid = Src}),
- register(get(Src),Src),
- {noreply, P};
- _X ->
- % io:format("~p~n", [_X]),
- Src ! {stop},
- {noreply, P}
- catch
- error:_X ->
- % io:format("~p~n", [_X]),
- Src ! {stop},
- {noreply, P}
- end;
+ case catch mongodb:decode(Res) of
+ [Obj] ->
+ case proplists:get_value(<<"ismaster">>,Obj) of
+ % [[{<<"ismaster">>, 1}|_]] when element(1,PI#conn.conninfo) == replicaPairs, PI#conn.pid == undefined ->
+ Val when (Val == true orelse Val == 1) andalso
+ ((element(1,PI#conn.conninfo) == replicaPairs andalso PI#conn.pid == undefined) orelse
+ (element(1,PI#conn.conninfo) == replicaSets andalso PI#conn.pid == undefined)) ->
+ ?DBG("foundmaster ~p", [Src]),
+ link(Src),
+ conn_callback(PI#conn.cb),
+ put(get(Src),PI#conn{pid = Src, timer = ctimer(PI#conn.timer)}),
+ case whereis(get(Src)) of
+ undefined ->
+ register(get(Src),Src);
+ Src ->
+ ok;
+ Old ->
+ ?DBG("already registered proc ~p", [Old]),
+ unregister(get(Src)),
+ Old ! {stop},
+ register(get(Src),Src)
+ end;
+ false when (element(1,PI#conn.conninfo) == replicaSets andalso PI#conn.pid == undefined) ->
+ Src ! {stop},
+ case proplists:get_value(<<"primary">>,Obj) of
+ undefined ->
+ ok;
+ Prim ->
+ ?DBG("Connecting to primary ~p", [Prim]),
+ [Addr,Port] = string:tokens(binary_to_list(Prim),":"),
+ startcon(get(Src), undefined, ifmaster, Addr,Port)
+ end;
+ _X ->
+ Src ! {stop}
+ end;
+ _ ->
+ Src ! {stop}
+ end,
+ {noreply, P};
handle_info({query_result, Src, _}, P) ->
Src ! {stop},
{noreply, P};
@@ -458,8 +498,10 @@ handle_info(_X, P) ->
{noreply, P}.
-conndied(Name,_PID,P) ->
- put(Name, P#conn{pid = undefined, timer = timer(P#conn.timer, Name)}).
+conndied(Name,PID,P) when P#conn.pid == PID ->
+ put(Name, P#conn{pid = undefined, timer = timer(P#conn.timer, Name)});
+conndied(_,_,_) ->
+ ok.
terminate(_, _) ->
ok.
@@ -497,7 +539,6 @@ gfs_proc(#gfs_state{mode = write} = P, Buf) ->
<<FlushBin:FlSize/binary,Rem/binary>> = Buf,
gfs_proc(gfsflush(P, FlushBin, <<>>),Rem);
{close} ->
- io:format("closing~n"),
gfsflush(P#gfs_state{closed = true}, Buf, <<>>);
{'EXIT',_,_} ->
self() ! {close},
Please sign in to comment.
Something went wrong with that request. Please try again.