Browse files

Merge branch '1.2'

Conflicts:
	rebar.config
  • Loading branch information...
2 parents 2675e1f + 40e50f5 commit 93c12b9dd3cec0cf8d6e3badf4f0046103463aa6 @kellymclaughlin kellymclaughlin committed Aug 29, 2012
Showing with 102 additions and 15 deletions.
  1. +102 −15 src/riak_core_vnode.erl
View
117 src/riak_core_vnode.erl
@@ -40,6 +40,14 @@
core_status/1,
handoff_error/3]).
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([test_link/2,
+ current_state/1]).
+-endif.
+
-define(normal_reason(R),
(R == normal orelse R == shutdown orelse
(is_tuple(R) andalso element(1,R) == shutdown))).
@@ -95,6 +103,7 @@ behaviour_info(_Other) ->
forward :: node(),
handoff_node=none :: none | node(),
pool_pid :: pid() | undefined,
+ pool_config :: tuple() | undefined,
manager_event_timer :: reference(),
inactivity_timeout}).
@@ -129,19 +138,22 @@ init([Mod, Index, InitialInactivityTimeout, Forward]) ->
{error, Reason} ->
{stop, Reason};
_ ->
- PoolPid = case lists:keyfind(pool, 1, Props) of
- {pool, WorkerModule, PoolSize, WorkerArgs} ->
+ case lists:keyfind(pool, 1, Props) of
+ {pool, WorkerModule, PoolSize, WorkerArgs}=PoolConfig ->
lager:debug("starting worker pool ~p with size of ~p~n",
- [WorkerModule, PoolSize]),
- {ok, Pid} = riak_core_vnode_worker_pool:start_link(WorkerModule,
- PoolSize, Index, WorkerArgs, worker_props),
- Pid;
- _ -> undefined
+ [WorkerModule, PoolSize]),
+ {ok, PoolPid} = riak_core_vnode_worker_pool:start_link(WorkerModule,
+ PoolSize,
+ Index,
+ WorkerArgs,
+ worker_props);
+ _ ->
+ PoolPid = PoolConfig = undefined
end,
riak_core_handoff_manager:remove_exclusion(Mod, Index),
Timeout = app_helper:get_env(riak_core, vnode_inactivity_timeout, ?DEFAULT_TIMEOUT),
State = #state{index=Index, mod=Mod, modstate=ModState, forward=Forward,
- inactivity_timeout=Timeout, pool_pid=PoolPid},
+ inactivity_timeout=Timeout, pool_pid=PoolPid, pool_config=PoolConfig},
lager:debug("vnode :: ~p/~p :: ~p~n", [Mod, Index, Forward]),
{ok, active, State, InitialInactivityTimeout}
end.
@@ -425,7 +437,7 @@ handle_event(finish_handoff, _StateName, State=#state{mod=Mod,
end;
handle_event(cancel_handoff, _StateName, State=#state{mod=Mod,
modstate=ModState}) ->
- %% it would be nice to pass {Err, Reason} to the vnode but the
+ %% it would be nice to pass {Err, Reason} to the vnode but the
%% API doesn't currently allow for that.
stop_manager_event_timer(State),
case State#state.handoff_node of
@@ -446,6 +458,8 @@ handle_event(R=?COVERAGE_REQ{}, _StateName, State) ->
active(R, State).
+handle_sync_event(current_state, _From, StateName, State) ->
+ {reply, {StateName, State}, StateName, State};
handle_sync_event(get_mod_index, _From, StateName,
State=#state{index=Idx,mod=Mod}) ->
{reply, {Mod, Idx}, StateName, State, State#state.inactivity_timeout};
@@ -501,15 +515,29 @@ handle_sync_event(core_status, _From, StateName, State=#state{index=Index,
{reply, {Mode, Status}, StateName, State, State#state.inactivity_timeout}.
-handle_info({'EXIT', Pid, Reason}, _StateName,
- State=#state{mod=Mod, index=Index, pool_pid=Pid}) ->
+handle_info({'EXIT', Pid, Reason},
+ _StateName,
+ State=#state{mod=Mod,
+ index=Index,
+ pool_pid=Pid,
+ pool_config=PoolConfig}) ->
case Reason of
Reason when Reason == normal; Reason == shutdown ->
- ok;
+ continue(State#state{pool_pid=undefined});
_ ->
- lager:error("~p ~p worker pool crashed ~p\n", [Index, Mod, Reason])
- end,
- continue(State#state{pool_pid=undefined});
+ lager:error("~p ~p worker pool crashed ~p\n", [Index, Mod, Reason]),
+ {pool, WorkerModule, PoolSize, WorkerArgs}=PoolConfig,
+ lager:debug("starting worker pool ~p with size "
+ "of ~p for vnode ~p.",
+ [WorkerModule, PoolSize, Index]),
+ {ok, NewPoolPid} =
+ riak_core_vnode_worker_pool:start_link(WorkerModule,
+ PoolSize,
+ Index,
+ WorkerArgs,
+ worker_props),
+ continue(State#state{pool_pid=NewPoolPid})
+ end;
handle_info(Info, _StateName,
State=#state{mod=Mod,modstate={deleted, _},index=Index}) ->
@@ -659,3 +687,62 @@ stop_manager_event_timer(#state{manager_event_timer=undefined}) ->
ok;
stop_manager_event_timer(#state{manager_event_timer=T}) ->
gen_fsm:cancel_timer(T).
+
+%% ===================================================================
+%% Test API
+%% ===================================================================
+
+-ifdef(TEST).
+
+%% @doc Start the garbage collection server
+test_link(Mod, Index) ->
+ gen_fsm:start_link(?MODULE, [Mod, Index, 0, node()], []).
+
+%% @doc Get the current state of the fsm for testing inspection
+-spec current_state(pid()) -> {atom(), #state{}} | {error, term()}.
+current_state(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid, current_state).
+
+pool_death_test() ->
+ meck:new(test_vnode),
+ meck:expect(test_vnode, init, fun(_) -> {ok, [], [{pool, test_pool_mod, 1, []}]} end),
+ meck:expect(test_vnode, terminate, fun(_, _) -> normal end),
+ meck:new(test_pool_mod),
+ meck:expect(test_pool_mod, init_worker, fun(_, _, _) -> {ok, []} end),
+
+ {ok, Pid} = ?MODULE:test_link(test_vnode, 0),
+ {_, StateData1} = ?MODULE:current_state(Pid),
+ PoolPid1 = StateData1#state.pool_pid,
+ exit(PoolPid1, kill),
+ wait_for_process_death(PoolPid1),
+ ?assertNot(is_process_alive(PoolPid1)),
+ wait_for_state_update(StateData1, Pid),
+ {_, StateData2} = ?MODULE:current_state(Pid),
+ PoolPid2 = StateData2#state.pool_pid,
+ ?assertNot(PoolPid2 =:= undefined),
+ exit(Pid, normal),
+ wait_for_process_death(Pid),
+ meck:validate(test_pool_mod),
+ meck:validate(test_vnode),
+ meck:unload(test_pool_mod),
+ meck:unload(test_vnode).
+
+wait_for_process_death(Pid) ->
+ wait_for_process_death(Pid, is_process_alive(Pid)).
+
+wait_for_process_death(Pid, true) ->
+ wait_for_process_death(Pid, is_process_alive(Pid));
+wait_for_process_death(_Pid, false) ->
+ ok.
+
+wait_for_state_update(OriginalStateData, Pid) ->
+ {_, CurrentStateData} = ?MODULE:current_state(Pid),
+ wait_for_state_update(OriginalStateData, CurrentStateData, Pid).
+
+wait_for_state_update(OriginalStateData, OriginalStateData, Pid) ->
+ {_, CurrentStateData} = ?MODULE:current_state(Pid),
+ wait_for_state_update(OriginalStateData, CurrentStateData, Pid);
+wait_for_state_update(_OriginalState, _StateData, _Pid) ->
+ ok.
+
+-endif.

0 comments on commit 93c12b9

Please sign in to comment.