Skip to content

Commit

Permalink
Merge pull request #34 from seth/of/queueing
Browse files Browse the repository at this point in the history
Queueing requestors with an external api
  • Loading branch information
oferrigni committed Oct 6, 2014
2 parents 3fa1c3f + 893f05f commit d0b0e6b
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 30 deletions.
156 changes: 131 additions & 25 deletions src/pooler.erl
Expand Up @@ -30,6 +30,7 @@
-export([accept_member/2,
start_link/1,
take_member/1,
take_member/2,
take_group_member/1,
return_group_member/2,
return_group_member/3,
Expand All @@ -40,7 +41,8 @@
new_pool/1,
pool_child_spec/1,
rm_pool/1,
rm_group/1]).
rm_group/1
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand Down Expand Up @@ -176,7 +178,19 @@ accept_member(PoolName, MemberPid) ->
%%
-spec take_member(atom() | pid()) -> pid() | error_no_members.
take_member(PoolName) when is_atom(PoolName) orelse is_pid(PoolName) ->
gen_server:call(PoolName, take_member, infinity).
gen_server:call(PoolName, {take_member, 0}, infinity).

%% @doc Obtain exclusive access to a member of 'PoolName'.
%%
%% If no members are available, wait for up to Timeout milliseconds for a member
%% to become available. Waiting requests are served in FIFO order. If no member
%% is available within the specified timeout, error_no_members is returned.
%% `Timeout' can be either milliseconds as integer or `{duration, time_unit}'
%%
-spec take_member(atom() | pid(), non_neg_integer() | time_spec()) -> pid() | error_no_members.
take_member(PoolName, Timeout) when is_atom(PoolName) orelse is_pid(PoolName) ->
gen_server:call(PoolName, {take_member, time_as_millis(Timeout)}, infinity).


