Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fix dist update_counter() bug, add gproc_ps module, add update_counte…

…rs/1

The update_counter/2 function did not work properly for the global case,
since the introduction of complex "update ops" for counters.

The gproc_ps module adds publish/subscribe functions, including
single-shot subscriptions (subscription is disabled after each notify).

The update_counters(Cs) function was mainly added for gproc_ps, but is
generally useful mainly for bulk update of global counters.
  • Loading branch information...
commit 1740e41069aebd62819ccf8dd37cf95814c880ea 1 parent b1855cb
@uwiger uwiger authored
View
1  README.md
@@ -127,5 +127,6 @@ Freiburg 2007 ([Paper available here](http://github.com/esl/gproc/blob/master/do
<tr><td><a href="http://github.com/esl/gproc/blob/master/doc/gproc_init.md" class="module">gproc_init</a></td></tr>
<tr><td><a href="http://github.com/esl/gproc/blob/master/doc/gproc_lib.md" class="module">gproc_lib</a></td></tr>
<tr><td><a href="http://github.com/esl/gproc/blob/master/doc/gproc_monitor.md" class="module">gproc_monitor</a></td></tr>
+<tr><td><a href="http://github.com/esl/gproc/blob/master/doc/gproc_ps.md" class="module">gproc_ps</a></td></tr>
<tr><td><a href="http://github.com/esl/gproc/blob/master/doc/gproc_sup.md" class="module">gproc_sup</a></td></tr></table>
View
1  doc/README.md
@@ -127,5 +127,6 @@ Freiburg 2007 ([Paper available here](erlang07-wiger.pdf)).
<tr><td><a href="gproc_init.md" class="module">gproc_init</a></td></tr>
<tr><td><a href="gproc_lib.md" class="module">gproc_lib</a></td></tr>
<tr><td><a href="gproc_monitor.md" class="module">gproc_monitor</a></td></tr>
+<tr><td><a href="gproc_ps.md" class="module">gproc_ps</a></td></tr>
<tr><td><a href="gproc_sup.md" class="module">gproc_sup</a></td></tr></table>
View
2  doc/edoc-info
@@ -1,4 +1,4 @@
{application,gproc}.
{packages,[]}.
{modules,[gproc,gproc_app,gproc_dist,gproc_info,gproc_init,gproc_lib,
- gproc_monitor,gproc_sup]}.
+ gproc_monitor,gproc_ps,gproc_sup]}.
View
70 doc/gproc.md
@@ -63,6 +63,38 @@ will improve performance.
+###<a name="type-ctr_incr">ctr_incr()</a>##
+
+
+
+<pre>ctr_incr() = integer()</pre>
+
+
+
+###<a name="type-ctr_setval">ctr_setval()</a>##
+
+
+
+<pre>ctr_setval() = integer()</pre>
+
+
+
+###<a name="type-ctr_thr">ctr_thr()</a>##
+
+
+
+<pre>ctr_thr() = integer()</pre>
+
+
+
+###<a name="type-ctr_update">ctr_update()</a>##
+
+
+
+<pre>ctr_update() = <a href="#type-ctr_incr">ctr_incr()</a> | {<a href="#type-ctr_incr">ctr_incr()</a>, <a href="#type-ctr_thr">ctr_thr()</a>, <a href="#type-ctr_setval">ctr_setval()</a>}</pre>
+
+
+
###<a name="type-headpat">headpat()</a>##
@@ -71,6 +103,14 @@ will improve performance.
+###<a name="type-increment">increment()</a>##
+
+
+
+<pre>increment() = <a href="#type-ctr_incr">ctr_incr()</a> | <a href="#type-ctr_update">ctr_update()</a> | [<a href="#type-ctr_update">ctr_update()</a>]</pre>
+
+
+
###<a name="type-key">key()</a>##
@@ -78,6 +118,8 @@ will improve performance.
<pre>key() = {<a href="#type-type">type()</a>, <a href="#type-scope">scope()</a>, any()}</pre>
+update_counter increment
+
###<a name="type-keypat">keypat()</a>##
@@ -173,7 +215,7 @@ to forget about the calling process.</td></tr><tr><td valign="top"><a href="#i-0
about names and properties registered in Gproc, where applicable.</td></tr><tr><td valign="top"><a href="#info-1">info/1</a></td><td>Similar to <code>process_info(Pid)</code> but with additional gproc info.</td></tr><tr><td valign="top"><a href="#info-2">info/2</a></td><td>Similar to process_info(Pid, Item), but with additional gproc info.</td></tr><tr><td valign="top"><a href="#last-1">last/1</a></td><td>Behaves as ets:last(Tab) for a given type of registration object.</td></tr><tr><td valign="top"><a href="#lookup_global_aggr_counter-1">lookup_global_aggr_counter/1</a></td><td>Lookup a global (unique) aggregated counter and returns its value.</td></tr><tr><td valign="top"><a href="#lookup_global_counters-1">lookup_global_counters/1</a></td><td>Look up all global (non-unique) instances of a given Counter.</td></tr><tr><td valign="top"><a href="#lookup_global_name-1">lookup_global_name/1</a></td><td>Lookup a global unique name.</td></tr><tr><td valign="top"><a href="#lookup_global_properties-1">lookup_global_properties/1</a></td><td>Look up all global (non-unique) instances of a given Property.</td></tr><tr><td valign="top"><a href="#lookup_local_aggr_counter-1">lookup_local_aggr_counter/1</a></td><td>Lookup a local (unique) aggregated counter and returns its value.</td></tr><tr><td valign="top"><a href="#lookup_local_counters-1">lookup_local_counters/1</a></td><td>Look up all local (non-unique) instances of a given Counter.</td></tr><tr><td valign="top"><a href="#lookup_local_name-1">lookup_local_name/1</a></td><td>Lookup a local unique name.</td></tr><tr><td valign="top"><a href="#lookup_local_properties-1">lookup_local_properties/1</a></td><td>Look up all local (non-unique) instances of a given Property.</td></tr><tr><td valign="top"><a href="#lookup_pid-1">lookup_pid/1</a></td><td>Lookup the Pid stored with a key.</td></tr><tr><td valign="top"><a href="#lookup_pids-1">lookup_pids/1</a></td><td>Returns a list of pids with the published key Key.</td></tr><tr><td valign="top"><a href="#lookup_value-1">lookup_value/1</a></td><td>Lookup the value stored with a key.</td></tr><tr><td valign="top"><a href="#lookup_values-1">lookup_values/1</a></td><td>Retrieve the <code>{Pid,Value}</code> pairs corresponding to Key.</td></tr><tr><td valign="top"><a href="#monitor-1">monitor/1</a></td><td>monitor a registered name
This function works much like erlang:monitor(process, Pid), but monitors
a unique name registered via gproc.</td></tr><tr><td valign="top"><a href="#mreg-3">mreg/3</a></td><td>Register multiple {Key,Value} pairs of a given type and scope.</td></tr><tr><td valign="top"><a href="#munreg-3">munreg/3</a></td><td>Unregister multiple Key items of a given type and scope.</td></tr><tr><td valign="top"><a href="#nb_wait-1">nb_wait/1</a></td><td>Wait for a local name to be registered.</td></tr><tr><td valign="top"><a href="#next-2">next/2</a></td><td>Behaves as ets:next(Tab,Key) for a given type of registration object.</td></tr><tr><td valign="top"><a href="#prev-2">prev/2</a></td><td>Behaves as ets:prev(Tab,Key) for a given type of registration object.</td></tr><tr><td valign="top"><a href="#reg-1">reg/1</a></td><td>Equivalent to <a href="#reg-2"><tt>reg(Key, default(Key))</tt></a>.</td></tr><tr><td valign="top"><a href="#reg-2">reg/2</a></td><td>Register a name or property for the current process.</td></tr><tr><td valign="top"><a href="#reg_shared-1">reg_shared/1</a></td><td>Register a resource, but don't tie it to a particular process.</td></tr><tr><td valign="top"><a href="#reg_shared-2">reg_shared/2</a></td><td>Register a resource, but don't tie it to a particular process.</td></tr><tr><td valign="top"><a href="#register_name-2">register_name/2</a></td><td>Behaviour support callback.</td></tr><tr><td valign="top"><a href="#reset_counter-1">reset_counter/1</a></td><td>Reads and resets a counter in a "thread-safe" way.</td></tr><tr><td valign="top"><a href="#select-1">select/1</a></td><td>
-see http://www.erlang.org/doc/man/ets.html#select-1.</td></tr><tr><td valign="top"><a href="#select-2">select/2</a></td><td>Perform a select operation on the process registry.</td></tr><tr><td valign="top"><a href="#select-3">select/3</a></td><td>Like <a href="#select-2"><code>select/2</code></a> but returns Limit objects at a time.</td></tr><tr><td valign="top"><a href="#select_count-1">select_count/1</a></td><td>Equivalent to <a href="#select_count-2"><tt>select_count(all, Pat)</tt></a>.</td></tr><tr><td valign="top"><a href="#select_count-2">select_count/2</a></td><td>Perform a select_count operation on the process registry.</td></tr><tr><td valign="top"><a href="#send-2">send/2</a></td><td>Sends a message to the process, or processes, corresponding to Key.</td></tr><tr><td valign="top"><a href="#set_env-5">set_env/5</a></td><td>Updates the cached value as well as underlying environment.</td></tr><tr><td valign="top"><a href="#set_value-2">set_value/2</a></td><td>Sets the value of the registeration entry given by Key.</td></tr><tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td>Starts the gproc server.</td></tr><tr><td valign="top"><a href="#table-0">table/0</a></td><td>Equivalent to <a href="#table-1"><tt>table({all, all})</tt></a>.</td></tr><tr><td valign="top"><a href="#table-1">table/1</a></td><td>Equivalent to <a href="#table-2"><tt>table(Context, [])</tt></a>.</td></tr><tr><td valign="top"><a href="#table-2">table/2</a></td><td>QLC table generator for the gproc registry.</td></tr><tr><td valign="top"><a href="#unreg-1">unreg/1</a></td><td>Unregister a name or property.</td></tr><tr><td valign="top"><a href="#unreg_shared-1">unreg_shared/1</a></td><td>Unregister a shared resource.</td></tr><tr><td valign="top"><a href="#unregister_name-1">unregister_name/1</a></td><td>Equivalent to <tt>unreg / 1</tt>.</td></tr><tr><td valign="top"><a href="#update_counter-2">update_counter/2</a></td><td>Updates the counter registered as Key for the current process.</td></tr><tr><td valign="top"><a href="#update_shared_counter-2">update_shared_counter/2</a></td><td>Updates the shared counter registered as Key.</td></tr><tr><td valign="top"><a href="#where-1">where/1</a></td><td>Returns the pid registered as Key.</td></tr><tr><td valign="top"><a href="#whereis_name-1">whereis_name/1</a></td><td>Equivalent to <tt>where / 1</tt>.</td></tr></table>
+see http://www.erlang.org/doc/man/ets.html#select-1.</td></tr><tr><td valign="top"><a href="#select-2">select/2</a></td><td>Perform a select operation on the process registry.</td></tr><tr><td valign="top"><a href="#select-3">select/3</a></td><td>Like <a href="#select-2"><code>select/2</code></a> but returns Limit objects at a time.</td></tr><tr><td valign="top"><a href="#select_count-1">select_count/1</a></td><td>Equivalent to <a href="#select_count-2"><tt>select_count(all, Pat)</tt></a>.</td></tr><tr><td valign="top"><a href="#select_count-2">select_count/2</a></td><td>Perform a select_count operation on the process registry.</td></tr><tr><td valign="top"><a href="#send-2">send/2</a></td><td>Sends a message to the process, or processes, corresponding to Key.</td></tr><tr><td valign="top"><a href="#set_env-5">set_env/5</a></td><td>Updates the cached value as well as underlying environment.</td></tr><tr><td valign="top"><a href="#set_value-2">set_value/2</a></td><td>Sets the value of the registeration entry given by Key.</td></tr><tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td>Starts the gproc server.</td></tr><tr><td valign="top"><a href="#table-0">table/0</a></td><td>Equivalent to <a href="#table-1"><tt>table({all, all})</tt></a>.</td></tr><tr><td valign="top"><a href="#table-1">table/1</a></td><td>Equivalent to <a href="#table-2"><tt>table(Context, [])</tt></a>.</td></tr><tr><td valign="top"><a href="#table-2">table/2</a></td><td>QLC table generator for the gproc registry.</td></tr><tr><td valign="top"><a href="#unreg-1">unreg/1</a></td><td>Unregister a name or property.</td></tr><tr><td valign="top"><a href="#unreg_shared-1">unreg_shared/1</a></td><td>Unregister a shared resource.</td></tr><tr><td valign="top"><a href="#unregister_name-1">unregister_name/1</a></td><td>Equivalent to <tt>unreg / 1</tt>.</td></tr><tr><td valign="top"><a href="#update_counter-2">update_counter/2</a></td><td>Updates the counter registered as Key for the current process.</td></tr><tr><td valign="top"><a href="#update_counters-2">update_counters/2</a></td><td>Update a list of counters.</td></tr><tr><td valign="top"><a href="#update_shared_counter-2">update_shared_counter/2</a></td><td>Updates the shared counter registered as Key.</td></tr><tr><td valign="top"><a href="#where-1">where/1</a></td><td>Returns the pid registered as Key.</td></tr><tr><td valign="top"><a href="#whereis_name-1">whereis_name/1</a></td><td>Equivalent to <tt>where / 1</tt>.</td></tr></table>
<a name="functions"></a>
@@ -1321,8 +1363,9 @@ Equivalent to `unreg / 1`.<a name="update_counter-2"></a>
-<pre>update_counter(Key::<a href="#type-key">key()</a>, Incr) -> integer() | [integer()]</pre>
-<ul class="definitions"><li><pre>Incr = IncrVal | UpdateOp | [UpdateOp]</pre></li><li><pre>UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}</pre></li><li><pre>IncrVal = integer()</pre></li></ul>
+<pre>update_counter(Key::<a href="#type-key">key()</a>, Incr::<a href="#type-increment">increment()</a>) -> integer()</pre>
+<br></br>
+
@@ -1338,7 +1381,26 @@ will fail if the type of object referred to by Key is not a counter.
Aggregated counters with the same name will be updated automatically.
The `UpdateOp` patterns are the same as for `ets:update_counter/3`, except
-that the position is omitted; in gproc, the value position is always `3`.<a name="update_shared_counter-2"></a>
+that the position is omitted; in gproc, the value position is always `3`.<a name="update_counters-2"></a>
+
+###update_counters/2##
+
+
+
+
+<pre>update_counters(X1::<a href="#type-scope">scope()</a>, Cs::[{<a href="#type-key">key()</a>, pid(), <a href="#type-increment">increment()</a>}]) -> [integer()]</pre>
+<br></br>
+
+
+
+
+
+
+Update a list of counters
+
+This function is not atomic, except (in a sense) for global counters. For local counters,
+it is more of a convenience function. For global counters, it is much more efficient
+than calling `gproc:update_counter/2` for each individual counter.<a name="update_shared_counter-2"></a>
###update_shared_counter/2##
View
13 doc/gproc_dist.md
@@ -10,7 +10,7 @@ Extended process registry.
-__Behaviours:__ [`gen_leader`](/Users/uwiger/ETC/git/gproc/deps/gen_leader/doc/gen_leader.md).
+__Behaviours:__ [`gen_leader`](/Users/uwiger/FL/git/gen_leader/doc/gen_leader.md).
__Authors:__ Ulf Wiger ([`ulf.wiger@erlang-solutions.com`](mailto:ulf.wiger@erlang-solutions.com)).<a name="description"></a>
@@ -30,7 +30,7 @@ Class = n - unique name
| p - non-unique property
| c - counter
| a - aggregated counter
-Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#reg_shared-2">reg_shared/2</a></td><td></td></tr><tr><td valign="top"><a href="#reset_counter-1">reset_counter/1</a></td><td></td></tr><tr><td valign="top"><a href="#set_value-2">set_value/2</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-1">start_link/1</a></td><td></td></tr><tr><td valign="top"><a href="#surrendered-3">surrendered/3</a></td><td></td></tr><tr><td valign="top"><a href="#sync-0">sync/0</a></td><td>Synchronize with the gproc leader.</td></tr><tr><td valign="top"><a href="#terminate-2">terminate/2</a></td><td></td></tr><tr><td valign="top"><a href="#unreg-1">unreg/1</a></td><td></td></tr><tr><td valign="top"><a href="#unreg_shared-1">unreg_shared/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-2">update_counter/2</a></td><td></td></tr><tr><td valign="top"><a href="#update_shared_counter-2">update_shared_counter/2</a></td><td></td></tr></table>
+Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#reg_shared-2">reg_shared/2</a></td><td></td></tr><tr><td valign="top"><a href="#reset_counter-1">reset_counter/1</a></td><td></td></tr><tr><td valign="top"><a href="#set_value-2">set_value/2</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-1">start_link/1</a></td><td></td></tr><tr><td valign="top"><a href="#surrendered-3">surrendered/3</a></td><td></td></tr><tr><td valign="top"><a href="#sync-0">sync/0</a></td><td>Synchronize with the gproc leader.</td></tr><tr><td valign="top"><a href="#terminate-2">terminate/2</a></td><td></td></tr><tr><td valign="top"><a href="#unreg-1">unreg/1</a></td><td></td></tr><tr><td valign="top"><a href="#unreg_shared-1">unreg_shared/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-2">update_counter/2</a></td><td></td></tr><tr><td valign="top"><a href="#update_counters-1">update_counters/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_shared_counter-2">update_shared_counter/2</a></td><td></td></tr></table>
<a name="functions"></a>
@@ -333,6 +333,15 @@ the leader died.)<a name="terminate-2"></a>
`update_counter(Key, Incr) -> any()`
+<a name="update_counters-1"></a>
+
+###update_counters/1##
+
+
+
+
+`update_counters(List) -> any()`
+
<a name="update_shared_counter-2"></a>
###update_shared_counter/2##
View
312 doc/gproc_ps.md
@@ -0,0 +1,312 @@
+
+
+#Module gproc_ps#
+* [Description](#description)
+* [Data Types](#types)
+* [Function Index](#index)
+* [Function Details](#functions)
+
+
+Gproc Publish/Subscribe patterns
+This module implements a few convenient functions for publish/subscribe.
+
+
+
+__Authors:__ Ulf Wiger ([`ulf.wiger@feuerlabs.com`](mailto:ulf.wiger@feuerlabs.com)).<a name="description"></a>
+
+##Description##
+
+
+
+
+Publish/subscribe with Gproc relies entirely on gproc properties and counters.
+This makes for a very concise implementation, as the monitoring of subscribers and
+removal of subscriptions comes for free with Gproc.
+
+Using this module instead of rolling your own (which is easy enough) brings the
+benefit of consistency, in tracing and debugging.
+The implementation can also serve to illustrate how to use gproc properties and
+counters to good effect.
+
+<a name="types"></a>
+
+##Data Types##
+
+
+
+
+###<a name="type-event">event()</a>##
+
+
+
+<pre>event() = any()</pre>
+
+
+
+###<a name="type-msg">msg()</a>##
+
+
+
+<pre>msg() = any()</pre>
+
+
+
+###<a name="type-scope">scope()</a>##
+
+
+
+<pre>scope() = l | g</pre>
+
+
+
+###<a name="type-status">status()</a>##
+
+
+
+<pre>status() = 1 | 0</pre>
+<a name="index"></a>
+
+##Function Index##
+
+
+<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#create_single-2">create_single/2</a></td><td>Creates a single-shot subscription entry for Event.</td></tr><tr><td valign="top"><a href="#delete_single-2">delete_single/2</a></td><td>Deletes the single-shot subscription for Event.</td></tr><tr><td valign="top"><a href="#disable_single-2">disable_single/2</a></td><td>Disables the single-shot subscription for Event.</td></tr><tr><td valign="top"><a href="#enable_single-2">enable_single/2</a></td><td>Enables the single-shot subscription for Event.</td></tr><tr><td valign="top"><a href="#list_singles-2">list_singles/2</a></td><td>Lists all single-shot subscribers of Event, together with their status.</td></tr><tr><td valign="top"><a href="#list_subs-2">list_subs/2</a></td><td>List the pids of all processes subscribing to <code>Event</code></td></tr><tr><td valign="top"><a href="#publish-3">publish/3</a></td><td>Publish the message <code>Msg</code> to all subscribers of <code>Event</code></td></tr><tr><td valign="top"><a href="#subscribe-2">subscribe/2</a></td><td>Subscribe to events of type <code>Event</code></td></tr><tr><td valign="top"><a href="#tell_singles-3">tell_singles/3</a></td><td>Publish <code>Msg` to all single-shot subscribers of `Event</code></td></tr><tr><td valign="top"><a href="#unsubscribe-2">unsubscribe/2</a></td><td>Remove subscribtion created using <code>subscribe(Scope, Event)</code></td></tr></table>
+
+
+<a name="functions"></a>
+
+##Function Details##
+
+<a name="create_single-2"></a>
+
+###create_single/2##
+
+
+
+
+<pre>create_single(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> true</pre>
+<br></br>
+
+
+
+
+
+
+Creates a single-shot subscription entry for Event
+
+
+
+Single-shot subscriptions behave similarly to the `{active,once}` property of sockets.
+Once a message has been published, the subscription is disabled, and no more messages
+will be delivered to the subscriber unless the subscription is re-enabled using
+`enable_single/2`.
+
+
+
+The function creates a gproc counter entry, `{c,Scope,{gproc_ps_event,Event}}`, which
+will have either of the values `0` (disabled) or `1` (enabled). Initially, the value
+is `1`, meaning the subscription is enabled.
+
+Counters are used in this case, since they can be atomically updated by both the
+subscriber (owner) and publisher. The publisher sets the counter value to `0` as soon
+as it has delivered a message.<a name="delete_single-2"></a>
+
+###delete_single/2##
+
+
+
+
+<pre>delete_single(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> true</pre>
+<br></br>
+
+
+
+
+
+
+Deletes the single-shot subscription for Event
+
+This function deletes the counter entry representing the single-shot description.
+An exception will be raised if there is no such subscription.<a name="disable_single-2"></a>
+
+###disable_single/2##
+
+
+
+
+<pre>disable_single(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> integer()</pre>
+<br></br>
+
+
+
+
+
+
+Disables the single-shot subscription for Event
+
+
+
+This function changes the value of the corresponding gproc counter to `0` (disabled).
+
+
+
+The subscription remains (e.g. for debugging purposes), but with a 'disabled' status.
+This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
+This guarantees that the counter will have either the value 1 or 0, depending on which
+update happened last.
+
+The return value indicates the previous status.<a name="enable_single-2"></a>
+
+###enable_single/2##
+
+
+
+
+<pre>enable_single(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> integer()</pre>
+<br></br>
+
+
+
+
+
+
+Enables the single-shot subscription for Event
+
+
+
+This function changes the value of the corresponding gproc counter to `1` (enabled).
+
+
+
+After enabling, the subscriber will receive the next message published for `Event`,
+after which the subscription is automatically disabled.
+
+
+
+This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
+This guarantees that the counter will have either the value 1 or 0, depending on which
+update happened last.
+
+The return value indicates the previous status.<a name="list_singles-2"></a>
+
+###list_singles/2##
+
+
+
+
+<pre>list_singles(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> [{pid(), <a href="#type-status">status()</a>}]</pre>
+<br></br>
+
+
+
+
+Lists all single-shot subscribers of Event, together with their status<a name="list_subs-2"></a>
+
+###list_subs/2##
+
+
+
+
+<pre>list_subs(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> [pid()]</pre>
+<br></br>
+
+
+
+
+
+
+List the pids of all processes subscribing to `Event`
+
+This function uses `gproc:select/2` to find all properties indicating a subscription.<a name="publish-3"></a>
+
+###publish/3##
+
+
+
+
+<pre>publish(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>, Msg::<a href="#type-msg">msg()</a>) -> ok</pre>
+<br></br>
+
+
+
+
+
+
+Publish the message `Msg` to all subscribers of `Event`
+
+
+
+The message delivered to each subscriber will be of the form:
+
+
+
+`{gproc_ps_event, Event, Msg}`
+
+The function uses `gproc:send/2` to send a message to all processes which have a
+property `{p,Scope,{gproc_ps_event,Event}}`.<a name="subscribe-2"></a>
+
+###subscribe/2##
+
+
+
+
+<pre>subscribe(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> true</pre>
+<br></br>
+
+
+
+
+
+
+Subscribe to events of type `Event`
+
+
+
+Any messages published with `gproc_ps:publish(Scope, Event, Msg)` will be delivered to
+the current process, along with all other subscribers.
+
+This function creates a property, `{p,Scope,{gproc_ps_event,Event}}`, which can be
+searched and displayed for debugging purposes.<a name="tell_singles-3"></a>
+
+###tell_singles/3##
+
+
+
+
+<pre>tell_singles(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>, Msg::<a href="#type-msg">msg()</a>) -> [pid()]</pre>
+<br></br>
+
+
+
+
+
+
+Publish `Msg` to all single-shot subscribers of `Event`
+
+
+
+The subscriber status of each active subscriber is changed to `0` (disabled) before
+delivering the message. This reduces the risk that two different processes will be able
+to both deliver a message before disabling the subscribers. This could happen if the
+context switch happens just after the select operation (finding the active subscribers)
+and before the process is able to update the counters. In this case, it is possible
+that more than one can be delivered.
+
+The way to prevent this from happening is to ensure that only one process publishes
+for `Event`.<a name="unsubscribe-2"></a>
+
+###unsubscribe/2##
+
+
+
+
+<pre>unsubscribe(Scope::<a href="#type-scope">scope()</a>, Event::<a href="#type-event">event()</a>) -> true</pre>
+<br></br>
+
+
+
+
+
+
+Remove subscribtion created using `subscribe(Scope, Event)`
+
+This removes the property created through `subscribe/2`.
View
8 include/gproc.hrl
@@ -33,3 +33,11 @@
-type key() :: {type(), scope(), any()}.
-type sel_pattern() :: [{headpat(), list(), list()}].
+
+%% update_counter increment
+-type ctr_incr() :: integer().
+-type ctr_thr() :: integer().
+-type ctr_setval() :: integer().
+-type ctr_update() :: ctr_incr()
+ | {ctr_incr(), ctr_thr(), ctr_setval()}.
+-type increment() :: ctr_incr() | ctr_update() | [ctr_update()].
View
BIN  rebar
Binary file not shown
View
35 src/gproc.erl
@@ -58,6 +58,14 @@
%% @type sel_var() = DollarVar | '_'.
%% @type sel_pattern() = [{headpat(), Guards, Prod}].
%% @type key() = {type(), scope(), any()}.
+%%
+%% update_counter increment
+%% @type ctr_incr() = integer().
+%% @type ctr_thr() = integer().
+%% @type ctr_setval() = integer().
+%% @type ctr_update() = ctr_incr()
+%% | {ctr_incr(), ctr_thr(), ctr_setval()}.
+%% @type increment() = ctr_incr() | ctr_update() | [ctr_update()].
-module(gproc).
-behaviour(gen_server).
@@ -81,6 +89,7 @@
lookup_value/1,
lookup_values/1,
update_counter/2,
+ update_counters/2,
reset_counter/1,
update_shared_counter/2,
give_away/2,
@@ -1184,7 +1193,7 @@ lookup_values({T,_,_} = Key) ->
end,
[Pair || {P,_} = Pair <- L, my_is_process_alive(P)].
-%% @spec (Key::key(), Incr) -> integer() | [integer()]
+%% @ spec (Key::key(), Incr) -> integer() | [integer()]
%% Incr = IncrVal | UpdateOp | [UpdateOp]
%% UpdateOp = IncrVal | {IncrVal, Threshold, SetValue}
%% IncrVal = integer()
@@ -1200,6 +1209,7 @@ lookup_values({T,_,_} = Key) ->
%% that the position is omitted; in gproc, the value position is always `3'.
%% @end
%%
+-spec update_counter(key(), increment()) -> integer().
update_counter(Key, Incr) ->
?CATCH_GPROC_ERROR(update_counter1(Key, Incr), [Key, Incr]).
@@ -1211,6 +1221,29 @@ update_counter1({c,g,_} = Key, Incr) ->
update_counter1(_, _) ->
?THROW_GPROC_ERROR(badarg).
+%% @doc Update a list of counters
+%%
+%% This function is not atomic, except (in a sense) for global counters. For local counters,
+%% it is more of a convenience function. For global counters, it is much more efficient
+%% than calling `gproc:update_counter/2' for each individual counter.
+%% @end
+-spec update_counters(scope(), [{key(), pid(), increment()}]) -> [integer()].
+update_counters(l, Cs) ->
+ ?CATCH_GPROC_ERROR(update_counters1(Cs), [Cs]);
+update_counters(g, Cs) ->
+ ?CHK_DIST,
+ gproc_dist:update_counters(Cs).
+
+
+update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) ->
+ [gproc_lib:update_counter(Key, Incr, Pid)|update_counters1(T)];
+update_counters1([]) ->
+ [];
+update_counters1(_) ->
+ ?THROW_GPROC_ERROR(badarg).
+
+
+
%% @spec (Key) -> {ValueBefore, ValueAfter}
%% Key = {c, Scope, Name}
View
77 src/gproc_dist.erl
@@ -30,6 +30,7 @@
set_value/2,
give_away/2,
update_counter/2,
+ update_counters/1,
update_shared_counter/2,
reset_counter/1]).
@@ -148,6 +149,11 @@ update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
update_counter(_, _) ->
?THROW_GPROC_ERROR(badarg).
+update_counters(List) when is_list(List) ->
+ leader_call({update_counters, List});
+update_counters(_) ->
+ ?THROW_GPROC_ERROR(badarg).
+
update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
leader_call({update_counter, Key, Incr, shared});
update_shared_counter(_, _) ->
@@ -322,6 +328,13 @@ handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
error:_ ->
{reply, badarg, S}
end;
+handle_leader_call({update_counters, Cs}, _From, S, _E) ->
+ try {Replies, Vals} = batch_update_counters(Cs),
+ {reply, Replies, [{insert, Vals}], S}
+ catch
+ error:_ ->
+ {reply, badarg, S}
+ end;
handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
@@ -634,15 +647,73 @@ surrendered_1(Globs) ->
leader_cast({remove_globals, Remove})
end.
-update_aggr_counter({c,g,Ctr}, Incr) ->
+batch_update_counters(Cs) ->
+ batch_update_counters(Cs, [], []).
+
+batch_update_counters([{{c,g,_} = Key, Pid, Incr}|T], Returns, Updates) ->
+ case update_counter_g(Key, Incr, Pid) of
+ [{_,_,_} = A, {_, _, V} = C] ->
+ batch_update_counters(T, [V|Returns], add_object(
+ A, add_object(C, Updates)));
+ [{_, _, V} = C] ->
+ batch_update_counters(T, [V|Returns], add_object(C, Updates))
+ end;
+batch_update_counters([], Returns, Updates) ->
+ {lists:reverse(Returns), Updates}.
+
+
+add_object({K,P,_} = Obj, [{K,P,_} | T]) ->
+ [Obj | T];
+add_object(Obj, [H|T]) ->
+ [H | add_object(Obj, T)];
+add_object(Obj, []) ->
+ [Obj].
+
+
+
+update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
+ Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
+ update_aggr_counter(Key, Incr, [{{Key,Pid},Pid,Res}]);
+update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
+ when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
+ [Prev, New] = ets:update_counter(?TAB, {Key, Pid},
+ [{3, 0}, {3, Incr, Threshold, SetValue}]),
+ update_aggr_counter(Key, New - Prev, [{{Key,Pid},Pid,New}]);
+update_counter_g({c,l,_} = Key, Ops, Pid) when is_list(Ops) ->
+ case ets:update_counter(?TAB, {Key, Pid},
+ [{3, 0} | expand_ops(Ops)]) of
+ [_] ->
+ [];
+ [Prev | Rest] ->
+ [New | _] = lists:reverse(Rest),
+ update_aggr_counter(Key, New - Prev, [{Key, Pid, Rest}])
+ end;
+update_counter_g(_, _, _) ->
+ ?THROW_GPROC_ERROR(badarg).
+
+
+expand_ops([{Incr,Thr,SetV}|T])
+ when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
+ [{3, Incr, Thr, SetV}|expand_ops(T)];
+expand_ops([Incr|T]) when is_integer(Incr) ->
+ [{3, Incr}|expand_ops(T)];
+expand_ops([]) ->
+ [];
+expand_ops(_) ->
+ ?THROW_GPROC_ERROR(badarg).
+
+update_aggr_counter(Key, Incr) ->
+ update_aggr_counter(Key, Incr, []).
+
+update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
Key = {{a,g,Ctr},a},
case ets:lookup(?TAB, Key) of
[] ->
- [];
+ Acc;
[{K, Pid, Prev}] ->
New = {K, Pid, Prev+Incr},
ets:insert(?TAB, New),
- [New]
+ [New|Acc]
end.
pid_to_give_away_to(P) when is_pid(P) ->
View
188 src/gproc_ps.erl
@@ -0,0 +1,188 @@
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% @author Ulf Wiger <ulf.wiger@feuerlabs.com>
+%%
+%% @doc Gproc Publish/Subscribe patterns
+%% This module implements a few convenient functions for publish/subscribe.
+%%
+%% Publish/subscribe with Gproc relies entirely on gproc properties and counters.
+%% This makes for a very concise implementation, as the monitoring of subscribers and
+%% removal of subscriptions comes for free with Gproc.
+%%
+%% Using this module instead of rolling your own (which is easy enough) brings the
+%% benefit of consistency, in tracing and debugging.
+%% The implementation can also serve to illustrate how to use gproc properties and
+%% counters to good effect.
+%%
+%% @type scope() = l | g.
+%% @type event() = any().
+%% @type msg() = any().
+%% @type status() = 1 | 0.
+%% @end
+-module(gproc_ps).
+
+-export([subscribe/2,
+ unsubscribe/2,
+ publish/3,
+ list_subs/2
+ ]).
+
+-export([create_single/2,
+ delete_single/2,
+ disable_single/2,
+ enable_single/2,
+ tell_singles/3,
+ list_singles/2]).
+
+-define(ETag, gproc_ps_event).
+
+%% These types are duplicated above in EDoc syntax, since EDoc annoyingly doesn't pick up
+%% the type definitions, even if they are referred to in the -spec:s that EDoc does parse.
+-type scope() :: l | g.
+-type event() :: any().
+-type msg() :: any().
+-type status() :: 1 | 0.
+
+
+-spec subscribe(scope(), event()) -> true.
+%% @doc Subscribe to events of type `Event'
+%%
+%% Any messages published with `gproc_ps:publish(Scope, Event, Msg)' will be delivered to
+%% the current process, along with all other subscribers.
+%%
+%% This function creates a property, `{p,Scope,{gproc_ps_event,Event}}', which can be
+%% searched and displayed for debugging purposes.
+%% @end
+subscribe(Scope, Event) when Scope==l; Scope==g ->
+ gproc:reg({p,Scope,{?ETag, Event}}).
+
+-spec unsubscribe(scope(), event()) -> true.
+%% @doc Remove subscribtion created using `subscribe(Scope, Event)'
+%%
+%% This removes the property created through `subscribe/2'.
+%% @end
+unsubscribe(Scope, Event) when Scope==l; Scope==g ->
+ gproc:unreg({p,Scope,{?ETag, Event}}).
+
+-spec publish(scope(), event(), msg()) -> ok.
+%% @doc Publish the message `Msg' to all subscribers of `Event'
+%%
+%% The message delivered to each subscriber will be of the form:
+%%
+%% `{gproc_ps_event, Event, Msg}'
+%%
+%% The function uses `gproc:send/2' to send a message to all processes which have a
+%% property `{p,Scope,{gproc_ps_event,Event}}'.
+%% @end
+publish(Scope, Event, Msg) when Scope==l; Scope==g ->
+ gproc:send({p, Scope, {?ETag, Event}}, {?ETag, Event, Msg}).
+
+
+-spec list_subs(scope(), event()) -> [pid()].
+%% @doc List the pids of all processes subscribing to `Event'
+%%
+%% This function uses `gproc:select/2' to find all properties indicating a subscription.
+%% @end
+list_subs(Scope, Event) when Scope==l; Scope==g ->
+ gproc:select({Scope,p}, [{ {{p,Scope,{?ETag,Event}}, '$1', '_'}, [], ['$1'] }]).
+
+-spec create_single(scope(), event()) -> true.
+%% @doc Creates a single-shot subscription entry for Event
+%%
+%% Single-shot subscriptions behave similarly to the `{active,once}' property of sockets.
+%% Once a message has been published, the subscription is disabled, and no more messages
+%% will be delivered to the subscriber unless the subscription is re-enabled using
+%% `enable_single/2'.
+%%
+%% The function creates a gproc counter entry, `{c,Scope,{gproc_ps_event,Event}}', which
+%% will have either of the values `0' (disabled) or `1' (enabled). Initially, the value
+%% is `1', meaning the subscription is enabled.
+%%
+%% Counters are used in this case, since they can be atomically updated by both the
+%% subscriber (owner) and publisher. The publisher sets the counter value to `0' as soon
+%% as it has delivered a message.
+%% @end
+create_single(Scope, Event) when Scope==l; Scope==g ->
+ gproc:reg({c,Scope,{?ETag, Event}}, 1).
+
+-spec delete_single(scope(), event()) -> true.
+%% @doc Deletes the single-shot subscription for Event
+%%
+%% This function deletes the counter entry representing the single-shot description.
+%% An exception will be raised if there is no such subscription.
+%% @end
+delete_single(Scope, Event) when Scope==l; Scope==g ->
+ gproc:unreg({c,Scope,{?ETag, Event}}).
+
+-spec disable_single(scope(), event()) -> integer().
+%% @doc Disables the single-shot subscription for Event
+%%
+%% This function changes the value of the corresponding gproc counter to `0' (disabled).
+%%
+%% The subscription remains (e.g. for debugging purposes), but with a 'disabled' status.
+%% This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
+%% This guarantees that the counter will have either the value 1 or 0, depending on which
+%% update happened last.
+%%
+%% The return value indicates the previous status.
+%% @end
+disable_single(Scope, Event) when Scope==l; Scope==g ->
+ gproc:update_counter({c,Scope,{?ETag,Event}}, {-1, 0, 0}).
+
+-spec enable_single(scope(), event()) -> integer().
+%% @doc Enables the single-shot subscription for Event
+%%
+%% This function changes the value of the corresponding gproc counter to `1' (enabled).
+%%
+%% After enabling, the subscriber will receive the next message published for `Event',
+%% after which the subscription is automatically disabled.
+%%
+%% This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
+%% This guarantees that the counter will have either the value 1 or 0, depending on which
+%% update happened last.
+%%
+%% The return value indicates the previous status.
+%% @end
+enable_single(Scope, Event) when Scope==l; Scope==g ->
+ gproc:update_counter({c,Scope,{?ETag,Event}}, {1, 1, 1}).
+
+-spec tell_singles(scope(), event(), msg()) -> [pid()].
+%% @doc Publish `Msg` to all single-shot subscribers of `Event'
+%%
+%% The subscriber status of each active subscriber is changed to `0' (disabled) before
+%% delivering the message. This reduces the risk that two different processes will be able
+%% to both deliver a message before disabling the subscribers. This could happen if the
+%% context switch happens just after the select operation (finding the active subscribers)
+%% and before the process is able to update the counters. In this case, it is possible
+%% that more than one can be delivered.
+%%
+%% The way to prevent this from happening is to ensure that only one process publishes
+%% for `Event'.
+%% @end
+tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
+ Subs = gproc:select(
+ {Scope,c},
+ [{ {{c,Scope,{?ETag,Event}}, '$1', 1}, [],
+ [{{ {{c,Scope, {{?ETag,Event}} }}, '$1', {{-1,0,0}} }}] }]),
+ gproc:update_counters(Scope, Subs),
+ [begin P ! {?ETag, Event, Msg}, P end || {_,P,_} <- Subs].
+
+-spec list_singles(scope(), event()) -> [{pid(), status()}].
+%% @doc Lists all single-shot subscribers of Event, together with their status
+%% @end
+list_singles(Scope, Event) ->
+ gproc:select({Scope,c}, [{ {{c,Scope,{?ETag,Event}}, '$1', '$2'},
+ [], [{{'$1','$2'}}] }]).
View
26 test/gproc_dist_tests.erl
@@ -50,6 +50,9 @@ dist_test_() ->
?debugVal(t_aggr_counter(Ns))
end,
fun() ->
+ ?debugVal(t_update_counters(Ns))
+ end,
+ fun() ->
?debugVal(t_shared_counter(Ns))
end,
fun() ->
@@ -140,6 +143,29 @@ t_aggr_counter([H1,H2|_] = Ns) ->
?assertMatch(ok, t_call(Pc2, die)),
?assertMatch(ok, t_call(Pa, die)).
+t_update_counters([H1,H2|_] = Ns) ->
+ {c,g,N1} = C1 = ?T_COUNTER,
+ A1 = {a,g,N1},
+ C2 = ?T_COUNTER,
+ P1 = t_spawn_reg(H1, C1, 2),
+ P12 = t_spawn_reg(H2, C1, 2),
+ P2 = t_spawn_reg(H2, C2, 1),
+ Pa1 = t_spawn_reg(H2, A1),
+ ?assertMatch(ok, t_read_everywhere(C1, P1, Ns, 2)),
+ ?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 2)),
+ ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 1)),
+ ?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 4)),
+ ?debugFmt("code:which(gproc_dist) = ~p~n", [code:which(gproc_dist)]),
+ ?assertMatch([3,4,0], t_call(P1, {apply, gproc, update_counters,
+ [g, [{C1,P1,1},{C1,P12,2},{C2,P2,{-2,0,0}}]]})),
+ ?assertMatch(ok, t_read_everywhere(C1, P1, Ns, 3)),
+ ?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 4)),
+ ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 0)),
+ ?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 7)),
+ ?assertMatch(ok, t_call(P1, die)),
+ ?assertMatch(ok, t_call(P12, die)),
+ ?assertMatch(ok, t_call(P2, die)).
+
t_mreg([H|_] = Ns) ->
Kvl = ?T_KVL,
View
26 test/gproc_tests.erl
@@ -75,6 +75,8 @@ reg_test_() ->
, ?_test(t_is_clean())
, {spawn, ?_test(?debugVal(t_simple_aggr_counter()))}
, ?_test(t_is_clean())
+ , {spawn, ?_test(?debugVal(t_update_counters()))}
+ , ?_test(t_is_clean())
, {spawn, ?_test(?debugVal(t_simple_prop()))}
, ?_test(t_is_clean())
, {spawn, ?_test(?debugVal(t_await()))}
@@ -158,6 +160,30 @@ t_simple_aggr_counter() ->
end,
?assert(gproc:get_value({a,l,c1}) =:= 7).
+t_update_counters() ->
+ ?assert(gproc:reg({c,l,c1}, 3) =:= true),
+ ?assert(gproc:reg({a,l,c1}) =:= true),
+ ?assert(gproc:get_value({a,l,c1}) =:= 3),
+ P = self(),
+ P1 = spawn_link(fun() ->
+ gproc:reg({c,l,c1}, 5),
+ P ! {self(), ok},
+ receive
+ {P, goodbye} -> ok
+ end
+ end),
+ receive {P1, ok} -> ok end,
+ ?assert(gproc:get_value({a,l,c1}) =:= 8),
+ ?assertEqual([7,8], gproc:update_counters(l, [{{c,l,c1}, self(), 4},
+ {{c,l,c1}, P1, 3}])),
+ ?assert(gproc:get_value({a,l,c1}) =:= 15),
+ P1 ! {self(), goodbye},
+ R = erlang:monitor(process, P1),
+ receive {'DOWN', R, _, _, _} ->
+ gproc:audit_process(P1)
+ end,
+ ?assert(gproc:get_value({a,l,c1}) =:= 7).
+
t_simple_prop() ->
?assert(gproc:reg({p,l,prop}) =:= true),
Please sign in to comment.
Something went wrong with that request. Please try again.