Skip to content

Commit

Permalink
Several additions and improvements
Browse files Browse the repository at this point in the history
- Change a `queue:len/1` call during a checkout in the ready state to use
  `queue:is_empty/1`. All operations in poolboy that aren't informational
  or disaster recovery should now theoretically be O(1).
- There is now "transaction" support using `transaction/2`. This will
  run a user defined function passed the worker checked out in a try
  block, ensuring that the worker is checked back into the pool should
  the operation fail.
- A `child_spec/2` helper for embedding poolboy pools  within your
  supervisors.
- Documentation updates.
  • Loading branch information
Devin Torres committed Apr 27, 2012
1 parent fb7014a commit 4987c10
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 75 deletions.
36 changes: 15 additions & 21 deletions README.md
Expand Up @@ -4,11 +4,13 @@ Poolboy - A hunky Erlang worker pool factory
Usage
-----

```erlang
Worker = poolboy:checkout(PoolName),
Reply = gen_server:call(Worker, WorkerFun),
poolboy:checkin(PoolName, Worker),
Reply.
```erl-sh
1> Worker = poolboy:checkout(PoolName).
<0.9001.0>
2> gen_server:call(Worker, Request).
ok
3> poolboy:checkin(PoolName, Worker).
ok
```

Example
Expand Down Expand Up @@ -79,22 +81,19 @@ init([]) ->
Args = [{name, {local, PoolName}},
{worker_module, example_worker}]
++ PoolConfig,
{PoolName, {poolboy, start_link, [Args]},
permanent, 5000, worker, [poolboy]}
poolboy:child_spec(PoolName, Args)
end, Pools),
{ok, {{one_for_one, 10, 10}, PoolSpecs}}.

squery(PoolName, Sql) ->
Worker = poolboy:checkout(PoolName),
Reply = gen_server:call(Worker, {squery, Sql}),
poolboy:checkin(PoolName, Worker),
Reply.
poolboy:transaction(PoolName, fun(Worker) ->
gen_server:call(Worker, {squery, Sql})
end).

equery(PoolName, Stmt, Params) ->
Worker = poolboy:checkout(PoolName),
Reply = gen_server:call(Worker, {equery, Stmt, Params}),
poolboy:checkin(PoolName, Worker),
Reply.
poolboy:transaction(PoolName, fun(Worker) ->
gen_server:call(Worker, {equery, Stmt, Params})
end).
```

### example_worker.erl
Expand All @@ -114,7 +113,6 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).

init(Args) ->
process_flag(trap_exit, true),
Hostname = proplists:get_value(hostname, Args),
Database = proplists:get_value(database, Args),
Username = proplists:get_value(username, Args),
Expand All @@ -134,15 +132,11 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(stop, State) ->
{stop, shutdown, State};
handle_info({'EXIT', _, _}, State) ->
{stop, shutdown, State};
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, #state{conn=Conn}) ->
pgsql:close(Conn),
ok = pgsql:close(Conn),
ok.

code_change(_OldVsn, State, _Extra) ->
Expand Down
53 changes: 32 additions & 21 deletions src/poolboy.erl
Expand Up @@ -3,7 +3,7 @@
-module(poolboy).
-behaviour(gen_fsm).

-export([checkout/1, checkout/2, checkout/3, checkin/2,
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
child_spec/2, start_link/1, stop/1]).
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
Expand Down Expand Up @@ -35,17 +35,27 @@ checkout(Pool, Block, Timeout) ->
gen_fsm:sync_send_event(Pool, {checkout, Block, Timeout}, Timeout).

-spec checkin(Pool :: node(), Worker :: pid()) -> ok.
checkin(Pool, Worker) ->
checkin(Pool, Worker) when is_pid(Worker) ->
gen_fsm:send_event(Pool, {checkin, Worker}).

-spec transaction(Pool :: node(), Fun :: fun((Worker :: pid()) -> any()))
-> any().
transaction(Pool, Fun) ->
Worker = poolboy:checkout(Pool),
try
Fun(Worker)
after
ok = poolboy:checkin(Pool, Worker)
end.

