diff --git a/Makefile b/Makefile index 4157a3a..9448a3d 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ COMPILE_FIRST = erwa_middleware.erl DEPS = cowboy ranch wamper dep_cowboy = git https://github.com/ninenines/cowboy.git master +dep_ranch = git https://github.com/ninenines/ranch.git master dep_wamper = git https://github.com/bwegh/wamper master diff --git a/README.md b/README.md index fdb562b..3f5f836 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,8 @@ Erwa has the following features: Router ====== -The router implementation in Erwa uses the great [ranch](https://github.com/extend/ranch) -and [cowboy](https://github.com/extend/cowboy) from [Loïc Hoguin (essen)](https://github.com/essen) +The router implementation in Erwa uses the great [ranch](https://github.com/ninenines/ranch) +and [cowboy](https://github.com/ninenines/cowboy) from [Loïc Hoguin (essen)](https://github.com/essen) to handle the incomming connections and the webserver part for the websockets. Erwa has two modules to work either as a protocol for ranch on incomming TCP connections, or as websocket handler with cowboy on incomming websocket connections. @@ -66,25 +66,20 @@ All you need to do to get a simple WAMP router up and running is to add a dispat ranch and/or cowboy: A WAMP router on websockets: -* using erwa_in_handler as the websocket handler, by dispatching a certain path to conditions -* starting cowboy on a certain port (here 8080) and add the dispatch rule ```Erlang -%% a rule to dispatch incomming connections to any host with the path /wamp to the erwa_in_handler -Dispatch = cowboy_router:compile([ {'_', [ {"/wamp", erwa_in_handler, []}, ]} ]), -%% fire up cowboy with the dispatch rule for the wamp connection -{ok, _} = cowboy:start_http(http, 100, [{port, 8080}],[{env, [{dispatch, Dispatch}]}]), +%% start erwa to handle any incomming connections to any host at the path /wamp +%% start it with 100 parallel acceptors on port 8080 +ok = erwa:start_websocket("/wamp", 8080, 100). ``` In the examples directory you can find the simple_router which includes just the above and starts a WAMP router, including a simple javascript client, using [wampy.js](https://github.com/KSDaemon/wampy.js). The other possibility is to start Erwa as a TCP router: -Erwa implements a protocol for ranch in the erwa_in_handler modules. -So starting and tcp router is done by starting ranch with -erwa_in_handler as the protocol: ```Erlang -%% start ranch with the wamp protocol by using erwa_in_handler on port 555 -{ok,_} = ranch:start_listener(erwa_tcp, 5, ranch_tcp, [{port,5555}], erwa_in_handler, []), +%% start erwa listening for raw tcp connections on port 5555 +%% starting it with 5 parallel acceptors +ok = erwa:start_socket(5555,5). ``` This is also included in the simple_router example in the examples directory. diff --git a/src/erwa.app.src b/src/erwa.app.src index 792a8a8..eaa7692 100644 --- a/src/erwa.app.src +++ b/src/erwa.app.src @@ -35,6 +35,7 @@ kernel, stdlib, wamper, + mnesia, crypto ]}, {mod, {erwa_app, []}}, diff --git a/src/erwa.erl b/src/erwa.erl index 8d54aad..6fa0f02 100644 --- a/src/erwa.erl +++ b/src/erwa.erl @@ -37,8 +37,10 @@ -export([get_routing_for_realm/1]). - -export([get_version/0]). +-export([start_websocket/3]). +-export([start_websocket/4]). +-export([start_socket/2]). %% @doc returns the version string for the application, used as agent description @@ -50,7 +52,31 @@ get_version() -> end, << <<"Erwa-">>/binary, Ver/binary >>. - +%% @doc start router listening on websocket +-spec start_websocket( Path :: string(), Port :: integer(), Acceptors :: + integer() ) -> ok. +start_websocket(Path, Port, Acceptors) -> + start_websocket(Path, Port, Acceptors, []). + +%% @doc start router listening on websocket with custom handlers added +-spec start_websocket( Path :: string(), Port :: integer(), Acceptors :: + non_neg_integer(), Handlers :: [term()] ) -> ok. +start_websocket(Path, Port, Acceptors, Handlers) -> + Dispatch = cowboy_router:compile([{'_',[ {Path, erwa_in_ws, + [] } | Handlers] }]), + {ok, _} = cowboy:start_http(erwa_http, Acceptors, [{port, Port}], [{env, + [{dispatch, + Dispatch}]}]), + ok. + + +%% @doc start the router listening for raw tcp connections +-spec start_socket(Port :: non_neg_integer(), Acceptors :: non_neg_integer()) -> + ok. +start_socket(Port, Acceptors) -> + {ok, _} = ranch:start_listener(erwa_tcp, Acceptors, ranch_tcp, [{port, Port}], + erwa_in_tcp, []), + ok. %% for router diff --git a/src/erwa_broker.erl b/src/erwa_broker.erl index c271a9e..6cb2924 100644 --- a/src/erwa_broker.erl +++ b/src/erwa_broker.erl @@ -19,6 +19,7 @@ %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %% SOFTWARE. %% +%% @doc the broker is a per realm gen_server that -module(erwa_broker). -behaviour(gen_server). @@ -61,171 +62,171 @@ -export([get_data/1]). -define(FEATURES,#{features => #{ - event_history => false, - partitioned_pubsub => false, - pattern_based_subscription => false, - publication_trustlevels => false, - publisher_exclusion => true, - publisher_identification => true, - subscriber_blackwhite_listing => true, - subscriber_list => false, - subscriber_metaevents => false - } + event_history => false, + partitioned_pubsub => false, + pattern_based_subscription => false, + publication_trustlevels => false, + publisher_exclusion => true, + publisher_identification => true, + subscriber_blackwhite_listing => true, + subscriber_list => false, + subscriber_metaevents => false + } - } - ). + } + ). -record(data, { - ets = none, - pid = unknown, - features = ?FEATURES - }). + ets = none, + pid = unknown, + features = ?FEATURES + }). -record(state, { - ets = none, - meta_events = enabled - }). + ets = none, + meta_events = enabled + }). -record(topic, { - uri = unknown, - id = none, - match = exact, - created = unknown, - subscribers = []}). + uri = unknown, + id = none, + match = exact, + created = unknown, + subscribers = []}). -record(id_topic, { - id = none, - topic = unknown - }). + id = none, + topic = unknown + }). -record(id_info, { - id = unknown, - topics = [] - }). + id = unknown, + topics = [] + }). start() -> - gen_server:start(?MODULE, [], []). + gen_server:start(?MODULE, [], []). start_link() -> - gen_server:start_link(?MODULE, [], []). + gen_server:start_link(?MODULE, [], []). -spec get_data( pid() ) -> {ok, record(data)}. get_data(Pid) -> - gen_server:call(Pid, get_data). + gen_server:call(Pid, get_data). -spec enable_metaevents(record(data) ) -> ok. enable_metaevents( #data{pid=Pid} ) -> - gen_server:call(Pid,enable_metaevents). + gen_server:call(Pid,enable_metaevents). -spec disable_metaevents( record(data) ) -> ok. disable_metaevents( #data{pid=Pid} ) -> - gen_server:call(Pid,disable_metaevents). + gen_server:call(Pid,disable_metaevents). -spec get_subscriptions( record(data) ) -> map(). get_subscriptions(#data{pid=Pid}) -> - gen_server:call(Pid,get_subscriptions). + gen_server:call(Pid,get_subscriptions). -spec get_subscription( record(data), non_neg_integer() ) -> map(). get_subscription(#data{pid=Pid},SubscriptionId) -> - gen_server:call(Pid,{get_subscription,SubscriptionId}). + gen_server:call(Pid,{get_subscription,SubscriptionId}). -spec subscribe(Topic::binary(),Options::map(), SessionId :: non_neg_integer(), Data::record(data)) -> {ok, non_neg_integer()}. subscribe(Topic,Options,SessionId,#data{pid=Pid}) -> - gen_server:call(Pid, {subscribe,Topic,Options,SessionId} ). + gen_server:call(Pid, {subscribe,Topic,Options,SessionId} ). -spec unsubscribe(SubscriptionId::non_neg_integer(), SessionId::non_neg_integer(), Data::record(data)) -> ok | {error, Reason::term()}. unsubscribe(SubscriptionId,SessionId,#data{pid=Pid}) -> - gen_server:call(Pid, {unsubscribe,SubscriptionId,SessionId} ). + gen_server:call(Pid, {unsubscribe,SubscriptionId,SessionId} ). -spec unsubscribe_all(SessionId::non_neg_integer(), Data::record(data)) -> ok. unsubscribe_all(SessionId, #data{pid=Pid}) -> - gen_server:call(Pid,{unsubscribe_all,SessionId}). + gen_server:call(Pid,{unsubscribe_all,SessionId}). -spec publish(Topic::binary(),Options::map(),Arguments :: list(), ArgumentsKw :: map(), Session :: term(), Data::record(data)) -> - {ok, non_neg_integer()}. + {ok, non_neg_integer()}. publish(TopicUri,Options,Arguments,ArgumentsKw,SessionId,#data{ets=Ets}) -> - case ets:lookup(Ets,TopicUri) of - [#topic{subscribers=Subs,id=SubscriptionId}] -> - {ok,PublicationID} = erwa_publications:get_pub_id(), - Receipients = case maps:get(exclude_me,Options,true) of - false -> - Subs; - _ -> - lists:delete(SessionId,Subs) - end, - Details = case maps:get(disclose_me,Options,false) of - true -> #{publisher => SessionId}; - _ -> #{} - end, - - ToExclude = maps:get(exclude,Options,[]), - ToEligible = maps:get(eligible,Options,Receipients), - SendFilter = fun(SessId) -> - case (not lists:member(SessId,ToExclude)) and lists:member(SessId,ToEligible) of - true -> - Msg = {event,SubscriptionId,PublicationID,Details,Arguments,ArgumentsKw}, - erwa_sessions:send_message_to(Msg,SessId), - true; - false -> - false - end - end, - lists:filter(SendFilter,Receipients), - {ok,PublicationID}; - [] -> - {ok,gen_id()} - end. + case ets:lookup(Ets,TopicUri) of + [#topic{subscribers=Subs,id=SubscriptionId}] -> + {ok,PublicationID} = erwa_publications:get_pub_id(), + Receipients = case maps:get(exclude_me,Options,true) of + false -> + Subs; + _ -> + lists:delete(SessionId,Subs) + end, + Details = case maps:get(disclose_me,Options,false) of + true -> #{publisher => SessionId}; + _ -> #{} + end, + + ToExclude = maps:get(exclude,Options,[]), + ToEligible = maps:get(eligible,Options,Receipients), + SendFilter = fun(SessId) -> + case (not lists:member(SessId,ToExclude)) and lists:member(SessId,ToEligible) of + true -> + Msg = {event,SubscriptionId,PublicationID,Details,Arguments,ArgumentsKw}, + erwa_sessions:send_message_to(Msg,SessId), + true; + false -> + false + end + end, + lists:filter(SendFilter,Receipients), + {ok,PublicationID}; + [] -> + {ok,gen_id()} + end. -spec get_features(record(data)) -> term(). get_features(#data{features = F}) -> - F. + F. stop(#data{pid=Pid}) -> - stop(Pid); + stop(Pid); stop(Pid) -> - gen_server:call(Pid, stop). + gen_server:call(Pid, stop). - %% gen_server. +%% gen_server. init([]) -> - Ets = ets:new(events,[set,{keypos,2}]), + Ets = ets:new(events,[set,{keypos,2}]), {ok, #state{ets=Ets}}. handle_call({subscribe,TopicUri,Options,SessionId}, _From, State) -> - Result = int_subscribe(TopicUri,Options,SessionId,State), + Result = int_subscribe(TopicUri,Options,SessionId,State), {reply,Result,State}; handle_call({unsubscribe,SubscriptionId,SessionId}, _From, State) -> - Result = int_unsubscribe(SubscriptionId,SessionId,State), + Result = int_unsubscribe(SubscriptionId,SessionId,State), {reply,Result,State}; handle_call({unsubscribe_all, SessionId}, _From, #state{ets=_Ets} = State) -> - Result = unsubscribe_all_for(SessionId,State), + Result = unsubscribe_all_for(SessionId,State), {reply,Result,State}; handle_call(get_data, _From, #state{ets=Ets} = State) -> {reply,{ok,#data{ets=Ets, pid=self()}},State}; handle_call(get_subscriptions, _From, #state{ets=Ets} = State) -> - Exact = lists:flatten(ets:match(Ets,#topic{match = exact, id='$1', _='_'})), - Prefix = lists:flatten(ets:match(Ets,#topic{match = prefix, id='$1', _='_'})), - Wildcard = lists:flatten(ets:match(Ets,#topic{match = wildcard, id='$1', _='_'})), - {reply,{ok,#{exact => Exact, prefix => Prefix, wildcard => Wildcard}},State}; + Exact = lists:flatten(ets:match(Ets,#topic{match = exact, id='$1', _='_'})), + Prefix = lists:flatten(ets:match(Ets,#topic{match = prefix, id='$1', _='_'})), + Wildcard = lists:flatten(ets:match(Ets,#topic{match = wildcard, id='$1', _='_'})), + {reply,{ok,#{exact => Exact, prefix => Prefix, wildcard => Wildcard}},State}; handle_call({get_subscription,SubscriptionId}, _From, #state{ets=Ets} = State) -> - case ets:lookup(Ets,SubscriptionId) of - [#id_topic{id=SubscriptionId,topic=Uri}] -> - [#topic{uri=Uri,match=Match,created=Created,id=Id}] = ets:lookup(Ets,Uri), - {reply,{ok,#{uri => Uri, id => Id, match => Match, created => Created}},State}; - [] -> - {reply,{error,not_found},State} - end; + case ets:lookup(Ets,SubscriptionId) of + [#id_topic{id=SubscriptionId,topic=Uri}] -> + [#topic{uri=Uri,match=Match,created=Created,id=Id}] = ets:lookup(Ets,Uri), + {reply,{ok,#{uri => Uri, id => Id, match => Match, created => Created}},State}; + [] -> + {reply,{error,not_found},State} + end; handle_call(enable_metaevents, _From, State) -> - {reply,ok,State#state{meta_events=enabled}}; + {reply,ok,State#state{meta_events=enabled}}; handle_call(disable_metaevents, _From, State) -> - {reply,ok,State#state{meta_events=disabled}}; + {reply,ok,State#state{meta_events=disabled}}; handle_call(stop, _From, State) -> {stop,normal,{ok,stopped},State}; handle_call(_Request, _sFrom, State) -> @@ -247,400 +248,400 @@ code_change(_OldVsn, State, _Extra) -> -spec int_subscribe(TopicUri :: binary(), Options :: map(), SessionId::non_neg_integer(), State :: record(state)) -> - {ok, ID::non_neg_integer()} | {error, Reason :: term()}. + {ok, ID::non_neg_integer()} | {error, Reason :: term()}. int_subscribe(TopicUri,Options,SessionId,#state{ets=Ets}=State) -> - Match = maps:get(match,Options,exact), - case Match of - prefix -> - {error,not_supported}; - wildcard -> - {error,not_supported}; - exact -> - {SubscriptionId,Created,TopicDetails} = case ets:lookup(Ets,TopicUri) of - [#topic{id=SID,subscribers=Subs}=T] -> - NewSubs = [SessionId|lists:delete(SessionId,Subs)], - ets:insert(Ets,T#topic{subscribers=NewSubs}), - {SID,false,not_needed}; - [] -> - {ok,SID,TDetails} = create_topic(TopicUri,Match,[SessionId],State), - {SID,true,TDetails} - end, - ok = add_topic_to_session(TopicUri,SessionId,State), - case Created of - true -> - publish_metaevent(on_create,TopicUri,SessionId,TopicDetails,State); - false -> nothing - end, - publish_metaevent(on_subscribe,TopicUri,SessionId,SubscriptionId,State), - {ok,SubscriptionId} - end. + Match = maps:get(match,Options,exact), + case Match of + prefix -> + {error,not_supported}; + wildcard -> + {error,not_supported}; + exact -> + {SubscriptionId,Created,TopicDetails} = case ets:lookup(Ets,TopicUri) of + [#topic{id=SID,subscribers=Subs}=T] -> + NewSubs = [SessionId|lists:delete(SessionId,Subs)], + ets:insert(Ets,T#topic{subscribers=NewSubs}), + {SID,false,not_needed}; + [] -> + {ok,SID,TDetails} = create_topic(TopicUri,Match,[SessionId],State), + {SID,true,TDetails} + end, + ok = add_topic_to_session(TopicUri,SessionId,State), + case Created of + true -> + publish_metaevent(on_create,TopicUri,SessionId,TopicDetails,State); + false -> nothing + end, + publish_metaevent(on_subscribe,TopicUri,SessionId,SubscriptionId,State), + {ok,SubscriptionId} + end. -spec int_unsubscribe(IdOrTopic :: non_neg_integer() | binary(), SessionId :: non_neg_integer(), State :: record(state)) -> - ok | {error, Reason :: term()}. + ok | {error, Reason :: term()}. int_unsubscribe(SubscriptionId,SessionId,#state{ets=Ets}=State) when is_integer(SubscriptionId) -> - case ets:lookup(Ets,SubscriptionId) of - [#id_topic{id=SubscriptionId,topic=TopicUri}] -> - int_unsubscribe(TopicUri,SessionId,State); - [] -> - {error,not_found} - end; + case ets:lookup(Ets,SubscriptionId) of + [#id_topic{id=SubscriptionId,topic=TopicUri}] -> + int_unsubscribe(TopicUri,SessionId,State); + [] -> + {error,not_found} + end; int_unsubscribe(TopicUri,SessionId,#state{ets=Ets}=State) when is_binary(TopicUri) -> - [#topic{subscribers=Subs,id=SubscriptionId,uri=TopicUri}=T] = ets:lookup(Ets,TopicUri), - case {lists:member(SessionId,Subs),lists:delete(SessionId,Subs)} of - {false,_} -> - {error, not_subscribed}; - {true,[]} -> - ok = remove_topic_from_session(TopicUri,SessionId,State), - true = ets:delete(Ets,SubscriptionId), - true = ets:delete(Ets,TopicUri), - publish_metaevent(on_unsubscribe,TopicUri,SessionId,SubscriptionId,State), - publish_metaevent(on_delete,TopicUri,SessionId,SubscriptionId,State), - ok; - {true,NewSubs} -> - ok = remove_topic_from_session(TopicUri,SessionId,State), - true = ets:insert(Ets,T#topic{subscribers=NewSubs}), - publish_metaevent(on_unsubscribe,TopicUri,SessionId,SubscriptionId,State), - ok - end. + [#topic{subscribers=Subs,id=SubscriptionId,uri=TopicUri}=T] = ets:lookup(Ets,TopicUri), + case {lists:member(SessionId,Subs),lists:delete(SessionId,Subs)} of + {false,_} -> + {error, not_subscribed}; + {true,[]} -> + ok = remove_topic_from_session(TopicUri,SessionId,State), + true = ets:delete(Ets,SubscriptionId), + true = ets:delete(Ets,TopicUri), + publish_metaevent(on_unsubscribe,TopicUri,SessionId,SubscriptionId,State), + publish_metaevent(on_delete,TopicUri,SessionId,SubscriptionId,State), + ok; + {true,NewSubs} -> + ok = remove_topic_from_session(TopicUri,SessionId,State), + true = ets:insert(Ets,T#topic{subscribers=NewSubs}), + publish_metaevent(on_unsubscribe,TopicUri,SessionId,SubscriptionId,State), + ok + end. -spec unsubscribe_all_for( SessionId :: non_neg_integer(), State :: record(state)) -> - ok | {error, Reason :: term()}. + ok | {error, Reason :: term()}. unsubscribe_all_for(SessionId,#state{ets=Ets}=State) -> - case ets:lookup(Ets,{sess,SessionId}) of - [#id_info{topics=Topics}] -> - F = fun(TopicUri) -> - ok = int_unsubscribe(TopicUri,SessionId,State), - false - end, - lists:filter(F,Topics), - ok; - [] -> - ok - end. + case ets:lookup(Ets,{sess,SessionId}) of + [#id_info{topics=Topics}] -> + F = fun(TopicUri) -> + ok = int_unsubscribe(TopicUri,SessionId,State), + false + end, + lists:filter(F,Topics), + ok; + [] -> + ok + end. create_topic(Uri,Match,Sessions,#state{ets=Ets}=State) -> - ID = gen_id(), - Created = erlang:universaltime(), - Topic = #topic{uri=Uri,id=ID,match=Match,created=Created,subscribers=Sessions}, - case ets:insert_new(Ets,[#id_topic{id=ID,topic=Uri},Topic]) of - true -> - {ok,ID,#{id => ID, - created => cowboy_clock:rfc1123(Created), - uri => Uri, - match => Match}}; - false -> - create_topic(Uri,Match,Sessions,State) - end. + ID = gen_id(), + Created = erlang:universaltime(), + Topic = #topic{uri=Uri,id=ID,match=Match,created=Created,subscribers=Sessions}, + case ets:insert_new(Ets,[#id_topic{id=ID,topic=Uri},Topic]) of + true -> + {ok,ID,#{id => ID, + created => cowboy_clock:rfc1123(Created), + uri => Uri, + match => Match}}; + false -> + create_topic(Uri,Match,Sessions,State) + end. add_topic_to_session(Topic,SessionId,#state{ets=Ets}) -> - case ets:lookup(Ets,{sess,SessionId}) of - [#id_info{topics=Topics} = IdInf] -> - true = ets:insert(Ets,IdInf#id_info{topics=[Topic|lists:delete(Topic,Topics)]}), - ok; - [] -> - IdInf = #id_info{id={sess,SessionId},topics=[Topic]}, - true = ets:insert_new(Ets,IdInf), - ok - end. + case ets:lookup(Ets,{sess,SessionId}) of + [#id_info{topics=Topics} = IdInf] -> + true = ets:insert(Ets,IdInf#id_info{topics=[Topic|lists:delete(Topic,Topics)]}), + ok; + [] -> + IdInf = #id_info{id={sess,SessionId},topics=[Topic]}, + true = ets:insert_new(Ets,IdInf), + ok + end. remove_topic_from_session(Topic,SessionId,#state{ets=Ets}) -> - [#id_info{topics=Topics} = IdInf] = ets:lookup(Ets,{sess,SessionId}), - case lists:delete(Topic,Topics) of - [] -> - true = ets:delete(Ets,{sess,SessionId}); - NewTopics -> - true = ets:insert(Ets,IdInf#id_info{topics=NewTopics}) - end, - ok. + [#id_info{topics=Topics} = IdInf] = ets:lookup(Ets,{sess,SessionId}), + case lists:delete(Topic,Topics) of + [] -> + true = ets:delete(Ets,{sess,SessionId}); + NewTopics -> + true = ets:insert(Ets,IdInf#id_info{topics=NewTopics}) + end, + ok. gen_id() -> - crypto:rand_uniform(0,9007199254740992). + crypto:rand_uniform(0,9007199254740992). publish_metaevent(_,_,_,_,#state{meta_events=disabled}) -> - ok; + ok; publish_metaevent(Event,TopicUri,SessionId,SecondArg,#state{ets=Ets}) -> - case binary:part(TopicUri,{0,5}) == <<"wamp.">> of - true -> - % do not fire metaevents on wamp. uris - ok; - false -> - MetaTopic = case Event of - on_create -> <<"wamp.subscription.on_create">>; - on_subscribe -> <<"wamp.subscription.on_subscribe">>; - on_unsubscribe -> <<"wamp.subscription.on_unsubscribe">>; - on_delete -> <<"wamp.subscription.on_delete">> - end, - {ok,_} = publish(MetaTopic,#{},[SessionId,SecondArg],undefined,no_session,#data{ets=Ets}) - end, - ok. + case binary:part(TopicUri,{0,5}) == <<"wamp.">> of + true -> + % do not fire metaevents on wamp. uris + ok; + false -> + MetaTopic = case Event of + on_create -> <<"wamp.subscription.on_create">>; + on_subscribe -> <<"wamp.subscription.on_subscribe">>; + on_unsubscribe -> <<"wamp.subscription.on_unsubscribe">>; + on_delete -> <<"wamp.subscription.on_delete">> + end, + {ok,_} = publish(MetaTopic,#{},[SessionId,SecondArg],undefined,no_session,#data{ets=Ets}) + end, + ok. -ifdef(TEST). get_tablesize(#data{ets=Ets}) -> - ets:info(Ets,size). + ets:info(Ets,size). start_stop_test() -> - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - 0 = get_tablesize(Data), - ok = enable_metaevents(Data), - {ok,stopped} = stop(Data). + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + 0 = get_tablesize(Data), + ok = enable_metaevents(Data), + {ok,stopped} = stop(Data). un_subscribe_test() -> - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - ok = disable_metaevents(Data), - SessionId = gen_id(), - 0 = get_tablesize(Data), - {ok,ID1} = subscribe(<<"topic.test1">>,#{},SessionId,Data), - 3 = get_tablesize(Data), - {ok,ID2} = subscribe(<<"topic.test2">>,#{},SessionId,Data), - 5 = get_tablesize(Data), - ok = unsubscribe(ID1,SessionId,Data), - 3 = get_tablesize(Data), - {error,not_found} = unsubscribe(ID1,SessionId,Data), - ok = unsubscribe(ID2,SessionId,Data), - 0 = get_tablesize(Data), - {error,not_found} = unsubscribe(ID2,SessionId,Data), - 0 = get_tablesize(Data), - {ok,stopped} = stop(Data). + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + ok = disable_metaevents(Data), + SessionId = gen_id(), + 0 = get_tablesize(Data), + {ok,ID1} = subscribe(<<"topic.test1">>,#{},SessionId,Data), + 3 = get_tablesize(Data), + {ok,ID2} = subscribe(<<"topic.test2">>,#{},SessionId,Data), + 5 = get_tablesize(Data), + ok = unsubscribe(ID1,SessionId,Data), + 3 = get_tablesize(Data), + {error,not_found} = unsubscribe(ID1,SessionId,Data), + ok = unsubscribe(ID2,SessionId,Data), + 0 = get_tablesize(Data), + {error,not_found} = unsubscribe(ID2,SessionId,Data), + 0 = get_tablesize(Data), + {ok,stopped} = stop(Data). unsubscribe_all_test() -> - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - ok = disable_metaevents(Data), - SessionId = gen_id(), - 0 = get_tablesize(Data), - ok = unsubscribe_all(SessionId,Data), - 0 = get_tablesize(Data), - {ok,ID1} = subscribe(<<"topic.test1">>,#{},SessionId,Data), - 3 = get_tablesize(Data), - {ok,ID2} = subscribe(<<"topic.test2">>,#{},SessionId,Data), - 5 = get_tablesize(Data), - ok = unsubscribe_all(SessionId,Data), - 0 = get_tablesize(Data), - {error,not_found} = unsubscribe(ID1,SessionId,Data), - 0 = get_tablesize(Data), - {error,not_found} = unsubscribe(ID2,SessionId,Data), - 0 = get_tablesize(Data), - ok = unsubscribe_all(SessionId,Data), - 0 = get_tablesize(Data), - {ok,stopped} = stop(Data). + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + ok = disable_metaevents(Data), + SessionId = gen_id(), + 0 = get_tablesize(Data), + ok = unsubscribe_all(SessionId,Data), + 0 = get_tablesize(Data), + {ok,ID1} = subscribe(<<"topic.test1">>,#{},SessionId,Data), + 3 = get_tablesize(Data), + {ok,ID2} = subscribe(<<"topic.test2">>,#{},SessionId,Data), + 5 = get_tablesize(Data), + ok = unsubscribe_all(SessionId,Data), + 0 = get_tablesize(Data), + {error,not_found} = unsubscribe(ID1,SessionId,Data), + 0 = get_tablesize(Data), + {error,not_found} = unsubscribe(ID2,SessionId,Data), + 0 = get_tablesize(Data), + ok = unsubscribe_all(SessionId,Data), + 0 = get_tablesize(Data), + {ok,stopped} = stop(Data). multiple_un_subscribe_test() -> - erwa_sessions:start_link(), - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - ok = disable_metaevents(Data), - {ok,SessionId} = erwa_sessions:register_session(<<"erwa.test">>), - 0 = get_tablesize(Data), - {ok,ID1} = subscribe(<<"topic.test1">>,#{},SessionId,Data), - 3 = get_tablesize(Data), - {ok,ID2} = subscribe(<<"topic.test2">>,#{},SessionId,Data), - 5 = get_tablesize(Data), - MyPid = self(), - F = fun() -> - {ok,S2} = erwa_sessions:register_session(<<"erwa.test">>), - {ok,ID3} = erwa_broker:subscribe(<<"topic.test1">>,#{},S2,Data), - MyPid ! {first_subscription,ID3}, - receive - after 200 -> ok - end, - {ok,ID4} = erwa_broker:subscribe(<<"topic.test2">>,#{},S2,Data), - MyPid ! {second_subscription,ID4}, - receive - after 200 -> ok - end, - ok = erwa_broker:unsubscribe_all(S2,Data), - MyPid ! done, - ok - end, - spawn(F), - receive - {first_subscription,ID1} -> - ok - end, - 6 = get_tablesize(Data), - receive - {second_subscription,ID2} -> - ok - end, - 6 = get_tablesize(Data), - receive - done -> - ok - end, - 5 = get_tablesize(Data), - ok = unsubscribe(ID1,SessionId,Data), - 3 = get_tablesize(Data), - ok = unsubscribe_all(SessionId,Data), - 0 = get_tablesize(Data), - erwa_sessions:stop(), - {ok,stopped} = stop(Data). + erwa_sessions:start_link(), + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + ok = disable_metaevents(Data), + {ok,SessionId} = erwa_sessions:register_session(<<"erwa.test">>), + 0 = get_tablesize(Data), + {ok,ID1} = subscribe(<<"topic.test1">>,#{},SessionId,Data), + 3 = get_tablesize(Data), + {ok,ID2} = subscribe(<<"topic.test2">>,#{},SessionId,Data), + 5 = get_tablesize(Data), + MyPid = self(), + F = fun() -> + {ok,S2} = erwa_sessions:register_session(<<"erwa.test">>), + {ok,ID3} = erwa_broker:subscribe(<<"topic.test1">>,#{},S2,Data), + MyPid ! {first_subscription,ID3}, + receive + after 200 -> ok + end, + {ok,ID4} = erwa_broker:subscribe(<<"topic.test2">>,#{},S2,Data), + MyPid ! {second_subscription,ID4}, + receive + after 200 -> ok + end, + ok = erwa_broker:unsubscribe_all(S2,Data), + MyPid ! done, + ok + end, + spawn(F), + receive + {first_subscription,ID1} -> + ok + end, + 6 = get_tablesize(Data), + receive + {second_subscription,ID2} -> + ok + end, + 6 = get_tablesize(Data), + receive + done -> + ok + end, + 5 = get_tablesize(Data), + ok = unsubscribe(ID1,SessionId,Data), + 3 = get_tablesize(Data), + ok = unsubscribe_all(SessionId,Data), + 0 = get_tablesize(Data), + erwa_sessions:stop(), + {ok,stopped} = stop(Data). publish_test() -> - erwa_sessions:start_link(), - {ok,_} = erwa_publications:start(), - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - ok = disable_metaevents(Data), - {ok,SessionId} = erwa_sessions:register_session(<<"erwa.test">>), - {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId,Data), - MyPid = self(), - F = fun() -> - {ok,S2} = erwa_sessions:register_session(<<"erwa.test">>), - {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},S2,Data), - MyPid ! subscribed, - receive - {erwa,{event,ID,PubId,#{},undefined,undefined}} -> - MyPid ! {received,PubId} - end, - ok = erwa_broker:unsubscribe_all(S2,Data), - ok - end, - spawn(F), - receive - subscribed -> ok - end, - {ok,PublicationID1} = publish(<<"topic.test1">>,#{},undefined,undefined,SessionId,Data), - receive - {received,PublicationID1} -> ok - end, - {ok,PublicationID2} = publish(<<"topic.test1">>,#{exclude_me=>false},undefined,undefined,SessionId,Data), - ok = receive - {erwa,{event,ID,PublicationID2,#{},undefined,undefined}} -> - ok - end, - erwa_sessions:stop(), - {ok,stopped} = stop(Data), - {ok,stopped} = erwa_publications:stop(). + erwa_sessions:start_link(), + {ok,_} = erwa_publications:start(), + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + ok = disable_metaevents(Data), + {ok,SessionId} = erwa_sessions:register_session(<<"erwa.test">>), + {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId,Data), + MyPid = self(), + F = fun() -> + {ok,S2} = erwa_sessions:register_session(<<"erwa.test">>), + {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},S2,Data), + MyPid ! subscribed, + receive + {erwa,{event,ID,PubId,#{},undefined,undefined}} -> + MyPid ! {received,PubId} + end, + ok = erwa_broker:unsubscribe_all(S2,Data), + ok + end, + spawn(F), + receive + subscribed -> ok + end, + {ok,PublicationID1} = publish(<<"topic.test1">>,#{},undefined,undefined,SessionId,Data), + receive + {received,PublicationID1} -> ok + end, + {ok,PublicationID2} = publish(<<"topic.test1">>,#{exclude_me=>false},undefined,undefined,SessionId,Data), + ok = receive + {erwa,{event,ID,PublicationID2,#{},undefined,undefined}} -> + ok + end, + erwa_sessions:stop(), + {ok,stopped} = stop(Data), + {ok,stopped} = erwa_publications:stop(). exclude_test() -> - erwa_sessions:start_link(), - {ok,_} = erwa_publications:start(), - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - ok = disable_metaevents(Data), - {ok,SessionId1} = erwa_sessions:register_session(<<"erwa.test">>), - {ok,SessionId2} = erwa_sessions:register_session(<<"erwa.test">>), - {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId1,Data), - MyPid = self(), - F = fun() -> - {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId2,Data), - MyPid ! subscribed, - Received = receive - {erwa,{event,ID,_,#{},undefined,undefined}} -> - true; - got_something -> - MyPid ! nothing, - false - end, - case Received of - true -> - receive - {got_something} -> - MyPid ! yes_got_it - end; - false -> - ok - end, - ok = erwa_broker:unsubscribe_all(SessionId2,Data), - MyPid ! done, - ok - end, - ClientPid = spawn(F), - receive - subscribed -> ok - end, - {ok,PubID} = publish(<<"topic.test1">>,#{exclude_me => false,exclude => [SessionId2]},undefined,undefined,SessionId1,Data), - ok = receive - {erwa,{event,ID,PubID,#{},undefined,undefined}} -> - ok - end, - receive - after 100 -> ok - end, - ClientPid ! got_something, - ok = receive - nothing -> ok; - yes_got_it -> wrong - end, - erwa_sessions:stop(), - {ok,stopped} = stop(Data), - {ok,stopped} = erwa_publications:stop(). + erwa_sessions:start_link(), + {ok,_} = erwa_publications:start(), + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + ok = disable_metaevents(Data), + {ok,SessionId1} = erwa_sessions:register_session(<<"erwa.test">>), + {ok,SessionId2} = erwa_sessions:register_session(<<"erwa.test">>), + {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId1,Data), + MyPid = self(), + F = fun() -> + {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId2,Data), + MyPid ! subscribed, + Received = receive + {erwa,{event,ID,_,#{},undefined,undefined}} -> + true; + got_something -> + MyPid ! nothing, + false + end, + case Received of + true -> + receive + {got_something} -> + MyPid ! yes_got_it + end; + false -> + ok + end, + ok = erwa_broker:unsubscribe_all(SessionId2,Data), + MyPid ! done, + ok + end, + ClientPid = spawn(F), + receive + subscribed -> ok + end, + {ok,PubID} = publish(<<"topic.test1">>,#{exclude_me => false,exclude => [SessionId2]},undefined,undefined,SessionId1,Data), + ok = receive + {erwa,{event,ID,PubID,#{},undefined,undefined}} -> + ok + end, + receive + after 100 -> ok + end, + ClientPid ! got_something, + ok = receive + nothing -> ok; + yes_got_it -> wrong + end, + erwa_sessions:stop(), + {ok,stopped} = stop(Data), + {ok,stopped} = erwa_publications:stop(). eligible_test() -> - erwa_sessions:start_link(), - {ok,_} = erwa_publications:start(), - {ok,Pid} = start(), - {ok,Data} = get_data(Pid), - ok = disable_metaevents(Data), - {ok, SessionId1} = erwa_sessions:register_session(<<"erwa.test">>), - {ok, SessionId2} = erwa_sessions:register_session(<<"erwa.test">>), - - {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId1,Data), - MyPid = self(), - F = fun() -> - {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId2,Data), - MyPid ! subscribed, - Received = receive - {erwa,{event,ID,_,[],undefined,undefined}} -> - true; - got_something -> - MyPid ! nothing, - false - end, - case Received of - true -> - receive - {got_something} -> - MyPid ! yes_got_it - end; - false -> - ok - end, - ok = erwa_broker:unsubscribe_all(SessionId2,Data), - MyPid ! done, - ok - end, - ClientPid = spawn(F), - receive - subscribed -> ok - end, - {ok,PubID} = publish(<<"topic.test1">>,#{exclude_me=>false,eligible=>[SessionId1]},undefined,undefined,SessionId1,Data), - ok = receive - {erwa,{event,ID,PubID,#{},undefined,undefined}} -> - ok - end, - receive - after 100 -> ok - end, - ClientPid ! got_something, - ok = receive - nothing -> ok; - yes_got_it -> wrong - end, - erwa_sessions:stop(), - {ok,stopped} = stop(Data), - {ok,stopped} = erwa_publications:stop(). + erwa_sessions:start_link(), + {ok,_} = erwa_publications:start(), + {ok,Pid} = start(), + {ok,Data} = get_data(Pid), + ok = disable_metaevents(Data), + {ok, SessionId1} = erwa_sessions:register_session(<<"erwa.test">>), + {ok, SessionId2} = erwa_sessions:register_session(<<"erwa.test">>), + + {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId1,Data), + MyPid = self(), + F = fun() -> + {ok,ID} = erwa_broker:subscribe(<<"topic.test1">>,#{},SessionId2,Data), + MyPid ! subscribed, + Received = receive + {erwa,{event,ID,_,[],undefined,undefined}} -> + true; + got_something -> + MyPid ! nothing, + false + end, + case Received of + true -> + receive + {got_something} -> + MyPid ! yes_got_it + end; + false -> + ok + end, + ok = erwa_broker:unsubscribe_all(SessionId2,Data), + MyPid ! done, + ok + end, + ClientPid = spawn(F), + receive + subscribed -> ok + end, + {ok,PubID} = publish(<<"topic.test1">>,#{exclude_me=>false,eligible=>[SessionId1]},undefined,undefined,SessionId1,Data), + ok = receive + {erwa,{event,ID,PubID,#{},undefined,undefined}} -> + ok + end, + receive + after 100 -> ok + end, + ClientPid ! got_something, + ok = receive + nothing -> ok; + yes_got_it -> wrong + end, + erwa_sessions:stop(), + {ok,stopped} = stop(Data), + {ok,stopped} = erwa_publications:stop(). garbage_test() -> - {ok,Pid} = start(), - ignored = gen_server:call(Pid,some_garbage), - ok = gen_server:cast(Pid,some_garbage), - Pid ! some_garbage, - {ok,stopped} = stop(Pid). + {ok,Pid} = start(), + ignored = gen_server:call(Pid,some_garbage), + ok = gen_server:cast(Pid,some_garbage), + Pid ! some_garbage, + {ok,stopped} = stop(Pid). -endif. diff --git a/src/erwa_realms.erl b/src/erwa_realms.erl index d011e83..06fcb57 100644 --- a/src/erwa_realms.erl +++ b/src/erwa_realms.erl @@ -61,7 +61,7 @@ -spec add(Name :: binary() ) -> ok | {error, Reason :: term() }. add(Name) -> - MW_List = get_env(erwa,router_middleware,[erwa_mw_default]), + MW_List = application:get_env(erwa,router_middleware,[erwa_mw_default]), add(Name,MW_List). -spec add(Name :: binary(), Middlewares :: [atom()] ) -> ok | {error, Reason :: term() }. @@ -107,7 +107,7 @@ stop() -> %% gen_server. init([]) -> - AutoCreate = get_env(erwa,realm_autocreate,false), + AutoCreate = application:get_env(erwa,realm_autocreate,false), Ets = ets:new(realms,[set]), {ok, #state{ets=Ets, autocreate_realm=AutoCreate}}. @@ -194,7 +194,7 @@ get_realm_data(Tag,Name,#state{ets=Ets, autocreate_realm=AC}=State) -> [] -> case AC of true -> - MW_List = get_env(erwa,router_middleware,[erwa_mw_default]), + MW_List = application:get_env(erwa,router_middleware,[erwa_mw_default]), ok = create_new_realm(Name,MW_List,State), get_realm_data(Tag,Name,State); false -> @@ -235,14 +235,6 @@ stop_realm({monitor,Ref},clean_up,#state{ets=Ets}) -> -get_env(Application, Pararmeter, Default) when is_atom(Application), is_atom(Pararmeter) -> - case application:get_env(Application,Pararmeter) of - undefined -> Default; - {ok, Value} -> Value - end. - - - -ifdef(TEST). get_tablesize() -> @@ -271,9 +263,9 @@ start_stop_test() -> environment_test() -> application:set_env(erwa,router_middleware,[erwa_mw_allow]), - [erwa_mw_allow] = get_env(erwa,router_middleware,[erwa_mw_default]), + [erwa_mw_allow] = application:get_env(erwa,router_middleware,[erwa_mw_default]), application:unset_env(erwa,router_middleware), - [erwa_mw_default] = get_env(erwa,router_middleware,[erwa_mw_default]). + [erwa_mw_default] = application:get_env(erwa,router_middleware,[erwa_mw_default]). garbage_test() -> {ok,Pid} = start(), diff --git a/src/erwa_session.erl b/src/erwa_session.erl index 81b4249..3d8c0d4 100644 --- a/src/erwa_session.erl +++ b/src/erwa_session.erl @@ -199,29 +199,26 @@ check_out_message(Result, Msg,State) -> hndl_msg({hello,RealmName,Details}, #state{trans=Transport} = State) -> AuthId = maps:get(authid, Details, anonymous), Roles = maps:get(roles, Details, []), - case ( AuthId == anonymous ) or ( AuthId == <<"anonymous">> ) of - true -> - case erwa_user_db:allow_anonymous(RealmName,Transport) of - true -> - case erwa_realms:get_routing(RealmName) of - {ok,RoutingPid} -> - % the realm does exist - State1 = create_session(RoutingPid,RealmName,Roles,State), - #state{id=SessionId, broker=Broker, dealer=Dealer}=State1, - BrokerFeat = erwa_broker:get_features(Broker), - DealerFeat = erwa_dealer:get_features(Dealer), - SessionData = #{authid => anonymous, role => anonymous, session => - SessionId}, - WelcomeMsg ={welcome,SessionId,#{agent => erwa:get_version(), roles => #{broker => BrokerFeat, dealer => DealerFeat}}}, - {reply,WelcomeMsg,State1#state{is_auth=true, - session_data=SessionData}}; - {error,_} -> - {reply_stop, {abort, #{}, no_such_realm},State} - end; - false -> - {reply_stop, {abort, #{}, no_such_realm}, State} + case {AuthId == anonymous, erwa_user_db:allow_anonymous(RealmName, Transport)} of + {true, true} -> + case erwa_realms:get_routing(RealmName) of + {ok,RoutingPid} -> + % the realm does exist + State1 = create_session(RoutingPid,RealmName,Roles,State), + #state{id=SessionId, broker=Broker, dealer=Dealer}=State1, + BrokerFeat = erwa_broker:get_features(Broker), + DealerFeat = erwa_dealer:get_features(Dealer), + SessionData = #{authid => anonymous, role => anonymous, session => + SessionId}, + WelcomeMsg ={welcome,SessionId,#{agent => erwa:get_version(), roles => #{broker => BrokerFeat, dealer => DealerFeat}}}, + {reply,WelcomeMsg,State1#state{is_auth=true, + session_data=SessionData}}; + {error,_} -> + {reply_stop, {abort, #{}, no_such_realm},State} end; - false -> + {true, false} -> + {reply_stop, {abort, #{}, no_such_realm}, State}; + {false,_} -> AuthMethods = maps:get(authmethods, Details, []), authenticate(AuthMethods, RealmName, Details, State) end; @@ -251,20 +248,18 @@ hndl_msg(_Msg,State) -> {stop,State}. - - authenticate([], _RealmName, _Details, State) -> {reply_stop, {abort, #{}, no_such_realm}, State}; authenticate([wampcra|_], RealmName, Details, #state{trans=Transport }=State) -> AuthId = maps:get(authid, Details, anonymous), - Roles = maps:get(roles, Details, []), + ClientRoles = maps:get(roles, Details, []), case erwa_user_db:can_join(AuthId, RealmName, Transport ) of {true, Role} -> case erwa_realms:get_routing(RealmName) of {ok,RoutingPid} -> % the realm does exist - State1 = create_session(RoutingPid,RealmName,Roles,State), + State1 = create_session(RoutingPid,RealmName,ClientRoles,State), #state{id = SessionId} = State1, % a user that needs to authenticate % need to create a a challenge diff --git a/src/erwa_sup.erl b/src/erwa_sup.erl index f647342..9aa9822 100644 --- a/src/erwa_sup.erl +++ b/src/erwa_sup.erl @@ -31,6 +31,7 @@ %% supervisor. -export([init/1]). +-define( CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). %% API. -spec start_link() -> {ok, pid()}. @@ -40,11 +41,19 @@ start_link() -> %% supervisor. init([]) -> -Procs = [{sessions,{erwa_sessions,start_link,[]},permanent,5000,worker,[]}, - {publications,{erwa_publications,start_link,[]},permanent,5000,worker,[]}, - {invocation_sup,{erwa_invocation_sup,start_link,[]},permanent,5000,supervisor,[]}, - {realms_sup,{erwa_routing_sup,start_link,[]},permanent,5000,supervisor,[]}, - {realms,{erwa_realms,start_link,[]},permanent,5000,worker,[]}, - {user_db,{erwa_user_db,start_link,[]},permanent,5000,worker,[]} + Sessions = ?CHILD(erwa_sessions, worker), + Publications = ?CHILD(erwa_publications, worker), + InvocationSup = ?CHILD(erwa_invocation_sup, supervisor), + RoutingSup = ?CHILD(erwa_routing_sup, supervisor), + Realms = ?CHILD(erwa_realms, worker), + UserDb = ?CHILD(erwa_user_db, worker), + +Procs = [ + Sessions, + Publications, + InvocationSup, + RoutingSup, + Realms, + UserDb ], {ok, {{one_for_one, 10, 10}, Procs}}.