Permalink
Browse files

adding helthy service works

  • Loading branch information...
1 parent c748deb commit 1a3b2c2df5d93b33e834dda490d206508bea31a0 @lordnull lordnull committed Oct 4, 2012
Showing with 25 additions and 21 deletions.
  1. +3 −7 src/riak_core_node_watcher.erl
  2. +22 −14 test/node_watcher_qc.erl
@@ -156,12 +156,7 @@ handle_call({service_up, Id, Pid}, _From, State) ->
handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
%% update the active set of services if needed.
- {reply, _, State1} = case erlang:get(Id) of
- Pid ->
- {reply, ok, State};
- _OtherPid ->
- handle_call({service_up, Id, Pid}, From, State)
- end,
+ {reply, _, State1} = handle_call({service_up, Id, Pid}, From, State),
%% uninstall old health check
State2 = cancel_health_check(Id, State1),
@@ -171,6 +166,7 @@ handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
CheckRec = #health_check{
callback = MFA,
check_interval = CheckInterval,
+ service_pid = Pid,
max_callback_failures = proplists:get_value(max_callback_failures, Options, 3),
interval_tref = erlang:send_after(CheckInterval * 1000, self(), {health_check, Id})
},
@@ -578,7 +574,7 @@ handle_health_check_exit(Id, Cause, State) ->
% all is well
handle_health_check_exit(Id, normal, #health_check{health_failures = 0} = CheckRec, State) ->
CheckRec2 = schedule_health_check(Id, CheckRec#health_check{checking_pid = undefined, callback_failures = 0}),
- Checks = orddict:store(Id, CheckRec2),
+ Checks = orddict:store(Id, CheckRec2, State#state.health_checks),
State#state{health_checks = Checks};
% recovery before service_down
View
@@ -106,8 +106,7 @@ command(S) ->
{call, ?MODULE, remote_service_down, [g_node()]},
{call, ?MODULE, remote_service_down_disterl, [g_node()]},
{call, ?MODULE, wait_for_bcast, []},
- {call, ?MODULE, healthy_new_service, [g_service()]},
- {call, ?MODULE, healthy_existing_service, [g_service()]},
+ {call, ?MODULE, healthy_service, [g_service()]},
{call, ?MODULE, unhealthy_service, [g_service()]},
{call, ?MODULE, recovering_service, [g_service()]},
{call, ?MODULE, faulty_health_check_callback_once, [g_service()]},
@@ -153,11 +152,10 @@ next_state(S, _Res, {call, _, Fn, [Node]})
next_state(S, _Res, {call, _, wait_for_bcast, _}) ->
S;
-next_state(S, _, {call, _, healthy_new_service, [_Service]}) ->
- meck:expect(mod_health, callback, fun (Pid, true) when is_pid(Pid) ->
- true
- end),
- S;
+next_state(S, Res, {call, _, healthy_service, [Service]}) ->
+ S2 = service_up(node(), Service, S),
+ Pids = orddict:store(Service, Res, S2#state.service_pids),
+ S2#state { service_pids = Pids };
next_state(S, Res, {call, M, healthy_existing_service, [Service]}) ->
meck:expect(mod_health, callback, fun (Pid, true) when is_pid(Pid) ->
@@ -176,7 +174,7 @@ next_state(S, Res, {call, M, recovering_service, [Service]}) ->
next_state(S, Res, {call, M, local_service_up, [Service]});
next_state(S, Res, {call, M, faulty_health_check_callback_once, [Service]}) ->
- meck:expect(mod_health, callback, fun(_Pid, error_once) ->
+ meck:expect(mod_health, callback, fun(Pid, error_once) when is_pid(Pid) ->
case put(meck_boomed, true) of
false ->
meck:exception(meck_boomed);
@@ -187,7 +185,7 @@ next_state(S, Res, {call, M, faulty_health_check_callback_once, [Service]}) ->
next_state(S, Res, {call, M, local_service_up, [Service]});
next_state(S, Res, {call, M, faulty_health_check_callback_many, [Service]}) ->
- meck:expect(mod_health, callback, fun(_Pid, error) ->
+ meck:expect(mod_health, callback, fun(Pid, error) when is_pid(Pid) ->
meck:exception(meck_boomed)
end),
next_state(S, Res, {call, M, local_service_up, [Service]});
@@ -257,7 +255,7 @@ postcondition(S, {call, _, Fn, [Node]}, _Res)
postcondition(S, {call, _, wait_for_bcast, _}, _Res) ->
validate_broadcast(S, S, service);
-postcondition(S, {call, _, healthy_new_service, [Service]}, _Res) ->
+postcondition(S, {call, _, healthy_service, [Service]}, _Res) ->
S2 = service_up(node(), Service, S),
validate_broadcast(S, S2, service),
?assert(meck:validate(mod_health)),
@@ -277,8 +275,7 @@ postcondition(S, {call, _, unhealthy_service, [Service]}, _Res) ->
postcondition(S, {call, _, recovering_service, [Service]}, _Res) ->
S2 = service_down(node(), Service, S),
S3 = service_up(node(), Service, S2),
- validate_broadcast(S, S2, service),
- validate_broadcast(S2, S3, service),
+ validate_broadcasts([S, S2, S3], Service),
?assert(meck:validate(mod_health)),
deep_validate(S3);
@@ -430,10 +427,21 @@ ring_update(Nodes) ->
wait_for_avsn(Avsn0),
?ORDSET(Nodes).
-healthy_new_service(Service) ->
+healthy_service(Service) ->
Pid = spawn(fun() -> service_loop() end),
+ Self = self(),
+ meck:expect(mod_health, callback, fun (InPid, true) when is_pid(Pid) ->
+ Self ! {meck_done, InPid},
+ true
+ end),
ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [true]}),
gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
+ receive
+ {meck_done, MeckInPid} ->
+ ?assertEqual(Pid, MeckInPid)
+ after 1000 ->
+ erlang:error(timeout)
+ end,
Pid.
healthy_existing_service(Service) ->
@@ -466,7 +474,7 @@ faulty_health_check_callback_once(Service) ->
faulty_health_check_callback_many(Service) ->
Pid = spawn(fun() -> service_loop() end),
- ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [error_once]}),
+ ok = riak_core_node_watcher:service_up(Service, Pid, {mod_health, callback, [error]}),
gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),
gen_server:cast(riak_core_node_watcher, {force_health_check, Service}),

0 comments on commit 1a3b2c2

Please sign in to comment.