
Merge branch 'core212-restart-worker-pool' into 1.2

kellymclaughlin committed Aug 29, 2012
2 parents 906eebd + d753cfb commit 40e50f5521f246bc6d67300f32d2ed60423dec47
Showing with 102 additions and 15 deletions.
  1. +102 −15 src/riak_core_vnode.erl
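The restart logic in this change keys off the optional worker-pool declaration a vnode module returns from its init/1 callback. As a rough sketch (the callback module name and pool size are invented for illustration; only the {ok, ModState, Props} return shape and the {pool, WorkerModule, PoolSize, WorkerArgs} property come from this diff and its test code):

    %% Hypothetical vnode callback module. riak_core_vnode:init/1 picks the
    %% pool property out of Props with lists:keyfind(pool, 1, Props) below.
    init([_Partition]) ->
        ModState = [],
        {ok, ModState, [{pool, my_vnode_worker, 10, []}]}.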
@@ -40,6 +40,14 @@
          core_status/1,
          handoff_error/3]).
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+-export([test_link/2,
+         current_state/1]).
+
+-endif.
+
 -define(normal_reason(R),
         (R == normal orelse R == shutdown orelse
             (is_tuple(R) andalso element(1,R) == shutdown))).
@@ -95,6 +103,7 @@ behaviour_info(_Other) ->
           forward :: node(),
           handoff_node=none :: none | node(),
           pool_pid :: pid() | undefined,
+          pool_config :: tuple() | undefined,
           manager_event_timer :: reference(),
           inactivity_timeout}).
@@ -129,19 +138,22 @@ init([Mod, Index, InitialInactivityTimeout, Forward]) ->
         {error, Reason} ->
             {stop, Reason};
         _ ->
-            PoolPid = case lists:keyfind(pool, 1, Props) of
-                {pool, WorkerModule, PoolSize, WorkerArgs} ->
+            case lists:keyfind(pool, 1, Props) of
+                {pool, WorkerModule, PoolSize, WorkerArgs}=PoolConfig ->
                     lager:debug("starting worker pool ~p with size of ~p~n",
-                        [WorkerModule, PoolSize]),
-                    {ok, Pid} = riak_core_vnode_worker_pool:start_link(WorkerModule,
-                        PoolSize, Index, WorkerArgs, worker_props),
-                    Pid;
-                _ -> undefined
+                                [WorkerModule, PoolSize]),
+                    {ok, PoolPid} = riak_core_vnode_worker_pool:start_link(WorkerModule,
+                                                                           PoolSize,
+                                                                           Index,
+                                                                           WorkerArgs,
+                                                                           worker_props);
+                _ ->
+                    PoolPid = PoolConfig = undefined
             end,
             riak_core_handoff_manager:remove_exclusion(Mod, Index),
             Timeout = app_helper:get_env(riak_core, vnode_inactivity_timeout, ?DEFAULT_TIMEOUT),
             State = #state{index=Index, mod=Mod, modstate=ModState, forward=Forward,
-                           inactivity_timeout=Timeout, pool_pid=PoolPid},
+                           inactivity_timeout=Timeout, pool_pid=PoolPid, pool_config=PoolConfig},
             lager:debug("vnode :: ~p/~p :: ~p~n", [Mod, Index, Forward]),
             {ok, active, State, InitialInactivityTimeout}
     end.
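The reworked init/1 above leans on the Erlang rule that a variable bound in every branch of a case remains in scope after it, which is why the outer PoolPid = case ... assignment could be dropped while PoolConfig is captured as well. A minimal standalone sketch of the same binding pattern (the function name and the spawn stand-in are made up; the real code calls riak_core_vnode_worker_pool:start_link/5):

    %% PoolPid and PoolConfig are bound in both branches, so both are
    %% visible after the case expression ends.
    pool_bindings(Props) ->
        case lists:keyfind(pool, 1, Props) of
            {pool, _Mod, _Size, _Args} = PoolConfig ->
                PoolPid = spawn(fun() -> ok end);  %% stand-in for the pool start_link
            _ ->
                PoolPid = PoolConfig = undefined
        end,
        {PoolPid, PoolConfig}.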
@@ -425,7 +437,7 @@ handle_event(finish_handoff, _StateName, State=#state{mod=Mod,
     end;
 handle_event(cancel_handoff, _StateName, State=#state{mod=Mod,
                                                       modstate=ModState}) ->
-    %% it would be nice to pass {Err, Reason} to the vnode but the 
+    %% it would be nice to pass {Err, Reason} to the vnode but the
     %% API doesn't currently allow for that.
     stop_manager_event_timer(State),
     case State#state.handoff_node of
@@ -446,6 +458,8 @@ handle_event(R=?COVERAGE_REQ{}, _StateName, State) ->
     active(R, State).
 
+handle_sync_event(current_state, _From, StateName, State) ->
+    {reply, {StateName, State}, StateName, State};
 handle_sync_event(get_mod_index, _From, StateName,
                   State=#state{index=Idx,mod=Mod}) ->
     {reply, {Mod, Idx}, StateName, State, State#state.inactivity_timeout};
@@ -501,15 +515,29 @@ handle_sync_event(core_status, _From, StateName, State=#state{index=Index,
     {reply, {Mode, Status}, StateName, State, State#state.inactivity_timeout}.
 
-handle_info({'EXIT', Pid, Reason}, _StateName,
-            State=#state{mod=Mod, index=Index, pool_pid=Pid}) ->
+handle_info({'EXIT', Pid, Reason},
+            _StateName,
+            State=#state{mod=Mod,
+                         index=Index,
+                         pool_pid=Pid,
+                         pool_config=PoolConfig}) ->
     case Reason of
         Reason when Reason == normal; Reason == shutdown ->
-            ok;
+            continue(State#state{pool_pid=undefined});
         _ ->
-            lager:error("~p ~p worker pool crashed ~p\n", [Index, Mod, Reason])
-    end,
-    continue(State#state{pool_pid=undefined});
+            lager:error("~p ~p worker pool crashed ~p\n", [Index, Mod, Reason]),
+            {pool, WorkerModule, PoolSize, WorkerArgs}=PoolConfig,
+            lager:debug("starting worker pool ~p with size "
+                        "of ~p for vnode ~p.",
+                        [WorkerModule, PoolSize, Index]),
+            {ok, NewPoolPid} =
+                riak_core_vnode_worker_pool:start_link(WorkerModule,
+                                                       PoolSize,
+                                                       Index,
+                                                       WorkerArgs,
+                                                       worker_props),
+            continue(State#state{pool_pid=NewPoolPid})
+    end;
 
 handle_info(Info, _StateName,
             State=#state{mod=Mod,modstate={deleted, _},index=Index}) ->
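For the new clause above to run at all, the pool crash has to reach the vnode as an {'EXIT', Pid, Reason} message rather than taking the vnode down with it; since the pool is started with start_link and the exit is handled in handle_info, the vnode process presumably traps exits. A tiny standalone sketch of that OTP mechanism (not riak_core code, just the primitives involved):

    %% With trap_exit set, the death of a linked process is delivered as a
    %% message instead of terminating the caller.
    trap_exit_demo() ->
        process_flag(trap_exit, true),
        Pid = spawn_link(fun() -> exit(crash) end),
        receive
            {'EXIT', Pid, Reason} -> {worker_down, Reason}
        after 1000 ->
            timeout
        end.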
@@ -659,3 +687,62 @@ stop_manager_event_timer(#state{manager_event_timer=undefined}) ->
     ok;
 stop_manager_event_timer(#state{manager_event_timer=T}) ->
     gen_fsm:cancel_timer(T).
+
+%% ===================================================================
+%% Test API
+%% ===================================================================
+
+-ifdef(TEST).
+
+%% @doc Start the vnode fsm directly for testing
+test_link(Mod, Index) ->
+    gen_fsm:start_link(?MODULE, [Mod, Index, 0, node()], []).
+
+%% @doc Get the current state of the fsm for testing inspection
+-spec current_state(pid()) -> {atom(), #state{}} | {error, term()}.
+current_state(Pid) ->
+    gen_fsm:sync_send_all_state_event(Pid, current_state).
+
+pool_death_test() ->
+    meck:new(test_vnode),
+    meck:expect(test_vnode, init, fun(_) -> {ok, [], [{pool, test_pool_mod, 1, []}]} end),
+    meck:expect(test_vnode, terminate, fun(_, _) -> normal end),
+    meck:new(test_pool_mod),
+    meck:expect(test_pool_mod, init_worker, fun(_, _, _) -> {ok, []} end),
+
+    {ok, Pid} = ?MODULE:test_link(test_vnode, 0),
+    {_, StateData1} = ?MODULE:current_state(Pid),
+    PoolPid1 = StateData1#state.pool_pid,
+    exit(PoolPid1, kill),
+    wait_for_process_death(PoolPid1),
+    ?assertNot(is_process_alive(PoolPid1)),
+    wait_for_state_update(StateData1, Pid),
+    {_, StateData2} = ?MODULE:current_state(Pid),
+    PoolPid2 = StateData2#state.pool_pid,
+    ?assertNot(PoolPid2 =:= undefined),
+    exit(Pid, normal),
+    wait_for_process_death(Pid),
+    meck:validate(test_pool_mod),
+    meck:validate(test_vnode),
+    meck:unload(test_pool_mod),
+    meck:unload(test_vnode).
+
+wait_for_process_death(Pid) ->
+    wait_for_process_death(Pid, is_process_alive(Pid)).
+
+wait_for_process_death(Pid, true) ->
+    wait_for_process_death(Pid, is_process_alive(Pid));
+wait_for_process_death(_Pid, false) ->
+    ok.
+
+wait_for_state_update(OriginalStateData, Pid) ->
+    {_, CurrentStateData} = ?MODULE:current_state(Pid),
+    wait_for_state_update(OriginalStateData, CurrentStateData, Pid).
+
+wait_for_state_update(OriginalStateData, OriginalStateData, Pid) ->
+    {_, CurrentStateData} = ?MODULE:current_state(Pid),
+    wait_for_state_update(OriginalStateData, CurrentStateData, Pid);
+wait_for_state_update(_OriginalState, _StateData, _Pid) ->
+    ok.
+
+-endif.
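The new test can be run from an Erlang shell with riak_core compiled under the TEST macro and with meck and eunit on the code path; roughly:

    %% Runs the eunit tests defined in the module, including pool_death_test/0.
    eunit:test(riak_core_vnode).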
