Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Anti-Entropy Manager to use background manager (locks and tokens) #645

Closed
wants to merge 6 commits into from
98 changes: 52 additions & 46 deletions src/riak_kv_entropy_manager.erl
Expand Up @@ -51,8 +51,6 @@
-record(state, {mode = automatic :: automatic | manual, -record(state, {mode = automatic :: automatic | manual,
trees = [] :: [{index(), pid()}], trees = [] :: [{index(), pid()}],
tree_queue = [] :: [{index(), pid()}], tree_queue = [] :: [{index(), pid()}],
locks = [] :: [{pid(), reference()}],
build_tokens = 0 :: non_neg_integer(),
exchange_queue = [] :: [exchange()], exchange_queue = [] :: [exchange()],
exchanges = [] :: [{index(), reference(), pid()}] exchanges = [] :: [{index(), reference(), pid()}]
}). }).
Expand Down Expand Up @@ -191,16 +189,15 @@ init([]) ->
false -> false ->
automatic automatic
end, end,
set_lock_maximums(),
set_build_token_limits(),
set_debug(proplists:is_defined(debug, Opts)), set_debug(proplists:is_defined(debug, Opts)),
State = #state{mode=Mode, State = #state{mode=Mode,
trees=[], trees=[],
tree_queue=[], tree_queue=[],
locks=[],
exchanges=[], exchanges=[],
exchange_queue=[]}, exchange_queue=[]},
State2 = reset_build_tokens(State), {ok, State}.
schedule_reset_build_tokens(),
{ok, State2}.


