Skip to content

Commit

Permalink
mongodb:sharded/2 add multiple or just one mongos, will connect to ra…
Browse files Browse the repository at this point in the history
…ndom mongos in the list
  • Loading branch information
SergejJurecko committed Nov 30, 2010
1 parent 5903920 commit 2f5b223
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mongodb:replicaPairs/3
mongodb:replicaSets/2
mongodb:masterSlave/3
mongodb:connect/1
mongodb:sharded/2

Connection info is saved in an application variable (erlmongo,connections) and updated with every call to: singleServer, masterSlave, replicaPairs, masterMaster, replicaSets, and deleteConnection. If mongodb process crashes for some reason, connections will be restarted. Also if the application itself is stopped and started. You can add a connections variable to erlmongo.app file:
% def is name of connection, types can be: masterSlave,masterMaster or replicaPairs. This is how singleServer info is saved:
Expand Down
2 changes: 1 addition & 1 deletion src/mongoapi.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ findOne(Query) when is_tuple(Query) ->
R ->
R
end.
findOne(Col, [_|_] = Query, [_|_] = Selector) ->
findOne(Col, Query, Selector) ->
case find(Col, Query, Selector, 0, 1) of
{ok, [Res]} -> {ok, Res};
{ok, []} -> {ok, []};
Expand Down
53 changes: 40 additions & 13 deletions src/mongodb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
handle_cast/2, handle_info/2, terminate/2, code_change/3]).
% API
-export([connect/1, connect/2, is_connected/1,deleteConnection/1, singleServer/2, singleServer/1,
masterSlave/3,masterMaster/3, replicaPairs/3, datetime_to_now/1,replicaSets/2]).
masterSlave/3,masterMaster/3, replicaPairs/3, datetime_to_now/1,replicaSets/2,sharded/2]).
% Internal
-export([exec_cursor/3, exec_delete/3, exec_cmd/3, exec_insert/3, exec_find/3, exec_update/3, exec_getmore/3,
encoderec/1, encode_findrec/1, encoderec_selector/2, gen_keyname/2, gen_prop_keyname/2, rec/0, recoffset/1, recfields/1,
Expand Down Expand Up @@ -99,9 +99,15 @@ masterSlave(Pool,MasterAddr, SlaveAddr) ->
[IP2,Port2] = string:tokens(SlaveAddr,":"),
gen_server:cast(?MODULE, {conninfo,Pool, {masterSlave, {IP1,Port1}, {IP2,Port2}}}).
masterMaster(Pool,Addr1,Addr2) ->
[IP1,Port1] = string:tokens(Addr1,":"),
[IP2,Port2] = string:tokens(Addr2,":"),
gen_server:cast(?MODULE, {conninfo,Pool, {masterMaster, {IP1,Port1}, {IP2,Port2}}}).
% [IP1,Port1] = string:tokens(Addr1,":"),
% [IP2,Port2] = string:tokens(Addr2,":"),
% gen_server:cast(?MODULE, {conninfo,Pool, {masterMaster, {IP1,Port1}, {IP2,Port2}}}).
sharded(Pool,[Addr1,Addr2]).
sharded(Pool,[[_|_]|_] = L) ->
SL = [list_to_tuple(string:tokens(A,":")) || A <- L],
gen_server:cast(?MODULE, {conninfo,Pool, {multimaster, list_to_tuple(SL)}});
sharded(Pool,[_|_] = L) ->
sharded(Pool,[L]).
replicaPairs(Pool,Addr1,Addr2) ->
[IP1,Port1] = string:tokens(Addr1,":"),
[IP2,Port2] = string:tokens(Addr2,":"),
Expand Down Expand Up @@ -173,7 +179,7 @@ exec_find(Pool,Collection, Quer) ->
receive
{query_result, _Src, <<_:32,_CursorID:64/little, _From:32/little, _NDocs:32/little, Result/binary>>} ->
Result
after 20000 ->
after 200000 ->
not_connected
end;
X ->
Expand Down Expand Up @@ -269,7 +275,6 @@ startcon(Name, undefined, Type, Addr, Port) when is_list(Port) ->
startcon(Name, undefined, Type, Addr, Port) ->
PID = spawn_link(fun() -> connection(true) end),
put(PID,Name),
% register(Name,PID),
PID ! {start, Name, self(), Type, Addr, Port};
startcon(_,PID, _, _, _) ->
PID.
Expand Down Expand Up @@ -336,16 +341,27 @@ handle_cast({print_info}, P) ->
handle_cast(_, P) ->
{noreply, P}.

start_connection(Name, #conn{conninfo = {masterMaster, {A1,P1},{A2,P2}}} = P) ->
% start_connection(Name, #conn{conninfo = {masterMaster, {A1,P1},{A2,P2}}} = P) ->
% case P#conn.pid of
% undefined ->
% Timer = P#conn.timer,
% case random:uniform(2) of
% 1 ->
% startcon(Name,P#conn.pid,readwrite,A1,P1);
% 2 ->
% startcon(Name,P#conn.pid,readwrite,A2,P2)
% end;
% _ ->
% Timer = ctimer(P#conn.timer)
% end,
% put(Name,P#conn{timer = Timer});
start_connection(Name, #conn{conninfo = {multimaster, Srvrs}} = P) ->
case P#conn.pid of
undefined ->
Timer = P#conn.timer,
case random:uniform(2) of
1 ->
startcon(Name,P#conn.pid,readwrite,A1,P1);
2 ->
startcon(Name,P#conn.pid,readwrite,A2,P2)
end;
Index = random:uniform(tuple_size(Srvrs)),
{Adr,Port} = element(Index,Srvrs),
startcon(Name,P#conn.pid,readwrite,Adr,Port);
_ ->
Timer = ctimer(P#conn.timer)
end,
Expand Down Expand Up @@ -519,6 +535,8 @@ init([]) ->
{ok, HN} = inet:gethostname(),
<<HashedHN:3/binary,_/binary>> = erlang:md5(HN),
process_flag(trap_exit, true),
{A1,A2,A3} = now(),
random:seed(A1, A2, A3),
{ok, #mngd{indexes = ets:new(mongoIndexes, [set, private]), hashed_hostn = HashedHN}}.


Expand Down Expand Up @@ -717,11 +735,19 @@ connection(#con{} = P,Index,Buf) ->
{tcp, _, Bin} ->
% io:format("~p~n", [{byte_size(Bin), Buf}]),
connection(P,Index,readpacket(<<Buf/binary,Bin/binary>>));
{ping} ->
erlang:send_after(1000,self(),{ping}),
Collection = <<"admin.$cmd">>,
Query = #search{nskip = 0, ndocs = 1, criteria = mongodb:encode([{<<"ping">>, 1}])},
QBin = constr_query(Query,Index, Collection),
ok = gen_tcp:send(P#con.sock, QBin),
connection(P,Index+1,Buf);
{stop} ->
true;
{start, Pool, Source, Type, IP, Port} ->
{A1,A2,A3} = now(),
random:seed(A1, A2, A3),
% io:format("MDB ~p~n", [{Pool,IP,Port}]),
{ok, Sock} = gen_tcp:connect(IP, Port, [binary, {packet, 0}, {active, true}, {keepalive, true}]),
case Type of
ifmaster ->
Expand All @@ -737,6 +763,7 @@ connection(#con{} = P,Index,Buf) ->
register(Pool,self()),
Source ! {conn_established, Pool, Type, self()}
end,
erlang:send_after(1000,self(),{ping}),
connection(#con{sock = Sock},1, <<>>);
{tcp_closed, _} ->
exit(stop)
Expand Down

0 comments on commit 2f5b223

Please sign in to comment.