Permalink
Browse files

Add kill_worker and fix resulting bugs

kill_worker exposed some new faults in poolboy, where the maximum size
of the pool could end up smaller than it should be after a worker was
killed.

To help in debugging this, a 'status' command was added to poolboy to
look at its internal state and a dynamic_precondition function was used
to compare the expected state to the actual state.

Numerous problems were discovered by doing this, poolboy had issues
around when it changed states, like allocating the last non-overflow
worker would leave the pool in 'ready' until the next request came in,
instead of immediately changing the state.
  • Loading branch information...
Vagabond committed Jan 22, 2012
1 parent 6a53f06 commit c2ba14ccd5dc6dc882d43db7d3190b94f033b185
Showing with 79 additions and 14 deletions.
  1. +44 −13 src/poolboy.erl
  2. +35 −1 test/poolboy_eqc.erl
View
@@ -66,9 +66,15 @@ init([{stop_fun, StopFun} | Rest], State) when is_function(StopFun) ->
init(Rest, State#state{worker_stop=StopFun});
init([_ | Rest], State) ->
init(Rest, State);
-init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun}=State) ->
+init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun,
+ max_overflow=MaxOverflow}=State) ->
Workers = prepopulate(Size, Sup, InitFun),
- {ok, ready, State#state{workers=Workers}}.
+ StartState = case queue:len(Workers) of
+ 0 when MaxOverflow ==0 -> full;
+ 0 -> overflow;
+ _ -> ready
+ end,
+ {ok, StartState, State#state{workers=Workers}}.
ready({checkin, Pid}, State) ->
Workers = queue:in(Pid, State#state.workers),
@@ -89,8 +95,16 @@ ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
Monitors = [{Pid, Ref} | State#state.monitors],
- {reply, Pid, ready, State#state{workers=Left,
- monitors=Monitors}};
+ NextState = case queue:len(Left) of
+ 0 when MaxOverflow == 0 ->
+ full;
+ 0 ->
+ overflow;
+ _ ->
+ ready
+ end,
+ {reply, Pid, NextState, State#state{workers=Left,
+ monitors=Monitors}};
{empty, Empty} when MaxOverflow > 0 ->
{Pid, Ref} = new_worker(Sup, FromPid, InitFun),
Monitors = [{Pid, Ref} | State#state.monitors],
@@ -107,14 +121,19 @@ ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
ready(_Event, _From, State) ->
{reply, ok, ready, State}.
-overflow({checkin, Pid}, #state{overflow=1}=State) ->
- StopFun = State#state.worker_stop,
- dismiss_worker(Pid, StopFun),
+overflow({checkin, Pid}, #state{overflow=0}=State) ->
+ %StopFun = State#state.worker_stop,
+ %dismiss_worker(Pid, StopFun),
Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
{value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
false -> []
end,
- {next_state, ready, State#state{overflow=0, monitors=Monitors}};
+ NextState = case State#state.size > 0 of
+ true -> ready;
+ _ -> overflow
+ end,
+ {next_state, NextState, State#state{overflow=0, monitors=Monitors,
+ workers=queue:in(Pid, State#state.workers)}};
overflow({checkin, Pid}, State) ->
#state{overflow=Overflow, worker_stop=StopFun} = State,
dismiss_worker(Pid, StopFun),
@@ -138,11 +157,17 @@ overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow,
{next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}}
end;
overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
- #state{worker_sup=Sup, overflow=Overflow, worker_init=InitFun} = State,
+ #state{worker_sup=Sup, overflow=Overflow,
+ worker_init=InitFun, max_overflow=MaxOverflow} = State,
{Pid, Ref} = new_worker(Sup, From, InitFun),
Monitors = [{Pid, Ref} | State#state.monitors],
- {reply, Pid, overflow, State#state{monitors=Monitors,
- overflow=Overflow+1}};
+ NewOverflow = Overflow + 1,
+ Next = case NewOverflow >= MaxOverflow of
+ true -> full;
+ _ -> overflow
+ end,
+ {reply, Pid, Next, State#state{monitors=Monitors,
+ overflow=NewOverflow}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.
@@ -204,6 +229,10 @@ handle_sync_event(stop, _From, _StateName,State) ->
Sup = State#state.worker_sup,
exit(Sup, shutdown),
{stop, normal, ok, State};
+handle_sync_event(status, _From, StateName, State) ->
+ {reply, {StateName, queue:len(State#state.workers),
+ State#state.overflow},
+ StateName, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = {error, invalid_message},
{reply, Reply, StateName, State}.
@@ -231,8 +260,10 @@ handle_info({'EXIT', Pid, _}, StateName, State) ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
{next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
monitors=Monitors}};
- overflow when Overflow =< 1 ->
- {next_state, ready, State#state{monitors=Monitors, overflow=0}};
+ overflow when Overflow == 0 ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
+ monitors=Monitors}};
overflow ->
{next_state, overflow, State#state{monitors=Monitors,
overflow=Overflow-1}};
View
@@ -33,7 +33,9 @@ command(S) ->
[{call, ?MODULE, checkout_nonblock, [S#state.pid]} || S#state.pid /= undefined] ++
%% checkout shrinks to checkout_nonblock so we can simplify counterexamples
[{call, ?MODULE, ?SHRINK(checkout_block, [checkout_nonblock]), [S#state.pid]} || S#state.pid /= undefined] ++
- [{call, ?MODULE, checkin, [S#state.pid, elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []]
+ [{call, ?MODULE, checkin, [S#state.pid, elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
+ [{call, ?MODULE, kill_worker, [elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []
+ ]
).
make_args(_S, Size, Overflow) ->
@@ -58,6 +60,10 @@ checkin(Pool, Worker) ->
gen_fsm:sync_send_all_state_event(Pool, get_avail_workers),
Res.
+kill_worker(Worker) ->
+ exit(Worker, kill),
+ timer:sleep(1).
+
precondition(S,{call,_,start_poolboy,_}) ->
%% only start new pool when old one is stopped
S#state.pid == undefined;
@@ -66,9 +72,34 @@ precondition(S,_) when S#state.pid == undefined ->
false;
precondition(S, {call, _, checkin, [_Pool, Pid]}) ->
lists:member(Pid, S#state.checked_out);
+precondition(S, {call, _, kill_worker, [Pid]}) ->
+ lists:member(Pid, S#state.checked_out);
precondition(_S,{call,_,_,_}) ->
true.
+%% XXX comment out for parallel mode XXX
+dynamic_precondition(S = #state{pid=Pid},_) when Pid /= undefined ->
+ State = if length(S#state.checked_out) == S#state.size + S#state.max_overflow ->
+ full;
+ length(S#state.checked_out) >= S#state.size ->
+ overflow;
+ true ->
+ ready
+ end,
+
+ Workers = max(0, S#state.size - length(S#state.checked_out)),
+ OverFlow = max(0, length(S#state.checked_out) - S#state.size),
+
+ RealStatus = gen_fsm:sync_send_all_state_event(Pid, status),
+ case RealStatus == {State, Workers, OverFlow} of
+ true ->
+ true;
+ _ ->
+ exit({wrong_state, RealStatus, {State, Workers, OverFlow}})
+ end;
+dynamic_precondition(_,_) ->
+ true.
+
postcondition(S,{call,_,checkout_block,[_Pool]},R) ->
case R of
{'EXIT', {timeout, _}} ->
@@ -112,9 +143,12 @@ next_state(S,V,{call,_,checkout_nonblock,_}) ->
S#state{checked_out=S#state.checked_out++[V]}
end;
next_state(S,_V,{call, _, checkin, [_Pool, Worker]}) ->
+ S#state{checked_out=S#state.checked_out -- [Worker]};
+next_state(S,_V,{call, _, kill_worker, [Worker]}) ->
S#state{checked_out=S#state.checked_out -- [Worker]}.
+
prop_sequential() ->
?FORALL(Cmds,commands(?MODULE),
?TRAPEXIT(

0 comments on commit c2ba14c

Please sign in to comment.