Permalink
Browse files

Performance improvements and simplifications

- Monitors are now stored in an ETS table. Simple profiling revealed
  that a lot of time was being taken in `lists:keytake/3` looking up the
  refs by pid. This should provide O(1) constant time lookup.
- Workers are now no longer required to handle `stop` messages. Workers
  are terminated using `supervisor:terminate_child/2` and the shutdown
  strategy is no longer brutal_kill.
- Other various cleanups.
  • Loading branch information...
1 parent f2bfc2b commit c7fbad1328c492e1aa3c34795a4faf6d7618c0d2 @devinus committed Apr 27, 2012
Showing with 145 additions and 201 deletions.
  1. +144 −156 src/poolboy.erl
  2. +1 −1 src/poolboy_sup.erl
  3. +0 −2 test/poolboy_test_worker.erl
  4. +0 −42 test/poolboy_tests.erl
View
300 src/poolboy.erl
@@ -3,32 +3,29 @@
-module(poolboy).
-behaviour(gen_fsm).
--export([start_link/1, checkout/1, checkout/2, checkout/3, checkin/2, stop/1]).
+-export([checkout/1, checkout/2, checkout/3, checkin/2,
+ child_spec/2, start_link/1, stop/1]).
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
code_change/4]).
-define(TIMEOUT, 5000).
--define(DEFAULT_INIT_FUN, fun(Worker) -> {ok, Worker} end).
--define(DEFAULT_STOP_FUN, fun(Worker) -> Worker ! stop end).
-record(state, {
+ supervisor :: pid(),
workers :: queue(),
- worker_sup :: pid(),
- waiting = queue:new() :: queue(),
- monitors = [] :: list(),
+ waiting :: queue(),
+ monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
- max_overflow = 10 :: non_neg_integer(),
- worker_init = ?DEFAULT_INIT_FUN :: fun((Worker :: pid()) -> {ok, pid()}),
- worker_stop = ?DEFAULT_STOP_FUN :: fun((Worker :: pid()) -> stop)
+ max_overflow = 10 :: non_neg_integer()
}).
-spec checkout(Pool :: node()) -> pid().
checkout(Pool) ->
checkout(Pool, true).
--spec checkout(Pool :: node(), boolean()) -> pid() | full.
+-spec checkout(Pool :: node(), Block :: boolean()) -> pid() | full.
checkout(Pool, Block) ->
checkout(Pool, Block, ?TIMEOUT).
@@ -41,6 +38,13 @@ checkout(Pool, Block, Timeout) ->
checkin(Pool, Worker) ->
gen_fsm:send_event(Pool, {checkin, Worker}).
+-spec child_spec(Pool :: node(), Args :: proplists:proplist()) ->
+ supervisor:child_spec().
+child_spec(Pool, Args) ->
+ {Pool, {poolboy, start_link, [Args]},
+ permanent, 5000, worker, [poolboy]}.
+
+-spec start_link(Args :: proplists:proplist()) -> {ok, pid()}.
start_link(Args) ->
case proplists:get_value(name, Args) of
undefined ->
@@ -49,72 +53,67 @@ start_link(Args) ->
gen_fsm:start_link(Name, ?MODULE, Args, [])
end.
+-spec stop(Pool :: node()) -> ok.
stop(Pool) ->
gen_fsm:sync_send_all_state_event(Pool, stop).
init(Args) ->
process_flag(trap_exit, true),
- init(Args, #state{}).
+ Waiting = queue:new(),
+ Monitors = ets:new(monitors, [private]),
+ init(Args, #state{waiting=Waiting, monitors=Monitors}).
init([{worker_module, Mod} | Rest], State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, Rest),
- init(Rest, State#state{worker_sup=Sup});
+ init(Rest, State#state{supervisor=Sup});
init([{size, Size} | Rest], State) when is_integer(Size) ->
init(Rest, State#state{size=Size});
init([{max_overflow, MaxOverflow} | Rest], State) when is_integer(MaxOverflow) ->
init(Rest, State#state{max_overflow=MaxOverflow});
-init([{init_fun, InitFun} | Rest], State) when is_function(InitFun) ->
- init(Rest, State#state{worker_init=InitFun});
-init([{stop_fun, StopFun} | Rest], State) when is_function(StopFun) ->
- init(Rest, State#state{worker_stop=StopFun});
init([_ | Rest], State) ->
init(Rest, State);
-init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun,
- max_overflow=MaxOverflow}=State) ->
- Workers = prepopulate(Size, Sup, InitFun),
- StartState = case queue:len(Workers) of
- 0 when MaxOverflow =:= 0 -> full;
- 0 -> overflow;
- _ -> ready
+init([], #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) ->
+ Workers = prepopulate(Size, Sup),
+ StartState = case {Size, MaxOverflow} of
+ {0, 0} -> full;
+ {0, _} -> overflow;
+ {_, _} -> ready
end,
{ok, StartState, State#state{workers=Workers}}.
ready({checkin, Pid}, State) ->
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} ->
- erlang:demonitor(Ref),
+ Monitors = State#state.monitors,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers,
- monitors=Monitors}};
- false ->
- %% unknown process checked in, ignore it
+ {next_state, ready, State#state{workers=Workers}};
+ [] ->
{next_state, ready, State}
end;
ready(_Event, State) ->
{next_state, ready, State}.
ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
- #state{workers = Workers,
- worker_sup = Sup,
- max_overflow = MaxOverflow,
- worker_init = InitFun} = State,
+ #state{supervisor = Sup,
+ workers = Workers,
+ monitors = Monitors,
+ max_overflow = MaxOverflow} = State,
case queue:out(Workers) of
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
- Monitors = [{Pid, Ref} | State#state.monitors],
- NextState = case queue:len(Left) of
- 0 when MaxOverflow =:= 0 -> full;
- 0 -> overflow;
- _ -> ready
+ true = ets:insert(Monitors, {Pid, Ref}),
+ NextState = case {queue:len(Left), MaxOverflow} of
+ {0, 0} -> full;
+ {0, _} -> overflow;
+ {_, _} -> ready
end,
- {reply, Pid, NextState, State#state{workers=Left,
- monitors=Monitors}};
+ {reply, Pid, NextState, State#state{workers=Left}};
{empty, Empty} when MaxOverflow > 0 ->
- {Pid, Ref} = new_worker(Sup, FromPid, InitFun),
- Monitors = [{Pid, Ref} | State#state.monitors],
- {reply, Pid, overflow, State#state{workers=Empty,
- monitors=Monitors,
- overflow=1}};
+ {Pid, Ref} = new_worker(Sup, FromPid),
+ true = ets:insert(Monitors, {Pid, Ref}),
+ {reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
{empty, Empty} when Block =:= false ->
{reply, full, full, State#state{workers=Empty}};
{empty, Empty} ->
@@ -125,36 +124,37 @@ ready(_Event, _From, State) ->
{reply, ok, ready, State}.
overflow({checkin, Pid}, #state{overflow=0}=State) ->
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} ->
- erlang:demonitor(Ref),
+ Monitors = State#state.monitors,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
NextState = case State#state.size > 0 of
- true -> ready;
- _ -> overflow
+ true -> ready;
+ false -> overflow
end,
Workers = queue:in(Pid, State#state.workers),
- {next_state, NextState, State#state{overflow=0, monitors=Monitors,
- workers=Workers}};
- false ->
+ {next_state, NextState, State#state{overflow=0, workers=Workers}};
+ [] ->
{next_state, overflow, State}
end;
overflow({checkin, Pid}, State) ->
- #state{overflow=Overflow, worker_stop=StopFun} = State,
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} ->
- dismiss_worker(Pid, StopFun),
- erlang:demonitor(Ref),
- {next_state, overflow, State#state{overflow=Overflow-1,
- monitors=Monitors}};
- _ ->
+ #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ ok = dismiss_worker(Sup, Pid),
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
+ {next_state, overflow, State#state{overflow=Overflow-1}};
+ [] ->
{next_state, overflow, State}
end;
overflow(_Event, State) ->
{next_state, overflow, State}.
-overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow,
- max_overflow=MaxOverflow
- }=State) when Overflow >= MaxOverflow ->
+overflow({checkout, Block, Timeout}, From,
+ #state{overflow=Overflow,
+ max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow ->
case Block of
false ->
{reply, full, full, State};
@@ -163,51 +163,50 @@ overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow,
{next_state, full, State#state{waiting=Waiting}}
end;
overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
- #state{worker_sup=Sup, overflow=Overflow,
- worker_init=InitFun, max_overflow=MaxOverflow} = State,
- {Pid, Ref} = new_worker(Sup, From, InitFun),
- Monitors = [{Pid, Ref} | State#state.monitors],
- NewOverflow = Overflow + 1,
- Next = case NewOverflow >= MaxOverflow of
- true -> full;
- _ -> overflow
+ #state{supervisor = Sup,
+ overflow = Overflow,
+ max_overflow = MaxOverflow} = State,
+ {Pid, Ref} = new_worker(Sup, From),
+ true = ets:insert(State#state.monitors, {Pid, Ref}),
+ NextState = case Overflow+1 >= MaxOverflow of
+ true -> full;
+ false -> overflow
end,
- {reply, Pid, Next, State#state{monitors=Monitors,
- overflow=NewOverflow}};
+ {reply, Pid, NextState, State#state{overflow=Overflow+1}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.
full({checkin, Pid}, State) ->
- #state{waiting = Waiting, max_overflow = MaxOverflow,
- overflow = Overflow, worker_stop = StopFun} = State,
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref0}, Monitors} ->
- erlang:demonitor(Ref0),
+ #state{supervisor = Sup,
+ waiting = Waiting,
+ monitors = Monitors,
+ max_overflow = MaxOverflow,
+ overflow = Overflow} = State,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
case queue:out(Waiting) of
{{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
case wait_valid(StartTime, Timeout) of
true ->
- Ref = erlang:monitor(process, FromPid),
- Monitors1 = [{Pid, Ref} | Monitors],
+ Ref1 = erlang:monitor(process, FromPid),
+ true = ets:insert(Monitors, {Pid, Ref1}),
gen_fsm:reply(From, Pid),
- {next_state, full, State#state{waiting=Left,
- monitors=Monitors1}};
- _ ->
- %% replay this event with cleaned up waiting queue
+ {next_state, full, State#state{waiting=Left}};
+ false ->
full({checkin, Pid}, State#state{waiting=Left})
end;
{empty, Empty} when MaxOverflow < 1 ->
Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers, waiting=Empty,
- monitors=Monitors}};
+ {next_state, ready, State#state{workers=Workers,
+ waiting=Empty}};
{empty, Empty} ->
- dismiss_worker(Pid, StopFun),
+ ok = dismiss_worker(Sup, Pid),
{next_state, overflow, State#state{waiting=Empty,
- monitors=Monitors,
overflow=Overflow-1}}
end;
- false ->
- %% unknown process checked in, ignore it
+ [] ->
{next_state, full, State}
end;
full(_Event, State) ->
@@ -224,76 +223,72 @@ full(_Event, _From, State) ->
handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.
+handle_sync_event(status, _From, StateName, State) ->
+ {reply, {StateName, queue:len(State#state.workers), State#state.overflow,
+ ets:info(State#state.monitors, size)},
+ StateName, State};
handle_sync_event(get_avail_workers, _From, StateName, State) ->
Workers = State#state.workers,
WorkerList = queue:to_list(Workers),
{reply, WorkerList, StateName, State};
handle_sync_event(get_all_workers, _From, StateName, State) ->
- Sup = State#state.worker_sup,
+ Sup = State#state.supervisor,
WorkerList = supervisor:which_children(Sup),
{reply, WorkerList, StateName, State};
handle_sync_event(get_all_monitors, _From, StateName, State) ->
- Monitors = State#state.monitors,
+ Monitors = ets:tab2list(State#state.monitors),
{reply, Monitors, StateName, State};
-handle_sync_event(stop, _From, _StateName,State) ->
- Sup = State#state.worker_sup,
- exit(Sup, shutdown),
+handle_sync_event(stop, _From, _StateName, State) ->
+ Sup = State#state.supervisor,
+ true = exit(Sup, shutdown),
{stop, normal, ok, State};
-handle_sync_event(status, _From, StateName, State) ->
- {reply, {StateName, queue:len(State#state.workers),
- State#state.overflow, length(State#state.monitors)},
- StateName, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = {error, invalid_message},
{reply, Reply, StateName, State}.
handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
- case lists:keyfind(Ref, 2, State#state.monitors) of
- {Pid, Ref} ->
- exit(Pid, kill),
+ case ets:match(State#state.monitors, {'$1', Ref}) of
+ [[Pid]] ->
+ true = exit(Pid, kill),
{next_state, StateName, State};
- false ->
+ [] ->
{next_state, StateName, State}
end;
handle_info({'EXIT', Pid, Reason}, StateName, State) ->
- #state{worker_sup = Sup,
+ #state{supervisor = Sup,
overflow = Overflow,
waiting = Waiting,
- max_overflow = MaxOverflow,
- worker_init = InitFun} = State,
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} -> erlang:demonitor(Ref),
+ monitors = Monitors,
+ max_overflow = MaxOverflow} = State,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
case StateName of
ready ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}};
overflow when Overflow =:= 0 ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}};
overflow ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1}};
+ {next_state, overflow, State#state{overflow=Overflow-1}};
full when MaxOverflow < 1 ->
case queue:out(Waiting) of
{{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
case wait_valid(StartTime, Timeout) of
true ->
MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(Sup, InitFun),
- Monitors2 = [{NewWorker, MonitorRef} | Monitors],
+ NewWorker = new_worker(Sup),
+ true = ets:insert(Monitors, {NewWorker, MonitorRef}),
gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- _ ->
- %% replay it
+ {next_state, full, State#state{waiting=LeftWaiting}};
+ false ->
handle_info({'EXIT', Pid, Reason}, StateName, State#state{waiting=LeftWaiting})
end;
{empty, Empty} ->
- Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
- {next_state, ready, State#state{monitors=Monitors,
- waiting=Empty,
+ Workers2 = queue:in(new_worker(Sup), State#state.workers),
+ {next_state, ready, State#state{waiting=Empty,
workers=Workers2}}
end;
full when Overflow =< MaxOverflow ->
@@ -302,68 +297,61 @@ handle_info({'EXIT', Pid, Reason}, StateName, State) ->
case wait_valid(StartTime, Timeout) of
true ->
MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(Sup, InitFun),
- Monitors2 = [{NewWorker, MonitorRef} | Monitors],
+ NewWorker = new_worker(Sup),
+ true = ets:insert(Monitors, {NewWorker, MonitorRef}),
gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
+ {next_state, full, State#state{waiting=LeftWaiting}};
_ ->
- %% replay it
handle_info({'EXIT', Pid, Reason}, StateName, State#state{waiting=LeftWaiting})
end;
{empty, Empty} ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1,
+ {next_state, overflow, State#state{overflow=Overflow-1,
waiting=Empty}}
end;
full ->
- {next_state, full, State#state{monitors=Monitors,
- overflow=Overflow-1}}
+ {next_state, full, State#state{overflow=Overflow-1}}
end;
- _ ->
- %% not a monitored pid, is it in the worker queue?
+ [] ->
case queue:member(Pid, State#state.workers) of
true ->
- %% a checked in worker died
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, StateName, State#state{workers=queue:in(new_worker(Sup, InitFun), W)}};
- _ ->
- %% completely irrelevant pid exited, don't change anything
+ {next_state, StateName, State#state{workers=queue:in(new_worker(Sup), W)}};
+ false ->
{next_state, StateName, State}
end
end;
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
-terminate(_Reason, _StateName, _State) -> ok.
+terminate(_Reason, _StateName, _State) ->
+ ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
-new_worker(Sup, InitFun) ->
+new_worker(Sup) ->
{ok, Pid} = supervisor:start_child(Sup, []),
- {ok, Pid1} = InitFun(Pid),
- link(Pid1),
- Pid1.
+ true = link(Pid),
+ Pid.
-new_worker(Sup, FromPid, InitFun) ->
- Pid = new_worker(Sup, InitFun),
+new_worker(Sup, FromPid) ->
+ Pid = new_worker(Sup),
Ref = erlang:monitor(process, FromPid),
{Pid, Ref}.
-dismiss_worker(Pid, StopFun) ->
- unlink(Pid),
- StopFun(Pid).
+dismiss_worker(Sup, Pid) ->
+ true = unlink(Pid),
+ supervisor:terminate_child(Sup, Pid).
-prepopulate(0, _, _) ->
+prepopulate(0, _) ->
queue:new();
-prepopulate(N, Sup, InitFun) ->
- prepopulate(N, Sup, queue:new(), InitFun).
+prepopulate(N, Sup) ->
+ prepopulate(N, Sup, queue:new()).
-prepopulate(0, _, Workers, _) ->
+prepopulate(0, _, Workers) ->
Workers;
-prepopulate(N, Sup, Workers, InitFun) ->
- prepopulate(N-1, Sup, queue:in(new_worker(Sup, InitFun), Workers), InitFun).
+prepopulate(N, Sup, Workers) ->
+ prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).
add_waiting(Pid, Timeout, Queue) ->
queue:in({Pid, Timeout, os:timestamp()}, Queue).
View
2 src/poolboy_sup.erl
@@ -11,4 +11,4 @@ start_link(Mod, Args) ->
init({Mod, Args}) ->
{ok, {{simple_one_for_one, 0, 1},
[{Mod, {Mod, start_link, [Args]},
- temporary, brutal_kill, worker, [Mod]}]}}.
+ temporary, 5000, worker, [Mod]}]}}.
View
2 test/poolboy_test_worker.erl
@@ -21,8 +21,6 @@ handle_call(_Event, _From, State) ->
handle_cast(_Event, State) ->
{noreply, State}.
-handle_info(stop, State) ->
- {stop, shutdown, State};
handle_info({'EXIT', _, _}, State) ->
{stop, shutdown, State};
handle_info(_Info, State) ->
View
42 test/poolboy_tests.erl
@@ -47,12 +47,6 @@ pool_test_() ->
},
{<<"Pool behaves on owner death">>,
fun owner_death/0
- },
- {<<"Pool worker init function called when workers when created">>,
- fun worker_init_fun/0
- },
- {<<"Pool worker stop function called on workers when destroyed">>,
- fun worker_stop_fun/0
}
]
}.
@@ -368,39 +362,3 @@ owner_death() ->
?assertEqual(5, length(?sync(Pid, get_all_workers))),
?assertEqual(0, length(?sync(Pid, get_all_monitors))),
ok = ?sync(Pid, stop).
-
-worker_init_fun() ->
- Self = self(),
- InitFun = fun(Worker) ->
- Self ! worked,
- {ok, Worker}
- end,
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {init_fun, InitFun}]),
- poolboy:checkout(Pid),
- receive
- worked -> ?assert(true)
- after
- 1000 -> ?assert(false)
- end,
- ok = ?sync(Pid, stop).
-
-worker_stop_fun() ->
- Self = self(),
- StopFun = fun(Worker) ->
- Self ! worked,
- Worker ! stop
- end,
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {stop_fun, StopFun},
- {size, 0}, {max_overflow, 1}]),
- Worker = poolboy:checkout(Pid),
- checkin_worker(Pid, Worker),
- receive
- worked -> ?assert(true)
- after
- 1000 -> ?assert(false)
- end,
- ok = ?sync(Pid, stop).

0 comments on commit c7fbad1

Please sign in to comment.