forked from Gianfrancoalongi/WorkerNet
/
wn_resource_process.erl.BAK
121 lines (105 loc) · 4 KB
/
wn_resource_process.erl.BAK
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
%%%-------------------------------------------------------------------
%%% @author Gianfranco <zenon@zen.local>
%%% @copyright (C) 2011, Gianfranco
%%% Created : 11 Jan 2011 by Gianfranco <zenon@zen.local>
%%%-------------------------------------------------------------------
-module(wn_resource_process).
-behaviour(gen_fsm).
-include("include/worker_net.hrl").
-define(TIMEOUT,3000).
%% Process state carried between gen_fsm callbacks.
%%   node_root  - root directory string given to start_link/2; passed on to
%%                wn_job_worker:start_link/3 when a job is started.
%%   queues     - per queue type, the job keeper pids waiting for this
%%                resource, in arrival order.
%%   slots      - ets table id; init/1 stores one {Type, Amount, Amount}
%%                tuple per entry of the type specification, where Amount is
%%                non_neg_integer() | infinity.
%%   job        - pid of the running job worker; undefined until a job has
%%                been started.
%%   job_keeper - pid of the job keeper owning the running job; undefined
%%                until a job has been started.
-record(state, {node_root :: string(),
                queues :: [{atom(), [pid()]}],
                slots,
                job :: pid() | undefined,
                job_keeper :: pid() | undefined
               }).
%% API
-export([start_link/2,signal/3,queued/1]).
%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
handle_info/3, terminate/3, code_change/4]).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Starts a resource process linked to the caller.
%% NodeRoot and TypeSpec are handed to init/1 as a single {NodeRoot, TypeSpec}
%% tuple; the process is not registered under a name.
-spec start_link(string(), [{atom(), non_neg_integer() | infinity}]) ->
          {ok, pid()} | {error, term()}.
start_link(NodeRoot, TypeSpec) ->
    InitArg = {NodeRoot, TypeSpec},
    gen_fsm:start_link(?MODULE, InitArg, []).
%% @doc Asynchronously notifies the resource process Pid that the job keeper
%% JobKeeperPid wants the resource, on behalf of queue type QueueType.
%% The event is delivered regardless of the current FSM state; returns ok.
-spec signal(pid(), pid(), atom()) -> ok.
signal(Pid, JobKeeperPid, QueueType) ->
    Event = {signal, JobKeeperPid, QueueType},
    gen_fsm:send_all_state_event(Pid, Event).
%% @doc Synchronously asks the resource process Pid for its waiting jobs.
%% The reply is produced by the `queued' clause of handle_sync_event/4.
-spec queued(pid()) -> [{atom(), [#wn_job{}]}].
queued(Pid) ->
    Request = queued,
    gen_fsm:sync_send_all_state_event(Pid, Request).
%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================
%% @private
%% Creates the slot table, stores one {Type, Amount, Amount} tuple per
%% {Type, Amount} entry of TypeSpec, and starts in the `free' state with no
%% queued job keepers.
init({NodeRoot, TypeSpec}) ->
    Slots = ets:new(available, [set]),
    AddSlot = fun({Type, Amount}) ->
                      ets:insert(Slots, {Type, Amount, Amount})
              end,
    ok = lists:foreach(AddSlot, TypeSpec),
    InitialState = #state{node_root = NodeRoot,
                          queues = [],
                          slots = Slots},
    {ok, free, InitialState}.
%% @private
%% Handles {signal, JobKeeperPid, QueueType} events.
%%  * free:  asks the job keeper for its job. On {ok, Job} a linked worker is
%%           started (exits are trapped so the worker's termination arrives
%%           as a message) and the FSM becomes `taken'. On {error, taken}
%%           the FSM stays `free'. The queue type is ignored in this state.
%%  * taken: the job keeper is appended to the queue of its type.
handle_event({signal, JobKeeperPid, _QueueType}, free, State) ->
    case wn_job_keeper:signal(JobKeeperPid) of
        {error, taken} ->
            {next_state, free, State};
        {ok, WnJob} ->
            process_flag(trap_exit, true),
            {ok, WorkerPid} =
                wn_job_worker:start_link(State#state.node_root,
                                         JobKeeperPid, WnJob),
            Busy = State#state{job = WorkerPid,
                               job_keeper = JobKeeperPid},
            {next_state, taken, Busy}
    end;
