Use rexi:stream/1 for view backpressure #65

Merged
merged 3 commits into master

2 participants

@kocolosk
Owner

This uses the new rexi:stream/1 API to allow rexi workers to stream results back to the coordinator process. This is intended to reduce the sensitivity of views to RTT between nodes involved in a view response.
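
For context, a minimal sketch of the worker side of that interaction, assuming the semantics implied by the diffs below: rexi:stream/1 blocks until the coordinator acknowledges with rexi:stream_ack/1, and the coordinator only acks once it has actually consumed a row. The module name, row term, and completion message here are illustrative, not the fabric_rpc2 code from this branch.

%% Illustrative sketch only. A worker streams rows to its coordinator
%% and is throttled by the coordinator's acks rather than by the old
%% explicit pause/resume replies.
-module(rexi_stream_sketch).
-export([stream_rows/1]).

stream_rows(Rows) ->
    lists:foreach(fun(Row) ->
        %% Blocks until earlier messages have been acked, so a slow or
        %% distant coordinator throttles this worker instead of letting
        %% the coordinator's mailbox grow without bound.
        rexi:stream({row, Row})
    end, Rows),
    rexi:reply(complete).

On the coordinator side, the hunks below issue rexi:stream_ack(From) only when a row is pulled out of the merge buffer (fabric_view:get_next_row/1) or handed straight to the callback (the sorted=false clause in fabric_view_map), which is what replaces the maybe_pause_worker/maybe_resume_worker machinery.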

@davisp Use rexi:stream/1 for view backpressure
This uses the new rexi:stream/1 API to allow rexi workers to stream
results back to the coordinator process. This is intended to reduce the
sensitivity of views to RTT between nodes involved in a view response.
a448312
@kocolosk
Owner

Thinking about the upgrade strategy for something like this ... it's tricky. We could introduce the new rexi:stream interface and add new endpoints in fabric_rpc which use that interface (rather than replacing existing ones). Then in a second commit we'd modify the coordinator functions to use the new endpoints. There's a good bit of coordination involved.
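
One way to stage it is the shape the later commits on this PR take: leave the existing fabric_rpc endpoints untouched, add the streaming versions in a new fabric_rpc2 module, and flip the coordinator's default job submission to the new module. Roughly, mirroring the fabric_util.erl hunk below (the rexi:cast/2 payload is an assumption based on the existing submit_jobs/4, which the diff does not show in full):

submit_jobs(Shards, EndPoint, ExtraArgs) ->
    %% New coordinators default to the streaming endpoints...
    submit_jobs(Shards, fabric_rpc2, EndPoint, ExtraArgs).

submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
    %% ...while callers that name a module explicitly still reach
    %% fabric_rpc, and coordinators on old-release nodes never see this
    %% new default at all.
    lists:map(fun(#shard{node = Node, name = ShardName} = Shard) ->
        Ref = rexi:cast(Node, {Module, EndPoint, [ShardName | ExtraArgs]}),
        Shard#shard{ref = Ref}
    end, Shards).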

@davisp
Owner

@kocolosk Yeah. I'm not at all sure of the best way here. I'm currently puzzling through how to maintain the merge sort properly between shards, but I keep needing to add a field to the collector record, which worries me because of how it's used in fabric_view_changes for long-running requests.
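
The commits that follow sidestep a new collector field: the rexi From reference rides inside the existing #view_row.worker slot as a {Worker, From} pair, so the merge sort over buffered rows is untouched and the ack is issued exactly when a row is consumed. A trimmed illustration of the two ends of that hand-off (the helper names are made up; the real changes are the fabric_view_map.erl and fabric_view.erl hunks below, and the record fields come from fabric.hrl):

%% On arrival, tag the row with both the worker and its stream ref
%% (cf. fabric_view_map:handle_message/3 below).
tag_row(#view_row{} = Row, Worker, From) ->
    Row#view_row{worker = {Worker, From}}.

%% On consumption, ack the sender so it may stream its next row, and
%% decrement that worker's buffered-row count
%% (cf. fabric_view:get_next_row/1 below).
consume_row(#view_row{worker = {Worker, From}} = Row, Counters0) ->
    rexi:stream_ack(From),
    {Row, fabric_dict:update_counter(Worker, -1, Counters0)}.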

kocolosk added some commits
@kocolosk Use fabric_rpc2 endpoints
BugzID: 13311
093b86a
@kocolosk Preserve original implementation in fabric_rpc
Coordinators on nodes running the old release will be the only ones
hitting fabric_rpc for views, and those coordinators will be expecting
the original API.

BugzID: 13311
e1eff10
@kocolosk
Owner

I think this is ready to go. There's a bit of new stuff in fabric_rpc regarding attachments, but fabric_doc_update is invoking the fabric_rpc endpoint directly so we'll still pick that up.
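
For illustration only: a coordinator that casts at a module by name is unaffected by submit_jobs/3 switching its default to fabric_rpc2, so document updates keep exercising the original fabric_rpc endpoints. The function and argument list below are a hypothetical rendering of that direct-cast pattern, not the actual fabric_doc_update code.

%% Hypothetical sketch: naming fabric_rpc explicitly bypasses the new
%% submit_jobs/3 default, so this path is untouched by the switch to
%% fabric_rpc2.
cast_update_docs(Shards, Docs, Options) ->
    lists:map(fun(#shard{node = Node, name = Name} = Shard) ->
        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
        Shard#shard{ref = Ref}
    end, Shards).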

@davisp
Owner

LGTM

@kocolosk merged commit b73311b into master
Commits on Jun 14, 2012
  1. @davisp: Use rexi:stream/1 for view backpressure

     This uses the new rexi:stream/1 API to allow rexi workers to stream
     results back to the coordinator process. This is intended to reduce the
     sensitivity of views to RTT between nodes involved in a view response.

Commits on Nov 20, 2012
  1. @kocolosk: Use fabric_rpc2 endpoints

     BugzID: 13311

  2. @kocolosk: Preserve original implementation in fabric_rpc

     Coordinators on nodes running the old release will be the only ones
     hitting fabric_rpc for views, and those coordinators will be expecting
     the original API.

     BugzID: 13311
src/fabric_util.erl (2 lines changed)
@@ -34,7 +34,7 @@ remove_down_workers(Workers, BadNode) ->
end.
submit_jobs(Shards, EndPoint, ExtraArgs) ->
- submit_jobs(Shards, fabric_rpc, EndPoint, ExtraArgs).
+ submit_jobs(Shards, fabric_rpc2, EndPoint, ExtraArgs).
submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
src/fabric_view.erl (45 lines changed)
@@ -15,8 +15,8 @@
-module(fabric_view).
-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
- maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
- extract_view/4, get_shards/2, remove_down_shards/2]).
+ transform_row/1, keydict/1, extract_view/4, get_shards/2,
+ remove_down_shards/2]).
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -91,31 +91,6 @@ remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
end
end, Shards).
-maybe_pause_worker(Worker, From, State) ->
- #collector{buffer_size = BufferSize, counters = Counters} = State,
- case fabric_dict:lookup_element(Worker, Counters) of
- BufferSize ->
- State#collector{blocked = [{Worker,From} | State#collector.blocked]};
- _Count ->
- gen_server:reply(From, ok),
- State
- end.
-
-maybe_resume_worker(Worker, State) ->
- #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
- case fabric_dict:lookup_element(Worker, C) of
- Count when Count < Buffer/2 ->
- case couch_util:get_value(Worker, B) of
- undefined ->
- State;
- From ->
- gen_server:reply(From, ok),
- State#collector{blocked = lists:keydelete(Worker, 1, B)}
- end;
- _Other ->
- State
- end.
-
maybe_send_row(#collector{limit=0} = State) ->
#collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
case fabric_dict:any(0, Counters) of
@@ -226,25 +201,23 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
case dict:find(Key, RowDict) of
{ok, Records} ->
NewRowDict = dict:erase(Key, RowDict),
- Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
- fabric_dict:update_counter(Worker, -1, CountersAcc)
+ Counters = lists:foldl(fun(#view_row{worker={Worker,From}}, CntrsAcc) ->
+ rexi:stream_ack(From),
+ fabric_dict:update_counter(Worker, -1, CntrsAcc)
end, Counters0, Records),
Wrapped = [[V] || #view_row{value=V} <- Records],
{ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
- NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) ->
- maybe_resume_worker(Worker, StateAcc)
- end, NewSt, Records),
- {#view_row{key=Key, id=reduced, value=Reduced}, NewState};
+ {#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
error ->
get_next_row(St#collector{keys=RestKeys})
end;
get_next_row(State) ->
#collector{rows = [Row|Rest], counters = Counters0} = State,
- Worker = Row#view_row.worker,
+ {Worker, From} = Row#view_row.worker,
+ rexi:stream_ack(From),
Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
- NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
- {Row, NewState#collector{rows = Rest}}.
+ {Row, State#collector{rows = Rest, counters=Counters1}}.
find_next_key(nil, Dir, RowDict) ->
case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
src/fabric_view_all_docs.erl (7 lines changed)
@@ -23,12 +23,10 @@
go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]),
- BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
#view_query_args{limit = Limit, skip = Skip} = QueryArgs,
State = #collector{
query_args = QueryArgs,
callback = Callback,
- buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0),
skip = Skip,
limit = Limit,
@@ -124,11 +122,10 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
handle_message(#view_row{} = Row, {Worker, From}, State) ->
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
Dir = Args#view_query_args.direction,
- Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
+ Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2);
+ fabric_view:maybe_send_row(State1);
handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
src/fabric_view_changes.erl (4 lines changed)
@@ -96,14 +96,14 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
case lists:member(Shard, AllLiveShards) of
true ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
+ Ref = rexi:cast(N, {fabric_rpc2, changes, [Name,ChangesArgs,Seq]}),
[{Shard#shard{ref = Ref}, Seq}];
false ->
% Find some replacement shards to cover the missing range
% TODO It's possible in rare cases of shard merging to end up
% with overlapping shard ranges from this technique
lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+ Ref = rexi:cast(N2, {fabric_rpc2, changes, [Name2,ChangesArgs,0]}),
{NewShard#shard{ref = Ref}, 0}
end, find_replacement_shards(Shard, AllLiveShards))
end
src/fabric_view_map.erl (9 lines changed)
@@ -27,13 +27,11 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
go(DbName, DDoc, View, Args, Callback, Acc0) ->
Shards = fabric_view:get_shards(DbName, Args),
Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
- BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
#view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
State = #collector{
db_name=DbName,
query_args = Args,
callback = Callback,
- buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0),
skip = Skip,
limit = Limit,
@@ -115,7 +113,7 @@ handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
#collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
{Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
- gen_server:reply(From, ok),
+ rexi:stream_ack(From),
{Go, St#collector{user_acc=Acc, limit=Limit-1}};
handle_message(#view_row{} = Row, {Worker, From}, State) ->
@@ -125,11 +123,10 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
rows = Rows0,
keys = KeyDict
} = State,
- Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0),
+ Rows = merge_row(Dir, KeyDict, Row#view_row{worker={Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2);
+ fabric_view:maybe_send_row(State1);
handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
src/fabric_view_reduce.erl (9 lines changed)
@@ -30,17 +30,15 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
{NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
{VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
- Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
+ Ref = rexi:cast(N, {fabric_rpc2, reduce_view, [Name,Group,VName,Args]}),
Shard#shard{ref = Ref}
end, fabric_view:get_shards(DbName, Args)),
RexiMon = fabric_util:create_monitors(Workers),
- BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
#view_query_args{limit = Limit, skip = Skip} = Args,
State = #collector{
db_name = DbName,
query_args = Args,
callback = Callback,
- buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0),
keys = Args#view_query_args.keys,
skip = Skip,
@@ -87,13 +85,12 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
gen_server:reply(From, stop),
{ok, State};
_ ->
- Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
+ Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
C1 = fabric_dict:update_counter(Worker, 1, Counters0),
% TODO time this call, if slow don't do it every time
C2 = fabric_view:remove_overlapping_shards(Worker, C1),
State1 = State#collector{rows=Rows, counters=C2},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2)
+ fabric_view:maybe_send_row(State1)
end;
handle_message(complete, Worker, #collector{counters = Counters0} = State) ->