Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Return items' byte size on work queue dequeue

Change-Id: Ied08c23c76db2e5e2a06b3079854b4f7d948273d
Reviewed-on: http://review.couchbase.org/12408
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information...
commit 05e25bdc012127122e9c91619294cf1c3eadd891 1 parent 1dc4fc4
@fdmanana fdmanana authored fdmanana committed
View
6 src/couch_set_view/src/couch_set_view_updater.erl
@@ -288,7 +288,7 @@ do_maps(#set_view_group{query_server = Qs} = Group, MapQueue, WriteQueue) ->
closed ->
couch_work_queue:close(WriteQueue),
couch_query_servers:stop_doc_map(Group#set_view_group.query_server);
- {ok, Queue} ->
+ {ok, Queue, _QueueSize} ->
lists:foreach(
fun({Seq, #doc{id = Id, deleted = true}, PartitionId}) ->
Item = {Seq, Id, PartitionId, []},
@@ -310,7 +310,7 @@ do_batched_maps(#set_view_group{query_server = Qs} = Group, MapQueue, WriteQueue
compute_map_results(Group, WriteQueue, Acc),
couch_work_queue:close(WriteQueue),
couch_query_servers:stop_doc_map(Qs);
- {ok, Queue} ->
+ {ok, Queue, _QueueSize} ->
Acc2 = Acc ++ Queue,
case length(Acc2) >= ?MIN_MAP_BATCH_SIZE of
true ->
@@ -350,7 +350,7 @@ do_writes(#writer_acc{kvs = Kvs, write_queue = WriteQueue} = Acc) ->
closed ->
#writer_acc{group = NewGroup} = flush_writes(Acc#writer_acc{final_batch = true}),
NewGroup;
- {ok, Queue} ->
+ {ok, Queue, _QueueSize} ->
Kvs2 = Kvs ++ Queue,
case length(Kvs2) >= ?MIN_FLUSH_BATCH_SIZE of
true ->
View
2  src/couchdb/couch_replicator.erl
@@ -655,7 +655,7 @@ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
closed ->
From ! {closed, self()};
- {ok, Changes} ->
+ {ok, Changes, _Size} ->
#doc_info{local_seq = Seq} = lists:last(Changes),
ReportSeq = {Ts, Seq},
ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
View
4 src/couchdb/couch_view_updater.erl
@@ -169,7 +169,7 @@ do_maps(#group{query_server = Qs} = Group, MapQueue, WriteQueue) ->
closed ->
couch_work_queue:close(WriteQueue),
couch_query_servers:stop_doc_map(Group#group.query_server);
- {ok, Queue} ->
+ {ok, Queue, _QueueSize} ->
lists:foreach(
fun({Seq, #doc{id = Id, deleted = true}}) ->
Item = {Seq, Id, []},
@@ -190,7 +190,7 @@ do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs, Acc) ->
Group2 = flush_writes(
Parent, Owner, Group, InitialBuild, ViewEmptyKVs, Acc),
Parent ! {new_group, Group2};
- {ok, Queue} ->
+ {ok, Queue, _QueueSize} ->
Acc2 = Acc ++ Queue,
case length(Acc2) >= ?MIN_FLUSH_BATCH_SIZE of
true ->
View
10 src/couchdb/couch_work_queue.erl
@@ -102,8 +102,8 @@ handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
{reply, ok, Q}
end;
-handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
- gen_server:reply(W, {ok, [Item]}),
+handle_call({queue, Item, Size}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
+ gen_server:reply(W, {ok, [Item], Size}),
{reply, ok, Q#q{work_waiters = Rest}};
handle_call({dequeue, Max}, From, Q) ->
@@ -144,16 +144,16 @@ deliver_queue_items(Max, Q) ->
Q2 = Q#q{
items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
},
- {reply, {ok, Items}, Q2};
+ {reply, {ok, Items, Size - Size2}, Q2};
true ->
lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
Items = [Item || {Item, _} <- queue:to_list(Queue)],
case Close of
false ->
- {reply, {ok, Items}, Q2};
+ {reply, {ok, Items, Size}, Q2};
true ->
- {stop, normal, {ok, Items}, Q2}
+ {stop, normal, {ok, Items, Size}, Q2}
end
end.
View
34 test/etap/042-work-queue.t
@@ -52,7 +52,7 @@ test_single_consumer_max_item_count() ->
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(ping(Consumer), ok, "Consumer unblocked"),
- etap:is(last_consumer_items(Consumer), {ok, [Item1]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item1], 10},
"Consumer received the right item"),
Item2 = produce(Producer, 20),
@@ -70,14 +70,14 @@ test_single_consumer_max_item_count() ->
consume(Consumer, 2),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 2 items from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item2, Item3]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item2, Item3], 35},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
consume(Consumer, 2),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 2 items from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item4]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item4], 3},
"Consumer received the right item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
@@ -103,14 +103,14 @@ test_single_consumer_max_item_count() ->
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
etap:is(ping(Consumer), ok, "Consumer unblocked"),
- etap:is(last_consumer_items(Consumer), {ok, [Item5]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item5], 11},
"Consumer received the first queued item"),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item6, Item7, Item8]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item6, Item7, Item8], 54},
"Consumer received all queued items"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
@@ -145,7 +145,7 @@ test_single_consumer_max_size() ->
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(ping(Consumer), ok, "Consumer unblocked"),
- etap:is(last_consumer_items(Consumer), {ok, [Item1]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item1], 50},
"Consumer received the right item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
@@ -169,7 +169,7 @@ test_single_consumer_max_size() ->
consume(Consumer, 1),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 1 item from full queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item2]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item2], 50},
"Consumer received the right item"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 111, "Queue size is 111 bytes"),
@@ -190,7 +190,7 @@ test_single_consumer_max_size() ->
consume(Consumer, 2),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 2 items from full queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item3, Item4]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item3, Item4], 111},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 60, "Queue size is 60 bytes"),
@@ -201,7 +201,7 @@ test_single_consumer_max_size() ->
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6], 60},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
@@ -239,7 +239,7 @@ test_single_consumer_max_item_count_and_size() ->
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item1, Item2]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item1, Item2], 210},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
@@ -265,7 +265,7 @@ test_single_consumer_max_item_count_and_size() ->
consume(Consumer, 1),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 1 item from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item3]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item3], 10},
"Consumer received 1 item"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 6, "Queue size is 6 bytes"),
@@ -276,7 +276,7 @@ test_single_consumer_max_item_count_and_size() ->
consume(Consumer, 1),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 1 item from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item4]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item4], 4},
"Consumer received 1 item"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
etap:is(couch_work_queue:size(Q), 2, "Queue size is 2 bytes"),
@@ -291,7 +291,7 @@ test_single_consumer_max_item_count_and_size() ->
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
- etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
+ etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6], 52},
"Consumer received all queued items"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
@@ -344,19 +344,19 @@ test_multiple_consumers() ->
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
- etap:is(last_consumer_items(Consumer1), {ok, [Item1]},
+ etap:is(last_consumer_items(Consumer1), {ok, [Item1], 50},
"Consumer 1 received 1 item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
- etap:is(last_consumer_items(Consumer2), {ok, [Item2]},
+ etap:is(last_consumer_items(Consumer2), {ok, [Item2], 50},
"Consumer 2 received 1 item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
- etap:is(last_consumer_items(Consumer3), {ok, [Item3]},
+ etap:is(last_consumer_items(Consumer3), {ok, [Item3], 50},
"Consumer 3 received 1 item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
@@ -379,7 +379,7 @@ test_multiple_consumers() ->
etap:is(close_queue(Q), ok, "Closed queue"),
etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
- etap:is(last_consumer_items(Consumer1), {ok, [Item4]},
+ etap:is(last_consumer_items(Consumer1), {ok, [Item4], 50},
"Consumer 1 received 1 item"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
Please sign in to comment.
Something went wrong with that request. Please try again.