-spec child_spec(Pool :: node(), Args :: proplists:proplist()) ->
supervisor:child_spec().
child_spec(Pool, Args) ->
{Pool, {poolboy, start_link, [Args]},
permanent, 5000, worker, [poolboy]}.

-spec start_link(Args :: proplists:proplist()) -> {ok, pid()}.
start_link(Args) ->
start_link(Args) ->
case proplists:get_value(name, Args) of
undefined ->
gen_fsm:start_link(?MODULE, Args, []);
Expand Down Expand Up @@ -74,10 +84,10 @@ init([_ | Rest], State) ->
init(Rest, State);
init([], #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) ->
Workers = prepopulate(Size, Sup),
StartState = case {Size, MaxOverflow} of
{0, 0} -> full;
{0, _} -> overflow;
{_, _} -> ready
StartState = case Size of
Size when Size < 1, MaxOverflow < 1 -> full;
Size when Size < 1 -> overflow;
Size -> ready
end,
{ok, StartState, State#state{workers=Workers}}.

Expand All @@ -104,10 +114,10 @@ ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
NextState = case {queue:len(Left), MaxOverflow} of
{0, 0} -> full;
{0, _} -> overflow;
{_, _} -> ready
NextState = case queue:is_empty(Left) of
true when MaxOverflow < 1 -> full;
true -> overflow;
false -> ready
end,
{reply, Pid, NextState, State#state{workers=Left}};
{empty, Empty} when MaxOverflow > 0 ->
Expand Down Expand Up @@ -156,23 +166,24 @@ overflow({checkout, Block, Timeout}, From,
#state{overflow=Overflow,
max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow ->
case Block of
false ->
{reply, full, full, State};
Block ->
true ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{waiting=Waiting}}
{next_state, full, State#state{waiting=Waiting}};
false ->
{reply, full, full, State}
end;
overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
#state{supervisor = Sup,
overflow = Overflow,
max_overflow = MaxOverflow} = State,
{Pid, Ref} = new_worker(Sup, From),
true = ets:insert(State#state.monitors, {Pid, Ref}),
NextState = case Overflow+1 >= MaxOverflow of
NewOverflow = Overflow + 1,
NextState = case NewOverflow >= MaxOverflow of
true -> full;
false -> overflow
end,
{reply, Pid, NextState, State#state{overflow=Overflow+1}};
{reply, Pid, NextState, State#state{overflow=NewOverflow}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.

Expand Down Expand Up @@ -212,11 +223,11 @@ full({checkin, Pid}, State) ->
full(_Event, State) ->
{next_state, full, State}.

full({checkout, false, _Timeout}, _From, State) ->
{reply, full, full, State};
full({checkout, _Block, Timeout}, From, State) ->
full({checkout, true, Timeout}, From, State) ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{waiting=Waiting}};
full({checkout, false, _Timeout}, _From, State) ->
{reply, full, full, State};
full(_Event, _From, State) ->
{reply, ok, full, State}.

Expand Down Expand Up @@ -344,7 +355,7 @@ dismiss_worker(Sup, Pid) ->
true = unlink(Pid),
supervisor:terminate_child(Sup, Pid).

prepopulate(0, _) ->
prepopulate(N, _) when N < 1 ->
queue:new();
prepopulate(N, Sup) ->
prepopulate(N, Sup, queue:new()).
Expand Down
3 changes: 0 additions & 3 deletions test/poolboy_test_worker.erl
Expand Up @@ -10,7 +10,6 @@ start_link(_Args) ->
gen_server:start_link(?MODULE, [], []).

init([]) ->
process_flag(trap_exit, true),
{ok, undefined}.

handle_call(die, _From, State) ->
Expand All @@ -21,8 +20,6 @@ handle_call(_Event, _From, State) ->
handle_cast(_Event, State) ->
{noreply, State}.

handle_info({'EXIT', _, _}, State) ->
{stop, shutdown, State};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down
65 changes: 35 additions & 30 deletions test/poolboy_tests.erl
Expand Up @@ -47,6 +47,9 @@ pool_test_() ->
},
{<<"Pool behaves on owner death">>,
fun owner_death/0
},
{<<"Worker checked-in after an exception in a transaction">>,
fun checkin_after_exception_in_transaction/0
}
]
}.
Expand All @@ -69,9 +72,7 @@ checkin_worker(Pid, Worker) ->

pool_startup() ->
%% Check basic pool operation.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 10}, {max_overflow, 5}]),
{ok, Pid} = new_pool(10, 5),
?assertEqual(10, length(?sync(Pid, get_avail_workers))),
poolboy:checkout(Pid),
?assertEqual(9, length(?sync(Pid, get_avail_workers))),
Expand All @@ -84,9 +85,7 @@ pool_startup() ->

