Skip to content
Browse files

Source for the TDD hands on project now on github

  • Loading branch information...
0 parents commit a442260a8e3319724d0ab51eac6226af713748a9 Gianfranco committed
12 Makefile
@@ -0,0 +1,12 @@
+all:
+ erlc -pa . -o ebin/ src/*.erl test/*.erl
+
+test: all
+ erl -pa ebin/ -eval 'eunit:test(wn_resource_layer,[verbose]), init:stop().'
+ erl -pa ebin/ -eval 'eunit:test(wn_file_layer,[verbose]), init:stop().'
+ erl -pa ebin/ -eval 'eunit:test(wn_job_layer,[verbose]), init:stop().'
+
+dialyze:
+ dialyzer src/*.erl test/*.erl
+
+full: all test dialyze
31 include/worker_net.hrl
@@ -0,0 +1,31 @@
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2010, Gianfranco
+%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
+
+-type resource_spec() :: [{atom(),infinity| non_neg_integer()}].
+
+-record(wn_resource,
+ {name :: string(),
+ type :: resource_spec(),
+ resides :: node(),
+ pid :: pid() | undefined
+ }).
+
+-record(wn_file,
+ {id :: string(),
+ file :: string(),
+ resides :: node()
+ }).
+
+-record(wn_job,
+ {id :: string(),
+ files :: [#wn_file{}],
+ resources :: [atom()],
+ commands :: [string()],
+ timeout :: integer()
+ }).
+
+-type date() :: {integer(),integer(),integer()}.
+-type time() :: {integer(),integer(),integer()}.
+-type now() :: {integer(),integer(),integer()}.
+-type time_marker() :: {date(),time(),now()}.
264 src/wn_file_layer.erl
@@ -0,0 +1,264 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2010, Gianfranco
+%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
+%%%-------------------------------------------------------------------
+-module(wn_file_layer).
+-behaviour(gen_server).
+-include("include/worker_net.hrl").
+
+%% API
+-export([start_link/1,add_file/1,list_files/0,stop/0,
+ retrieve_file/2,delete_file/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state,
+ { node_root :: string()
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link(NodeRoot) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, NodeRoot, []).
+
+-spec(add_file(#wn_file{}) -> ok | {error,term()}).
+add_file(WnFile) ->
+ case gen_server:call(?MODULE,{add_file,WnFile}) of
+ {ok,{Pid,Ref}} ->
+ {ok,IoDev} = file:open(WnFile#wn_file.file,[read,binary]),
+ Pid ! {Ref,self(),IoDev},
+ receive
+ {Ref,Result} -> Result
+ end;
+ Error -> Error
+ end.
+
+-spec(list_files() -> [#wn_file{}]).
+list_files() ->
+ gen_server:call(?MODULE,list_all_files).
+
+-spec(stop() -> ok).
+stop() ->
+ gen_server:call(?MODULE,stop).
+
+-spec(retrieve_file(node(),string()) -> {ok,string()} | {error,term()}).
+retrieve_file(Node,Id) ->
+ case gen_server:call(?MODULE,{retrieve_file,Node,Id}) of
+ {ok,{ReadDev,Name}} ->
+ retrieve_file(ReadDev,Name,file:open(Name,[write,binary]));
+ X -> X
+ end.
+retrieve_file(ReadDev,Name,{ok,WriteDev}) ->
+ case receive_file_client(WriteDev,ReadDev) of
+ ok -> {ok,Name};
+ Err -> Err
+ end;
+retrieve_file(ReadDev,_Name,Err) ->
+ file:close(ReadDev),
+ Err.
+
+-spec(delete_file(node(),string()) -> ok | {error,term()}).
+delete_file(Node,Id) ->
+ gen_server:call(?MODULE,{delete_file,Node,Id}).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init(NodeRoot) ->
+ ok = filelib:ensure_dir(wn_util:file_root(NodeRoot)),
+ {ok, #state{node_root = NodeRoot}}.
+
+handle_call(stop,_From,State) ->
+ {stop,normal,ok,State};
+
+handle_call(list_all_files,From,State) ->
+ spawn_link(file_collector(From)),
+ {noreply,State};
+
+handle_call(list_files,_From,State) ->
+ PropList = check_files(State#state.node_root),
+ {reply,[V || {_,V} <- PropList],State};
+
+handle_call({delete_file,Node,Id},From,State) ->
+ case {node() == Node, lists:member(Node,nodes())} of
+ {true,false} ->
+ {reply,try_delete_file(Id,State),State};
+ {false,true} ->
+ case rpc:call(Node,erlang,whereis,[?MODULE]) of
+ undefined -> {reply,{error,noresides},State};
+ _Pid ->
+ gen_server:cast({?MODULE,Node},{delete_file,From,Id}),
+ {noreply,State}
+ end;
+ {false,false} ->
+ {reply,{error,noresides},State}
+ end;
+
+handle_call({retrieve_file,Node,Id},From,State) ->
+ case {node() == Node, lists:member(Node,nodes())} of
+ {true,false} ->
+ Result = try_retrieve(Id,State),
+ {reply,Result,State};
+ {false,true} ->
+ case rpc:call(Node,erlang,whereis,[?MODULE]) of
+ undefined -> {reply,{error,noresides},State};
+ _Pid ->
+ gen_server:cast({?MODULE,Node},{retrieve_file,From,Id}),
+ {noreply,State}
+ end;
+ {false,false} ->
+ {reply,{error,noresides},State}
+ end;
+
+handle_call({add_file,WnFile}, From, State) ->
+ #wn_file{resides=Node} = WnFile,
+ case {Node == node(),lists:member(Node,nodes())} of
+ {true,false} ->
+ Result = try_add(WnFile,State),
+ {reply,Result,State};
+ {false,true} ->
+ case rpc:call(Node,erlang,whereis,[?MODULE]) of
+ undefined -> {reply,{error,noresides},State};
+ _Pid ->
+ gen_server:cast({?MODULE,Node},{add_file,From,WnFile}),
+ {noreply,State}
+ end;
+ {false,false} ->
+ {reply,{error,noresides},State}
+ end.
+
+handle_cast({delete_file,From,Id},State) ->
+ Result = try_delete_file(Id,State),
+ gen_server:reply(From,Result),
+ {noreply,State};
+
+handle_cast({retrieve_file,From,Id},State) ->
+ Result = try_retrieve(Id,State),
+ gen_server:reply(From,Result),
+ {noreply,State};
+
+handle_cast({add_file,From,WnFile},State) ->
+ Result = try_add(WnFile,State),
+ gen_server:reply(From,Result),
+ {noreply,State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+check_files(NodeRoot) ->
+ Path = wn_util:file_root(NodeRoot),
+ ok = filelib:ensure_dir(Path),
+ {ok,NameDirs} = file:list_dir(Path),
+ lists:foldl(
+ fun(Dir,Acc) ->
+ {ok,[File]} = file:list_dir(Path++Dir),
+ [{Dir,#wn_file{id = Dir,
+ file = Path++Dir++"/"++File,
+ resides = node()}} | Acc]
+ end,[],NameDirs).
+
+receive_file(Ref,WriteDev) ->
+ fun() ->
+ receive
+ {Ref,Pid,ReadDev} ->
+ receive_file(Ref,Pid,WriteDev,ReadDev)
+ end
+ end.
+
+receive_file(Ref,Pid,WriteDev,ReadDev) ->
+ Res = close_transfer(WriteDev,ReadDev,
+ transfer(WriteDev,ReadDev)),
+ Pid ! {Ref,Res}.
+
+receive_file_client(WriteDev,ReadDev) ->
+ close_transfer(WriteDev,ReadDev,
+ transfer(WriteDev,ReadDev)).
+
+close_transfer(WriteDev,ReadDev,TransferResult) ->
+ case
+ lists:dropwhile(fun(X) -> X == ok end,
+ [TransferResult,
+ file:close(WriteDev),
+ file:close(ReadDev)]) of
+ [] -> ok;
+ [X|_] -> X
+ end.
+
+-spec(transfer(pid(),pid()) -> ok | {error,term()}).
+transfer(WriteDev,ReadDev) ->
+ transfer(WriteDev,ReadDev,file:read(ReadDev,1024)).
+
+transfer(WriteDev,ReadDev,{ok,Data}) ->
+ case file:write(WriteDev,Data) of
+ ok -> transfer(WriteDev,ReadDev,file:read(ReadDev,1024));
+ Err -> Err
+ end;
+transfer(_WriteDev,_ReadDev,eof) -> ok;
+transfer(_WriteDev,_ReadDev,Err) -> Err.
+
+file_collector(From) ->
+ Nodes = [node()|nodes()],
+ fun() ->
+ Res =
+ lists:foldl(
+ fun(Node,Acc) ->
+ case rpc:call(Node,erlang,whereis,[?MODULE]) of
+ undefined -> Acc;
+ _Pid ->
+ gen_server:call({?MODULE,Node},
+ list_files)++Acc
+ end
+ end,[],Nodes),
+ gen_server:reply(From,Res)
+ end.
+
+try_delete_file(Id,State) ->
+ PropList = check_files(State#state.node_root),
+ case proplists:lookup(Id,PropList) of
+ none ->
+ {error,noexists};
+ {Id,WnFile} ->
+ List = [ file:delete(WnFile#wn_file.file),
+ file:del_dir(wn_util:file_dir(State#state.node_root,WnFile)) ],
+ case lists:dropwhile(fun(X) -> X == ok end,List) of
+ [] -> ok;
+ [X|_] -> X
+ end
+ end.
+
+try_retrieve(Id,State) ->
+ PropList = check_files(State#state.node_root),
+ case proplists:lookup(Id,PropList) of
+ none -> {error,noexists};
+ {Id,WnFile} ->
+ Path = WnFile#wn_file.file,
+ {ok,ReadDev} = file:open(Path,[read,binary]),
+ {ok,{ReadDev,filename:basename(Path)}}
+ end.
+
+try_add(WnFile,State) ->
+ Path = wn_util:file_dir(State#state.node_root,WnFile)++
+ filename:basename(WnFile#wn_file.file),
+ case filelib:is_file(Path) of
+ true -> {error,exists};
+ false ->
+ ok = filelib:ensure_dir(Path),
+ {ok,WriteDev} = file:open(Path,[write,binary]),
+ Ref = make_ref(),
+ Pid = spawn(receive_file(Ref,WriteDev)),
+ {ok,{Pid,Ref}}
+ end.
168 src/wn_job_keeper.erl
@@ -0,0 +1,168 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2011, Gianfranco
+%%% Created : 13 Jan 2011 by Gianfranco <zenon@zen.local>
+%%%-------------------------------------------------------------------
+-module(wn_job_keeper).
+-behaviour(gen_fsm).
+-include("include/worker_net.hrl").
+
+%% API
+-export([start_link/1,done/2,progress/2,info/2,stream/2,
+ signal/1,get_result/1,get_done/1,get_job/1,
+ cancel/1, get_stored_result/1,
+ logs/1,set_stored_result/2,delete/1
+ ]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4,
+ handle_info/3, terminate/3, code_change/4]).
+
+-record(state, {job :: #wn_job{},
+ info :: [{time(),term()}],
+ result :: [{time(),term()}],
+ stored_result :: string(),
+ stream :: [pid()],
+ done_at :: undefined | time_marker()
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link(WnJob) ->
+ gen_fsm:start_link(?MODULE, WnJob, []).
+
+-spec(done(pid(),time_marker()) -> ok).
+done(Pid,TimeMarker) ->
+ gen_fsm:send_all_state_event(Pid,{done,TimeMarker}).
+
+-spec(progress(pid(),term()) -> ok).
+progress(Pid,X) ->
+ gen_fsm:send_all_state_event(Pid,{progress,X}).
+
+-spec(info(pid(),term()) -> ok).
+info(Pid,X) ->
+ gen_fsm:send_all_state_event(Pid,{info,X}).
+
+-spec(stream(pid(),pid()) -> ok).
+stream(Pid,StreamPid) ->
+ gen_fsm:send_all_state_event(Pid,{stream,StreamPid}).
+
+-spec(signal(pid()) -> {ok,#wn_job{}} | {error,taken}).
+signal(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,signal).
+
+-spec(get_result(pid()) -> [string()]).
+get_result(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,get_result).
+
+-spec(get_done(pid()) -> {ok,time_marker()} | {error,not_done}).
+get_done(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,get_done).
+
+-spec(get_job(pid()) -> #wn_job{}).
+get_job(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,get_job).
+
+-spec(cancel(pid()) -> ok | {error,term()}).
+cancel(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,cancel).
+
+-spec(logs(pid()) -> [{info|result,[{time(),term()}]}]).
+logs(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,get_logs).
+
+-spec(set_stored_result(pid(),string()) -> ok).
+set_stored_result(Pid,Id) ->
+ gen_fsm:sync_send_all_state_event(Pid,{stored_result,Id}).
+
+-spec(get_stored_result(pid()) -> {ok,string()} | {error,term()}).
+get_stored_result(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,get_stored_result).
+
+-spec(delete(pid()) -> ok | {error,term()}).
+delete(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,delete).
+
+%%%===================================================================
+%%% gen_fsm callbacks
+%%%===================================================================
+init(WnJob) ->
+ {ok, waiting, #state{job = WnJob,
+ info = [],
+ result = [],
+ stream = []
+ }}.
+
+handle_event({done,TimeMarker}, working, #state{info = Info} = State) ->
+ Now = now(),
+ stream_msg(State#state.stream,{Now,done}),
+ {next_state, done, State#state{info = [{Now,done}|Info],
+ done_at = TimeMarker
+ }};
+handle_event({progress,X},working,#state{result = Result} = State) ->
+ Now = now(),
+ stream_msg(State#state.stream,{Now,X}),
+ {next_state,working,State#state{result = [{Now,X}|Result]}};
+handle_event({info,X},working,#state{info = Info} = State) ->
+ Now = now(),
+ stream_msg(State#state.stream,{Now,X}),
+ {next_state,working,State#state{info = [{Now,X}|Info]}};
+handle_event({stream,Pid},X,#state{stream = Stream} = State) ->
+ replay(Pid,State),
+ {next_state,X,State#state{stream=[Pid|Stream]}}.
+
+handle_sync_event(delete,_,done,State) ->
+ {stop,normal,ok,State};
+handle_sync_event(delete,_,X,State) ->
+ {reply,{error,not_done},X,State};
+
+handle_sync_event({stored_result,Id},_,X,State) ->
+ {reply,ok,X,State#state{stored_result = Id}};
+
+handle_sync_event(get_logs,_From,X,State) ->
+ {reply,[{info,State#state.info},
+ {result,State#state.result}],X,State};
+
+handle_sync_event(get_stored_result,_,X,State) ->
+ {reply,
+ case State#state.stored_result of
+ undefined -> {error,no_result};
+ R -> {ok,R}
+ end,X,State};
+
+handle_sync_event(cancel,_From,waiting,State) ->
+ {stop,normal,ok,State};
+handle_sync_event(cancel,_From,X,State) ->
+ {reply,{error,X},X,State};
+handle_sync_event(get_job,_From, X, State) ->
+ {reply,State#state.job,X,State};
+handle_sync_event(signal, _From, waiting, State) ->
+ {reply,{ok,State#state.job}, working, State};
+handle_sync_event(signal,_From,X,State) ->
+ {reply,{error,taken},X,State};
+handle_sync_event(get_result,_From,X,State) ->
+ {reply,[ Z || {_,Z} <- State#state.result],X,State};
+handle_sync_event(get_done,_From,done,State) ->
+ {reply,{ok,State#state.done_at},done,State};
+handle_sync_event(get_done,_From,X,State) ->
+ {reply,{error,not_done},X,State}.
+
+handle_info(_Info, StateName, State) ->
+ {next_state, StateName, State}.
+
+terminate(_Reason, _StateName, _State) ->
+ ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+ {ok, StateName, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+replay(Pid,#state{info = Info,result=Result}) ->
+ lists:foreach(fun(Entry) -> Pid ! Entry end,
+ lists:sort(Info++Result)).
+
+stream_msg(List,X) ->
+ lists:foreach(fun(Pid) -> Pid ! X end,List).
257 src/wn_job_layer.erl
@@ -0,0 +1,257 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.home>
+%%% @copyright (C) 2011, Gianfranco
+%%% Created : 4 Jan 2011 by Gianfranco <zenon@zen.home>
+%%%-------------------------------------------------------------------
+-module(wn_job_layer).
+-behaviour(gen_server).
+-include("include/worker_net.hrl").
+
+%% API
+-export([start_link/0,register/1,list_all_jobs/0,
+ stop/0,stream/2,result/1,finished_at/1,
+ cancel/1,stored_result/1,delete/1
+ ]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {jobs % ets table {Name,Pid,WnJob}
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec(start_link() -> {ok,pid()} | {error,term()}).
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec(register(#wn_job{}) -> ok | {error,term()}).
+register(WnJob) ->
+ case try_send_files(WnJob#wn_job.files) of
+ ok -> gen_server:call(?MODULE,{add_job,WnJob});
+ E -> E
+ end.
+
+-spec(list_all_jobs() -> [#wn_job{}]).
+list_all_jobs() ->
+ gen_server:call(?MODULE,list_all_jobs).
+
+-spec(stop() -> ok).
+stop() ->
+ gen_server:call(?MODULE,stop).
+
+-spec(stream(term(),string()) -> ok | {error,term()}).
+stream(IoDev,Id) ->
+ Stream = fun(F) -> receive {T,E} -> io:format(IoDev,"~p : ~p~n",[T,E]) end, F(F) end,
+ StreamPid = spawn_link(fun() -> Stream(Stream) end),
+ case gen_server:call(?MODULE,{stream,StreamPid,Id}) of
+ ok -> ok;
+ Err ->
+ exit(StreamPid,Err),
+ Err
+ end.
+
+-spec(result(string()) -> {ok,[string()]} | {error,term()}).
+result(Id) ->
+ gen_server:call(?MODULE,{result,Id}).
+
+-spec(finished_at(string()) -> {ok,time_marker()} | {error,term()}).
+finished_at(Id) ->
+ gen_server:call(?MODULE,{finished_at,Id}).
+
+-spec(cancel(string()) -> ok | {error,term()}).
+cancel(Id) ->
+ gen_server:call(?MODULE,{cancel,Id}).
+
+-spec(stored_result(string()) -> {ok,string()} | {error,term()}).
+stored_result(Id) ->
+ gen_server:call(?MODULE,{stored_result,Id}).
+
+-spec(delete(string()) -> ok | {error,term()}).
+delete(Id) ->
+ gen_server:call(?MODULE,{delete,Id}).
+
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([]) ->
+ {ok, #state{jobs = ets:new(jobs_table,[set])}}.
+
+handle_cast(_Msg,State) ->
+ {noreply,State}.
+
+handle_call({delete,Id},_,State) ->
+ Result = try_delete(Id,State),
+ {reply,Result,State};
+
+handle_call({stored_result,Id},_,State) ->
+ Result = try_get_stored(Id,State),
+ {reply,Result,State};
+
+handle_call({cancel,Id},_,State) ->
+ Reply = try_cancel(Id,State),
+ {reply,Reply,State};
+
+handle_call(stop,_From,State) ->
+ {stop,normal,ok,State};
+
+handle_call({finished_at,Id},_From,State) ->
+ {reply,
+ case ets:lookup(State#state.jobs,Id) of
+ [] -> {error,no_such_job};
+ [{Id,Pid,_}] ->
+ wn_job_keeper:get_done(Pid)
+ end,State};
+
+handle_call({result,Id},_From,State) ->
+ {reply,
+ case ets:lookup(State#state.jobs,Id) of
+ [] -> {error,no_such_job};
+ [{Id,Pid,_}] ->
+ {ok,wn_job_keeper:get_result(Pid)}
+ end, State};
+
+handle_call({stream,StreamPid,Id},_From,State) ->
+ {reply,
+ case ets:lookup(State#state.jobs,Id) of
+ [] -> {error,no_such_job};
+ [{Id,Pid,_}] ->
+ wn_job_keeper:stream(Pid,StreamPid),
+ ok
+ end,
+ State};
+
+handle_call(list_all_jobs,From,State) ->
+ spawn_link(job_collector(From)),
+ {noreply,State};
+
+handle_call(list_jobs,_From,State) ->
+ {reply,[WnJob || {_,_,WnJob} <- ets:tab2list(State#state.jobs)],State};
+
+handle_call({add_job,WnJob}, _From, State) ->
+ JobId = WnJob#wn_job.id,
+ {reply,
+ case ets:lookup(State#state.jobs,JobId) of
+ [] ->
+ {ok,Pid} = wn_job_keeper:start_link(WnJob),
+ ets:insert(State#state.jobs,{JobId,Pid,WnJob}),
+ lists:foreach(
+ fun(WnResource) ->
+ case resource_is_sufficient(WnJob,WnResource) of
+ {true,Possibles} ->
+ lists:foreach(
+ fun(Type) ->
+ signal_resource(Pid,WnResource,Type)
+ end,Possibles);
+ false -> ignore
+ end
+ end,wn_resource_layer:list_resources()),
+ ok;
+ [_] ->
+ lists:foreach(
+ fun(File) ->
+ wn_file_layer:delete_file(File#wn_file.resides,
+ File#wn_file.id)
+ end,WnJob#wn_job.files),
+ {error,already_exists}
+ end, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+try_delete(Id,State) ->
+ case ets:lookup(State#state.jobs,Id) of
+ [{Id,JobKeeperPid,_}] ->
+ case wn_job_keeper:get_stored_result(JobKeeperPid) of
+ {ok,Result} ->
+ ok = wn_job_keeper:delete(JobKeeperPid),
+ ets:delete(State#state.jobs,Id),
+ wn_file_layer:delete_file(node(),Result);
+ X -> X
+ end;
+ [] ->
+ {error,no_such_job}
+ end.
+
+try_get_stored(Id,State) ->
+ case ets:lookup(State#state.jobs,Id) of
+ [{Id,JobKeeperPid,_}] ->
+ wn_job_keeper:get_stored_result(JobKeeperPid);
+ [] -> {error,no_such_job}
+ end.
+
+try_cancel(Id, State) ->
+ case ets:lookup(State#state.jobs,Id) of
+ [{Id,JobKeeperPid,WnJob}] ->
+ ets:delete(State#state.jobs,Id),
+ wn_job_keeper:cancel(JobKeeperPid),
+ cancel_resources(JobKeeperPid,WnJob),
+ ok;
+ [] ->
+ {error,no_such_job}
+ end.
+
+cancel_resources(JobKeeperPid,WnJob) ->
+ lists:foreach(
+ fun(WnResource) ->
+ case resource_is_sufficient(WnJob,WnResource) of
+ {true,Possibles} ->
+ WnPid = WnResource#wn_resource.pid,
+ lists:foreach(
+ fun(Type) ->
+ wn_resource_process:cancel(WnPid,
+ JobKeeperPid,
+ Type)
+ end,
+ Possibles);
+ false -> ignore
+ end
+ end,
+ wn_resource_layer:list_resources()).
+
+try_send_files([F|R]) ->
+ case wn_file_layer:add_file(F) of
+ ok -> try_send_files(R);
+ E -> E
+ end;
+try_send_files([]) -> ok.
+
+resource_is_sufficient(WnJob,WnResource) ->
+ case [ T || {T,_} <- WnResource#wn_resource.type,
+ lists:member(T,WnJob#wn_job.resources)] of
+ [] -> false;
+ L -> {true,L}
+ end.
+
+signal_resource(JobKeeperPid,WnResource,Type) ->
+ wn_resource_process:signal(WnResource#wn_resource.pid,
+ JobKeeperPid,Type).
+
+job_collector(From) ->
+ Nodes = [node()|nodes()],
+ fun() ->
+ Res =
+ lists:foldr(
+ fun(Node,Acc) ->
+ case rpc:call(Node,erlang,whereis,[?MODULE]) of
+ undefined -> Acc;
+ _Pid ->
+ gen_server:call({?MODULE,Node},
+ list_jobs)++Acc
+ end
+ end,[],Nodes),
+ gen_server:reply(From,Res)
+ end.
130 src/wn_job_worker.erl
@@ -0,0 +1,130 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.home>
+%%% @copyright (C) 2011, Gianfranco
+%%% Created : 20 Jan 2011 by Gianfranco <zenon@zen.home>
+%%%-------------------------------------------------------------------
+-module(wn_job_worker).
+-behaviour(gen_server).
+-include("include/worker_net.hrl").
+
+%% API
+-export([start_link/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {pid :: pid(),
+ job :: #wn_job{},
+ workdir :: string(),
+ olddir :: string(),
+ commands :: [string()],
+ port :: port()
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec(start_link(string(),pid(),#wn_job{}) -> {ok,pid()} | term()).
+start_link(NodeRoot,Pid,WnJob) ->
+ gen_server:start_link(?MODULE, {NodeRoot,Pid,WnJob}, []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init({NodeRoot,Pid,WnJob}) ->
+ gen_server:cast(self(),start),
+ {ok, #state{pid = Pid,
+ job = WnJob,
+ olddir = (fun({ok,Curr}) -> Curr end)(file:get_cwd()),
+ workdir = wn_util:work_dir(NodeRoot,WnJob)}}.
+
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast(start, State) ->
+ ok = filelib:ensure_dir(State#state.workdir),
+ ok = file:set_cwd(State#state.workdir),
+ WnJob = State#state.job,
+ case get_work_files(WnJob) of
+ {ok,_} ->
+ wn_job_keeper:info(State#state.pid,file_fetching_done),
+ wn_job_keeper:info(State#state.pid,executing_commands),
+ timer:send_after(WnJob#wn_job.timeout,self(),timeout),
+ [C|Commands] = (State#state.job)#wn_job.commands,
+ wn_job_keeper:info(State#state.pid,{executing,C}),
+ Port = erlang:open_port({spawn,C},[eof,{line,2048}]),
+ {noreply,State#state{commands = Commands,port = Port}};
+ Err ->
+ ok = file:set_cwd(State#state.olddir),
+ ok = wn_util:clean_up(State#state.workdir),
+ wn_job_keeper:info(State#state.pid,{file_fetching_failed,Err}),
+ wn_job_keeper:info(State#state.pid,ending),
+ {stop,file_fetching_failed,State}
+ end.
+
+handle_info({Port,{data,{eol,D}}},#state{port = Port} = State) ->
+ wn_job_keeper:progress(State#state.pid,D),
+ {noreply,State};
+handle_info({Port,eof},#state{port = Port} = State) ->
+ case State#state.commands of
+ [] ->
+ wn_job_keeper:info(State#state.pid,no_more_commands),
+ wn_job_keeper:info(State#state.pid,building_result_tgz),
+ ok = file:set_cwd(State#state.olddir),
+ Keeper = State#state.pid,
+ TimeMarker = wn_util:time_marker(),
+ wn_job_keeper:done(Keeper,TimeMarker),
+ Job = wn_job_keeper:get_job(Keeper),
+ Logs = wn_job_keeper:logs(Keeper),
+ {Id,Name} = make_tgz_result(TimeMarker,
+ State#state.workdir,
+ Job,Logs),
+ ok = wn_file_layer:add_file(#wn_file{id = Id,
+ file = Name,
+ resides = node()}),
+ file:delete(Name),
+ wn_job_keeper:set_stored_result(Keeper,Id),
+ {stop,normal,State};
+ [C|Commands] ->
+ wn_job_keeper:info(State#state.pid,{executing,C}),
+ NewPort = erlang:open_port({spawn,C},[eof,{line,2048}]),
+ {noreply,State#state{commands = Commands,port = NewPort}}
+ end;
+handle_info(timeout, State) ->
+ wn_job_keeper:info(State#state.pid,timeout_on_job),
+ {stop,timeout,State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+get_work_files(WnJob) ->
+ lists:foldl(
+ fun(_WnFile,{error,X}) -> {error,X};
+ (WnFile,{ok,_}) ->
+ wn_file_layer:retrieve_file(WnFile#wn_file.resides,
+ WnFile#wn_file.id)
+ end,{ok,1},WnJob#wn_job.files).
+
+make_tgz_result(TimeMarker,Dir,WnJob,Logs) ->
+ Id = WnJob#wn_job.id++"_result",
+ Name = Id++"_"++wn_util:time_marker_to_string(TimeMarker)++".tgz",
+ {ok,TarFile} = erl_tar:open(Name,[write,compressed]),
+ Files = filelib:wildcard(Dir++"*"),
+ lists:foreach(
+ fun(File) ->
+ ok = erl_tar:add(TarFile,File,filename:basename(File),[])
+ end,Files),
+ LogStr = lists:foldl(
+ fun({TimeMark,Entry},Str) ->
+ Str++io_lib:format("~p : ~p~n",[TimeMark,Entry])
+ end,"",lists:sort(lists:append([Lines || {_,Lines} <- Logs]))),
+ ok = erl_tar:add(TarFile,erlang:list_to_binary(LogStr),"Log.txt",[]),
+ erl_tar:close(TarFile),
+ {Id,Name}.
179 src/wn_resource_layer.erl
@@ -0,0 +1,179 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2010, Gianfranco
+%%% Created : 11 Dec 2010 by Gianfranco <zenon@zen.local>
+%%%-------------------------------------------------------------------
+-module(wn_resource_layer).
+-behaviour(gen_server).
+-include("include/worker_net.hrl").
+
+%% API
+-export([start_link/1,
+ register/1,list_resources/0,stop/0,
+ deregister/2,queued/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state,
+ {resources, %% ETS {Name,#wn_resource{}}
+ node_root :: string(),
+ parent_pid :: pid()
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec(start_link(string()) -> {ok,pid()} | term()).
+start_link(NodeRoot) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, NodeRoot, []).
+
+-spec(register(#wn_resource{}) -> ok | {error,term()}).
+register(Resource) ->
+ gen_server:call(?MODULE,{register,Resource}).
+
+-spec(deregister(node(),string()) -> ok | {error,term()}).
+deregister(Node,Name) ->
+ gen_server:call(?MODULE,{deregister,Node,Name}).
+
+-spec(list_resources() -> [#wn_resource{}]).
+list_resources() ->
+ gen_server:call(?MODULE,list_all_resources).
+
+-spec(stop() -> ok).
+stop() ->
+ gen_server:call(?MODULE,stop).
+
+-spec(queued(node(),string()) -> {ok,[{atom(),[#wn_job{}]}]} | {error,term()}).
+queued(Node,Name) ->
+ gen_server:call(?MODULE,{queued,Node,Name}).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init(NodeRoot) ->
+ {ok, #state{resources = ets:new(resources,[set]),
+ node_root = NodeRoot
+ }}.
+
+handle_call(stop,_From,State) ->
+ {stop,normal,ok,State};
+
+handle_call({queued,Node,Name},From,State) ->
+ case {Node == node(), lists:member(Node,nodes())} of
+ {true,_} ->
+ Reply = try_get_queued(State,Name),
+ {reply,Reply,State};
+ {false,true} ->
+ gen_server:cast({?MODULE,Node},{queued,From,Name}),
+ {noreply,State};
+ {false,false} ->
+ {reply,{error,noresides},State}
+ end;
+
+handle_call(list_all_resources,From,State) ->
+ spawn(resource_collector(From)),
+ {noreply,State};
+
+handle_call(list_resources,_From,State) ->
+ {reply,[V || {_,V} <- ets:tab2list(State#state.resources)],State};
+
+handle_call({register,Resource},From, State) ->
+ #wn_resource{resides=Node} = Resource,
+ case {Node == node(),lists:member(Node,nodes())} of
+ {true,_} ->
+ Reply = try_register(State,Resource),
+ {reply,Reply,State};
+ {false,true} ->
+ gen_server:cast({?MODULE,Node},{register,From,Resource}),
+ {noreply,State};
+ {false,false} ->
+ {reply,{error,noresides},State}
+ end;
+
+handle_call({deregister,Node,Name},From,State) ->
+ case {Node == node(),lists:member(Node,nodes())} of
+ {true,_} ->
+ Reply = try_deregister(State,Name),
+ {reply,Reply,State};
+ {false,true} ->
+ gen_server:cast({?MODULE,Node},{deregister,From,Node,Name}),
+ {noreply,State};
+ {false,false} ->
+ {reply,{error,noresides},State}
+ end.
+
+handle_cast({queued,From,Name},State) ->
+ gen_server:reply(From,try_get_queued(State,Name)),
+ {noreply, State};
+
+handle_cast({deregister,From,_Node,Name},State) ->
+ gen_server:reply(From,try_deregister(State,Name)),
+ {noreply, State};
+
+handle_cast({register,From,Resource},State) ->
+ gen_server:reply(From,try_register(State,Resource)),
+ {noreply, State}.
+
+handle_info(_Msg,State) ->
+ {noreply,State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+try_get_queued(State,Name) ->
+ case ets:lookup(State#state.resources,Name) of
+ [] -> {error,noexists};
+ [{Name,WnResource}] ->
+ Pid = WnResource#wn_resource.pid,
+ {ok,wn_resource_process:queued(Pid)}
+ end.
+
+try_deregister(State,Name) ->
+ case ets:lookup(State#state.resources,Name) of
+ [] -> {error,noexists};
+ [{Name,WnResource}] ->
+ exit(WnResource#wn_resource.pid,deregistered),
+ ets:delete(State#state.resources,Name),
+ ok
+ end.
+
+try_register(State,Resource) ->
+ #wn_resource{name=Name} = Resource,
+ case ets:lookup(State#state.resources,Name) of
+ [] ->
+ process_flag(trap_exit,true),
+ {ok,Pid} = wn_resource_process:start_link(State#state.node_root,
+ Resource#wn_resource.type),
+ ets:insert(State#state.resources,
+ {Name,Resource#wn_resource{pid=Pid}}),
+ ok;
+ _ ->
+ {error,already_exists}
+ end.
+
+resource_collector(From) ->
+ Nodes = [node()|nodes()],
+ fun() ->
+ Res =
+ lists:foldr(
+ fun(Node,Acc) ->
+ case rpc:call(Node,erlang,whereis,[?MODULE]) of
+ undefined -> Acc;
+ _Pid ->
+ gen_server:call({?MODULE,Node},
+ list_resources)++Acc
+ end
+ end,[],Nodes),
+ gen_server:reply(From,Res)
+ end.
+
+
137 src/wn_resource_process.erl
@@ -0,0 +1,137 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2011, Gianfranco
+%%% Created : 14 Feb 2011 by Gianfranco <zenon@zen.local>
+%%%-------------------------------------------------------------------
+-module(wn_resource_process).
+-behaviour(gen_server).
+-include("include/worker_net.hrl").
+-define(TIMEOUT,3000).
+-record(state,{node_root :: string(),
+ queues :: [{atom,[pid()]}],
+ working, %% ets {pid(),pid(),atom()}
+ slots %% ets {atom,non_neg_integer()|infinity}
+ }).
+
+%% API
+-export([start_link/2,signal/3,queued/1,
+ cancel/3
+ ]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec(start_link(string(),[resource_spec()]) -> {ok,pid()} | {error,term()}).
+start_link(NodeRoot,TypeSpec) ->
+ gen_server:start_link(?MODULE, {NodeRoot,TypeSpec}, []).
+
+-spec(signal(pid(),pid(),atom()) -> ok).
+signal(Pid,JobKeeperPid,QueueType) ->
+ gen_server:cast(Pid,{signal,JobKeeperPid,QueueType}).
+
+-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
+queued(Pid) ->
+ gen_server:call(Pid,queued).
+
+-spec(cancel(pid(),pid(),atom()) -> ok | {error,term()}).
+cancel(Pid,JobKeeperPid,Type) ->
+ gen_server:call(Pid,{cancel,JobKeeperPid,Type}).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init({NodeRoot,TypeSpec}) ->
+ Slots = ets:new(available,[set]),
+ Working = ets:new(working,[set]),
+ lists:foreach(fun({Type,Amount}) ->
+ ets:insert(Slots,{Type,Amount}),
+ ets:insert(Working,{Type,[]})
+ end,TypeSpec),
+ {ok,#state{node_root = NodeRoot,
+ queues = [{Type,[]} || {Type,_} <- TypeSpec],
+ slots = Slots,
+ working = Working
+ }}.
+
+handle_call({cancel,JobKeeperPid,Type},_From,State) ->
+ {value,{_,TypeQueue},Q} = lists:keytake(Type,1,State#state.queues),
+ case TypeQueue -- [JobKeeperPid] of
+ TypeQueue ->
+ {reply,{error,{not_in_queue,Type}},State};
+ X ->
+ {reply,ok,State#state{queues = [{Type,X}|Q]}}
+ end;
+
+handle_call(queued, _From, State) ->
+ Reply = collect_jobs(State),
+ {reply, Reply, State}.
+
+handle_cast({signal,JobKeeperPid,QueueType}, State) ->
+ {noreply,
+ case {ets:lookup(State#state.slots,QueueType),
+ lists:keytake(QueueType,1,State#state.queues)} of
+ {[{QueueType,infinity}], _ } ->
+ try_dispatch_job(JobKeeperPid,State,QueueType),
+ State;
+ {[{QueueType,Available}], {value,{_,[]},_}} when Available > 0 ->
+ case try_dispatch_job(JobKeeperPid,State,QueueType) of
+ ok -> ets:insert(State#state.slots,{QueueType,Available-1});
+ {error,taken} -> ignore
+ end,
+ State;
+ {[{QueueType,_}], {value,{Type,Queue},Queues}} ->
+ State#state{queues = [{Type,Queue++[JobKeeperPid]}|Queues]}
+ end}.
+
+handle_info({'EXIT',WorkerPid,_}, State) ->
+ {noreply,
+ begin
+ [{WorkerPid,_,QueueType}] = ets:lookup(State#state.working,WorkerPid),
+ true = ets:delete(State#state.working,WorkerPid),
+
+ case lists:keytake(QueueType,1,State#state.queues) of
+ {value,{_,[]},_} ->
+ case ets:lookup(State#state.slots,QueueType) of
+ [{QueueType,infinity}] -> ignore;
+ [{QueueType,X}] -> ets:insert(State#state.slots,{QueueType,X+1})
+ end,
+ State;
+ {value,{Type,[QueuedPid|R]},Queues} ->
+ case try_dispatch_job(QueuedPid,State,QueueType) of
+ ok ->
+ State#state{queues = [{Type,R}|Queues]};
+ {error,taken} ->
+ State
+ end
+ end
+ end}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+try_dispatch_job(JobKeeperPid,State,QueueType) ->
+ case wn_job_keeper:signal(JobKeeperPid) of
+ {ok,WnJob} ->
+ process_flag(trap_exit,true),
+ {ok,WorkerPid} = wn_job_worker:start_link(State#state.node_root,
+ JobKeeperPid,WnJob),
+ ets:insert(State#state.working,{WorkerPid,JobKeeperPid,QueueType}),
+ ok;
+ {error,taken} ->
+ {error,taken}
+ end.
+
+collect_jobs(State) ->
+ [{Type,[ wn_job_keeper:get_job(Pid) || Pid <- Queue]}
+ || {Type,Queue} <- State#state.queues].
121 src/wn_resource_process.erl.BAK
@@ -0,0 +1,121 @@
+%%%-------------------------------------------------------------------
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2011, Gianfranco
+%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
+%%%-------------------------------------------------------------------
+-module(wn_resource_process).
+-behaviour(gen_fsm).
+-include("include/worker_net.hrl").
+-define(TIMEOUT,3000).
+-record(state,{node_root :: string(),
+ queues :: [{atom,[pid()]}],
+ slots, %% ets {atom,non_neg_integer()|infinity,
+ %% non_neg_integer()|infinity}
+ job :: pid(),
+ job_keeper :: pid()
+ }).
+
+%% API
+-export([start_link/2,signal/3,queued/1]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4,
+ handle_info/3, terminate/3, code_change/4]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec(start_link(string(),[{atom(),non_neg_integer()|infinity}]) ->
+ {ok,pid()} | {error,term()}).
+start_link(NodeRoot,TypeSpec) ->
+ gen_fsm:start_link(?MODULE, {NodeRoot,TypeSpec}, []).
+
+-spec(signal(pid(),pid(),atom()) -> ok).
+signal(Pid,JobKeeperPid,QueueType) ->
+ gen_fsm:send_all_state_event(Pid,{signal,JobKeeperPid,QueueType}).
+
+-spec(queued(pid()) -> [{atom(),[#wn_job{}]}]).
+queued(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid,queued).
+
+%%%===================================================================
+%%% gen_fsm callbacks
+%%%===================================================================
+init({NodeRoot,TypeSpec}) ->
+ Ets = ets:new(available,[set]),
+ lists:foreach(fun({Type,Amount}) -> ets:insert(Ets,{Type,Amount,Amount})
+ end,TypeSpec),
+ {ok, free, #state{node_root = NodeRoot,
+ queues = [],
+ slots = Ets
+ }}.
+
+handle_event({signal,JobKeeperPid,_}, free, State) ->
+ {Ns,Nld} =
+ case wn_job_keeper:signal(JobKeeperPid) of
+ {ok,WnJob} ->
+ process_flag(trap_exit,true),
+ {ok,WorkerPid} =
+ wn_job_worker:start_link(State#state.node_root,
+ JobKeeperPid,WnJob),
+ {taken,State#state{job = WorkerPid,
+ job_keeper = JobKeeperPid}};
+ {error,taken} ->
+ {free,State}
+ end,
+ {next_state,Ns,Nld};
+
+handle_event({signal,JobKeeperPid,QueueType}, taken, State) ->
+ Queues = add_to_queues(JobKeeperPid,QueueType,State#state.queues),
+ {next_state,taken,State#state{queues = Queues}}.
+
+handle_sync_event(queued, _From, StateName, State) ->
+ Reply = collect_jobs(State),
+ {reply, Reply, StateName, State}.
+
+handle_info({'EXIT',WorkerPid,Reason},
+ taken,#state{job = WorkerPid,job_keeper = JobKeeperPid} = State) ->
+ wn_job_keeper:done(JobKeeperPid,Reason),
+ {Ns,Nld} =
+ case waiting_in(State#state.queues) of
+ none -> {free,State};
+ {WaitingKeeperPid,NewQs} ->
+ case wn_job_keeper:signal(WaitingKeeperPid) of
+ {ok,WnJob} ->
+ process_flag(trap_exit,true),
+ {ok,NewWorkerPid} =
+ wn_job_worker:start_link(State#state.node_root,
+ WaitingKeeperPid,
+ WnJob),
+ {taken,State#state{job = NewWorkerPid,
+ job_keeper = WaitingKeeperPid,
+ queues = NewQs}};
+ {error,taken} ->
+ {free,State#state{queues = NewQs}}
+ end
+ end,
+ {next_state,Ns,Nld}.
+
+terminate(_Reason, _StateName, _State) ->
+ ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+ {ok, StateName, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+collect_jobs(State) ->
+ [{Type,[ wn_job_keeper:get_job(Pid) || Pid <- Queue]}
+ || {Type,Queue} <- State#state.queues].
+
+add_to_queues(JobKeeperPid,Type,[{Type,Queue}|Rest]) ->
+ [{Type,Queue++[JobKeeperPid]}|Rest];
+add_to_queues(JobKeeperPid,Type,[E|Rest]) ->
+ [E|add_to_queues(JobKeeperPid,Type,Rest)];
+add_to_queues(JobKeeperPid,Type,[]) -> [{Type,[JobKeeperPid]}].
+
+waiting_in([{_,[Pid]}|Q]) -> {Pid,Q};
+waiting_in([{Type,[Pid|R]}|Q]) -> {Pid,[{Type,R}|Q]};
+waiting_in([]) -> none.
+
51 src/wn_util.erl
@@ -0,0 +1,51 @@
+%%% @author Gianfranco <zenon@zen.home>
+%%% @copyright (C) 2011, Gianfranco
+%%% Created : 19 Jan 2011 by Gianfranco <zenon@zen.home>
+-module(wn_util).
+-include("include/worker_net.hrl").
+-export([file_dir/2,
+ work_dir/2,
+ clean_up/1,
+ file_root/1,
+ time_marker_to_string/1,
+ time_marker/0
+ ]).
+
+-spec(time_marker() -> time_marker()).
+time_marker() ->
+ {date(),time(),now()}.
+
+-spec(time_marker_to_string(time_marker()) -> string()).
+time_marker_to_string({Date,Time,Now}) ->
+ F = fun(X) -> string:join([integer_to_list(Y)||Y<-tuple_to_list(X)],"_")
+ end,
+ F(Date)++"_"++F(Time)++"_"++F(Now).
+
+-spec(clean_up(string()) -> ok | {error,term()}).
+clean_up(Path) ->
+ case filelib:is_dir(Path) of
+ true ->
+ {ok,Files} = file:list_dir(Path),
+ lists:foreach(
+ fun(File) -> clean_up(Path++"/"++File)
+ end,Files),
+ file:del_dir(Path);
+ false ->
+ ok = file:delete(Path)
+ end.
+
+-spec(work_dir(string(),#wn_job{}) -> string()).
+work_dir(NodeRoot,WnJob) ->
+ work_root(NodeRoot)++WnJob#wn_job.id++"/".
+
+-spec(file_dir(string(),#wn_file{}) -> string()).
+file_dir(NodeRoot,WnFile) ->
+ file_root(NodeRoot)++WnFile#wn_file.id++"/".
+
+-spec(file_root(string()) -> string()).
+file_root(NodeRoot) ->
+ NodeRoot++atom_to_list(node())++"/Files/".
+
+-spec(work_root(string()) -> string()).
+work_root(NodeRoot) ->
+ NodeRoot++atom_to_list(node())++"/Jobs/".
215 test/wn_file_layer_tests.erl
@@ -0,0 +1,215 @@
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2010, Gianfranco
+%%% Created : 19 Dec 2010 by Gianfranco <zenon@zen.local>
+-module(wn_file_layer_tests).
+-include_lib("eunit/include/eunit.hrl").
+-include("include/worker_net.hrl").
+
+-define(NODE_ROOT,
+ "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").
+
+file_layer_local_test_() ->
+ {foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ {"Can store file locally", fun store_locally/0},
+ {"Can retrieve files locally",fun store_retrieve_locally/0},
+ {"Can delete files locally",fun store_delete_locally/0}
+ ]}.
+
+file_layer_distributed_test_() ->
+ {foreach,
+ fun distr_setup/0,
+ fun distr_cleanup/1,
+ [
+ fun can_store_distributed/1,
+ fun can_retrieve_distributed/1,
+ fun can_delete_distributed/1,
+ fun must_retain/1
+ ]}.
+
+must_retain([N1,N2]) ->
+ {"Must retain information between node kill and node restart",
+ fun() ->
+ ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
+ ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
+ Path = create_file_at(?NODE_ROOT),
+ FileA = #wn_file{id = "File1",file = Path,resides = N1},
+ FileB = #wn_file{id = "File2",file = Path,resides = N2},
+ ok = wn_file_layer:add_file(FileA),
+ ok = wn_file_layer:add_file(FileB),
+ [ResA,ResB] = wn_file_layer:list_files(),
+ ?assertEqual(filename:basename(Path),filename:basename(ResA#wn_file.file)),
+ ?assertEqual(filename:basename(Path),filename:basename(ResB#wn_file.file)),
+ % Kill N1 and N2
+ slave:stop(N1),
+ slave:stop(N2),
+ ?assertEqual([],wn_file_layer:list_files()),
+ % Restart and check
+ Host = list_to_atom(inet_db:gethostname()),
+ Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
+ {ok,N1} = slave:start(Host,n1,Args),
+ {ok,N2} = slave:start(Host,n2,Args),
+ rpc:call(N1,net_kernel,connect_node,[N2]),
+ ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
+ ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
+ ?assertEqual([ResA,ResB],wn_file_layer:list_files())
+ end}.
+
+can_delete_distributed([N1,N2]) ->
+ {"Can delete file distributed",
+ fun() ->
+ ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
+ ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
+ Path = create_file_at(?NODE_ROOT),
+ Id = "AddedFile1",
+ File = #wn_file{id = Id,file = Path,resides = N1},
+ ok = wn_file_layer:add_file(File),
+ ?assertEqual(ok,wn_file_layer:delete_file(N1,Id)),
+ ?assertEqual([],wn_file_layer:list_files()),
+ % Check change in file-system
+ ExpectedPath = ?NODE_ROOT++"/"
+ ++atom_to_list(node())
+ ++"/"++Id++"/"++filename:basename(Path),
+ ?assertEqual(false,filelib:is_file(ExpectedPath)),
+ ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
+ ++atom_to_list(N1)++"/"
+ ++Id++"/"))
+ end}.
+
+can_retrieve_distributed([N1,N2]) ->
+ {"Can retrieve file distributed",
+ fun() ->
+ ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
+ ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
+ Path = create_file_at(?NODE_ROOT),
+ Id = "AddedFile1",
+ File = #wn_file{id = Id,file = Path,resides = N1},
+ ok = wn_file_layer:add_file(File),
+ {ok,OriginalData} = file:read_file(Path),
+ ?assertEqual(ok,file:delete(Path)),
+ % Retrieve and see change in file system
+ {ok,FileName} = wn_file_layer:retrieve_file(N1,Id),
+ ?assertEqual(true,filelib:is_file(FileName)),
+ {ok,NewData} = file:read_file(FileName),
+ ?assertEqual(OriginalData,NewData),
+ ?assertEqual(filename:basename(Path),FileName),
+ file:delete(FileName)
+ end}.
+
+can_store_distributed([N1,N2]) ->
+ {"Can store file distributed",
+ fun() ->
+ ?assertMatch({ok,_},rpc:call(N1,wn_file_layer,start_link,[?NODE_ROOT])),
+ ?assertMatch({ok,_},rpc:call(N2,wn_file_layer,start_link,[?NODE_ROOT])),
+ Path = create_file_at(?NODE_ROOT),
+ Id = "AddedFile1",
+ File = #wn_file{id = Id,file = Path,resides = N1},
+ ok = wn_file_layer:add_file(File),
+ [Res] = wn_file_layer:list_files(),
+ [Res2] = rpc:call(N1,wn_file_layer,list_files,[]),
+ [Res3] = rpc:call(N2,wn_file_layer,list_files,[]),
+ ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
+ ?assertEqual(Id,Res#wn_file.id),
+ ?assertEqual(N1,Res#wn_file.resides),
+ ?assertEqual(Res,Res2),
+ ?assertEqual(Res2,Res3),
+ % Check change in file-system
+ ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(N1)++"/Files/"++
+ Id++"/"++filename:basename(Path),
+ ?assertEqual(true,filelib:is_file(Path)),
+ ?assertEqual(true,filelib:is_file(ExpectedPath))
+ end}.
+
+store_locally() ->
+ Path = create_file_at(?NODE_ROOT),
+ Id = "AddedFile1",
+ File = #wn_file{id = Id,file = Path,resides = node()},
+ ok = wn_file_layer:add_file(File),
+ [Res] = wn_file_layer:list_files(),
+ ?assertEqual(filename:basename(Path),filename:basename(Res#wn_file.file)),
+ ?assertEqual(Id,Res#wn_file.id),
+ ?assertEqual(node(),Res#wn_file.resides),
+ % Check change in file-system
+ ExpectedPath = ?NODE_ROOT++"/"++atom_to_list(node())++"/Files/"++
+ Id++"/"++filename:basename(Path),
+ ?assertEqual(true,filelib:is_file(Path)),
+ ?assertEqual(true,filelib:is_file(ExpectedPath)).
+
+store_retrieve_locally() ->
+ Path = create_file_at(?NODE_ROOT),
+ Id = "AddedFile1",
+ File = #wn_file{id = Id,file = Path,resides = node()},
+ ok = wn_file_layer:add_file(File),
+ {ok,OriginalData} = file:read_file(Path),
+ ?assertEqual(ok,file:delete(Path)),
+ % Retrieve and see change in file system
+ {ok,FileName} = wn_file_layer:retrieve_file(node(),Id),
+ ?assertEqual(true,filelib:is_file(FileName)),
+ {ok,NewData} = file:read_file(FileName),
+ ?assertEqual(OriginalData,NewData),
+ ?assertEqual(filename:basename(Path),FileName),
+ file:delete(FileName).
+
+store_delete_locally() ->
+ Path = create_file_at(?NODE_ROOT),
+ Id = "AddedFile1",
+ File = #wn_file{id = Id,file = Path,resides = node()},
+ ok = wn_file_layer:add_file(File),
+ ?assertEqual(ok,wn_file_layer:delete_file(node(),Id)),
+ ?assertEqual([],wn_file_layer:list_files()),
+ % Check change in file-system
+ ExpectedPath = ?NODE_ROOT++"/"
+ ++atom_to_list(node())
+ ++"/Files/"++Id++"/"++filename:basename(Path),
+ ?assertEqual(false,filelib:is_file(ExpectedPath)),
+ ?assertEqual(false,filelib:is_dir(?NODE_ROOT++"/"
+ ++atom_to_list(node())++"/Files/"
+ ++Id++"/")).
+
+%%------------------------------------------------------------------
+create_file_at(X) ->
+ Path = X++"EUnitFile",
+ ok = filelib:ensure_dir(X),
+ ok = file:write_file(Path,<<1,2,3>>),
+ Path.
+
+clean_up(X) ->
+ case filelib:is_dir(X) of
+ true ->
+ {ok,Files} = file:list_dir(X),
+ lists:foreach(
+ fun(File) ->
+ clean_up(X++"/"++File)
+ end,Files),
+ file:del_dir(X);
+ false ->
+ ok = file:delete(X)
+ end.
+
+setup() ->
+ {ok,_} = net_kernel:start([eunit_resource,shortnames]),
+ erlang:set_cookie(node(),eunit),
+ {ok,_} = wn_file_layer:start_link(?NODE_ROOT).
+
+cleanup(_) ->
+ clean_up(?NODE_ROOT),
+ ok = net_kernel:stop(),
+ ok = wn_file_layer:stop().
+
+distr_setup() ->
+ setup(),
+ Host = list_to_atom(inet_db:gethostname()),
+ Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
+ {ok,N1} = slave:start(Host,n1,Args),
+ {ok,N2} = slave:start(Host,n2,Args),
+ rpc:call(N1,net_kernel,connect_node,[N2]),
+ [N1,N2].
+
+distr_cleanup([N1,N2]) ->
+ rpc:call(N1,wn_file_layer,stop,[]),
+ rpc:call(N2,wn_file_layer,stop,[]),
+ slave:stop(N1),
+ slave:stop(N2),
+ cleanup(nothing).
252 test/wn_job_layer_tests.erl
@@ -0,0 +1,252 @@
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2010, Gianfranco
+%%% Created : 26 Dec 2010 by Gianfranco <zenon@zen.local>
+-module(wn_job_layer_tests).
+-include_lib("eunit/include/eunit.hrl").
+-include("include/worker_net.hrl").
+-define(NODE_ROOT,
+ "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").
+
+local_test_() ->
+ {foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ {"Can register locally", fun register_locally/0},
+ {"Executed locally", fun executed_locally/0},
+ {"Executed queue", fun executed_queue/0},
+ {"Queueus on resource type amount", fun queues_on_resource_types_amount/0},
+ {"Canceled in queue", fun cancel_before_running/0},
+ {"Done Job Stored in file layer", fun stored_in_file_layer/0},
+ {"Done Job canceled", fun done_job_deleted/0}
+ ]}.
+
+done_job_deleted() ->
+ wn_resource_layer:register(#wn_resource{name = "Laptop",
+ type = [{perl,1}],
+ resides = node()
+ }),
+ Path = create_file_at(?NODE_ROOT),
+ File1 = #wn_file{id = "File1",file = Path,resides = node()},
+ Job1 = #wn_job{id = "JobId",
+ files = [File1],
+ resources = [perl],
+ commands = ["perl -e 'print(\"HelloWorld\n\")'"],
+ timeout = 1000
+ },
+ ok = wn_job_layer:register(Job1),
+ ok = wn_job_layer:stream(user,"JobId"),
+ timer:sleep(500),
+ ok = wn_job_layer:delete("JobId"),
+ ?assertMatch([#wn_file{id="File1"}],wn_file_layer:list_files()),
+ ?assertEqual([],wn_job_layer:list_all_jobs()),
+ ok.
+
+stored_in_file_layer() ->
+ %% (1)
+ wn_resource_layer:register(#wn_resource{name = "Laptop",
+ type = [{laptop,1}],
+ resides = node()
+ }),
+ %% (2)
+ Path = create_file_at(?NODE_ROOT),
+ %% (3)
+ File1 = #wn_file{id = "File1",file = Path,resides = node()},
+ Job1 = #wn_job{id = "JobId",
+ files = [File1],
+ resources = [laptop],
+ commands = ["cat EunitFile","touch MadeFile"],
+ timeout = 100
+ },
+ %% (4)
+ ok = wn_job_layer:register(Job1),
+ %% (5)
+ timer:sleep(500),
+
+ %% (6)
+ {ok,T} = wn_job_layer:finished_at("JobId"),
+
+ %% (7)
+ TimeSuffix = wn_util:time_marker_to_string(T),
+ ExpectedFileName = "JobId_result_"++TimeSuffix++".tgz",
+ ExpectedFileId = "JobId_result",
+ ExpectedFiles = [F1,F2,F3] = ["EUnitFile","MadeFile","Log.txt"],
+
+ %% (8)
+ {ok,ResultId} = wn_job_layer:stored_result("JobId"),
+ ?assertEqual(ExpectedFileId,ResultId),
+
+ %% (9)
+ [_,Res] = wn_file_layer:list_files(),
+ ?assertEqual(ExpectedFileName,filename:basename(Res#wn_file.file)),
+ ?assertEqual(ExpectedFileId,Res#wn_file.id),
+
+ %% (10)
+ {ok,Tar} = wn_file_layer:retrieve_file(node(),ResultId),
+ ?assertMatch({ok,ExpectedFiles},erl_tar:table(Tar,[compressed])),
+
+ %% (11)
+ erl_tar:extract(Tar,[{files,[F3]},compressed]),
+ {ok,IoDev} = file:open("local_stream.txt",[write]),
+ ok = wn_job_layer:stream(IoDev,"JobId"),
+ timer:sleep(100),
+ file:close(IoDev),
+ {ok,LocalStreamBin} = file:read_file("local_stream.txt"),
+ {ok,JobLogBin} = file:read_file(F3),
+ ?assertEqual(LocalStreamBin,JobLogBin),
+
+ %% (12)
+ erl_tar:extract(Tar,[{files,[F1]},compressed]),
+ {ok,LocalEunitBin} = file:read_file(Path),
+ {ok,JobEunitBin} = file:read_file(F1),
+ ?assertEqual(LocalEunitBin,JobEunitBin),
+
+ %% (13)
+ erl_tar:extract(Tar,[{files,[F2]},compressed]),
+ ?assertEqual({ok,<<>>},file:read_file(F2)),
+
+ %% (14)
+ ok = file:delete("local_stream.txt"),
+ ok = file:delete(F1),
+ ok = file:delete(F2),
+ ok = file:delete(F3),
+ ok = file:delete(Tar).
+
+
+queues_on_resource_types_amount() ->
+ wn_resource_layer:register(#wn_resource{name = "Laptop",
+ type = [{a,0},{b,0}],
+ resides = node()
+ }),
+ Queued = fun() -> wn_resource_layer:queued(node(),"Laptop") end,
+ Job1 = #wn_job{id = "JobId",files = [],resources = [a]},
+ Job2 = Job1#wn_job{id = "JobId2", resources = [a]},
+ Job3 = Job1#wn_job{id = "JobId3", resources = [a,b]},
+
+ wn_job_layer:register(Job1),
+ ?assertMatch({ok,[{a,[Job1]},{b,[]}]},Queued()),
+
+ wn_job_layer:register(Job2),
+ ?assertMatch({ok,[{a,[Job1, Job2]},{b,[]}]},Queued()),
+
+ wn_job_layer:register(Job3),
+ ?assertMatch({ok,[{b,[Job3]},
+ {a,[Job1,Job2,Job3]}
+ ]},Queued()).
+
+cancel_before_running() ->
+ wn_resource_layer:register(#wn_resource{name = "Laptop",
+ type = [{'os-x',0}],
+ resides = node()
+ }),
+ Job1 = #wn_job{id = "JobId",
+ files = [],
+ resources = ['os-x'],
+ commands = ["echo hello"]},
+ ?assertEqual(ok,wn_job_layer:register(Job1)),
+ ?assertEqual({ok,[{'os-x',[Job1]}]},wn_resource_layer:queued(node(),"Laptop")),
+ [Res] = wn_job_layer:list_all_jobs(),
+ ?assertEqual("JobId",Res#wn_job.id),
+ ?assertEqual(['os-x'],Res#wn_job.resources),
+ ?assertEqual([],Res#wn_job.files),
+ ?assertEqual(["echo hello"],Res#wn_job.commands),
+ ?assertEqual(ok,wn_job_layer:cancel("JobId")),
+ [] = wn_job_layer:list_all_jobs(),
+ ?assertEqual({ok,[{'os-x',[]}]},wn_resource_layer:queued(node(),"Laptop")).
+
+executed_queue() ->
+ wn_resource_layer:register(#wn_resource{name = "Laptop",
+ type = [{'os-x',1}],
+ resides = node()
+ }),
+ Path = create_file_at(?NODE_ROOT),
+ File1 = #wn_file{id = "File1",file = Path,resides = node()},
+ Job1 = #wn_job{id = "JobId",
+ files = [File1],
+ resources = ['os-x'],
+ commands = ["file EunitFile"],
+ timeout = 100
+ },
+ Job2 = Job1#wn_job{id = "JobId2",
+ files = [File1#wn_file{id="File"}],
+ commands = ["cat EUnitFile"]},
+ ?assertEqual(ok,wn_job_layer:register(Job1)),
+ ?assertEqual(ok,wn_job_layer:register(Job2)),
+ ok = wn_job_layer:stream(user,"JobId"),
+ ok = wn_job_layer:stream(user,"JobId2"),
+ timer:sleep(100),
+ ?assertEqual({ok,["EunitFile: ASCII text"]},wn_job_layer:result("JobId")),
+ timer:sleep(1000),
+ ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId2")),
+ {ok,T1} = wn_job_layer:finished_at("JobId"),
+ {ok,T2} = wn_job_layer:finished_at("JobId2"),
+ ?assertEqual(true,T1 < T2).
+
+executed_locally() ->
+ wn_resource_layer:register(#wn_resource{name = "Laptop",
+ type = [{'os-x',infinity}],
+ resides = node()
+ }),
+ Path = create_file_at(?NODE_ROOT),
+ File1 = #wn_file{id = "File1",file = Path,resides = node()},
+ Job1 = #wn_job{id = "JobId",
+ files = [File1],
+ resources = ['os-x'],
+ commands = ["more EUnitFile"],
+ timeout = 100
+ },
+ ?assertEqual(ok,wn_job_layer:register(Job1)),
+ ok = wn_job_layer:stream(user,"JobId"),
+ timer:sleep(1000),
+ ?assertEqual({ok,["1,2,3"]},wn_job_layer:result("JobId")).
+
+register_locally() ->
+ Path = create_file_at(?NODE_ROOT),
+ File1 = #wn_file{id = "File1",file = Path,resides = node()},
+ File2 = #wn_file{id = "File2",file = Path,resides = node()},
+ Job = #wn_job{id = "JobId",
+ files = [File1,File2],
+ resources = ['non-existent'],
+ commands = ["ls -l"]
+ },
+ ?assertEqual(ok,wn_job_layer:register(Job)),
+ [Res] = wn_job_layer:list_all_jobs(),
+ ?assertEqual("JobId",Res#wn_job.id),
+ ?assertEqual(['non-existent'],Res#wn_job.resources),
+ ?assertEqual([File1,File2],Res#wn_job.files),
+ ?assertEqual(["ls -l"],Res#wn_job.commands).
+
+%% -----------------------------------------------------------------
+setup() ->
+ {ok,_} = net_kernel:start([eunit_resource,shortnames]),
+ erlang:set_cookie(node(),eunit),
+ {ok,_} = wn_file_layer:start_link(?NODE_ROOT),
+ {ok,_} = wn_resource_layer:start_link(?NODE_ROOT),
+ {ok,_} = wn_job_layer:start_link(),
+ ok.
+
+cleanup(_) ->
+ clean_up(?NODE_ROOT),
+ ok = net_kernel:stop(),
+ ok = wn_file_layer:stop(),
+ ok = wn_resource_layer:stop(),
+ ok = wn_job_layer:stop().
+
+create_file_at(X) ->
+ Path = X++"EUnitFile",
+ ok = filelib:ensure_dir(X),
+ ok = file:write_file(Path,"1,2,3\n"),
+ Path.
+
+clean_up(X) ->
+ case filelib:is_dir(X) of
+ true ->
+ {ok,Files} = file:list_dir(X),
+ lists:foreach(
+ fun(File) ->
+ clean_up(X++"/"++File)
+ end,Files),
+ file:del_dir(X);
+ false ->
+ ok = file:delete(X)
+ end.
169 test/wn_resource_layer_tests.erl
@@ -0,0 +1,169 @@
+%%% @author Gianfranco <zenon@zen.local>
+%%% @copyright (C) 2010, Gianfranco
+%%% Created : 10 Dec 2010 by Gianfranco <zenon@zen.local>
+-module(wn_resource_layer_tests).
+-include_lib("eunit/include/eunit.hrl").
+-include("include/worker_net.hrl").
+-define(NODE_ROOT,
+ "/Users/zenon/ErlangBlog/worker_net-0.1/node_root/").
+
+local_resource_test_() ->
+ {foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ {"Can register resources locally",fun register_locally/0}
+ ]}.
+
+distr_resource_test_() ->
+ {foreach,
+ fun distr_setup/0,
+ fun distr_cleanup/1,
+ [
+ fun register_distributed/1,
+ fun register_restart_register/1,
+ fun register_deregister/1
+ ]
+ }.
+
+
+register_locally() ->
+ ResourceA = #wn_resource{name = "macbook pro laptop",
+ type = [{'os-x',1},{bsd,1}],
+ resides = node()},
+ ResourceB = #wn_resource{name = "erlang runtime system",
+ type = [{erlang,4}],
+ resides = node()},
+ ok = wn_resource_layer:register(ResourceA),
+ ok = wn_resource_layer:register(ResourceB),
+ List = lists:sort(wn_resource_layer:list_resources()),
+ resource_processes_are_alive([ResourceA,ResourceB],List).
+
+register_distributed([N1,N2]) ->
+ {"Can Register Distributed",
+ fun() ->
+ ResourceA = #wn_resource{name = "erlang R14",
+ type = [{erlang,infinity}],
+ resides = N1},
+ ResourceB = #wn_resource{name = "os-x macbook pro",
+ type = [{'os-x',1}],
+ resides = N2},
+ ResourceC = #wn_resource{name = "g++",
+ type = [{'g++',1}],
+ resides = node()},
+ ok = wn_resource_layer:register(ResourceA),
+ ok = wn_resource_layer:register(ResourceB),
+ ok = wn_resource_layer:register(ResourceC),
+ ListA = lists:sort(wn_resource_layer:list_resources()),
+ ListB = lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])),
+ ListC = lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])),
+ ?assertEqual(3,length(ListA)),
+ ?assertEqual(ListB,ListA),
+ ?assertEqual(ListB,ListC),
+ resource_processes_are_alive([ResourceA,ResourceB,ResourceC],ListA)
+ end}.
+
+register_restart_register([N1,N2]) ->
+ {"Can Register, Restart and Register",
+ fun() ->
+ ResourceA = #wn_resource{name = "erlang R14",
+ type = [{erlang,infinity}],
+ resides = N1},
+ ResourceB = #wn_resource{name = "os-x macbook pro",
+ type = [{'os-x',1}],
+ resides = N2},
+ ResourceC = #wn_resource{name = "g++",
+ type = [{'g++',1}],
+ resides = node()},
+ ok = wn_resource_layer:register(ResourceA),
+ ok = wn_resource_layer:register(ResourceB),
+ ok = wn_resource_layer:register(ResourceC),
+ M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
+ S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
+ S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
+ ?assertEqual(3,length(M())),
+ ?assertEqual(M(),S1()),
+ ?assertEqual(S1(),S2()),
+ resource_processes_are_alive([ResourceA,ResourceB,ResourceC],M()),
+ rpc:call(N1,wn_resource_layer,stop,[]),
+ ?assertEqual(M(),S2()),
+ resource_processes_are_alive([ResourceB,ResourceC],M()),
+ rpc:call(N2,wn_resource_layer,stop,[]),
+ resource_processes_are_alive([ResourceC],M()),
+ {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[?NODE_ROOT]),
+ {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[?NODE_ROOT]),
+ ok = wn_resource_layer:register(ResourceA),
+ resource_processes_are_alive([ResourceA,ResourceC],M()),
+ ok = wn_resource_layer:register(ResourceB),
+ resource_processes_are_alive([ResourceA,ResourceB,ResourceC],M())
+ end}.
+
+register_deregister([N1,N2]) ->
+ {"Can Register, Deregister and Register",
+ fun() ->
+ M = fun() -> lists:sort(wn_resource_layer:list_resources()) end,
+ S1 = fun() -> lists:sort(rpc:call(N1,wn_resource_layer,list_resources,[])) end,
+ S2 = fun() -> lists:sort(rpc:call(N2,wn_resource_layer,list_resources,[])) end,
+ resource_processes_are_alive([],S1()),
+ ResourceA = #wn_resource{name = "A",type = [{a,1}],resides = N1},
+ ResourceB = #wn_resource{name = "B",type = [{b,2}],resides = N2},
+ ResourceC = #wn_resource{name = "C",type = [{c,3}],resides = node()},
+ ok = wn_resource_layer:register(ResourceA),
+ ok = wn_resource_layer:register(ResourceB),
+ ok = wn_resource_layer:register(ResourceC),
+ resource_processes_are_alive([ResourceA,ResourceB,ResourceC],M()),
+ ?assertEqual(ok,wn_resource_layer:deregister(N1,"A")),
+ resource_processes_are_alive([ResourceB,ResourceC],S1()),
+ ?assertEqual(ok,wn_resource_layer:deregister(N2,"B")),
+ resource_processes_are_alive([ResourceC],S2()),
+ ?assertEqual(ok,wn_resource_layer:deregister(node(),"C")),
+ ?assertEqual([],M()),
+ ?assertEqual([],S1()),
+ ?assertEqual([],S2()),
+ ok = wn_resource_layer:register(ResourceA),
+ ok = wn_resource_layer:register(ResourceB),
+ ok = wn_resource_layer:register(ResourceC),
+ resource_processes_are_alive([ResourceA,ResourceB,ResourceC],M())
+ end}.
+
+%% -----------------------------------------------------------------
+resource_processes_are_alive([],_) -> ok;
+resource_processes_are_alive([Expected|Tail],List) ->
+ #wn_resource{name = Name, type = Type, resides = Resides} = Expected,
+ Filtered =
+ lists:filter(
+ fun(#wn_resource{name=N,type=T,resides=R}) ->
+ N == Name andalso T == Type andalso R == Resides
+ end,List),
+ ?assertMatch([_X],Filtered),
+ [T] = Filtered,
+ ?assertEqual(true,rpc:call(node(T#wn_resource.pid),erlang,is_process_alive,
+ [T#wn_resource.pid])),
+ resource_processes_are_alive(Tail,List).
+
+setup() ->
+ {ok,_} = net_kernel:start([eunit_resource,shortnames]),
+ erlang:set_cookie(node(),eunit),
+ {ok,_} = wn_resource_layer:start_link(?NODE_ROOT).
+
+cleanup(_) ->
+ ok = net_kernel:stop(),
+ ok = wn_resource_layer:stop().
+
+distr_setup() ->
+ setup(),
+ Host = list_to_atom(inet_db:gethostname()),
+ Args = " -pa "++hd(code:get_path())++" -setcookie eunit",
+ {ok,N1} = slave:start(Host,n1,Args),
+ {ok,N2} = slave:start(Host,n2,Args),
+ true = rpc:call(N1,net_kernel,connect_node,[N2]),
+ {ok,_} = rpc:call(N1,wn_resource_layer,start_link,[?NODE_ROOT]),
+ {ok,_} = rpc:call(N2,wn_resource_layer,start_link,[?NODE_ROOT]),
+ [N1,N2].
+
+distr_cleanup([N1,N2]) ->
+ rpc:call(N1,wn_resource_layer,stop,[]),
+ rpc:call(N2,wn_resource_layer,stop,[]),
+ slave:stop(N1),
+ slave:stop(N2),
+ cleanup(nothing).

0 comments on commit a442260

Please sign in to comment.
Something went wrong with that request. Please try again.