Skip to content

Commit

Permalink
indent level to 2
Browse files Browse the repository at this point in the history
  • Loading branch information
hayeah committed Apr 24, 2009
1 parent acf3c04 commit b09da05
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 252 deletions.
60 changes: 30 additions & 30 deletions piperl.erl
Expand Up @@ -26,69 +26,69 @@

-spec start() -> {'ok',pid()} | {'error',_}.
start() ->
gen_server:start(?MODULE,[],[]).
gen_server:start(?MODULE,[],[]).

-spec start({'local'|'global',atom()}) -> {'ok',pid()} | {'error',_}.
start(Name) ->
gen_server:start(Name,?MODULE,[],[]).
gen_server:start(Name,?MODULE,[],[]).

-spec open(atom() | pid(), atom(),exe(),hosts_spec()) -> 'ok' | {'error',_}.
open(Pid,Name,Exe,Hosts) ->
gen_server:call(Pid,{open,Name,Exe,Hosts}).
gen_server:call(Pid,{open,Name,Exe,Hosts}).

%close
%close

-spec find(atom() | pid(),atom()) -> pid() | not_found.
find(PiperlPid,Name) ->
gen_server:call(PiperlPid,{find,Name}).
gen_server:call(PiperlPid,{find,Name}).

-spec find_slaves(atom() | pid(),atom()) -> [pid()] | not_found.
find_slaves(PiperlPid,Name) ->
case piperl:find(PiperlPid,Name) of
not_found -> not_found;
Master -> piperl_master:get_slaves(Master)
end.
case piperl:find(PiperlPid,Name) of
not_found -> not_found;
Master -> piperl_master:get_slaves(Master)
end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_sever call backs

