Skip to content

Commit

Permalink
manages slaves with supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
hayeah committed May 2, 2009
1 parent b09da05 commit 7440fe2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 47 deletions.
11 changes: 6 additions & 5 deletions piperl.hrl
Expand Up @@ -18,20 +18,21 @@

-record(master,
{exe,
%% remote hosts available to spawn slaves
%%%% specifies how many slaves are allowed on a node
supervisor,
hosts,
%% spawned slaves
slaves
clients=ordsets:new() % all clients that used this pipe.
}).

-type master() :: #master{exe::exe(),
hosts::[{node(),non_neg_integer()}],
slaves::[{node(),pid()}]}.
supervisor::pid()
}.

-record(slave,
{id, % unix pid
pid, % erlang pid
port,
seq, % master's sequence number
timeout=5000
}).

Expand Down
52 changes: 30 additions & 22 deletions piperl_master.erl
@@ -1,3 +1,6 @@
% master starts a supervisor that starts and
% restarts slaves. Master and supervisor are
% linked (one dies, both die).
-module(piperl_master).
-include("piperl.hrl").

Expand All @@ -20,19 +23,17 @@
%% API
-type hosts_spec() :: [{node(),non_neg_integer()} | node()].

-spec start_link(#exe{}) -> {'ok',pid()}.
-spec start_link(#exe{}) -> {'ok',pid()} | {'error',_}.
start_link(Exe) ->
start_link(Exe,[{node(),1}]).
%% don't expose the second head until remote spawn is supported
-spec start_link(#exe{},hosts_spec()) -> {'ok',pid()}.
-spec start_link(#exe{},hosts_spec()) -> {'ok',pid()} | {'error',_}.
start_link(Exe,Hosts) ->
Hosts2 =
[case Node of
{Name,Num} when is_atom(Name), is_integer(Num),
Name == node() ->
{Name,Num} when is_atom(Name), is_integer(Num) ->
Node;
Name when is_atom(Name),
Name == node() ->
Name when is_atom(Name) ->
{Name,1}
end || Node <- Hosts],
gen_server:start_link(?MODULE,[{Exe,Hosts2}],[]).
Expand All @@ -46,30 +47,37 @@ get_slaves(MasterPid) ->

-spec init([{#exe{},[{node(),non_neg_integer()}]}]) -> {'ok',master()}.
init([{Exe,Hosts}]) ->
Slaves =
%% TODO generalize this for slave spawning on remote node
[{Node,
case piperl_slave:start_link(Exe) of
{ok,Pid} -> Pid;
{error,E} -> erlang:error(E)
end}
|| {Node,Num} <- Hosts,
_ <- lists:seq(1,Num)],
{ok,#master{exe=Exe,hosts=Hosts,slaves=Slaves}}.
{ok,Supervisor} = piperl_slaves_supervisor:start_link(),
M = #master{exe=Exe,hosts=Hosts,supervisor=Supervisor},
_ = [{Node,
case supervisor:start_child(Supervisor,[Node,Exe]) of
{ok,_Slave} -> ok;
{ok,_Slave,_Info} -> ok;
{error,E} -> erlang:error(E)
end}
|| {Node,Num} <- Hosts,
_ <- lists:seq(1,Num)],
{ok,M}.

handle_cast(_Arg,_M) ->
exit(undefined).

handle_info(_,_M) ->
handle_info(_,_) ->
exit(undefined).

handle_call(get_slaves,_From,M) ->
{reply,
[Pid || {_Node,Pid} <- M#master.slaves],
M}.
handle_call(get_slaves,{Pid,_Tag},M) ->
Clients2 = ordsets:add_element(Pid,M#master.clients),
M2 = M#master{clients=Clients2},
{reply, supervisor_slaves(M2), M2}.

terminate(Reason,_M) ->
exit(Reason).

code_change(_OldVsn,M,_Extra) ->
{ok,M}.

supervisor_slaves(M) ->
[Slave || {_,Slave,_,_} <- supervisor:which_children(M#master.supervisor)].

%% TODO master only needs to notify clients when new instances are added.
%% %% client notices dead processes (with monitor)
%% notify_changes(M,Clients)
6 changes: 4 additions & 2 deletions piperl_slave.erl
Expand Up @@ -58,6 +58,7 @@ code_change(_OldVsn,S,_Extra) ->

write(S,Data) when is_list(Data) or is_binary(Data) ->
Bin = iolist_to_binary(Data),
%% data disappears if port is closed (which is fine)
erlang:port_command(S#slave.port,ubf:encode(Bin)).

read(S) ->
Expand All @@ -69,6 +70,7 @@ read(S) ->
receive_bin(S) ->
Port = S#slave.port,
receive
{Port,{data,Bin}} -> Bin
after S#slave.timeout -> erlang:error("timeout")
{Port,{data,Bin}} -> Bin;
{Port,closed} -> erlang:error(port_closed)
after S#slave.timeout -> erlang:error(timeout)
end.
60 changes: 42 additions & 18 deletions piperl_test.erl
Expand Up @@ -15,19 +15,42 @@ test() ->

piperl_slave_test() ->
{ok,Slave}=piperl_slave:start_link(echo_exe()),
Msg = echo_msg(),
Bin = <<"slave test">>,
Msg = echo_msg(Bin),
piperl_slave:send(Slave,Msg),
piperl_slave:send(Slave,Msg),
piperl_slave:send(Slave,Msg),
Rs = get_msgs(),
?assertEqual(3, length(Rs)),
lists:foreach(fun ({_From,Bin}) -> ?assertEqual(Msg#msg.data,Bin) end, Rs).
[begin ?assertMatch(Bin,Bin2) end || {_From,Bin2} <- get_msgs()],
shutdown(Slave).

piperl_master_test() ->
% start three slave instances
%% start three slave instances
{ok,Master} = piperl_master:start_link(echo_exe(),[{node(),3}]),
Slaves = piperl_master:get_slaves(Master),
msg_slaves(Slaves).
?assertEqual(3,length(Slaves)),
%% kill all slaves and wait for supervisor to restart them
%%
%% we expect the slaves to be different erlang
%% pids, and the results returned from different
%% unix pids.
UPids = [UPid || {UPid,_} <- msg_slaves(Slaves,<<"master test 1">>)],
?assertEqual(3,length(UPids)),
[exit(Slave,die_die_die) || Slave <- Slaves],
%% let supervisor restart processes
timer:sleep(100),
Slaves2 = piperl_master:get_slaves(Master),
?assertEqual(3,length(Slaves2)),
?assert(ordsets:is_disjoint(
ordsets:from_list(Slaves),
ordsets:from_list(Slaves2))),
UPids2 = [UPid || {UPid,_} <- msg_slaves(Slaves2,<<"master test 2">>)],
?assertEqual(3,length(UPids2)),
?assert(ordsets:is_disjoint(
ordsets:from_list(UPids),
ordsets:from_list(UPids2))),
shutdown(Master).

piperl_test() ->
{ok,Piperl} = piperl:start(),
Expand All @@ -36,7 +59,8 @@ piperl_test() ->
not_found = piperl:find_slaves(Piperl,not_echo),
Slaves = piperl:find_slaves(Piperl,echo),
?assertEqual(3,length(Slaves)),
msg_slaves(Slaves).
?assertEqual(3,length(msg_slaves(Slaves,<<"piperl test">>))),
shutdown(Piperl).

piperl_client_test() ->
{ok,Piperl} = piperl:start(),
Expand All @@ -47,7 +71,8 @@ piperl_client_test() ->
piperl_client:send(Client,echo,echo_msg(Bin)),
piperl_client:send(Client,echo,echo_msg(Bin)),
piperl_client:send(Client,echo,echo_msg(Bin)),
[begin ?assertMatch(Bin,Bin2) end || {_From,Bin2} <- get_msgs()].
[begin ?assertMatch(Bin,Bin2) end || {_From,Bin2} <- get_msgs()],
shutdown(Piperl).

piperl_tcp_server_test() ->
{ok,Piperl} = piperl:start(),
Expand All @@ -63,18 +88,13 @@ piperl_tcp_server_test() ->
fun () -> {ok,TBin} = gen_tcp:recv(Socket,0), TBin end,
Excess),
?assertMatch({_From,<<"tcp_msg">>},parse_msg(Bin)),
?assertMatch({_From,<<"tcp_msg2">>},parse_msg(Bin2)).
?assertMatch({_From,<<"tcp_msg2">>},parse_msg(Bin2)),
shutdown(Piperl).

msg_slaves(Slaves) when is_list(Slaves) ->
Msg = echo_msg(),
[begin
piperl_slave:send(Slave,Msg)
end
|| Slave <- Slaves],
Rs = get_msgs(),
NMsgs = length(Slaves),
?assertEqual(NMsgs,length(Rs)),
[?assertEqual(Msg#msg.data,Bin) || {_From,Bin} <- Rs].
msg_slaves(Slaves,Bin) when is_list(Slaves) ->
Msg = echo_msg(Bin),
[piperl_slave:send(Slave,Msg) || Slave <- Slaves],
get_msgs().

get_msgs() ->
timer:sleep(100),
Expand All @@ -94,7 +114,7 @@ get_msg() ->

parse_msg(Bin) ->
{match,[From,Data]} = re:run(Bin,"(\\d+)==(.*)",[{capture,[1,2],binary}]),
{From,Data}.
{list_to_integer(binary_to_list(From)),Data}.


echo_exe() ->
Expand All @@ -104,3 +124,7 @@ echo_msg() ->
echo_msg(<<"echo msg">>).
echo_msg(Bin) ->
#msg{handler=self(),data=Bin}.

shutdown(Pid) ->
unlink(Pid),
exit(Pid,shutdown).

0 comments on commit 7440fe2

Please sign in to comment.