Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Port Anti-Entropy Manager to use background manager (locks and tokens) #645

Open
wants to merge 6 commits into from
This page is out of date. Refresh to see the latest.
Showing with 52 additions and 46 deletions.
  1. +52 −46 src/riak_kv_entropy_manager.erl
98 src/riak_kv_entropy_manager.erl
View
@@ -51,8 +51,6 @@
-record(state, {mode = automatic :: automatic | manual,
trees = [] :: [{index(), pid()}],
tree_queue = [] :: [{index(), pid()}],
- locks = [] :: [{pid(), reference()}],
- build_tokens = 0 :: non_neg_integer(),
exchange_queue = [] :: [exchange()],
exchanges = [] :: [{index(), reference(), pid()}]
}).
@@ -191,16 +189,15 @@ init([]) ->
false ->
automatic
end,
+ set_lock_maximums(),
+ set_build_token_limits(),
set_debug(proplists:is_defined(debug, Opts)),
State = #state{mode=Mode,
trees=[],
tree_queue=[],
- locks=[],
exchanges=[],
exchange_queue=[]},
- State2 = reset_build_tokens(State),
- schedule_reset_build_tokens(),
- {ok, State2}.
+ {ok, State}.
handle_call({set_mode, Mode}, _From, State) ->
{reply, ok, State#state{mode=Mode}};
@@ -266,10 +263,6 @@ handle_cast(_Msg, State) ->
handle_info(tick, State) ->
State2 = maybe_tick(State),
{noreply, State2};
-handle_info(reset_build_tokens, State) ->
- State2 = reset_build_tokens(State),
- schedule_reset_build_tokens(),
- {noreply, State2};
handle_info({{hashtree_pid, Index}, Reply}, State) ->
case Reply of
{ok, Pid} when is_pid(Pid) ->
@@ -279,10 +272,9 @@ handle_info({{hashtree_pid, Index}, Reply}, State) ->
{noreply, State}
end;
handle_info({'DOWN', Ref, _, Obj, Status}, State) ->
- State2 = maybe_release_lock(Ref, State),
- State3 = maybe_clear_exchange(Ref, Status, State2),
- State4 = maybe_clear_registered_tree(Obj, State3),
- {noreply, State4};
+ State2 = maybe_clear_exchange(Ref, Status, State),
+ State3 = maybe_clear_registered_tree(Obj, State2),
+ {noreply, State3};
handle_info(_Info, State) ->
{noreply, State}.
@@ -296,15 +288,29 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
-schedule_reset_build_tokens() ->
- {_, Reset} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
- ?DEFAULT_BUILD_LIMIT),
- erlang:send_after(Reset, self(), reset_build_tokens).
-
-reset_build_tokens(State) ->
- {Tokens, _} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
- ?DEFAULT_BUILD_LIMIT),
- State#state{build_tokens=Tokens}.
+%% Concurrency locks are ruled by one limit, which is the
+%% anti_entropy_concurrency limit. The sum of build, rehash, and
+%% exchange locks should never exceed anti_entropy_concurrency.
+%% Instead of implementing a complicated "set" or "child" model of
+%% locks, we just set them each to the limit; and first get a lock
+%% on anti_entropy_concurrency.
+set_lock_maximums() ->
+ Concurrency = app_helper:get_env(riak_kv,
+ anti_entropy_concurrency,
+ ?DEFAULT_CONCURRENCY),
+ KillProcsIfLimitsReached = true,
+ [ riak_core_bg_manager:set_concurrency_limit(LockType,
+ Concurrency,
+ KillProcsIfLimitsReached)
+ || LockType <- [anti_entropy_concurrency, build, rehash, exchange] ].
+
+%% AAE build work is constrained by limiting the number of available work "token"
+%% per period of time. AAE builds trees at a rate governed by anti_entropy_build_limit.
+%% The background manager tracks tokens given and refills them at the requested rate.
+set_build_token_limits() ->
+ {Rate, Limit} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
+ ?DEFAULT_BUILD_LIMIT),
+ riak_core_bg_manager:set_token_rate(aae_build_token, {Rate, Limit}).
-spec settings() -> {boolean(), proplists:proplist()}.
settings() ->
@@ -361,37 +367,37 @@ add_hashtree_pid(true, Index, Pid, State=#state{trees=Trees}) ->
-spec do_get_lock(any(),pid(),state())
-> {ok | max_concurrency | build_limit_reached, state()}.
-do_get_lock(Type, Pid, State=#state{locks=Locks}) ->
- Concurrency = app_helper:get_env(riak_kv,
- anti_entropy_concurrency,
- ?DEFAULT_CONCURRENCY),
- case length(Locks) >= Concurrency of
- true ->
+do_get_lock(Type, Pid, State) ->
+ %% Get the parent lock of all AAE locks, which enforces the limit
+ %% of any AAE manager operations, e.g. build, rehash, exchange.
+ case riak_core_bg_manager:get_lock(anti_entropy_concurrency) of
+ max_concurrency ->
{max_concurrency, State};
- false ->
- case check_lock_type(Type, State) of
- {ok, State2} ->
- Ref = monitor(process, Pid),
- State3 = State2#state{locks=[{Pid,Ref}|Locks]},
- {ok, State3};
+ ok ->
+ %% possibly impose other checks, like build token availability
+ case check_lock_type(Type, Pid) of
+ ok ->
+ %% This can't fail, because we already got the
+ %% the anti-entropy lock, but test anyhow!
+ case riak_core_bg_manager:get_lock(Type, Pid) of
+ ok -> {ok, State};
+ Err -> {Err, State}
+ end;
Error ->
{Error, State}
end
end.
-check_lock_type(build, State=#state{build_tokens=Tokens}) ->
- if Tokens > 0 ->
- {ok, State#state{build_tokens=Tokens-1}};
- true ->
+check_lock_type(build, Pid) ->
+ %% non-blocking get of build token; was registered by init()
+ case riak_core_bg_manager:get_token(aae_build_token, Pid) of
+ ok ->
+ ok;
+ max_tokens ->
build_limit_reached
end;
-check_lock_type(_Type, State) ->
- {ok, State}.
-
--spec maybe_release_lock(reference(), state()) -> state().
-maybe_release_lock(Ref, State) ->
- Locks = lists:keydelete(Ref, 2, State#state.locks),
- State#state{locks=Locks}.
+check_lock_type(_Type, _Pid) ->
+ ok.
-spec maybe_clear_exchange(reference(), term(), state()) -> state().
maybe_clear_exchange(Ref, Status, State) ->
Something went wrong with that request. Please try again.