Skip to content

Commit

Permalink
Add testcases for watchdog_config
Browse files Browse the repository at this point in the history
  • Loading branch information
Anders Svensson committed Mar 12, 2013
1 parent 21708fe commit 8b947be
Showing 1 changed file with 201 additions and 73 deletions.
274 changes: 201 additions & 73 deletions lib/diameter/test/diameter_watchdog_SUITE.erl
Expand Up @@ -30,10 +30,14 @@
end_per_suite/1]).

%% testcases
-export([reopen/1, reopen/4, reopen/6]).
-export([reopen/0, reopen/1, reopen/4, reopen/6,
suspect/1, suspect/4,
okay/1, okay/4]).

-export([id/1, %% jitter callback
run1/1]).
run1/1,
abuse/1,
abuse/2]).

%% diameter_app callbacks
-export([peer_up/3,
Expand Down Expand Up @@ -75,14 +79,14 @@
%% Watchdog timers used by the testcases.
-define(WD_TIMERS, [10000, ?WD(10000)]).

%% Watchdog timer of the misbehaving node. Longer than the peer's so
%% that DWR is never sent at watchdog timeout.
-define(PEER_WD, 20000).
%% Watchdog timer of the misbehaving node.
-define(PEER_WD, 10000).

%% A timeout that ensures one watchdog. To ensure only one watchdog
%% requires (Wd + 2000) + 1000 < 2*(Wd - 2000) ==> 7000 < Wd for the
%% case with random jitter.
-define(ONE_WD(Wd), jitter(Wd,2000) + 1000).
-define(INFO(T), #diameter_event{info = T}).

%% Receive an event message from diameter.
-define(EVENT(T),
Expand All @@ -97,16 +101,19 @@
-define(WD_EVENT(Ref), log_wd(element(4, ?EVENT({watchdog, Ref, _, _, _})))).

%% Log to make failures identifiable.
-define(LOG(T), ?LOG("~p", [T])).
-define(LOG(F,A), ct:pal("~p: " ++ F, [self() | A])).
-define(LOG(T), ?LOG("~p", [T])).
-define(LOG(F,A), ct:pal("~p: " ++ F, [self() | A])).
-define(WARN(F,A), ct:pal(error, "~p: " ++ F, [self() | A])).

%% ===========================================================================

suite() ->
[{timetrap, {minutes, 5}}]. %% 20 watchdogs @ 15 sec
[{timetrap, {seconds, 90}}].

all() ->
[reopen].
[reopen,
suspect,
okay].

init_per_suite(Config) ->
ok = diameter:start(),
Expand All @@ -122,6 +129,9 @@ end_per_suite(_Config) ->
%% Test the watchdog state machine for the required failover, failback
%% and reopen behaviour by examining watchdog events.

reopen() ->
[{timetrap, {minutes, 5}}]. %% 20 watchdogs @ 15 sec

reopen(_) ->
[] = run([[reopen, T, W, N, M]
|| T <- [listen, connect], %% watchdog to test
Expand All @@ -138,44 +148,19 @@ reopen(Test, Wd, N, M) ->
%% reopen/6

reopen(Type, Test, Ref, Wd, N, M) ->
{SvcName, TRef} = start(Type, Ref, opts(Type, Test, Ref, Wd)),
{SvcName, TRef} = start(Type, Ref, cfg(Type, Test, Wd)),
reopen(Type, Test, SvcName, TRef, Wd, N, M).

%% start/3

start(Type, Ref, Opts) ->
Name = hostname(),
true = diameter:subscribe(Name),
ok = diameter:start_service(Name, [{monitor, self()} | ?SERVICE(Name)]),
{ok, TRef} = diameter:add_transport(Name, {Type, Opts}),
true = diameter_reg:add_new({Type, Ref, Name}),
{Name, TRef}.

opts(Type, Test, Ref, Wd) ->
{Timer, Counts, Mod} = opts(Type, Test, Wd),
[{transport_module, diameter_tcp},
{transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)},
{watchdog_timer, Timer},
{watchdog_counts, Counts}].

cfg(listen, _) ->
[];
cfg(connect, Ref) ->
[{{_, _, SvcName}, _Pid}] = diameter_reg:wait({listen, Ref, '_'}),
[[{ref, LRef} | _]] = diameter:service_info(SvcName, transport),
[LP] = ?util:lport(tcp, LRef, 20),
[{raddr, ?ADDR}, {rport, LP}].

opts(Type, Type, Wd) ->
cfg(Type, Type, Wd) ->
{Wd, [], []};
opts(_Type, _Test, _Wd) ->
{?WD(?PEER_WD), [{open, 0}], [{module, ?MODULE}]}.
cfg(_Type, _Test, _Wd) ->
{?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}.

%% reopen/7

%% The watchdog to be tested.
reopen(Type, Type, SvcName, Ref, Wd, N, M) ->
?LOG("node, ~p, ~p", [Type, Ref]),
?LOG("node ~p", [[Type, SvcName, Ref, Wd, N, M]]),

%% Connection should come up immediately as a consequence of
%% starting the watchdog process. In the accepting case this
Expand Down Expand Up @@ -309,24 +294,24 @@ reopen(Type, Type, SvcName, Ref, Wd, N, M) ->

%% The misbehaving peer.
reopen(Type, _, SvcName, Ref, Wd, N, M) ->
?LOG("peer, ~p, ~p", [Type, Ref]),
?LOG("peer ~p", [[Type, SvcName, Ref, Wd, N, M]]),

%% First transport process.
{initial, okay} = ?WD_EVENT(Ref),
?EVENT({up, Ref, _, _, #diameter_packet{}}),

reg(Type, Ref, SvcName, {SvcName, {Wd,N,M}}),
reg(Ref, SvcName, {SvcName, {Wd,N,M}}),

{okay, down} = ?WD_EVENT(Ref),

%% Second transport process.
?EVENT({watchdog, Ref, _, {_, reopen}, _}),
reg(Type, Ref, SvcName, 3),
?EVENT({watchdog, Ref, _, {_, okay}, _}),
reg(Ref, SvcName, 3), %% answer 3 watchdogs then fall silent
?EVENT({watchdog, Ref, _, {_, down}, _}),

%% Third transport process.
?EVENT({watchdog, Ref, _, {_, reopen}, _}),
reg(Type, Ref, SvcName, 0),
?EVENT({watchdog, Ref, _, {_, okay}, _}),
reg(Ref, SvcName, 0), %% disable outgoing DWA
?EVENT({watchdog, Ref, _, {_, down}, _}),

ok.
Expand All @@ -340,14 +325,6 @@ log_event(E) ->
T == watchdog orelse ?LOG("~p", [T]),
E.

%% wd_counts/1

wd_counts(SvcName) ->
[Info] = diameter:service_info(SvcName, transport),
{_, Counters} = lists:keyfind(statistics, 1, Info),
[proplists:get_value({{0,280,R}, D}, Counters, 0) || D <- [send,recv],
R <- [1,0]].

%% recv_reopen/2

recv_reopen(connect, Ref) ->
Expand All @@ -357,32 +334,32 @@ recv_reopen(connect, Ref) ->
recv_reopen(listen, Ref) ->
{_, reopen} = ?WD_EVENT(Ref).

%% reg/4
%% reg/3
%%
%% Lookup the pid of the transport process and publish a term for
%% send/2 to lookup.
reg(Type, Ref, SvcName, T) ->
TPid = tpid(Type, Ref, diameter:service_info(SvcName, transport)),
reg(TRef, SvcName, T) ->
TPid = tpid(TRef, diameter:service_info(SvcName, transport)),
true = diameter_reg:add_new({?MODULE, TPid, T}).

%% tpid/3

tpid(connect, Ref, [[{ref, Ref},
{type, connect},
{options, _},
{watchdog, _},
{peer, _},
{apps, _},
{caps, _},
{port, [{owner, TPid} | _]}
| _]]) ->
%% tpid/2

tpid(Ref, [[{ref, Ref},
{type, connect},
{options, _},
{watchdog, _},
{peer, _},
{apps, _},
{caps, _},
{port, [{owner, TPid} | _]}
| _]]) ->
TPid;

tpid(listen, Ref, [[{ref, Ref},
{type, listen},
{options, _},
{accept, As}
| _]]) ->
tpid(Ref, [[{ref, Ref},
{type, listen},
{options, _},
{accept, As}
| _]]) ->
[[{watchdog, _},
{peer, _},
{apps, _},
Expand All @@ -395,6 +372,156 @@ tpid(listen, Ref, [[{ref, Ref},
As),
TPid.

%% ===========================================================================
%% # suspect/1
%% ===========================================================================

%% Configure transports to require a set number of watchdogs before
%% moving from OKAY to SUSPECT.

suspect(_) ->
[] = run([[abuse, [suspect, N]] || N <- [0,1,3]]).

suspect(Type, Fake, Ref, N)
when is_reference(Ref) ->
{SvcName, TRef}
= start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}),
{initial, okay} = ?WD_EVENT(TRef),
suspect(TRef, Fake, SvcName, N);

suspect(TRef, true, SvcName, _) ->
reg(TRef, SvcName, 0), %% disable outgoing DWA
{okay, _} = ?WD_EVENT(TRef);

suspect(TRef, false, SvcName, 0) -> %% SUSPECT disabled
%% Wait 2+ watchdogs and see that two unanswered watchdogs have
%% been sent.
[2,0,0,0] = receive
?INFO({watchdog, TRef, _, _, _} = T) -> T
after 28000 ->
wd_counts(SvcName)
end;

suspect(TRef, false, SvcName, N) ->
{okay, suspect} = ?WD_EVENT(TRef),
[N,0,0,0] = wd_counts(SvcName),
{suspect, down} = ?WD_EVENT(TRef),
[N,0,0,0] = wd_counts(SvcName).

%% abuse/1

abuse(F) ->
[] = run([[abuse, F, T] || T <- [listen, connect]]).

abuse(F, [_,_,_|_] = Args) ->
?LOG("~p", [Args]),
apply(?MODULE, F, Args);

abuse([F|A], Test) ->
Ref = make_ref(),
[] = run([[abuse, F, [T, T == Test, Ref] ++ A]
|| T <- [listen, connect]]);

abuse(F, Test) ->
abuse([F], Test).

mod(true) ->
[{module, ?MODULE}];
mod(false) ->
[].

%% ===========================================================================
%% # okay/1
%% ===========================================================================

%% Configure the number of watchdog exchanges before moving from
%% REOPEN to OKAY.

okay(_) ->
[] = run([[abuse, [okay, N]] || N <- [0,2,3]]).

okay(Type, Fake, Ref, N)
when is_reference(Ref) ->
{SvcName, TRef}
= start(Type, Ref, {?WD(10000),
[{okay, choose(Fake, 0, N)}],
mod(Fake)}),
{initial, okay} = ?WD_EVENT(TRef),
okay(TRef,
Fake,
SvcName,
choose(Type == listen, initial, down),
N).

okay(TRef, true, SvcName, Down, _) ->
reg(TRef, SvcName, 0), %% disable outgoing DWA
{okay, down} = ?WD_EVENT(TRef),
{Down, okay} = ?WD_EVENT(TRef),
reg(TRef, SvcName, -1), %% enable outgoing DWA
{okay, down} = ?WD_EVENT(TRef);

okay(TRef, false, SvcName, Down, N) ->
{okay, suspect} = ?WD_EVENT(TRef),
[1,0,0,0] = wd_counts(SvcName),
{suspect, down} = ?WD_EVENT(TRef),
ok(TRef, SvcName, Down, N).

ok(TRef, SvcName, Down, 0) ->
%% Connection comes up without watchdog exchange.
{Down, okay} = ?WD_EVENT(TRef),
[1,0,0,0] = wd_counts(SvcName),
%% Wait 2+ watchdog timeout to see that the connection stays up and
%% two watchdogs are exchanged.
ok = receive ?INFO({watchdog, TRef, _, _, _} = T) ->
T
after 28000 ->
ok
end,
[3,0,0,2] = wd_counts(SvcName);

ok(TRef, SvcName, Down, N) ->
%% Connection required watchdog exchange before reaching OKAY.
{Down, reopen} = ?WD_EVENT(TRef),
{reopen, okay} = ?WD_EVENT(TRef),
%% One DWR was sent in moving to expect, plus N more to reopen the
%% connection.
N1 = N+1,
[N1,0,0,N] = wd_counts(SvcName).

%% ===========================================================================

%% wd_counts/1

wd_counts(SvcName) ->
[Info] = diameter:service_info(SvcName, transport),
{_, Counters} = lists:keyfind(statistics, 1, Info),
[proplists:get_value({{0,280,R}, D}, Counters, 0) || D <- [send,recv],
R <- [1,0]].

%% start/3

start(Type, Ref, T) ->
Name = hostname(),
true = diameter:subscribe(Name),
ok = diameter:start_service(Name, [{monitor, self()} | ?SERVICE(Name)]),
{ok, TRef} = diameter:add_transport(Name, {Type, opts(Type, Ref, T)}),
true = diameter_reg:add_new({Type, Ref, Name}),
{Name, TRef}.

opts(Type, Ref, {Timer, Config, Mod}) ->
[{transport_module, diameter_tcp},
{transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)},
{watchdog_timer, Timer},
{watchdog_config, Config}].

cfg(listen, _) ->
[];
cfg(connect, Ref) ->
[{{_, _, SvcName}, _Pid}] = diameter_reg:wait({listen, Ref, '_'}),
[[{ref, LRef} | _]] = diameter:service_info(SvcName, transport),
[LP] = ?util:lport(tcp, LRef, 20),
[{raddr, ?ADDR}, {rport, LP}].

%% ===========================================================================

listen(PortNr, Opts) ->
Expand All @@ -417,6 +544,7 @@ send(Sock, Bin) ->
%% First outgoing message from a new transport process is CER/CEA.
%% Remaining outgoing messages are either DWR or DWA.
send(undefined, Sock, Bin) ->
<<_:32, _:8, 257:24, _/binary>> = Bin,
putr(config, init),
gen_tcp:send(Sock, Bin);

Expand Down Expand Up @@ -526,7 +654,7 @@ run1([F|A]) ->
catch
E:R ->
S = erlang:get_stacktrace(),
io:format("~p~n", [{A, E, R, S}]),
?WARN("~p", [{A, E, R, S}]),
S
end.

Expand Down

0 comments on commit 8b947be

Please sign in to comment.