Skip to content
This repository
  • 2 commits
  • 3 files changed
  • 0 comments
  • 1 contributor
Apr 15, 2012
Ulf Wiger Bug fixes around sharing named counters
- Regulators can be defined as top-level queue options, and are
  automatically moved into the {regulators, [...]} option.
- COUNTER_SAMPLE_INTERVAL is now infinity (rather than the arbitrary 2000)
- if queue goes from empty to non-empty, check the queue immediately
- always revisit queues when counters are returned. Previously, this
  only happened if the counter regulator was at Max, but with
  Incr > 1, this could mean that some jobs never get to execute.
- When revisiting queues, first sort them on latest_dispatch, to get
  more fair distribution.

- put an -ifdef(EQC) wrapper into the jobs_eqc_queue module
cd4135e
Ulf Wiger fix jobs_server:q_out(infinity,Q) 72f7a21
2  include/jobs.hrl
@@ -133,7 +133,7 @@
133 133
 -define(COUNTER(Name), {c,l,{?MODULE,Name}}).
134 134
 -define(   AGGR(Name), {a,l,{?MODULE,Name}}).
135 135
 
136  
--define(COUNTER_SAMPLE_INTERVAL, 20000).  % UW: how was this value picked?
  136
+-define(COUNTER_SAMPLE_INTERVAL, infinity).
137 137
 
138 138
 %% The jobs_server may, under certain circumstances, generate error reports
139 139
 %% This value, in microseconds, defines the highest frequency with which 
72  src/jobs_server.erl
@@ -317,11 +317,25 @@ init_queue({Name, Action}, _S) when Action==approve; Action==reject ->
317 317
     #queue{name = Name, type = {action, Action}};
318 318
 init_queue({Name, producer, F, Opts}, S) ->
319 319
     init_queue({Name, [{type, {producer, F}} | Opts]}, S);
320  
-init_queue({Name, Opts}, S) when is_list(Opts) ->
  320
+init_queue({Name, Opts0}, S) when is_list(Opts0) ->
  321
+    %% Allow the regulators to be named at the top-level.
  322
+    %% This makes it possible to write {q, [{counter, [{limit,1}]}]},
  323
+    %% instead of {q, [{regulators, [{counter, [{limit,1}]}]}]}.
  324
+    {Regs0, Opts} = lists:foldr(
  325
+		       fun(X, {R,O}) when is_tuple(X) ->
  326
+			       case lists:member(
  327
+				      element(1,X), [counter, rate,
  328
+						     named_counter,
  329
+						     group_rate]) of
  330
+				   true  -> {[X|R], O};
  331
+				   false -> {R, [X|O]}
  332
+			       end;
  333
+			  (X, {R, O}) -> {R, [X|O]}
  334
+		       end, {[], []}, Opts0),
321 335
     [ChkI, Regs] =
322 336
         [get_value(K,Opts,D) ||
323 337
             {K, D} <- [{check_interval,undefined},
324  
-                       {regulators, []}]],
  338
+                       {regulators, Regs0}]],
325 339
     Q0 = q_new([{name,Name}|Opts]),
