Skip to content

Commit

Permalink
Convert to using gproc instead of gen_event
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Haskins <ghaskins@novell.com>
  • Loading branch information
Gregory Haskins committed Apr 16, 2011
1 parent 798f356 commit e8c080d
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 32 deletions.
4 changes: 2 additions & 2 deletions agent/app/src/main/erlang/release_fsm.erl
Expand Up @@ -217,8 +217,8 @@ bind(State) ->
case net_adm:ping(State#state.cnode) of
pong ->
error_logger:info_msg("Binding complete~n", []),
gen_event:notify({global, edist_event_bus},
{online, State#state.cnode}),
edist_event_bus:notify(edist_agents,
{online, State#state.cnode}),
true;
_ ->
false
Expand Down
9 changes: 9 additions & 0 deletions controller/app/pom.xml
Expand Up @@ -6,6 +6,15 @@
<version>0.1-SNAPSHOT</version>
<packaging>erlang-otp</packaging>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>edist_lib</artifactId>
<version>${project.version}</version>
<type>erlang-otp</type>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
11 changes: 4 additions & 7 deletions controller/app/src/main/erlang/edist_controller.erl
Expand Up @@ -183,8 +183,7 @@ handle_call({commit_release, Name, Vsn}, _From, State) ->
throw({"bad state", Version})
end
end),
gen_event:notify({global, edist_event_bus},
{release_update, Name, Vsn}),
edist_event_bus:notify(edist_releases, {update, Name, Vsn}),
ok
end,
try mnesia:transaction(F) of
Expand Down Expand Up @@ -271,8 +270,7 @@ handle_call({client, join, Cookie, Facts}, _From, State) ->

UpdatedClient = Client#client{joined=true, facts=Facts},
mnesia:write(edist_controller_clients, UpdatedClient, write),
gen_event:notify({global, edist_event_bus},
{agent_join, Cookie, Facts}),
edist_event_bus:notify(edist_agents, {join, Cookie, Facts}),
{ok, Releases}
end,
try mnesia:transaction(F) of
Expand Down Expand Up @@ -403,9 +401,8 @@ handle_info({'DOWN', Ref, process, Pid, _Info}, State) ->
fun(Client) ->
mnesia:delete_object(Tab, Client, write),

gen_event:notify({global, edist_event_bus},
{agent_leave,
Client#client.cookie})
edist_event_bus:notify(edist_agents,
{leave, Client#client.cookie})
end,

Q = qlc:q([ DelClient(Client)
Expand Down
5 changes: 1 addition & 4 deletions controller/app/src/main/erlang/edist_controller_app.erl
Expand Up @@ -18,10 +18,7 @@ start(_Type, _StartArgs) ->
_:_ -> [node()]
end,

{ok, Pid} = edist_controller_sup:start_link(Nodes),
gen_event:add_handler({global, edist_event_bus}, event_logger, []),

{ok, Pid}.
edist_controller_sup:start_link(Nodes).

stop(_State) ->
ok.
Expand Down
12 changes: 6 additions & 6 deletions controller/app/src/main/erlang/edist_controller_sup.erl
Expand Up @@ -15,6 +15,9 @@

-define(SERVER, ?MODULE).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).

start_link(Nodes) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Nodes]).

Expand All @@ -23,12 +26,9 @@ start_child(ChildSpec) ->

init([Nodes]) ->
{ok,{{one_for_all,0,1},
[{'edist-controller',
{edist_controller,start_link,[Nodes]},
permanent, 2000, worker,[controller]},
{'edist-event-bus',
{gen_event, start_link, [{global, edist_event_bus}]},
permanent, 5000, worker, dynamic}
[
?CHILD(edist_controller, worker, [Nodes]),
?CHILD(event_logger, worker, [])
]
}
}.
Expand Down
22 changes: 19 additions & 3 deletions controller/app/src/main/erlang/event_logger.erl
@@ -1,12 +1,28 @@
-module(event_logger).
-export([init/1, handle_event/2, terminate/2]).
-behavor(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

start_link() ->
gen_server:start_link(?MODULE, [], []).

init(_Args) ->
edist_event_bus:subscribe([]),
{ok, []}.

handle_event(Event, State) ->
handle_call(_Request, _From, State) ->
{reply, {error, enotsup}, State}.

handle_cast(_Request, State) ->
{noreply, State}.

handle_info({edist_event_bus, Header, Data}=Event, State) ->
error_logger:info_msg("Event: ~p~n", [Event]),
{ok, State}.
{noreply, State}.

terminate(_Args, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
2 changes: 1 addition & 1 deletion controller/app/src/main/erlang/release_input_device.erl
Expand Up @@ -93,7 +93,7 @@ terminate(normal, #state{name=Name,vsn=Vsn,elem_id=Id} = State) ->
ok
end,
{atomic, ok} = mnesia:transaction(F),
gen_event:notify({global, event_bus}, {release_installed, Name, Vsn}),
edist_event_bus:notify(edist_releases, {installed, Name, Vsn}),
ok;
terminate(_Reason, #state{name=Name, vsn=Vsn} = _State) ->
% issue a compensating transaction to remove all traces of this instance
Expand Down
15 changes: 15 additions & 0 deletions lib/pom.xml
Expand Up @@ -6,6 +6,21 @@
<version>0.1-SNAPSHOT</version>
<packaging>erlang-otp</packaging>

<dependencies>
<dependency>
<groupId>com.github.esl</groupId>
<artifactId>gproc</artifactId>
<version>0.1.2</version>
<type>erlang-otp</type>
</dependency>
<dependency>
<groupId>com.github.abecciu.revival</groupId>
<artifactId>gen_leader</artifactId>
<version>1.0</version>
<type>erlang-otp</type>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
60 changes: 60 additions & 0 deletions lib/src/main/erlang/edist_event_bus.erl
@@ -0,0 +1,60 @@
-module(edist_event_bus).
-export([subscribe/1, subscribe/2, notify/2, notify/3]).

-include_lib("stdlib/include/qlc.hrl").

-record(subscriber, {cookie}).
-record(options, {scope=global, cookie}).

subscribe(Options) ->
subscribe(all, Options).

subscribe(EventType, RawOptions) ->
Options = parse_options(RawOptions),
Scope = case Options#options.scope of
local -> l;
global -> g
end,
gproc:reg({p, Scope, {?MODULE, EventType}},
#subscriber{cookie=Options#options.cookie}).

notify(EventType, Msg) ->
notify(EventType, Msg, []).

notify(EventType, Msg, RawOptions) ->
Options = parse_options(RawOptions),
Scope = Options#options.scope,

Q = qlc:q([{P, S}
|| {{p, '_', {?MODULE, E}}, P, S} <- gproc:table({Scope, props}),
E =:= EventType orelse E =:= all]),
lists:foreach(fun({P, S}) ->
Header = [
{headerver, 1},
{from, self()},
{type, EventType},
{cookie, S#subscriber.cookie}
],

P ! {?MODULE, Header, Msg}
end,
qlc:e(Q)).

parse_options(List) ->
parse_options(List, #options{}).

parse_options([{scope, RawScope} | T], Options) ->
Scope = case RawScope of
local -> local;
_ -> global
end,
parse_options(T, Options#options{scope=Scope});
parse_options([{cookie, Cookie} | T], Options) ->
parse_options(T, Options#options{cookie=Cookie});
parse_options([H | _T], _Options) ->
throw({earg, H});
parse_options([], Options) ->
Options.



27 changes: 27 additions & 0 deletions lib/src/test/erlang/edist_event_bus_test.erl
@@ -0,0 +1,27 @@
-module(edist_event_bus_test).
-include_lib("eunit/include/eunit.hrl").

init_test() ->
ok = application:start(gen_leader),
ok = application:start(gproc).

subscribe_test() ->
true = edist_event_bus:subscribe(foo, [{cookie, bar}, {scope, local}]),
edist_event_bus:notify(foo, baz, [{scope, local}]),

receive
{edist_event_bus, _Header, baz} ->
ok;
Msg -> throw({"Unexpected msg", Msg})
after
0 -> throw(timeout)
end.

badoptions_test() ->
try
edist_event_bus:subscribe(bar, [{bar, baz}]),
throw("unexpected success")
catch
throw:{earg, {bar, baz}} -> ok
end.

9 changes: 0 additions & 9 deletions test-script
Expand Up @@ -2,7 +2,6 @@
;; If you want to create a file, visit that file with C-x C-f,
;; then enter the text in that file's own buffer.

net_adm:ping('contact1@linux-mp').
Name = "client-release".
Vsn = "0.1-SNAPSHOT".
File = "/home/ghaskins/sandbox/git/edist/demo/client/release/target/" ++ Name ++ "-" ++ Vsn ++ ".tar.gz".
Expand All @@ -16,14 +15,6 @@ ok = edist_controller:commit_release(Name, Vsn, []).
ok = edist_controller:create_group("default", matchall, [Name], []).


---------------------------------

application:start(sasl).
application:start(agent_link).
application:start(genevent_bridge).
application:start(edist_lib).
application:start(edist_agent).




0 comments on commit e8c080d

Please sign in to comment.