Permalink
Browse files

Polling of destination mailbox size: TODO: reword commit msg

  • Loading branch information...
1 parent 0e53a7d commit 6af7883da9fb511dc8969ed8765802e2abc41a05 Scott Lystig Fritchie committed Dec 9, 2011
Showing with 94 additions and 17 deletions.
  1. +38 −6 src/riak_kv_fold_buffer.erl
  2. +49 −4 src/riak_kv_keys_fsm.erl
  3. +7 −7 src/riak_kv_vnode.erl
@@ -29,7 +29,7 @@
-author('Kelly McLaughlin <kelly@basho.com>').
%% Public API
--export([new/2,
+-export([new/3,
add/2,
flush/1,
size/1]).
@@ -42,7 +42,9 @@
-record(buffer, {acc=[] :: [any()],
buffer_fun :: function(),
+ max_msgs :: pos_integer(),
max_size :: pos_integer(),
+ sender_pid :: pid(),
size=0 :: non_neg_integer()}).
-type buffer() :: #buffer{}.
@@ -52,10 +54,13 @@
%% @doc Returns a new buffer with the specified
%% maximum size and buffer function.
--spec new(pos_integer(), fun(([any()]) -> any())) -> buffer().
-new(MaxSize, Fun) ->
+-spec new(pos_integer(), {any(), any(), pid()}, fun(([any()]) -> any())) -> buffer().
+new(MaxSize, {_, _, SenderPid}, Fun) ->
+ MaxMsgs = app_helper:get_env(riak_kv, fold_buffer_max_msgs, 500),
#buffer{buffer_fun=Fun,
- max_size=MaxSize-1}.
+ max_msgs=MaxMsgs,
+ max_size=MaxSize-1,
+ sender_pid=SenderPid}.
%% @doc Add an item to the buffer. If the
%% size of the buffer is equal to the
@@ -66,11 +71,19 @@ new(MaxSize, Fun) ->
-spec add(any(), buffer()) -> buffer().
add(Item, #buffer{acc=Acc,
buffer_fun=Fun,
+ max_msgs=MaxMsgs,
max_size=MaxSize,
+ sender_pid=SenderPid,
size=MaxSize}=Buffer) ->
Fun([Item | Acc]),
- Buffer#buffer{acc=[],
- size=0};
+ NewBuffer = Buffer#buffer{acc=[],
+ size=0},
+ QLen = client_queue_len(SenderPid),
+ if QLen > MaxMsgs ->
+ sleep_and_recheck_client(10, SenderPid, MaxMsgs, NewBuffer);
+ true ->
+ NewBuffer
+ end;
add(Item, #buffer{acc=Acc,
size=Size}=Buffer) ->
Buffer#buffer{acc=[Item | Acc],
@@ -90,6 +103,25 @@ flush(#buffer{acc=Acc,
size(#buffer{size=Size}) ->
Size.
+client_queue_len(SenderPid) ->
+ {_, Len} = rpc:call(node(SenderPid),
+ erlang, process_info, [SenderPid, message_queue_len]),
+ Len.
+
+sleep_and_recheck_client(Delay, SenderPid, MaxMsgs, NewBuffer) ->
+ io:format("fold delay ~p ~p,", [self(), Delay]), %SLF
+ timer:sleep(Delay),
+ case client_queue_len(SenderPid) of
+ N when N > MaxMsgs ->
+ NewDelay = if Delay > 125 -> 250;
+ true -> Delay * 2
+ end,
+ sleep_and_recheck_client(NewDelay, SenderPid, MaxMsgs,
+ NewBuffer);
+ _ ->
+ NewBuffer
+ end.
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
View
@@ -47,7 +47,9 @@
-type req_id() :: non_neg_integer().
-record(state, {client_type :: plain | mapred,
- from :: from()}).
+ from :: from(),
+ timeout_time :: {integer(), integer(), integer()},
+ max_msgs :: integer()}).
%% @doc Return a tuple containing the ModFun to call per vnode,
%% the number of primary preflist vnodes the operation
@@ -61,20 +63,39 @@ init(From={_, _, ClientPid}, [Bucket, ItemFilter, Timeout, ClientType]) ->
_ ->
ok
end,
+ {NowA, NowB, NowC} = now(),
+ TimeoutTime = if Timeout == infinity ->
+ {NowA + 999999999999, NowB, NowC}; % almost infinity
+ is_integer(Timeout) ->
+ {NowA, NowB, NowC + (Timeout * 1000)}
+ end,
%% Get the bucket n_val for use in creating a coverage plan
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
%% Construct the key listing request
Req = ?KV_LISTKEYS_REQ{bucket=Bucket,
item_filter=ItemFilter},
+ %% Fetch current queue len: may be non-zero before we start anything here
+ Len = client_queue_len(ClientPid),
+ MaxMsgs = app_helper:get_env(riak_kv, fold_buffer_max_msgs, 500) + Len,
{Req, all, NVal, 1, riak_kv, riak_kv_vnode_master, Timeout,
- #state{client_type=ClientType, from=From}}.
+ #state{client_type=ClientType,
+ from=From,
+ timeout_time=TimeoutTime,
+ max_msgs=MaxMsgs}}.
process_results({Bucket, Keys},
StateData=#state{client_type=ClientType,
- from={raw, ReqId, ClientPid}}) ->
+ from={raw, ReqId, ClientPid},
+ timeout_time=TOT,
+ max_msgs=MaxMsgs}) ->
process_keys(ClientType, Bucket, Keys, ReqId, ClientPid),
- {ok, StateData};
+ XXX = client_queue_len(ClientPid),
+ if XXX > MaxMsgs ->
+ sleep_and_recheck_client(10, TOT, ClientPid, MaxMsgs, StateData);
+ true ->
+ {ok, StateData}
+ end;
process_results(done, StateData) ->
{done, StateData};
process_results({error, Reason}, _State) ->
@@ -118,3 +139,27 @@ process_keys(mapred, Bucket, Keys, _ReqId, ClientPid) ->
catch _:_ ->
exit(self(), normal)
end.
+
+client_queue_len(ClientPid) ->
+ {_, Len} = rpc:call(node(ClientPid),
+ erlang, process_info, [ClientPid, message_queue_len]),
+ Len.
+
+sleep_and_recheck_client(Delay, TOT, ClientPid, MaxMsgs, StateData) ->
+ case timer:now_diff(now(), TOT) of
+ Diff when Diff > 0 ->
+ {error, timeout}; % Absolute wall clock timeout
+ _ ->
+ io:format("KeysFSM ~p/~p,", [Delay, element(2, process_info(self(), message_queue_len))]),
+ timer:sleep(Delay),
+ case client_queue_len(ClientPid) of
+ N when N > MaxMsgs ->
+ NewDelay = if Delay > 125 -> 250;
+ true -> Delay * 2
+ end,
+ sleep_and_recheck_client(NewDelay, TOT, ClientPid, MaxMsgs,
+ StateData);
+ _ ->
+ {ok, StateData}
+ end
+ end.
View
@@ -266,7 +266,7 @@ handle_command(?KV_MGET_REQ{bkeys=BKeys, req_id=ReqId, from=From}, _Sender, Stat
handle_command(#riak_kv_listkeys_req_v1{bucket=Bucket, req_id=ReqId}, _Sender,
State=#state{mod=Mod, modstate=ModState, idx=Idx}) ->
do_legacy_list_bucket(ReqId,Bucket,Mod,ModState,Idx,State);
-handle_command(#riak_kv_listkeys_req_v2{bucket=Input, req_id=ReqId, caller=Caller}, _Sender,
+handle_command(#riak_kv_listkeys_req_v2{bucket=Input, req_id=ReqId, caller=Caller}, Sender,
State=#state{async_backend=AsyncBackend,
key_buf_size=BufferSize,
mod=Mod,
@@ -308,7 +308,7 @@ handle_command(#riak_kv_listkeys_req_v2{bucket=Input, req_id=ReqId, caller=Calle
FoldFun = fold_fun(keys, BufferMod, Filter),
ModFun = fold_keys
end,
- Buffer = BufferMod:new(BufferSize, BufferFun),
+ Buffer = BufferMod:new(BufferSize, Sender, BufferFun),
FinishFun =
fun(Buffer1) ->
riak_kv_fold_buffer:flush(Buffer1),
@@ -384,7 +384,7 @@ handle_coverage(?KV_LISTBUCKETS_REQ{item_filter=ItemFilter},
%% Construct the filter function
Filter = riak_kv_coverage_filter:build_filter(all, ItemFilter, undefined),
BufferMod = riak_kv_fold_buffer,
- Buffer = BufferMod:new(BufferSize, result_fun(Sender)),
+ Buffer = BufferMod:new(BufferSize, Sender, result_fun(Sender)),
FoldFun = fold_fun(buckets, BufferMod, Filter),
FinishFun = finish_fun(BufferMod, Sender),
case AsyncBackend of
@@ -412,7 +412,7 @@ handle_coverage(?KV_LISTKEYS_REQ{bucket=Bucket,
FilterVNode = proplists:get_value(Index, FilterVNodes),
Filter = riak_kv_coverage_filter:build_filter(Bucket, ItemFilter, FilterVNode),
BufferMod = riak_kv_fold_buffer,
- Buffer = BufferMod:new(BufferSize, result_fun(Bucket, Sender)),
+ Buffer = BufferMod:new(BufferSize, Sender, result_fun(Bucket, Sender)),
FoldFun = fold_fun(keys, BufferMod, Filter),
FinishFun = finish_fun(BufferMod, Sender),
case AsyncBackend of
@@ -444,7 +444,7 @@ handle_coverage(?KV_INDEX_REQ{bucket=Bucket,
FilterVNode = proplists:get_value(Index, FilterVNodes),
Filter = riak_kv_coverage_filter:build_filter(Bucket, ItemFilter, FilterVNode),
BufferMod = riak_kv_fold_buffer,
- Buffer = BufferMod:new(BufferSize, result_fun(Bucket, Sender)),
+ Buffer = BufferMod:new(BufferSize, Sender, result_fun(Bucket, Sender)),
FoldFun = fold_fun(keys, BufferMod, Filter),
FinishFun = finish_fun(BufferMod, Sender),
case AsyncBackend of
@@ -799,8 +799,8 @@ list(FoldFun, FinishFun, Mod, ModFun, ModState, Opts, Buffer) ->
case Mod:ModFun(FoldFun, Buffer, Opts, ModState) of
{ok, Acc} ->
FinishFun(Acc);
- {async, AsyncWork} ->
- {async, AsyncWork}
+ {async, _AsyncWork} = AsyncReply ->
+ AsyncReply
end.
%% @private

0 comments on commit 6af7883

Please sign in to comment.