Skip to content

Commit

Permalink
Change return value of update_counters/1; add gproc_ps:notify_single_…
Browse files Browse the repository at this point in the history
…if_true

update_counters(Cs) now returns [{Ctr,Pid,NewValue}] instead of just [NewValue]

gproc_ps:notify_single_if_true(Scope,Event,Test,Msg) enables/creates a
single-shot subscription and immediately executes a

  tell_singles(Scope,Event,Msg)

for that subscriber only if Test() -> true. Otherwise, the subscription will
stay active until a message is published for Event 'the normal way'.
  • Loading branch information
uwiger authored and RJ committed Jul 9, 2012
1 parent 79b39fa commit 5742ee9
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 14 deletions.
8 changes: 6 additions & 2 deletions doc/gproc.md
Expand Up @@ -1388,7 +1388,7 @@ that the position is omitted; in gproc, the value position is always `3`.<a name



<pre>update_counters(X1::<a href="#type-scope">scope()</a>, Cs::[{<a href="#type-key">key()</a>, pid(), <a href="#type-increment">increment()</a>}]) -> [integer()]</pre>
<pre>update_counters(X1::<a href="#type-scope">scope()</a>, Cs::[{<a href="#type-key">key()</a>, pid(), <a href="#type-increment">increment()</a>}]) -> [{<a href="#type-key">key()</a>, pid(), integer()}]</pre>
<br></br>


Expand All @@ -1398,9 +1398,13 @@ that the position is omitted; in gproc, the value position is always `3`.<a name

Update a list of counters



This function is not atomic, except (in a sense) for global counters. For local counters,
it is more of a convenience function. For global counters, it is much more efficient
than calling `gproc:update_counter/2` for each individual counter.<a name="update_shared_counter-2"></a>
than calling `gproc:update_counter/2` for each individual counter.

The return value is the corresponding list of `[{Counter, Pid, NewValue}]`.<a name="update_shared_counter-2"></a>

###update_shared_counter/2##

Expand Down
13 changes: 9 additions & 4 deletions src/gproc.erl
Expand Up @@ -1226,17 +1226,22 @@ update_counter1(_, _) ->
%% This function is not atomic, except (in a sense) for global counters. For local counters,
%% it is more of a convenience function. For global counters, it is much more efficient
%% than calling `gproc:update_counter/2' for each individual counter.
%%
%% The return value is the corresponding list of `[{Counter, Pid, NewValue}]'.
%% @end
-spec update_counters(scope(), [{key(), pid(), increment()}]) -> [integer()].
update_counters(l, Cs) ->
-spec update_counters(scope(), [{key(), pid(), increment()}]) ->
[{key(), pid(), integer()}].
update_counters(_, []) ->
[];
update_counters(l, [_|_] = Cs) ->
?CATCH_GPROC_ERROR(update_counters1(Cs), [Cs]);
update_counters(g, Cs) ->
update_counters(g, [_|_] = Cs) ->
?CHK_DIST,
gproc_dist:update_counters(Cs).


update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) ->
[gproc_lib:update_counter(Key, Incr, Pid)|update_counters1(T)];
[{Key, Pid, gproc_lib:update_counter(Key, Incr, Pid)}|update_counters1(T)];
update_counters1([]) ->
[];
update_counters1(_) ->
Expand Down
6 changes: 3 additions & 3 deletions src/gproc_dist.erl
Expand Up @@ -653,10 +653,10 @@ batch_update_counters(Cs) ->
batch_update_counters([{{c,g,_} = Key, Pid, Incr}|T], Returns, Updates) ->
case update_counter_g(Key, Incr, Pid) of
[{_,_,_} = A, {_, _, V} = C] ->
batch_update_counters(T, [V|Returns], add_object(
A, add_object(C, Updates)));
batch_update_counters(T, [{Key,Pid,V}|Returns], add_object(
A, add_object(C, Updates)));
[{_, _, V} = C] ->
batch_update_counters(T, [V|Returns], add_object(C, Updates))
batch_update_counters(T, [{Key,Pid,V}|Returns], add_object(C, Updates))
end;
batch_update_counters([], Returns, Updates) ->
{lists:reverse(Returns), Updates}.
Expand Down
25 changes: 24 additions & 1 deletion src/gproc_ps.erl
Expand Up @@ -45,6 +45,7 @@
disable_single/2,
enable_single/2,
tell_singles/3,
notify_single_if_true/4,
list_singles/2]).

