Skip to content

Commit

Permalink
WHISTLE-250: fix devices and registrations loading of registrations f…
Browse files Browse the repository at this point in the history
…rom AMQP query
  • Loading branch information
James Aimonetti committed Nov 17, 2011
1 parent 03a6579 commit dd17981
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 308 deletions.
240 changes: 49 additions & 191 deletions whistle_apps/apps/crossbar/src/crossbar_bindings.erl
Expand Up @@ -41,9 +41,13 @@

-define(SERVER, ?MODULE).

-type binding_result() :: tuple(binding_result, term(), term()).
-type binding() :: tuple(binary(), queue()). %% queue(pid() | atom())
-type bindings() :: list(binding()) | [].
-type binding_result() :: {'binding_result', term(), term()}.

%% {FullBinding, BindingPieces, QueueOfPids}
%% {<<"foo.bar.#">>, [<<"foo">>, <<"bar">>, <<"#">>], queue()}
-type binding() :: {ne_binary(), [ne_binary(),...], queue()}. %% queue(pid() | atom())
-type bindings() :: [binding(),...] | [].


-record(state, {bindings = [] :: bindings()}).

Expand All @@ -58,9 +62,7 @@
%% is the payload, possibly modified
%% @end
%%--------------------------------------------------------------------
-spec map/2 :: (Routing, Payload) -> [{term(), term()},...] when
Routing :: binary(),
Payload :: term().
-spec map/2 :: (ne_binary(), term()) -> [{term(), term()},...].
map(Routing, Payload) ->
gen_server:call(?MODULE, {map, Routing, Payload, get(callid)}, infinity).

Expand All @@ -71,9 +73,7 @@ map(Routing, Payload) ->
%% all matching bindings
%% @end
%%--------------------------------------------------------------------
-spec fold/2 :: (Routing, Payload) -> term() when
Routing :: binary(),
Payload :: term().
-spec fold/2 :: (ne_binary(), term()) -> term().
fold(Routing, Payload) ->
gen_server:call(?MODULE, {fold, Routing, Payload, get(callid)}, infinity).

Expand All @@ -82,20 +82,16 @@ fold(Routing, Payload) ->
%% Helper functions for working on a result set of bindings
%% @end
%%-------------------------------------------------------------------
-spec any/1 :: (Res) -> boolean() when
Res :: proplist().
-spec any/1 :: (proplist()) -> boolean().
any(Res) when is_list(Res) -> lists:any(fun check_bool/1, Res).

-spec all/1 :: (Res) -> boolean() when
Res :: proplist().
-spec all/1 :: (proplist()) -> boolean().
all(Res) when is_list(Res) -> lists:all(fun check_bool/1, Res).

-spec failed/1 :: (Res) -> proplist_bool() when
Res :: proplist_bool().
-spec failed/1 :: (proplist_bool()) -> proplist_bool().
failed(Res) when is_list(Res) -> [R || R <- Res, filter_out_succeeded(R)].

-spec succeeded/1 :: (Res) -> proplist_bool() when
Res :: proplist_bool().
-spec succeeded/1 :: (proplist_bool()) -> proplist_bool().
succeeded(Res) when is_list(Res) -> [R || R <- Res, filter_out_failed(R)].

%%--------------------------------------------------------------------
Expand All @@ -111,17 +107,15 @@ start_link() ->
stop() ->
gen_server:cast(?SERVER, stop).

-spec bind/1 :: (Binding) -> ok | {error, exists} when
Binding :: binary().
-spec bind/1 :: (ne_binary()) -> 'ok' | {'error', 'exists'}.
bind(Binding) ->
gen_server:call(?MODULE, {bind, Binding}, infinity).

-spec flush/0 :: () -> ok.
-spec flush/0 :: () -> 'ok'.
flush() ->
gen_server:cast(?MODULE, flush).

-spec flush/1 :: (Binding) -> ok when
Binding :: binary().
-spec flush/1 :: (ne_binary()) -> 'ok'.
flush(Binding) ->
gen_server:cast(?MODULE, {flush, Binding}).

