diff --git a/src/poolboy.erl b/src/poolboy.erl index d4671ab..2f52744 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -3,32 +3,29 @@ -module(poolboy). -behaviour(gen_fsm). --export([start_link/1, checkout/1, checkout/2, checkout/3, checkin/2, stop/1]). +-export([checkout/1, checkout/2, checkout/3, checkin/2, + child_spec/2, start_link/1, stop/1]). -export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). -define(TIMEOUT, 5000). --define(DEFAULT_INIT_FUN, fun(Worker) -> {ok, Worker} end). --define(DEFAULT_STOP_FUN, fun(Worker) -> Worker ! stop end). -record(state, { + supervisor :: pid(), workers :: queue(), - worker_sup :: pid(), - waiting = queue:new() :: queue(), - monitors = [] :: list(), + waiting :: queue(), + monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), - max_overflow = 10 :: non_neg_integer(), - worker_init = ?DEFAULT_INIT_FUN :: fun((Worker :: pid()) -> {ok, pid()}), - worker_stop = ?DEFAULT_STOP_FUN :: fun((Worker :: pid()) -> stop) + max_overflow = 10 :: non_neg_integer() }). -spec checkout(Pool :: node()) -> pid(). checkout(Pool) -> checkout(Pool, true). --spec checkout(Pool :: node(), boolean()) -> pid() | full. +-spec checkout(Pool :: node(), Block :: boolean()) -> pid() | full. checkout(Pool, Block) -> checkout(Pool, Block, ?TIMEOUT). @@ -41,6 +38,13 @@ checkout(Pool, Block, Timeout) -> checkin(Pool, Worker) -> gen_fsm:send_event(Pool, {checkin, Worker}). +-spec child_spec(Pool :: node(), Args :: proplists:proplist()) -> + supervisor:child_spec(). +child_spec(Pool, Args) -> + {Pool, {poolboy, start_link, [Args]}, + permanent, 5000, worker, [poolboy]}. + +-spec start_link(Args :: proplists:proplist()) -> {ok, pid()}. start_link(Args) -> case proplists:get_value(name, Args) of undefined -> @@ -49,72 +53,67 @@ start_link(Args) -> gen_fsm:start_link(Name, ?MODULE, Args, []) end. +-spec stop(Pool :: node()) -> ok. stop(Pool) -> gen_fsm:sync_send_all_state_event(Pool, stop). init(Args) -> process_flag(trap_exit, true), - init(Args, #state{}). + Waiting = queue:new(), + Monitors = ets:new(monitors, [private]), + init(Args, #state{waiting=Waiting, monitors=Monitors}). init([{worker_module, Mod} | Rest], State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, Rest), - init(Rest, State#state{worker_sup=Sup}); + init(Rest, State#state{supervisor=Sup}); init([{size, Size} | Rest], State) when is_integer(Size) -> init(Rest, State#state{size=Size}); init([{max_overflow, MaxOverflow} | Rest], State) when is_integer(MaxOverflow) -> init(Rest, State#state{max_overflow=MaxOverflow}); -init([{init_fun, InitFun} | Rest], State) when is_function(InitFun) -> - init(Rest, State#state{worker_init=InitFun}); -init([{stop_fun, StopFun} | Rest], State) when is_function(StopFun) -> - init(Rest, State#state{worker_stop=StopFun}); init([_ | Rest], State) -> init(Rest, State); -init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun, - max_overflow=MaxOverflow}=State) -> - Workers = prepopulate(Size, Sup, InitFun), - StartState = case queue:len(Workers) of - 0 when MaxOverflow =:= 0 -> full; - 0 -> overflow; - _ -> ready +init([], #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) -> + Workers = prepopulate(Size, Sup), + StartState = case {Size, MaxOverflow} of + {0, 0} -> full; + {0, _} -> overflow; + {_, _} -> ready end, {ok, StartState, State#state{workers=Workers}}. ready({checkin, Pid}, State) -> - case lists:keytake(Pid, 1, State#state.monitors) of - {value, {_, Ref}, Monitors} -> - erlang:demonitor(Ref), + Monitors = State#state.monitors, + case ets:lookup(Monitors, Pid) of + [{Pid, Ref}] -> + true = erlang:demonitor(Ref), + true = ets:delete(Monitors, Pid), Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers, - monitors=Monitors}}; - false -> - %% unknown process checked in, ignore it + {next_state, ready, State#state{workers=Workers}}; + [] -> {next_state, ready, State} end; ready(_Event, State) -> {next_state, ready, State}. ready({checkout, Block, Timeout}, {FromPid, _}=From, State) -> - #state{workers = Workers, - worker_sup = Sup, - max_overflow = MaxOverflow, - worker_init = InitFun} = State, + #state{supervisor = Sup, + workers = Workers, + monitors = Monitors, + max_overflow = MaxOverflow} = State, case queue:out(Workers) of {{value, Pid}, Left} -> Ref = erlang:monitor(process, FromPid), - Monitors = [{Pid, Ref} | State#state.monitors], - NextState = case queue:len(Left) of - 0 when MaxOverflow =:= 0 -> full; - 0 -> overflow; - _ -> ready + true = ets:insert(Monitors, {Pid, Ref}), + NextState = case {queue:len(Left), MaxOverflow} of + {0, 0} -> full; + {0, _} -> overflow; + {_, _} -> ready end, - {reply, Pid, NextState, State#state{workers=Left, - monitors=Monitors}}; + {reply, Pid, NextState, State#state{workers=Left}}; {empty, Empty} when MaxOverflow > 0 -> - {Pid, Ref} = new_worker(Sup, FromPid, InitFun), - Monitors = [{Pid, Ref} | State#state.monitors], - {reply, Pid, overflow, State#state{workers=Empty, - monitors=Monitors, - overflow=1}}; + {Pid, Ref} = new_worker(Sup, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + {reply, Pid, overflow, State#state{workers=Empty, overflow=1}}; {empty, Empty} when Block =:= false -> {reply, full, full, State#state{workers=Empty}}; {empty, Empty} -> @@ -125,36 +124,37 @@ ready(_Event, _From, State) -> {reply, ok, ready, State}. overflow({checkin, Pid}, #state{overflow=0}=State) -> - case lists:keytake(Pid, 1, State#state.monitors) of - {value, {_, Ref}, Monitors} -> - erlang:demonitor(Ref), + Monitors = State#state.monitors, + case ets:lookup(Monitors, Pid) of + [{Pid, Ref}] -> + true = erlang:demonitor(Ref), + true = ets:delete(Monitors, Pid), NextState = case State#state.size > 0 of - true -> ready; - _ -> overflow + true -> ready; + false -> overflow end, Workers = queue:in(Pid, State#state.workers), - {next_state, NextState, State#state{overflow=0, monitors=Monitors, - workers=Workers}}; - false -> + {next_state, NextState, State#state{overflow=0, workers=Workers}}; + [] -> {next_state, overflow, State} end; overflow({checkin, Pid}, State) -> - #state{overflow=Overflow, worker_stop=StopFun} = State, - case lists:keytake(Pid, 1, State#state.monitors) of - {value, {_, Ref}, Monitors} -> - dismiss_worker(Pid, StopFun), - erlang:demonitor(Ref), - {next_state, overflow, State#state{overflow=Overflow-1, - monitors=Monitors}}; - _ -> + #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State, + case ets:lookup(Monitors, Pid) of + [{Pid, Ref}] -> + ok = dismiss_worker(Sup, Pid), + true = erlang:demonitor(Ref), + true = ets:delete(Monitors, Pid), + {next_state, overflow, State#state{overflow=Overflow-1}}; + [] -> {next_state, overflow, State} end; overflow(_Event, State) -> {next_state, overflow, State}. -overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow, - max_overflow=MaxOverflow - }=State) when Overflow >= MaxOverflow -> +overflow({checkout, Block, Timeout}, From, + #state{overflow=Overflow, + max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow -> case Block of false -> {reply, full, full, State}; @@ -163,51 +163,50 @@ overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow, {next_state, full, State#state{waiting=Waiting}} end; overflow({checkout, _Block, _Timeout}, {From, _}, State) -> - #state{worker_sup=Sup, overflow=Overflow, - worker_init=InitFun, max_overflow=MaxOverflow} = State, - {Pid, Ref} = new_worker(Sup, From, InitFun), - Monitors = [{Pid, Ref} | State#state.monitors], - NewOverflow = Overflow + 1, - Next = case NewOverflow >= MaxOverflow of - true -> full; - _ -> overflow + #state{supervisor = Sup, + overflow = Overflow, + max_overflow = MaxOverflow} = State, + {Pid, Ref} = new_worker(Sup, From), + true = ets:insert(State#state.monitors, {Pid, Ref}), + NextState = case Overflow+1 >= MaxOverflow of + true -> full; + false -> overflow end, - {reply, Pid, Next, State#state{monitors=Monitors, - overflow=NewOverflow}}; + {reply, Pid, NextState, State#state{overflow=Overflow+1}}; overflow(_Event, _From, State) -> {reply, ok, overflow, State}. full({checkin, Pid}, State) -> - #state{waiting = Waiting, max_overflow = MaxOverflow, - overflow = Overflow, worker_stop = StopFun} = State, - case lists:keytake(Pid, 1, State#state.monitors) of - {value, {_, Ref0}, Monitors} -> - erlang:demonitor(Ref0), + #state{supervisor = Sup, + waiting = Waiting, + monitors = Monitors, + max_overflow = MaxOverflow, + overflow = Overflow} = State, + case ets:lookup(Monitors, Pid) of + [{Pid, Ref}] -> + true = erlang:demonitor(Ref), + true = ets:delete(Monitors, Pid), case queue:out(Waiting) of {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} -> case wait_valid(StartTime, Timeout) of true -> - Ref = erlang:monitor(process, FromPid), - Monitors1 = [{Pid, Ref} | Monitors], + Ref1 = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, Ref1}), gen_fsm:reply(From, Pid), - {next_state, full, State#state{waiting=Left, - monitors=Monitors1}}; - _ -> - %% replay this event with cleaned up waiting queue + {next_state, full, State#state{waiting=Left}}; + false -> full({checkin, Pid}, State#state{waiting=Left}) end; {empty, Empty} when MaxOverflow < 1 -> Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers, waiting=Empty, - monitors=Monitors}}; + {next_state, ready, State#state{workers=Workers, + waiting=Empty}}; {empty, Empty} -> - dismiss_worker(Pid, StopFun), + ok = dismiss_worker(Sup, Pid), {next_state, overflow, State#state{waiting=Empty, - monitors=Monitors, overflow=Overflow-1}} end; - false -> - %% unknown process checked in, ignore it + [] -> {next_state, full, State} end; full(_Event, State) -> @@ -224,76 +223,72 @@ full(_Event, _From, State) -> handle_event(_Event, StateName, State) -> {next_state, StateName, State}. +handle_sync_event(status, _From, StateName, State) -> + {reply, {StateName, queue:len(State#state.workers), State#state.overflow, + ets:info(State#state.monitors, size)}, + StateName, State}; handle_sync_event(get_avail_workers, _From, StateName, State) -> Workers = State#state.workers, WorkerList = queue:to_list(Workers), {reply, WorkerList, StateName, State}; handle_sync_event(get_all_workers, _From, StateName, State) -> - Sup = State#state.worker_sup, + Sup = State#state.supervisor, WorkerList = supervisor:which_children(Sup), {reply, WorkerList, StateName, State}; handle_sync_event(get_all_monitors, _From, StateName, State) -> - Monitors = State#state.monitors, + Monitors = ets:tab2list(State#state.monitors), {reply, Monitors, StateName, State}; -handle_sync_event(stop, _From, _StateName,State) -> - Sup = State#state.worker_sup, - exit(Sup, shutdown), +handle_sync_event(stop, _From, _StateName, State) -> + Sup = State#state.supervisor, + true = exit(Sup, shutdown), {stop, normal, ok, State}; -handle_sync_event(status, _From, StateName, State) -> - {reply, {StateName, queue:len(State#state.workers), - State#state.overflow, length(State#state.monitors)}, - StateName, State}; handle_sync_event(_Event, _From, StateName, State) -> Reply = {error, invalid_message}, {reply, Reply, StateName, State}. handle_info({'DOWN', Ref, _, _, _}, StateName, State) -> - case lists:keyfind(Ref, 2, State#state.monitors) of - {Pid, Ref} -> - exit(Pid, kill), + case ets:match(State#state.monitors, {'$1', Ref}) of + [[Pid]] -> + true = exit(Pid, kill), {next_state, StateName, State}; - false -> + [] -> {next_state, StateName, State} end; handle_info({'EXIT', Pid, Reason}, StateName, State) -> - #state{worker_sup = Sup, + #state{supervisor = Sup, overflow = Overflow, waiting = Waiting, - max_overflow = MaxOverflow, - worker_init = InitFun} = State, - case lists:keytake(Pid, 1, State#state.monitors) of - {value, {_, Ref}, Monitors} -> erlang:demonitor(Ref), + monitors = Monitors, + max_overflow = MaxOverflow} = State, + case ets:lookup(Monitors, Pid) of + [{Pid, Ref}] -> + true = erlang:demonitor(Ref), + true = ets:delete(Monitors, Pid), case StateName of ready -> W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers), - {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W), - monitors=Monitors}}; + {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}}; overflow when Overflow =:= 0 -> W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers), - {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W), - monitors=Monitors}}; + {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}}; overflow -> - {next_state, overflow, State#state{monitors=Monitors, - overflow=Overflow-1}}; + {next_state, overflow, State#state{overflow=Overflow-1}}; full when MaxOverflow < 1 -> case queue:out(Waiting) of {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} -> case wait_valid(StartTime, Timeout) of true -> MonitorRef = erlang:monitor(process, FromPid), - NewWorker = new_worker(Sup, InitFun), - Monitors2 = [{NewWorker, MonitorRef} | Monitors], + NewWorker = new_worker(Sup), + true = ets:insert(Monitors, {NewWorker, MonitorRef}), gen_fsm:reply(From, NewWorker), - {next_state, full, State#state{waiting=LeftWaiting, - monitors=Monitors2}}; - _ -> - %% replay it + {next_state, full, State#state{waiting=LeftWaiting}}; + false -> handle_info({'EXIT', Pid, Reason}, StateName, State#state{waiting=LeftWaiting}) end; {empty, Empty} -> - Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers), - {next_state, ready, State#state{monitors=Monitors, - waiting=Empty, + Workers2 = queue:in(new_worker(Sup), State#state.workers), + {next_state, ready, State#state{waiting=Empty, workers=Workers2}} end; full when Overflow =< MaxOverflow -> @@ -302,68 +297,61 @@ handle_info({'EXIT', Pid, Reason}, StateName, State) -> case wait_valid(StartTime, Timeout) of true -> MonitorRef = erlang:monitor(process, FromPid), - NewWorker = new_worker(Sup, InitFun), - Monitors2 = [{NewWorker, MonitorRef} | Monitors], + NewWorker = new_worker(Sup), + true = ets:insert(Monitors, {NewWorker, MonitorRef}), gen_fsm:reply(From, NewWorker), - {next_state, full, State#state{waiting=LeftWaiting, - monitors=Monitors2}}; + {next_state, full, State#state{waiting=LeftWaiting}}; _ -> - %% replay it handle_info({'EXIT', Pid, Reason}, StateName, State#state{waiting=LeftWaiting}) end; {empty, Empty} -> - {next_state, overflow, State#state{monitors=Monitors, - overflow=Overflow-1, + {next_state, overflow, State#state{overflow=Overflow-1, waiting=Empty}} end; full -> - {next_state, full, State#state{monitors=Monitors, - overflow=Overflow-1}} + {next_state, full, State#state{overflow=Overflow-1}} end; - _ -> - %% not a monitored pid, is it in the worker queue? + [] -> case queue:member(Pid, State#state.workers) of true -> - %% a checked in worker died W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers), - {next_state, StateName, State#state{workers=queue:in(new_worker(Sup, InitFun), W)}}; - _ -> - %% completely irrelevant pid exited, don't change anything + {next_state, StateName, State#state{workers=queue:in(new_worker(Sup), W)}}; + false -> {next_state, StateName, State} end end; handle_info(_Info, StateName, State) -> {next_state, StateName, State}. -terminate(_Reason, _StateName, _State) -> ok. +terminate(_Reason, _StateName, _State) -> + ok. code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. -new_worker(Sup, InitFun) -> +new_worker(Sup) -> {ok, Pid} = supervisor:start_child(Sup, []), - {ok, Pid1} = InitFun(Pid), - link(Pid1), - Pid1. + true = link(Pid), + Pid. -new_worker(Sup, FromPid, InitFun) -> - Pid = new_worker(Sup, InitFun), +new_worker(Sup, FromPid) -> + Pid = new_worker(Sup), Ref = erlang:monitor(process, FromPid), {Pid, Ref}. -dismiss_worker(Pid, StopFun) -> - unlink(Pid), - StopFun(Pid). +dismiss_worker(Sup, Pid) -> + true = unlink(Pid), + supervisor:terminate_child(Sup, Pid). -prepopulate(0, _, _) -> +prepopulate(0, _) -> queue:new(); -prepopulate(N, Sup, InitFun) -> - prepopulate(N, Sup, queue:new(), InitFun). +prepopulate(N, Sup) -> + prepopulate(N, Sup, queue:new()). -prepopulate(0, _, Workers, _) -> +prepopulate(0, _, Workers) -> Workers; -prepopulate(N, Sup, Workers, InitFun) -> - prepopulate(N-1, Sup, queue:in(new_worker(Sup, InitFun), Workers), InitFun). +prepopulate(N, Sup, Workers) -> + prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). add_waiting(Pid, Timeout, Queue) -> queue:in({Pid, Timeout, os:timestamp()}, Queue). diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index 0473afa..e6485a6 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -11,4 +11,4 @@ start_link(Mod, Args) -> init({Mod, Args}) -> {ok, {{simple_one_for_one, 0, 1}, [{Mod, {Mod, start_link, [Args]}, - temporary, brutal_kill, worker, [Mod]}]}}. + temporary, 5000, worker, [Mod]}]}}. diff --git a/test/poolboy_test_worker.erl b/test/poolboy_test_worker.erl index 613c03a..3d4b53f 100644 --- a/test/poolboy_test_worker.erl +++ b/test/poolboy_test_worker.erl @@ -21,8 +21,6 @@ handle_call(_Event, _From, State) -> handle_cast(_Event, State) -> {noreply, State}. -handle_info(stop, State) -> - {stop, shutdown, State}; handle_info({'EXIT', _, _}, State) -> {stop, shutdown, State}; handle_info(_Info, State) -> diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index e1d3dbc..a44ec61 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -47,12 +47,6 @@ pool_test_() -> }, {<<"Pool behaves on owner death">>, fun owner_death/0 - }, - {<<"Pool worker init function called when workers when created">>, - fun worker_init_fun/0 - }, - {<<"Pool worker stop function called on workers when destroyed">>, - fun worker_stop_fun/0 } ] }. @@ -368,39 +362,3 @@ owner_death() -> ?assertEqual(5, length(?sync(Pid, get_all_workers))), ?assertEqual(0, length(?sync(Pid, get_all_monitors))), ok = ?sync(Pid, stop). - -worker_init_fun() -> - Self = self(), - InitFun = fun(Worker) -> - Self ! worked, - {ok, Worker} - end, - {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}}, - {worker_module, poolboy_test_worker}, - {init_fun, InitFun}]), - poolboy:checkout(Pid), - receive - worked -> ?assert(true) - after - 1000 -> ?assert(false) - end, - ok = ?sync(Pid, stop). - -worker_stop_fun() -> - Self = self(), - StopFun = fun(Worker) -> - Self ! worked, - Worker ! stop - end, - {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}}, - {worker_module, poolboy_test_worker}, - {stop_fun, StopFun}, - {size, 0}, {max_overflow, 1}]), - Worker = poolboy:checkout(Pid), - checkin_worker(Pid, Worker), - receive - worked -> ?assert(true) - after - 1000 -> ?assert(false) - end, - ok = ?sync(Pid, stop).