Skip to content

Commit

Permalink
Added subscribe to pipe test and operator interface function
Browse files Browse the repository at this point in the history
get subscribers to pipe
  • Loading branch information
Gianfranco Alongi committed Mar 21, 2012
1 parent eadabea commit 68ed72d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 10 deletions.
25 changes: 22 additions & 3 deletions Three/Solution/src/pubsub_pipes.erl
Expand Up @@ -7,7 +7,9 @@
-behaviour(gen_server).

-export([new_pipe/1,
get_pipes/0
get_pipes/0,
subscribe_to_pipe/1,
get_subscribers_to_pipe/1
]).
-export([start_link/0,
stop/0
Expand Down Expand Up @@ -35,9 +37,17 @@ new_pipe(PipeName) ->
get_pipes() ->
gen_server:call(?MODULE,get_pipes).

-spec(subscribe_to_pipe(string()) -> ok).
subscribe_to_pipe(PipeName) ->
gen_server:call(?MODULE,{subscribe_to_pipe,PipeName,self()}).

-spec(get_subscribers_to_pipe(string()) -> [string()]).
get_subscribers_to_pipe(PipeName) ->
gen_server:call(?MODULE,{get_subscribers_to_pipe,PipeName}).

%%%===================================================================
init([]) ->
{ok, #state{pipes = ets:new(pipes,[bag])}}.
{ok, #state{pipes = ets:new(pipes,[set])}}.

handle_call(stop,_From,State) ->
{stop,normal,ok,State};
Expand All @@ -50,7 +60,16 @@ handle_call(get_pipes,_From,State) ->
Pipes = ets:foldl(fun({Key,_},Acc) -> [Key|Acc] end,
[],
State#state.pipes),
{reply,Pipes, State}.
{reply,Pipes, State};

handle_call({subscribe_to_pipe,PipeName,Pid},_From,State) ->
[{PipeName,Subscribers}] = ets:lookup(State#state.pipes,PipeName),
ets:insert(State#state.pipes,[{PipeName,[Pid|Subscribers]}]),
{reply,ok,State};

handle_call({get_subscribers_to_pipe,PipeName},_From,State) ->
[{PipeName,Subscribers}] = ets:lookup(State#state.pipes,PipeName),
{reply,Subscribers,State}.

handle_cast(_Msg, State) ->
{noreply, State}.
Expand Down
26 changes: 19 additions & 7 deletions Three/Solution/test/pubsub_pipes_tests.erl
@@ -1,15 +1,14 @@
-module(pubsub_pipes_tests).
-include_lib("eunit/include/eunit.hrl").

create_pipe_test_() ->
{setup,
pipe_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
fun() ->
PipeName = "A",
ok = pubsub_pipes:new_pipe(PipeName),
?assertEqual([PipeName],pubsub_pipes:get_pipes())
end}.
[
fun create_pipe/0,
fun register_on_pipe/0
]}.

setup() ->
pubsub_pipes:start_link().
Expand All @@ -18,3 +17,16 @@ cleanup(_) ->
pubsub_pipes:stop().


create_pipe() ->
PipeName = "A",
ok = pubsub_pipes:new_pipe(PipeName),
?assertEqual([PipeName],pubsub_pipes:get_pipes()).

register_on_pipe() ->
PipeName = "A",
Self = self(),
ok = pubsub_pipes:new_pipe(PipeName),
ok = pubsub_pipes:subscribe_to_pipe(PipeName),
?assertEqual([Self],pubsub_pipes:get_subscribers_to_pipe(PipeName)).


0 comments on commit 68ed72d

Please sign in to comment.