Skip to content
This repository has been archived by the owner on Sep 19, 2019. It is now read-only.

Commit

Permalink
Merge pull request #65 from cloudant/13311-improve-view-back-pressure
Browse files Browse the repository at this point in the history
Use rexi:stream/1 for view backpressure

BugzID: 13311
  • Loading branch information
kocolosk committed Nov 20, 2012
2 parents 37b66d3 + e1eff10 commit b73311b
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/fabric_util.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ remove_down_workers(Workers, BadNode) ->
end. end.


submit_jobs(Shards, EndPoint, ExtraArgs) -> submit_jobs(Shards, EndPoint, ExtraArgs) ->
submit_jobs(Shards, fabric_rpc, EndPoint, ExtraArgs). submit_jobs(Shards, fabric_rpc2, EndPoint, ExtraArgs).


submit_jobs(Shards, Module, EndPoint, ExtraArgs) -> submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
Expand Down
45 changes: 9 additions & 36 deletions src/fabric_view.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
-module(fabric_view). -module(fabric_view).


-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, transform_row/1, keydict/1, extract_view/4, get_shards/2,
extract_view/4, get_shards/2, remove_down_shards/2]). remove_down_shards/2]).


-include("fabric.hrl"). -include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl"). -include_lib("mem3/include/mem3.hrl").
Expand Down Expand Up @@ -91,31 +91,6 @@ remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
end end
end, Shards). end, Shards).


%% Back-pressure entry point: if Worker has already buffered a full
%% BufferSize worth of rows, hold its `From` on the blocked list and do
%% not reply (the rexi worker stays paused); otherwise ack right away so
%% it keeps streaming. Returns the (possibly updated) collector state.
maybe_pause_worker(Worker, From, State) ->
    #collector{buffer_size = Limit, counters = Counters, blocked = Blocked} = State,
    Buffered = fabric_dict:lookup_element(Worker, Counters),
    if
        Buffered =:= Limit ->
            %% Buffer full: park the caller until rows drain.
            State#collector{blocked = [{Worker, From} | Blocked]};
        true ->
            gen_server:reply(From, ok),
            State
    end.

%% Back-pressure release: once Worker's buffered row count has drained
%% below half the configured buffer size, reply `ok` to any call parked
%% on the blocked list so the worker resumes streaming, and drop its
%% entry from the blocked list. Otherwise the state is returned as-is.
maybe_resume_worker(Worker, State) ->
    #collector{buffer_size = Limit, counters = Counters, blocked = Blocked} = State,
    Buffered = fabric_dict:lookup_element(Worker, Counters),
    case Buffered < Limit / 2 of
        false ->
            %% Still above the resume threshold; keep it paused.
            State;
        true ->
            case couch_util:get_value(Worker, Blocked) of
                undefined ->
                    %% Nothing was waiting on this worker.
                    State;
                Waiter ->
                    gen_server:reply(Waiter, ok),
                    State#collector{blocked = lists:keydelete(Worker, 1, Blocked)}
            end
    end.

maybe_send_row(#collector{limit=0} = State) -> maybe_send_row(#collector{limit=0} = State) ->
#collector{counters=Counters, user_acc=AccIn, callback=Callback} = State, #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
case fabric_dict:any(0, Counters) of case fabric_dict:any(0, Counters) of
Expand Down Expand Up @@ -226,25 +201,23 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
case dict:find(Key, RowDict) of case dict:find(Key, RowDict) of
{ok, Records} -> {ok, Records} ->
NewRowDict = dict:erase(Key, RowDict), NewRowDict = dict:erase(Key, RowDict),
Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> Counters = lists:foldl(fun(#view_row{worker={Worker,From}}, CntrsAcc) ->
fabric_dict:update_counter(Worker, -1, CountersAcc) rexi:stream_ack(From),
fabric_dict:update_counter(Worker, -1, CntrsAcc)
end, Counters0, Records), end, Counters0, Records),
Wrapped = [[V] || #view_row{value=V} <- Records], Wrapped = [[V] || #view_row{value=V} <- Records],
{ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) -> {#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
maybe_resume_worker(Worker, StateAcc)
end, NewSt, Records),
{#view_row{key=Key, id=reduced, value=Reduced}, NewState};
error -> error ->
get_next_row(St#collector{keys=RestKeys}) get_next_row(St#collector{keys=RestKeys})
end; end;
get_next_row(State) -> get_next_row(State) ->
#collector{rows = [Row|Rest], counters = Counters0} = State, #collector{rows = [Row|Rest], counters = Counters0} = State,
Worker = Row#view_row.worker, {Worker, From} = Row#view_row.worker,
rexi:stream_ack(From),
Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), {Row, State#collector{rows = Rest, counters=Counters1}}.
{Row, NewState#collector{rows = Rest}}.


find_next_key(nil, Dir, RowDict) -> find_next_key(nil, Dir, RowDict) ->
case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
Expand Down
7 changes: 2 additions & 5 deletions src/fabric_view_all_docs.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@


go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]), Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]),
BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
#view_query_args{limit = Limit, skip = Skip} = QueryArgs, #view_query_args{limit = Limit, skip = Skip} = QueryArgs,
State = #collector{ State = #collector{
query_args = QueryArgs, query_args = QueryArgs,
callback = Callback, callback = Callback,
buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0), counters = fabric_dict:init(Workers, 0),
skip = Skip, skip = Skip,
limit = Limit, limit = Limit,
Expand Down Expand Up @@ -124,11 +122,10 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
handle_message(#view_row{} = Row, {Worker, From}, State) -> handle_message(#view_row{} = Row, {Worker, From}, State) ->
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State, #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
Dir = Args#view_query_args.direction, Dir = Args#view_query_args.direction,
Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0), Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows=Rows, counters=Counters1}, State1 = State#collector{rows=Rows, counters=Counters1},
State2 = fabric_view:maybe_pause_worker(Worker, From, State1), fabric_view:maybe_send_row(State1);
fabric_view:maybe_send_row(State2);