-spec init([non_neg_integer()]) -> {ok,piperl()}.
init([]) ->
{ok,#piperl{pipes=dict:new()}}.
{ok,#piperl{pipes=dict:new()}}.

%% gen_server API internal to piperl
handle_call({open,Name,Exe,Hosts},_From,S) ->
case find_master(S,Name) of
not_found ->
case piperl_master:start_link(Exe,Hosts) of
{ok,Pid} ->
Pipes2 = dict:store(Name,Pid,S#piperl.pipes),
{reply,ok,S#piperl{pipes=Pipes2}};
Err -> {reply,{error,Err},S}
end;
_ -> {reply,{error,pipe_exists,Name},S}
end;
case find_master(S,Name) of
not_found ->
case piperl_master:start_link(Exe,Hosts) of
{ok,Pid} ->
Pipes2 = dict:store(Name,Pid,S#piperl.pipes),
{reply,ok,S#piperl{pipes=Pipes2}};
Err -> {reply,{error,Err},S}
end;
_ -> {reply,{error,pipe_exists,Name},S}
end;
handle_call({find,Name},_From,S) ->
{reply,find_master(S,Name),S}.
{reply,find_master(S,Name),S}.

handle_cast(_,_) ->
exit(unimplemented).
exit(unimplemented).
handle_info(_,_) ->
exit(unimplemented).
exit(unimplemented).

terminate(Reason,_S) ->
exit(Reason).
exit(Reason).

code_change(_OldVsn,S,_Extra) ->
{ok,S}.
{ok,S}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% internal functions

-spec find_master(piperl(),atom()) -> pid() | 'not_found'.
find_master(S,Name) ->
case dict:find(Name,S#piperl.pipes) of
{ok,Pid} -> Pid;
error -> not_found
end.
case dict:find(Name,S#piperl.pipes) of
{ok,Pid} -> Pid;
error -> not_found
end.

48 changes: 24 additions & 24 deletions piperl_client.erl
Expand Up @@ -27,55 +27,55 @@

-spec start_link(atom() | pid()) -> {ok,pid()}.
start_link(PiperlPid) when is_pid(PiperlPid) ->
gen_server:start_link(?MODULE,[PiperlPid],[]).
gen_server:start_link(?MODULE,[PiperlPid],[]).

-spec send(pid(),atom(),msg()) -> 'ok' | {'error',_}.
send(ClientPid,PipeName,Msg)
when is_pid(ClientPid), is_atom(PipeName), is_record(Msg,msg) ->
%% this call is actually asychronous. But I want
%% gen_server to report error when ClientPid is gone.
gen_server:call(ClientPid,{send,PipeName,Msg}).
%% this call is actually asychronous. But I want
%% gen_server to report error when ClientPid is gone.
gen_server:call(ClientPid,{send,PipeName,Msg}).

%% sync()

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server
-spec init(_) -> {'ok',client()}.
init([PiperlPid]) ->
{ok,#client{piperl=PiperlPid}}.
{ok,#client{piperl=PiperlPid}}.

handle_call({send,PipeName,Msg},_From,S) ->
{Slave,S2} = choose_slave(S,PipeName),
%% assign message sequence number as token if none is used.
piperl_slave:send(Slave,Msg),
{reply,ok,S2}.
{Slave,S2} = choose_slave(S,PipeName),
%% assign message sequence number as token if none is used.
piperl_slave:send(Slave,Msg),
{reply,ok,S2}.

handle_cast(_,_) ->
exit(undefined).
exit(undefined).

handle_info({slave_out,Msg=#msg{handler=Handler}},S) ->
Handler ! Msg,
{noreply,S}.
Handler ! Msg,
{noreply,S}.

terminate(Reason,_S) ->
exit(Reason).
exit(Reason).

code_change(_OldVsn,S,_Extra) ->
{ok,S}.
{ok,S}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% internal functions

%% round robin
-spec choose_slave(client(),atom()) -> {pid(),client()}.
choose_slave(S,PipeName) ->
case dict:find(PipeName,S#client.slavess) of
{ok,{N,Slaves}} ->
N2 = (N rem size(Slaves))+1,
NewDict = dict:store(PipeName,{N2,Slaves},S#client.slavess),
{element(N2,Slaves),S#client{slavess=NewDict}};
error ->
Slaves = list_to_tuple(piperl:find_slaves(S#client.piperl,PipeName)),
NewDict = dict:store(PipeName,{1,Slaves},S#client.slavess),
{element(1,Slaves),S#client{slavess=NewDict}}
end.
case dict:find(PipeName,S#client.slavess) of
{ok,{N,Slaves}} ->
N2 = (N rem size(Slaves))+1,
NewDict = dict:store(PipeName,{N2,Slaves},S#client.slavess),
{element(N2,Slaves),S#client{slavess=NewDict}};
error ->
Slaves = list_to_tuple(piperl:find_slaves(S#client.piperl,PipeName)),
NewDict = dict:store(PipeName,{1,Slaves},S#client.slavess),
{element(1,Slaves),S#client{slavess=NewDict}}
end.
56 changes: 28 additions & 28 deletions piperl_master.erl
Expand Up @@ -22,54 +22,54 @@

-spec start_link(#exe{}) -> {'ok',pid()}.
start_link(Exe) ->
start_link(Exe,[{node(),1}]).
start_link(Exe,[{node(),1}]).
%% don't expose the second head until remote spawn is supported
-spec start_link(#exe{},hosts_spec()) -> {'ok',pid()}.
start_link(Exe,Hosts) ->
Hosts2 =
[case Node of
{Name,Num} when is_atom(Name), is_integer(Num),
Name == node() ->
Node;
Name when is_atom(Name),
Hosts2 =
[case Node of
{Name,Num} when is_atom(Name), is_integer(Num),
Name == node() ->
{Name,1}
end || Node <- Hosts],
gen_server:start_link(?MODULE,[{Exe,Hosts2}],[]).
Node;
Name when is_atom(Name),
Name == node() ->
{Name,1}
end || Node <- Hosts],
gen_server:start_link(?MODULE,[{Exe,Hosts2}],[]).

-spec get_slaves(pid()) -> [pid()].
get_slaves(MasterPid) ->
gen_server:call(MasterPid,get_slaves).
gen_server:call(MasterPid,get_slaves).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% callbacks

-spec init([{#exe{},[{node(),non_neg_integer()}]}]) -> {'ok',master()}.
init([{Exe,Hosts}]) ->
Slaves =
%% TODO generalize this for slave spawning on remote node
[{Node,
case piperl_slave:start_link(Exe) of
{ok,Pid} -> Pid;
{error,E} -> erlang:error(E)
end}
|| {Node,Num} <- Hosts,
_ <- lists:seq(1,Num)],
{ok,#master{exe=Exe,hosts=Hosts,slaves=Slaves}}.
Slaves =
%% TODO generalize this for slave spawning on remote node
[{Node,
case piperl_slave:start_link(Exe) of
{ok,Pid} -> Pid;
{error,E} -> erlang:error(E)
end}
|| {Node,Num} <- Hosts,
_ <- lists:seq(1,Num)],
{ok,#master{exe=Exe,hosts=Hosts,slaves=Slaves}}.

handle_cast(_Arg,_M) ->
exit(undefined).
exit(undefined).

handle_info(_,_M) ->
exit(undefined).
exit(undefined).

handle_call(get_slaves,_From,M) ->
{reply,
[Pid || {_Node,Pid} <- M#master.slaves],
M}.
{reply,
[Pid || {_Node,Pid} <- M#master.slaves],
M}.

terminate(Reason,_M) ->
exit(Reason).
exit(Reason).

code_change(_OldVsn,M,_Extra) ->
{ok,M}.
{ok,M}.
50 changes: 25 additions & 25 deletions piperl_slave.erl
Expand Up @@ -20,55 +20,55 @@

-spec start_link(#exe{}) -> {'ok',pid()} | {'error',_}.
start_link(Exe=#exe{}) ->
gen_server:start_link(?MODULE,[Exe],[]).
gen_server:start_link(?MODULE,[Exe],[]).

send(SlavePid,Msg) when is_record(Msg,msg) ->
gen_server:cast(SlavePid,{slave_in,Msg}).
gen_server:cast(SlavePid,{slave_in,Msg}).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server callbacks

init([Exe]) ->
%% spawned unix process should exit on broken pipe
Port = open_port({spawn,Exe#exe.bin},[stream,binary]),
{ok,#slave{port=Port,pid=self()}}.
%% spawned unix process should exit on broken pipe
Port = open_port({spawn,Exe#exe.bin},[stream,binary]),
{ok,#slave{port=Port,pid=self()}}.

handle_cast({slave_in,Msg=#msg{data=Data,handler=Handler}},S) ->
write(S,Data),
R = read(S), %% blocks
Handler ! {slave_out,Msg#msg{data=R}},
{noreply,S}.
write(S,Data),
R = read(S), %% blocks
Handler ! {slave_out,Msg#msg{data=R}},
{noreply,S}.

handle_call(_,_,_) ->
exit(undefined).
exit(undefined).

handle_info(_,_) ->
exit(undefined).
exit(undefined).

terminate(Reason,S) ->
port_close(S#slave.port),
exit(Reason).
port_close(S#slave.port),
exit(Reason).

code_change(_OldVsn,S,_Extra) ->
{ok,S}.
{ok,S}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Internal Functions

write(S,Data) when is_list(Data) or is_binary(Data) ->
Bin = iolist_to_binary(Data),
erlang:port_command(S#slave.port,ubf:encode(Bin)).
Bin = iolist_to_binary(Data),
erlang:port_command(S#slave.port,ubf:encode(Bin)).

read(S) ->
case piperl_util:decode_ubf_stream(fun () -> receive_bin(S) end) of
{Data,[]} -> Data;
{Data,LeftOver} -> erlang:error({left_over,Data,LeftOver})
end.
case piperl_util:decode_ubf_stream(fun () -> receive_bin(S) end) of
{Data,[]} -> Data;
{Data,LeftOver} -> erlang:error({left_over,Data,LeftOver})
end.

receive_bin(S) ->
Port = S#slave.port,
receive
{Port,{data,Bin}} -> Bin
after S#slave.timeout -> erlang:error("timeout")
end.
Port = S#slave.port,
receive
{Port,{data,Bin}} -> Bin
after S#slave.timeout -> erlang:error("timeout")
end.

0 comments on commit b09da05

Please sign in to comment.