Skip to content

Commit

Permalink
Bug fixes around sharing named counters
Browse files Browse the repository at this point in the history
- Regulators can be defined as top-level queue options, and are
  automatically moved into the {regulators, [...]} option.
- COUNTER_SAMPLE_INTERVAL is now infinity (rather than the arbitrary 2000)
- if queue goes from empty to non-empty, check the queue immediately
- always revisit queues when counters are returned. Previously, this
  only happened if the counter regulator was at Max, but with
  Incr > 1, this could mean that some jobs never get to execute.
- When revisiting queues, first sort them on latest_dispatch, to get
  more fair distribution.

- put an -ifdef(EQC) wrapper into the jobs_eqc_queue module
  • Loading branch information
uwiger committed Apr 15, 2012
1 parent 26e181a commit cd4135e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 17 deletions.
2 changes: 1 addition & 1 deletion include/jobs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
-define(COUNTER(Name), {c,l,{?MODULE,Name}}).
-define( AGGR(Name), {a,l,{?MODULE,Name}}).

-define(COUNTER_SAMPLE_INTERVAL, 20000). % UW: how was this value picked?
-define(COUNTER_SAMPLE_INTERVAL, infinity).

%% The jobs_server may, under certain circumstances, generate error reports
%% This value, in microseconds, defines the highest frequency with which
Expand Down
66 changes: 50 additions & 16 deletions src/jobs_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,25 @@ init_queue({Name, Action}, _S) when Action==approve; Action==reject ->
#queue{name = Name, type = {action, Action}};
init_queue({Name, producer, F, Opts}, S) ->
init_queue({Name, [{type, {producer, F}} | Opts]}, S);
init_queue({Name, Opts}, S) when is_list(Opts) ->
init_queue({Name, Opts0}, S) when is_list(Opts0) ->
%% Allow the regulators to be named at the top-level.
%% This makes it possible to write {q, [{counter, [{limit,1}]}]},
%% instead of {q, [{regulators, [{counter, [{limit,1}]}]}]}.
{Regs0, Opts} = lists:foldr(
fun(X, {R,O}) when is_tuple(X) ->
case lists:member(
element(1,X), [counter, rate,
named_counter,
group_rate]) of
true -> {[X|R], O};
false -> {R, [X|O]}
end;
(X, {R, O}) -> {R, [X|O]}
end, {[], []}, Opts0),
[ChkI, Regs] =
[get_value(K,Opts,D) ||
{K, D} <- [{check_interval,undefined},
{regulators, []}]],
{regulators, Regs0}]],
Q0 = q_new([{name,Name}|Opts]),
Q1 = init_regulators(Regs, Q0#queue{check_interval = ChkI}),
calculate_check_interval(Q1, S).
Expand Down Expand Up @@ -833,12 +847,17 @@ do_modify_counter_regulator(Name, Opts, #queue{regulators = Regs} = Q) ->
badarg
end.

job_queued(#queue{check_counter = Ctr} = Q, TS, S) ->
job_queued(#queue{check_counter = Ctr} = Q, PrevSz, TS, S) ->
case Ctr + 1 of
C when C > 10 ->
perform_queue_check(Q, TS, S);
C ->
update_queue(Q#queue{check_counter = C}, S)
Q1 = Q#queue{check_counter = C},
if PrevSz == 0 ->
perform_queue_check(Q1, TS, S);
true ->
update_queue(Q#queue{check_counter = C}, S)
end
end.

perform_queue_check(Q, TS, S) ->
Expand Down Expand Up @@ -945,7 +964,12 @@ update_regulators(Regs, Q0, S0) ->
{integer(), [any()]}.
%%
check_regulators(Regs, TS, #queue{latest_dispatch = TL}) ->
check_regulators(Regs, TS, TL, infinity, []).
case check_regulators(Regs, TS, TL, undefined, []) of
{undefined, _} ->
{0, []};
Other ->
Other
end.

check_regulators([R|Regs], TS, TL, N, Cs) ->
case R of
Expand Down Expand Up @@ -1048,24 +1072,26 @@ restore_counters(Cs, #st{} = S) ->
lists:foldl(fun restore_counter/2, {[], S}, Cs).

restore_counter({C, I}, {Revisit, #st{counters = Counters} = S}) ->
#cr{value = Val, queues = Qs, rate = #rate{limit = Max}} = CR =
#cr{value = Val, queues = Qs} = CR =
lists:keyfind(C, #cr.name, Counters),
CR1 = CR#cr{value = Val - I},
Counters1 = lists:keyreplace(C, #cr.name, Counters, CR1),
S1 = S#st{counters = Counters1},
if Val == Max, I > 0 ->
{union(Qs, Revisit), S1};
true ->
{Revisit, S1}
end.
{union(Qs, Revisit), S1}.

union(L1, L2) ->
(L1 -- L2) ++ L2.

revisit_queues(Qs, S) ->
[revisit_queue(Q) || Q <- Qs],
Expanded = [{Q, get_latest_dispatch(Q, S)} || Q <- Qs],
[revisit_queue(Q) || {Q,_} <- lists:keysort(2, Expanded)],
S.

get_latest_dispatch(Q, #st{queues = Qs}) ->
#queue{latest_dispatch = Tl} =
lists:keyfind(Q, #queue.name, Qs),
Tl.

revisit_queue(Qname) ->
self() ! {check_queue, Qname}.

Expand Down Expand Up @@ -1186,6 +1212,7 @@ apply_corr(Type, Corr, R) ->

get_rate(#rr {rate = R}) -> R;
get_rate(#cr {rate = R}) -> R;
get_rate({#cr {rate = R},_}) -> R;
get_rate(#grp{rate = R}) -> R.

set_rate(R, #rr {} = Reg) -> Reg#rr {rate = R};
Expand All @@ -1211,11 +1238,11 @@ queue_job(TS, From, #queue{max_size = MaxSz} = Q, S) ->
{OldJobs, Q1} ->
[timeout(J) || J <- OldJobs],
%% update_queue(q_in(TS, From, Q1), S)
job_queued(q_in(TS, From, Q1), TS, S)
job_queued(q_in(TS, From, Q1), CurSz, TS, S)
end;
true ->
%% update_queue(q_in(TS, From, Q), S)
job_queued(q_in(TS, From, Q), TS, S)
job_queued(q_in(TS, From, Q), CurSz, TS, S)
end.

do_enqueue(TS, Item, #queue{max_size = MaxSz} = Q, S) ->
Expand Down Expand Up @@ -1300,15 +1327,22 @@ q_in(TS, From, #queue{mod = Mod, oldest_job = OJ} = Q) ->
%%
next_time(_TS, #queue{oldest_job = undefined}) ->
undefined;
next_time(_TS, #queue{check_interval = infinity}) ->
undefined;
next_time(TS, #queue{latest_dispatch = TS1,
check_interval = I0}) ->
I = case I0 of
_ when is_number(I0) -> I0;
infinity -> undefined;
{M, F, As} ->
M:F(TS, TS1, As)
end,
Since = (TS - TS1) div 1000,
erlang:max(0, trunc(I - Since)).
if is_number(I0) ->
Since = (TS - TS1) div 1000,
erlang:max(0, trunc(I - Since));
true ->
undefined
end.


%% Microsecond timestamp; never wraps
Expand Down
3 changes: 3 additions & 0 deletions test/jobs_eqc_queue.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-module(jobs_eqc_queue).

-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
-include("jobs.hrl").

Expand Down Expand Up @@ -182,3 +183,5 @@ catching(F) ->

set_time(#model { time = T}) ->
meck:expect(jobs_lib, timestamp, fun() -> T end).

-endif.

0 comments on commit cd4135e

Please sign in to comment.