Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Add back-off ability. #22

Closed
wants to merge 1 commit into from

4 participants

@ddosia

Ratio

If a worker cannot be spawned for some reason (e.g. the database is offline), poolboy will die. This pull request adds a new option for spawning workers:

{backoff, {Mod, Fun, Args}}

which is intended to increase the time-outs between worker restarts if something is going wrong.

Main idea

Instead of spawning the worker directly, we spawn a wrapper, which sleeps for some time, wakes up, spawns the worker and delegates it to poolboy.

If the backoff option is not specified, the worker will be spawned as usual (i.e. synchronously).

Tests

If you try to run the tests with the backoff option enabled, some of them will fail (like those that kill a worker and immediately try to count workers with get_avail_workers while the spawn is still in progress). Perhaps some of those calls should be re-designed. With backoff disabled, all tests pass.

@devinus
Owner

I like the general idea, but it could use some work.

@ddosia

I love work!

@Vagabond Vagabond commented on the diff
src/poolboy.erl
((17 lines not shown))
{ok, Pid} = supervisor:start_child(Sup, []),
true = link(Pid),
Pid.
-new_worker(Sup, FromPid) ->
- Pid = new_worker(Sup),
- Ref = erlang:monitor(process, FromPid),
- {Pid, Ref}.
-
-dismiss_worker(Sup, Pid) ->
+-spec new_worker_async(state()) -> pid().
+new_worker_async(State = #state{backoff = Backoff}) ->
+ Self = self(),
+ proc_lib:spawn(fun () ->
@Vagabond Collaborator
Vagabond added a note

Is there a reason this isn't a regular spawn()? Also, do we not care if the wrapper exits?

@Vagabond Collaborator
Vagabond added a note

Oh ok, we monitor the pid in bind_consumer.

@devinus Owner
devinus added a note

Yes, I was also curious why it wasn't just a spawn().

@ddosia
ddosia added a note

The idea was in case of spawn failure to produce OTP's failure report

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@wcummings

I'm interested in this feature, is it still being worked on? If not, is there somewhere I can pick up work on this?

@ddosia

@wcummings we have successfully used this change in production for about a month; there have been no issues. However, to complete this feature (even if it ends up being declined by @devinus), some work needs to be done in the test suite. Some tests cannot pass because of the async nature (and must be replaced with equivalents), and some (as in the failed Travis build) must be investigated. I plan to do this when I can free up some time (not sure, maybe in two weeks). I also need general criticism from @Vagabond and @devinus about this feature.

@devinus devinus closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 11, 2013
  1. @ddosia

    Add back-off ability.

    ddosia authored
This page is out of date. Refresh to see the latest.
View
1  README.md
@@ -149,6 +149,7 @@ code_change(_OldVsn, State, _Extra) ->
- `worker_module`: the module that represents the workers
- `size`: maximum pool size
- `max_overflow`: maximum number of workers created if pool is empty
+- `backoff`: handle spawn timeouts in case if worker is unable to be started
## Authors
View
289 src/poolboy.erl
@@ -11,16 +11,30 @@
-define(TIMEOUT, 5000).
+-record(backoff, {
+ sleep = 0 :: number(),
+ mfargs :: {atom(), atom(), list()}
+}).
+
-record(state, {
- supervisor :: pid(),
- workers :: queue(),
- waiting :: queue(),
- monitors :: ets:tid(),
- size = 5 :: non_neg_integer(),
- overflow = 0 :: non_neg_integer(),
- max_overflow = 10 :: non_neg_integer()
+ supervisor :: pid(),
+ workers :: queue(),
+ waiting :: queue(),
+ monitors :: ets:tid(),
+ size = 5 :: non_neg_integer(),
+ overflow = 0 :: non_neg_integer(),
+ max_overflow = 10 :: non_neg_integer(),
+ backoff :: undefined | backoff()
}).
+-opaque state() :: #state{}.
+-opaque backoff() :: #backoff{}.
+
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
-spec checkout(Pool :: node()) -> pid().
checkout(Pool) ->
checkout(Pool, true).
@@ -97,6 +111,10 @@ stop(Pool) ->
status(Pool) ->
gen_server:call(Pool, status).
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
@@ -108,19 +126,23 @@ init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
init(Rest, WorkerArgs, State#state{supervisor = Sup});
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
init(Rest, WorkerArgs, State#state{size = Size});
-init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
+init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State)
+ when is_integer(MaxOverflow) ->
init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
+init([{backoff, MFA = {M, F, A}} | Rest], WorkerArgs, State)
+ when is_atom(M), is_atom(F), is_list(A) ->
+ Backoff = #backoff{mfargs = MFA},
+ init(Rest, WorkerArgs, State#state{backoff = Backoff});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
-init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
- Workers = prepopulate(Size, Sup),
+init([], _WorkerArgs, #state{size = Size} = State) ->
+ Workers = prepopulate(Size, State),
{ok, State#state{workers = Workers}}.
-handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
- case ets:lookup(Monitors, Pid) of
+handle_cast({checkin, Worker}, State = #state{monitors = Monitors}) ->
+ case ets:lookup(Monitors, Worker) of
[{Pid, Ref}] ->
- true = erlang:demonitor(Ref),
- true = ets:delete(Monitors, Pid),
+ true = unbind_consumer(Worker, Ref, State),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
@@ -130,26 +152,51 @@ handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_call({checkout, Block, Timeout}, {FromPid, _} = From, State) ->
- #state{supervisor = Sup,
- workers = Workers,
- monitors = Monitors,
+handle_call({checkout, Block, Timeout}, {Consumer, _} = From, State) ->
+ #state{workers = Workers,
overflow = Overflow,
+ waiting = Waiting,
max_overflow = MaxOverflow} = State,
case queue:out(Workers) of
- {{value, Pid}, Left} ->
- Ref = erlang:monitor(process, FromPid),
- true = ets:insert(Monitors, {Pid, Ref}),
- {reply, Pid, State#state{workers = Left}};
- {empty, Empty} when MaxOverflow > 0, Overflow < MaxOverflow ->
- {Pid, Ref} = new_worker(Sup, FromPid),
- true = ets:insert(Monitors, {Pid, Ref}),
- {reply, Pid, State#state{workers = Empty, overflow = Overflow + 1}};
- {empty, Empty} when Block =:= false ->
- {reply, full, State#state{workers = Empty}};
- {empty, Empty} ->
- Waiting = add_waiting(From, Timeout, State#state.waiting),
- {noreply, State#state{workers = Empty, waiting = Waiting}}
+ {{value, Worker}, Left} ->
+ true = bind_consumer(Worker, Consumer, State),
+ {reply, Worker, State#state{workers = Left}};
+ {empty, _} when MaxOverflow > 0, Overflow < MaxOverflow ->
+ case new_worker(State) of
+ {ok, Worker} ->
+ true = bind_consumer(Worker, Consumer, State),
+ {reply, Worker, State#state{overflow = Overflow + 1}};
+ {wait, _} ->
+ {noreply, State#state{
+ waiting = add_waiting(From, Timeout, Waiting),
+ overflow = Overflow + 1
+ }}
+ end;
+ {empty, _} when Block == false ->
+ {reply, full, State};
+ {empty, _} ->
+ {noreply, State#state{
+ waiting = add_waiting(From, Timeout, Waiting)
+ }}
+ end;
+
+handle_call({new_worker_spawned, Worker}, {Wrapper, _}, State) ->
+ case ets:lookup(State#state.monitors, Wrapper) of
+ [{Wrapper, WrapperRef}] ->
+ true = unbind_consumer(Wrapper, WrapperRef, State),
+ true = link(Worker),
+
+ Backoff = State#state.backoff,
+ Backoff#backoff.sleep == 0 orelse error_logger:info_msg(
+ "poolboy(~p): worker(~p) started successfuly", [self(), Worker]
+ ),
+
+ {reply, ok, handle_checkin(
+ Worker, State#state{backoff = Backoff#backoff{sleep = 0}}
+ )};
+ [] ->
+ Reply = {error, monitor_not_found},
+ {reply, Reply, State}
end;
handle_call(status, _From, State) ->
@@ -177,40 +224,45 @@ handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.
-handle_info({'DOWN', Ref, _, _, _}, State) ->
- case ets:match(State#state.monitors, {'$1', Ref}) of
- [[Pid]] ->
- Sup = State#state.supervisor,
- ok = supervisor:terminate_child(Sup, Pid),
+handle_info({'DOWN', Ref, _, Pid, _}, State) ->
+ #state{monitors = Monitors, supervisor = Sup} = State,
+ case ets:match(Monitors, {'$1', Ref}) of
+ [[Wrapper]] when Wrapper == Pid ->
+ %% future worker is going 'DOWN'
+ NewState = handle_worker_exit(
+ Wrapper,
+ State#state{backoff = inc_sleep(State#state.backoff)}
+ ),
+ {noreply, NewState};
+ [[Worker]] ->
+ ok = supervisor:terminate_child(Sup, Worker),
%% Don't wait for the EXIT message to come in.
%% Deal with the worker exit right now to avoid
%% a race condition with messages waiting in the
%% mailbox.
- true = ets:delete(State#state.monitors, Pid),
- NewState = handle_worker_exit(Pid, State),
+ NewState = handle_worker_exit(Worker, State),
{noreply, NewState};
[] ->
{noreply, State}
end;
-handle_info({'EXIT', Pid, _Reason}, State) ->
- #state{supervisor = Sup,
- monitors = Monitors} = State,
- case ets:lookup(Monitors, Pid) of
- [{Pid, Ref}] ->
- true = erlang:demonitor(Ref),
- true = ets:delete(Monitors, Pid),
- NewState = handle_worker_exit(Pid, State),
- {noreply, NewState};
+handle_info({'EXIT', Sup, Reason}, #state{supervisor = Sup}) ->
+ %% die in case of supervisor's death
+ exit(Reason);
+handle_info({'EXIT', Worker, _Reason}, State) ->
+ NewState = case ets:lookup(State#state.monitors, Worker) of
+ [{Worker, ConsumerRef}] ->
+ %% worker was in use
+ true = unbind_consumer(Worker, ConsumerRef, State),
+ handle_worker_exit(Worker, State);
[] ->
- case queue:member(Pid, State#state.workers) of
+ case queue:member(Worker, State#state.workers) of
true ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {noreply, State#state{workers = queue:in(new_worker(Sup), W)}};
+ handle_worker_exit(Worker, State);
false ->
- {noreply, State}
+ State
end
- end;
-
+ end,
+ {noreply, NewState};
handle_info(_Info, State) ->
{noreply, State}.
@@ -220,6 +272,10 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
start_pool(StartFun, PoolArgs, WorkerArgs) ->
case proplists:get_value(name, PoolArgs) of
undefined ->
@@ -228,29 +284,47 @@ start_pool(StartFun, PoolArgs, WorkerArgs) ->
gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
end.
-new_worker(Sup) ->
+-spec new_worker(state()) -> {ok, pid()} | {wait, pid()}.
+new_worker(State = #state{backoff = undefined}) ->
+ {ok, new_worker_sync(State)};
+new_worker(State) ->
+ WrapperPid = new_worker_async(State),
+ %% we will add future pid's holder as a consumer
+ true = bind_consumer(WrapperPid, WrapperPid, State),
+ {wait, WrapperPid}.
+
+
+-spec new_worker_sync(state()) -> pid().
+new_worker_sync(#state{supervisor = Sup}) ->
{ok, Pid} = supervisor:start_child(Sup, []),
true = link(Pid),
Pid.
-new_worker(Sup, FromPid) ->
- Pid = new_worker(Sup),
- Ref = erlang:monitor(process, FromPid),
- {Pid, Ref}.
-
-dismiss_worker(Sup, Pid) ->
+-spec new_worker_async(state()) -> pid().
+new_worker_async(State = #state{backoff = Backoff}) ->
+ Self = self(),
+ proc_lib:spawn(fun () ->
@Vagabond Collaborator
Vagabond added a note

Is there a reason this isn't a regular spawn()? Also, do we not care if the wrapper exits?

@Vagabond Collaborator
Vagabond added a note

Oh ok, we monitor the pid in bind_consumer.

@devinus Owner
devinus added a note

Yes, I was also curious why it wasn't just a spawn().

@ddosia
ddosia added a note

The idea was in case of spawn failure to produce OTP's failure report

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ timer:sleep(Backoff#backoff.sleep),
+ Pid = new_worker_sync(State),
+ ok = gen_server:call(Self, {new_worker_spawned, Pid}),
+ unlink(Pid)
+ end).
+
+-spec dismiss_worker(Worker :: pid(), state()) -> true.
+dismiss_worker(Pid, #state{supervisor = Sup}) ->
true = unlink(Pid),
- supervisor:terminate_child(Sup, Pid).
+ ok = supervisor:terminate_child(Sup, Pid),
+ true.
-prepopulate(N, _Sup) when N < 1 ->
+prepopulate(N, _State) when N < 1 ->
queue:new();
-prepopulate(N, Sup) ->
- prepopulate(N, Sup, queue:new()).
+prepopulate(N, State) ->
+ prepopulate(N, State, queue:new()).
-prepopulate(0, _Sup, Workers) ->
+prepopulate(0, _State, Workers) ->
Workers;
-prepopulate(N, Sup, Workers) ->
- prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).
+prepopulate(N, State, Workers) ->
+ prepopulate(N-1, State, queue:in(new_worker_sync(State), Workers)).
add_waiting(Pid, Timeout, Queue) ->
queue:in({Pid, Timeout, os:timestamp()}, Queue).
@@ -262,53 +336,54 @@ wait_valid(StartTime, Timeout) ->
(Waited div 1000) < Timeout.
handle_checkin(Pid, State) ->
- #state{supervisor = Sup,
- waiting = Waiting,
- monitors = Monitors,
- overflow = Overflow} = State,
+ #state{waiting = Waiting, overflow = Overflow} = State,
case queue:out(Waiting) of
{{value, {{FromPid, _} = From, Timeout, StartTime}}, Left} ->
case wait_valid(StartTime, Timeout) of
true ->
- Ref1 = erlang:monitor(process, FromPid),
- true = ets:insert(Monitors, {Pid, Ref1}),
+ true = bind_consumer(Pid, FromPid, State),
gen_server:reply(From, Pid),
State#state{waiting = Left};
false ->
handle_checkin(Pid, State#state{waiting = Left})
end;
- {empty, Empty} when Overflow > 0 ->
- ok = dismiss_worker(Sup, Pid),
- State#state{waiting = Empty, overflow = Overflow - 1};
- {empty, Empty} ->
+ {empty, _} when Overflow > 0 ->
+ true = dismiss_worker(Pid, State),
+ State#state{overflow = Overflow - 1};
+ {empty, _} ->
Workers = queue:in(Pid, State#state.workers),
- State#state{workers = Workers, waiting = Empty, overflow = 0}
+ State#state{workers = Workers, overflow = 0}
end.
-handle_worker_exit(Pid, State) ->
- #state{supervisor = Sup,
- monitors = Monitors,
- overflow = Overflow} = State,
- case queue:out(State#state.waiting) of
- {{value, {{FromPid, _} = From, Timeout, StartTime}}, LeftWaiting} ->
+handle_worker_exit(DeadPid, State) ->
+ #state{waiting = Waiting, overflow = Overflow} = State,
+ %% XXX: optimize serveral ets:delete calls
+ true = ets:delete(State#state.monitors, DeadPid),
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _} = From, Timeout, StartTime}}, Left} ->
case wait_valid(StartTime, Timeout) of
true ->
- MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(State#state.supervisor),
- true = ets:insert(Monitors, {NewWorker, MonitorRef}),
- gen_server:reply(From, NewWorker),
- State#state{waiting = LeftWaiting};
- _ ->
- handle_worker_exit(Pid, State#state{waiting = LeftWaiting})
+ case new_worker(State) of
+ {ok, Worker} ->
+ true = bind_consumer(Worker, FromPid, State),
+ gen_server:reply(From, Worker),
+ State#state{waiting = Left};
+ {wait, _} ->
+ State
+ end;
+ false ->
+ handle_worker_exit(DeadPid, State#state{waiting = Left})
end;
- {empty, Empty} when Overflow > 0 ->
- State#state{overflow = Overflow - 1, waiting = Empty};
- {empty, Empty} ->
- Workers = queue:in(
- new_worker(Sup),
- queue:filter(fun (P) -> P =/= Pid end, State#state.workers)
- ),
- State#state{workers = Workers, waiting = Empty}
+ {empty, _} when Overflow > 0 ->
+ State#state{overflow = Overflow - 1};
+ {empty, _} ->
+ case new_worker(State) of
+ {ok, Worker} ->
+ Workers = queue:in(Worker, State#state.workers),
+ State#state{workers = Workers};
+ {wait, _} ->
+ State
+ end
end.
state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
@@ -322,3 +397,19 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
full;
state_name(_State) ->
overflow.
+
+-spec bind_consumer(Worker :: pid(), Consumer :: pid(), state()) -> true.
+bind_consumer(Worker, Consumer, #state{monitors = Monitors})
+ when is_pid(Worker), is_pid(Consumer) ->
+ Ref = erlang:monitor(process, Consumer),
+ true = ets:insert(Monitors, {Worker, Ref}).
+
+-spec unbind_consumer(pid(), reference(), state()) -> true.
+unbind_consumer(Worker, ConsumerRef, #state{monitors = Monitors})
+ when is_pid(Worker), is_reference(ConsumerRef) ->
+ true = erlang:demonitor(ConsumerRef),
+ true = ets:delete(Monitors, Worker).
+
+-spec inc_sleep(backoff()) -> backoff().
+inc_sleep(B = #backoff{sleep = Sleep, mfargs = {M, F, A}}) ->
+ B#backoff{sleep = apply(M, F, [Sleep | A])}.
View
20 test/poolboy_test_worker.erl
@@ -6,11 +6,25 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-start_link(_Args) ->
- gen_server:start_link(?MODULE, [], []).
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
init([]) ->
- {ok, undefined}.
+ {ok, undefined};
+init([{locker, Locker} | Args]) ->
+ %% this is used by backoff tests to emulate database(or other resource)
+ %% slownest
+ Locker ! {may_i_proceed, self()},
+ receive
+ ok ->
+ init(Args);
+ Else ->
+ {error, Else}
+ after 200 ->
+ {error, timeout}
+ end;
+init([_A | Args]) ->
+ init(Args).
handle_call(die, _From, State) ->
{stop, {error, died}, dead, State};
View
75 test/poolboy_tests.erl
@@ -53,6 +53,9 @@ pool_test_() ->
},
{<<"Pool returns status">>,
fun pool_returns_status/0
+ },
+ {<<"Can not spawn worker for a while">>,
+ fun backoff_cant_spawn_worker/0
}
]
}.
@@ -395,7 +398,71 @@ pool_returns_status() ->
?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)),
ok = ?sync(Pool4, stop).
-new_pool(Size, MaxOverflow) ->
- poolboy:start_link([{name, {local, poolboy_test}},
- {worker_module, poolboy_test_worker},
- {size, Size}, {max_overflow, MaxOverflow}]).
+backoff_cant_spawn_worker() ->
+ Self = self(),
+ {ok, Pool} = new_backoff_pool(0, 1, [{locker, self()}]),
+ Nums = lists:seq(1, 3),
+ lists:foreach(
+ fun(_) ->
+ %% first worker will be wpawned, others will be queued
+ spawn(fun() ->
+ Worker = poolboy:checkout(Pool, true, 150),
+ Self ! {got_worker, Worker},
+ checkin_worker(Pool, Worker)
+ end)
+ end, Nums
+ ),
+
+ receive {may_i_proceed, FailWorker} ->
+ %% we disallow to init worker this time
+ erlang:monitor(process, FailWorker),
+ FailWorker ! no,
+ receive
+ {'DOWN', _, process, FailWorker, _} ->
+ ?assert(true)
+ after 150 ->
+ ?assert(false)
+ end
+ after 150 ->
+ ?assert(false)
+ end,
+
+ receive {may_i_proceed, FutureWorker} ->
+ %% then after some timeout poolboy wil try to reinit a worker
+ ?assert(true),
+ FutureWorker ! ok,
+ %% after 1st spawned process got the worker and made checkin,
+ %% worker's Pid must be reused (other checkout calls were queued)
+ lists:foreach(
+ fun(_) ->
+ receive {got_worker, FutureWorker} ->
+ ?assert(true)
+ after 50 ->
+ ?assert(false)
+ end
+ end, Nums
+ )
+ after 200 ->
+ ?assert(false)
+ end,
+
+ ok = ?sync(Pool, stop).
+
+new_backoff_pool(Size, MaxOverflow, WorkerArgs) when is_list(WorkerArgs) ->
+ AddPoolArgs = [
+ {size, Size}, {max_overflow, MaxOverflow},
+ {backoff, {erlang, '+', [100]}}
+ ],
+ new_pool(AddPoolArgs, WorkerArgs).
+
+new_pool(Size, MaxOverflow) when is_integer(Size), is_integer(MaxOverflow) ->
+ AddPoolArgs = [{size, Size}, {max_overflow, MaxOverflow}],
+ new_pool(AddPoolArgs, []);
+
+new_pool(AddPoolArgs, WorkerArgs)
+ when is_list(AddPoolArgs), is_list(WorkerArgs) ->
+ PoolArgs = [
+ {name, {local, poolboy_test}},
+ {worker_module, poolboy_test_worker}
+ ],
+ poolboy:start_link(AddPoolArgs ++ PoolArgs, WorkerArgs).
Something went wrong with that request. Please try again.