Kill orphaned workers and prune monitor state space. #4

Merged
merged 3 commits into devinus:master

2 participants

@skeltoac

If a process checks out a worker and then dies, the DOWN message dismisses the worker without regard for the pool state. This can leave a pool with too few workers. There are also places where monitors are not removed from state. This branch fixes both problems, adds a new test, and adds monitor state checking to other tests.
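A minimal sketch of the failure scenario, assuming the same start_link options style used by the new user_death test below; the pool name demo_pool and the exact options are illustrative only:

    %% A user process checks out a worker and then dies without checking it in.
    {ok, Pool} = poolboy:start_link([{name, {local, demo_pool}},
                                     {worker_module, poolboy_test_worker},
                                     {size, 5}, {max_overflow, 0}]),
    spawn(fun() ->
        _Worker = poolboy:checkout(Pool),
        exit(normal)  %% dies while still holding the worker
    end),
    %% Before this branch, the resulting 'DOWN' message dismissed the worker
    %% unconditionally, so the pool could be left with fewer than `size` workers.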

@skeltoac

Sorry for the mess... I'm still green at git.

@skeltoac skeltoac
Fix demonitored pids being left in state space and check monitor list length in all tests. Add user_death test to check that an orphaned worker is killed when a monitored user is reported 'DOWN'.
d27c012
@devinus
Owner

Looking into this as well...

@devinus devinus merged commit d27c012 into devinus:master
Commits on Sep 26, 2011
  1. @skeltoac
  2. @skeltoac

Commits on Sep 27, 2011
  1. @skeltoac: Fix demonitored pids being left in state space and check monitor list length in all tests. Add user_death test to check that an orphaned worker is killed when a monitored user is reported 'DOWN'.
Showing with 71 additions and 16 deletions.
  1. +28 −16 src/poolboy.erl
  2. +43 −0 test/poolboy_tests.erl
src/poolboy.erl
@@ -47,7 +47,7 @@ ready({checkin, Pid}, State) ->
    Workers = queue:in(Pid, State#state.workers),
    Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
        {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
-       false -> []
+       false -> State#state.monitors
    end,
    {next_state, ready, State#state{workers=Workers, monitors=Monitors}};
ready(_Event, State) ->
@@ -80,11 +80,19 @@ ready(_Event, _From, State) ->
    {reply, ok, ready, State}.

overflow({checkin, Pid}, #state{overflow=1}=State) ->
-   dismiss_worker(Pid),
-   {next_state, ready, State#state{overflow=0}};
+   dismiss_worker(Pid), %% TODO add demonitor to all checkins and check get_all_monitors in other tests!
+   Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
+       {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
+       false -> []
+   end,
+   {next_state, ready, State#state{overflow=0, monitors=Monitors}};
overflow({checkin, Pid}, #state{overflow=Overflow}=State) ->
    dismiss_worker(Pid),
-   {next_state, overflow, State#state{overflow=Overflow-1}};
+   Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
+       {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
+       false -> State#state.monitors
+   end,
+   {next_state, overflow, State#state{overflow=Overflow-1, monitors=Monitors}};
overflow(_Event, State) ->
    {next_state, overflow, State}.
@@ -109,25 +117,25 @@ overflow(_Event, _From, State) ->
full({checkin, Pid}, #state{waiting=Waiting, max_overflow=MaxOverflow,
                            overflow=Overflow}=State) ->
+   Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
+       {value, {_, Ref0}, Left0} -> erlang:demonitor(Ref0), Left0;
+       false -> State#state.monitors
+   end,
    case queue:out(Waiting) of
        {{value, {FromPid, _}=From}, Left} ->
            Ref = erlang:monitor(process, FromPid),
-           Monitors = [{Pid, Ref} | State#state.monitors],
+           Monitors1 = [{Pid, Ref} | Monitors],
            gen_fsm:reply(From, Pid),
            {next_state, full, State#state{waiting=Left,
-                                          monitors=Monitors}};
+                                          monitors=Monitors1}};
        {empty, Empty} when MaxOverflow < 1 ->
            Workers = queue:in(Pid, State#state.workers),
-           Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
-               {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
-               false -> []
-           end,
            {next_state, ready, State#state{workers=Workers, waiting=Empty,
                                            monitors=Monitors}};
        {empty, Empty} ->
            dismiss_worker(Pid),
            {next_state, overflow, State#state{waiting=Empty,
-                                              overflow=Overflow-1}}
+                                              overflow=Overflow-1, monitors=Monitors}}
    end;
full(_Event, State) ->
    {next_state, full, State}.
@@ -148,6 +156,8 @@ handle_sync_event(get_avail_workers, _From, StateName, #state{workers=Workers}=S
handle_sync_event(get_all_workers, _From, StateName, #state{worker_sup=Sup}=State) ->
    WorkerList = supervisor:which_children(Sup),
    {reply, WorkerList, StateName, State};
+handle_sync_event(get_all_monitors, _From, StateName, #state{monitors=Monitors}=State) ->
+   {reply, Monitors, StateName, State};
handle_sync_event(stop, _From, _StateName, State) ->
    {stop, normal, ok, State};
handle_sync_event(_Event, _From, StateName, State) ->
@@ -155,11 +165,13 @@ handle_sync_event(_Event, _From, StateName, State) ->
    {reply, Reply, StateName, State}.

handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
-   Monitors = case lists:keytake(Ref, 2, State#state.monitors) of
-       {value, {Pid, _}, Left} -> dismiss_worker(Pid), Left;
-       false -> State#state.monitors
-   end,
-   {next_state, StateName, State#state{monitors=Monitors}};
+   case lists:keyfind(Ref, 2, State#state.monitors) of
+       {Pid, Ref} ->
+           exit(Pid, kill),
+           {next_state, StateName, State};
+       false ->
+           {next_state, StateName, State}
+   end;
handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
                                                overflow=Overflow,
                                                waiting=Waiting,
test/poolboy_tests.erl
@@ -42,6 +42,9 @@ pool_test_() ->
            },
            {"Non-blocking pool behaves when it's full",
                fun pool_full_nonblocking/0
+           },
+           {"Pool behaves right on user death",
+               fun user_death/0
            }
        ]
    }.
@@ -78,6 +81,8 @@ pool_startup() ->
    checkin_worker(Pid, Worker),
    ?assertEqual(9, length(gen_fsm:sync_send_all_state_event(Pid,
        get_avail_workers))),
+   ?assertEqual(1, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).

pool_overflow() ->
@@ -113,6 +118,8 @@ pool_overflow() ->
        get_avail_workers))),
    ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
        get_all_workers))),
+   ?assertEqual(0, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).

pool_empty() ->
@@ -167,6 +174,8 @@ pool_empty() ->
        get_avail_workers))),
    ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
        get_all_workers))),
+   ?assertEqual(0, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).

pool_empty_no_overflow() ->
@@ -215,6 +224,8 @@ pool_empty_no_overflow() ->
        get_avail_workers))),
    ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
        get_all_workers))),
+   ?assertEqual(0, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).
@@ -244,6 +255,8 @@ worker_death() ->
        get_avail_workers))),
    ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
        get_all_workers))),
+   ?assertEqual(4, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).

worker_death_while_full() ->
@@ -287,6 +300,8 @@ worker_death_while_full() ->
        get_avail_workers))),
    ?assertEqual(6, length(gen_fsm:sync_send_all_state_event(Pid,
        get_all_workers))),
+   ?assertEqual(6, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).
@@ -335,6 +350,8 @@ worker_death_while_full_no_overflow() ->
        get_avail_workers))),
    ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
        get_all_workers))),
+   ?assertEqual(3, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).
@@ -354,6 +371,8 @@ pool_full_nonblocking_no_overflow() ->
    A = hd(Workers),
    checkin_worker(Pid, A),
    ?assertEqual(A, poolboy:checkout(Pid)),
+   ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).

pool_full_nonblocking() ->
@@ -374,7 +393,31 @@ pool_full_nonblocking() ->
    ?assertEqual(false, is_process_alive(A)), %% overflow workers get shut down
    ?assert(is_pid(NewWorker)),
    ?assertEqual(full, poolboy:checkout(Pid)),
+   ?assertEqual(10, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
+   ok = gen_fsm:sync_send_all_state_event(Pid, stop).
+
+user_death() ->
+   %% check that a dead user (a process that died with a worker checked out)
+   %% causes the pool to dismiss the worker and prune the state space.
+   {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
+                                   {worker_module, poolboy_test_worker},
+                                   {size, 5}, {max_overflow, 5}, {checkout_blocks, false}]),
+   spawn(fun() ->
+       %% you'll have to pry it from my cold, dead hands
+       poolboy:checkout(Pid),
+       receive after 500 -> exit(normal) end
+   end),
+   %% on a long enough timeline, the survival rate for everyone drops to zero.
+   receive after 1000 -> ok end,
+   ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_avail_workers))),
+   ?assertEqual(5, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_workers))),
+   ?assertEqual(0, length(gen_fsm:sync_send_all_state_event(Pid,
+       get_all_monitors))),
    ok = gen_fsm:sync_send_all_state_event(Pid, stop).
+
-endif.
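As a usage note, the get_all_monitors sync event introduced in this branch can be queried the same way the tests do; this is only a sketch, with Pool standing for a pid returned by poolboy:start_link/1:

    %% Each entry pairs a checked-out worker pid with the monitor reference
    %% on the user process that currently holds it.
    Monitors = gen_fsm:sync_send_all_state_event(Pool, get_all_monitors),
    length(Monitors).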