Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add poolboy_worker behaviour

  • Loading branch information...
commit 9aa1a6ad3ac343fe7ce10230123825f78112e58e 1 parent beca617
@devinus authored
View
44 README.md
@@ -11,8 +11,11 @@ poolboy:checkin(PoolName, Worker),
Reply.
```
-Example Application
--------------------
+Example
+-------
+
+This is an example application showcasing database connection pools using
+Poolboy and Will Glozer's [epgsql](https://github.com/wg/epgsql).
### example.app
@@ -54,14 +57,21 @@ Example Application
-behaviour(application).
-behaviour(supervisor).
--export([start/0, stop/0, start/2, stop/1, init/1, squery/2, equery/3]).
+-export([start/0, stop/0, query/2, query/3]).
+-export([start/2, stop/1]).
+-export([init/1]).
+
+start() ->
+ application:start(?MODULE).
-start() -> application:start(?MODULE).
-stop() -> application:stop(?MODULE).
+stop() ->
+ application:stop(?MODULE).
start(_Type, _Args) ->
supervisor:start_link({local, example_sup}, ?MODULE, []).
-stop(_State) -> ok.
+
+stop(_State) ->
+ ok.
init([]) ->
{ok, Pools} = application:get_env(example, pools),
@@ -74,15 +84,15 @@ init([]) ->
end, Pools),
{ok, {{one_for_one, 10, 10}, PoolSpecs}}.
-squery(PoolName, Sql) ->
+query(PoolName, Sql) ->
Worker = poolboy:checkout(PoolName),
- Reply = gen_server:call(Worker, {squery, Sql}),
+ Reply = gen_server:call(Worker, {query, Sql}),
poolboy:checkin(PoolName, Worker),
Reply.
-equery(PoolName, Stmt, Params) ->
+query(PoolName, Stmt, Params) ->
Worker = poolboy:checkout(PoolName),
- Reply = gen_server:call(Worker, {equery, Stmt, Params}),
+ Reply = gen_server:call(Worker, {query, Stmt, Params}),
poolboy:checkin(PoolName, Worker),
Reply.
```
@@ -92,14 +102,16 @@ equery(PoolName, Stmt, Params) ->
```erlang
-module(example_worker).
-behaviour(gen_server).
+-behaviour(poolboy_worker).
--export([start_link/1, stop/0, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
+-export([start_link/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-record(state, {conn}).
-start_link(Args) -> gen_server:start_link(?MODULE, Args, []).
-stop() -> gen_server:cast(?MODULE, stop).
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
init(Args) ->
process_flag(trap_exit, true),
@@ -112,9 +124,9 @@ init(Args) ->
]),
{ok, #state{conn=Conn}}.
-handle_call({squery, Sql}, _From, #state{conn=Conn}=State) ->
+handle_call({query, Sql}, _From, #state{conn=Conn}=State) ->
{reply, pgsql:squery(Conn, Sql), State};
-handle_call({equery, Stmt, Params}, _From, #state{conn=Conn}=State) ->
+handle_call({query, Stmt, Params}, _From, #state{conn=Conn}=State) ->
{reply, pgsql:equery(Conn, Stmt, Params), State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
View
46 src/poolboy.erl
@@ -56,12 +56,12 @@ init(Args) ->
process_flag(trap_exit, true),
init(Args, #state{}).
-init([{worker_module, Mod} | Rest], State) ->
+init([{worker_module, Mod} | Rest], State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, Rest),
init(Rest, State#state{worker_sup=Sup});
-init([{size, Size} | Rest], State) ->
+init([{size, Size} | Rest], State) when is_integer(Size) ->
init(Rest, State#state{size=Size});
-init([{max_overflow, MaxOverflow} | Rest], State) ->
+init([{max_overflow, MaxOverflow} | Rest], State) when is_integer(MaxOverflow) ->
init(Rest, State#state{max_overflow=MaxOverflow});
init([{init_fun, InitFun} | Rest], State) when is_function(InitFun) ->
init(Rest, State#state{worker_init=InitFun});
@@ -70,7 +70,7 @@ init([{stop_fun, StopFun} | Rest], State) when is_function(StopFun) ->
init([_ | Rest], State) ->
init(Rest, State);
init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun,
- max_overflow=MaxOverflow}=State) ->
+ max_overflow=MaxOverflow}=State) ->
Workers = prepopulate(Size, Sup, InitFun),
StartState = case queue:len(Workers) of
0 when MaxOverflow =:= 0 -> full;
@@ -103,12 +103,9 @@ ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
Ref = erlang:monitor(process, FromPid),
Monitors = [{Pid, Ref} | State#state.monitors],
NextState = case queue:len(Left) of
- 0 when MaxOverflow == 0 ->
- full;
- 0 ->
- overflow;
- _ ->
- ready
+ 0 when MaxOverflow =:= 0 -> full;
+ 0 -> overflow;
+ _ -> ready
end,
{reply, Pid, NextState, State#state{workers=Left,
monitors=Monitors}};
@@ -121,9 +118,8 @@ ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
{empty, Empty} when Block =:= false ->
{reply, full, full, State#state{workers=Empty}};
{empty, Empty} ->
- Waiting = State#state.waiting,
- {next_state, full, State#state{workers=Empty,
- waiting=add_waiting(From, Timeout, Waiting)}}
+ Waiting = add_waiting(From, Timeout, State#state.waiting),
+ {next_state, full, State#state{workers=Empty, waiting=Waiting}}
end;
ready(_Event, _From, State) ->
{reply, ok, ready, State}.
@@ -136,10 +132,10 @@ overflow({checkin, Pid}, #state{overflow=0}=State) ->
true -> ready;
_ -> overflow
end,
+ Workers = queue:in(Pid, State#state.workers),
{next_state, NextState, State#state{overflow=0, monitors=Monitors,
- workers=queue:in(Pid, State#state.workers)}};
+ workers=Workers}};
false ->
- %% unknown process checked in, ignore it
{next_state, overflow, State}
end;
overflow({checkin, Pid}, State) ->
@@ -151,7 +147,6 @@ overflow({checkin, Pid}, State) ->
{next_state, overflow, State#state{overflow=Overflow-1,
monitors=Monitors}};
_ ->
- %% unknown process checked in, ignore it
{next_state, overflow, State}
end;
overflow(_Event, State) ->
@@ -164,8 +159,8 @@ overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow,
false ->
{reply, full, full, State};
Block ->
- Waiting = State#state.waiting,
- {next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}}
+ Waiting = add_waiting(From, Timeout, State#state.waiting),
+ {next_state, full, State#state{waiting=Waiting}}
end;
overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
#state{worker_sup=Sup, overflow=Overflow,
@@ -220,8 +215,9 @@ full(_Event, State) ->
full({checkout, false, _Timeout}, _From, State) ->
{reply, full, full, State};
-full({checkout, _Block, Timeout}, From, #state{waiting=Waiting}=State) ->
- {next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}};
+full({checkout, _Block, Timeout}, From, State) ->
+ Waiting = add_waiting(From, Timeout, State#state.waiting),
+ {next_state, full, State#state{waiting=Waiting}};
full(_Event, _From, State) ->
{reply, ok, full, State}.
@@ -272,7 +268,7 @@ handle_info({'EXIT', Pid, Reason}, StateName, State) ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
{next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
monitors=Monitors}};
- overflow when Overflow == 0 ->
+ overflow when Overflow =:= 0 ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
{next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
monitors=Monitors}};
@@ -359,18 +355,18 @@ dismiss_worker(Pid, StopFun) ->
unlink(Pid),
StopFun(Pid).
-prepopulate(0, _Sup, _InitFun) ->
+prepopulate(0, _, _) ->
queue:new();
prepopulate(N, Sup, InitFun) ->
prepopulate(N, Sup, queue:new(), InitFun).
-prepopulate(0, _Sup, Workers, _InitFun) ->
+prepopulate(0, _, Workers, _) ->
Workers;
prepopulate(N, Sup, Workers, InitFun) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup, InitFun), Workers), InitFun).
-add_waiting(From, Timeout, Queue) ->
- queue:in({From, Timeout, os:timestamp()}, Queue).
+add_waiting(Pid, Timeout, Queue) ->
+ queue:in({Pid, Timeout, os:timestamp()}, Queue).
wait_valid(infinity, _) ->
true;
View
10 src/poolboy_worker.erl
@@ -0,0 +1,10 @@
+%% Poolboy - A hunky Erlang worker pool factory
+
+-module(poolboy_worker).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{start_link, 1}];
+behaviour_info(_Other) ->
+ undefined.
View
11 test/poolboy_test_worker.erl
@@ -1,18 +1,17 @@
-module(poolboy_test_worker).
-
-behaviour(gen_server).
+-behaviour(poolboy_worker).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--record(state, {}).
-
start_link(_Args) ->
gen_server:start_link(?MODULE, [], []).
init([]) ->
- {ok, #state{}}.
+ process_flag(trap_exit, true),
+ {ok, undefined}.
handle_call(die, _From, State) ->
{stop, {error, died}, dead, State};
@@ -23,7 +22,9 @@ handle_cast(_Event, State) ->
{noreply, State}.
handle_info(stop, State) ->
- {stop, normal, State};
+ {stop, shutdown, State};
+handle_info({'EXIT', _, _}, State) ->
+ {stop, shutdown, State};
handle_info(_Info, State) ->
{noreply, State}.
Please sign in to comment.
Something went wrong with that request. Please try again.