Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

ack-based listkeys backpressure between vnode and fsm/fitting

enable by setting riak_kv:listkeys_backpressure to 'true'
  • Loading branch information...
commit 24f103d14734bb65b6f1db9c95b2e25d358b15db 1 parent afdeec2
@beerriot beerriot authored
View
7 include/riak_kv_vnode.hrl
@@ -29,6 +29,11 @@
bucket :: binary() | tuple(),
item_filter :: function()}).
+%% same as _v3, but triggers ack-based backpressure
+-record(riak_kv_listkeys_req_v4, {
+ bucket :: binary() | tuple(),
+ item_filter :: function()}).
+
-record(riak_kv_listbuckets_req_v1, {
item_filter :: function()}).
@@ -57,7 +62,7 @@
-define(KV_GET_REQ, #riak_kv_get_req_v1).
-define(KV_MGET_REQ, #riak_kv_mget_req_v1).
-define(KV_LISTBUCKETS_REQ, #riak_kv_listbuckets_req_v1).
--define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v3).
+-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v4).
-define(KV_INDEX_REQ, #riak_kv_index_req_v1).
-define(KV_VNODE_STATUS_REQ, #riak_kv_vnode_status_req_v1).
-define(KV_DELETE_REQ, #riak_kv_delete_req_v1).
View
7 src/riak_kv.app.src
@@ -33,6 +33,13 @@
%% listing operations.
{legacy_keylisting, true},
+ %% This option toggles compatibility of keylisting with 1.0
+ %% and earlier versions. Once a rolling upgrade to a version
+ %% > 1.0 is completed for a cluster, this should be set to
+ %% true for better control of memory usage during key listing
+ %% operations.
+ {listkeys_backpressure, false},
+
%% use the legacy routines for tracking kv stats
{legacy_stats, true},
View
31 src/riak_kv_keys_fsm.erl
@@ -42,6 +42,8 @@
-export([init/2,
process_results/2,
finish/2]).
+-export([use_ack_backpressure/0,
+ req/2]).
-type from() :: {atom(), req_id(), pid()}.
-type req_id() :: non_neg_integer().
@@ -49,6 +51,26 @@
-record(state, {client_type :: plain | mapred,
from :: from()}).
+%% @doc Returns `true' if the new ack-based backpressure listkeys
+%% protocol should be used. This decision is based on the
+%% `listkeys_backpressure' setting in `riak_kv''s application
+%% environment.
+-spec use_ack_backpressure() -> boolean().
+use_ack_backpressure() ->
+ app_helper:get_env(riak_kv, listkeys_backpressure) == true.
+
+%% @doc Construct the correct listkeys command record.
+-spec req(binary(), term()) -> term().
+req(Bucket, ItemFilter) ->
+ case use_ack_backpressure() of
+ true ->
+ ?KV_LISTKEYS_REQ{bucket=Bucket,
+ item_filter=ItemFilter};
+ false ->
+ #riak_kv_listkeys_req_v3{bucket=Bucket,
+ item_filter=ItemFilter}
+ end.
+
%% @doc Return a tuple containing the ModFun to call per vnode,
%% the number of primary preflist vnodes the operation
%% should cover, the service to use to check for available nodes,
@@ -65,11 +87,16 @@ init(From={_, _, ClientPid}, [Bucket, ItemFilter, Timeout, ClientType]) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
%% Construct the key listing request
- Req = ?KV_LISTKEYS_REQ{bucket=Bucket,
- item_filter=ItemFilter},
+ Req = req(Bucket, ItemFilter),
{Req, all, NVal, 1, riak_kv, riak_kv_vnode_master, Timeout,
#state{client_type=ClientType, from=From}}.
+process_results({From, Bucket, Keys},
+ StateData=#state{client_type=ClientType,
+ from={raw, ReqId, ClientPid}}) ->
+ process_keys(ClientType, Bucket, Keys, ReqId, ClientPid),
+ riak_kv_vnode:ack_keys(From), % tell that vnode we're ready for more
+ {ok, StateData};
process_results({Bucket, Keys},
StateData=#state{client_type=ClientType,
from={raw, ReqId, ClientPid}}) ->
View
8 src/riak_kv_pipe_listkeys.erl
@@ -45,7 +45,6 @@
done/1,
queue_existing_pipe/3]).
--include("riak_kv_vnode.hrl").
-include_lib("riak_pipe/include/riak_pipe.hrl").
-include_lib("riak_pipe/include/riak_pipe_log.hrl").
@@ -76,8 +75,7 @@ process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
end,
ReqId = erlang:phash2(erlang:now()), % stolen from riak_client
riak_core_vnode_master:coverage(
- ?KV_LISTKEYS_REQ{bucket=Bucket,
- item_filter=Filters},
+ riak_kv_keys_fsm:req(Bucket, Filters),
{Partition, node()},
FilterVNodes,
{raw, ReqId, self()},
@@ -87,6 +85,10 @@ process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
keysend_loop(ReqId, Partition, FittingDetails) ->
receive
+ {ReqId, {From, Bucket, Keys}} ->
+ keysend(Bucket, Keys, Partition, FittingDetails),
+ riak_kv_vnode:ack_keys(From),
+ keysend_loop(ReqId, Partition, FittingDetails);
{ReqId, {Bucket, Keys}} ->
keysend(Bucket, Keys, Partition, FittingDetails),
keysend_loop(ReqId, Partition, FittingDetails);
View
95 src/riak_kv_vnode.erl
@@ -37,7 +37,8 @@
list_keys/4,
fold/3,
get_vclocks/2,
- vnode_status/1]).
+ vnode_status/1,
+ ack_keys/1]).
%% riak_core_vnode API
-export([init/1,
@@ -399,34 +400,20 @@ handle_coverage(?KV_LISTBUCKETS_REQ{item_filter=ItemFilter},
_ ->
{noreply, State}
end;
+handle_coverage(#riak_kv_listkeys_req_v3{bucket=Bucket,
+ item_filter=ItemFilter},
+ FilterVNodes, Sender, State) ->
+ %% v3 == no backpressure
+ ResultFun = result_fun(Bucket, Sender),
+ handle_coverage_listkeys(Bucket, ItemFilter, ResultFun,
+ FilterVNodes, Sender, State);
handle_coverage(?KV_LISTKEYS_REQ{bucket=Bucket,
item_filter=ItemFilter},
- FilterVNodes,
- Sender,
- State=#state{async_backend=AsyncBackend,
- idx=Index,
- key_buf_size=BufferSize,
- mod=Mod,
- modstate=ModState}) ->
- %% Construct the filter function
- FilterVNode = proplists:get_value(Index, FilterVNodes),
- Filter = riak_kv_coverage_filter:build_filter(Bucket, ItemFilter, FilterVNode),
- BufferMod = riak_kv_fold_buffer,
- Buffer = BufferMod:new(BufferSize, result_fun(Bucket, Sender)),
- FoldFun = fold_fun(keys, BufferMod, Filter),
- FinishFun = finish_fun(BufferMod, Sender),
- case AsyncBackend of
- true ->
- Opts = [async_fold, {bucket, Bucket}];
- false ->
- Opts = [{bucket, Bucket}]
- end,
- case list(FoldFun, FinishFun, Mod, fold_keys, ModState, Opts, Buffer) of
- {async, AsyncWork} ->
- {async, {fold, AsyncWork, FinishFun}, Sender, State};
- _ ->
- {noreply, State}
- end;
+ FilterVNodes, Sender, State) ->
+ %% v4 == ack-based backpressure
+ ResultFun = result_fun_ack(Bucket, Sender),
+ handle_coverage_listkeys(Bucket, ItemFilter, ResultFun,
+ FilterVNodes, Sender, State);
handle_coverage(?KV_INDEX_REQ{bucket=Bucket,
item_filter=ItemFilter,
qry=Query},
@@ -463,6 +450,34 @@ handle_coverage(?KV_INDEX_REQ{bucket=Bucket,
{reply, {error, {indexes_not_supported, Mod}}, State}
end.
+%% Convenience for handling both v3 and v4 coverage-based listkeys
+handle_coverage_listkeys(Bucket, ItemFilter, ResultFun,
+ FilterVNodes, Sender,
+ State=#state{async_backend=AsyncBackend,
+ idx=Index,
+ key_buf_size=BufferSize,
+ mod=Mod,
+ modstate=ModState}) ->
+ %% Construct the filter function
+ FilterVNode = proplists:get_value(Index, FilterVNodes),
+ Filter = riak_kv_coverage_filter:build_filter(Bucket, ItemFilter, FilterVNode),
+ BufferMod = riak_kv_fold_buffer,
+ Buffer = BufferMod:new(BufferSize, ResultFun),
+ FoldFun = fold_fun(keys, BufferMod, Filter),
+ FinishFun = finish_fun(BufferMod, Sender),
+ case AsyncBackend of
+ true ->
+ Opts = [async_fold, {bucket, Bucket}];
+ false ->
+ Opts = [{bucket, Bucket}]
+ end,
+ case list(FoldFun, FinishFun, Mod, fold_keys, ModState, Opts, Buffer) of
+ {async, AsyncWork} ->
+ {async, {fold, AsyncWork, FinishFun}, Sender, State};
+ _ ->
+ {noreply, State}
+ end.
+
%% While in handoff, vnodes have the option of returning {forward, State}
%% which will cause riak_core to forward the request to the handoff target
%% node. For riak_kv, we issue a put locally as well as forward it in case
@@ -843,6 +858,29 @@ result_fun(Bucket, Sender) ->
riak_core_vnode:reply(Sender, {Bucket, Items})
end.
+%% wait for acknowledgement that results were received before
+%% continuing, as a way of providing backpressure for processes that
+%% can't handle results as fast as we can send them
+result_fun_ack(Bucket, Sender) ->
+ fun(Items) ->
+ Monitor = riak_core_vnode:monitor(Sender),
+ riak_core_vnode:reply(Sender, {{self(), Monitor}, Bucket, Items}),
+ receive
+ {Monitor, ok} ->
+ erlang:demonitor(Monitor, [flush]);
+ {'DOWN', Monitor, process, _Pid, _Reason} ->
+ throw(receiver_down)
+ end
+ end.
+
+%% @doc If a listkeys request sends a result of `{From, Bucket,
+%% Items}', that means it wants acknowledgement of those items before
+%% it will send more. Call this function with that `From' to trigger
+%% the next batch.
+-spec ack_keys(From::{pid(), reference()}) -> term().
+ack_keys({Pid, Ref}) ->
+ Pid ! {Ref, ok}.
+
%% @private
finish_fun(BufferMod, Sender) ->
fun(Buffer) ->
@@ -1333,6 +1371,9 @@ result_listener_keys(Acc) ->
{'$gen_event', {_, done}} ->
result_listener_done(Acc);
{'$gen_event', {_, {_Bucket, Results}}} ->
+ result_listener_keys(Results ++ Acc);
+ {'$gen_event', {_, {From, _Bucket, Results}}} ->
+ riak_kv_vnode:ack_keys(From),
result_listener_keys(Results ++ Acc)
after 5000 ->
result_listener_done({timeout, Acc})
Please sign in to comment.
Something went wrong with that request. Please try again.