Expand Down Expand Up @@ -163,9 +157,12 @@ handle_call({map, Routing, Payload, ReqId}, From , #state{bindings=Bs}=State) ->
put(callid, ReqId),
?TIMER_START(list_to_binary(["bindings.map ", Routing])),
?LOG("running map for binding ~s", [Routing]),

RoutingParts = lists:reverse(binary:split(Routing, <<".">>, [global])),

Reply = lists:foldl(
fun({B, Ps}, Acc) ->
case binding_matches(B, Routing) of
fun({B, BParts, Ps}, Acc) ->
case B =:= Routing orelse matches(BParts, RoutingParts) of
true ->
map_bind_results(Ps, Payload, Acc, Routing);
false ->
Expand All @@ -181,9 +178,12 @@ handle_call({fold, Routing, Payload, ReqId}, From , #state{bindings=Bs}=State) -
put(callid, ReqId),
?TIMER_START(list_to_binary(["bindings.fold ", Routing])),
?LOG("running fold for binding ~s", [Routing]),

RoutingParts = lists:reverse(binary:split(Routing, <<".">>, [global])),

Reply = lists:foldl(
fun({B, Ps}, Acc) ->
case binding_matches(B, Routing) of
fun({B, BParts, Ps}, Acc) ->
case B =:= Routing orelse matches(BParts, RoutingParts) of
true -> fold_bind_results(Ps, Acc, Routing);
false -> Acc
end
Expand All @@ -194,19 +194,22 @@ handle_call({fold, Routing, Payload, ReqId}, From , #state{bindings=Bs}=State) -
{noreply, State};
handle_call({bind, Binding}, {From, _Ref}, #state{bindings=[]}=State) ->
link(From),
{reply, ok, State#state{bindings=[{Binding, queue:in(From, queue:new())}]}, hibernate};
BParts = lists:reverse(binary:split(Binding, <<".">>, [global])),
{reply, ok, State#state{bindings=[{Binding, BParts, queue:in(From, queue:new())}]}, hibernate};
handle_call({bind, Binding}, {From, _Ref}, #state{bindings=Bs}=State) ->
?LOG_SYS("~w is binding ~s", [From, Binding]),
case lists:keyfind(Binding, 1, Bs) of
false ->
link(From),
{reply, ok, State#state{bindings=[{Binding, queue:in(From, queue:new())} | Bs]}, hibernate};
{_, Subscribers} ->
BParts = lists:reverse(binary:split(Binding, <<".">>, [global])),
{reply, ok, State#state{bindings=[{Binding, BParts, queue:in(From, queue:new())} | Bs]}, hibernate};
{_, _, Subscribers} ->
case queue:member(From, Subscribers) of
true -> {reply, {error, exists}, State};
false ->
link(From),
{reply, ok, State#state{bindings=[{Binding, queue:in(From, Subscribers)} | lists:keydelete(Binding, 1, Bs)]}, hibernate}
BParts = lists:reverse(binary:split(Binding, <<".">>, [global])),
{reply, ok, State#state{bindings=[{Binding, BParts, queue:in(From, Subscribers)} | lists:keydelete(Binding, 1, Bs)]}, hibernate}
end
end.

Expand All @@ -226,7 +229,7 @@ handle_cast(flush, #state{bindings=Bs}=State) ->
handle_cast({flush, Binding}, #state{bindings=Bs}=State) ->
case lists:keyfind(Binding, 1, Bs) of
false -> {noreply, State};
{_, _}=B ->
{_, _, _}=B ->
flush_binding(B),
{noreply, State#state{bindings=lists:keydelete(Binding, 1, Bs)}, hibernate}
end;
Expand All @@ -245,8 +248,8 @@ handle_cast(stop, State) ->
%%--------------------------------------------------------------------
handle_info({'EXIT', Pid, _Reason}, #state{bindings=Bs}=State) ->
?LOG_SYS("subscriber ~w went down(~w)", [Pid, _Reason]),
Bs1 = lists:foldr(fun({B, Subs}, Acc) ->
[{B, remove_subscriber(Pid, Subs)} | Acc]
Bs1 = lists:foldr(fun({B, BParts, Subs}, Acc) ->
[{B, BParts, remove_subscriber(Pid, Subs)} | Acc]
end, [], Bs),
{noreply, State#state{bindings=Bs1}, hibernate};
handle_info(_Info, State) ->
Expand Down Expand Up @@ -298,25 +301,6 @@ remove_subscriber(Pid, Subs) ->
end, Subs)
end.

%%--------------------------------------------------------------------
%% @private
%% @doc Evaluate a binding against a routing key
%% Break both binaries on the '.' delimiter, reverse the resulting list of
%% symbols, and iterate through the lists until a determination is made of
%% whether there is a match.
%%
%% @end
%%--------------------------------------------------------------------
-spec binding_matches/2 :: (B, R) -> boolean() when
B :: binary(),
R :: binary().
binding_matches(B, B) -> true;
binding_matches(B, R) ->
Opts = [global],
matches(lists:reverse(binary:split(B, <<".">>, Opts))
,lists:reverse(binary:split(R, <<".">>, Opts))
).

%%--------------------------------------------------------------------
%% @private
%% @doc
Expand All @@ -325,10 +309,14 @@ binding_matches(B, R) ->
%% <<"#.6.*.1.4.*">>,<<"6.a.a.6.a.1.4.a">>
%%
%%--------------------------------------------------------------------
-spec binding_matches/2 :: (ne_binary(), ne_binary()) -> boolean().
-spec matches/2 :: ([ne_binary(),...] | [], [ne_binary(),...] | []) -> boolean().

binding_matches(B, R) ->
matches(lists:reverse(binary:split(B, <<".">>, [global]))
,lists:reverse(binary:split(R, <<".">>, [global]))).

%% if both are empty, we made it!
-spec matches/2 :: (Bs, Rs) -> boolean() when
Bs :: [binary(),...] | [],
Rs :: [binary(),...] | [].
matches([], []) -> true;
matches([<<"#">>], []) -> true;

Expand Down Expand Up @@ -486,9 +474,8 @@ wait_for_fold_binding() ->
%% let those bound know their binding is flushed
%% @end
%%--------------------------------------------------------------------
-spec flush_binding/1 :: (Binding) -> 'ok' when
Binding :: binding().
flush_binding({B, Subs}) ->
-spec flush_binding/1 :: (binding()) -> 'ok'.
flush_binding({B, _, Subs}) ->
lists:foreach(fun(S) -> S ! {binding_flushed, B} end, queue:to_list(Subs)).

%%-------------------------------------------------------------------------
Expand Down Expand Up @@ -521,9 +508,8 @@ filter_out_succeeded({_, _}) -> true.

%% EUNIT and PropEr TESTING %%

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").
-ifdef(TEST).

-define(ROUTINGS, [ <<"foo.bar.zot">>, <<"foo.quux.zot">>, <<"foo.bar.quux.zot">>, <<"foo.zot">>, <<"foo">>, <<"xap">>]).

Expand All @@ -548,10 +534,11 @@ bindings_match_test() ->

simple_bind_test() ->
?MODULE:start_link(),
logger:start(),
logger:start_link(),

Binding = <<"foo">>,


BindFun = fun() ->
timer:sleep(500),
?assertEqual(ok, ?MODULE:bind(Binding)),
Expand Down Expand Up @@ -629,7 +616,7 @@ map_bindings_loop(B) ->
-define(BINDINGS_MAP_FOLD, [ <<"#">>, <<"foo.*.zot">>, <<"foo.#.zot">>, <<"*">>, <<"#.quux">>]).

start_server(Fun) ->
logger:start(),
logger:start_link(),
?MODULE:start_link(),
?assertEqual(ok, ?MODULE:flush()),

Expand Down Expand Up @@ -788,133 +775,4 @@ segment() ->
markers() ->
?LET(S, ?LAZY(union([[$#, $., c()], [$*, $., b()]])), lists:flatten(S)).

proper_test_() ->
{"Runs the module's PropEr tests for rebar quick commands",
{timeout, 15000,
[
?_assertEqual([], proper:module(?MODULE, [{max_shrinks, 0}]))
]}}.

%%% PropEr tests
%% Checks that the patterns for paths (a.#.*.c) match or do not
%% match a given expanded path.
prop_expands() ->
?FORALL(Paths, expanded_paths(),
?WHENFAIL(io:format("Failed on ~p~n",[Paths]),
lists:all(fun(X) -> X end, %% checks if all true
[binding_matches(Pattern, Expanded) =:= Expected ||
{Pattern, Expanded, Expected} <- Paths]))).

%%% GENERATORS

%% Gives a list of paths that were expanded, some of them to fail on purpose,
%% some of them not to.
expanded_paths() ->
?LET(P, path(),
begin
B = list_to_binary(P),
?LET({{Expanded1, IsRight1},{Expanded2, IsRight2}},
{wrong(P), right(P)},
[{B, list_to_binary(Expanded1), IsRight1},
{B, list_to_binary(Expanded2), IsRight2}])
end).

%% Tries to make a pattern wrong. Will not always suceed because a pattern
%% like "#" can be anything at all.
%%
%% Returns {Str, ShouldMatchOriginal}.
wrong(Path) ->
?LET(P, Path,
begin
{Str, Bool} = wrong(P, true, []),
{re:replace(Str, <<"\\.+">>, <<".">>, [{return, list}]), Bool}
end).

%% Will expand the patterns according to the rules so they should always match
%%
%% Returns {Str, ShouldMatchOriginal}.
right(Path) ->
?LET(P, Path, {right1(P), true}).

%% Here's why some patterns will always succeed even if we try to make them
%% wrong. This is the case of "#" which we will have to simply ignore.
%%
%% Returns {Str, ShouldMatchOriginal}.
wrong([], Bool, Acc) ->
{lists:reverse(Acc), Bool};
wrong("*.#." ++ Rest, _Bool, Acc) ->
wrong(Rest, false, [$.|Acc]);
wrong(".*.#." ++ Rest, _Bool, Acc) ->
wrong(Rest, false, [$.|Acc]);
wrong("*.#", _Bool, Acc) -> %% same as above, end of string
{lists:reverse(Acc), false};
wrong("*." ++ Rest, _Bool, Acc) ->
wrong(Rest, false, Acc);
wrong(".*", _Bool, Acc) ->
{lists:reverse(Acc), false};
wrong(".#." ++ Rest, Bool, Acc) -> %% can't make this one wrong
wrong(Rest, Bool, [$.|Acc]);
wrong("#." ++ Rest, Bool, Acc) -> %% same, start of string
wrong(Rest, Bool, Acc);
wrong(".#", Bool, Acc) -> %% same as above, end of string
{lists:reverse(Acc), Bool};
wrong([Char|Rest], Bool, Acc) when Char =/= $*, Char =/= $# ->
wrong(Rest, Bool, [Char|Acc]).

%% Returns an expanded string according to the rules
right1([]) -> [];
right1("*" ++ Rest) ->
?LET(S, segment(), S++right1(Rest));
right1(".#" ++ Rest) ->
?LET(X,
union([
"",
?LAZY(?LET(S, segment(), [$.]++S)),
?LAZY(?LET({A,B}, {segment(), segment()}, [$.]++A++[$.]++B)),
?LAZY(?LET({A,B,C}, {segment(), segment(), segment()}, [$.]++A++[$.]++B++[$.]++C))
]),
X ++ right1(Rest));
right1("#." ++ Rest) ->
?LET(X,
union([
"",
?LAZY(?LET(S, segment(), S++[$.])),
?LAZY(?LET({A,B}, {segment(), segment()}, A++[$.]++B++[$.])),
?LAZY(?LET({A,B,C}, {segment(), segment(), segment()}, A++[$.]++B++[$.]++C++[$.]))
]),
X ++ right1(Rest));
right1([Char|Rest]) ->
[Char|right1(Rest)].

%% Building a basic pattern/path string
path() ->
?LET(Base, ?LAZY(weighted_union([{3,a()}, {1,b()}])),
?LET({H,T}, {union(["*.","#.",""]), union([".*",".#",""])},
H ++ Base ++ T)).

a() ->
?LET({X,Y}, {segment(), ?LAZY(union([b(), markers()]))},
X ++ [$.] ++ Y).

b() ->
?LET({X,Y}, {segment(), ?LAZY(union([b(), c()]))},
X ++ [$.] ++ Y).

c() ->
?LAZY(union([segment(), markers()])).

segment() ->
?SUCHTHAT(
X,
list(union([choose($a,$z), choose($A,$Z), choose($0,$9)])),
length(X) =/= 0
).

markers() ->
?LET(S, ?LAZY(union([
[$#, $., b()],
[$#, $., c()],
[$*, $., b()],
[$*, $., c()]
])), lists:flatten(S)).
-endif.

0 comments on commit dd17981

Please sign in to comment.