Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add command to send poolboy EXIT messages for unrelated pids; fix bugs

Sending poolboy EXIT messages for pids that are not workers causes
invalid changes to poolboy's state. Also, legitimate processes dying
after being checked back in also would corrupt the internal state. Add
better checking in the EXIT handling code to guard against this.
  • Loading branch information...
commit e964cc52e6dbda45d7fdcddf76836a2d5703b042 1 parent eacf28f
@Vagabond Vagabond authored
Showing with 89 additions and 60 deletions.
  1. +69 −59 src/poolboy.erl
  2. +20 −1 test/poolboy_eqc.erl
View
128 src/poolboy.erl
@@ -251,65 +251,75 @@ handle_info({'EXIT', Pid, _}, StateName, State) ->
waiting = Waiting,
max_overflow = MaxOverflow,
worker_init = InitFun} = State,
- Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
- false -> State#state.monitors
- end,
- case StateName of
- ready ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
- overflow when Overflow == 0 ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
- overflow ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1}};
- full when MaxOverflow < 1 ->
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- MonitorRef = erlang:monitor(process, FromPid),
- Monitors2 = [{FromPid, MonitorRef} | Monitors],
- gen_fsm:reply(From, new_worker(Sup, InitFun)),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- _ ->
- %% replay it
- handle_info({'EXIT', Pid, foo}, StateName, State#state{waiting=LeftWaiting})
- end;
- {empty, Empty} ->
- Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
- {next_state, ready, State#state{monitors=Monitors,
- waiting=Empty,
- workers=Workers2}}
- end;
- full when Overflow =< MaxOverflow ->
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- MonitorRef = erlang:monitor(process, FromPid),
- Monitors2 = [{FromPid, MonitorRef} | Monitors],
- NewWorker = new_worker(Sup, InitFun),
- gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- _ ->
- %% replay it
- handle_info({'EXIT', Pid, foo}, StateName, State#state{waiting=LeftWaiting})
- end;
- {empty, Empty} ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1,
- waiting=Empty}}
- end;
- full ->
- {next_state, full, State#state{monitors=Monitors,
- overflow=Overflow-1}}
+ case lists:keytake(Pid, 1, State#state.monitors) of
+ {value, {_, Ref}, Monitors} -> erlang:demonitor(Ref),
+ case StateName of
+ ready ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
+ monitors=Monitors}};
+ overflow when Overflow == 0 ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
+ monitors=Monitors}};
+ overflow ->
+ {next_state, overflow, State#state{monitors=Monitors,
+ overflow=Overflow-1}};
+ full when MaxOverflow < 1 ->
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ Monitors2 = [{FromPid, MonitorRef} | Monitors],
+ gen_fsm:reply(From, new_worker(Sup, InitFun)),
+ {next_state, full, State#state{waiting=LeftWaiting,
+ monitors=Monitors2}};
+ _ ->
+ %% replay it
+ handle_info({'EXIT', Pid, foo}, StateName, State#state{waiting=LeftWaiting})
+ end;
+ {empty, Empty} ->
+ Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
+ {next_state, ready, State#state{monitors=Monitors,
+ waiting=Empty,
+ workers=Workers2}}
+ end;
+ full when Overflow =< MaxOverflow ->
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ Monitors2 = [{FromPid, MonitorRef} | Monitors],
+ NewWorker = new_worker(Sup, InitFun),
+ gen_fsm:reply(From, NewWorker),
+ {next_state, full, State#state{waiting=LeftWaiting,
+ monitors=Monitors2}};
+ _ ->
+ %% replay it
+ handle_info({'EXIT', Pid, foo}, StateName, State#state{waiting=LeftWaiting})
+ end;
+ {empty, Empty} ->
+ {next_state, overflow, State#state{monitors=Monitors,
+ overflow=Overflow-1,
+ waiting=Empty}}
+ end;
+ full ->
+ {next_state, full, State#state{monitors=Monitors,
+ overflow=Overflow-1}}
+ end;
+ _ ->
+ %% not a monitored pid, is it in the worker queue?
+ case queue:member(Pid, State#state.workers) of
+ true ->
+ %% a checked in worker died
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, StateName, State#state{workers=queue:in(new_worker(Sup, InitFun), W)}};
+ _ ->
+ %% completely irrelevant pid exited, don't change anything
+ {next_state, StateName, State}
+ end
end;
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
View
21 test/poolboy_eqc.erl
@@ -35,12 +35,25 @@ command(S) ->
[{call, ?MODULE, ?SHRINK(checkout_block, [checkout_nonblock]), [S#state.pid]} || S#state.pid /= undefined] ++
[{call, ?MODULE, checkin, [S#state.pid, elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
[{call, ?MODULE, kill_worker, [elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
- [{call, ?MODULE, kill_idle_worker, [S#state.pid]} || S#state.pid /= undefined]
+ [{call, ?MODULE, kill_idle_worker, [S#state.pid]} || S#state.pid /= undefined] ++
+ [{call, ?MODULE, spurious_exit, [S#state.pid]} || S#state.pid /= undefined]
).
make_args(_S, Size, Overflow) ->
[[{size, Size}, {max_overflow, Overflow}, {worker_module, poolboy_test_worker}, {name, {local, poolboy_eqc}}]].
+spawn_linked_process(Pool) ->
+ Parent = self(),
+ Pid = spawn(fun() ->
+ link(Pool),
+ Parent ! {linked, self()},
+ timer:sleep(5000)
+ end),
+ receive
+ {linked, Pid} ->
+ Pid
+ end.
+
start_poolboy(Args) ->
{ok, Pid} = poolboy:start_link(Args),
Pid.
@@ -75,6 +88,10 @@ kill_idle_worker(Pool) ->
kill_idle_worker(Pool)
end.
+spurious_exit(Pool) ->
+ Pid = spawn_linked_process(Pool),
+ exit(Pid, kill).
+
precondition(S,{call,_,start_poolboy,_}) ->
%% only start new pool when old one is stopped
S#state.pid == undefined;
@@ -161,6 +178,8 @@ next_state(S,_V,{call, _, checkin, [_Pool, Worker]}) ->
next_state(S,_V,{call, _, kill_worker, [Worker]}) ->
S#state{checked_out=S#state.checked_out -- [Worker]};
next_state(S,_V,{call, _, kill_idle_worker, [_Pool]}) ->
+ S;
+next_state(S,_V,{call, _, spurious_exit, [_Pool]}) ->
S.
prop_sequential() ->

0 comments on commit e964cc5

Please sign in to comment.
Something went wrong with that request. Please try again.