Skip to content

Commit

Permalink
Merge pull request #6 from kevsmith/init_fun
Browse files Browse the repository at this point in the history
Added init and stop funs to poolboy's config
  • Loading branch information
Devin Torres committed Nov 18, 2011
2 parents 280c223 + 36bf84b commit b010d10
Showing 1 changed file with 50 additions and 40 deletions.
90 changes: 50 additions & 40 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
code_change/4]).

-define(TIMEOUT, 5000).
-define(DEFAULT_INIT_FUN, fun(Worker) -> {ok, Worker} end).
-define(DEFAULT_STOP_FUN, fun(Worker) -> Worker ! stop end).

-record(state, {workers, worker_sup, waiting=queue:new(), monitors=[],
size=5, overflow=0, max_overflow=10}).
size=5, overflow=0, max_overflow=10, worker_init=?DEFAULT_INIT_FUN,
worker_stop=?DEFAULT_STOP_FUN}).

checkout(Pool) ->
checkout(Pool, true, ?TIMEOUT).
Expand All @@ -35,19 +38,23 @@ start_link(Args) ->

init(Args) ->
process_flag(trap_exit, true),
init(Args, #state{}).
init(Args, Args, #state{}).

init([{worker_module, Mod} | Rest], State) ->
init([{worker_module, Mod} | Rest], AllArgs, State) ->
{ok, Sup} = poolboy_sup:start_link(Mod, Rest),
init(Rest, State#state{worker_sup=Sup});
init([{size, PoolSize} | Rest], State) ->
init(Rest, State#state{size=PoolSize});
init([{max_overflow, MaxOverflow} | Rest], State) ->
init(Rest, State#state{max_overflow=MaxOverflow});
init([_ | Rest], State) ->
init(Rest, State);
init([], #state{size=Size, worker_sup=Sup}=State) ->
Workers = prepopulate(Size, Sup),
init(Rest, AllArgs, State#state{worker_sup=Sup});
init([{size, PoolSize} | Rest], AllArgs, State) ->
init(Rest, AllArgs, State#state{size=PoolSize});
init([{max_overflow, MaxOverflow} | Rest], AllArgs, State) ->
init(Rest, AllArgs, State#state{max_overflow=MaxOverflow});
init([{init_fun, InitFun} | Rest], AllArgs, State) when is_function(InitFun) ->
init(Rest, AllArgs, State#state{worker_init=InitFun});
init([{stop_fun, StopFun} | Rest], AllArgs, State) when is_function(StopFun) ->
init(Rest, AllArgs, State#state{worker_stop=StopFun});
init([_ | Rest], AllArgs, State) ->
init(Rest, AllArgs, State);
init([], _AllArgs, #state{size=Size, worker_sup=Sup, worker_init=InitFun}=State) ->
Workers = prepopulate(Size, Sup, InitFun),
{ok, ready, State#state{workers=Workers}}.

ready({checkin, Pid}, State) ->
Expand All @@ -62,7 +69,8 @@ ready(_Event, State) ->

ready({checkout, Block}, {FromPid, _}=From, #state{workers=Workers,
worker_sup=Sup,
max_overflow=MaxOverflow
max_overflow=MaxOverflow,
worker_init=InitFun
}=State) ->
case queue:out(Workers) of
{{value, Pid}, Left} ->
Expand All @@ -71,7 +79,7 @@ ready({checkout, Block}, {FromPid, _}=From, #state{workers=Workers,
{reply, Pid, ready, State#state{workers=Left,
monitors=Monitors}};
{empty, Empty} when MaxOverflow > 0 ->
{Pid, Ref} = new_worker(Sup, FromPid),
{Pid, Ref} = new_worker(Sup, FromPid, InitFun),
Monitors = [{Pid, Ref} | State#state.monitors],
{reply, Pid, overflow, State#state{workers=Empty,
monitors=Monitors,
Expand All @@ -86,15 +94,15 @@ ready({checkout, Block}, {FromPid, _}=From, #state{workers=Workers,
ready(_Event, _From, State) ->
{reply, ok, ready, State}.

overflow({checkin, Pid}, #state{overflow=1}=State) ->
dismiss_worker(Pid),
overflow({checkin, Pid}, #state{overflow=1, worker_stop=StopFun}=State) ->
dismiss_worker(Pid, StopFun),
Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
{value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
false -> []
end,
{next_state, ready, State#state{overflow=0, monitors=Monitors}};
overflow({checkin, Pid}, #state{overflow=Overflow}=State) ->
dismiss_worker(Pid),
overflow({checkin, Pid}, #state{overflow=Overflow, worker_stop=StopFun}=State) ->
dismiss_worker(Pid, StopFun),
Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
{value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
false -> State#state.monitors
Expand All @@ -114,16 +122,16 @@ overflow({checkout, Block}, From, #state{overflow=Overflow,
{next_state, full, State#state{waiting=queue:in(From, Waiting)}}
end;
overflow({checkout, _Block}, {From, _},
#state{worker_sup=Sup, overflow=Overflow}=State) ->
{Pid, Ref} = new_worker(Sup, From),
#state{worker_sup=Sup, overflow=Overflow, worker_init=InitFun}=State) ->
{Pid, Ref} = new_worker(Sup, From, InitFun),
Monitors = [{Pid, Ref} | State#state.monitors],
{reply, Pid, overflow, State#state{monitors=Monitors,
overflow=Overflow+1}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.

full({checkin, Pid}, #state{waiting=Waiting, max_overflow=MaxOverflow,
overflow=Overflow}=State) ->
overflow=Overflow, worker_stop=StopFun}=State) ->
Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
{value, {_, Ref0}, Left0} -> erlang:demonitor(Ref0), Left0;
false -> State#state.monitors
Expand All @@ -140,7 +148,7 @@ full({checkin, Pid}, #state{waiting=Waiting, max_overflow=MaxOverflow,
{next_state, ready, State#state{workers=Workers, waiting=Empty,
monitors=Monitors}};
{empty, Empty} ->
dismiss_worker(Pid),
dismiss_worker(Pid, StopFun),
{next_state, overflow, State#state{waiting=Empty,
monitors=Monitors,
overflow=Overflow-1}}
Expand Down Expand Up @@ -187,7 +195,8 @@ handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
overflow=Overflow,
waiting=Waiting,
max_overflow=MaxOverflow
max_overflow=MaxOverflow,
worker_init=InitFun
}=State) ->
Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
{value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
Expand All @@ -196,7 +205,7 @@ handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
case StateName of
ready ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
{next_state, ready, State#state{workers=queue:in(new_worker(Sup), W),
{next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
monitors=Monitors}};
overflow when Overflow =< 1 ->
{next_state, ready, State#state{monitors=Monitors, overflow=0}};
Expand All @@ -208,11 +217,11 @@ handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
{{value, {FromPid, _}=From}, LeftWaiting} ->
MonitorRef = erlang:monitor(process, FromPid),
Monitors2 = [{FromPid, MonitorRef} | Monitors],
gen_fsm:reply(From, new_worker(Sup)),
gen_fsm:reply(From, new_worker(Sup, InitFun)),
{next_state, full, State#state{waiting=LeftWaiting,
monitors=Monitors2}};
{empty, Empty} ->
Workers2 = queue:in(new_worker(Sup), State#state.workers),
Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
{next_state, ready, State#state{monitors=Monitors,
waiting=Empty,
workers=Workers2}}
Expand All @@ -222,7 +231,7 @@ handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
{{value, {FromPid, _}=From}, LeftWaiting} ->
MonitorRef = erlang:monitor(process, FromPid),
Monitors2 = [{FromPid, MonitorRef} | Monitors],
NewWorker = new_worker(Sup),
NewWorker = new_worker(Sup, InitFun),
gen_fsm:reply(From, NewWorker),
{next_state, full, State#state{waiting=LeftWaiting,
monitors=Monitors2}};
Expand All @@ -243,26 +252,27 @@ terminate(_Reason, _StateName, _State) -> ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.

new_worker(Sup) ->
new_worker(Sup, InitFun) ->
{ok, Pid} = supervisor:start_child(Sup, []),
link(Pid),
Pid.
{ok, Pid1} = InitFun(Pid),
link(Pid1),
Pid1.

new_worker(Sup, FromPid) ->
Pid = new_worker(Sup),
new_worker(Sup, FromPid, InitFun) ->
Pid = new_worker(Sup, InitFun),
Ref = erlang:monitor(process, FromPid),
{Pid, Ref}.

dismiss_worker(Pid) ->
dismiss_worker(Pid, StopFun) ->
unlink(Pid),
Pid ! stop.
StopFun(Pid).

prepopulate(0, _Sup) ->
prepopulate(0, _Sup, _InitFun) ->
queue:new();
prepopulate(N, Sup) ->
prepopulate(N, Sup, queue:new()).
prepopulate(N, Sup, InitFun) ->
prepopulate(N, Sup, queue:new(), InitFun).

prepopulate(0, _Sup, Workers) ->
prepopulate(0, _Sup, Workers, _InitFun) ->
Workers;
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).
prepopulate(N, Sup, Workers, InitFun) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup, InitFun), Workers), InitFun).

0 comments on commit b010d10

Please sign in to comment.