Skip to content

Commit

Permalink
AAE Manager now uses token manager for build tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
buddhisthead committed Aug 30, 2013
1 parent 3e58d06 commit 59a529d
Showing 1 changed file with 21 additions and 27 deletions.
48 changes: 21 additions & 27 deletions src/riak_kv_entropy_manager.erl
Expand Up @@ -51,7 +51,6 @@
-record(state, {mode = automatic :: automatic | manual,
trees = [] :: [{index(), pid()}],
tree_queue = [] :: [{index(), pid()}],
build_tokens = 0 :: non_neg_integer(),
exchange_queue = [] :: [exchange()],
exchanges = [] :: [{index(), reference(), pid()}]
}).
Expand Down Expand Up @@ -191,15 +190,14 @@ init([]) ->
automatic
end,
set_lock_maximums(),
set_build_token_limits(),
set_debug(proplists:is_defined(debug, Opts)),
State = #state{mode=Mode,
trees=[],
tree_queue=[],
exchanges=[],
exchange_queue=[]},
State2 = reset_build_tokens(State),
schedule_reset_build_tokens(),
{ok, State2}.
{ok, State}.

handle_call({set_mode, Mode}, _From, State) ->
{reply, ok, State#state{mode=Mode}};
Expand Down Expand Up @@ -265,10 +263,6 @@ handle_cast(_Msg, State) ->
handle_info(tick, State) ->
State2 = maybe_tick(State),
{noreply, State2};
handle_info(reset_build_tokens, State) ->
State2 = reset_build_tokens(State),
schedule_reset_build_tokens(),
{noreply, State2};
handle_info({{hashtree_pid, Index}, Reply}, State) ->
case Reply of
{ok, Pid} when is_pid(Pid) ->
Expand Down Expand Up @@ -310,15 +304,13 @@ set_lock_maximums() ->
KillProcsIfLimitsReached)
|| LockType <- [anti_entropy_concurrency, build, rehash, exchange] ].

schedule_reset_build_tokens() ->
{_, Reset} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
?DEFAULT_BUILD_LIMIT),
erlang:send_after(Reset, self(), reset_build_tokens).

reset_build_tokens(State) ->
{Tokens, _} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
?DEFAULT_BUILD_LIMIT),
State#state{build_tokens=Tokens}.
%% AAE build work is constrained by limiting the number of available work "token"
%% per period of time. AAE builds trees at a rate governed by anti_entropy_build_limit.
%% The background manager tracks tokens given and refills them at the requested rate.
set_build_token_limits() ->
{Rate, Limit} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
?DEFAULT_BUILD_LIMIT),
riak_core_bg_manager:set_token_rate(aae_build_token, {Rate, Limit}).

-spec settings() -> {boolean(), proplists:proplist()}.
settings() ->
Expand Down Expand Up @@ -383,27 +375,29 @@ do_get_lock(Type, Pid, State) ->
{max_concurrency, State};
ok ->
%% possibly impose other checks, like build token availability
case check_lock_type(Type, State) of
{ok, State2} ->
case check_lock_type(Type, Pid) of
ok ->
%% This can't fail, because we already got the
%% the anti-entropy lock, but test anyhow!
case riak_core_bg_manager:get_lock(Type, Pid) of
ok -> {ok, State2};
Err -> {Err, State2}
ok -> {ok, State};
Err -> {Err, State}
end;
Error ->
{Error, State}
end
end.

check_lock_type(build, State=#state{build_tokens=Tokens}) ->
if Tokens > 0 ->
{ok, State#state{build_tokens=Tokens-1}};
true ->
check_lock_type(build, Pid) ->
%% non-blocking get of build token; was registered by init()
case riak_core_bg_manager:get_token(aae_build_token, Pid) of
ok ->
ok;
max_tokens ->
build_limit_reached
end;
check_lock_type(_Type, State) ->
{ok, State}.
check_lock_type(_Type, _Pid) ->
ok.

-spec maybe_clear_exchange(reference(), term(), state()) -> state().
maybe_clear_exchange(Ref, Status, State) ->
Expand Down

0 comments on commit 59a529d

Please sign in to comment.