-define(ETag, gproc_ps_event).
Expand Down Expand Up @@ -177,7 +178,7 @@ tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
{Scope,c},
[{ {{c,Scope,{?ETag,Event}}, '$1', 1}, [],
[{{ {{c,Scope, {{?ETag,Event}} }}, '$1', {{-1,0,0}} }}] }]),
gproc:update_counters(Scope, Subs),
_ = gproc:update_counters(Scope, Subs),
[begin P ! {?ETag, Event, Msg}, P end || {_,P,_} <- Subs].

-spec list_singles(scope(), event()) -> [{pid(), status()}].
Expand All @@ -186,3 +187,25 @@ tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
list_singles(Scope, Event) ->
gproc:select({Scope,c}, [{ {{c,Scope,{?ETag,Event}}, '$1', '$2'},
[], [{{'$1','$2'}}] }]).

-spec notify_single_if_true(scope(), event(), fun(() -> boolean()), msg()) -> ok.
%% @doc Create/enable a single subscription for event; notify at once if F() -> true
%%
%% This function is a convenience function, wrapping a single-shot pub/sub around a
%% user-provided boolean test. `Msg' should be what the publisher will send later, if the
%% immediate test returns `false'.
%% @end
notify_single_if_true(Scope, Event, F, Msg) ->
try enable_single(Scope, Event)
catch
error:_ ->
create_single(Scope, Event)
end,
case F() of
true ->
disable_single(Scope, Event),
self() ! {?ETag, Event, Msg},
ok;
false ->
ok
end.
6 changes: 4 additions & 2 deletions test/gproc_dist_tests.erl
Expand Up @@ -156,8 +156,10 @@ t_update_counters([H1,H2|_] = Ns) ->
?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 1)),
?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 4)),
?debugFmt("code:which(gproc_dist) = ~p~n", [code:which(gproc_dist)]),
?assertMatch([3,4,0], t_call(P1, {apply, gproc, update_counters,
[g, [{C1,P1,1},{C1,P12,2},{C2,P2,{-2,0,0}}]]})),
?assertMatch([{C1,P1, 3},
{C1,P12,4},
{C2,P2, 0}], t_call(P1, {apply, gproc, update_counters,
[g, [{C1,P1,1},{C1,P12,2},{C2,P2,{-2,0,0}}]]})),
?assertMatch(ok, t_read_everywhere(C1, P1, Ns, 3)),
?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 4)),
?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 0)),
Expand Down
6 changes: 4 additions & 2 deletions test/gproc_tests.erl
Expand Up @@ -174,8 +174,10 @@ t_update_counters() ->
end),
receive {P1, ok} -> ok end,
?assert(gproc:get_value({a,l,c1}) =:= 8),
?assertEqual([7,8], gproc:update_counters(l, [{{c,l,c1}, self(), 4},
{{c,l,c1}, P1, 3}])),
Me = self(),
?assertEqual([{{c,l,c1},Me,7},
{{c,l,c1},P1,8}], gproc:update_counters(l, [{{c,l,c1}, Me, 4},
{{c,l,c1}, P1, 3}])),
?assert(gproc:get_value({a,l,c1}) =:= 15),
P1 ! {self(), goodbye},
R = erlang:monitor(process, P1),
Expand Down

0 comments on commit 5742ee9

Please sign in to comment.