diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl index 388177a0bf19..82244a1c7f4d 100644 --- a/lib/diameter/test/diameter_watchdog_SUITE.erl +++ b/lib/diameter/test/diameter_watchdog_SUITE.erl @@ -30,10 +30,14 @@ end_per_suite/1]). %% testcases --export([reopen/1, reopen/4, reopen/6]). +-export([reopen/0, reopen/1, reopen/4, reopen/6, + suspect/1, suspect/4, + okay/1, okay/4]). -export([id/1, %% jitter callback - run1/1]). + run1/1, + abuse/1, + abuse/2]). %% diameter_app callbacks -export([peer_up/3, @@ -75,14 +79,14 @@ %% Watchdog timers used by the testcases. -define(WD_TIMERS, [10000, ?WD(10000)]). -%% Watchdog timer of the misbehaving node. Longer than the peer's so -%% that DWR is never sent at watchdog timeout. --define(PEER_WD, 20000). +%% Watchdog timer of the misbehaving node. +-define(PEER_WD, 10000). %% A timeout that ensures one watchdog. To ensure only one watchdog %% requires (Wd + 2000) + 1000 < 2*(Wd - 2000) ==> 7000 < Wd for the %% case with random jitter. -define(ONE_WD(Wd), jitter(Wd,2000) + 1000). +-define(INFO(T), #diameter_event{info = T}). %% Receive an event message from diameter. -define(EVENT(T), @@ -97,16 +101,19 @@ -define(WD_EVENT(Ref), log_wd(element(4, ?EVENT({watchdog, Ref, _, _, _})))). %% Log to make failures identifiable. --define(LOG(T), ?LOG("~p", [T])). --define(LOG(F,A), ct:pal("~p: " ++ F, [self() | A])). +-define(LOG(T), ?LOG("~p", [T])). +-define(LOG(F,A), ct:pal("~p: " ++ F, [self() | A])). +-define(WARN(F,A), ct:pal(error, "~p: " ++ F, [self() | A])). %% =========================================================================== suite() -> - [{timetrap, {minutes, 5}}]. %% 20 watchdogs @ 15 sec + [{timetrap, {seconds, 90}}]. all() -> - [reopen]. + [reopen, + suspect, + okay]. init_per_suite(Config) -> ok = diameter:start(), @@ -122,6 +129,9 @@ end_per_suite(_Config) -> %% Test the watchdog state machine for the required failover, failback %% and reopen behaviour by examining watchdog events. +reopen() -> + [{timetrap, {minutes, 5}}]. %% 20 watchdogs @ 15 sec + reopen(_) -> [] = run([[reopen, T, W, N, M] || T <- [listen, connect], %% watchdog to test @@ -138,44 +148,19 @@ reopen(Test, Wd, N, M) -> %% reopen/6 reopen(Type, Test, Ref, Wd, N, M) -> - {SvcName, TRef} = start(Type, Ref, opts(Type, Test, Ref, Wd)), + {SvcName, TRef} = start(Type, Ref, cfg(Type, Test, Wd)), reopen(Type, Test, SvcName, TRef, Wd, N, M). -%% start/3 - -start(Type, Ref, Opts) -> - Name = hostname(), - true = diameter:subscribe(Name), - ok = diameter:start_service(Name, [{monitor, self()} | ?SERVICE(Name)]), - {ok, TRef} = diameter:add_transport(Name, {Type, Opts}), - true = diameter_reg:add_new({Type, Ref, Name}), - {Name, TRef}. - -opts(Type, Test, Ref, Wd) -> - {Timer, Counts, Mod} = opts(Type, Test, Wd), - [{transport_module, diameter_tcp}, - {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)}, - {watchdog_timer, Timer}, - {watchdog_counts, Counts}]. - -cfg(listen, _) -> - []; -cfg(connect, Ref) -> - [{{_, _, SvcName}, _Pid}] = diameter_reg:wait({listen, Ref, '_'}), - [[{ref, LRef} | _]] = diameter:service_info(SvcName, transport), - [LP] = ?util:lport(tcp, LRef, 20), - [{raddr, ?ADDR}, {rport, LP}]. - -opts(Type, Type, Wd) -> +cfg(Type, Type, Wd) -> {Wd, [], []}; -opts(_Type, _Test, _Wd) -> - {?WD(?PEER_WD), [{open, 0}], [{module, ?MODULE}]}. +cfg(_Type, _Test, _Wd) -> + {?WD(?PEER_WD), [{okay, 0}], [{module, ?MODULE}]}. %% reopen/7 %% The watchdog to be tested. reopen(Type, Type, SvcName, Ref, Wd, N, M) -> - ?LOG("node, ~p, ~p", [Type, Ref]), + ?LOG("node ~p", [[Type, SvcName, Ref, Wd, N, M]]), %% Connection should come up immediately as a consequence of %% starting the watchdog process. In the accepting case this @@ -309,24 +294,24 @@ reopen(Type, Type, SvcName, Ref, Wd, N, M) -> %% The misbehaving peer. reopen(Type, _, SvcName, Ref, Wd, N, M) -> - ?LOG("peer, ~p, ~p", [Type, Ref]), + ?LOG("peer ~p", [[Type, SvcName, Ref, Wd, N, M]]), %% First transport process. {initial, okay} = ?WD_EVENT(Ref), ?EVENT({up, Ref, _, _, #diameter_packet{}}), - reg(Type, Ref, SvcName, {SvcName, {Wd,N,M}}), + reg(Ref, SvcName, {SvcName, {Wd,N,M}}), {okay, down} = ?WD_EVENT(Ref), %% Second transport process. - ?EVENT({watchdog, Ref, _, {_, reopen}, _}), - reg(Type, Ref, SvcName, 3), + ?EVENT({watchdog, Ref, _, {_, okay}, _}), + reg(Ref, SvcName, 3), %% answer 3 watchdogs then fall silent ?EVENT({watchdog, Ref, _, {_, down}, _}), %% Third transport process. - ?EVENT({watchdog, Ref, _, {_, reopen}, _}), - reg(Type, Ref, SvcName, 0), + ?EVENT({watchdog, Ref, _, {_, okay}, _}), + reg(Ref, SvcName, 0), %% disable outgoing DWA ?EVENT({watchdog, Ref, _, {_, down}, _}), ok. @@ -340,14 +325,6 @@ log_event(E) -> T == watchdog orelse ?LOG("~p", [T]), E. -%% wd_counts/1 - -wd_counts(SvcName) -> - [Info] = diameter:service_info(SvcName, transport), - {_, Counters} = lists:keyfind(statistics, 1, Info), - [proplists:get_value({{0,280,R}, D}, Counters, 0) || D <- [send,recv], - R <- [1,0]]. - %% recv_reopen/2 recv_reopen(connect, Ref) -> @@ -357,32 +334,32 @@ recv_reopen(connect, Ref) -> recv_reopen(listen, Ref) -> {_, reopen} = ?WD_EVENT(Ref). -%% reg/4 +%% reg/3 %% %% Lookup the pid of the transport process and publish a term for %% send/2 to lookup. -reg(Type, Ref, SvcName, T) -> - TPid = tpid(Type, Ref, diameter:service_info(SvcName, transport)), +reg(TRef, SvcName, T) -> + TPid = tpid(TRef, diameter:service_info(SvcName, transport)), true = diameter_reg:add_new({?MODULE, TPid, T}). -%% tpid/3 - -tpid(connect, Ref, [[{ref, Ref}, - {type, connect}, - {options, _}, - {watchdog, _}, - {peer, _}, - {apps, _}, - {caps, _}, - {port, [{owner, TPid} | _]} - | _]]) -> +%% tpid/2 + +tpid(Ref, [[{ref, Ref}, + {type, connect}, + {options, _}, + {watchdog, _}, + {peer, _}, + {apps, _}, + {caps, _}, + {port, [{owner, TPid} | _]} + | _]]) -> TPid; -tpid(listen, Ref, [[{ref, Ref}, - {type, listen}, - {options, _}, - {accept, As} - | _]]) -> +tpid(Ref, [[{ref, Ref}, + {type, listen}, + {options, _}, + {accept, As} + | _]]) -> [[{watchdog, _}, {peer, _}, {apps, _}, @@ -395,6 +372,156 @@ tpid(listen, Ref, [[{ref, Ref}, As), TPid. +%% =========================================================================== +%% # suspect/1 +%% =========================================================================== + +%% Configure transports to require a set number of watchdogs before +%% moving from OKAY to SUSPECT. + +suspect(_) -> + [] = run([[abuse, [suspect, N]] || N <- [0,1,3]]). + +suspect(Type, Fake, Ref, N) + when is_reference(Ref) -> + {SvcName, TRef} + = start(Type, Ref, {?WD(10000), [{suspect, N}], mod(Fake)}), + {initial, okay} = ?WD_EVENT(TRef), + suspect(TRef, Fake, SvcName, N); + +suspect(TRef, true, SvcName, _) -> + reg(TRef, SvcName, 0), %% disable outgoing DWA + {okay, _} = ?WD_EVENT(TRef); + +suspect(TRef, false, SvcName, 0) -> %% SUSPECT disabled + %% Wait 2+ watchdogs and see that two unanswered watchdogs have + %% been sent. + [2,0,0,0] = receive + ?INFO({watchdog, TRef, _, _, _} = T) -> T + after 28000 -> + wd_counts(SvcName) + end; + +suspect(TRef, false, SvcName, N) -> + {okay, suspect} = ?WD_EVENT(TRef), + [N,0,0,0] = wd_counts(SvcName), + {suspect, down} = ?WD_EVENT(TRef), + [N,0,0,0] = wd_counts(SvcName). + +%% abuse/1 + +abuse(F) -> + [] = run([[abuse, F, T] || T <- [listen, connect]]). + +abuse(F, [_,_,_|_] = Args) -> + ?LOG("~p", [Args]), + apply(?MODULE, F, Args); + +abuse([F|A], Test) -> + Ref = make_ref(), + [] = run([[abuse, F, [T, T == Test, Ref] ++ A] + || T <- [listen, connect]]); + +abuse(F, Test) -> + abuse([F], Test). + +mod(true) -> + [{module, ?MODULE}]; +mod(false) -> + []. + +%% =========================================================================== +%% # okay/1 +%% =========================================================================== + +%% Configure the number of watchdog exchanges before moving from +%% REOPEN to OKAY. + +okay(_) -> + [] = run([[abuse, [okay, N]] || N <- [0,2,3]]). + +okay(Type, Fake, Ref, N) + when is_reference(Ref) -> + {SvcName, TRef} + = start(Type, Ref, {?WD(10000), + [{okay, choose(Fake, 0, N)}], + mod(Fake)}), + {initial, okay} = ?WD_EVENT(TRef), + okay(TRef, + Fake, + SvcName, + choose(Type == listen, initial, down), + N). + +okay(TRef, true, SvcName, Down, _) -> + reg(TRef, SvcName, 0), %% disable outgoing DWA + {okay, down} = ?WD_EVENT(TRef), + {Down, okay} = ?WD_EVENT(TRef), + reg(TRef, SvcName, -1), %% enable outgoing DWA + {okay, down} = ?WD_EVENT(TRef); + +okay(TRef, false, SvcName, Down, N) -> + {okay, suspect} = ?WD_EVENT(TRef), + [1,0,0,0] = wd_counts(SvcName), + {suspect, down} = ?WD_EVENT(TRef), + ok(TRef, SvcName, Down, N). + +ok(TRef, SvcName, Down, 0) -> + %% Connection comes up without watchdog exchange. + {Down, okay} = ?WD_EVENT(TRef), + [1,0,0,0] = wd_counts(SvcName), + %% Wait 2+ watchdog timeout to see that the connection stays up and + %% two watchdogs are exchanged. + ok = receive ?INFO({watchdog, TRef, _, _, _} = T) -> + T + after 28000 -> + ok + end, + [3,0,0,2] = wd_counts(SvcName); + +ok(TRef, SvcName, Down, N) -> + %% Connection required watchdog exchange before reaching OKAY. + {Down, reopen} = ?WD_EVENT(TRef), + {reopen, okay} = ?WD_EVENT(TRef), + %% One DWR was sent in moving to expect, plus N more to reopen the + %% connection. + N1 = N+1, + [N1,0,0,N] = wd_counts(SvcName). + +%% =========================================================================== + +%% wd_counts/1 + +wd_counts(SvcName) -> + [Info] = diameter:service_info(SvcName, transport), + {_, Counters} = lists:keyfind(statistics, 1, Info), + [proplists:get_value({{0,280,R}, D}, Counters, 0) || D <- [send,recv], + R <- [1,0]]. + +%% start/3 + +start(Type, Ref, T) -> + Name = hostname(), + true = diameter:subscribe(Name), + ok = diameter:start_service(Name, [{monitor, self()} | ?SERVICE(Name)]), + {ok, TRef} = diameter:add_transport(Name, {Type, opts(Type, Ref, T)}), + true = diameter_reg:add_new({Type, Ref, Name}), + {Name, TRef}. + +opts(Type, Ref, {Timer, Config, Mod}) -> + [{transport_module, diameter_tcp}, + {transport_config, Mod ++ [{ip, ?ADDR}, {port, 0}] ++ cfg(Type, Ref)}, + {watchdog_timer, Timer}, + {watchdog_config, Config}]. + +cfg(listen, _) -> + []; +cfg(connect, Ref) -> + [{{_, _, SvcName}, _Pid}] = diameter_reg:wait({listen, Ref, '_'}), + [[{ref, LRef} | _]] = diameter:service_info(SvcName, transport), + [LP] = ?util:lport(tcp, LRef, 20), + [{raddr, ?ADDR}, {rport, LP}]. + %% =========================================================================== listen(PortNr, Opts) -> @@ -417,6 +544,7 @@ send(Sock, Bin) -> %% First outgoing message from a new transport process is CER/CEA. %% Remaining outgoing messages are either DWR or DWA. send(undefined, Sock, Bin) -> + <<_:32, _:8, 257:24, _/binary>> = Bin, putr(config, init), gen_tcp:send(Sock, Bin); @@ -526,7 +654,7 @@ run1([F|A]) -> catch E:R -> S = erlang:get_stacktrace(), - io:format("~p~n", [{A, E, R, S}]), + ?WARN("~p", [{A, E, R, S}]), S end.