
Merge pull request #240 from basho/issue_388

Extend riak_core_node_watcher to support the registration of health
check logic that monitors a given service, automatically marking the
service as down when unhealthy and back up when healthy.
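For illustration, a service owner might register a health check roughly as in the sketch below. The service_up/4 options and the fact that the service pid is prepended to the callback arguments come from this change; the module name, service name, and ping protocol are hypothetical.

    %% Minimal sketch (hypothetical module); assumes the monitored service is
    %% a gen_server that answers a ping call.
    -module(my_service_health_example).
    -export([register_service/1, health_ping/2]).

    %% Register Pid as the my_service service with an automatic health check.
    %% The node watcher prepends the service pid to the callback argument
    %% list, so health_ping/2 is invoked as health_ping(Pid, Timeout).
    register_service(Pid) ->
        riak_core_node_watcher:service_up(my_service, Pid,
            {?MODULE, health_ping, [5000]},
            [{check_interval, 30000},       %% run the check every 30 seconds
             {max_health_failures, 2},      %% mark the service down after 2 failed checks
             {max_callback_failures, 3}]).  %% disable checking after 3 callback errors

    %% Health callback: must return a boolean; any other result or an
    %% exception counts as a callback error rather than a health failure.
    health_ping(Pid, Timeout) ->
        try gen_server:call(Pid, ping, Timeout) of
            pong -> true;
            _    -> false
        catch
            _:_ -> false
        end.

An explicit riak_core_node_watcher:check_health(my_service) forces a check immediately, and riak_core_node_watcher:service_down(my_service) removes both the service and its health check.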
2 parents e23dce9 + 3ac5e32 · commit 08f1f29a5a1d3022a0950083bcd740b7d78d313f · jtuple committed Dec 5, 2012
Showing with 513 additions and 32 deletions.
  1. +350 −28 src/riak_core_node_watcher.erl
  2. +163 −4 test/node_watcher_qc.erl
378 src/riak_core_node_watcher.erl
@@ -23,10 +23,15 @@
-behaviour(gen_server).
+-define(DEFAULT_HEALTH_CHECK_INTERVAL, 60000).
%% API
-export([start_link/0,
service_up/2,
+ service_up/3,
+ service_up/4,
+ check_health/1,
service_down/1,
+ service_down/2,
node_up/0,
node_down/0,
services/0, services/1,
@@ -39,11 +44,25 @@
-record(state, { status = up,
services = [],
+ health_checks = [],
peers = [],
avsn = 0,
bcast_tref,
bcast_mod = {gen_server, abcast}}).
+-record(health_check, { state = 'waiting' :: 'waiting' | 'checking' | 'suspend',
+ callback :: mfa(),
+ service_pid :: pid(),
+ checking_pid :: pid(),
+ health_failures = 0 :: non_neg_integer(),
+ callback_failures = 0 :: non_neg_integer(),
+ interval_tref,
+ %% how many milliseconds to wait after a check has
+ %% finished before starting a new one
+ check_interval = ?DEFAULT_HEALTH_CHECK_INTERVAL :: timeout(),
+ max_callback_failures = 3,
+ max_health_failures = 1 }).
+
%% ===================================================================
%% Public API
@@ -55,9 +74,60 @@ start_link() ->
service_up(Id, Pid) ->
gen_server:call(?MODULE, {service_up, Id, Pid}, infinity).
+%% @doc {@link service_up/4} with default options.
+%% @see service_up/4
+-spec service_up(Id :: atom(), Pid :: pid(), MFA :: mfa()) -> 'ok'.
+service_up(Id, Pid, MFA) ->
+ service_up(Id, Pid, MFA, []).
+
+-type hc_check_interval_opt() :: {check_interval, timeout()}.
+-type hc_max_callback_fails_opt() :: {max_callback_failures, non_neg_integer()}.
+-type hc_max_health_fails_opt() :: {max_health_failures, non_neg_integer()}.
+-type health_opt() :: hc_check_interval_opt() |
+ hc_max_callback_fails_opt() |
+ hc_max_health_fails_opt().
+-type health_opts() :: [health_opt()].
+%% @doc Create a service that can be declared up or down based on the
+%% result of a function in addition to usual monitoring. The function can
+%% be set to be called automatically every interval, or only explicitly.
+%% An explicit health check can be done using {@link check_health/1}. The
+%% check interval is expressed in milliseconds. If `infinity' is passed
+%% in, a check is never done automatically. The function used to check for
+%% health must return a boolean; if it does not, it is considered an error.
+%% By default a check tolerates at most 1 health failure and 3 other
+%% callback errors; reaching either limit causes the service to be marked
+%% as down. In the case of a health failure, the
+%% health function will continue to be called at increasing intervals. In
+%% the case of a callback error, the automatic health check is disabled.
+%% The callback function will have the pid of the service prepended to its
+%% list of args, so the actual arity of the function must be 1 + the length
+%% of the argument list provided. A service added this way is removed like
+%% any other, using {@link service_down/1}.
+%% @see service_up/2
+-spec service_up(Id :: atom(), Pid :: pid(), Callback :: mfa(),
+ Options :: health_opts()) -> 'ok'.
+service_up(Id, Pid, {Module, Function, Args}, Options) ->
+ gen_server:call(?MODULE,
+ {service_up, Id, Pid, {Module, Function, Args}, Options},
+ infinity).
+
+%% @doc Force a health check for the given service. If the service does
+%% not have a health check associated with it, this is ignored. Resets the
+%% automatic health check timer if there is one.
+%% @see service_up/4
+-spec check_health(Service :: atom()) -> 'ok'.
+check_health(Service) ->
+ ?MODULE ! {check_health, Service},
+ ok.
+
service_down(Id) ->
gen_server:call(?MODULE, {service_down, Id}, infinity).
+service_down(Id, true) ->
+ gen_server:call(?MODULE, {service_down, Id, health_check}, infinity);
+service_down(Id, false) ->
+ service_down(Id).
+
node_up() ->
gen_server:call(?MODULE, {node_status, up}, infinity).
@@ -111,42 +181,61 @@ handle_call(get_avsn, _From, State) ->
{reply, State#state.avsn, State};
handle_call({service_up, Id, Pid}, _From, State) ->
- %% Update the set of active services locally
- Services = ordsets:add_element(Id, State#state.services),
- S2 = State#state { services = Services },
+ %% remove any existing health checks
+ S2 = remove_health_check(Id, State),
- %% Remove any existing mrefs for this service
- delete_service_mref(Id),
+ S3 = add_service(Id, Pid, S2),
- %% Setup a monitor for the Pid representing this service
- Mref = erlang:monitor(process, Pid),
- erlang:put(Mref, Id),
- erlang:put(Id, Mref),
+ {reply, ok, S3};
- %% Update our local ETS table and broadcast
- S3 = local_update(S2),
- {reply, ok, update_avsn(S3)};
+handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
+ %% update the active set of services if needed.
+ {reply, _, State1} = handle_call({service_up, Id, Pid}, From, State),
+
+ State2 = remove_health_check(Id, State1),
+
+ %% install the health check
+ CheckInterval = proplists:get_value(check_interval, Options,
+ ?DEFAULT_HEALTH_CHECK_INTERVAL),
+ IntervalTref = case CheckInterval of
+ infinity -> undefined;
+ N -> erlang:send_after(N, self(), {check_health, Id})
+ end,
+ CheckRec = #health_check{
+ callback = MFA,
+ check_interval = CheckInterval,
+ service_pid = Pid,
+ max_health_failures = proplists:get_value(max_health_failures, Options, 1),
+ max_callback_failures = proplists:get_value(max_callback_failures, Options, 3),
+ interval_tref = IntervalTref
+ },
+ Healths = orddict:store(Id, CheckRec, State2#state.health_checks),
+ {reply, ok, State2#state{health_checks = Healths}};
handle_call({service_down, Id}, _From, State) ->
- %% Update the set of active services locally
- Services = ordsets:del_element(Id, State#state.services),
- S2 = State#state { services = Services },
+ %% Remove health check if any
+ S2 = remove_health_check(Id, State),
- %% Remove any existing mrefs for this service
- delete_service_mref(Id),
+ S3 = drop_service(Id, S2),
- %% Update local ETS table and broadcast
- S3 = local_update(S2),
- {reply, ok, update_avsn(S3)};
+ {reply, ok, S3};
handle_call({node_status, Status}, _From, State) ->
Transition = {State#state.status, Status},
S2 = case Transition of
{up, down} -> %% up -> down
- local_delete(State#state { status = down });
+ Healths = [begin
+ {ok, C1} = health_fsm(suspend, S, C),
+ {S, C1}
+ end || {S, C} <- State#state.health_checks],
+ local_delete(State#state { status = down, health_checks = Healths});
{down, up} -> %% down -> up
- local_update(State#state { status = up });
+ Healths = [begin
+ {ok, C1} = health_fsm(resume, S, C),
+ {S, C1}
+ end || {S, C} <- State#state.health_checks],
+ local_update(State#state { status = up, health_checks = Healths });
{Status, Status} -> %% noop
State
@@ -175,7 +264,6 @@ handle_cast({down, Node}, State) ->
node_down(Node, State),
{noreply, update_avsn(State)}.
-
handle_info({nodeup, _Node}, State) ->
%% Ignore node up events; nothing to do here...
{noreply, State};
@@ -196,13 +284,25 @@ handle_info({'DOWN', Mref, _, _Pid, _Info}, State) ->
%% Remove the id<->mref entries in the pdict
delete_service_mref(Id),
+ %% remove any health checks in place
+ S2 = remove_health_check(Id, State),
+
%% Update our list of active services and ETS table
Services = ordsets:del_element(Id, State#state.services),
- S2 = State#state { services = Services },
- local_update(S2),
- {noreply, update_avsn(S2)}
+ S3 = S2#state { services = Services },
+ local_update(S3),
+ {noreply, update_avsn(S3)}
end;
+handle_info({'EXIT', Pid, _Cause} = Msg, State) ->
+ Service = erlang:erase(Pid),
+ State2 = handle_check_msg(Msg, Service, State),
+ {noreply, State2};
+
+handle_info({check_health, Id}, State) ->
+ State2 = handle_check_msg(check_health, Id, State),
+ {noreply, State2};
+
handle_info({gen_event_EXIT, _, _}, State) ->
%% Ring event handler has been removed for some reason; re-register
watch_for_ring_events(),
@@ -281,8 +381,8 @@ is_node_up(Node) ->
node_up(Node, Services, State) ->
case is_peer(Node, State) of
true ->
- %% Before we alter the ETS table, see if this node was previously down. In
- %% that situation, we'll go ahead broadcast out.
+ %% Before we alter the ETS table, see if this node was previously
+ %% down. In that situation, we'll go ahead and broadcast out.
S2 = case is_node_up(Node) of
false ->
broadcast([Node], State);
@@ -423,3 +523,225 @@ internal_get_nodes(Service) ->
[] ->
[]
end.
+
+add_service(ServiceId, Pid, State) ->
+ %% Update the set of active services locally
+ Services = ordsets:add_element(ServiceId, State#state.services),
+ S2 = State#state { services = Services },
+
+ %% Remove any existing mrefs for this service
+ delete_service_mref(ServiceId),
+
+ %% Setup a monitor for the Pid representing this service
+ Mref = erlang:monitor(process, Pid),
+ erlang:put(Mref, ServiceId),
+ erlang:put(ServiceId, Mref),
+
+ %% Update our local ETS table and broadcast
+ S3 = local_update(S2),
+ update_avsn(S3).
+
+drop_service(ServiceId, State) ->
+ %% Update the set of active services locally
+ Services = ordsets:del_element(ServiceId, State#state.services),
+ S2 = State#state { services = Services },
+
+ %% Remove any existing mrefs for this service
+ delete_service_mref(ServiceId),
+
+ %% Update local ETS table and broadcast
+ S3 = local_update(S2),
+
+ update_avsn(S3).
+
+handle_check_msg(_Msg, undefined, State) ->
+ State;
+handle_check_msg(_Msg, _ServiceId, #state{status = down} = State) ->
+ %% most likely a late message
+ State;
+handle_check_msg(Msg, ServiceId, State) ->
+ case orddict:find(ServiceId, State#state.health_checks) of
+ error ->
+ State;
+ {ok, Check} ->
+ CheckReturn = health_fsm(Msg, ServiceId, Check),
+ handle_check_return(CheckReturn, ServiceId, State)
+ end.
+
+handle_check_return({remove, _Check}, ServiceId, State) ->
+ Healths = orddict:erase(ServiceId, State#state.health_checks),
+ State#state{health_checks = Healths};
+handle_check_return({ok, Check}, ServiceId, State) ->
+ Healths = orddict:store(ServiceId, Check, State#state.health_checks),
+ State#state{health_checks = Healths};
+handle_check_return({up, Check}, ServiceId, State) ->
+ #health_check{service_pid = Pid} = Check,
+ Healths = orddict:store(ServiceId, Check, State#state.health_checks),
+ S2 = State#state{health_checks = Healths},
+ add_service(ServiceId, Pid, S2);
+handle_check_return({down, Check}, ServiceId, State) ->
+ Healths = orddict:store(ServiceId, Check, State#state.health_checks),
+ S2 = State#state{health_checks = Healths},
+ drop_service(ServiceId, S2).
+
+remove_health_check(ServiceId, State) ->
+ #state{health_checks = Healths} = State,
+ Healths2 = case orddict:find(ServiceId, Healths) of
+ error ->
+ Healths;
+ {ok, Check} ->
+ health_fsm(remove, ServiceId, Check),
+ orddict:erase(ServiceId, Healths)
+ end,
+ State#state{health_checks = Healths2}.
+
+%% Health checks are modeled as an FSM to make them easier to reason about.
+%% There are 3 states:
+%%   waiting:  between check intervals
+%%   suspend:  check interval disabled
+%%   checking: health check in progress
+%% Messages handled:
+%%   suspend / resume
+%%   do a scheduled health check (check_health)
+%%   remove the health check
+%%   health check finished ('EXIT' from the checking pid)
+
+health_fsm(Msg, Service, #health_check{state = StateName} = Check) ->
+ {Reply, NextState, Check2} = health_fsm(StateName, Msg, Service, Check),
+ Check3 = Check2#health_check{state = NextState},
+ {Reply, Check3}.
+
+%% suspend state
+health_fsm(suspend, resume, Service, InCheck) ->
+ #health_check{health_failures = N, check_interval = V} = InCheck,
+ Tref = next_health_tref(N, V, Service),
+ OutCheck = InCheck#health_check{
+ interval_tref = Tref
+ },
+ {ok, waiting, OutCheck};
+
+health_fsm(suspend, remove, _Service, InCheck) ->
+ {remove, suspend, InCheck};
+
+%% message handling when checking state
+health_fsm(checking, suspend, _Service, InCheck) ->
+ #health_check{checking_pid = Pid} = InCheck,
+ erlang:erase(Pid),
+ {ok, suspend, InCheck#health_check{checking_pid = undefined}};
+
+health_fsm(checking, check_health, _Service, InCheck) ->
+ {ok, checking, InCheck};
+
+health_fsm(checking, remove, _Service, InCheck) ->
+ {remove, checking, InCheck};
+
+health_fsm(checking, {'EXIT', Pid, Cause}, Service, #health_check{checking_pid = Pid} = InCheck)
+ when Cause == normal; Cause == false ->
+ %% expected exits of the checking pid (healthy or unhealthy result)
+ #health_check{health_failures = HPFails, max_health_failures = HPMaxFails} = InCheck,
+ {Reply, HPFails1} = handle_fsm_exit(Cause, HPFails, HPMaxFails),
+ Tref = next_health_tref(HPFails1, InCheck#health_check.check_interval, Service),
+ OutCheck = InCheck#health_check{
+ checking_pid = undefined,
+ health_failures = HPFails1,
+ callback_failures = 0,
+ interval_tref = Tref
+ },
+ {Reply, waiting, OutCheck};
+
+health_fsm(checking, {'EXIT', Pid, Cause}, Service, #health_check{checking_pid = Pid} = InCheck) ->
+ lager:error("health check process for ~p error'ed: ~p", [Service, Cause]),
+ Fails = InCheck#health_check.callback_failures + 1,
+ if
+ Fails == InCheck#health_check.max_callback_failures ->
+ lager:error("health check callback for ~p failed too "
+ "many times, disabling.", [Service]),
+ {down, suspend, InCheck#health_check{checking_pid = undefined,
+ callback_failures = Fails}};
+ Fails < InCheck#health_check.max_callback_failures ->
+ #health_check{health_failures = N, check_interval = Inter} = InCheck,
+ Tref = next_health_tref(N, Inter, Service),
+ OutCheck = InCheck#health_check{checking_pid = undefined,
+ callback_failures = Fails, interval_tref = Tref},
+ {ok, waiting, OutCheck};
+ true ->
+ %% likely a late message, or a faker
+ {ok, suspend, InCheck#health_check{checking_pid = undefined,
+ callback_failures = Fails}}
+ end;
+
+%% message handling when in a waiting state
+health_fsm(waiting, suspend, _Service, InCheck) ->
+ case InCheck#health_check.interval_tref of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(InCheck#health_check.interval_tref)
+ end,
+ {ok, suspend, InCheck#health_check{interval_tref = undefined}};
+
+health_fsm(waiting, check_health, Service, InCheck) ->
+ InCheck1 = start_health_check(Service, InCheck),
+ {ok, checking, InCheck1};
+
+health_fsm(waiting, remove, _Service, InCheck) ->
+ case InCheck#health_check.interval_tref of
+ undefined -> ok;
+ Tref -> erlang:cancel_timer(Tref)
+ end,
+ OutCheck = InCheck#health_check{interval_tref = undefined},
+ {remove, waiting, OutCheck};
+
+%% fallthrough handling
+health_fsm(StateName, _Msg, _Service, Health) ->
+ {ok, StateName, Health}.
+
+handle_fsm_exit(normal, HPFails, MaxHPFails) when HPFails >= MaxHPFails ->
+ %% service had failed, but has recovered
+ {up, 0};
+
+handle_fsm_exit(normal, HPFails, MaxHPFails) when HPFails < MaxHPFails ->
+ %% service never fully failed
+ {ok, 0};
+
+handle_fsm_exit(false, HPFails, MaxHPFails) when HPFails + 1 == MaxHPFails ->
+ %% service has failed enough to go down
+ {down, HPFails + 1};
+
+handle_fsm_exit(false, HPFails, _MaxHPFails) ->
+ %% all other cases handled; the health check is still failing
+ {ok, HPFails + 1}.
+
+start_health_check(Service, #health_check{checking_pid = undefined} = CheckRec) ->
+ {Mod, Func, Args} = CheckRec#health_check.callback,
+ Pid = CheckRec#health_check.service_pid,
+ case CheckRec#health_check.interval_tref of
+ undefined -> ok;
+ Tref -> erlang:cancel_timer(Tref)
+ end,
+ CheckingPid = proc_lib:spawn_link(fun() ->
+ case erlang:apply(Mod, Func, [Pid | Args]) of
+ true -> ok;
+ false -> exit(false);
+ Else -> exit(Else)
+ end
+ end),
+ erlang:put(CheckingPid, Service),
+ CheckRec#health_check{state = checking,
+ checking_pid = CheckingPid,
+ interval_tref = undefined};
+start_health_check(_Service, Check) ->
+ Check.
+
+next_health_tref(_, infinity, _) ->
+ undefined;
+next_health_tref(N, V, Service) ->
+ Time = determine_time(N, V),
+ erlang:send_after(Time, self(), {check_health, Service}).
+
+determine_time(Failures, BaseInterval) when Failures < 4 ->
+ BaseInterval;
+
+determine_time(Failures, BaseInterval) when Failures < 11 ->
+ erlang:trunc(BaseInterval * (math:pow(Failures, 1.3)));
+
+determine_time(Failures, BaseInterval) when Failures > 10 ->
+ BaseInterval * 20.
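As a reference point for the backoff above: once a health check has failed, determine_time/2 stretches the interval between checks. Below is a standalone copy of that logic for experimentation, with approximate values for the 60000 ms default interval noted in comments:

    -module(health_backoff_example).  %% standalone copy of the backoff logic above
    -export([determine_time/2, table/0]).

    determine_time(Failures, BaseInterval) when Failures < 4 ->
        BaseInterval;
    determine_time(Failures, BaseInterval) when Failures < 11 ->
        erlang:trunc(BaseInterval * math:pow(Failures, 1.3));
    determine_time(Failures, BaseInterval) when Failures > 10 ->
        BaseInterval * 20.

    %% With the 60000 ms default interval (values in milliseconds):
    %%   1-3 failures  -> 60000
    %%   4 failures    -> ~364000
    %%   10 failures   -> ~1197000
    %%   11+ failures  -> 1200000 (capped at 20x the base interval)
    table() ->
        [{N, determine_time(N, 60000)} || N <- lists:seq(1, 12)].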
167 test/node_watcher_qc.erl
@@ -32,6 +32,7 @@
-record(state, { up_nodes = [],
services = [],
service_pids = [],
+ service_healths = [],
peers = []}).
-define(QC_OUT(P),
@@ -53,6 +54,9 @@ prop_main() ->
riak_core_ring_events:start_link(),
riak_core_node_watcher_events:start_link(),
+ %% meck used for health watch / threshold
+ meck:new(mod_health),
+
?FORALL(Cmds, commands(?MODULE),
begin
%% Setup ETS table to recv broadcasts
@@ -101,31 +105,44 @@ command(S) ->
{call, ?MODULE, remote_service_up, [g_node(), g_services()]},
{call, ?MODULE, remote_service_down, [g_node()]},
{call, ?MODULE, remote_service_down_disterl, [g_node()]},
- {call, ?MODULE, wait_for_bcast, []}
+ {call, ?MODULE, wait_for_bcast, []},
+ {call, ?MODULE, health_service, [g_service()]},
+ {call, ?MODULE, health_service_defaults, [g_service()]},
+ {call, ?MODULE, health_service_up, [g_service(), S]},
+ {call, ?MODULE, health_service_down, [g_service(), S]},
+ {call, ?MODULE, health_service_error, [g_service(), S]}
]).
precondition(S, {call, _, local_service_kill, [Service, S]}) ->
orddict:is_key(Service, S#state.service_pids);
precondition(S, {call, _, wait_for_bcast, _}) ->
is_node_up(node(), S);
+precondition(S, {call, _, Test, [Service, S]}) when
+ Test =:= health_service_up;
+ Test =:= health_service_down;
+ Test =:= health_service_error ->
+ is_node_up(node(), S) andalso lists:member(Service, S#state.service_healths);
precondition(_, _) ->
true.
next_state(S, Res, {call, _, local_service_up, [Service]}) ->
S2 = service_up(node(), Service, S),
Pids = orddict:store(Service, Res, S2#state.service_pids),
- S2#state { service_pids = Pids };
+ Healths = orddict:erase(Service, S2#state.service_healths),
+ S2#state { service_pids = Pids, service_healths = Healths };
next_state(S, _Res, {call, _, local_service_down, [Service]}) ->
S2 = service_down(node(), Service, S),
Pids = orddict:erase(Service, S2#state.service_pids),
- S2#state { service_pids = Pids };
+ Healths = orddict:erase(Service, S2#state.service_healths),
+ S2#state { service_pids = Pids, service_healths = Healths };
next_state(S, _Res, {call, _, local_service_kill, [Service, _]}) ->
S2 = service_down(node(), Service, S),
Pids = orddict:erase(Service, S2#state.service_pids),
- S2#state { service_pids = Pids };
+ Healths = orddict:erase(Service, S2#state.service_healths),
+ S2#state { service_pids = Pids, service_healths = Healths };
next_state(S, _Res, {call, _, local_node_up, []}) ->
node_up(node(), S);
@@ -143,12 +160,41 @@ next_state(S, _Res, {call, _, Fn, [Node]})
next_state(S, _Res, {call, _, wait_for_bcast, _}) ->
S;
+next_state(S, Res, {call, _, HPService, [Service]}) when HPService =:= health_service; HPService =:= health_service_defaults ->
+ S2 = service_up(node(), Service, S),
+ Pids = orddict:store(Service, Res, S2#state.service_pids),
+ Healths = orddict:store(Service, Res, S2#state.service_healths),
+ S2#state { service_pids = Pids, service_healths = Healths};
+
+next_state(S, _Res, {call, _, health_service_up, [Service, _]}) ->
+ S2 = service_up(node(), Service, S),
+ Pid = orddict:fetch(Service, S2#state.service_healths),
+ Pids = orddict:store(Service, Pid, S2#state.service_pids),
+ S2#state { service_pids = Pids };
+
+next_state(S, _Res, {call, _, health_service_down, [Service, _]}) ->
+ S2 = service_down(node(), Service, S),
+ Pids = orddict:erase(Service, S2#state.service_pids),
+ S2#state { service_pids = Pids };
+
+next_state(S, _Res, {call, _, health_service_error, [Service, _]}) ->
+ S2 = service_down(node(), Service, S),
+ Pids = orddict:erase(Service, S2#state.service_pids),
+ S2#state { service_pids = Pids };
+
next_state(S, _Res, {call, _, ring_update, [Nodes]}) ->
Peers = ordsets:del_element(node(), ordsets:from_list(Nodes)),
peer_filter(S#state { peers = Peers }).
+
postcondition(S, {call, _, local_service_up, [Service]}, _Res) ->
S2 = service_up(node(), Service, S),
validate_broadcast(S, S2, service),
@@ -207,6 +253,39 @@ postcondition(S, {call, _, Fn, [Node]}, _Res)
postcondition(S, {call, _, wait_for_bcast, _}, _Res) ->
validate_broadcast(S, S, service);
+postcondition(S, {call, _, health_service, [Service]}, _Res) ->
+ S2 = service_up(node(), Service, S),
+ validate_broadcast(S, S2, service),
+ deep_validate(S2);
+
+postcondition(S, {call, _, health_service_defaults, [Service]}, _Res) ->
+ S2 = service_up(node(), Service, S),
+ validate_broadcast(S, S2, service),
+ deep_validate(S2);
+
+postcondition(S, {call, _, health_service_up, [Service, _]}, _Res) ->
+ case is_service_up(node(), Service, S) of
+ true ->
+ deep_validate(S);
+ false ->
+ S2 = service_up(node(), Service, S),
+ ?assert(meck:validate(mod_health)),
+ validate_broadcast(S, S2, service),
+ deep_validate(S2)
+ end;
+
+postcondition(S, {call, _, health_service_down, [Service, _]}, _Res) ->
+ S2 = service_down(node(), Service, S),
+ ?assert(meck:validate(mod_health)),
+ validate_broadcast(S, S2, service),
+ deep_validate(S2);
+
+postcondition(S, {call, _, health_service_error, [Service, _]}, _Res) ->
+ S2 = service_down(node(), Service, S),
+ ?assert(meck:validate(mod_health)),
+ validate_broadcast(S, S2, service),
+ deep_validate(S2);
+
postcondition(S, {call, _, ring_update, [Nodes]}, _Res) ->
%% Ring update should generate a broadcast to all NEW peers
Bcasts = broadcasts(),
@@ -240,6 +319,9 @@ deep_validate(S) ->
validate_broadcast(S0, Sfinal, Op) ->
Bcasts = broadcasts(),
+ validate_broadcast(S0, Sfinal, Op, Bcasts).
+
+validate_broadcast(S0, Sfinal, Op, Bcasts) ->
Transition = {is_node_up(node(), S0), is_node_up(node(), Sfinal), Op},
ExpPeers = Sfinal#state.peers,
case Transition of
@@ -276,7 +358,11 @@ g_services() ->
g_ring_nodes() ->
vector(app_helper:get_env(riak_core, ring_creation_size),
oneof([node(), 'n1@127.0.0.1', 'n2@127.0.0.1', 'n3@127.0.0.1'])).
+g_service_threshold() ->
+ [g_service(), {nw_mecked_thresher, health_check, [oneof([true, false, kill, error])]}].
+g_services_threshold() ->
+ [g_service_threshold() | list(g_service_threshold())].
%% ====================================================================
%% Calls
@@ -328,6 +414,65 @@ ring_update(Nodes) ->
wait_for_avsn(Avsn0),
?ORDSET(Nodes).
+health_service(Service) ->
+ Avsn0 = riak_core_node_watcher:avsn(),
+ Pid = spawn(fun() -> service_loop() end),
+ ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [Pid]}, [{max_callback_failures, 1}, {check_interval, infinity}]),
+ wait_for_avsn(Avsn0),
+ Pid.
+
+health_service_defaults(Service) ->
+ Avsn0 = riak_core_node_watcher:avsn(),
+ Pid = spawn(fun() -> service_loop() end),
+ ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [Pid]}),
+ wait_for_avsn(Avsn0),
+ Pid.
+
+health_service_up(Service, S) ->
+ Self = self(),
+ health_meck(fun(P1, P2) ->
+ ?assertEqual(P1, P2),
+ Self ! meck_done,
+ true
+ end),
+ Avsn0 = riak_core_node_watcher:avsn(),
+ riak_core_node_watcher:check_health(Service),
+ receive meck_done -> ok after 100 -> erlang:error(timeout) end,
+ case is_service_up(node(), Service, S) of
+ true -> ok;
+ false -> wait_for_avsn(Avsn0)
+ end.
+
+health_service_down(Service, S) ->
+ Self = self(),
+ health_meck(fun(P1, P2) ->
+ ?assertEqual(P1, P2),
+ Self ! meck_done,
+ false
+ end),
+ Avsn0 = riak_core_node_watcher:avsn(),
+ riak_core_node_watcher:check_health(Service),
+ receive meck_done -> ok after 100 -> erlang:error(timeout) end,
+ case is_service_up(node(), Service, S) of
+ true -> wait_for_avsn(Avsn0);
+ false -> ok
+ end.
+
+health_service_error(Service, S) ->
+ Self = self(),
+ health_meck(fun(P1, P2) ->
+ ?assertEqual(P1, P2),
+ Self ! meck_done,
+ meck:exception(error, badarg)
+ end),
+ Avsn0 = riak_core_node_watcher:avsn(),
+ riak_core_node_watcher:check_health(Service),
+ receive meck_done -> ok after 100 -> erlang:error(timeout) end,
+ case is_service_up(node(), Service, S) of
+ true -> wait_for_avsn(Avsn0);
+ false -> ok
+ end.
+
%% ====================================================================
%% State functions
@@ -352,6 +497,10 @@ service_down(Node, Svc, S) ->
S#state { services = ordsets:del_element({Node, Svc}, S#state.services) }.
+is_service_up(Node, Service, S) ->
+ Services = services(Node, S),
+ lists:member(Service, Services).
+
is_node_up(Node, S) ->
ordsets:is_element(Node, S#state.up_nodes).
@@ -414,6 +563,16 @@ all_services(Node, S) ->
%% Internal functions
%% ====================================================================
+health_meck(Fun) ->
+ try meck:new(mod_health) of
+ _ ->
+ ok
+ catch
+ error:{already_started, _} ->
+ ok
+ end,
+ meck:expect(mod_health, callback, Fun).
+
on_broadcast(Nodes, _Name, Msg) ->
Id = ets:update_counter(?MODULE, bcast_id, {2, 1}),
ets:insert_new(?MODULE, {Id, Msg, Nodes}).
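Note that mod_health, which the QC test mocks with meck, has no source file in the repository; it stands in for the health callback registered via service_up/4. A minimal sketch of what such a module could look like (hypothetical, for orientation only):

    -module(mod_health).
    -export([callback/2]).

    %% The test registers {mod_health, callback, [Pid]} for a service whose
    %% pid is also Pid, and the node watcher prepends the service pid, so the
    %% callback is invoked as callback(ServicePid, Pid) with both equal.
    %% Returning true reports healthy, false unhealthy; anything else (or an
    %% exception) is treated as a callback error.
    callback(ServicePid, Pid) when ServicePid =:= Pid ->
        is_process_alive(Pid).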
