Rough implementation of health check system

Tests fail, so not final.
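
Intended usage looks roughly like the following minimal sketch (the service id riak_kv and module my_checks are illustrative placeholders, not part of this commit). The callback is an MFA; the service pid is prepended to its argument list when the check process is spawned:

    %% hypothetical registration, e.g. from a service process on startup
    ok = riak_core_node_watcher:service_up(
           riak_kv,                          % service id
           self(),                           % service pid
           {my_checks, check_kv, []},        % invoked as my_checks:check_kv(ServicePid)
           [{check_interval, 30},            % seconds between checks (default 60)
            {max_callback_failures, 3}]),    % drop the check after 3 errors (default 3)

    %% take the service down, also canceling its health check
    ok = riak_core_node_watcher:service_down(riak_kv, true).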
commit d0bc513b321db5e8063f605c1451a40f80e6ef22 1 parent 81a6b17
@lordnull authored
Showing with 242 additions and 1 deletion.
  1. +242 −1 src/riak_core_node_watcher.erl
243 src/riak_core_node_watcher.erl
@@ -26,7 +26,10 @@
%% API
-export([start_link/0,
service_up/2,
+ service_up/3,
+ service_up/4,
service_down/1,
+ service_down/2,
node_up/0,
node_down/0,
services/0, services/1,
@@ -39,11 +42,24 @@
-record(state, { status = up,
services = [],
+ health_checks = [],
peers = [],
avsn = 0,
bcast_tref,
bcast_mod = {gen_server, abcast}}).
+-record(health_check, { callback :: mfa(),
+ service_pid :: pid(),
+ checking_pid :: pid(),
+ health_failures = 0 :: non_neg_integer(),
+ callback_failures = 0,
+ interval_tref,
+ % how many seconds to wait after a check has
+ % finished before starting a new one
+ check_interval = 60 :: timeout(),
+ max_callback_failures = 3,
+ max_health_failures = 1 }).
+
%% ===================================================================
%% Public API
@@ -55,9 +71,20 @@ start_link() ->
service_up(Id, Pid) ->
gen_server:call(?MODULE, {service_up, Id, Pid}, infinity).
+service_up(Id, Pid, MFA) ->
+ service_up(Id, Pid, MFA, []).
+
+service_up(Id, Pid, {Module, Function, Args}, Options) ->
+ gen_server:call(?MODULE, {service_up, Id, Pid, {Module, Function, Args}, Options}, infinity).
+
service_down(Id) ->
gen_server:call(?MODULE, {service_down, Id}, infinity).
+service_down(Id, true) ->
+ gen_server:call(?MODULE, {service_down, Id, health_check}, infinity);
+service_down(Id, false) ->
+ service_down(Id).
+
node_up() ->
gen_server:call(?MODULE, {node_status, up}, infinity).
@@ -127,6 +154,29 @@ handle_call({service_up, Id, Pid}, _From, State) ->
S3 = local_update(S2),
{reply, ok, update_avsn(S3)};
+handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
+ %% update the active set of services if needed.
+ {reply, _, State1} = case erlang:get(Id) of
+ Pid ->
+ {reply, ok, State};
+ _OtherPid ->
+ handle_call({service_up, Id, Pid}, From, State)
+ end,
+
+ %% uninstall old health check
+ State2 = cancel_health_check(Id, State1),
+
+ %% install the health check
+ CheckInterval = proplists:get_value(check_interval, Options, 60),
+ CheckRec = #health_check{
+ callback = MFA,
+ service_pid = Pid,
+ check_interval = CheckInterval,
+ max_callback_failures = proplists:get_value(max_callback_failures, Options, 3),
+ interval_tref = erlang:send_after(CheckInterval * 1000, self(), {health_check, Id})
+ },
+ Healths = orddict:store(Id, CheckRec, State2#state.health_checks),
+ {reply, ok, State2#state{health_checks = Healths}};
+
handle_call({service_down, Id}, _From, State) ->
%% Update the set of active services locally
Services = ordsets:del_element(Id, State#state.services),
@@ -139,6 +189,10 @@ handle_call({service_down, Id}, _From, State) ->
S3 = local_update(S2),
{reply, ok, update_avsn(S3)};
+handle_call({service_down, Id, health_check}, From, State) ->
+ State1 = cancel_health_check(Id, State),
+ handle_call({service_down, Id}, From, State1);
+
handle_call({node_status, Status}, _From, State) ->
Transition = {State#state.status, Status},
S2 = case Transition of
@@ -173,8 +227,17 @@ handle_cast({up, Node, Services}, State) ->
handle_cast({down, Node}, State) ->
node_down(Node, State),
- {noreply, update_avsn(State)}.
+ {noreply, update_avsn(State)};
+handle_cast({force_health_check, Id}, State) ->
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ {noreply, State};
+ {ok, CheckRec} ->
+ CheckRec2 = trigger_health_check(Id, CheckRec, true),
+ Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
+ {noreply, State#state{health_checks = Checks}}
+ end.
handle_info({nodeup, _Node}, State) ->
%% Ignore node up events; nothing to do here...
@@ -203,6 +266,36 @@ handle_info({'DOWN', Mref, _, _Pid, _Info}, State) ->
{noreply, update_avsn(S2)}
end;
+handle_info({'EXIT', Pid, Cause}, State) ->
+ case erlang:erase(Pid) of
+ undefined ->
+ % likely a late exit from a canceled health check.
+ {noreply, State};
+ Id ->
+ State2 = handle_health_check_exit(Id, Cause, State),
+ {noreply, State2}
+ end;
+
+handle_info({health_check, Id}, State) ->
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ {noreply, State};
+ {ok, CheckRec} ->
+ CheckRec2 = trigger_health_check(Id, CheckRec, false),
+ Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
+ {noreply, State#state{health_checks = Checks}}
+ end;
+
+handle_info({force_health_check, Id}, State) ->
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ {noreply, State};
+ {ok, CheckRec} ->
+ CheckRec2 = trigger_health_check(Id, CheckRec, true),
+ Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
+ {noreply, State#state{health_checks = Checks}}
+ end;
+
handle_info({gen_event_EXIT, _, _}, State) ->
%% Ring event handler has been removed for some reason; re-register
watch_for_ring_events(),
@@ -423,3 +516,151 @@ internal_get_nodes(Service) ->
[] ->
[]
end.
+
+cancel_health_check(Id, State) ->
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ State;
+ {ok, CheckRec} ->
+ cancel_health_check(CheckRec),
+ HealthChecks = orddict:erase(Id, State#state.health_checks),
+ State#state{health_checks = HealthChecks}
+ end.
+
+cancel_health_check(#health_check{checking_pid = Pid}) when is_pid(Pid) ->
+ %% Note: we don't kill the pid, just unlink it; any late exit is ignored.
+ unlink(Pid),
+ erlang:erase(Pid),
+ ok;
+
+cancel_health_check(#health_check{interval_tref = Tref}) ->
+ case erlang:cancel_timer(Tref) of
+ false ->
+ lager:notice("Canceling health check timer already canceled or sent"),
+ ok;
+ _N ->
+ ok
+ end.
+
+schedule_health_check(Id, CheckRec) ->
+ Override = app_helper:get_env(riak_core, {health_check_interval, Id}),
+ schedule_health_check(Id, CheckRec, Override).
+
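+% An application env var {health_check_interval, Id} under riak_core, if set,
+% overrides the record's interval: it becomes the new check_interval, resets
+% health_failures, and is cleared from the env once applied.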
+schedule_health_check(_Id, #health_check{check_interval = infinity} = CheckRec, undefined) ->
+ CheckRec#health_check{interval_tref = undefined};
+
+schedule_health_check(Id, #health_check{callback_failures = N, max_callback_failures = N} = CheckRec, undefined) ->
+ lager:warning("No further checking of ~p due to max callback failures", [Id]),
+ CheckRec#health_check{interval_tref = undefined};
+
+schedule_health_check(Id, #health_check{health_failures = 0, check_interval = N} = CheckRec, undefined) ->
+ Timer = erlang:send_after(N * 1000, self(), {health_check, Id}),
+ CheckRec#health_check{interval_tref = Timer};
+
+schedule_health_check(Id, #health_check{health_failures = N, check_interval = Interval} = CheckRec, undefined) ->
+ Seconds = determine_time(N, Interval),
+ Timer = erlang:send_after(round(Seconds * 1000), self(), {health_check, Id}),
+ CheckRec#health_check{interval_tref = Timer};
+
+schedule_health_check(Id, CheckRec, Interval) ->
+ application:unset_env(riak_core, {health_check_interval, Id}),
+ CheckRec1 = CheckRec#health_check{health_failures = 0, check_interval = Interval},
+ schedule_health_check(Id, CheckRec1, undefined).
+
+handle_health_check_exit(Id, Cause, State) ->
+ case orddict:find(Id, State#state.health_checks) of
+ error ->
+ State;
+ {ok, CheckRec} ->
+ handle_health_check_exit(Id, Cause, CheckRec, State)
+ end.
+
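+% The clauses below dispatch on the exit reason of the checking process:
+% 'normal' means the check passed, 'false' means it reported the service
+% unhealthy, and any other reason is treated as a callback error.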
+% all is well
+handle_health_check_exit(Id, normal, #health_check{health_failures = 0} = CheckRec, State) ->
+ CheckRec2 = schedule_health_check(Id, CheckRec#health_check{checking_pid = undefined, callback_failures = 0}),
+ Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
+ State#state{health_checks = Checks};
+
+% recovery before service_down
+handle_health_check_exit(Id, normal, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N < M ->
+ CheckRec2 = CheckRec#health_check{health_failures = 0, checking_pid = undefined, callback_failures = 0},
+ CheckRec3 = schedule_health_check(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec3, State#state.health_checks),
+ State#state{health_checks = Checks};
+
+% recovery after service_down
+handle_health_check_exit(Id, normal, #health_check{health_failures = N} = CheckRec, State) when N > 0 ->
+ CheckRec2 = CheckRec#health_check{health_failures = 0, checking_pid = undefined, callback_failures = 0},
+ {reply, _, State2} = handle_call({service_up, Id, CheckRec2#health_check.service_pid}, undefined, State),
+ CheckRec3 = schedule_health_check(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec3, State2#state.health_checks),
+ State2#state{health_checks = Checks};
+
+% health check failure reaches threshold
+handle_health_check_exit(Id, false, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N + 1 == M ->
+ CheckRec2 = CheckRec#health_check{health_failures = N + 1, checking_pid = undefined, callback_failures = 0},
+ {reply, _, State2} = handle_call({service_down, Id}, undefined, State),
+ CheckRec3 = schedule_health_check(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec3, State2#state.health_checks),
+ State2#state{health_checks = Checks};
+
+% health check failure while service_down
+handle_health_check_exit(Id, false, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N >= M ->
+ CheckRec2 = CheckRec#health_check{health_failures = N + 1, checking_pid = undefined, callback_failures = 0},
+ CheckRec3 = schedule_health_check(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec3, State#state.health_checks),
+ State#state{health_checks = Checks};
+
+% health check failure while service_up, but below threshold
+handle_health_check_exit(Id, false, #health_check{health_failures = N, max_health_failures = M} = CheckRec, State) when N < M ->
+ CheckRec2 = CheckRec#health_check{health_failures = N + 1, checking_pid = undefined, callback_failures = 0},
+ CheckRec3 = schedule_health_check(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec3, State#state.health_checks),
+ State#state{health_checks = Checks};
+
+% callback fails too many times.
+handle_health_check_exit(Id, Cause, #health_check{callback_failures = N, max_callback_failures = M}, State) when N + 1 == M ->
+ lager:warning("health check function for ~p errored: ~p", [Id, Cause]),
+ lager:warning("health check function for ~p failed too many times, removing it", [Id]),
+ Checks = orddict:erase(Id, State#state.health_checks),
+ State#state{health_checks = Checks};
+
+% callback fails
+handle_health_check_exit(Id, Cause, CheckRec, State) ->
+ lager:warning("health check function for ~p errored: ~p", [Id, Cause]),
+ #health_check{callback_failures = N} = CheckRec,
+ CheckRec2 = CheckRec#health_check{checking_pid = undefined, callback_failures = N + 1},
+ CheckRec3 = schedule_health_check(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec3, State#state.health_checks),
+ State#state{health_checks = Checks}.
+
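+% Backoff for repeated health failures: the base interval for the first three
+% failures, then interval * Failures^1.3 (about 6x at 4 failures, ~20x at 10),
+% capped at 20x from the eleventh failure on.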
+determine_time(Failures, BaseInterval) when Failures < 4 ->
+ BaseInterval;
+
+determine_time(Failures, BaseInterval) when Failures < 11 ->
+ BaseInterval * (math:pow(Failures, 1.3));
+
+determine_time(Failures, BaseInterval) when Failures > 10 ->
+ BaseInterval * 20.
+
+trigger_health_check(_Id, #health_check{checking_pid = Pid} = R, _) when is_pid(Pid) ->
+ R;
+
+trigger_health_check(Id, #health_check{interval_tref = undefined} = Rec, _) ->
+ #health_check{callback = MFA, service_pid = ServPid} = Rec,
+ {Mod, Fun, Args} = MFA,
+ Pid = proc_lib:spawn_link(Mod, Fun, [ServPid | Args]),
+ erlang:put(Pid, Id),
+ Rec#health_check{checking_pid = Pid};
+
+trigger_health_check(_Id, Rec, false) ->
+ Rec;
+
+trigger_health_check(Id, Rec, true) ->
+ % the timer message may already have been delivered; that's harmless, since
+ % a stale {health_check, Id} is ignored while a check is in flight.
+ erlang:cancel_timer(Rec#health_check.interval_tref),
+ #health_check{callback = MFA, service_pid = ServPid} = Rec,
+ {Mod, Fun, Args} = MFA,
+ Pid = proc_lib:spawn_link(Mod, Fun, [ServPid | Args]),
+ erlang:put(Pid, Id),
+ Rec#health_check{checking_pid = Pid, interval_tref = undefined}.
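
The exit reason of the linked check process is what handle_health_check_exit dispatches on: a normal exit means the check passed, an exit with reason false means the check found the service unhealthy, and any other reason counts against max_callback_failures. A hypothetical callback honoring that contract (the module name and check logic are assumptions, not part of this commit):

    %% hypothetical callback module; spawned via proc_lib:spawn_link with the
    %% service pid prepended to the configured argument list
    -module(my_checks).
    -export([check_kv/1]).

    check_kv(ServicePid) ->
        case is_process_alive(ServicePid) of
            true ->
                ok;             % returning normally reports a healthy service
            false ->
                exit(false)     % exit reason 'false' reports a health failure
        end.
    %% any crash exits with some other reason and is counted as a callback failure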