Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added subscriber receival test

  • Loading branch information...
commit d1910e11abea6fe6ad8d57c2a1179e21538a6d31 1 parent 17633e7
Gianfranco Alongi authored
View
3  Three/Solution/src/message.hrl
@@ -1,3 +1,4 @@
--record(message,{body :: binary(),
+-record(message,{pipe ::string(),
+ body :: binary(),
byte_size :: non_neg_integer()
}).
View
20 Three/Solution/src/pubsub_pipes.erl
@@ -5,12 +5,14 @@
%%%-------------------------------------------------------------------
-module(pubsub_pipes).
-behaviour(gen_server).
+-include("message.hrl").
-export([new_pipe/1,
get_pipes/0,
subscribe_to_pipe/1,
get_subscribers_to_pipe/1,
- unsubscribe_from_pipe/1
+ unsubscribe_from_pipe/1,
+ publish_message_on_pipe/2
]).
-export([start_link/0,
stop/0
@@ -49,6 +51,11 @@ get_subscribers_to_pipe(PipeName) ->
-spec(unsubscribe_from_pipe(string()) -> ok).
unsubscribe_from_pipe(PipeName) ->
gen_server:call(?MODULE,{unsubscribe_from_pipe,PipeName,self()}).
+
+-spec(publish_message_on_pipe(string(),binary()) -> ok).
+publish_message_on_pipe(PipeName,Message) ->
+ gen_server:call(?MODULE,{publish_message,PipeName,Message}).
+
%%%===================================================================
init([]) ->
@@ -80,9 +87,20 @@ handle_call({unsubscribe_from_pipe,PipeName,Pid},_From,State) ->
[{PipeName,Subscribers}] = ets:lookup(State#state.pipes,PipeName),
Removed = [ X || X <- Subscribers, X =/= Pid],
ets:insert(State#state.pipes,[{PipeName,Removed}]),
+ {reply,ok,State};
+
+handle_call({publish_message,PipeName,BinaryMessage},_From,State) ->
+ [{PipeName,Subscribers}] = ets:lookup(State#state.pipes,PipeName),
+ Message = #message{pipe = PipeName,
+ body = BinaryMessage,
+ byte_size = erlang:byte_size(BinaryMessage)
+ },
+ lists:foreach(fun(Subscriber) -> Subscriber ! Message end,
+ Subscribers),
{reply,ok,State}.
+
handle_cast(_Msg, State) ->
{noreply, State}.
View
18 Three/Solution/test/pubsub_pipes_tests.erl
@@ -1,5 +1,6 @@
-module(pubsub_pipes_tests).
-include_lib("eunit/include/eunit.hrl").
+-include("../src/message.hrl").
pipe_test_() ->
{foreach,
@@ -8,7 +9,8 @@ pipe_test_() ->
[
fun create_pipe/0,
fun register_on_pipe/0,
- fun deregister_from_pipe/0
+ fun deregister_from_pipe/0,
+ fun publish_message_on_pipe/0
]}.
setup() ->
@@ -36,6 +38,20 @@ deregister_from_pipe() ->
ok = pubsub_pipes:subscribe_to_pipe(PipeName),
ok = pubsub_pipes:unsubscribe_from_pipe(PipeName),
?assertEqual([],pubsub_pipes:get_subscribers_to_pipe(PipeName)).
+
+publish_message_on_pipe() ->
+ PipeName = "A",
+ Message = <<1,2,3,4,5>>,
+ ok = pubsub_pipes:new_pipe(PipeName),
+ ok = pubsub_pipes:subscribe_to_pipe(PipeName),
+ ok = pubsub_pipes:publish_message_on_pipe(PipeName,Message),
+ Received = receive X -> X end,
+ ?assertMatch(
+ #message{pipe = PipeName,
+ body = Message,
+ byte_size = 5},
+ Received).
+

0 comments on commit d1910e1

Please sign in to comment.
Something went wrong with that request. Please try again.