Permalink
Browse files

Change riak_core_vnode_worker:handle_work to use an async cast.

Fixes riak_core 213

Change riak_core_vnode_worker:handle_work to use gen_server:cast
instead of gen_server:call. This addresses a possible timeout
condition that can result in a vnode worker pool crashing.
  • Loading branch information...
1 parent 93c12b9 commit ef983f52de988b18f46d6d557d5e69a60d54c282 @kellymclaughlin kellymclaughlin committed Jul 20, 2012
Showing with 15 additions and 12 deletions.
  1. +13 −10 src/riak_core_vnode_worker.erl
  2. +2 −2 src/riak_core_vnode_worker_pool.erl
View
23 src/riak_core_vnode_worker.erl
@@ -23,7 +23,7 @@
-export([behaviour_info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start_link/1, handle_work/4]).
+-export([start_link/1, handle_work/4, handle_work/5]).
-record(state, {
module :: atom(),
@@ -42,16 +42,22 @@ start_link(Args) ->
[VNodeIndex, WorkerArgs, WorkerProps] = proplists:get_value(worker_args, Args),
gen_server:start_link(?MODULE, [WorkerMod, VNodeIndex, WorkerArgs, WorkerProps], []).
-handle_work(Pid, Pool, Work, From) ->
- gen_server:call(Pid, {work, Pool, Work, From}).
+handle_work(Worker, Pool, Work, From) ->
+ handle_work(Worker, Pool, Work, From, self()).
+
+handle_work(Worker, Pool, Work, From, Caller) ->
+ gen_server:cast(Worker, {work, Pool, Work, From, Caller}).
init([Module, VNodeIndex, WorkerArgs, WorkerProps]) ->
{ok, WorkerState} = Module:init_worker(VNodeIndex, WorkerArgs, WorkerProps),
{ok, #state{module=Module, modstate=WorkerState}}.
-handle_call({work, Pool, Work, WorkFrom}, {Pid, _} = From, #state{module = Mod,
- modstate = ModState} = State) ->
- gen_server:reply(From, ok), %% unblock the caller
+handle_call(Event, _From, State) ->
+ lager:debug("Vnode worker received synchronous event: ~p.", [Event]),
+ {reply, ok, State}.
+
+handle_cast({work, Pool, Work, WorkFrom, Caller},
+ #state{module = Mod, modstate = ModState} = State) ->
NewModState = case Mod:handle_work(Work, WorkFrom, ModState) of
{reply, Reply, NS} ->
riak_core_vnode:reply(WorkFrom, Reply),
@@ -61,11 +67,8 @@ handle_call({work, Pool, Work, WorkFrom}, {Pid, _} = From, #state{module = Mod,
end,
%% check the worker back into the pool
poolboy:checkin(Pool, self()),
- gen_fsm:send_all_state_event(Pid, {checkin, self()}),
+ gen_fsm:send_all_state_event(Caller, {checkin, self()}),
{noreply, State#state{modstate=NewModState}};
-handle_call(_Event, _From, State) ->
- {reply, ok, State}.
-
handle_cast(_Event, State) ->
{noreply, State}.
View
4 src/riak_core_vnode_worker_pool.erl
@@ -66,7 +66,7 @@ ready({work, Work, From} = Msg, #state{pool=Pool, queue=Q, monitors=Monitors} =
{next_state, queueing, State#state{queue=queue:in(Msg, Q)}};
Pid when is_pid(Pid) ->
NewMonitors = monitor_worker(Pid, From, Work, Monitors),
- ok = riak_core_vnode_worker:handle_work(Pid, Pool, Work, From),
+ riak_core_vnode_worker:handle_work(Pid, Pool, Work, From),
{next_state, ready, State#state{monitors=NewMonitors}}
end;
ready(_Event, State) ->
@@ -107,7 +107,7 @@ handle_event({checkin, Worker}, _, #state{pool = Pool, queue=Q, monitors=Monitor
monitors=Monitors}};
Pid when is_pid(Pid) ->
NewMonitors = monitor_worker(Pid, From, Work, Monitors),
- ok = riak_core_vnode_worker:handle_work(Pid, Pool, Work, From),
+ riak_core_vnode_worker:handle_work(Pid, Pool, Work, From),
{next_state, queueing, State#state{queue=Rem,
monitors=NewMonitors}}
end;

0 comments on commit ef983f5

Please sign in to comment.