
Much improved/more useful health check; QC tests still not passing.

Doesn't yet play nicely with interleaved node-up, service-up, service-down,
and node-down events.
1 parent 183b7a4 · commit 7190e9102a648394718b122b0bfac1981059dabe · @lordnull committed
Showing with 287 additions and 212 deletions.
  1. +218 −180 src/riak_core_node_watcher.erl
  2. +69 −32 test/node_watcher_qc.erl
src/riak_core_node_watcher.erl (398 changed lines)
@@ -142,6 +142,15 @@ handle_call({service_up, Id, Pid}, _From, State) ->
Services = ordsets:add_element(Id, State#state.services),
S2 = State#state { services = Services },
+ %% remove any existing health checks
+ Healths = case orddict:find(Id, State#state.health_checks) of
+ error ->
+ State#state.health_checks;
+ {ok, Check} ->
+ health_fsm(remove, Id, Check),
+ orddict:erase(Id, State#state.health_checks)
+ end,
+
%% Remove any existing mrefs for this service
delete_service_mref(Id),
@@ -152,14 +161,19 @@ handle_call({service_up, Id, Pid}, _From, State) ->
%% Update our local ETS table and broadcast
S3 = local_update(S2),
- {reply, ok, update_avsn(S3)};
+ {reply, ok, update_avsn(S3#state{health_checks = Healths})};
handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
%% update the active set of services if needed.
{reply, _, State1} = handle_call({service_up, Id, Pid}, From, State),
%% uninstall old health check
- State2 = cancel_health_check(Id, State1),
+ case orddict:find(Id, State#state.health_checks) of
+ {ok, OldCheck} ->
+ health_fsm(remove, Id, OldCheck);
+ error ->
+ ok
+ end,
%% install the health check
CheckInterval = proplists:get_value(check_interval, Options, 60),
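For reference, a hedged sketch of how a caller might register a service with a
health check under this API (the service and module names are hypothetical;
check_interval is in seconds, defaulting to 60 per the proplists:get_value
call above):

    %% Hypothetical caller; my_service and my_health_mod are illustrative.
    ok = riak_core_node_watcher:service_up(my_service, ServicePid,
                                           {my_health_mod, check, []},
                                           [{check_interval, 30}]),
    %% Roughly every 30 seconds the watcher spawns a process running
    %% my_health_mod:check(ServicePid); a 'false' return counts as a
    %% failed check, 'true' as a healthy one.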
@@ -171,7 +185,7 @@ handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
interval_tref = erlang:send_after(CheckInterval * 1000, self(), {health_check, Id})
},
Healths = orddict:store(Id, CheckRec, State1#state.health_checks),
- {reply, ok, State2#state{health_checks = Healths}};
+ {reply, ok, State1#state{health_checks = Healths}};
handle_call({service_down, Id}, _From, State) ->
%% Update the set of active services locally
@@ -183,11 +197,17 @@ handle_call({service_down, Id}, _From, State) ->
%% Update local ETS table and broadcast
S3 = local_update(S2),
- {reply, ok, update_avsn(S3)};
-handle_call({service_down, Id, health_check}, From, State) ->
- State1 = cancel_health_check(Id, State),
- handle_call({service_down, Id}, From, State1);
+ %% Remove health check if any
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ ok;
+ {ok, Check} ->
+ health_fsm(remove, Id, Check)
+ end,
+
+ Healths = orddict:erase(Id, S3#state.health_checks),
+ {reply, ok, update_avsn(S3#state{health_checks = Healths})};
handle_call({node_status, Status}, _From, State) ->
Transition = {State#state.status, Status},
@@ -223,17 +243,7 @@ handle_cast({up, Node, Services}, State) ->
handle_cast({down, Node}, State) ->
node_down(Node, State),
- {noreply, update_avsn(State)};
-
-handle_cast({force_health_check, Id}, State) ->
- case orddict:find(Id, State#state.health_checks) of
- error ->
- {noreply, State};
- {ok, CheckRec} ->
- CheckRec2 = trigger_health_check(Id, CheckRec, true),
- Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
- {noreply, State#state{health_checks = Checks}}
- end.
+ {noreply, update_avsn(State)}.
handle_info({nodeup, _Node}, State) ->
%% Ignore node up events; nothing to do here...
@@ -255,42 +265,30 @@ handle_info({'DOWN', Mref, _, _Pid, _Info}, State) ->
%% Remove the id<->mref entries in the pdict
delete_service_mref(Id),
+ %% remove any health checks in place
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ ok;
+ {ok, Health} ->
+ health_fsm(remove, Id, Health)
+ end,
+ Healths = orddict:erase(Id, State#state.health_checks),
+
%% Update our list of active services and ETS table
Services = ordsets:del_element(Id, State#state.services),
S2 = State#state { services = Services },
local_update(S2),
- {noreply, update_avsn(S2)}
+ {noreply, update_avsn(S2#state{health_checks = Healths})}
end;
-handle_info({'EXIT', Pid, Cause}, State) ->
- case erlang:erase(Pid) of
- undefined ->
- % likely a late exit from a canceled health check.
- {noreply, State};
- Id ->
- State2 = handle_health_check_exit(Id, Cause, State),
- {noreply, State2}
- end;
-
-handle_info({health_check, Id}, State) ->
- case orddict:find(Id, State#state.health_checks) of
- error ->
- {noreply, State};
- {ok, CheckRec} ->
- CheckRec2 = trigger_health_check(Id, CheckRec, false),
- Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
- {noreply, State#state{health_checks = Checks}}
- end;
+handle_info({'EXIT', Pid, _Cause} = Msg, State) ->
+ Service = erlang:erase(Pid),
+ State2 = handle_check_msg(Msg, Service, State),
+ {noreply, State2};
-handle_info({force_health_check, Id}, State) ->
- case orddict:find(Id, State#state.health_checks) of
- error ->
- {noreply, State};
- {ok, CheckRec} ->
- CheckRec2 = trigger_health_check(Id, CheckRec, true),
- Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
- {noreply, State#state{health_checks = Checks}}
- end;
+handle_info({check_health, Id}, State) ->
+ State2 = handle_check_msg(check_health, Id, State),
+ {noreply, State2};
handle_info({gen_event_EXIT, _, _}, State) ->
%% Ring event handler has been removed for some reason; re-register
@@ -513,122 +511,193 @@ internal_get_nodes(Service) ->
[]
end.
-cancel_health_check(Id, State) ->
- case orddict:find(Id, State#state.health_checks) of
+handle_check_msg(_Msg, undefined, State) ->
+ State;
+handle_check_msg(Msg, ServiceId, State) ->
+ case orddict:find(ServiceId, State#state.health_checks) of
error ->
State;
- {ok, CheckRec} ->
- cancel_health_check(CheckRec),
- HealthChecks = orddict:erase(Id, State#state.health_checks),
- State#state{health_checks = HealthChecks}
+ {ok, Check} ->
+ CheckReturn = health_fsm(Msg, ServiceId, Check),
+ handle_check_return(CheckReturn, ServiceId, State)
end.
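%% (Editorial summary, not part of the commit) health_fsm/3 return values,
%% as consumed by handle_check_return/3 below:
%%   ok            -> the check is erased from state
%%   {ok, Check}   -> the check is stored; service membership is unchanged
%%   {up, Check}   -> the check is stored and the service is registered up
%%   {down, Check} -> the check is stored and the service is marked down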
-cancel_health_check(#health_check{checking_pid = Pid}) when is_pid(Pid) ->
- %% Yes, I'm not killing the pid, I'm just unlinking
- unlink(Pid),
- erlang:erase(Pid),
+handle_check_return(ok, ServiceId, State) ->
+ Healths = orddict:erase(ServiceId, State#state.health_checks),
+ State#state{health_checks = Healths};
+handle_check_return({ok, Check}, ServiceId, State) ->
+ Healths = orddict:store(ServiceId, Check, State#state.health_checks),
+ State#state{health_checks = Healths};
+handle_check_return({up, Check}, ServiceId, State) ->
+ #health_check{service_pid = Pid} = Check,
+ Healths = orddict:store(ServiceId, Check, State#state.health_checks),
+
+ %% Update the set of active services locally
+ Services = ordsets:add_element(ServiceId, State#state.services),
+ S2 = State#state { services = Services },
+
+ %% Remove any existing mrefs for this service
+ delete_service_mref(ServiceId),
+
+ %% Setup a monitor for the Pid representing this service
+ Mref = erlang:monitor(process, Pid),
+ erlang:put(Mref, ServiceId),
+ erlang:put(ServiceId, Mref),
+
+ %% Update our local ETS table and broadcast
+ S3 = local_update(S2),
+ update_avsn(S3#state{health_checks = Healths});
+handle_check_return({down, Check}, ServiceId, State) ->
+ Healths = orddict:store(ServiceId, Check, State#state.health_checks),
+
+ %% Update the set of active services locally
+ Services = ordsets:del_element(ServiceId, State#state.services),
+ S2 = State#state { services = Services },
+
+ %% Remove any existing mrefs for this service
+ delete_service_mref(ServiceId),
+
+ %% Update local ETS table and broadcast
+ S3 = local_update(S2),
+
+ update_avsn(S3#state{health_checks = Healths}).
+
+%% Health checks are modeled as an FSM to make the behavior easier to reason about.
+%% There are three states:
+%% waiting: in between check intervals
+%% dormant: Check interval disabled
+%% checking: health check in progress
+%% messages to handle:
+%% go dormant
+%% do a scheduled health check
+%% remove health check
+%% health check finished
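%% (Editorial sketch of the transitions, derived from the clauses below)
%%   dormant  --check_health--> checking  (manual trigger)
%%   dormant  --disable------->  dormant
%%   waiting  --check_health--> checking
%%   waiting  --disable------->  dormant  (interval timer canceled)
%%   checking --'EXIT' normal--> waiting  ({up,_} when recovering)
%%   checking --'EXIT' false---> waiting  ({down,_} at the failure threshold)
%%   checking --disable------->  dormant
%%   any      --remove--------> check deleted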
+
+%% message handling when in dormant state
+health_fsm(disable, _Service, #health_check{check_interval = infinity} = InCheck) ->
+ {ok, InCheck};
+
+health_fsm(check_health, Service, #health_check{check_interval = infinity} = InCheck) ->
+ InCheck1 = start_health_check(Service, InCheck),
+ {ok, InCheck1};
+
+health_fsm(remove, _Service, #health_check{check_interval = infinity}) ->
ok;
-cancel_health_check(#health_check{interval_tref = Tref}) ->
- case erlang:cancel_timer(Tref) of
- false ->
- lager:notice("Canceling health check timer already canceled or sent"),
- ok;
- _N ->
- ok
- end.
+%health_fsm({'EXIT', _Pid, _Cause}, _Service, InCheck) ->
+% {ok, InCheck};
-schedule_health_check(Id, CheckRec) ->
- Override = app_helper:get_env(riak_core, {health_check_interval, Id}),
- schedule_health_check(Id, CheckRec, Override).
+%% message handling when in the checking state
+health_fsm(disable, _Service, #health_check{checking_pid = Pid} = InCheck) when is_pid(Pid) ->
+ {ok, InCheck#health_check{checking_pid = undefined, check_interval = infinity}};
-schedule_health_check(_Id, #health_check{check_interval = infinity} = CheckRec, undefined) ->
- CheckRec#health_check{interval_tref = undefined};
+health_fsm(check_health, _Service, #health_check{checking_pid = Pid} = InCheck) when is_pid(Pid) ->
+ {ok, InCheck};
-schedule_health_check(Id, #health_check{callback_failures = N, max_callback_failures = N} = CheckRec, undefined) ->
- lager:warning("No further checking of ~p due to max callback failures", [Id]),
- CheckRec#health_check{interval_tref = undefined};
+health_fsm(remove, _Service, #health_check{checking_pid = Pid}) when is_pid(Pid) ->
+ ok;
-schedule_health_check(Id, #health_check{health_failures = 0, check_interval = N} = CheckRec, undefined) ->
- Timer = erlang:send_after(N * 1000, self(), {health_check, Id}),
- CheckRec#health_check{interval_tref = Timer};
+health_fsm({'EXIT', Pid, normal}, Service, #health_check{checking_pid = Pid, health_failures = N, max_health_failures = M} = InCheck) when N >= M ->
+ Time = determine_time(0, InCheck#health_check.check_interval),
+ OutCheck = InCheck#health_check{
+ checking_pid = undefined,
+ health_failures = 0,
+ callback_failures = 0,
+ interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ },
+ {up, OutCheck};
+
+health_fsm({'EXIT', Pid, normal}, Service, #health_check{checking_pid = Pid, health_failures = N, max_health_failures = M} = InCheck) when N < M ->
+ Time = determine_time(N, InCheck#health_check.check_interval),
+ OutCheck = InCheck#health_check{
+ checking_pid = undefined,
+ health_failures = 0,
+ callback_failures = 0,
+ interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ },
+ {ok, OutCheck};
+
+health_fsm({'EXIT', Pid, false}, Service, #health_check{health_failures = N, max_health_failures = M, checking_pid = Pid} = InCheck) when N + 1 == M ->
+ Time = determine_time(N + 1, InCheck#health_check.check_interval),
+ OutCheck = InCheck#health_check{
+ checking_pid = undefined,
+ health_failures = N + 1,
+ callback_failures = 0,
+ interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ },
+ {down, OutCheck};
+
+health_fsm({'EXIT', Pid, false}, Service, #health_check{health_failures = N, max_health_failures = M, checking_pid = Pid} = InCheck) when N >= M ->
+ Time = determine_time(N + 1, InCheck#health_check.check_interval),
+ OutCheck = InCheck#health_check{
+ checking_pid = undefined,
+ health_failures = N + 1,
+ callback_failures = 0,
+ interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ },
+ {ok, OutCheck};
+
+health_fsm({'EXIT', Pid, false}, Service, #health_check{health_failures = N, checking_pid = Pid} = InCheck) ->
+ Time = determine_time(N + 1, InCheck#health_check.check_interval),
+ OutCheck = InCheck#health_check{
+ checking_pid = undefined,
+ health_failures = N + 1,
+ callback_failures = 0,
+ interval_tref = erlang:send_after(Time * 1000, self(), {check_health, Service})
+ },
+ {ok, OutCheck};
+
+health_fsm({'EXIT', Pid, Cause}, Service, #health_check{checking_pid = Pid} = InCheck) ->
+ lager:error("health check process for ~p error'ed: ~p", [Service, Cause]),
+ Fails = InCheck#health_check.callback_failures + 1,
+ if
+ Fails == InCheck#health_check.max_callback_failures ->
+ lager:error("health check callback for ~p failed too many times, disabling.", [Service]),
+ {ok, InCheck#health_check{checking_pid = undefined, callback_failures = Fails}};
+ Fails < InCheck#health_check.max_callback_failures ->
+ #health_check{health_failures = N, check_interval = Inter} = InCheck,
+ Time = determine_time(N, Inter),
+ Tref = erlang:send_after(Time * 1000, self(), {check_health, Service}),
+ OutCheck = InCheck#health_check{checking_pid = undefined,
+ callback_failures = Fails, interval_tref = Tref},
+ {ok, OutCheck};
+ true ->
+ {ok, InCheck#health_check{checking_pid = undefined, callback_failures = Fails}}
+ end;
-schedule_health_check(Id, #health_check{health_failures = N, check_interval = Interval} = CheckRec, undefined) ->
- Seconds = determine_time(N, Interval),
- Timer = erlang:send_after(round(Seconds * 1000), self(), {health_check, Id, false}),
- CheckRec#health_check{interval_tref = Timer};
+%% message handling when in waiting state
+health_fsm(disable, _Service, #health_check{interval_tref = Tref} = InCheck) ->
+ erlang:cancel_timer(Tref),
+ {ok, InCheck#health_check{interval_tref = undefined, check_interval = infinity}};
-schedule_health_check(Id, CheckRec, Interval) ->
- application:unset_env(riak_core, {health_check_interval, Id}),
- CheckRec1 = CheckRec#health_check{health_failures = 0, check_interval = Interval},
- schedule_health_check(Id, CheckRec1, undefined).
+health_fsm(check_health, Service, InCheck) ->
+ InCheck1 = start_health_check(Service, InCheck),
+ {ok, InCheck1};
-handle_health_check_exit(Id, Cause, State) ->
- case orddict:find(Id, State#state.health_checks) of
- error ->
- State;
- {ok, CheckRec} ->
- handle_health_check_exit(Id, Cause, CheckRec, State)
- end.
+health_fsm(remove, _Service, #health_check{interval_tref = Tref}) ->
+ erlang:cancel_timer(Tref),
+ ok;
-% all is well
-handle_health_check_exit(Id, normal, #health_check{health_failures = 0} = CheckRec, State) ->
- CheckRec2 = schedule_health_check(Id, CheckRec#health_check{checking_pid = undefined, callback_failures = 0}),
- Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
- State#state{health_checks = Checks};
-
-% recovery before service_down
-handle_health_check_exit(Id, normal, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N < M ->
- CheckRec2 = CheckRec#health_check{health_failures = 0, checking_pid = undefined, callback_failures = 0},
- CheckRec3 = schedule_health_check(Id, CheckRec2),
- Checks = orddict:store(Id, CheckRec3),
- State#state{health_checks = Checks};
-
-% recovery after service_down
-handle_health_check_exit(Id, normal, #health_check{health_failures = N} = CheckRec, State) when N > 0 ->
- CheckRec2 = CheckRec#health_check{health_failures = 0, checking_pid = undefined, callback_failures = 0},
- {reply, _, State2} = handle_call({service_up, Id, CheckRec2#health_check.service_pid}, "from", State),
- CheckRec3 = schedule_health_check(Id, CheckRec2),
- Checks = orddict:store(Id, CheckRec3),
- State2#state{health_checks = Checks};
-
-% healthy failure reaches threshold
-handle_health_check_exit(Id, false, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N + 1 == M ->
- CheckRec2 = CheckRec#health_check{health_failures = N + 1, checking_pid = undefined, callback_failures = 0},
- {reply, _, State2} = handle_call({service_down, Id}, "from", State),
- CheckRec3 = schedule_health_check(Id, CheckRec2),
- Checks = orddict:store(Id, CheckRec3, State2#state.health_checks),
- State2#state{health_checks = Checks};
-
-% healthy failure while service_down
-handle_health_check_exit(Id, false, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N >= M ->
- CheckRec2 = CheckRec#health_check{health_failures = N + 1, checking_pid = undefined, callback_failures = 0},
- CheckRec3 = schedule_health_check(Id, CheckRec2),
- Checks = orddict:store(Id, CheckRec3),
- State#state{health_checks = Checks};
-
-% healyth failure while service_up, but below threshold
-handle_health_check_exit(Id, false, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N < M ->
- CheckRec2 = CheckRec#health_check{health_failures = N + 1, checking_pid = undefined, callback_failures = 0},
- CheckRec3 = schedule_health_check(Id, CheckRec2),
- Checks = orddict:store(Id, CheckRec3),
- State#state{health_checks = Checks};
-
-% callback fails too many times.
-handle_health_check_exit(Id, Cause, #health_check{callback_failures = N, max_callback_failures = M}, State) when N + 1 == M ->
- lager:warning("health check function for ~p errored: ~p", [Id, Cause]),
- lager:warning("health check function for ~p failed too many times, removing it", [Id]),
- Checks = orddiect:erase(Id, State#state.health_checks),
- State#state{health_checks = Checks};
-
-% callback fails
-handle_health_check_exit(Id, Cause, CheckRec, State) ->
- lager:warning("health check function for ~p errored: ~p", [Id, Cause]),
- #health_check{callback_failures = N} = CheckRec,
- CheckRec2 = CheckRec#health_check{checking_pid = undefined, callback_failures = N + 1},
- CheckRec3 = schedule_health_check(Id, CheckRec2),
- Checks = orddict:store(Id, CheckRec3, State#state.health_checks),
- State#state{health_checks = Checks}.
+% fallthrough handling
+health_fsm(_Msg, _Service, Health) ->
+ {ok, Health}.
+
+start_health_check(Service, #health_check{checking_pid = undefined} = CheckRec) ->
+ {Mod, Func, Args} = CheckRec#health_check.callback,
+ Pid = CheckRec#health_check.service_pid,
+ %% interval_tref is undefined when triggered from the dormant state
+ case CheckRec#health_check.interval_tref of
+     undefined -> ok;
+     Tref -> erlang:cancel_timer(Tref)
+ end,
+ CheckingPid = proc_lib:spawn_link(fun() ->
+ case erlang:apply(Mod, Func, [Pid | Args]) of
+ true -> ok;
+ false -> exit(false);
+ Else -> exit(Else)
+ end
+ end),
+ erlang:put(CheckingPid, Service),
+ CheckRec#health_check{checking_pid = CheckingPid, interval_tref = undefined};
+start_health_check(_Service, Check) ->
+ Check.
determine_time(Failures, BaseInterval) when Failures < 4 ->
BaseInterval;
@@ -638,34 +707,3 @@ determine_time(Failures, BaseInterval) when Failures < 11 ->
determine_time(Failures, BaseInterval) when Failures > 10 ->
BaseInterval * 20.
-
-trigger_health_check(_Id, #health_check{checking_pid = Pid} = R, _) when is_pid(Pid) ->
- R;
-
-trigger_health_check(Id, #health_check{interval_tref = undefined} = Rec, _) ->
- #health_check{callback = MFA, service_pid = ServPid} = Rec,
- {Mod, Fun, Args} = MFA,
- Pid = proc_lib:spawn_link(Mod, Fun, [ServPid | Args]),
- erlang:put(Pid, Id),
- Rec#health_check{checking_pid = Pid};
-
-trigger_health_check(_Id, Rec, false) ->
- Rec;
-
-trigger_health_check(Id, Rec, true) ->
- % if the message is already delivered, it should be okay.
- erlang:cancel_timer(Rec#health_check.interval_tref),
- #health_check{callback = MFA, service_pid = ServPid} = Rec,
- {Mod, Fun, Args} = MFA,
- CheckFun = fun() ->
- Out = erlang:apply(Mod,Fun,[ServPid | Args]),
- if
- Out ->
- ok;
- true ->
- exit(Out)
- end
- end,
- Pid = proc_lib:spawn_link(CheckFun),
- erlang:put(Pid, Id),
- Rec#health_check{checking_pid = Pid, interval_tref = undefined}.
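As an editorial usage note with illustrative numbers: determine_time/2 above
scales the base check interval by the failure count, so a repeatedly failing
check is retried with backoff (the clause covering 4..10 failures is elided
by this diff, so its multiplier is not shown here):

    Interval = 60,                        %% base check_interval, in seconds
    60 = determine_time(2, Interval),     %% fewer than 4 failures: base interval
    1200 = determine_time(12, Interval),  %% more than 10 failures: base * 20
    %% the FSM reschedules with:
    %%   erlang:send_after(determine_time(N, Interval) * 1000,
    %%                     self(), {check_health, Service})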
test/node_watcher_qc.erl (101 changed lines)
@@ -108,9 +108,9 @@ command(S) ->
{call, ?MODULE, wait_for_bcast, []},
{call, ?MODULE, healthy_service, [g_service()]},
{call, ?MODULE, unhealthy_service, [g_service()]},
- {call, ?MODULE, recovering_service, [g_service()]},
- {call, ?MODULE, faulty_health_check_callback_once, [g_service()]},
- {call, ?MODULE, faulty_health_check_callback_many, [g_service()]}
+ {call, ?MODULE, recovering_service, [g_service()]}%,
+ %{call, ?MODULE, faulty_health_check_callback_once, [g_service()]},
+ %{call, ?MODULE, faulty_health_check_callback_many, [g_service()]}
]).
precondition(S, {call, _, local_service_kill, [Service, S]}) ->
@@ -161,19 +161,12 @@ next_state(S, Res, {call, M, unhealthy_service, [Service]}) ->
S;
next_state(S, Res, {call, M, recovering_service, [Service]}) ->
- meck:sequence(mod_health, callback, 2, [false, true]),
- next_state(S, Res, {call, M, local_service_up, [Service]});
+ S2 = service_up(node(), Service, S),
+ Pids = orddict:store(Service, Res, S2#state.service_pids),
+ S2#state { service_pids = Pids };
next_state(S, Res, {call, M, faulty_health_check_callback_once, [Service]}) ->
- meck:expect(mod_health, callback, fun(Pid, error_once) when is_pid(Pid) ->
- case put(meck_boomed, true) of
- false ->
- meck:exception(meck_boomed);
- true ->
- true
- end
- end),
- next_state(S, Res, {call, M, local_service_up, [Service]});
+ S;
next_state(S, Res, {call, M, faulty_health_check_callback_many, [Service]}) ->
meck:expect(mod_health, callback, fun(Pid, error) when is_pid(Pid) ->
@@ -255,16 +248,17 @@ postcondition(S, {call, _, healthy_service, [Service]}, _Res) ->
postcondition(S, {call, _, unhealthy_service, [Service]}, Res) ->
S2 = service_up(node(), Service, S),
S3 = service_down(node(), Service, S2),
- validate_broadcasts([S,S2,S3], service),
+ validate_broadcasts([S,S2,S3,S3], service),
?assert(meck:validate(mod_health)),
deep_validate(S3);
postcondition(S, {call, _, recovering_service, [Service]}, _Res) ->
- S2 = service_down(node(), Service, S),
- S3 = service_up(node(), Service, S2),
- validate_broadcasts([S, S2, S3], Service),
+ S2 = service_up(node(), Service, S),
+ S3 = service_down(node(), Service, S2),
+ S4 = service_up(node(), Service, S3),
+ validate_broadcasts([S, S2, S3, S4], service),
?assert(meck:validate(mod_health)),
- deep_validate(S3);
+ deep_validate(S4);
postcondition(S, {call, _, faulty_health_check_callback_once, [_Service]}, _Res) ->
?assert(meck:validate(mod_health)),
@@ -331,15 +325,29 @@ validate_broadcast(S0, Sfinal, Op, Bcasts) ->
validate_broadcasts(States, Op) ->
Bcasts = broadcasts(),
- ?assertEqual(length(Bcasts) + 1, length(States)),
+ ?debugFmt("validate broadcats:\n"
+ " broadcasts: ~p\n"
+ " states: ~p", [Bcasts, States]),
validate_broadcasts(States, Op, Bcasts).
-validate_broadcasts([S0, Sfinal], Op, [Bcast]) ->
- validate_broadcast(S0, Sfinal, Op, [Bcast]);
+validate_broadcasts([], _Op, []) ->
+ true;
+
+validate_broadcasts([_S], _Op, _Bcasts) ->
+ true;
+
+validate_broadcasts([S0, Sfinal | Tail], Op, []) ->
+ Transition = {is_node_up(node(), S0), is_node_up(node(), Sfinal), Op},
+ ?assertMatch({false, false, Op}, Transition),
+ validate_broadcasts([Sfinal | Tail], Op, []);
+
+validate_broadcasts([], _Op, Bcasts) ->
+ % there should be at least length(Bcasts) + 1 states to check.
+ ?assertEqual([], Bcasts);
-validate_broadcasts([S0, Sfinal | STail], Op, [Bcast | BTail]) ->
- true = validate_broadcast(S0, Sfinal, Op, [Bcast]),
- validate_broadcasts([Sfinal | STail], Op, BTail).
+validate_broadcasts([S0, Sfinal | Tail], Op, [Bcast | BcastTail]) ->
+ validate_broadcast(S0, Sfinal, Op, [Bcast]),
+ validate_broadcasts([Sfinal | Tail], Op, BcastTail).
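%% (Editorial note) states are consumed pairwise: each broadcast is checked
%% against the (before, after) state pair that should have produced it, and
%% any trailing states must have the node down on both sides, i.e. they are
%% transitions for which no broadcast is expected.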
%% ====================================================================
@@ -422,7 +430,7 @@ healthy_service(Service) ->
true
end),
ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [true]}),
- gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
+ riak_core_node_watcher ! {check_health, Service},
receive
{meck_done, MeckInPid} ->
?assertEqual(Pid, MeckInPid)
@@ -439,22 +447,51 @@ unhealthy_service(Service) ->
false
end),
ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [false]}),
- gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
+ riak_core_node_watcher ! {check_health, Service},
receive
{meck_done, MeckInPid} ->
?assertEqual(Pid, MeckInPid)
after 1000 ->
erlang:error(timeout)
end,
- % give the broadcast for service down to be handled.
- timer:sleep(50),
+ wait_for_bcast(),
Pid.
recovering_service(Service) ->
Pid = spawn(fun() -> service_loop() end),
- ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [false_once]}),
- gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
- gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
+ meck:new(flip_flop),
+ meck:expect(flip_flop, init, 1, {ok, false}),
+ meck:expect(flip_flop, handle_call, fun (next, _, S) ->
+ {reply, S, not S}
+ end),
+ meck:expect(flip_flop, next, fun(Pid) ->
+ gen_server:call(Pid, next)
+ end),
+ {ok, Flipper} = gen_server:start(flip_flop, [], []),
+ Self = self(),
+ meck:expect(mod_health, callback, fun(_Pid, _Args) ->
+ Out = flip_flop:next(Flipper),
+ Self ! meck_done,
+ Out
+ end),
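%% (Editorial note) flip_flop:next/1 replies false on the first call and
%% true on the second, so the first {check_health, Service} below should
%% drive the service down and the second should bring it back up, matching
%% the up/down/up sequence asserted in the postcondition.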
+ ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [meck_boom]}),
+ riak_core_node_watcher ! {check_health, Service},
+ receive
+ meck_done ->
+ ok
+ after 1000 ->
+ meck:unload(flip_flop),
+ erlang:error(timeout)
+ end,
+ riak_core_node_watcher ! {check_health, Service},
+ receive
+ meck_done ->
+ ok
+ after 1000 ->
+ meck:unload(flip_flop),
+ erlang:error(timeout)
+ end,
+ meck:unload(flip_flop),
Pid.
faulty_health_check_callback_once(Service) ->
