
Initial attempt at clean vnode shutdown that waits for queued work

bz1188

This patch adds a patched supervisor module that supports graceful
shutdown of children under a simple_one_for_one supervisor, so that when
a node stops gracefully we can block shutdown long enough to process any
queued work and perform any other cleanup.
1 parent 8209a6e · commit 3326a06dc0f515a0f43b4c7e7d8b46d89074df82 · Vagabond committed Sep 14, 2011
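In outline, the sequence this change aims for: on node shutdown the patched supervisor sends each vnode an exit(Pid, shutdown) signal and waits for it rather than killing it; the vnode's terminate/3 then drains its worker pool before running the vnode module's own terminate. A minimal sketch of the drain step, assuming the names introduced in the diff below (the wrapper module here is hypothetical):

-module(vnode_drain_sketch).
-export([drain/1]).

%% Block until the vnode's worker pool has finished or discarded its
%% queued work, waiting at most 60 seconds -- the same value the patch
%% uses in finish_handoff/1 and terminate/3.
drain(Pool) when is_pid(Pool) ->
    riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
drain(undefined) ->
    ok.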
src/riak_core.app.src
@@ -59,6 +59,7 @@
riak_core_wm_urlmap,
slide,
spiraltime,
+ supervisor_pre_r14b04,
vclock
]},
{registered, []},
src/riak_core_vnode.erl
@@ -314,22 +314,37 @@ active(_Event, _From, State) ->
finish_handoff(State=#state{mod=Mod,
modstate=ModState,
- index=Idx,
+ index=Idx,
+ pool_pid=Pool,
handoff_node=HN}) ->
case riak_core_gossip:finish_handoff(Idx, node(), HN, Mod) of
forward ->
+ case is_pid(Pool) of
+ true ->
+ riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
+ _ ->
+ ok
+ end,
{ok, NewModState} = Mod:delete(ModState),
{stop, normal, State#state{modstate=NewModState,
handoff_node=none,
+ pool_pid=undefined,
handoff_pid=undefined}};
continue ->
continue(State#state{handoff_node=none,
handoff_pid=undefined});
shutdown ->
+ case is_pid(Pool) of
+ true ->
+ riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
+ _ ->
+ ok
+ end,
{ok, NewModState} = Mod:delete(ModState),
riak_core_handoff_manager:add_exclusion(Mod, Idx),
{stop, normal, State#state{modstate=NewModState,
handoff_node=none,
+ pool_pid=undefined,
handoff_pid=undefined}}
end.
@@ -390,9 +405,14 @@ handle_info(Info, StateName, State=#state{mod=Mod,modstate=ModState}) ->
{next_state, StateName, State, State#state.inactivity_timeout}
end.
-terminate(Reason, _StateName, #state{mod=Mod, modstate=ModState}) ->
- Mod:terminate(Reason, ModState),
- ok.
+terminate(Reason, _StateName, #state{mod=Mod, modstate=ModState, pool_pid=Pool}) ->
+ case is_pid(Pool) of
+ true ->
+ riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
+ _ ->
+ ok
+ end,
+ Mod:terminate(Reason, ModState).
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
src/riak_core_vnode_sup.erl
@@ -28,15 +28,15 @@
-export([start_vnode/2]).
start_vnode(Mod, Index) when is_integer(Index) ->
- supervisor:start_child(?MODULE, [Mod, Index]).
+ supervisor_pre_r14b04:start_child(?MODULE, [Mod, Index]).
start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+ supervisor_pre_r14b04:start_link({local, ?MODULE}, ?MODULE, []).
%% @private
init([]) ->
{ok,
{{simple_one_for_one, 10, 10},
[{undefined,
{riak_core_vnode, start_link, []},
- temporary, brutal_kill, worker, dynamic}]}}.
+ temporary, 300000, worker, dynamic}]}}.
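For reference, the child-spec change above is what makes graceful cleanup possible at all. These are standard OTP semantics: brutal_kill makes the supervisor kill each vnode with exit(Pid, kill), so terminate/3 never runs, while an integer shutdown value makes it send exit(Pid, shutdown) and wait up to that many milliseconds for the child to exit on its own (the child must trap exits for its terminate callback to fire). The two child specs side by side:

%% before: vnodes are killed outright on supervisor shutdown
{undefined, {riak_core_vnode, start_link, []},
 temporary, brutal_kill, worker, dynamic}

%% after: the supervisor waits up to 300 s for each vnode to clean up
{undefined, {riak_core_vnode, start_link, []},
 temporary, 300000, worker, dynamic}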
src/riak_core_vnode_worker.erl
@@ -61,7 +61,7 @@ handle_call({work, Pool, Work, WorkFrom}, {Pid, _} = From, #state{module = Mod,
end,
%% check the worker back into the pool
poolboy:checkin(Pool, self()),
- gen_fsm:send_all_state_event(Pid, checkin),
+ gen_fsm:send_all_state_event(Pid, {checkin, self()}),
{noreply, State#state{modstate=NewModState}};
handle_call(_Event, _From, State) ->
{reply, ok, State}.
src/riak_core_vnode_worker_pool.erl
@@ -25,15 +25,16 @@
terminate/3, code_change/4]).
%% gen_fsm states
--export([ready/2, queueing/2, ready/3, queueing/3]).
+-export([ready/2, queueing/2, ready/3, queueing/3, shutdown/2, shutdown/3]).
%% API
--export([start_link/5, handle_work/3]).
+-export([start_link/5, stop/2, shutdown_pool/2, handle_work/3]).
-record(state, {
queue = queue:new(),
pool :: pid(),
- monitors = [] :: list()
+ monitors = [] :: list(),
+ shutdown :: undefined | {pid(), reference()}
}).
start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps) ->
@@ -42,6 +43,13 @@ start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps) ->
handle_work(Pid, Work, From) ->
gen_fsm:send_event(Pid, {work, Work, From}).
+stop(Pid, Reason) ->
+ gen_fsm:sync_send_all_state_event(Pid, {stop, Reason}).
+
+%% wait for all the workers to finish any current work; pass infinity
+%% as the call timeout so the default 5 s gen_fsm call limit cannot
+%% cut a longer drain short
+shutdown_pool(Pid, Wait) ->
+ gen_fsm:sync_send_all_state_event(Pid, {shutdown, Wait}, infinity).
+
init([WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps]) ->
{ok, Pid} = poolboy:start_link([{worker_module, riak_core_vnode_worker},
{worker_args, [VNodeIndex, WorkerArgs, WorkerProps]},
@@ -73,7 +81,26 @@ queueing({work, _Work, _From} = Msg, #state{queue=Q} = State) ->
queueing(_Event, State) ->
{next_state, queueing, State}.
-handle_event(checkin, _, #state{pool = Pool, queue=Q, monitors=Monitors} = State) ->
+shutdown(_Event, _From, State) ->
+ {reply, ok, shutdown, State}.
+
+shutdown({work, _Work, From}, State) ->
+ %% tell the process requesting work that we're shutting down
+ riak_core_vnode:reply(From, {error, vnode_shutdown}),
+ {next_state, shutdown, State};
+shutdown(_Event, State) ->
+ {next_state, shutdown, State}.
+
+handle_event({checkin, Pid}, shutdown, #state{monitors=Monitors0,
+ shutdown=From} = State) ->
+ Monitors = lists:keydelete(Pid, 1, Monitors0),
+ case Monitors of
+ [] -> %% work all done; reply to the waiting caller, then exit!
+ gen_fsm:reply(From, ok),
+ {stop, shutdown, State};
+ _ ->
+ {next_state, shutdown, State#state{monitors=Monitors}}
+ end;
+handle_event({checkin, Pid}, _, #state{pool = Pool, queue=Q, monitors=Monitors0} = State) ->
+ Monitors = lists:keydelete(Pid, 1, Monitors0),
case queue:out(Q) of
{{value, {work, Work, From}}, Rem} ->
case poolboy:checkout(Pool) of
@@ -91,6 +118,23 @@ handle_event(checkin, _, #state{pool = Pool, queue=Q, monitors=Monitors} = State
handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.
+handle_sync_event({stop, Reason}, _From, _StateName, State) ->
+ {stop, Reason, ok, State};
+handle_sync_event({shutdown, Time}, From, _StateName, #state{queue=Q,
+ monitors=Monitors} = State) ->
+ discard_queued_work(Q),
+ case Monitors of
+ [] ->
+ {stop, shutdown, ok, State};
+ _ ->
+ case Time of
+ infinity ->
+ ok;
+ _ when is_integer(Time) ->
+ erlang:send_after(Time, self(), shutdown)
+ end,
+ %% no {noreply, ...} in gen_fsm; defer the reply until the pool drains
+ {next_state, shutdown, State#state{shutdown=From}}
+ end;
handle_sync_event(_Event, _From, StateName, State) ->
{reply, {error, unknown_message}, StateName, State}.
@@ -102,24 +146,29 @@ handle_info({'DOWN', _Ref, _, Pid, Info}, StateName, #state{monitors=Monitors} =
NewMonitors = lists:keydelete(Pid, 1, Monitors),
%% pretend a worker just checked in so that any queued work can be
%% sent to the new worker just started to replace this dead one
- gen_fsm:send_all_state_event(self(), checkin),
+ gen_fsm:send_all_state_event(self(), {checkin, undefined}),
{next_state, StateName, State#state{monitors=NewMonitors}};
false ->
{next_state, StateName, State}
end;
+handle_info(shutdown, shutdown, #state{monitors=Monitors,
+ shutdown=From} = State) ->
+ %% we've waited too long to shut down; time to force the issue
+ [riak_core_vnode:reply(WorkFrom, {error, vnode_shutdown})
+ || {_, _, WorkFrom, _} <- Monitors],
+ gen_fsm:reply(From, ok),
+ {stop, shutdown, State};
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
-terminate(_Reason, _StateName, _State) ->
+terminate(_Reason, _StateName, #state{pool=Pool}) ->
+ %% stop poolboy
+ gen_fsm:sync_send_all_state_event(Pool, stop),
ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
%% Keep track of which worker we pair with what work/from and monitor the
-%% worker. Dead workers are removed from the list but successful ones aren't,
-%% the data in the list is just updated when the worker is reused.
-%% This should be fine because the worker shouldn't crash while not in use.
+%% worker. Only active workers are tracked; a worker is dropped from the
+%% list when it checks back in or dies.
monitor_worker(Worker, From, Work, Monitors) ->
case lists:keyfind(Worker, 1, Monitors) of
{Worker, Ref, _OldFrom, _OldWork} ->
@@ -129,3 +178,13 @@ monitor_worker(Worker, From, Work, Monitors) ->
Ref = erlang:monitor(process, Worker),
[{Worker, Ref, From, Work} | Monitors]
end.
+
+discard_queued_work(Q) ->
+ case queue:out(Q) of
+ {{value, {work, _Work, From}}, Rem} ->
+ riak_core_vnode:reply(From, {error, vnode_shutdown}),
+ discard_queued_work(Rem);
+ {empty, _Empty} ->
+ ok
+ end.
+
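Taken together, shutdown_pool/2 now has three ways to complete for its caller; a sketch of the observable behaviour, assuming the fixes above (Pool is a hypothetical pool pid, and the timeout matches the one the vnode uses):

%% 1. Nothing in flight: the pool stops at once and the call returns ok.
%% 2. Workers are busy: queued work is answered with
%%    {error, vnode_shutdown}, the call blocks until the last
%%    {checkin, Pid} arrives, then returns ok.
%% 3. The timer fires first: in-flight requesters get
%%    {error, vnode_shutdown}, the pool stops, and the call returns ok.
ok = riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000)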