%% @doc Take a member from a randomly selected member of the group
%% `GroupName'. Returns `MemberPid' or `error_no_members'. If no
Expand Down Expand Up @@ -297,9 +311,10 @@ init(#pool{}=Pool) ->

set_member_sup(#pool{} = Pool, MemberSup) ->
Pool#pool{member_sup = MemberSup}.
handle_call(take_member, {CPid, _Tag}, #pool{} = Pool) ->
{Member, NewPool} = take_member_from_pool(Pool, CPid),
{reply, Member, NewPool};

handle_call({take_member, Timeout}, From = {APid, _}, #pool{} = Pool) when is_pid(APid) ->
maybe_reply(take_member_from_pool_queued(Pool, From, Timeout));

handle_call({return_member, Pid, Status}, {_CPid, _Tag}, Pool) ->
{reply, ok, do_return_member(Pid, Status, Pool)};
handle_call({accept_member, Pid}, _From, Pool) ->
Expand All @@ -318,6 +333,14 @@ handle_cast(_Msg, Pool) ->
{noreply, Pool}.

-spec handle_info(_, _) -> {'noreply', _}.
handle_info({requestor_timeout, From}, Pool = #pool{ queued_requestors = RequestorQueue }) ->
NewQueue = queue:filter(fun({RequestorFrom, _TRef}) when RequestorFrom =:= From ->
gen_server:reply(RequestorFrom, error_no_members),
false;
({_, _}) ->
true
end, RequestorQueue),
{noreply, Pool#pool{ queued_requestors = NewQueue} };
handle_info(timeout, #pool{group = undefined} = Pool) ->
%% ignore
{noreply, Pool};
Expand Down Expand Up @@ -365,13 +388,12 @@ code_change(_OldVsn, State, _Extra) ->
do_accept_member({StarterPid, Pid},
#pool{
all_members = AllMembers,
free_pids = Free,
free_count = NumFree,
starting_members = StartingMembers0,
member_start_timeout = StartTimeout
} = Pool) when is_pid(Pid) ->
%% make sure we don't accept a timedout member
Pool1 = #pool{starting_members = StartingMembers}= remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
Pool1 = #pool{starting_members = StartingMembers} =
remove_stale_starting_members(Pool, StartingMembers0, StartTimeout),
case lists:keymember(StarterPid, 1, StartingMembers) of
false ->
%% A starter completed even though we invalidated the pid
Expand All @@ -388,21 +410,74 @@ do_accept_member({StarterPid, Pid},
Entry = {MRef, free, os:timestamp()},
AllMembers1 = store_all_members(Pid, Entry, AllMembers),
pooler_starter:stop(StarterPid),
Pool#pool{free_pids = Free ++ [Pid],
free_count = NumFree + 1,
all_members = AllMembers1,
starting_members = StartingMembers1}
maybe_reply_with_pid(Pid, Pool1#pool{all_members = AllMembers1,
starting_members = StartingMembers1})
end;
do_accept_member({StarterPid, _Reason}, #pool{starting_members = StartingMembers0,
member_start_timeout = StartTimeout} = Pool) ->
do_accept_member({StarterPid, _Reason},
#pool{starting_members = StartingMembers0,
member_start_timeout = StartTimeout} = Pool) ->
%% member start failed, remove in-flight ref and carry on.
pooler_starter:stop(StarterPid),
Pool1 = #pool{starting_members = StartingMembers} = remove_stale_starting_members(Pool, StartingMembers0,
StartTimeout),
Pool1 = #pool{starting_members = StartingMembers} =
remove_stale_starting_members(Pool, StartingMembers0,
StartTimeout),
StartingMembers1 = lists:keydelete(StarterPid, 1, StartingMembers),
Pool1#pool{starting_members = StartingMembers1}.


maybe_reply_with_pid(Pid,
Pool = #pool{queued_requestors = QueuedRequestors,
free_pids = Free,
free_count = NumFree}) when is_pid(Pid) ->
case queue:out(QueuedRequestors) of
{empty, _} ->
Pool#pool{free_pids = [Pid | Free],
free_count = NumFree + 1};
{{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
timer:cancel(TRef),
Pool1 = take_member_bookkeeping(Pid, From, NewQueuedRequestors, Pool),
send_metric(Pool, in_use_count, Pool1#pool.in_use_count, histogram),
send_metric(Pool, free_count, Pool1#pool.free_count, histogram),
send_metric(Pool, events, error_no_members, history),
gen_server:reply(From, Pid),
Pool1
end.

-spec take_member_bookkeeping(pid(),
{pid(), _},
[pid()] | p_requestor_queue(),
#pool{}) -> #pool{}.
take_member_bookkeeping(MemberPid,
{CPid, _},
Rest,
Pool = #pool{in_use_count = NumInUse,
free_count = NumFree,
consumer_to_pid = CPMap,
all_members = AllMembers})
when is_pid(MemberPid),
is_pid(CPid),
is_list(Rest) ->
Pool#pool{free_pids = Rest,
in_use_count = NumInUse + 1,
free_count = NumFree - 1,
consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap),
all_members = set_cpid_for_member(MemberPid, CPid, AllMembers)
};
take_member_bookkeeping(MemberPid,
{ReplyPid, _Tag},
NewQueuedRequestors,
Pool = #pool{
in_use_count = NumInUse,
all_members = AllMembers,
consumer_to_pid = CPMap
}) ->
Pool#pool{
in_use_count = NumInUse + 1,
all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers),
consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap),
queued_requestors = NewQueuedRequestors
}.