326 340
     Q1 = init_regulators(Regs, Q0#queue{check_interval = ChkI}),
327 341
     calculate_check_interval(Q1, S).
@@ -833,12 +847,17 @@ do_modify_counter_regulator(Name, Opts, #queue{regulators = Regs} = Q) ->
833 847
             badarg
834 848
     end.
835 849
 
836  
-job_queued(#queue{check_counter = Ctr} = Q, TS, S) ->
  850
+job_queued(#queue{check_counter = Ctr} = Q, PrevSz, TS, S) ->
837 851
     case Ctr + 1 of
838 852
 	C when C > 10 ->
839 853
 	    perform_queue_check(Q, TS, S);
840 854
 	C ->
841  
-	    update_queue(Q#queue{check_counter = C}, S)
  855
+	    Q1 = Q#queue{check_counter = C},
  856
+	    if PrevSz == 0 ->
  857
+		    perform_queue_check(Q1, TS, S);
  858
+	       true ->
  859
+		    update_queue(Q#queue{check_counter = C}, S)
  860
+	    end
842 861
     end.
843 862
 
844 863
 perform_queue_check(Q, TS, S) ->
@@ -945,7 +964,12 @@ update_regulators(Regs, Q0, S0) ->
945 964
 			      {integer(), [any()]}.
946 965
 %%
947 966
 check_regulators(Regs, TS, #queue{latest_dispatch = TL}) ->
948  
-    check_regulators(Regs, TS, TL, infinity, []).
  967
+    case check_regulators(Regs, TS, TL, undefined, []) of
  968
+	{undefined, _} ->
  969
+	    {0, []};
  970
+	Other ->
  971
+	    Other
  972
+    end.
949 973
 
950 974
 check_regulators([R|Regs], TS, TL, N, Cs) ->
951 975
     case R of
@@ -1048,24 +1072,26 @@ restore_counters(Cs, #st{} = S) ->
1048 1072
     lists:foldl(fun restore_counter/2, {[], S}, Cs).
1049 1073
 
1050 1074
 restore_counter({C, I}, {Revisit, #st{counters = Counters} = S}) ->
1051  
-    #cr{value = Val, queues = Qs, rate = #rate{limit = Max}} = CR =
  1075
+    #cr{value = Val, queues = Qs} = CR =
1052 1076
 	lists:keyfind(C, #cr.name, Counters),
1053 1077
     CR1 = CR#cr{value = Val - I},
1054 1078
     Counters1 = lists:keyreplace(C, #cr.name, Counters, CR1),
1055 1079
     S1 = S#st{counters = Counters1},
1056  
-    if Val == Max, I > 0 ->
1057  
-	    {union(Qs, Revisit), S1};
1058  
-       true ->
1059  
-	    {Revisit, S1}
1060  
-    end.
  1080
+    {union(Qs, Revisit), S1}. 
1061 1081
 
1062 1082
 union(L1, L2) ->
1063 1083
     (L1 -- L2) ++ L2.
1064 1084
 
1065 1085
 revisit_queues(Qs, S) ->
1066  
-    [revisit_queue(Q) || Q <- Qs],
  1086
+    Expanded = [{Q, get_latest_dispatch(Q, S)} || Q <- Qs],
  1087
+    [revisit_queue(Q) || {Q,_} <- lists:keysort(2, Expanded)],
1067 1088
     S.
1068 1089
 
  1090
+get_latest_dispatch(Q, #st{queues = Qs}) ->
  1091
+    #queue{latest_dispatch = Tl} =
  1092
+	lists:keyfind(Q, #queue.name, Qs),
  1093
+    Tl.
  1094
+
1069 1095
 revisit_queue(Qname) ->
1070 1096
     self() ! {check_queue, Qname}.
1071 1097
 
@@ -1186,6 +1212,7 @@ apply_corr(Type, Corr, R) ->
1186 1212
 
1187 1213
 get_rate(#rr {rate = R}) -> R;
1188 1214
 get_rate(#cr {rate = R}) -> R;
  1215
+get_rate({#cr {rate = R},_}) -> R;
1189 1216
 get_rate(#grp{rate = R}) -> R.
1190 1217
 
1191 1218
 set_rate(R, #rr {} = Reg) -> Reg#rr {rate = R};
@@ -1211,11 +1238,11 @@ queue_job(TS, From, #queue{max_size = MaxSz} = Q, S) ->
1211 1238
                 {OldJobs, Q1} ->
1212 1239
                     [timeout(J) || J <- OldJobs],
1213 1240
                     %% update_queue(q_in(TS, From, Q1), S)
1214  
-                    job_queued(q_in(TS, From, Q1), TS, S)
  1241
+                    job_queued(q_in(TS, From, Q1), CurSz, TS, S)
1215 1242
             end;
1216 1243
        true ->
1217 1244
             %% update_queue(q_in(TS, From, Q), S)
1218  
-            job_queued(q_in(TS, From, Q), TS, S)
  1245
+            job_queued(q_in(TS, From, Q), CurSz, TS, S)
1219 1246
     end.
1220 1247
 
1221 1248
 do_enqueue(TS, Item, #queue{max_size = MaxSz} = Q, S) ->
@@ -1283,9 +1310,9 @@ q_delete  (#queue{mod = Mod} = Q)     -> Mod:delete  (Q).
1283 1310
 %q_is_empty(#queue{type = #producer{}}) -> false;
1284 1311
 q_is_empty(#queue{mod = Mod} = Q)       -> Mod:is_empty(Q).
1285 1312
 %%
1286  
-q_out     (infinity, #queue{mod = Mod} = Q)  -> Mod:all (Q);
1287  
-q_out     (N , #queue{mod = Mod} = Q) -> Mod:out     (N, Q).
1288  
-q_info    (I , #queue{mod = Mod} = Q) -> Mod:info    (I, Q).
  1313
+q_out(infinity, #queue{mod = Mod} = Q) -> Mod:out(q_info(length, Q), Q);
  1314
+q_out      (N , #queue{mod = Mod} = Q) -> Mod:out     (N, Q).
  1315
+q_info     (I , #queue{mod = Mod} = Q) -> Mod:info    (I, Q).
1289 1316
 %%
1290 1317
 q_in(TS, From, #queue{mod = Mod, oldest_job = OJ} = Q) ->
1291 1318
     OJ1 = erlang:min(TS, OJ),    % Works even if OJ==undefined
@@ -1300,15 +1327,22 @@ q_in(TS, From, #queue{mod = Mod, oldest_job = OJ} = Q) ->
1300 1327
 %%
1301 1328
 next_time(_TS, #queue{oldest_job = undefined}) ->
1302 1329
     undefined;
  1330
+next_time(_TS, #queue{check_interval = infinity}) ->
  1331
+    undefined;
1303 1332
 next_time(TS, #queue{latest_dispatch = TS1,
1304 1333
 		     check_interval = I0}) ->
1305 1334
     I = case I0 of
1306 1335
 	    _ when is_number(I0) -> I0;
  1336
+	    infinity -> undefined;
1307 1337
 	    {M, F, As} ->
1308 1338
 		M:F(TS, TS1, As)
1309 1339
 	end,
1310  
-    Since = (TS - TS1) div 1000,
1311  
-    erlang:max(0, trunc(I - Since)).
  1340
+    if is_number(I0) ->
  1341
+	    Since = (TS - TS1) div 1000,
  1342
+	    erlang:max(0, trunc(I - Since));
  1343
+       true ->
  1344
+	    undefined
  1345
+    end.
1312 1346
 
1313 1347
 
1314 1348
 %% Microsecond timestamp; never wraps
3  test/jobs_eqc_queue.erl
... ...
@@ -1,5 +1,6 @@
1 1
 -module(jobs_eqc_queue).
2 2
 
  3
+-ifdef(EQC).
3 4
 -include_lib("eqc/include/eqc.hrl").
4 5
 -include("jobs.hrl").
5 6
 
@@ -182,3 +183,5 @@ catching(F) ->
182 183
 
183 184
 set_time(#model { time = T}) ->
184 185
     meck:expect(jobs_lib, timestamp, fun() -> T end).
  186
+
  187
+-endif.

No commit comments for this range

Something went wrong with that request. Please try again.