Permalink
Browse files

Monitor calling process during wait

  • Loading branch information...
1 parent c2c939b commit 3899c4052fb9974c7fbb75e62b8a914005d383ef @devinus committed Oct 25, 2013
Showing with 21 additions and 49 deletions.
  1. +21 −49 src/poolboy.erl
View
70 src/poolboy.erl
@@ -16,9 +16,9 @@
workers :: queue(),
waiting :: queue(),
monitors :: ets:tid(),
- size = 5 :: non_neg_integer(),
+ size = 0 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
- max_overflow = 10 :: non_neg_integer()
+ max_overflow = 0 :: non_neg_integer()
}).
-spec checkout(Pool :: node()) -> pid().
@@ -31,12 +31,8 @@ checkout(Pool, Block) ->
-spec checkout(Pool :: node(), Block :: boolean(), Timeout :: timeout())
-> pid() | full.
-checkout(Pool, Block, infinity) ->
- gen_server:call(Pool, {checkout, Block, infinity}, infinity);
checkout(Pool, Block, Timeout) ->
- {MegaSecs, Secs, MicroSecs} = os:timestamp(),
- Deadline = {MegaSecs, Secs, MicroSecs + Timeout},
- gen_server:call(Pool, {checkout, Block, Deadline}, Timeout).
+ gen_server:call(Pool, {checkout, Block}, Timeout).
-spec checkin(Pool :: node(), Worker :: pid()) -> ok.
checkin(Pool, Worker) when is_pid(Worker) ->
@@ -134,7 +130,7 @@ handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_call({checkout, Block, Deadline}, {FromPid, _} = From, State) ->
+handle_call({checkout, Block}, {FromPid, _} = From, State) ->
#state{supervisor = Sup,
workers = Workers,
monitors = Monitors,
@@ -152,7 +148,8 @@ handle_call({checkout, Block, Deadline}, {FromPid, _} = From, State) ->
{empty, Empty} when Block =:= false ->
{reply, full, State#state{workers = Empty}};
{empty, Empty} ->
- Waiting = add_waiting(From, Deadline, State#state.waiting),
+ Ref = erlang:monitor(process, FromPid),
+ Waiting = queue:in({From, Ref}, State#state.waiting),
{noreply, State#state{workers = Empty, waiting = Waiting}}
end;
@@ -174,27 +171,20 @@ handle_call(get_all_monitors, _From, State) ->
Monitors = ets:tab2list(State#state.monitors),
{reply, Monitors, State};
handle_call(stop, _From, State) ->
- Sup = State#state.supervisor,
- true = exit(Sup, shutdown),
+ true = exit(State#state.supervisor, shutdown),
{stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.
handle_info({'DOWN', Ref, _, _, _}, State) ->
+ Waiting = queue:filter(fun ({_, R}) -> R =/= Ref end, State#state.waiting),
case ets:match(State#state.monitors, {'$1', Ref}) of
[[Pid]] ->
- Sup = State#state.supervisor,
- ok = supervisor:terminate_child(Sup, Pid),
- %% Don't wait for the EXIT message to come in.
- %% Deal with the worker exit right now to avoid
- %% a race condition with messages waiting in the
- %% mailbox.
- true = ets:delete(State#state.monitors, Pid),
- NewState = handle_worker_exit(Pid, State),
+ NewState = handle_checkin(Pid, State#state{waiting = Waiting}),
{noreply, NewState};
[] ->
- {noreply, State}
+ {noreply, State#state{waiting = Waiting}}
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
@@ -256,30 +246,17 @@ prepopulate(0, _Sup, Workers) ->
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).
-add_waiting(Pid, Deadline, Queue) ->
- queue:in({Pid, Deadline}, Queue).
-
-past_deadline(infinity) ->
- false;
-past_deadline(Deadline) ->
- timer:now_diff(os:timestamp(), Deadline) < 0.
-
handle_checkin(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(Waiting) of
- {{value, {{FromPid, _} = From, Deadline}}, Left} ->
- case past_deadline(Deadline) of
- false ->
- Ref1 = erlang:monitor(process, FromPid),
- true = ets:insert(Monitors, {Pid, Ref1}),
- gen_server:reply(From, Pid),
- State#state{waiting = Left};
- true ->
- handle_checkin(Pid, State#state{waiting = Left})
- end;
+ {{value, {{FromPid, _} = From, _}}, Left} ->
+ Ref = erlang:monitor(process, FromPid),
+ true = ets:insert(Monitors, {Pid, Ref}),
+ gen_server:reply(From, Pid),
+ State#state{waiting = Left};
{empty, Empty} when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
@@ -293,17 +270,12 @@ handle_worker_exit(Pid, State) ->
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
- {{value, {{FromPid, _} = From, Deadline}}, LeftWaiting} ->
- case past_deadline(Deadline) of
- false ->
- MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(State#state.supervisor),
- true = ets:insert(Monitors, {NewWorker, MonitorRef}),
- gen_server:reply(From, NewWorker),
- State#state{waiting = LeftWaiting};
- true ->
- handle_worker_exit(Pid, State#state{waiting = LeftWaiting})
- end;
+ {{value, {{FromPid, _} = From, _}}, LeftWaiting} ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ NewWorker = new_worker(State#state.supervisor),
+ true = ets:insert(Monitors, {NewWorker, MonitorRef}),
+ gen_server:reply(From, NewWorker),
+ State#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->

0 comments on commit 3899c40

Please sign in to comment.