Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' of github.com:devinus/poolboy

Conflicts:
	.travis.yml
	README.md
  • Loading branch information...
commit 914b4f42097d5c5610458d09b194febdb338f9c5 2 parents b325371 + 69c8ef1
@Vagabond Vagabond authored
View
68 README.md
@@ -6,15 +6,20 @@ Poolboy - A hunky Erlang worker pool factory
Usage
-----
-```erlang
-Worker = poolboy:checkout(PoolName),
-Reply = gen_server:call(Worker, WorkerFun),
-poolboy:checkin(PoolName, Worker),
-Reply.
+```erl-sh
+1> Worker = poolboy:checkout(PoolName).
+<0.9001.0>
+2> gen_server:call(Worker, Request).
+ok
+3> poolboy:checkin(PoolName, Worker).
+ok
```
-Example Application
--------------------
+Example
+-------
+
+This is an example application showcasing database connection pools using
+Poolboy and Will Glozer's [epgsql](https://github.com/wg/epgsql).
### example.app
@@ -56,14 +61,21 @@ Example Application
-behaviour(application).
-behaviour(supervisor).
--export([start/0, stop/0, start/2, stop/1, init/1, squery/2, equery/3]).
+-export([start/0, stop/0, squery/2, equery/3]).
+-export([start/2, stop/1]).
+-export([init/1]).
-start() -> application:start(?MODULE).
-stop() -> application:stop(?MODULE).
+start() ->
+ application:start(?MODULE).
+
+stop() ->
+ application:stop(?MODULE).
start(_Type, _Args) ->
supervisor:start_link({local, example_sup}, ?MODULE, []).
-stop(_State) -> ok.
+
+stop(_State) ->
+ ok.
init([]) ->
{ok, Pools} = application:get_env(example, pools),
@@ -71,22 +83,19 @@ init([]) ->
Args = [{name, {local, PoolName}},
{worker_module, example_worker}]
++ PoolConfig,
- {PoolName, {poolboy, start_link, [Args]},
- permanent, 5000, worker, [poolboy]}
+ poolboy:child_spec(PoolName, Args)
end, Pools),
{ok, {{one_for_one, 10, 10}, PoolSpecs}}.
squery(PoolName, Sql) ->
- Worker = poolboy:checkout(PoolName),
- Reply = gen_server:call(Worker, {squery, Sql}),
- poolboy:checkin(PoolName, Worker),
- Reply.
+ poolboy:transaction(PoolName, fun(Worker) ->
+ gen_server:call(Worker, {squery, Sql})
+ end).
equery(PoolName, Stmt, Params) ->
- Worker = poolboy:checkout(PoolName),
- Reply = gen_server:call(Worker, {equery, Stmt, Params}),
- poolboy:checkin(PoolName, Worker),
- Reply.
+ poolboy:transaction(PoolName, fun(Worker) ->
+ gen_server:call(Worker, {equery, Stmt, Params})
+ end).
```
### example_worker.erl
@@ -94,17 +103,18 @@ equery(PoolName, Stmt, Params) ->
```erlang
-module(example_worker).
-behaviour(gen_server).
+-behaviour(poolboy_worker).
--export([start_link/1, stop/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
+-export([start_link/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-record(state, {conn}).
-start_link(Args) -> gen_server:start_link(?MODULE, Args, []).
-stop() -> gen_server:cast(?MODULE, stop).
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
init(Args) ->
- process_flag(trap_exit, true),
Hostname = proplists:get_value(hostname, Args),
Database = proplists:get_value(database, Args),
Username = proplists:get_value(username, Args),
@@ -124,15 +134,11 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info(stop, State) ->
- {stop, shutdown, State};
-handle_info({'EXIT', _, _}, State) ->
- {stop, shutdown, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{conn=Conn}) ->
- pgsql:close(Conn),
+ ok = pgsql:close(Conn),
ok.
code_change(_OldVsn, State, _Extra) ->
View
BIN  rebar
Binary file not shown
View
4 src/poolboy.app.src
@@ -1,6 +1,6 @@
{application, poolboy, [
{description, "A hunky Erlang worker pool factory"},
- {vsn, "0.6.1"},
+ {vsn, "0.8.1"},
{applications, [kernel, stdlib]},
- {registered, []}
+ {registered, [poolboy]}
]}.
View
460 src/poolboy.erl
@@ -3,32 +3,29 @@
-module(poolboy).
-behaviour(gen_fsm).
--export([start_link/1, checkout/1, checkout/2, checkout/3, checkin/2, stop/1]).
+-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
+ child_spec/2, start_link/1, stop/1, status/1]).
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
code_change/4]).
-define(TIMEOUT, 5000).
--define(DEFAULT_INIT_FUN, fun(Worker) -> {ok, Worker} end).
--define(DEFAULT_STOP_FUN, fun(Worker) -> Worker ! stop end).
-record(state, {
+ supervisor :: pid(),
workers :: queue(),
- worker_sup :: pid(),
- waiting = queue:new() :: queue(),
- monitors = [] :: list(),
+ waiting :: queue(),
+ monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
- max_overflow = 10 :: non_neg_integer(),
- worker_init = ?DEFAULT_INIT_FUN :: fun((Worker :: pid()) -> {ok, pid()}),
- worker_stop = ?DEFAULT_STOP_FUN :: fun((Worker :: pid()) -> stop)
+ max_overflow = 10 :: non_neg_integer()
}).
-spec checkout(Pool :: node()) -> pid().
checkout(Pool) ->
checkout(Pool, true).
--spec checkout(Pool :: node(), boolean()) -> pid() | full.
+-spec checkout(Pool :: node(), Block :: boolean()) -> pid() | full.
checkout(Pool, Block) ->
checkout(Pool, Block, ?TIMEOUT).
@@ -38,10 +35,27 @@ checkout(Pool, Block, Timeout) ->
gen_fsm:sync_send_event(Pool, {checkout, Block, Timeout}, Timeout).
-spec checkin(Pool :: node(), Worker :: pid()) -> ok.
-checkin(Pool, Worker) ->
+checkin(Pool, Worker) when is_pid(Worker) ->
gen_fsm:send_event(Pool, {checkin, Worker}).
-start_link(Args) ->
+-spec transaction(Pool :: node(), Fun :: fun((Worker :: pid()) -> any()))
+ -> any().
+transaction(Pool, Fun) ->
+ Worker = poolboy:checkout(Pool),
+ try
+ Fun(Worker)
+ after
+ ok = poolboy:checkin(Pool, Worker)
+ end.
+
+-spec child_spec(Pool :: node(), Args :: proplists:proplist()) ->
+ supervisor:child_spec().
+child_spec(Pool, Args) ->
+ {Pool, {poolboy, start_link, [Args]},
+ permanent, 5000, worker, [poolboy]}.
+
+-spec start_link(Args :: proplists:proplist()) -> {ok, pid()}.
+start_link(Args) ->
case proplists:get_value(name, Args) of
undefined ->
gen_fsm:start_link(?MODULE, Args, []);
@@ -49,331 +63,327 @@ start_link(Args) ->
gen_fsm:start_link(Name, ?MODULE, Args, [])
end.
+-spec stop(Pool :: node()) -> ok.
stop(Pool) ->
gen_fsm:sync_send_all_state_event(Pool, stop).
+-spec status(Pool :: node()) -> {state, integer(), integer(), integer()}.
+status(Pool) ->
+ gen_fsm:sync_send_all_state_event(Pool, status).
+
init(Args) ->
process_flag(trap_exit, true),
- init(Args, #state{}).
+ Waiting = queue:new(),
+ Monitors = ets:new(monitors, [private]),
+ init(Args, #state{waiting=Waiting, monitors=Monitors}).
-init([{worker_module, Mod} | Rest], State) ->
+init([{worker_module, Mod} | Rest], State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, Rest),
- init(Rest, State#state{worker_sup=Sup});
-init([{size, Size} | Rest], State) ->
+ init(Rest, State#state{supervisor=Sup});
+init([{size, Size} | Rest], State) when is_integer(Size) ->
init(Rest, State#state{size=Size});
-init([{max_overflow, MaxOverflow} | Rest], State) ->
+init([{max_overflow, MaxOverflow} | Rest], State) when is_integer(MaxOverflow) ->
init(Rest, State#state{max_overflow=MaxOverflow});
-init([{init_fun, InitFun} | Rest], State) when is_function(InitFun) ->
- init(Rest, State#state{worker_init=InitFun});
-init([{stop_fun, StopFun} | Rest], State) when is_function(StopFun) ->
- init(Rest, State#state{worker_stop=StopFun});
init([_ | Rest], State) ->
init(Rest, State);
-init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun,
- max_overflow=MaxOverflow}=State) ->
- Workers = prepopulate(Size, Sup, InitFun),
- StartState = case queue:len(Workers) of
- 0 when MaxOverflow =:= 0 -> full;
- 0 -> overflow;
- _ -> ready
+init([], #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) ->
+ Workers = prepopulate(Size, Sup),
+ StartState = case Size of
+ Size when Size < 1, MaxOverflow < 1 -> full;
+ Size when Size < 1 -> overflow;
+ Size -> ready
end,
{ok, StartState, State#state{workers=Workers}}.
ready({checkin, Pid}, State) ->
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} ->
- erlang:demonitor(Ref),
+ Monitors = State#state.monitors,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers,
- monitors=Monitors}};
- false ->
- %% unknown process checked in, ignore it
+ {next_state, ready, State#state{workers=Workers}};
+ [] ->
{next_state, ready, State}
end;
ready(_Event, State) ->
{next_state, ready, State}.
ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
- #state{workers = Workers,
- worker_sup = Sup,
- max_overflow = MaxOverflow,
- worker_init = InitFun} = State,
+ #state{supervisor = Sup,
+ workers = Workers,
+ monitors = Monitors,
+ max_overflow = MaxOverflow} = State,
case queue:out(Workers) of
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
- Monitors = [{Pid, Ref} | State#state.monitors],
- NextState = case queue:len(Left) of
- 0 when MaxOverflow == 0 ->
- full;
- 0 ->
- overflow;
- _ ->
- ready
+ true = ets:insert(Monitors, {Pid, Ref}),
+ NextState = case queue:is_empty(Left) of
+ true when MaxOverflow < 1 -> full;
+ true -> overflow;
+ false -> ready
end,
- {reply, Pid, NextState, State#state{workers=Left,
- monitors=Monitors}};
+ {reply, Pid, NextState, State#state{workers=Left}};
{empty, Empty} when MaxOverflow > 0 ->
- {Pid, Ref} = new_worker(Sup, FromPid, InitFun),
- Monitors = [{Pid, Ref} | State#state.monitors],
- {reply, Pid, overflow, State#state{workers=Empty,
- monitors=Monitors,
- overflow=1}};
+ {Pid, Ref} = new_worker(Sup, FromPid),
+ true = ets:insert(Monitors, {Pid, Ref}),
+ {reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
{empty, Empty} when Block =:= false ->
{reply, full, full, State#state{workers=Empty}};
{empty, Empty} ->
- Waiting = State#state.waiting,
- {next_state, full, State#state{workers=Empty,
- waiting=add_waiting(From, Timeout, Waiting)}}
+ Waiting = add_waiting(From, Timeout, State#state.waiting),
+ {next_state, full, State#state{workers=Empty, waiting=Waiting}}
end;
ready(_Event, _From, State) ->
{reply, ok, ready, State}.
overflow({checkin, Pid}, #state{overflow=0}=State) ->
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} ->
- erlang:demonitor(Ref),
+ Monitors = State#state.monitors,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
NextState = case State#state.size > 0 of
- true -> ready;
- _ -> overflow
+ true -> ready;
+ false -> overflow
end,
- {next_state, NextState, State#state{overflow=0, monitors=Monitors,
- workers=queue:in(Pid, State#state.workers)}};
- false ->
- %% unknown process checked in, ignore it
+ Workers = queue:in(Pid, State#state.workers),
+ {next_state, NextState, State#state{overflow=0, workers=Workers}};
+ [] ->
{next_state, overflow, State}
end;
overflow({checkin, Pid}, State) ->
- #state{overflow=Overflow, worker_stop=StopFun} = State,
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} ->
- dismiss_worker(Pid, StopFun),
- erlang:demonitor(Ref),
- {next_state, overflow, State#state{overflow=Overflow-1,
- monitors=Monitors}};
- _ ->
- %% unknown process checked in, ignore it
+ #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ ok = dismiss_worker(Sup, Pid),
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
+ {next_state, overflow, State#state{overflow=Overflow-1}};
+ [] ->
{next_state, overflow, State}
end;
overflow(_Event, State) ->
{next_state, overflow, State}.
-overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow,
- max_overflow=MaxOverflow
- }=State) when Overflow >= MaxOverflow ->
+overflow({checkout, Block, Timeout}, From,
+ #state{overflow=Overflow,
+ max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow ->
case Block of
+ true ->
+ Waiting = add_waiting(From, Timeout, State#state.waiting),
+ {next_state, full, State#state{waiting=Waiting}};
false ->
- {reply, full, full, State};
- Block ->
- Waiting = State#state.waiting,
- {next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}}
+ {reply, full, full, State}
end;
overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
- #state{worker_sup=Sup, overflow=Overflow,
- worker_init=InitFun, max_overflow=MaxOverflow} = State,
- {Pid, Ref} = new_worker(Sup, From, InitFun),
- Monitors = [{Pid, Ref} | State#state.monitors],
+ #state{supervisor = Sup,
+ overflow = Overflow,
+ max_overflow = MaxOverflow} = State,
+ {Pid, Ref} = new_worker(Sup, From),
+ true = ets:insert(State#state.monitors, {Pid, Ref}),
NewOverflow = Overflow + 1,
- Next = case NewOverflow >= MaxOverflow of
- true -> full;
- _ -> overflow
+ NextState = case NewOverflow >= MaxOverflow of
+ true -> full;
+ false -> overflow
end,
- {reply, Pid, Next, State#state{monitors=Monitors,
- overflow=NewOverflow}};
+ {reply, Pid, NextState, State#state{overflow=NewOverflow}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.
full({checkin, Pid}, State) ->
- #state{waiting = Waiting, max_overflow = MaxOverflow,
- overflow = Overflow, worker_stop = StopFun} = State,
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref0}, Monitors} ->
- erlang:demonitor(Ref0),
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- Ref = erlang:monitor(process, FromPid),
- Monitors1 = [{Pid, Ref} | Monitors],
- gen_fsm:reply(From, Pid),
- {next_state, full, State#state{waiting=Left,
- monitors=Monitors1}};
- _ ->
- %% replay this event with cleaned up waiting queue
- full({checkin, Pid}, State#state{waiting=Left})
- end;
- {empty, Empty} when MaxOverflow < 1 ->
- Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers, waiting=Empty,
- monitors=Monitors}};
- {empty, Empty} ->
- dismiss_worker(Pid, StopFun),
- {next_state, overflow, State#state{waiting=Empty,
- monitors=Monitors,
- overflow=Overflow-1}}
- end;
- false ->
- %% unknown process checked in, ignore it
+ #state{monitors = Monitors} = State,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
+ checkin_while_full(Pid, State);
+ [] ->
{next_state, full, State}
end;
full(_Event, State) ->
{next_state, full, State}.
+full({checkout, true, Timeout}, From, State) ->
+ Waiting = add_waiting(From, Timeout, State#state.waiting),
+ {next_state, full, State#state{waiting=Waiting}};
full({checkout, false, _Timeout}, _From, State) ->
{reply, full, full, State};
-full({checkout, _Block, Timeout}, From, #state{waiting=Waiting}=State) ->
- {next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}};
full(_Event, _From, State) ->
{reply, ok, full, State}.
handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.
+handle_sync_event(status, _From, StateName, State) ->
+ {reply, {StateName, queue:len(State#state.workers), State#state.overflow,
+ ets:info(State#state.monitors, size)},
+ StateName, State};
handle_sync_event(get_avail_workers, _From, StateName, State) ->
Workers = State#state.workers,
WorkerList = queue:to_list(Workers),
{reply, WorkerList, StateName, State};
handle_sync_event(get_all_workers, _From, StateName, State) ->
- Sup = State#state.worker_sup,
+ Sup = State#state.supervisor,
WorkerList = supervisor:which_children(Sup),
{reply, WorkerList, StateName, State};
handle_sync_event(get_all_monitors, _From, StateName, State) ->
- Monitors = State#state.monitors,
+ Monitors = ets:tab2list(State#state.monitors),
{reply, Monitors, StateName, State};
-handle_sync_event(stop, _From, _StateName,State) ->
- Sup = State#state.worker_sup,
- exit(Sup, shutdown),
+handle_sync_event(stop, _From, _StateName, State) ->
+ Sup = State#state.supervisor,
+ true = exit(Sup, shutdown),
{stop, normal, ok, State};
-handle_sync_event(status, _From, StateName, State) ->
- {reply, {StateName, queue:len(State#state.workers),
- State#state.overflow, length(State#state.monitors)},
- StateName, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = {error, invalid_message},
{reply, Reply, StateName, State}.
handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
- case lists:keyfind(Ref, 2, State#state.monitors) of
- {Pid, Ref} ->
- exit(Pid, kill),
+ case ets:match(State#state.monitors, {'$1', Ref}) of
+ [[Pid]] ->
+ Sup = State#state.supervisor,
+ ok = supervisor:terminate_child(Sup, Pid),
{next_state, StateName, State};
- false ->
+ [] ->
{next_state, StateName, State}
end;
-handle_info({'EXIT', Pid, Reason}, StateName, State) ->
- #state{worker_sup = Sup,
- overflow = Overflow,
- waiting = Waiting,
- max_overflow = MaxOverflow,
- worker_init = InitFun} = State,
- case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Monitors} -> erlang:demonitor(Ref),
- case StateName of
- ready ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
- overflow when Overflow == 0 ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
- overflow ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1}};
- full when MaxOverflow < 1 ->
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(Sup, InitFun),
- Monitors2 = [{NewWorker, MonitorRef} | Monitors],
- gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- _ ->
- %% replay it
- handle_info({'EXIT', Pid, Reason}, StateName, State#state{waiting=LeftWaiting})
- end;
- {empty, Empty} ->
- Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
- {next_state, ready, State#state{monitors=Monitors,
- waiting=Empty,
- workers=Workers2}}
- end;
- full when Overflow =< MaxOverflow ->
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(Sup, InitFun),
- Monitors2 = [{NewWorker, MonitorRef} | Monitors],
- gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- _ ->
- %% replay it
- handle_info({'EXIT', Pid, Reason}, StateName, State#state{waiting=LeftWaiting})
- end;
- {empty, Empty} ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1,
- waiting=Empty}}
- end;
- full ->
- {next_state, full, State#state{monitors=Monitors,
- overflow=Overflow-1}}
- end;
- _ ->
- %% not a monitored pid, is it in the worker queue?
+handle_info({'EXIT', Pid, _Reason}, StateName, State) ->
+ #state{supervisor = Sup,
+ monitors = Monitors} = State,
+ case ets:lookup(Monitors, Pid) of
+ [{Pid, Ref}] ->
+ true = erlang:demonitor(Ref),
+ true = ets:delete(Monitors, Pid),
+ handle_worker_exit(Pid, StateName, State);
+ [] ->
case queue:member(Pid, State#state.workers) of
true ->
- %% a checked in worker died
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, StateName, State#state{workers=queue:in(new_worker(Sup, InitFun), W)}};
- _ ->
- %% completely irrelevant pid exited, don't change anything
+ {next_state, StateName, State#state{workers=queue:in(new_worker(Sup), W)}};
+ false ->
{next_state, StateName, State}
end
end;
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
-terminate(_Reason, _StateName, _State) -> ok.
+terminate(_Reason, _StateName, _State) ->
+ ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
-new_worker(Sup, InitFun) ->
+new_worker(Sup) ->
{ok, Pid} = supervisor:start_child(Sup, []),
- {ok, Pid1} = InitFun(Pid),
- link(Pid1),
- Pid1.
+ true = link(Pid),
+ Pid.
-new_worker(Sup, FromPid, InitFun) ->
- Pid = new_worker(Sup, InitFun),
+new_worker(Sup, FromPid) ->
+ Pid = new_worker(Sup),
Ref = erlang:monitor(process, FromPid),
{Pid, Ref}.
-dismiss_worker(Pid, StopFun) ->
- unlink(Pid),
- StopFun(Pid).
+dismiss_worker(Sup, Pid) ->
+ true = unlink(Pid),
+ supervisor:terminate_child(Sup, Pid).
-prepopulate(0, _Sup, _InitFun) ->
+prepopulate(N, _) when N < 1 ->
queue:new();
-prepopulate(N, Sup, InitFun) ->
- prepopulate(N, Sup, queue:new(), InitFun).
+prepopulate(N, Sup) ->
+ prepopulate(N, Sup, queue:new()).
-prepopulate(0, _Sup, Workers, _InitFun) ->
+prepopulate(0, _, Workers) ->
Workers;
-prepopulate(N, Sup, Workers, InitFun) ->
- prepopulate(N-1, Sup, queue:in(new_worker(Sup, InitFun), Workers), InitFun).
+prepopulate(N, Sup, Workers) ->
+ prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).
-add_waiting(From, Timeout, Queue) ->
- queue:in({From, Timeout, os:timestamp()}, Queue).
+add_waiting(Pid, Timeout, Queue) ->
+ queue:in({Pid, Timeout, os:timestamp()}, Queue).
wait_valid(infinity, _) ->
true;
wait_valid(StartTime, Timeout) ->
Waited = timer:now_diff(os:timestamp(), StartTime),
(Waited div 1000) < Timeout.
+
+handle_worker_exit(Pid, StateName, State) ->
+ #state{supervisor = Sup,
+ overflow = Overflow,
+ waiting = Waiting,
+ monitors = Monitors,
+ max_overflow = MaxOverflow} = State,
+ case StateName of
+ ready ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}};
+ overflow when Overflow =:= 0 ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}};
+ overflow ->
+ {next_state, overflow, State#state{overflow=Overflow-1}};
+ full when MaxOverflow < 1 ->
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ NewWorker = new_worker(Sup),
+ true = ets:insert(Monitors, {NewWorker, MonitorRef}),
+ gen_fsm:reply(From, NewWorker),
+ {next_state, full, State#state{waiting=LeftWaiting}};
+ false ->
+ handle_worker_exit(Pid, StateName, State#state{waiting=LeftWaiting})
+ end;
+ {empty, Empty} ->
+ Workers2 = queue:in(new_worker(Sup), State#state.workers),
+ {next_state, ready, State#state{waiting=Empty,
+ workers=Workers2}}
+ end;
+ full when Overflow =< MaxOverflow ->
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ NewWorker = new_worker(Sup),
+ true = ets:insert(Monitors, {NewWorker, MonitorRef}),
+ gen_fsm:reply(From, NewWorker),
+ {next_state, full, State#state{waiting=LeftWaiting}};
+ _ ->
+ handle_worker_exit(Pid, StateName, State#state{waiting=LeftWaiting})
+ end;
+ {empty, Empty} ->
+ {next_state, overflow, State#state{overflow=Overflow-1,
+ waiting=Empty}}
+ end;
+ full ->
+ {next_state, full, State#state{overflow=Overflow-1}}
+ end.
+
+checkin_while_full(Pid, State) ->
+ #state{supervisor = Sup,
+ waiting = Waiting,
+ monitors = Monitors,
+ max_overflow = MaxOverflow,
+ overflow = Overflow} = State,
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ Ref1 = erlang:monitor(process, FromPid),
+ true = ets:insert(Monitors, {Pid, Ref1}),
+ gen_fsm:reply(From, Pid),
+ {next_state, full, State#state{waiting=Left}};
+ false ->
+ checkin_while_full(Pid, State#state{waiting=Left})
+ end;
+ {empty, Empty} when MaxOverflow < 1 ->
+ Workers = queue:in(Pid, State#state.workers),
+ {next_state, ready, State#state{workers=Workers,
+ waiting=Empty}};
+ {empty, Empty} ->
+ ok = dismiss_worker(Sup, Pid),
+ {next_state, overflow, State#state{waiting=Empty,
+ overflow=Overflow-1}}
+ end.
+
View
2  src/poolboy_sup.erl
@@ -11,4 +11,4 @@ start_link(Mod, Args) ->
init({Mod, Args}) ->
{ok, {{simple_one_for_one, 0, 1},
[{Mod, {Mod, start_link, [Args]},
- temporary, brutal_kill, worker, [Mod]}]}}.
+ temporary, 5000, worker, [Mod]}]}}.
View
10 src/poolboy_worker.erl
@@ -0,0 +1,10 @@
+%% Poolboy - A hunky Erlang worker pool factory
+
+-module(poolboy_worker).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{start_link, 1}];
+behaviour_info(_Other) ->
+ undefined.
View
481 test/poolboy_eqc.erl
@@ -1,227 +1,254 @@
--module(poolboy_eqc).
--compile([export_all]).
-
--ifdef(TEST).
--ifdef(EQC).
--include_lib("eqc/include/eqc.hrl").
--include_lib("eqc/include/eqc_statem.hrl").
-
--include_lib("eunit/include/eunit.hrl").
-
-poolboy_test_() ->
- {timeout, 20,
- fun() ->
- ?assert(eqc:quickcheck(eqc:testing_time(4,
- poolboy_eqc:prop_sequential()))),
- ?assert(eqc:quickcheck(eqc:testing_time(4,
- poolboy_eqc:prop_parallel())))
- end
- }.
-
--record(state,
- {
- pid,
- size,
- max_overflow,
- checked_out = []
- }).
-
-initial_state() ->
- #state{}.
-
-command(S) ->
- oneof(
- [{call, ?MODULE, start_poolboy, make_args(S, nat(), nat())} || S#state.pid == undefined] ++
- [{call, ?MODULE, stop_poolboy, [S#state.pid]} || S#state.pid /= undefined] ++
- [{call, ?MODULE, checkout_nonblock, [S#state.pid]} || S#state.pid /= undefined] ++
- %% checkout shrinks to checkout_nonblock so we can simplify counterexamples
- [{call, ?MODULE, ?SHRINK(checkout_block, [checkout_nonblock]), [S#state.pid]} || S#state.pid /= undefined] ++
- [{call, ?MODULE, checkin, [S#state.pid, fault({call, ?MODULE, spawn_process, []}, elements(S#state.checked_out))]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
- [{call, ?MODULE, kill_worker, [elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
- [{call, ?MODULE, kill_idle_worker, [S#state.pid]} || S#state.pid /= undefined] ++
- [{call, ?MODULE, spurious_exit, [S#state.pid]} || S#state.pid /= undefined]
- ).
-
-make_args(_S, Size, Overflow) ->
- [[{size, Size}, {max_overflow, Overflow}, {worker_module, poolboy_test_worker}, {name, {local, poolboy_eqc}}]].
-
-spawn_process() ->
- {spawn(fun() ->
- timer:sleep(5000)
- end), self()}.
-
-spawn_linked_process(Pool) ->
- Parent = self(),
- Pid = spawn(fun() ->
- link(Pool),
- Parent ! {linked, self()},
- timer:sleep(5000)
- end),
- receive
- {linked, Pid} ->
- Pid
- end.
-
-start_poolboy(Args) ->
- {ok, Pid} = poolboy:start_link(Args),
- Pid.
-
-stop_poolboy(Pid) ->
- gen_fsm:sync_send_all_state_event(Pid, stop),
- timer:sleep(1).
-
-checkout_nonblock(Pool) ->
- {poolboy:checkout(Pool, false), self()}.
-
-checkout_block(Pool) ->
- {catch(poolboy:checkout(Pool, true, 100)), self()}.
-
-checkin(Pool, {Worker, _}) ->
- Res = poolboy:checkin(Pool, Worker),
- gen_fsm:sync_send_all_state_event(Pool, get_avail_workers),
- Res.
-
-kill_worker({Worker, _}) ->
- exit(Worker, kill),
- timer:sleep(1).
-
-kill_idle_worker(Pool) ->
- Pid = poolboy:checkout(Pool, false),
- case Pid of
- _ when is_pid(Pid) ->
- poolboy:checkin(Pool, Pid),
- kill_worker({Pid, self()});
- _ ->
- timer:sleep(1),
- kill_idle_worker(Pool)
- end.
-
-spurious_exit(Pool) ->
- Pid = spawn_linked_process(Pool),
- exit(Pid, kill).
-
-precondition(S,{call,_,start_poolboy,_}) ->
- %% only start new pool when old one is stopped
- S#state.pid == undefined;
-precondition(S,_) when S#state.pid == undefined ->
- %% all other states need a running pool
- false;
-precondition(S, {call, _, kill_worker, [Pid]}) ->
- lists:member(Pid, S#state.checked_out);
-precondition(S,{call,_,kill_idle_worker,[_Pool]}) ->
- length(S#state.checked_out) < S#state.size;
-precondition(_S,{call,_,_,_}) ->
- true.
-
-%% XXX comment out for parallel mode XXX
-%dynamic_precondition(S = #state{pid=Pid},_) when Pid /= undefined ->
- %State = if length(S#state.checked_out) == S#state.size + S#state.max_overflow ->
- %full;
- %length(S#state.checked_out) >= S#state.size ->
- %overflow;
- %true ->
- %ready
- %end,
-
- %Workers = max(0, S#state.size - length(S#state.checked_out)),
- %OverFlow = max(0, length(S#state.checked_out) - S#state.size),
- %Monitors = length(S#state.checked_out),
-
- %RealStatus = gen_fsm:sync_send_all_state_event(Pid, status),
- %case RealStatus == {State, Workers, OverFlow, Monitors} of
- %true ->
- %true;
- %_ ->
- %exit({wrong_state, RealStatus, {State, Workers, OverFlow, Monitors}})
- %end;
-%dynamic_precondition(_,_) ->
- %true.
-
-postcondition(S,{call,_,checkout_block,[_Pool]},R) ->
- case R of
- {{'EXIT', {timeout, _}}, _} ->
- length(S#state.checked_out) >= S#state.size + S#state.max_overflow;
- _ ->
- length(S#state.checked_out) < S#state.size + S#state.max_overflow
- end;
-postcondition(S,{call,_,checkout_nonblock,[_Pool]},R) ->
- case R of
- {full, _} ->
- length(S#state.checked_out) >= S#state.size + S#state.max_overflow;
- _ ->
- length(S#state.checked_out) < S#state.size + S#state.max_overflow
- end;
-postcondition(_S, {call,_,checkin,_}, R) ->
- R == ok;
-postcondition(_S,{call,_,_,_},_R) ->
- true.
-
-next_state(S,V,{call,_,start_poolboy, [Args]}) ->
- S#state{pid=V,
- size=proplists:get_value(size, Args),
- max_overflow=proplists:get_value(max_overflow, Args)
- };
-next_state(S,_V,{call,_,stop_poolboy, [_Args]}) ->
- S#state{pid=undefined, checked_out=[]};
-next_state(S,V,{call,_,checkout_block,_}) ->
- %% if the model says the checkout worked, store the result
- case checkout_ok(S) of
- false ->
- S;
- _ ->
- S#state{checked_out=S#state.checked_out++[V]}
- end;
-next_state(S,V,{call,_,checkout_nonblock,_}) ->
- %% if the model says the checkout worked, store the result
- case checkout_ok(S) of
- false ->
- S;
- _ ->
- S#state{checked_out=S#state.checked_out++[V]}
- end;
-next_state(S,_V,{call, _, checkin, [_Pool, Worker]}) ->
- S#state{checked_out=S#state.checked_out -- [Worker]};
-next_state(S,_V,{call, _, kill_worker, [Worker]}) ->
- S#state{checked_out=S#state.checked_out -- [Worker]};
-next_state(S,_V,{call, _, kill_idle_worker, [_Pool]}) ->
- S;
-next_state(S,_V,{call, _, spurious_exit, [_Pool]}) ->
- S;
-next_state(S,V,{call, erlang, self, []}) ->
- %% added after test generation, values are never symbolic
- S#state{checked_out=[{Worker, Pid} || {Worker, Pid} <- S#state.checked_out, Pid /= V]}.
-
-
-prop_sequential() ->
- fault_rate(1, 10,
- ?FORALL(Cmds,commands(?MODULE),
- ?TRAPEXIT(
- aggregate(command_names(Cmds),
- begin
- {H,S,Res} = run_commands(?MODULE,Cmds),
- catch(stop_poolboy(whereis(poolboy_eqc))),
- ?WHENFAIL(io:format("History: ~p\nState: ~p\nRes: ~p\n~p\n",
- [H,S,Res, zip(tl(Cmds), [Y || {_, Y} <- H])]),
- Res == ok)
- end)))).
-
-prop_parallel() ->
- fault_rate(1, 10,
- ?FORALL(Cmds={Seq,Par},parallel_commands(?MODULE),
- ?TRAPEXIT(
- aggregate(command_names(Cmds),
- begin
- NewPar = [P ++ [{set, {var, 0}, {call, erlang, self, []}}] || P <- Par],
- {H,S,Res} = run_parallel_commands(?MODULE,{Seq,NewPar}),
- catch(stop_poolboy(whereis(poolboy_eqc))),
- ?WHENFAIL(io:format("History: ~p\nState: ~p\nRes: ~p\n",
- [H,S,Res]),
- Res == ok)
- end)))).
-
-
-checkout_ok(S) ->
- length(S#state.checked_out) < S#state.size + S#state.max_overflow.
-
--endif.
--endif.
+-module(poolboy_eqc).
+-compile([export_all]).
+
+-ifdef(TEST).
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-include_lib("eqc/include/eqc_statem.hrl").
+
+-include_lib("eunit/include/eunit.hrl").
+
+poolboy_test_() ->
+ {timeout, 20,
+ fun() ->
+ ?assert(eqc:quickcheck(eqc:testing_time(4,
+ poolboy_eqc:prop_sequential()))),
+ ?assert(eqc:quickcheck(eqc:testing_time(4,
+ poolboy_eqc:prop_parallel())))
+ end
+ }.
+
+-record(state,
+ {
+ pid,
+ size,
+ max_overflow,
+ checked_out = []
+ }).
+
+initial_state() ->
+ #state{}.
+
+command(S) ->
+ oneof(
+ [{call, ?MODULE, start_poolboy, make_args(S, nat(), nat())} || S#state.pid == undefined] ++
+ [{call, ?MODULE, stop_poolboy, [S#state.pid]} || S#state.pid /= undefined] ++
+ [{call, ?MODULE, checkout_nonblock, [S#state.pid]} || S#state.pid /= undefined] ++
+ %% checkout shrinks to checkout_nonblock so we can simplify counterexamples
+ [{call, ?MODULE, ?SHRINK(checkout_block, [checkout_nonblock]), [S#state.pid]} || S#state.pid /= undefined] ++
+ [{call, ?MODULE, checkin, [S#state.pid, fault({call, ?MODULE, spawn_process, []}, elements(S#state.checked_out))]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
+ [{call, ?MODULE, kill_worker, [elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
+ [{call, ?MODULE, kill_idle_worker, [S#state.pid]} || S#state.pid /= undefined] ++
+ [{call, ?MODULE, spurious_exit, [S#state.pid]} || S#state.pid /= undefined]
+ ).
+
+make_args(_S, Size, Overflow) ->
+ [[{size, Size}, {max_overflow, Overflow}, {worker_module, poolboy_test_worker}, {name, {local, poolboy_eqc}}]].
+
+spawn_process() ->
+ {spawn(fun() ->
+ timer:sleep(5000)
+ end), self()}.
+
+spawn_linked_process(Pool) ->
+ Parent = self(),
+ Pid = spawn(fun() ->
+ link(Pool),
+ Parent ! {linked, self()},
+ timer:sleep(5000)
+ end),
+ receive
+ {linked, Pid} ->
+ Pid
+ end.
+
+start_poolboy(Args) ->
+ {ok, Pid} = poolboy:start_link(Args),
+ Pid.
+
+stop_poolboy(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid, stop),
+ timer:sleep(1).
+
+checkout_nonblock(Pool) ->
+ {poolboy:checkout(Pool, false), self()}.
+
+checkout_block(Pool) ->
+ {catch(poolboy:checkout(Pool, true, 100)), self()}.
+
+checkin(Pool, {Worker, _}) ->
+ Res = poolboy:checkin(Pool, Worker),
+ gen_fsm:sync_send_all_state_event(Pool, get_avail_workers),
+ Res.
+
+kill_worker({Worker, _}) ->
+ exit(Worker, kill),
+ timer:sleep(1).
+
+kill_idle_worker(Pool) ->
+ Pid = poolboy:checkout(Pool, false),
+ case Pid of
+ _ when is_pid(Pid) ->
+ poolboy:checkin(Pool, Pid),
+ kill_worker({Pid, self()});
+ _ ->
+ timer:sleep(1),
+ kill_idle_worker(Pool)
+ end.
+
+spurious_exit(Pool) ->
+ Pid = spawn_linked_process(Pool),
+ exit(Pid, kill).
+
+precondition(S,{call,_,start_poolboy,_}) ->
+ %% only start new pool when old one is stopped
+ S#state.pid == undefined;
+precondition(S,_) when S#state.pid == undefined ->
+ %% all other states need a running pool
+ false;
+precondition(S, {call, _, kill_worker, [Pid]}) ->
+ lists:member(Pid, S#state.checked_out);
+precondition(S,{call,_,kill_idle_worker,[_Pool]}) ->
+ length(S#state.checked_out) < S#state.size;
+precondition(S,{call,_,checkin,[_Pool, Pid]}) ->
+ lists:member(Pid, S#state.checked_out);
+precondition(_S,{call,_,_,_}) ->
+ true.
+
+%% XXX comment out for parallel mode XXX
+%dynamic_precondition(S = #state{pid=Pid},_) when Pid /= undefined ->
+ %State = if length(S#state.checked_out) == S#state.size + S#state.max_overflow ->
+ %full;
+ %length(S#state.checked_out) >= S#state.size ->
+ %overflow;
+ %true ->
+ %ready
+ %end,
+
+ %Workers = max(0, S#state.size - length(S#state.checked_out)),
+ %OverFlow = max(0, length(S#state.checked_out) - S#state.size),
+ %Monitors = length(S#state.checked_out),
+
+ %RealStatus = gen_fsm:sync_send_all_state_event(Pid, status),
+ %case RealStatus == {State, Workers, OverFlow, Monitors} of
+ %true ->
+ %true;
+ %_ ->
+ %exit({wrong_state, RealStatus, {State, Workers, OverFlow, Monitors}})
+ %end;
+%dynamic_precondition(_,_) ->
+ %true.
+
+postcondition(S,{call,_,checkout_block,[_Pool]},R) ->
+ case R of
+ {{'EXIT', {timeout, _}}, _} ->
+ case length(S#state.checked_out) >= S#state.size + S#state.max_overflow of
+ true ->
+ true;
+ _ ->
+ {checkout_block, R}
+ end;
+ _ ->
+ case length(S#state.checked_out) < S#state.size + S#state.max_overflow of
+ true ->
+ true;
+ _ ->
+ {checkout_block, R}
+ end
+ end;
+postcondition(S,{call,_,checkout_nonblock,[_Pool]},R) ->
+ case R of
+ {full, _} ->
+ case length(S#state.checked_out) >= S#state.size + S#state.max_overflow of
+ true ->
+ true;
+ _ ->
+ {checkout_nonblock, R}
+ end;
+ _ ->
+ case length(S#state.checked_out) < S#state.size + S#state.max_overflow of
+ true ->
+ true;
+ _ ->
+ {checkout_block, R}
+ end
+ end;
+postcondition(_S, {call,_,checkin,_}, R) ->
+ case R of
+ ok ->
+ true;
+ _ ->
+ {checkin, R}
+ end;
+postcondition(_S,{call,_,_,_},_R) ->
+ true.
+
+next_state(S,V,{call,_,start_poolboy, [Args]}) ->
+ S#state{pid=V,
+ size=proplists:get_value(size, Args),
+ max_overflow=proplists:get_value(max_overflow, Args)
+ };
+next_state(S,_V,{call,_,stop_poolboy, [_Args]}) ->
+ S#state{pid=undefined, checked_out=[]};
+next_state(S,V,{call,_,checkout_block,_}) ->
+ %% if the model says the checkout worked, store the result
+ case checkout_ok(S) of
+ false ->
+ S;
+ _ ->
+ S#state{checked_out=S#state.checked_out++[V]}
+ end;
+next_state(S,V,{call,_,checkout_nonblock,_}) ->
+ %% if the model says the checkout worked, store the result
+ case checkout_ok(S) of
+ false ->
+ S;
+ _ ->
+ S#state{checked_out=S#state.checked_out++[V]}
+ end;
+next_state(S,_V,{call, _, checkin, [_Pool, Worker]}) ->
+ S#state{checked_out=S#state.checked_out -- [Worker]};
+next_state(S,_V,{call, _, kill_worker, [Worker]}) ->
+ S#state{checked_out=S#state.checked_out -- [Worker]};
+next_state(S,_V,{call, _, kill_idle_worker, [_Pool]}) ->
+ S;
+next_state(S,_V,{call, _, spurious_exit, [_Pool]}) ->
+ S;
+next_state(S,V,{call, erlang, self, []}) ->
+ %% added after test generation, values are never symbolic
+ S#state{checked_out=[{Worker, Pid} || {Worker, Pid} <- S#state.checked_out, Pid /= V]}.
+
+
+prop_sequential() ->
+ fault_rate(1, 10,
+ ?FORALL(Cmds,commands(?MODULE),
+ ?TRAPEXIT(
+ aggregate(command_names(Cmds),
+ begin
+ {H,S,Res} = run_commands(?MODULE,Cmds),
+ catch(stop_poolboy(whereis(poolboy_eqc))),
+ ?WHENFAIL(io:format("History: ~p\nState: ~p\nRes: ~p\n~p\n",
+ [H,S,Res, zip(Cmds, [Y || {_, Y} <- H])]),
+ Res == ok)
+ end)))).
+
+prop_parallel() ->
+ fault_rate(1, 10,
+ ?FORALL(Cmds={Seq,Par},parallel_commands(?MODULE),
+ ?TRAPEXIT(
+ aggregate(command_names(Cmds),
+ begin
+ NewPar = [P ++ [{set, {var, 0}, {call, erlang, self, []}}] || P <- Par],
+ {H,S,Res} = run_parallel_commands(?MODULE,{Seq,NewPar}),
+ catch(stop_poolboy(whereis(poolboy_eqc))),
+ ?WHENFAIL(io:format("History: ~p\nState: ~p\nRes: ~p\n",
+ [H,S,Res]),
+ Res == ok)
+ end)))).
+
+
+checkout_ok(S) ->
+ length(S#state.checked_out) < S#state.size + S#state.max_overflow.
+
+-endif.
+-endif.
View
8 test/poolboy_test_worker.erl
@@ -1,18 +1,16 @@
-module(poolboy_test_worker).
-
-behaviour(gen_server).
+-behaviour(poolboy_worker).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--record(state, {}).
-
start_link(_Args) ->
gen_server:start_link(?MODULE, [], []).
init([]) ->
- {ok, #state{}}.
+ {ok, undefined}.
handle_call(die, _From, State) ->
{stop, {error, died}, dead, State};
@@ -22,8 +20,6 @@ handle_call(_Event, _From, State) ->
handle_cast(_Event, State) ->
{noreply, State}.
-handle_info(stop, State) ->
- {stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
View
103 test/poolboy_tests.erl
@@ -48,11 +48,11 @@ pool_test_() ->
{<<"Pool behaves on owner death">>,
fun owner_death/0
},
- {<<"Pool worker init function called when workers when created">>,
- fun worker_init_fun/0
+ {<<"Worker checked-in after an exception in a transaction">>,
+ fun checkin_after_exception_in_transaction/0
},
- {<<"Pool worker stop function called on workers when destroyed">>,
- fun worker_stop_fun/0
+ {<<"Pool returns status">>,
+ fun pool_returns_status/0
}
]
}.
@@ -75,9 +75,7 @@ checkin_worker(Pid, Worker) ->
pool_startup() ->
%% Check basic pool operation.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 10}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(10, 5),
?assertEqual(10, length(?sync(Pid, get_avail_workers))),
poolboy:checkout(Pid),
?assertEqual(9, length(?sync(Pid, get_avail_workers))),
@@ -90,9 +88,7 @@ pool_startup() ->
pool_overflow() ->
%% Check that the pool overflows properly.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(7, length(?sync(Pid, get_all_workers))),
@@ -118,9 +114,7 @@ pool_overflow() ->
pool_empty() ->
%% Checks that the the pool handles the empty condition correctly when
%% overflow is enabled.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 2}]),
+ {ok, Pid} = new_pool(5, 2),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(7, length(?sync(Pid, get_all_workers))),
@@ -166,9 +160,7 @@ pool_empty() ->
pool_empty_no_overflow() ->
%% Checks the pool handles the empty condition properly when overflow is
%% disabled.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 0}]),
+ {ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(5, length(?sync(Pid, get_all_workers))),
@@ -210,9 +202,7 @@ pool_empty_no_overflow() ->
worker_death() ->
%% Check that dead workers are only restarted when the pool is not full
%% and the overflow count is 0. Meaning, don't restart overflow workers.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 2}]),
+ {ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
@@ -233,9 +223,7 @@ worker_death_while_full() ->
%% Check that if a worker dies while the pool is full and there is a
%% queued checkout, a new worker is started and the checkout serviced.
%% If there are no queued checkouts, a new worker is not started.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 2}]),
+ {ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
@@ -276,9 +264,7 @@ worker_death_while_full_no_overflow() ->
%% Check that if a worker dies while the pool is full and there's no
%% overflow, a new worker is started unconditionally and any queued
%% checkouts are serviced.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 0}]),
+ {ok, Pid} = new_pool(5, 0),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
@@ -320,9 +306,7 @@ worker_death_while_full_no_overflow() ->
pool_full_nonblocking_no_overflow() ->
%% Check that when the pool is full, checkouts return 'full' when the
%% option to use non-blocking checkouts is used.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 0}]),
+ {ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(5, length(?sync(Pid, get_all_workers))),
@@ -337,9 +321,7 @@ pool_full_nonblocking_no_overflow() ->
pool_full_nonblocking() ->
%% Check that when the pool is full, checkouts return 'full' when the
%% option to use non-blocking checkouts is used.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(10, length(?sync(Pid, get_all_workers))),
@@ -356,9 +338,7 @@ pool_full_nonblocking() ->
owner_death() ->
%% Check that a dead owner (a process that dies with a worker checked out)
%% causes the pool to dismiss the worker and prune the state space.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(5, 5),
spawn(fun() ->
poolboy:checkout(Pid),
receive after 500 -> exit(normal) end
@@ -369,38 +349,29 @@ owner_death() ->
?assertEqual(0, length(?sync(Pid, get_all_monitors))),
ok = ?sync(Pid, stop).
-worker_init_fun() ->
- Self = self(),
- InitFun = fun(Worker) ->
- Self ! worked,
- {ok, Worker}
+checkin_after_exception_in_transaction() ->
+ {ok, Pool} = new_pool(2, 0),
+ ?assertEqual(2, length(?sync(Pool, get_avail_workers))),
+ Tx = fun(Worker) ->
+ ?assert(is_pid(Worker)),
+ ?assertEqual(1, length(?sync(Pool, get_avail_workers))),
+ throw(it_on_the_ground),
+ ?assert(false)
end,
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {init_fun, InitFun}]),
- poolboy:checkout(Pid),
- receive
- worked -> ?assert(true)
- after
- 1000 -> ?assert(false)
+ try
+ poolboy:transaction(Pool, Tx)
+ catch
+ throw:it_on_the_ground -> ok
end,
- ok = ?sync(Pid, stop).
+ ?assertEqual(2, length(?sync(Pool, get_avail_workers))),
+ ok = ?sync(Pool, stop).
-worker_stop_fun() ->
- Self = self(),
- StopFun = fun(Worker) ->
- Self ! worked,
- Worker ! stop
- end,
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {stop_fun, StopFun},
- {size, 0}, {max_overflow, 1}]),
- Worker = poolboy:checkout(Pid),
- checkin_worker(Pid, Worker),
- receive
- worked -> ?assert(true)
- after
- 1000 -> ?assert(false)
- end,
- ok = ?sync(Pid, stop).
+pool_returns_status() ->
+ {ok, Pool} = new_pool(2, 0),
+ ?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)),
+ ok = ?sync(Pool, stop).
+
+new_pool(Size, MaxOverflow) ->
+ poolboy:start_link([{name, {local, poolboy_test}},
+ {worker_module, poolboy_test_worker},
+ {size, Size}, {max_overflow, MaxOverflow}]).
Please sign in to comment.
Something went wrong with that request. Please try again.