Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Several additions and improvements

- Change a `queue:len/1` call during a checkout in the ready state to use
  `queue:is_empty/1`. All operations in poolboy that aren't informational
  or disaster recovery should now theoretically be O(1).
- There is now "transaction" support using `transaction/2`. This will
  run a user defined function passed the worker checked out in a try
  block, ensuring that the worker is checked back into the pool should
  the operation fail.
- A `child_spec/2` helper for embedding poolboy pools  within your
  supervisors.
- Documentation updates.
  • Loading branch information...
commit 4987c1051031ea8a5c5fc2a1ebad103483e3e0be 1 parent fb7014a
@devinus authored
View
36 README.md
@@ -4,11 +4,13 @@ Poolboy - A hunky Erlang worker pool factory
Usage
-----
-```erlang
-Worker = poolboy:checkout(PoolName),
-Reply = gen_server:call(Worker, WorkerFun),
-poolboy:checkin(PoolName, Worker),
-Reply.
+```erl-sh
+1> Worker = poolboy:checkout(PoolName).
+<0.9001.0>
+2> gen_server:call(Worker, Request).
+ok
+3> poolboy:checkin(PoolName, Worker).
+ok
```
Example
@@ -79,22 +81,19 @@ init([]) ->
Args = [{name, {local, PoolName}},
{worker_module, example_worker}]
++ PoolConfig,
- {PoolName, {poolboy, start_link, [Args]},
- permanent, 5000, worker, [poolboy]}
+ poolboy:child_spec(PoolName, Args)
end, Pools),
{ok, {{one_for_one, 10, 10}, PoolSpecs}}.
squery(PoolName, Sql) ->
- Worker = poolboy:checkout(PoolName),
- Reply = gen_server:call(Worker, {squery, Sql}),
- poolboy:checkin(PoolName, Worker),
- Reply.
+ poolboy:transaction(PoolName, fun(Worker) ->
+ gen_server:call(Worker, {squery, Sql})
+ end).
equery(PoolName, Stmt, Params) ->
- Worker = poolboy:checkout(PoolName),
- Reply = gen_server:call(Worker, {equery, Stmt, Params}),
- poolboy:checkin(PoolName, Worker),
- Reply.
+ poolboy:transaction(PoolName, fun(Worker) ->
+ gen_server:call(Worker, {equery, Stmt, Params})
+ end).
```
### example_worker.erl
@@ -114,7 +113,6 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
init(Args) ->
- process_flag(trap_exit, true),
Hostname = proplists:get_value(hostname, Args),
Database = proplists:get_value(database, Args),
Username = proplists:get_value(username, Args),
@@ -134,15 +132,11 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info(stop, State) ->
- {stop, shutdown, State};
-handle_info({'EXIT', _, _}, State) ->
- {stop, shutdown, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{conn=Conn}) ->
- pgsql:close(Conn),
+ ok = pgsql:close(Conn),
ok.
code_change(_OldVsn, State, _Extra) ->
View
53 src/poolboy.erl
@@ -3,7 +3,7 @@
-module(poolboy).
-behaviour(gen_fsm).
--export([checkout/1, checkout/2, checkout/3, checkin/2,
+-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
child_spec/2, start_link/1, stop/1]).
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
@@ -35,9 +35,19 @@ checkout(Pool, Block, Timeout) ->
gen_fsm:sync_send_event(Pool, {checkout, Block, Timeout}, Timeout).
-spec checkin(Pool :: node(), Worker :: pid()) -> ok.
-checkin(Pool, Worker) ->
+checkin(Pool, Worker) when is_pid(Worker) ->
gen_fsm:send_event(Pool, {checkin, Worker}).
+-spec transaction(Pool :: node(), Fun :: fun((Worker :: pid()) -> any()))
+ -> any().
+transaction(Pool, Fun) ->
+ Worker = poolboy:checkout(Pool),
+ try
+ Fun(Worker)
+ after
+ ok = poolboy:checkin(Pool, Worker)
+ end.
+
-spec child_spec(Pool :: node(), Args :: proplists:proplist()) ->
supervisor:child_spec().
child_spec(Pool, Args) ->
@@ -45,7 +55,7 @@ child_spec(Pool, Args) ->
permanent, 5000, worker, [poolboy]}.
-spec start_link(Args :: proplists:proplist()) -> {ok, pid()}.
-start_link(Args) ->
+start_link(Args) ->
case proplists:get_value(name, Args) of
undefined ->
gen_fsm:start_link(?MODULE, Args, []);
@@ -74,10 +84,10 @@ init([_ | Rest], State) ->
init(Rest, State);
init([], #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) ->
Workers = prepopulate(Size, Sup),
- StartState = case {Size, MaxOverflow} of
- {0, 0} -> full;
- {0, _} -> overflow;
- {_, _} -> ready
+ StartState = case Size of
+ Size when Size < 1, MaxOverflow < 1 -> full;
+ Size when Size < 1 -> overflow;
+ Size -> ready
end,
{ok, StartState, State#state{workers=Workers}}.
@@ -104,10 +114,10 @@ ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
- NextState = case {queue:len(Left), MaxOverflow} of
- {0, 0} -> full;
- {0, _} -> overflow;
- {_, _} -> ready
+ NextState = case queue:is_empty(Left) of
+ true when MaxOverflow < 1 -> full;
+ true -> overflow;
+ false -> ready
end,
{reply, Pid, NextState, State#state{workers=Left}};
{empty, Empty} when MaxOverflow > 0 ->
@@ -156,11 +166,11 @@ overflow({checkout, Block, Timeout}, From,
#state{overflow=Overflow,
max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow ->
case Block of
- false ->
- {reply, full, full, State};
- Block ->
+ true ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
- {next_state, full, State#state{waiting=Waiting}}
+ {next_state, full, State#state{waiting=Waiting}};
+ false ->
+ {reply, full, full, State}
end;
overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
#state{supervisor = Sup,
@@ -168,11 +178,12 @@ overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
max_overflow = MaxOverflow} = State,
{Pid, Ref} = new_worker(Sup, From),
true = ets:insert(State#state.monitors, {Pid, Ref}),
- NextState = case Overflow+1 >= MaxOverflow of
+ NewOverflow = Overflow + 1,
+ NextState = case NewOverflow >= MaxOverflow of
true -> full;
false -> overflow
end,
- {reply, Pid, NextState, State#state{overflow=Overflow+1}};
+ {reply, Pid, NextState, State#state{overflow=NewOverflow}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.
@@ -212,11 +223,11 @@ full({checkin, Pid}, State) ->
full(_Event, State) ->
{next_state, full, State}.
-full({checkout, false, _Timeout}, _From, State) ->
- {reply, full, full, State};
-full({checkout, _Block, Timeout}, From, State) ->
+full({checkout, true, Timeout}, From, State) ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{waiting=Waiting}};
+full({checkout, false, _Timeout}, _From, State) ->
+ {reply, full, full, State};
full(_Event, _From, State) ->
{reply, ok, full, State}.
@@ -344,7 +355,7 @@ dismiss_worker(Sup, Pid) ->
true = unlink(Pid),
supervisor:terminate_child(Sup, Pid).
-prepopulate(0, _) ->
+prepopulate(N, _) when N < 1 ->
queue:new();
prepopulate(N, Sup) ->
prepopulate(N, Sup, queue:new()).
View
3  test/poolboy_test_worker.erl
@@ -10,7 +10,6 @@ start_link(_Args) ->
gen_server:start_link(?MODULE, [], []).
init([]) ->
- process_flag(trap_exit, true),
{ok, undefined}.
handle_call(die, _From, State) ->
@@ -21,8 +20,6 @@ handle_call(_Event, _From, State) ->
handle_cast(_Event, State) ->
{noreply, State}.
-handle_info({'EXIT', _, _}, State) ->
- {stop, shutdown, State};
handle_info(_Info, State) ->
{noreply, State}.
View
65 test/poolboy_tests.erl
@@ -47,6 +47,9 @@ pool_test_() ->
},
{<<"Pool behaves on owner death">>,
fun owner_death/0
+ },
+ {<<"Worker checked-in after an exception in a transaction">>,
+ fun checkin_after_exception_in_transaction/0
}
]
}.
@@ -69,9 +72,7 @@ checkin_worker(Pid, Worker) ->
pool_startup() ->
%% Check basic pool operation.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 10}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(10, 5),
?assertEqual(10, length(?sync(Pid, get_avail_workers))),
poolboy:checkout(Pid),
?assertEqual(9, length(?sync(Pid, get_avail_workers))),
@@ -84,9 +85,7 @@ pool_startup() ->
pool_overflow() ->
%% Check that the pool overflows properly.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(7, length(?sync(Pid, get_all_workers))),
@@ -112,9 +111,7 @@ pool_overflow() ->
pool_empty() ->
%% Checks that the the pool handles the empty condition correctly when
%% overflow is enabled.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 2}]),
+ {ok, Pid} = new_pool(5, 2),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(7, length(?sync(Pid, get_all_workers))),
@@ -160,9 +157,7 @@ pool_empty() ->
pool_empty_no_overflow() ->
%% Checks the pool handles the empty condition properly when overflow is
%% disabled.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 0}]),
+ {ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(5, length(?sync(Pid, get_all_workers))),
@@ -204,9 +199,7 @@ pool_empty_no_overflow() ->
worker_death() ->
%% Check that dead workers are only restarted when the pool is not full
%% and the overflow count is 0. Meaning, don't restart overflow workers.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 2}]),
+ {ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
@@ -227,9 +220,7 @@ worker_death_while_full() ->
%% Check that if a worker dies while the pool is full and there is a
%% queued checkout, a new worker is started and the checkout serviced.
%% If there are no queued checkouts, a new worker is not started.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 2}]),
+ {ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
@@ -270,9 +261,7 @@ worker_death_while_full_no_overflow() ->
%% Check that if a worker dies while the pool is full and there's no
%% overflow, a new worker is started unconditionally and any queued
%% checkouts are serviced.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 0}]),
+ {ok, Pid} = new_pool(5, 0),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(?sync(Pid, get_avail_workers))),
@@ -314,9 +303,7 @@ worker_death_while_full_no_overflow() ->
pool_full_nonblocking_no_overflow() ->
%% Check that when the pool is full, checkouts return 'full' when the
%% option to use non-blocking checkouts is used.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 0}]),
+ {ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(5, length(?sync(Pid, get_all_workers))),
@@ -331,9 +318,7 @@ pool_full_nonblocking_no_overflow() ->
pool_full_nonblocking() ->
%% Check that when the pool is full, checkouts return 'full' when the
%% option to use non-blocking checkouts is used.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)],
?assertEqual(0, length(?sync(Pid, get_avail_workers))),
?assertEqual(10, length(?sync(Pid, get_all_workers))),
@@ -350,9 +335,7 @@ pool_full_nonblocking() ->
owner_death() ->
%% Check that a dead owner (a process that dies with a worker checked out)
%% causes the pool to dismiss the worker and prune the state space.
- {ok, Pid} = poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, 5}, {max_overflow, 5}]),
+ {ok, Pid} = new_pool(5, 5),
spawn(fun() ->
poolboy:checkout(Pid),
receive after 500 -> exit(normal) end
@@ -362,3 +345,25 @@ owner_death() ->
?assertEqual(5, length(?sync(Pid, get_all_workers))),
?assertEqual(0, length(?sync(Pid, get_all_monitors))),
ok = ?sync(Pid, stop).
+
+checkin_after_exception_in_transaction() ->
+ {ok, Pool} = new_pool(2, 0),
+ ?assertEqual(2, length(?sync(Pool, get_avail_workers))),
+ Tx = fun(Worker) ->
+ ?assert(is_pid(Worker)),
+ ?assertEqual(1, length(?sync(Pool, get_avail_workers))),
+ throw(it_on_the_ground),
+ ?assert(false)
+ end,
+ try
+ poolboy:transaction(Pool, Tx)
+ catch
+ throw:it_on_the_ground -> ok
+ end,
+ ?assertEqual(2, length(?sync(Pool, get_avail_workers))),
+ ok = ?sync(Pool, stop).
+
+new_pool(Size, MaxOverflow) ->
+ poolboy:start_link([{name, {local, poolboy_test}},
+ {worker_module, poolboy_test_worker},
+ {size, Size}, {max_overflow, MaxOverflow}]).
Please sign in to comment.
Something went wrong with that request. Please try again.