Permalink
Browse files

heath tests full pass

  • Loading branch information...
1 parent 7190e91 commit d4bc1097e681b5d7f872f9bae6c798ed531e05b7 @lordnull lordnull committed Oct 17, 2012
Showing with 181 additions and 183 deletions.
  1. +83 −44 src/riak_core_node_watcher.erl
  2. +98 −139 test/node_watcher_qc.erl
@@ -48,7 +48,8 @@
bcast_tref,
bcast_mod = {gen_server, abcast}}).
--record(health_check, { callback :: mfa(),
+-record(health_check, { state = 'waiting' :: 'waiting' | 'checking' | 'suspend',
+ callback :: mfa(),
service_pid :: pid(),
checking_pid :: pid(),
health_failures = 0 :: non_neg_integer(),
@@ -177,12 +178,16 @@ handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
%% install the health check
CheckInterval = proplists:get_value(check_interval, Options, 60),
+ IntervalTref = case CheckInterval of
+ infinity -> undefined;
+ N -> erlang:send_after(N * 1000, self(), {check_health, Id})
+ end,
CheckRec = #health_check{
callback = MFA,
check_interval = CheckInterval,
service_pid = Pid,
max_callback_failures = proplists:get_value(max_callback_failures, Options, 3),
- interval_tref = erlang:send_after(CheckInterval * 1000, self(), {health_check, Id})
+ interval_tref = IntervalTref
},
Healths = orddict:store(Id, CheckRec, State1#state.health_checks),
{reply, ok, State1#state{health_checks = Healths}};
@@ -213,10 +218,18 @@ handle_call({node_status, Status}, _From, State) ->
Transition = {State#state.status, Status},
S2 = case Transition of
{up, down} -> %% up -> down
- local_delete(State#state { status = down });
+ Healths = [begin
+ {ok, C1} = health_fsm(suspend, S, C),
+ {S, C1}
+ end || {S, C} <- State#state.health_checks],
+ local_delete(State#state { status = down, health_checks = Healths});
{down, up} -> %% down -> up
- local_update(State#state { status = up });
+ Healths = [begin
+ {ok, C1} = health_fsm(resume, S, C),
+ {S, C1}
+ end || {S, C} <- State#state.health_checks],
+ local_update(State#state { status = up, health_checks = Healths });
{Status, Status} -> %% noop
State
@@ -513,6 +526,9 @@ internal_get_nodes(Service) ->
handle_check_msg(_Msg, undefined, State) ->
State;
+handle_check_msg(_Msg, _ServiceId, #state{status = down} = State) ->
+ % most likely a late message
+ State;
handle_check_msg(Msg, ServiceId, State) ->
case orddict:find(ServiceId, State#state.health_checks) of
error ->
@@ -573,120 +589,137 @@ handle_check_return({down, Check}, ServiceId, State) ->
%% remove health check
%% health check finished
-%% message handling when in dormant state
-health_fsm(disable, _Service, #health_check{check_interval = infinity} = InCheck) ->
- {ok, InCheck};
-
-health_fsm(check_health, Service, #health_check{check_interval = infinity} = InCheck) ->
- InCheck1 = start_health_check(Service, InCheck),
- {ok, InCheck1};
+%% message handling when suspended
+health_fsm(resume, Service, #health_check{state = suspend} = InCheck) ->
+ #health_check{health_failures = N, check_interval = V} = InCheck,
+ Tref = next_health_tref(N, V, Service),
+ OutCheck = InCheck#health_check{
+ state = waiting,
+ interval_tref = Tref
+ },
+ {ok, OutCheck};
-health_fsm(remove, _Service, #health_check{check_interval = infinity} = InCheck) ->
+health_fsm(remove, Service, #health_check{state = suspend} = InCheck) ->
ok;
-%health_fsm({'EXIT', _Pid, _Cause}, _Service, InCheck) ->
-% {ok, InCheck};
-
%% message handling when checking state
-health_fsm(disable, _Service, #health_check{checking_pid = Pid} = InCheck) when is_pid(Pid) ->
- {ok, InCheck#health_check{checking_pid = undefined, check_interval = infinity}};
+health_fsm(suspend, Service, #health_check{state = checking} = InCheck) ->
+ #health_check{checking_pid = Pid} = InCheck,
+ erlang:erase(Pid),
+ {ok, InCheck#health_check{state = suspend, checking_pid = undefined}};
-health_fsm(check_health, _Service, #health_check{checking_pid = Pid} = InCheck) when is_pid(Pid) ->
+health_fsm(check_health, _Service, #health_check{state = checking} = InCheck) ->
{ok, InCheck};
-health_fsm(remove, _Service, #health_check{checking_pid = Pid}) when is_pid(Pid) ->
+health_fsm(remove, _Service, #health_check{state = checking} = InCheck) ->
+ #health_check{checking_pid = Pid} = InCheck,
+ erlang:erase(Pid),
ok;
health_fsm({'EXIT', Pid, normal}, Service, #health_check{checking_pid = Pid, health_failures = N, max_health_failures = M} = InCheck) when N >= M ->
- Time = determine_time(0, InCheck#health_check.check_interval) * 1000,
+ Tref = next_health_tref(N, InCheck#health_check.check_interval, Service),
OutCheck = InCheck#health_check{
+ state = waiting,
checking_pid = undefined,
health_failures = 0,
callback_failures = 0,
- interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ interval_tref = Tref
},
{up, OutCheck};
health_fsm({'EXIT', Pid, normal}, Service, #health_check{checking_pid = Pid, health_failures = N, max_health_failures = M} = InCheck) when N < M ->
- Time = determine_time(N, InCheck#health_check.check_interval) * 1000,
+ Tref = next_health_tref(N, InCheck#health_check.check_interval, Service),
OutCheck = InCheck#health_check{
+ state = waiting,
checking_pid = undefined,
health_failures = 0,
callback_failures = 0,
- interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ interval_tref = Tref
},
{ok, OutCheck};
health_fsm({'EXIT', Pid, false}, Service, #health_check{health_failures = N, max_health_failures = M, checking_pid = Pid} = InCheck) when N + 1 == M ->
- Time = determine_time(N + 1, InCheck#health_check.check_interval),
+ Tref = next_health_tref(N, InCheck#health_check.check_interval, Service),
OutCheck = InCheck#health_check{
+ state = waiting,
checking_pid = undefined,
health_failures = N + 1,
callback_failures = 0,
- interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ interval_tref = Tref
},
{down, OutCheck};
health_fsm({'EXIT', Pid, false}, Service, #health_check{health_failures = N, max_health_failures = M, checking_pid = Pid} = InCheck) when N >= M ->
- Time = determine_time(N + 1, InCheck#health_check.check_interval),
+ Tref = next_health_tref(N, InCheck#health_check.check_interval, Service),
OutCheck = InCheck#health_check{
+ state = waiting,
checking_pid = undefined,
health_failures = N + 1,
callback_failures = 0,
- interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ interval_tref = Tref
},
{ok, OutCheck};
health_fsm({'EXIT', Pid, false}, Service, #health_check{health_failures = N, max_health_failures = M, checking_pid = Pid} = InCheck) ->
- Time = determine_time(N + 1, InCheck#health_check.check_interval),
+ Tref = next_health_tref(N, InCheck#health_check.check_interval, Service),
OutCheck = InCheck#health_check{
+ state = waiting,
checking_pid = undefined,
health_failures = N + 1,
callback_failures = 0,
- interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ interval_tref = Tref
},
{ok, OutCheck};
health_fsm({'EXIT', Pid, Cause}, Service, #health_check{checking_pid = Pid} = InCheck) ->
lager:error("health check process for ~p error'ed: ~p", [Service, Cause]),
+ {M,F,A} = InCheck#health_check.callback,
Fails = InCheck#health_check.callback_failures + 1,
if
Fails == InCheck#health_check.max_callback_failures ->
lager:error("health check callback for ~p failed too many times, disabling.", [Service]),
- {ok, InCheck#health_check{checking_pid = undefined, callback_failures = Fails}};
+ {down, InCheck#health_check{state = suspend, checking_pid = undefined, callback_failures = Fails}};
Fails < InCheck#health_check.max_callback_failures ->
#health_check{health_failures = N, check_interval = Inter} = InCheck,
- Time = determine_time(N, Inter),
- Tref = erlang:send_after(Time * 1000, self(), {check_health, Service}),
- OutCheck = InCheck#health_check{checking_pid = undefined,
- callback_failures = Fails, interval_tref = Tref},
+ Tref = next_health_tref(N, Inter, Service),
+ OutCheck = InCheck#health_check{state = waiting,
+ checking_pid = undefined, callback_failures = Fails,
+ interval_tref = Tref},
{ok, OutCheck};
true ->
{ok, InCheck#health_check{checking_pid = undefined, callback_failures = Fails}}
end;
%% message handling when in waiting state
-health_fsm(disable, _Service, #health_check{interval_tref = Tref} = InCheck) ->
- erlang:cancel_timer(Tref),
- {ok, InCheck#health_check{interval_tref = undefined, check_interval = infinity}};
+health_fsm(suspend, _Service, #health_check{state = waiting} = InCheck) ->
+ case InCheck#health_check.interval_tref of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(InCheck#health_check.interval_tref)
+ end,
+ {ok, InCheck#health_check{state = suspend, interval_tref = undefined}};
-health_fsm(check_health, Service, InCheck) ->
+health_fsm(check_health, Service, #health_check{state = waiting} = InCheck) ->
InCheck1 = start_health_check(Service, InCheck),
{ok, InCheck1};
-health_fsm(remove, _Service, #health_check{interval_tref = Tref}) ->
- erlang:cancel_timer(Tref),
+health_fsm(remove, _Service, #health_check{state = waiting, interval_tref = Tref}) ->
+ case Tref of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(Tref)
+ end,
ok;
% fallthrough handling
-health_fsm(Msg, _Service, Health) ->
+health_fsm(Msg, Service, Health) ->
{ok, Health}.
start_health_check(Service, #health_check{checking_pid = undefined} = CheckRec) ->
{Mod, Func, Args} = CheckRec#health_check.callback,
Pid = CheckRec#health_check.service_pid,
- Tref = CheckRec#health_check.interval_tref,
- erlang:cancel_timer(Tref),
+ case CheckRec#health_check.interval_tref of
+ undefined -> ok;
+ Tref -> erlang:cancel_timer(Tref)
+ end,
CheckingPid = proc_lib:spawn_link(fun() ->
case erlang:apply(Mod, Func, [Pid | Args]) of
true -> ok;
@@ -695,10 +728,16 @@ start_health_check(Service, #health_check{checking_pid = undefined} = CheckRec)
end
end),
erlang:put(CheckingPid, Service),
- CheckRec#health_check{checking_pid = CheckingPid, interval_tref = undefined};
+ CheckRec#health_check{state = checking, checking_pid = CheckingPid, interval_tref = undefined};
start_health_check(_Service, Check) ->
Check.
+next_health_tref(_, infinity, _) ->
+ undefined;
+next_health_tref(N, V, Service) ->
+ Time = determine_time(N, V),
+ erlang:send_after(Time * 1000, self(), {check_health, Service}).
+
determine_time(Failures, BaseInterval) when Failures < 4 ->
BaseInterval;
Oops, something went wrong.

0 comments on commit d4bc109

Please sign in to comment.