Skip to content

Commit 3d3f4f2

Browse files
nickvajaydoane
andauthored
Remove multi_workers option from couch_work_queue (#5414)
* Remove multi_workers option from couch_work_queue Nothing was using it so let's remove it. "Deleted code is debugged code" as they say. It turns out its unit tests never covered the actual use case of the work queue -- sending terms back and form, it just tested binaries. So add a few tests for terms and check some other error cases, thus increasing test coverage to 100%. * Update src/couch/src/couch_work_queue.erl Co-authored-by: Jay Doane <jaydoane@apache.org> --------- Co-authored-by: Jay Doane <jaydoane@apache.org>
1 parent 06c88fb commit 3d3f4f2

File tree

2 files changed

+99
-82
lines changed

2 files changed

+99
-82
lines changed

src/couch/src/couch_work_queue.erl

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@
2929
max_items,
3030
items = 0,
3131
size = 0,
32-
work_waiters = [],
33-
close_on_dequeue = false,
34-
multi_workers = false
32+
worker = undefined,
33+
close_on_dequeue = false
3534
}).
3635

3736
new(Options) ->
@@ -72,15 +71,17 @@ close(Wq) ->
7271
init(Options) ->
7372
Q = #q{
7473
max_size = couch_util:get_value(max_size, Options, nil),
75-
max_items = couch_util:get_value(max_items, Options, nil),
76-
multi_workers = couch_util:get_value(multi_workers, Options, false)
74+
max_items = couch_util:get_value(max_items, Options, nil)
7775
},
7876
{ok, Q, hibernate}.
7977