handle_event({signal, JobKeeperPid, QueueType}, taken, State) ->
    Current = State#state.queues,
    Updated = add_to_queues(JobKeeperPid, QueueType, Current),
    {next_state, taken, State#state{queues = Updated}}.
%% @private
%% Answers the `queued' request with the result of collect_jobs/1; neither
%% the FSM state name nor the state data changes.
handle_sync_event(queued, _Caller, CurrentState, StateData) ->
    {reply, collect_jobs(StateData), CurrentState, StateData}.
%% @private
%% Handles the exit of the currently running job worker while `taken'.
%% The worker's exit reason is reported to its job keeper, then the resource
%% is offered to the waiting job keepers (see start_next_waiting/1).
%% Next state is `taken' if a waiting job could be started, otherwise `free'.
handle_info({'EXIT', WorkerPid, Reason},
            taken,
            #state{job = WorkerPid, job_keeper = JobKeeperPid} = State) ->
    wn_job_keeper:done(JobKeeperPid, Reason),
    {NextStateName, NextState} = start_next_waiting(State),
    {next_state, NextStateName, NextState}.

%% Offers the resource to waiting job keepers, one at a time in the order
%% given by waiting_in/1, until one hands over its job or nobody is left.
%% A keeper answering {error, taken} is dropped and the next one is tried;
%% stopping at the first refusal would leave the FSM `free' with keepers
%% still queued, and nothing would ever dequeue them.
%% Returns {taken, State} with the new worker recorded, or {free, State}
%% once the queues are exhausted.
start_next_waiting(State) ->
    case waiting_in(State#state.queues) of
        none ->
            {free, State};
        {WaitingKeeperPid, NewQs} ->
            case wn_job_keeper:signal(WaitingKeeperPid) of
                {ok, WnJob} ->
                    %% Trap exits so the linked worker's termination is
                    %% delivered to handle_info/3 as an 'EXIT' message.
                    process_flag(trap_exit, true),
                    {ok, NewWorkerPid} =
                        wn_job_worker:start_link(State#state.node_root,
                                                 WaitingKeeperPid,
                                                 WnJob),
                    {taken, State#state{job = NewWorkerPid,
                                        job_keeper = WaitingKeeperPid,
                                        queues = NewQs}};
                {error, taken} ->
                    start_next_waiting(State#state{queues = NewQs})
            end
    end.
%% @private
%% No cleanup is performed on termination; always returns ok.
terminate(_Why, _CurrentState, _StateData) ->
    ok.
%% @private
%% Code upgrade hook: the state name and state data are kept unchanged.
code_change(_FromVsn, CurrentState, StateData, _Extra) ->
    Unchanged = {ok, CurrentState, StateData},
    Unchanged.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Builds the reply for the `queued' request: for every queue type, the list
%% of values returned by wn_job_keeper:get_job/1 for each waiting job keeper,
%% in queue order.
collect_jobs(State) ->
    Queues = State#state.queues,
    [{Type, lists:map(fun(KeeperPid) -> wn_job_keeper:get_job(KeeperPid) end,
                      Waiting)}
     || {Type, Waiting} <- Queues].
%% Appends JobKeeperPid to the end of the queue for Type.
%% The first entry whose key is Type is extended in place; when no such entry
%% exists a new {Type, [JobKeeperPid]} entry is added at the end of the list.
%% The order of all other entries is preserved.
add_to_queues(JobKeeperPid, Type, Queues) ->
    add_to_queues(JobKeeperPid, Type, Queues, []).

%% Walks the queue list, keeping the already inspected entries (reversed) in
%% Seen so the result can be rebuilt in the original order.
add_to_queues(JobKeeperPid, Type, [{Type, Waiting} | Others], Seen) ->
    lists:reverse(Seen, [{Type, Waiting ++ [JobKeeperPid]} | Others]);
add_to_queues(JobKeeperPid, Type, [Other | Others], Seen) ->
    add_to_queues(JobKeeperPid, Type, Others, [Other | Seen]);
add_to_queues(JobKeeperPid, Type, [], Seen) ->
    lists:reverse(Seen, [{Type, [JobKeeperPid]}]).
%% Takes the next waiting job keeper from the first queue in the list.
%% Returns `none' when there are no queues, otherwise {Pid, RemainingQueues};
%% a queue that becomes empty is removed from the list.
waiting_in([]) ->
    none;
waiting_in([{Type, [Next | Remaining]} | Others]) ->
    case Remaining of
        [] -> {Next, Others};
        _ -> {Next, [{Type, Remaining} | Others]}
    end.