-spec remove_stale_starting_members(#pool{}, [{reference(), erlang:timestamp()}],
time_spec()) -> #pool{}.
remove_stale_starting_members(Pool, StartingMembers, MaxAge) ->
Expand Down Expand Up @@ -458,7 +533,6 @@ take_member_from_pool(#pool{init_count = InitCount,
free_pids = Free,
in_use_count = NumInUse,
free_count = NumFree,
consumer_to_pid = CPMap,
starting_members = StartingMembers,
member_start_timeout = StartTimeout} = Pool,
From) ->
Expand All @@ -485,15 +559,31 @@ take_member_from_pool(#pool{init_count = InitCount,
send_metric(Pool, events, error_no_members, history),
{error_no_members, Pool2};
[Pid|Rest] ->
Pool2 = Pool1#pool{free_pids = Rest, in_use_count = NumInUse + 1,
free_count = NumFree - 1},
Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
send_metric(Pool, in_use_count, Pool2#pool.in_use_count, histogram),
send_metric(Pool, free_count, Pool2#pool.free_count, histogram),
{Pid, Pool2#pool{
consumer_to_pid = add_member_to_consumer(Pid, From, CPMap),
all_members = set_cpid_for_member(Pid, From,
Pool2#pool.all_members)
}}
{Pid, Pool2}
end.

-spec take_member_from_pool_queued(#pool{},
{pid(), _},
non_neg_integer()) ->
{error_no_members | queued | pid(), #pool{}}.
take_member_from_pool_queued(Pool0 = #pool{queue_max = QMax,
queued_requestors = Requestors},
From = {CPid, _},
Timeout) when is_pid(CPid) ->
case {take_member_from_pool(Pool0, From), queue:len(Requestors)} of
{{error_no_members, Pool1}, QLen} when QLen >= QMax ->
send_metric(Pool1, events, error_no_members, history),
send_metric(Pool1, queue_max_reached, {inc, 1}, counter),
{error_no_members, Pool1};
{{error_no_members, Pool1 = #pool{queued_requestors = QueuedRequestors}}, QueueCount} ->
{ok, TRef} = timer:send_after(Timeout, {requestor_timeout, From}),
send_metric(Pool1, queue_count, QueueCount, histogram),
{queued, Pool1#pool{queued_requestors = queue:in({From, TRef}, QueuedRequestors)}};
{{Member, NewPool}, _} when is_pid(Member) ->
{Member, NewPool}
end.

%% @doc Add `Count' members to `Pool' asynchronously. Returns updated
Expand Down Expand Up @@ -737,7 +827,11 @@ time_as_secs({Time, Unit}) ->
-spec time_as_millis(time_spec()) -> non_neg_integer().
%% @doc Convert time unit into milliseconds.
time_as_millis({Time, Unit}) ->
time_as_micros({Time, Unit}) div 1000.
time_as_micros({Time, Unit}) div 1000;
%% Allows blind convert
time_as_millis(Time) when is_integer(Time) ->
Time.


-spec time_as_micros(time_spec()) -> non_neg_integer().
%% @doc Convert time unit into microseconds
Expand All @@ -752,3 +846,15 @@ time_as_micros({Time, mu}) ->

secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
(Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).

-spec maybe_reply({'queued' | 'error_no_members' | pid(), #pool{}}) ->
{noreply, #pool{}} | {reply, 'error_no_members' | pid(), #pool{}}.
maybe_reply({Member, NewPool}) ->
case Member of
queued ->
{noreply, NewPool};
error_no_members ->
{reply, error_no_members, NewPool};
Member when is_pid(Member) ->
{reply, Member, NewPool}
end.
10 changes: 9 additions & 1 deletion src/pooler.hrl
Expand Up @@ -3,6 +3,7 @@
-define(DEFAULT_MAX_AGE, {30, sec}).
-define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
-define(POOLER_GROUP_TABLE, pooler_group_table).
-define(DEFAULT_POOLER_QUEUE_MAX, 50).

-type member_info() :: {string(), free | pid(), {_, _, _}}.
-type free_member_info() :: {string(), free, {_, _, _}}.
Expand All @@ -11,8 +12,10 @@

-ifdef(namespaced_types).
-type p_dict() :: dict:dict().
-type p_requestor_queue() :: queue:queue({{pid(), _}, timer:tref()}).
-else.
-type p_dict() :: dict().
-type p_requestor_queue() :: queue().
-endif.

-record(pool, {
Expand Down Expand Up @@ -78,7 +81,12 @@

%% The API used to call the metrics system. It supports both Folsom
%% and Exometer format.
metrics_api = folsom :: 'folsom' | 'exometer'
metrics_api = folsom :: 'folsom' | 'exometer',

%% A queue of requestors for blocking take member requests
queued_requestors = queue:new() :: p_requestor_queue(),
%% The max depth of the queue
queue_max = 50
}).

-define(gv(X, Y), proplists:get_value(X, Y)).
Expand Down
3 changes: 2 additions & 1 deletion src/pooler_config.erl
Expand Up @@ -21,7 +21,8 @@ list_to_pool(P) ->
max_age = ?gv(max_age, P, ?DEFAULT_MAX_AGE),
member_start_timeout = ?gv(member_start_timeout, P, ?DEFAULT_MEMBER_START_TIMEOUT),
metrics_mod = ?gv(metrics_mod, P, pooler_no_metrics),
metrics_api = ?gv(metrics_api, P, folsom)}.
metrics_api = ?gv(metrics_api, P, folsom),
queue_max = ?gv(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX)}.

%% Return `Value' for `Key' in proplist `P' or crashes with an
%% informative message if no value is found.
Expand Down

0 comments on commit d0b0e6b

Please sign in to comment.