80-
terminate(_Reason, #q{work_waiters = Workers}) ->
81-
lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
78+
terminate(_Reason, #q{worker = undefined}) ->
79+
ok;
80+
terminate(_Reason, #q{worker = {W, _}}) ->
81+
gen_server:reply(W, closed),
82+
ok.
8283

83-
handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
84+
handle_call({queue, Item, Size}, From, #q{worker = undefined} = Q0) ->
8485
Q = Q0#q{
8586
size = Q0#q.size + Size,
8687
items = Q0#q.items + 1,
@@ -95,23 +96,20 @@ handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
9596
false ->
9697
{reply, ok, Q, hibernate}
9798
end;
98-
handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
99+
handle_call({queue, Item, _}, _From, #q{worker = {W, _Max}} = Q) ->
99100
gen_server:reply(W, {ok, [Item]}),
100-
{reply, ok, Q#q{work_waiters = Rest}, hibernate};
101-
handle_call({dequeue, Max}, From, Q) ->
102-
#q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q,
103-
case {Workers, Multi} of
104-
{[_ | _], false} ->
105-
exit("Only one caller allowed to wait for this work at a time");
106-
{[_ | _], true} ->
107-
{noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
108-
_ ->
109-
case Count of
110-
0 ->
111-
{noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
112-
C when C > 0 ->
113-
deliver_queue_items(Max, Q)
114-
end
101+
{reply, ok, Q#q{worker = undefined}, hibernate};
102+
handle_call({dequeue, _Max}, _From, #q{worker = {_, _}}) ->
103+
% Something went wrong - the same or a different worker is
104+
% trying to dequeue an item. We only allow one worker to wait
105+
% for work at a time, so we exit with an error.
106+
exit(multiple_workers_error);
107+
handle_call({dequeue, Max}, From, #q{worker = undefined, items = Count} = Q) ->
108+
case Count of
109+
0 ->
110+
{noreply, Q#q{worker = {From, Max}}};
111+
C when C > 0 ->
112+
deliver_queue_items(Max, Q)
115113
end;
116114
handle_call(item_count, _From, Q) ->
117115
{reply, Q#q.items, Q};

src/couch/test/eunit/couch_work_queue_tests.erl

Lines changed: 77 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,12 @@ setup_max_size() ->
3131
setup_max_items_and_size() ->
3232
setup([{max_size, 160}, {max_items, 3}]).
3333

34-
setup_multi_workers() ->
35-
{Q, Producer, Consumer1} = setup([
36-
{max_size, 160},
37-
{max_items, 3},
38-
{multi_workers, true}
39-
]),
40-
Consumer2 = spawn_consumer(Q),
41-
Consumer3 = spawn_consumer(Q),
42-
{Q, Producer, [Consumer1, Consumer2, Consumer3]}.
43-
44-
teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
34+
teardown({Q, Producer, Consumer}) ->
4535
% consume all to unblock and let producer/consumer stop without timeout
46-
[consume(Consumer, all) || Consumer <- Consumers],
47-
36+
consume(Consumer, all),
4837
ok = close_queue(Q),
4938
ok = stop(Producer, "producer"),
50-
R = [stop(Consumer, "consumer") || Consumer <- Consumers],
51-
R = [ok || _ <- Consumers],
52-
ok;
53-
teardown({Q, Producer, Consumer}) ->
54-
teardown({Q, Producer, [Consumer]}).
39+
ok = stop(Consumer, "consumer").
5540

5641
single_consumer_test_() ->
5742
{
@@ -87,29 +72,16 @@ single_consumer_test_() ->
8772
]
8873
}.
8974

90-
multiple_consumers_test_() ->
91-
{
92-
"Single producer and multiple consumers",
93-
[
94-
{
95-
"Queue with max size of 160 bytes and 3 max items",
96-
{
97-
foreach,
98-
fun setup_multi_workers/0,
99-
fun teardown/1,
100-
common_cases() ++ multiple_consumers()
101-
}
102-
}
103-
]
104-
}.
105-
10675
common_cases() ->
10776
[
10877
fun should_block_consumer_on_dequeue_from_empty_queue/1,
10978
fun should_consume_right_item/1,
11079
fun should_timeout_on_close_non_empty_queue/1,
11180
fun should_not_block_producer_for_non_empty_queue_after_close/1,
112-
fun should_be_closed/1
81+
fun should_be_closed/1,
82+
fun should_produce_consume_any_term/1,
83+
fun should_crash_on_multiple_consumers/1,
84+
fun should_crash_on_random_messages/1
11385
].
11486

11587
single_consumer_max_item_count() ->
@@ -134,13 +106,6 @@ single_consumer_max_size() ->
134106
single_consumer_max_items_and_size() ->
135107
single_consumer_max_item_count() ++ single_consumer_max_size().
136108

137-
multiple_consumers() ->
138-
[
139-
fun should_have_zero_size_for_new_queue/1,
140-
fun should_have_no_items_for_new_queue/1,
141-
fun should_increase_queue_size_on_produce/1
142-
].
143-
144109
should_have_no_items_for_new_queue({Q, _, _}) ->
145110
?_assertEqual(0, couch_work_queue:item_count(Q)).
146111

@@ -279,19 +244,6 @@ should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) ->
279244

280245
?_assertEqual({ok, 1, 1}, {Pong, Size, Count}).
281246

282-
should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
283-
ok = close_queue(Q),
284-
285-
[consume(C, 1) || C <- Consumers],
286-
287-
LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
288-
ItemsCount = couch_work_queue:item_count(Q),
289-
Size = couch_work_queue:size(Q),
290-
291-
?_assertEqual(
292-
{[closed, closed, closed], closed, closed},
293-
{LastConsumerItems, ItemsCount, Size}
294-
);
295247
should_be_closed({Q, _, Consumer}) ->
296248
ok = close_queue(Q),
297249

@@ -306,6 +258,46 @@ should_be_closed({Q, _, Consumer}) ->
306258
{LastConsumerItems, ItemsCount, Size}
307259
).
308260

261+
should_produce_consume_any_term({Q, Producer, Consumer}) ->
262+
Potato = produce_term(Q, Producer, potato, false),
263+
Tomato = produce_term(Q, Producer, {tomato}, false),
264+
Carrot = produce_term(Q, Producer, [carrot], false),
265+
266+
consume(Consumer, 2),
267+
{ok, Items1} = last_consumer_items(Consumer),
268+
?_assertEqual([Potato, Tomato], Items1),
269+
270+
consume(Consumer, 1),
271+
{ok, Items2} = last_consumer_items(Consumer),
272+
?_assertEqual([Carrot], Items2).
273+
274+
should_crash_on_multiple_consumers({_Q, _Producer, _Consumer}) ->
275+
% Do not want to crash the test, so set up a new Q process
276+
{ok, Q} = couch_work_queue:new([]),
277+
unlink(Q),
278+
Ref = monitor(process, Q),
279+
Pid1 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end),
280+
Pid2 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end),
281+
Reason =
282+
receive
283+
{'DOWN', Ref, _, _, Res} -> Res
284+
end,
285+
exit(Pid1, kill),
286+
exit(Pid2, kill),
287+
?_assertEqual(multiple_workers_error, Reason).
288+
289+
should_crash_on_random_messages({_Q, _Producer, _Consumer}) ->
290+
% Do not want to crash the test, so set up a new Q process
291+
{ok, Q} = couch_work_queue:new([]),
292+
unlink(Q),
293+
Ref = monitor(process, Q),
294+
Q ! eggplant,
295+
Reason =
296+
receive
297+
{'DOWN', Ref, _, _, Res} -> Res
298+
end,
299+
?_assertEqual(eggplant, Reason).
300+
309301
close_queue(Q) ->
310302
test_util:stop_sync(
311303
Q,
@@ -329,7 +321,10 @@ consumer_loop(Parent, Q, PrevItem) ->
329321
{last_item, Ref} ->
330322
Parent ! {item, Ref, PrevItem},
331323
consumer_loop(Parent, Q, PrevItem);
332-
{consume, N} ->
324+
{consume, all} ->
325+
Result = couch_work_queue:dequeue(Q),
326+
consumer_loop(Parent, Q, Result);
327+
{consume, N} when is_integer(N) ->
333328
Result = couch_work_queue:dequeue(Q, N),
334329
consumer_loop(Parent, Q, Result)
335330
end.
@@ -345,7 +340,11 @@ producer_loop(Parent, Q) ->
345340
{ping, Ref} ->
346341
Parent ! {pong, Ref},
347342
producer_loop(Parent, Q);
348-
{produce, Ref, Size} ->
343+
{produce_term, Ref, Term} ->
344+
Parent ! {item, Ref, Term},
345+
ok = couch_work_queue:queue(Q, Term),
346+
producer_loop(Parent, Q);
347+
{produce_binary, Ref, Size} ->
349348
Item = crypto:strong_rand_bytes(Size),
350349
Parent ! {item, Ref, Item},
351350
ok = couch_work_queue:queue(Q, Item),
@@ -365,10 +364,30 @@ last_consumer_items(Consumer) ->
365364
timeout
366365
end.
367366

367+
produce_term(Q, Producer, Term, Wait) ->
368+
Ref = make_ref(),
369+
ItemsCount = couch_work_queue:item_count(Q),
370+
Producer ! {produce_term, Ref, Term},
371+
receive
372+
{item, Ref, Item} when Wait ->
373+
ok = wait_increment(Q, ItemsCount),
374+
Item;
375+
{item, Ref, Item} ->
376+
Item
377+
after ?TIMEOUT ->
378+
erlang:error(
379+
{assertion_failed, [
380+
{module, ?MODULE},
381+
{line, ?LINE},
382+
{reason, "Timeout asking producer to produce a term"}
383+
]}
384+
)
385+
end.
386+
368387
produce(Q, Producer, Size, Wait) ->
369388
Ref = make_ref(),
370389
ItemsCount = couch_work_queue:item_count(Q),
371-
Producer ! {produce, Ref, Size},
390+
Producer ! {produce_binary, Ref, Size},
372391
receive
373392
{item, Ref, Item} when Wait ->
374393
ok = wait_increment(Q, ItemsCount),

0 commit comments

Comments
 (0)