0
%%%-------------------------------------------------------------------
0
-%%% File : /
Users/dfayram/Projects/concilium/elibs/resource_manager.erl
0
+%%% File : /
fuzed/elibs/resource_manager.erl
0
%%% Author : David Fayram
0
%%%-------------------------------------------------------------------
0
-module(resource_manager).
0
-behaviour(gen_server).
0
--export([start_link/3, start/3,nodes/0,nodecount/0,change_nodecount/1,cycle/0,cycle/1]).
0
+-export([start_link/5,start/5,nodes/0,nodecount/0,change_spec/1,register_nodes/0,cycle/0,
0
-%% gen_server callback
exports
0
+%% gen_server callback
s
0
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
0
- terminate/2, code_change/3]).
0
-%% Erlang records are ugly.
0
--record(state, {generator = fun() -> undefined end,
0
- terminator = fun(_) -> undefined end,
0
- term_hook = fun(_) -> undefined end
0
+ terminate/2, code_change/3, start_fresh_nodes/0, stop_all_nodes/0]).
0
+ preproc = fun() -> undefined end,
0
+ postproc = fun(_) -> undefined end,
0
-%% External call functions
0
+%%====================================================================
0
+%%====================================================================
0
+%%--------------------------------------------------------------------
0
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
0
+%% Description: Starts the server
0
+%%--------------------------------------------------------------------
0
-% Note the local server, one of these should run on every
0
-% node serving up rails responders.
0
-start_link(Generator, Terminator, NumNodes) ->
0
- gen_server:start_link({local, ?MODULE}, ?MODULE, [Generator, Terminator, NumNodes], []).
0
-start(Generator, Terminator, NumNodes) ->
0
- gen_server:start({local, ?MODULE}, ?MODULE, [Generator, Terminator, NumNodes], []).
0
+start_link(Master, Nodes, Preproc, Postproc, Timeout) ->
0
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [Master, Nodes,Preproc,Postproc,Timeout], []).
0
+start(Master, Nodes, Preproc, Postproc, Timeout) ->
0
+ gen_server:start({local, ?MODULE}, ?MODULE, [Master, Nodes,Preproc,Postproc,Timeout], []).
0
+% Returns the starting specification for this resource manager.
0
+% @spec specification() -> {string(), int()}
0
+% Returns a list of nodes maintained in this ResourceManager
0
+% @spec nodes() -> [pid()]
0
nodes() -> gen_server:call(?MODULE, nodes).
0
+% Returns a count of all nodes maintained currently.
0
+% @spec nodecount() -> int()
0
nodecount() -> gen_server:call(?MODULE, nodecount).
0
-change_nodecount(NewNodecount) -> gen_server:cast(?MODULE, {change_nodecount, NewNodecount}).
0
-cycle() -> gen_server:cast(?MODULE, cycle).
0
-cycle(Node) -> gen_server:cast({?MODULE, Node}, cycle).
0
-%% GEN_SERVER callbacks.
0
-init([Generator, Terminator, NumNodes]) ->
0
- process_flag(trap_exit, true),
0
- Nodes = spawn_nodes(Generator, NumNodes),
0
- {ok, #state{generator = Generator, nodecount = NumNodes,
0
- nodes = Nodes, terminator = Terminator}}.
0
-handle_call(term_hook, _From, State) ->
0
- {reply, State#state.term_hook, State};
0
-handle_call({term_hook, Hook}, _From, State) when is_function(Hook, 1) ->
0
- {reply, State#state.term_hook, State#state{term_hook = Hook}};
0
+% Changes the current nodecount. This may result in processes starting or stopping.
0
+% @spec change_nodecount(int()) -> ok
0
+change_spec(NewSpec) -> gen_server:cast(?MODULE, {change_spec, NewSpec}).
0
+% Register the nodes with the master.
0
+% @spec register_nodes() -> ok
0
+register_nodes() -> gen_server:cast(?MODULE, register_nodes).
0
+% Adds a node. Not for external use.
0
+add_node(Node, Cmd) -> gen_server:cast(?MODULE, {add, Cmd, Node}).
0
+% Restarts all processes managed by the current system. Good for un-wedging a wedged system.
0
+cycle() -> gen_server:cast(?MODULE, stop_all_nodes), gen_server:cast(?MODULE, start_fresh_nodes).
0
+stop_all_nodes() -> gen_server:cast(?MODULE, stop_all_nodes).
0
+start_fresh_nodes() -> gen_server:cast(?MODULE, start_fresh_nodes).
0
+%%====================================================================
0
+%% gen_server callbacks
0
+%%====================================================================
0
+%%--------------------------------------------------------------------
0
+%% Function: init(Args) -> {ok, State} |
0
+%% {ok, State, Timeout} |
0
+%% Description: Initiates the server
0
+%%--------------------------------------------------------------------
0
+init([Master, Nodes, Preproc, Postproc, Timeout]) ->
0
+ process_flag(trap_exit, true),
0
+ spawn_nodes(Nodes, Preproc, Timeout, dict:new()),
0
+ {ok, #state{spec = Nodes,
0
+%%--------------------------------------------------------------------
0
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
0
+%% {reply, Reply, State, Timeout} |
0
+%% {noreply, State, Timeout} |
0
+%% {stop, Reason, Reply, State} |
0
+%% {stop, Reason, State}
0
+%% Description: Handling call messages
0
+%%--------------------------------------------------------------------
0
handle_call(nodecount,_From,State) ->
0
- {reply,
State#state.nodecount, State};
0
+ {reply,
length(dict:fetch_keys(State#state.nodes)), State};
0
handle_call(nodes, _From, State) ->
0
- {reply, State#state.nodes, State}.
0
-handle_cast(cycle, State) ->
0
- drop_nodes(State#state.terminator, State#state.nodes),
0
- {noreply, State#state{nodes=spawn_nodes(State#state.generator, State#state.nodecount)}};
0
-handle_cast({change_nodecount, NewCount}, S) when is_number(NewCount) ->
0
- Count = S#state.nodecount,
0
- S#state{nodecount = NewCount,
0
- nodes = spawn_nodes(S#state.generator, NewCount - Count) ++ S#state.nodes}};
0
- {ToKill, ToKeep} = lists:split(NewCount - Count, S#state.nodes),
0
- drop_nodes(S#state.terminator, ToKill),
0
- {noreply, S#state{nodecount=NewCount, nodes=ToKeep}};
0
-handle_info({'EXIT', Pid, _Reason}, S) ->
0
- Term = S#state.terminator,
0
- Membership = lists:any(fun(X) -> X =:= Pid end, S#state.nodes),
0
- Res = lists:delete(Pid, S#state.nodes),
0
- NewNode = spawn_linked_node(S#state.generator),
0
- {noreply, S#state{nodes=[NewNode|Res]}};
0
+ {reply, dict:fetch_keys(State#state.nodes), State};
0
+handle_call(spec, _From, State) ->
0
+ {reply, State#state.spec, State}.
0
+%%--------------------------------------------------------------------
0
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
0
+%% {noreply, State, Timeout} |
0
+%% {stop, Reason, State}
0
+%% Description: Handling cast messages
0
+%%--------------------------------------------------------------------
0
+% Node, Setup, Finisher, Registry
0
+handle_cast({add, Cmd, Node}, State) ->
0
+ link(Node), % We wuv woo!
0
+ NewDict = add_node_record(Node, Cmd, State#state.nodes),
0
+ {noreply, State#state{nodes=NewDict}};
0
+handle_cast(start_fresh_nodes, State) ->
0
+ spawn_nodes(State#state.spec, State#state.preproc, State#state.timeout, State#state.nodes),
0
+handle_cast(stop_all_nodes, State) ->
0
+ NodeDict = State#state.nodes,
0
+ RenewNodes = fun(N) -> cease_node(N,State#state.postproc,NodeDict) end,
0
+ lists:foreach(RenewNodes, dict:fetch_keys(NodeDict)),
0
+ {noreply, State#state{nodes=dict:new()}};
0
+handle_cast({change_spec, NewSpec}, State) ->
0
+ {noreply, State#state{spec=NewSpec}};
0
+handle_cast(register_nodes, State) ->
0
+ Nodes = State#state.nodes,
0
+ lists:foreach(fun(X) -> run_call(State#state.preproc, X) end, dict:fetch_keys(Nodes)),
0
+ error_logger:info_msg("All ports reregistered.~n"),
0
+%%--------------------------------------------------------------------
0
+%% Function: handle_info(Info, State) -> {noreply, State} |
0
+%% {noreply, State, Timeout} |
0
+%% {stop, Reason, State}
0
+%% Description: Handling all non call/cast messages
0
+%%--------------------------------------------------------------------
0
+handle_info({'EXIT', Pid, Reason}, State) ->
0
+ case dict:is_key(Pid, State#state.nodes) of
0
+ error_logger:warning_msg("PortWrapper ~p was terminated due to: ~p. Restarting & Heating.", [Pid,Reason]),
0
+ NewNodeDict = restart_dead_node(Pid,State#state.preproc, State#state.postproc,State#state.timeout,State#state.nodes),
0
+ {noreply, State#state{nodes=NewNodeDict}};
0
-
io:format("Got INFO ~p~n", [Any]),
0
+
error_logger:info_msg("Got INFO ~p~n", [Any]),
0
+%%--------------------------------------------------------------------
0
+%% Function: terminate(Reason, State) -> void()
0
+%% Description: This function is called by a gen_server when it is about to
0
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
0
+%% cleaning up. When it returns, the gen_server terminates with Reason.
0
+%% The return value is ignored.
0
+%%--------------------------------------------------------------------
0
terminate(_Reason, _State) ->
0
+%%--------------------------------------------------------------------
0
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
0
+%% Description: Convert process state when code is changed
0
+%%--------------------------------------------------------------------
0
code_change(_OldVsn, State, _Extra) ->
0
+%%--------------------------------------------------------------------
0
+%%--------------------------------------------------------------------
0
+listify(X) when is_list(X) -> X;
0
-spawn_linked_node(Generator) ->
0
+run_call({Module, Function}, Args) -> apply(Module, Function, listify(Args));
0
+run_call(Function,Args) when is_function(Function) -> apply(Function, listify(Args)).
0
-spawn_nodes(Generator,NumNodes) ->
0
- spawn_nodes(Generator,NumNodes,[]).
0
+% @spec spawn_linked_node(Cmd, Call) -> {process(), Cmd}
0
+spawn_unlinked_node(Cmd, Call, Timeout) ->
0
+ Port = port_wrapper:wrap(Cmd, Timeout),
0
+ case run_call(Call, Port) of
0
-spawn_nodes(_Generator,0,Acc) -> Acc;
0
-spawn_nodes(Generator,NumNodes,Acc) -> spawn_nodes(Generator,NumNodes - 1, [spawn_linked_node(Generator)|Acc]).
0
+add_node_record(Node, Cmd, Registry) -> dict:store(Node, Cmd, Registry).
0
+erase_node_record(Node, Registry) -> dict:erase(Node, Registry).
0
+% @spec spawn_nodes(Nodes::[string()], Registerer::fun(), Registry::dict()) -> dict()
0
+spawn_nodes({Cmd, Num}, Setup, Timeout, Registry) -> spawn_nodes(lists:duplicate(Num, Cmd), Setup, Timeout, Registry);
0
+spawn_nodes(Nodes, Setup, Timeout, _Registry) when is_list(Nodes) ->
0
+ F = fun(Cmd) -> spawn(
0
+ case spawn_unlinked_node(Cmd, Setup, Timeout) of
0
+ error_logger:error_msg("Failed to start node with command: ~n~p~n. Resource manager cannot function in this state, system is idle.~nPossible causes include a crash on startup or a timeout on startup. Make sure nodes can start on this machine!",
0
+ {Port, Cmd} -> resource_manager:add_node(Port, Cmd)
0
+ lists:foreach(F, Nodes).
0
+restart_dead_node(Node, Setup, Finisher, Timeout, Registry) ->
0
+ % Node is dead, so it should be removed everywhere that
0
+ % cares about it, no need to explicitly remove it from
0
+ apply(Finisher, [Node]),
0
+ Cmd = dict:fetch(Node, Registry),
0
+ NegReg = erase_node_record(Node, Registry),
0
+ spawn_nodes([Cmd], Setup, Timeout, NegReg),
0
+cease_node(Node, Finisher, Registry) ->
0
+ case Finisher(Node) of
0
+ {error, Why} -> throw(Why)
0
+ _:X -> error_logger:error_msg("Failed to remove node ~p. Reason: ~p", [Node, X])
0
+ port_wrapper:shutdown(Node),
0
+ erase_node_record(Node, Registry).
0
-drop_nodes(Terminator, Nodes) ->
0
- Killer = fun(Node) -> unlink(Node), Terminator(Node) end,
0
- lists:map(Killer, Nodes).
0
\ No newline at end of file