pool_overflow() ->
%% Check that the pool overflows properly.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 5}]),
{ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(7, length(?sync(Pid, get_all_workers))),
Expand All @@ -112,9 +111,7 @@ pool_overflow() ->
pool_empty() ->
%% Checks that the the pool handles the empty condition correctly when
%% overflow is enabled.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 2}]),
{ok, Pid} = new_pool(5, 2),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(7, length(?sync(Pid, get_all_workers))),
Expand Down Expand Up @@ -160,9 +157,7 @@ pool_empty() ->
pool_empty_no_overflow() ->
%% Checks the pool handles the empty condition properly when overflow is
%% disabled.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 0}]),
{ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(5, length(?sync(Pid, get_all_workers))),
Expand Down Expand Up @@ -204,9 +199,7 @@ pool_empty_no_overflow() ->
worker_death() ->
%% Check that dead workers are only restarted when the pool is not full
%% and the overflow count is 0. Meaning, don't restart overflow workers.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 2}]),
{ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
Expand All @@ -227,9 +220,7 @@ worker_death_while_full() ->
%% Check that if a worker dies while the pool is full and there is a
%% queued checkout, a new worker is started and the checkout serviced.
%% If there are no queued checkouts, a new worker is not started.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 2}]),
{ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
Expand Down Expand Up @@ -270,9 +261,7 @@ worker_death_while_full_no_overflow() ->
%% Check that if a worker dies while the pool is full and there's no
%% overflow, a new worker is started unconditionally and any queued
%% checkouts are serviced.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 0}]),
{ok, Pid} = new_pool(5, 0),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
Expand Down Expand Up @@ -314,9 +303,7 @@ worker_death_while_full_no_overflow() ->
pool_full_nonblocking_no_overflow() ->
%% Check that when the pool is full, checkouts return 'full' when the
%% option to use non-blocking checkouts is used.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 0}]),
{ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(5, length(?sync(Pid, get_all_workers))),
Expand All @@ -331,9 +318,7 @@ pool_full_nonblocking_no_overflow() ->
pool_full_nonblocking() ->
%% Check that when the pool is full, checkouts return 'full' when the
%% option to use non-blocking checkouts is used.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 5}]),
{ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(10, length(?sync(Pid, get_all_workers))),
Expand All @@ -350,9 +335,7 @@ pool_full_nonblocking() ->
owner_death() ->
%% Check that a dead owner (a process that dies with a worker checked out)
%% causes the pool to dismiss the worker and prune the state space.
{ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, 5}, {max_overflow, 5}]),
{ok, Pid} = new_pool(5, 5),
spawn(fun() ->
poolboy:checkout(Pid),
receive after 500 -> exit(normal) end
Expand All @@ -362,3 +345,25 @@ owner_death() ->
?assertEqual(5, length(?sync(Pid, get_all_workers))),
?assertEqual(0, length(?sync(Pid, get_all_monitors))),
ok = ?sync(Pid, stop).

checkin_after_exception_in_transaction() ->
{ok, Pool} = new_pool(2, 0),
?assertEqual(2, length(?sync(Pool, get_avail_workers))),
Tx = fun(Worker) ->
?assert(is_pid(Worker)),
?assertEqual(1, length(?sync(Pool, get_avail_workers))),
throw(it_on_the_ground),
?assert(false)
end,
try
poolboy:transaction(Pool, Tx)
catch
throw:it_on_the_ground -> ok
end,
?assertEqual(2, length(?sync(Pool, get_avail_workers))),
ok = ?sync(Pool, stop).

new_pool(Size, MaxOverflow) ->
poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, Size}, {max_overflow, MaxOverflow}]).

0 comments on commit 4987c10

Please sign in to comment.