handle_message(complete, Worker, State) -> handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
Expand Down
4 changes: 2 additions & 2 deletions src/fabric_view_changes.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
case lists:member(Shard, AllLiveShards) of case lists:member(Shard, AllLiveShards) of
true -> true ->
Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), Ref = rexi:cast(N, {fabric_rpc2, changes, [Name,ChangesArgs,Seq]}),
[{Shard#shard{ref = Ref}, Seq}]; [{Shard#shard{ref = Ref}, Seq}];
false -> false ->
% Find some replacement shards to cover the missing range % Find some replacement shards to cover the missing range
% TODO It's possible in rare cases of shard merging to end up % TODO It's possible in rare cases of shard merging to end up
% with overlapping shard ranges from this technique % with overlapping shard ranges from this technique
lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), Ref = rexi:cast(N2, {fabric_rpc2, changes, [Name2,ChangesArgs,0]}),
{NewShard#shard{ref = Ref}, 0} {NewShard#shard{ref = Ref}, 0}
end, find_replacement_shards(Shard, AllLiveShards)) end, find_replacement_shards(Shard, AllLiveShards))
end end
Expand Down
9 changes: 3 additions & 6 deletions src/fabric_view_map.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
go(DbName, DDoc, View, Args, Callback, Acc0) -> go(DbName, DDoc, View, Args, Callback, Acc0) ->
Shards = fabric_view:get_shards(DbName, Args), Shards = fabric_view:get_shards(DbName, Args),
Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]), Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
#view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args, #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
State = #collector{ State = #collector{
db_name=DbName, db_name=DbName,
query_args = Args, query_args = Args,
callback = Callback, callback = Callback,
buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0), counters = fabric_dict:init(Workers, 0),
skip = Skip, skip = Skip,
limit = Limit, limit = Limit,
Expand Down Expand Up @@ -115,7 +113,7 @@ handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
#collector{callback=Callback, user_acc=AccIn, limit=Limit} = St, #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
{Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
gen_server:reply(From, ok), rexi:stream_ack(From),
{Go, St#collector{user_acc=Acc, limit=Limit-1}}; {Go, St#collector{user_acc=Acc, limit=Limit-1}};


handle_message(#view_row{} = Row, {Worker, From}, State) -> handle_message(#view_row{} = Row, {Worker, From}, State) ->
Expand All @@ -125,11 +123,10 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
rows = Rows0, rows = Rows0,
keys = KeyDict keys = KeyDict
} = State, } = State,
Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0), Rows = merge_row(Dir, KeyDict, Row#view_row{worker={Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows=Rows, counters=Counters1}, State1 = State#collector{rows=Rows, counters=Counters1},
State2 = fabric_view:maybe_pause_worker(Worker, From, State1), fabric_view:maybe_send_row(State1);
fabric_view:maybe_send_row(State2);


handle_message(complete, Worker, State) -> handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
Expand Down
9 changes: 3 additions & 6 deletions src/fabric_view_reduce.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
{NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
{VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), Ref = rexi:cast(N, {fabric_rpc2, reduce_view, [Name,Group,VName,Args]}),
Shard#shard{ref = Ref} Shard#shard{ref = Ref}
end, fabric_view:get_shards(DbName, Args)), end, fabric_view:get_shards(DbName, Args)),
RexiMon = fabric_util:create_monitors(Workers), RexiMon = fabric_util:create_monitors(Workers),
BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
#view_query_args{limit = Limit, skip = Skip} = Args, #view_query_args{limit = Limit, skip = Skip} = Args,
OsProc = case os_proc_needed(RedSrc) of OsProc = case os_proc_needed(RedSrc) of
true -> couch_query_servers:get_os_process(Lang); true -> couch_query_servers:get_os_process(Lang);
Expand All @@ -44,7 +43,6 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
db_name = DbName, db_name = DbName,
query_args = Args, query_args = Args,
callback = Callback, callback = Callback,
buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0), counters = fabric_dict:init(Workers, 0),
keys = Args#view_query_args.keys, keys = Args#view_query_args.keys,
skip = Skip, skip = Skip,
Expand Down Expand Up @@ -94,13 +92,12 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
gen_server:reply(From, stop), gen_server:reply(From, stop),
{ok, State}; {ok, State};
_ -> _ ->
Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
C1 = fabric_dict:update_counter(Worker, 1, Counters0), C1 = fabric_dict:update_counter(Worker, 1, Counters0),
% TODO time this call, if slow don't do it every time % TODO time this call, if slow don't do it every time
C2 = fabric_view:remove_overlapping_shards(Worker, C1), C2 = fabric_view:remove_overlapping_shards(Worker, C1),
State1 = State#collector{rows=Rows, counters=C2}, State1 = State#collector{rows=Rows, counters=C2},
State2 = fabric_view:maybe_pause_worker(Worker, From, State1), fabric_view:maybe_send_row(State1)
fabric_view:maybe_send_row(State2)
end; end;


handle_message(complete, Worker, #collector{counters = Counters0} = State) -> handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
Expand Down

0 comments on commit b73311b

Please sign in to comment.