handle_call({set_mode, Mode}, _From, State) -> handle_call({set_mode, Mode}, _From, State) ->
{reply, ok, State#state{mode=Mode}}; {reply, ok, State#state{mode=Mode}};
Expand Down Expand Up @@ -266,10 +263,6 @@ handle_cast(_Msg, State) ->
handle_info(tick, State) -> handle_info(tick, State) ->
State2 = maybe_tick(State), State2 = maybe_tick(State),
{noreply, State2}; {noreply, State2};
handle_info(reset_build_tokens, State) ->
State2 = reset_build_tokens(State),
schedule_reset_build_tokens(),
{noreply, State2};
handle_info({{hashtree_pid, Index}, Reply}, State) -> handle_info({{hashtree_pid, Index}, Reply}, State) ->
case Reply of case Reply of
{ok, Pid} when is_pid(Pid) -> {ok, Pid} when is_pid(Pid) ->
Expand All @@ -279,10 +272,9 @@ handle_info({{hashtree_pid, Index}, Reply}, State) ->
{noreply, State} {noreply, State}
end; end;
handle_info({'DOWN', Ref, _, Obj, Status}, State) -> handle_info({'DOWN', Ref, _, Obj, Status}, State) ->
State2 = maybe_release_lock(Ref, State), State2 = maybe_clear_exchange(Ref, Status, State),
State3 = maybe_clear_exchange(Ref, Status, State2), State3 = maybe_clear_registered_tree(Obj, State2),
State4 = maybe_clear_registered_tree(Obj, State3), {noreply, State3};
{noreply, State4};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.


Expand All @@ -296,15 +288,29 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================


schedule_reset_build_tokens() -> %% Concurrency locks are ruled by one limit, which is the
{_, Reset} = app_helper:get_env(riak_kv, anti_entropy_build_limit, %% anti_entropy_concurrency limit. The sum of build, rehash, and
?DEFAULT_BUILD_LIMIT), %% exchange locks should never exceed anti_entropy_concurrency.
erlang:send_after(Reset, self(), reset_build_tokens). %% Instead of implementing a complicated "set" or "child" model of

%% locks, we just set them each to the limit; and first get a lock
reset_build_tokens(State) -> %% on anti_entropy_concurrency.
{Tokens, _} = app_helper:get_env(riak_kv, anti_entropy_build_limit, set_lock_maximums() ->
?DEFAULT_BUILD_LIMIT), Concurrency = app_helper:get_env(riak_kv,
State#state{build_tokens=Tokens}. anti_entropy_concurrency,
?DEFAULT_CONCURRENCY),
KillProcsIfLimitsReached = true,
[ riak_core_bg_manager:set_concurrency_limit(LockType,
Concurrency,
KillProcsIfLimitsReached)
|| LockType <- [anti_entropy_concurrency, build, rehash, exchange] ].

%% AAE build work is constrained by limiting the number of work "tokens"
%% available per period of time. AAE builds trees at a rate governed by the
%% anti_entropy_build_limit setting, read here from the riak_kv application
%% environment (falling back to ?DEFAULT_BUILD_LIMIT). The background manager
%% tracks tokens given and refills them at the requested rate.
%%
%% The setting must be a 2-tuple; anything else crashes with badmatch before
%% the background manager is called.
%% NOTE(review): the pre-background-manager code destructured this same
%% setting as {Tokens, ResetInterval}. Confirm that element order is what
%% riak_core_bg_manager:set_token_rate/2 expects for the aae_build_token type.
set_build_token_limits() ->
    BuildLimit = app_helper:get_env(riak_kv, anti_entropy_build_limit,
                                    ?DEFAULT_BUILD_LIMIT),
    {_Rate, _Limit} = BuildLimit,
    riak_core_bg_manager:set_token_rate(aae_build_token, BuildLimit).


-spec settings() -> {boolean(), proplists:proplist()}. -spec settings() -> {boolean(), proplists:proplist()}.
settings() -> settings() ->
Expand Down Expand Up @@ -361,37 +367,37 @@ add_hashtree_pid(true, Index, Pid, State=#state{trees=Trees}) ->


-spec do_get_lock(any(),pid(),state()) -spec do_get_lock(any(),pid(),state())
-> {ok | max_concurrency | build_limit_reached, state()}. -> {ok | max_concurrency | build_limit_reached, state()}.
do_get_lock(Type, Pid, State=#state{locks=Locks}) -> do_get_lock(Type, Pid, State) ->
Concurrency = app_helper:get_env(riak_kv, %% Get the parent lock of all AAE locks, which enforces the limit
anti_entropy_concurrency, %% of any AAE manager operations, e.g. build, rehash, exchange.
?DEFAULT_CONCURRENCY), case riak_core_bg_manager:get_lock(anti_entropy_concurrency) of
case length(Locks) >= Concurrency of max_concurrency ->
true ->
{max_concurrency, State}; {max_concurrency, State};
false -> ok ->
case check_lock_type(Type, State) of %% possibly impose other checks, like build token availability
{ok, State2} -> case check_lock_type(Type, Pid) of
Ref = monitor(process, Pid), ok ->
State3 = State2#state{locks=[{Pid,Ref}|Locks]}, %% This can't fail, because we already got the
{ok, State3}; %% anti-entropy lock, but test anyhow!
case riak_core_bg_manager:get_lock(Type, Pid) of
ok -> {ok, State};
Err -> {Err, State}
end;
Error -> Error ->
{Error, State} {Error, State}
end end
end. end.


check_lock_type(build, State=#state{build_tokens=Tokens}) -> check_lock_type(build, Pid) ->
if Tokens > 0 -> %% non-blocking get of build token; was registered by init()
{ok, State#state{build_tokens=Tokens-1}}; case riak_core_bg_manager:get_token(aae_build_token, Pid) of
true -> ok ->
ok;
max_tokens ->
build_limit_reached build_limit_reached
end; end;
check_lock_type(_Type, State) -> check_lock_type(_Type, _Pid) ->
{ok, State}. ok.

%% Forget the lock whose monitor reference is Ref. Lock entries are
%% {Pid, Ref} tuples, so the match is on element 2; the first matching entry
%% is removed and the state comes back unchanged when no entry matches.
-spec maybe_release_lock(reference(), state()) -> state().
maybe_release_lock(Ref, State) ->
    State#state{locks = lists:keydelete(Ref, 2, State#state.locks)}.


-spec maybe_clear_exchange(reference(), term(), state()) -> state(). -spec maybe_clear_exchange(reference(), term(), state()) -> state().
maybe_clear_exchange(Ref, Status, State) -> maybe_clear_exchange(Ref, Status, State) ->
Expand Down