Module gproc_ps

Gproc Publish/Subscribe patterns. This module implements a few convenience functions for publish/subscribe.

Authors: Ulf Wiger (ulf@wiger.net).

Description

Publish/subscribe with Gproc relies entirely on gproc properties and counters. This makes for a very concise implementation, as the monitoring of subscribers and removal of subscriptions comes for free with Gproc.

Using this module instead of rolling your own (which is easy enough) brings the benefit of consistency in tracing and debugging. The implementation can also serve to illustrate how to use gproc properties and counters to good effect.

Data Types


cond_spec() = undefined | ets:match_spec()

event() = any()

msg() = any()

notification() = {?ETag, event(), msg()}

scope() = l | g

status() = 1 | 0

Function Index

change_cond/3: Change the condition specification of an existing subscription.
change_cond_remote/3: Change the condition spec of a subscription created from a remote node.
create_single/2: Creates a single-shot subscription entry for Event.
create_single_remote/2: Create a local single-shot subscription from a remote node.
delete_single/2: Deletes the single-shot subscription for Event.
delete_single_remote/2: Delete a single-shot subscription created from a remote node.
disable_single/2: Disables the single-shot subscription for Event.
disable_single_remote/2: Disable a single-shot subscription created from a remote node.
enable_single/2: Enables the single-shot subscription for Event.
enable_single_remote/2: Enable a single-shot subscription created from a remote node.
list_singles/2: Lists all single-shot subscribers of Event, together with their status.
list_subs/2: List the pids of all processes subscribing to Event.
notify_single_if_true/4: Create/enable a single subscription for event; notify at once if F() -> true.
publish/3: Publish the message Msg to all subscribers of Event.
publish_cond/3: Publishes the message Msg to conditional subscribers of Event.
subscribe/2: Subscribe to events of type Event.
subscribe_cond/3: Subscribe conditionally to events of type Event.
subscribe_cond_remote/3: Subscribe conditionally from a remote node.
subscribe_remote/2: Subscribe from a remote node.
tell_singles/3: Publish Msg to all single-shot subscribers of Event.
unsubscribe/2: Remove subscription created using subscribe(Scope, Event).
unsubscribe_remote/2: Remove subscription created from a remote node.

Function Details

change_cond/3


change_cond(Scope::scope(), Event::event(), Spec::cond_spec()) -> true

Change the condition specification of an existing subscription.

This function atomically changes the condition spec of an existing subscription (see subscribe_cond/3). An exception is raised if the subscription doesn't already exist.

Note that this function can also be used to change a conditional subscription to an unconditional one (by setting Spec = undefined), or a 'normal' subscription to a conditional one.

change_cond_remote/3


change_cond_remote(Node::node(), Event::event(), Spec::cond_spec()) -> true

Change the condition spec of a subscription created from a remote node

create_single/2


create_single(Scope::scope(), Event::event()) -> true

Creates a single-shot subscription entry for Event

Single-shot subscriptions behave similarly to the {active,once} property of sockets. Once a message has been published, the subscription is disabled, and no more messages will be delivered to the subscriber unless the subscription is re-enabled using enable_single/2.

The function creates a gproc counter entry, {c,Scope,{gproc_ps_event,Event}}, which will have either of the values 0 (disabled) or 1 (enabled). Initially, the value is 1, meaning the subscription is enabled.

Counters are used in this case, since they can be atomically updated by both the subscriber (owner) and publisher. The publisher sets the counter value to 0 as soon as it has delivered a message.
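The single-shot life cycle described above can be sketched as follows. This is a minimal illustration, not part of the module: the module name ps_single_demo, the event name demo_single and the collect/1 helper are hypothetical, the gproc application is assumed to be installed, and the delivered message is assumed to have the same {gproc_ps_event, Event, Msg} shape that publish/3 documents.

```erlang
-module(ps_single_demo).
-export([run/0]).

%% Walks one process through create -> deliver -> disabled -> re-enable.
%% Assumes gproc is on the code path; all names here are illustrative.
run() ->
    {ok, _} = application:ensure_all_started(gproc),
    Event = demo_single,                           % hypothetical event name
    true = gproc_ps:create_single(l, Event),       % counter starts at 1 (enabled)
    Before = gproc_ps:list_singles(l, Event),
    _ = gproc_ps:tell_singles(l, Event, first),    % publisher resets the counter to 0
    After = gproc_ps:list_singles(l, Event),
    _ = gproc_ps:tell_singles(l, Event, second),   % disabled, so should not arrive
    _ = gproc_ps:enable_single(l, Event),          % re-arm for the next message
    Rearmed = gproc_ps:list_singles(l, Event),
    true = gproc_ps:delete_single(l, Event),
    {Before, After, Rearmed, collect(Event)}.

%% Drains every notification for Event currently in the mailbox.
%% Message shape assumed to match the one documented for publish/3.
collect(Event) ->
    receive
        {gproc_ps_event, Event, Msg} -> [Msg | collect(Event)]
    after 100 ->
        []
    end.
```

Going by the description above, one would expect the subscriber's status to read 1, then 0, then 1 again, and only the first message to be collected.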

create_single_remote/2


create_single_remote(Node::node(), Event::event()) -> true

Create a local single-shot subscription from a remote node

delete_single/2


delete_single(Scope::scope(), Event::event()) -> true

Deletes the single-shot subscription for Event

This function deletes the counter entry representing the single-shot subscription. An exception will be raised if there is no such subscription.

delete_single_remote/2


delete_single_remote(Node::node(), Event::event()) -> true

Delete a single-shot subscription created from a remote node

disable_single/2


disable_single(Scope::scope(), Event::event()) -> integer()

Disables the single-shot subscription for Event

This function changes the value of the corresponding gproc counter to 0 (disabled).

The subscription remains (e.g. for debugging purposes), but with a 'disabled' status. This function is insensitive to concurrency, using 'wrapping' ets counter update ops. This guarantees that the counter will have either the value 1 or 0, depending on which update happened last.

The return value indicates the previous status.

disable_single_remote/2


disable_single_remote(Node::node(), Event::event()) -> status()

Disable a single-shot subscription created from a remote node

enable_single/2


enable_single(Scope::scope(), Event::event()) -> integer()

Enables the single-shot subscription for Event

This function changes the value of the corresponding gproc counter to 1 (enabled).

After enabling, the subscriber will receive the next message published for Event, after which the subscription is automatically disabled.

This function is insensitive to concurrency, using 'wrapping' ets counter update ops. This guarantees that the counter will have either the value 1 or 0, depending on which update happened last.

The return value indicates the previous status.

enable_single_remote/2


enable_single_remote(Node::node(), Event::event()) -> status()

Enable a single-shot subscription created from a remote node

list_singles/2


list_singles(Scope::scope(), Event::event()) -> [{pid(), status()}]

Lists all single-shot subscribers of Event, together with their status

list_subs/2


list_subs(Scope::scope(), Event::event()) -> [pid()]

List the pids of all processes subscribing to Event

This function uses gproc:select/2 to find all properties indicating a subscription.

notify_single_if_true/4


notify_single_if_true(Scope::scope(), Event::event(), F::fun(() -> boolean()), Msg::msg()) -> ok

Create/enable a single subscription for event; notify at once if F() -> true

This function is a convenience function, wrapping a single-shot pub/sub around a user-provided boolean test. Msg should be what the publisher will send later, if the immediate test returns false.

publish/3


publish(Scope::scope(), Event::event(), Msg::msg()) -> notification()

Publish the message Msg to all subscribers of Event

The message delivered to each subscriber will be of the form:

{gproc_ps_event, Event, Msg}

The function uses gproc:send/2 to send a message to all processes which have a property {p,Scope,{gproc_ps_event,Event}}.

publish_cond/3


publish_cond(Scope::scope(), Event::event(), Msg::msg()) -> notification()

Publishes the message Msg to conditional subscribers of Event

The message will be delivered to each subscriber provided their respective condition tests succeed.

See also: subscribe_cond/3.
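As a rough usage sketch, a process can subscribe with a match specification and then observe which published messages get through. The module name ps_cond_demo, the event name demo_temp, the {temp, Value} message shape and the collect/1 helper are all hypothetical, the gproc application is assumed to be installed, and the delivered message is assumed to have the {gproc_ps_event, Event, Msg} shape documented for publish/3.

```erlang
-module(ps_cond_demo).
-export([run/0]).

%% Subscribes with a condition and publishes one failing and one passing
%% message. Assumes gproc is on the code path; names are illustrative.
run() ->
    {ok, _} = application:ensure_all_started(gproc),
    Event = demo_temp,                                 % hypothetical event name
    %% Accept only {temp, T} messages where T > 30.
    Spec = [{{temp, '$1'}, [{'>', '$1', 30}], [true]}],
    true = gproc_ps:subscribe_cond(l, Event, Spec),
    _ = gproc_ps:publish_cond(l, Event, {temp, 20}),   % condition fails
    _ = gproc_ps:publish_cond(l, Event, {temp, 35}),   % condition holds
    Got = collect(Event),
    %% Assumption: a conditional subscription uses the same property as a
    %% plain one (change_cond/3 converts between them), so unsubscribe/2
    %% is expected to remove it.
    true = gproc_ps:unsubscribe(l, Event),
    Got.

%% Drains every notification for Event currently in the mailbox.
collect(Event) ->
    receive
        {gproc_ps_event, Event, Msg} -> [Msg | collect(Event)]
    after 100 ->
        []
    end.
```

Under these assumptions, only the {temp, 35} message should end up in the returned list.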

subscribe/2


subscribe(Scope::scope(), Event::event()) -> true

Subscribe to events of type Event

Any messages published with gproc_ps:publish(Scope, Event, Msg) will be delivered to the current process, along with all other subscribers.

This function creates a property, {p,Scope,{gproc_ps_event,Event}}, which can be searched and displayed for debugging purposes.

Note that, as with gproc:reg/1, this function will raise an exception if you try to subscribe to the same event twice from the same process.
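A minimal round trip through subscribe/2, publish/3 and unsubscribe/2 might look like the sketch below. The module name ps_basic_demo and the event name demo_event are hypothetical, and the gproc application is assumed to be installed. For brevity the same process both subscribes and publishes.

```erlang
-module(ps_basic_demo).
-export([run/0]).

%% Subscribes the calling process, publishes one message, and waits for it.
%% Assumes gproc is on the code path; names are illustrative.
run() ->
    {ok, _} = application:ensure_all_started(gproc),
    Event = demo_event,                       % hypothetical event name
    true = gproc_ps:subscribe(l, Event),      % creates {p,l,{gproc_ps_event,Event}}
    _ = gproc_ps:publish(l, Event, hello),    % sent to every subscriber of Event
    Received =
        receive
            {gproc_ps_event, Event, Msg} -> Msg
        after 1000 ->
            timeout
        end,
    true = gproc_ps:unsubscribe(l, Event),    % allows run/0 to be called again
    Received.
```

The unsubscribe call at the end matters when experimenting in a shell: without it, a second call from the same process would hit the double-subscription exception mentioned above.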

subscribe_cond/3


subscribe_cond(Scope::scope(), Event::event(), Spec::cond_spec()) -> true

Subscribe conditionally to events of type Event

This function is similar to subscribe/2, but adds a condition in the form of a match specification.

The condition is tested by the publish_cond/3 function and a message is delivered only if the condition is true. Specifically, the test is:

ets:match_spec_run([Msg], ets:match_spec_compile(Cond)) == [true]

In other words, if the match_spec returns true for a message, that message is sent to the subscriber. For any other result from the match_spec, the message is not sent. Cond == undefined means that all messages will be delivered. That is, publish_cond/3 treats 'normal' subscribers just like publish/3 does. The one difference is that publish/3, strictly speaking, ignores the Value part of the property completely, whereas publish_cond/3 expects it to be either undefined or a valid match spec.

This means that Cond=undefined and Cond=[{'_',[],[true]}] are equivalent.
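The condition test can be tried in isolation, using only the ets functions quoted above and no gproc at all. The sketch below is illustrative: the module name ps_cond_check, the passes/2 helper and the sample messages are hypothetical, and treating undefined as "always deliver" follows the description in this section rather than the module's actual code.

```erlang
-module(ps_cond_check).
-export([passes/2, demo/0]).

%% Returns true when Msg would satisfy Cond under the test quoted above.
%% 'undefined' is treated as "always deliver", as described in the text.
passes(_Msg, undefined) ->
    true;
passes(Msg, Cond) ->
    ets:match_spec_run([Msg], ets:match_spec_compile(Cond)) == [true].

demo() ->
    %% Accept only {temp, T} messages where T > 30.
    Cond = [{{temp, '$1'}, [{'>', '$1', 30}], [true]}],
    [passes({temp, 35}, Cond),                     % head and guard both match
     passes({temp, 20}, Cond),                     % guard fails, result is []
     passes({humidity, 80}, Cond),                 % head does not match
     passes({humidity, 80}, [{'_', [], [true]}]),  % catch-all spec
     passes({humidity, 80}, undefined)].           % no condition at all
```

Calling ps_cond_check:demo() returns [true, false, false, true, true], which shows the catch-all spec and undefined agreeing on the last two entries.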

Note that, as with gproc:reg/1, this function will raise an exception if you try to subscribe to the same event twice from the same process.

subscribe_cond_remote/3


subscribe_cond_remote(Node::node(), Event::event(), Spec::cond_spec()) -> true

Subscribe conditionally from a remote node

subscribe_remote/2


subscribe_remote(Node::node(), Event::event()) -> true

Subscribe from a remote node

This function works like gproc_ps:subscribe(Scope, Event), but is meant to be called from a remote node (it will fail if called from a local process).

tell_singles/3


tell_singles(Scope::scope(), Event::event(), Msg::msg()) -> [pid()]

Publish Msg to all single-shot subscribers of Event

The status of each active subscriber is changed to 0 (disabled) before the message is delivered. This reduces, but does not eliminate, the risk that two different processes both deliver a message before the subscribers are disabled. That can happen if a context switch occurs just after the select operation (which finds the active subscribers) and before the process has updated the counters. In that case, more than one message may be delivered.

The way to prevent this from happening is to ensure that only one process publishes for Event.

unsubscribe/2


unsubscribe(Scope::scope(), Event::event()) -> true

Remove subscription created using subscribe(Scope, Event)

This removes the property created through subscribe/2.

unsubscribe_remote/2


unsubscribe_remote(Node::node(), Event::event()) -> true

Remove subscription created from a remote node