-
Notifications
You must be signed in to change notification settings - Fork 1
/
counterreg.erl
131 lines (115 loc) · 3.73 KB
/
counterreg.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
%%% Use a write-only counter to take possession of an item.
%%% If the counter returns 1, you get to own the resource. Otherwise,
%%% you don't. Once you're done, you have to set the counter back to
%%% 0 or let an associated resource manager do it.
%%% To know who to contact, use another round-robin counter that
%%% gradually iterates over all available workers as time goes,
%%% assuring an even distribution of requests.
-module(counterreg).
-export([start/1, get_resource/1, put_resource/2, update/1, stop/0]).
start(Num) ->
Heir = spawn(fun() -> timer:sleep(infinity) end),
dispatch_table = ets:new(dispatch_table, [named_table, set, public, {heir,Heir,handover}, {write_concurrency,true}]),
true = ets:insert(dispatch_table, {ct,0}),
[start_watcher(N) || N <- lists:seq(1,Num)],
Num.
get_resource(Num) ->
case is_free(Id = dispatch_id(Num)) of
true ->
sync(get_name(Id), {get,self()});
false ->
busy
end.
put_resource(_Num, {Pid,Resource}) ->
async(Pid, {put, Resource}).
stop() ->
ets:foldl(fun({ct,_},_) ->
ok;
({Id,_},_) ->
sync(get_name(Id), stop)
end, [], dispatch_table),
ets:delete(dispatch_table).
start_watcher(Id) ->
S = self(),
R = make_ref(),
Pid = spawn(fun() -> init_watcher(Id,S,R) end),
MonRef = erlang:monitor(process,Pid),
receive
R -> erlang:demonitor(MonRef,[flush]);
{'DOWN', MonRef, process, _, Reason} -> erlang:error({badstart,Reason})
end.
init_watcher(Id,Parent,Ref) ->
register(get_name(Id), self()),
ets:insert(dispatch_table, {Id, 0}),
Parent ! Ref,
Res = make_ref(),
loop({Id,Res,undefined,undefined}).
%% res available
loop(S = {Id,Res,undefined,undefined}) ->
receive
update -> ?MODULE:update(S);
{From, stop} -> terminate(From, S);
{From, {get, To}} ->
Ref = erlang:monitor(process,To),
reply(From, {ok, {self(),Res}}),
loop({Id,Res,From,Ref});
{put, _Resource} -> % unexpected
loop(S);
{'DOWN', _, process, _Pid, _Reason} -> % unexpected
loop(S)
end;
%% res unavailable
loop(S = {Id,Res, OwnerPid, Ref}) ->
receive
update -> ?MODULE:update(S);
{From, stop} -> terminate(From, S);
{From, {get, _To}} ->
reply(From, busy),
loop(S);
{put, Res} ->
erlang:demonitor(Ref, [flush]),
set_free(Id),
loop({Id,Res,undefined,undefined});
{put, _WeirdRes} -> % undexpected
loop(S);
{'DOWN', Ref, process, OwnerPid, _Reason} ->
NewRes = make_ref(),
set_free(Id),
loop({Id, NewRes, undefined, undefined});
{'DOWN', _, process, _Pid, _Reason} -> % unexpected
loop(S)
end.
%%% UTILS
async(Pid,Msg) ->
Pid ! Msg.
sync(Pid,Msg) ->
Ref = erlang:monitor(process, Pid),
Pid ! {{self(), Ref}, Msg},
receive
{Ref, Reply} ->
erlang:demonitor(Ref, [flush]),
Reply;
{'DOWN', Ref, process, _, Reason} ->
error(Reason)
end.
reply(_From = {Pid, Ref}, Reply) ->
Pid ! {Ref, Reply}.
update(State) -> loop(State).
terminate(From, _State) ->
reply(From, ok).
%%% ID handling
dispatch_id(Num) ->
%ets:update_counter(dispatch_table, ct, {2,1,Num,1}).
erlang:phash2({self(),now()},Num)+1.
is_free(Id) ->
case ets:update_counter(dispatch_table, Id, {2,1}) of
%% allowing more messages, idea by LP
1 -> true;
2 -> true;
3 -> true;
_ -> false
end.
set_free(Id) ->
ets:insert(dispatch_table, {Id,0}).
get_name(Id) ->
list_to_atom("counterreg"++integer_to_list(Id)).