Stop fold when a vnode reaches page size #586

Merged
merged 1 commit

2 participants

@engelsanchez
Collaborator

Currently, 2i pagination may, when one participating node is sufficiently slow, receive many, many more keys than the page size (max_results) from all the other participating nodes. This change tracks how many results have been received per vnode and sends a stop fold message to a vnode as soon as it has sent at least max_results.

Note, however, that vnodes send results in chunks that may be much larger than the page size (100 items at a time, I believe). It would be ideal to pass max_results to the folding function and have it stop when it reaches that number, but that will have to wait until later, since the release is near. This should at least help prevent the worst case, where unbounded growth consumes all of a node's memory simply because one node is too slow.

/cc @russelldb @jtuple @evanmcc @jrwest
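
As a minimal sketch of the accounting this change introduces (the module and function names below are hypothetical illustrations, not riak_kv code), the coordinator keeps a per-vnode counter and decides per batch whether to ack the vnode or stop its fold:

-module(vnode_page_guard).
-export([new/0, note/4]).

%% Hypothetical illustration of the per-vnode accounting; not riak_kv code.
new() -> dict:new().

%% Record a batch of NumResults from VNode. Returns ack while the vnode is
%% still under MaxResults, and stop once it has sent at least that many.
note(VNode, NumResults, MaxResults, Counts0) ->
    Counts = dict:update(VNode, fun(C) -> C + NumResults end,
                         NumResults, Counts0),
    case dict:fetch(VNode, Counts) < MaxResults of
        true  -> {ack, Counts};
        false -> {stop, Counts}
    end.

Because vnodes stream in fixed-size chunks, a vnode can still overshoot by up to one chunk, but what the coordinator buffers per vnode is now bounded by roughly max_results plus one chunk instead of growing without limit.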

@engelsanchez engelsanchez Stop fold when a vnode reaches page size 322eee9
@russelldb
Owner

Looks good to me. A sensible addition.

@engelsanchez engelsanchez merged commit 89177c7
@engelsanchez engelsanchez deleted the branch
Commits on Jun 25, 2013
  1. @engelsanchez Stop fold when a vnode reaches page size (engelsanchez authored)
Showing with 17 additions and 4 deletions.
  1. +17 −4 src/riak_kv_index_fsm.erl
src/riak_kv_index_fsm.erl
@@ -53,6 +53,7 @@
 -record(state, {from :: from(),
                 merge_sort_buffer :: sms:sms(),
                 max_results :: all | pos_integer(),
+                results_per_vnode = dict:new() :: dict(),
                 results_sent = 0 :: non_neg_integer()}).

 %% @doc Returns `true' if the new ack-based backpressure index
@@ -105,21 +106,33 @@ process_results(_VNode, {From, _Bucket, _Results}, State=#state{max_results=X, r
 process_results(VNode, {From, Bucket, Results}, State) ->
     case process_results(VNode, {Bucket, Results}, State) of
         {ok, State2} ->
-            riak_kv_vnode:ack_keys(From),
-            {ok, State2};
+            #state{results_per_vnode=PerNode, max_results=MaxResults} = State2,
+            VNodeCount = dict:fetch(VNode, PerNode),
+            case VNodeCount < MaxResults of
+                true ->
+                    riak_kv_vnode:ack_keys(From),
+                    {ok, State2};
+                false ->
+                    riak_kv_vnode:stop_fold(From),
+                    {done, State2}
+            end;
         {done, State2} ->
             riak_kv_vnode:stop_fold(From),
             {done, State2}
     end;
 process_results(VNode, {_Bucket, Results}, State) ->
-    #state{merge_sort_buffer=MergeSortBuffer,
+    #state{merge_sort_buffer=MergeSortBuffer, results_per_vnode=PerNode,
            from={raw, ReqId, ClientPid}, results_sent=ResultsSent, max_results=MaxResults} = State,
     %% add new results to buffer
     {ToSend, NewBuff} = update_buffer(VNode, Results, MergeSortBuffer),
+    NumResults = length(Results),
+    NewPerNode = dict:update(VNode, fun(C) -> C + NumResults end, NumResults, PerNode),
     LenToSend = length(ToSend),
     {Response, ResultsLen, ResultsToSend} = get_results_to_send(LenToSend, ToSend, ResultsSent, MaxResults),
     send_results(ClientPid, ReqId, ResultsToSend),
-    {Response, State#state{merge_sort_buffer=NewBuff, results_sent=ResultsSent+ResultsLen}};
+    {Response, State#state{merge_sort_buffer=NewBuff,
+                           results_per_vnode=NewPerNode,
+                           results_sent=ResultsSent+ResultsLen}};
 process_results(VNode, done, State) ->
     %% tell the sms buffer about the done vnode
     #state{merge_sort_buffer=MergeSortBuffer} = State,
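
The commit message notes that the cleaner follow-up would be to pass max_results to the folding function itself, so a vnode stops at the page boundary instead of emitting whole chunks past it. A rough, hypothetical sketch of that idea (fold_limited is not a riak_kv function) uses the usual Erlang throw/catch pattern to abandon a fold early:

-module(fold_limit_sketch).
-export([fold_limited/4]).

%% Hypothetical sketch of an early-stopping fold; not riak_kv code.
%% Applies FoldFun to at most MaxResults keys, then abandons the traversal.
fold_limited(FoldFun, Acc0, Keys, MaxResults) ->
    try
        lists:foldl(
          fun(Key, {N, Acc}) when N < MaxResults ->
                  {N + 1, FoldFun(Key, Acc)};
             (_Key, {_N, Acc}) ->
                  throw({stop_fold, Acc})  %% page is full: stop folding
          end,
          {0, Acc0}, Keys)
    of
        {_Count, Acc} -> Acc
    catch
        throw:{stop_fold, Acc} -> Acc
    end.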