Skip to content

Commit

Permalink
Guard table during transfers and fix over-matching delete that remove…
Browse files Browse the repository at this point in the history
…d multiple given entries for the same resource.
  • Loading branch information
buddhisthead committed Nov 20, 2013
1 parent 978e466 commit f67b60e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 19 deletions.
71 changes: 54 additions & 17 deletions src/riak_core_bg_manager.erl
Expand Up @@ -518,11 +518,11 @@ handle_call(lock_info, _From, State) ->
handle_call({lock_info, Lock}, _From, State) ->
do_handle_call_exception(fun do_resource_info/2, [Lock, State], State);
handle_call({concurrency_limit, Lock}, _From, State) ->
do_handle_call_exception(fun do_resource_limit/2, [Lock, State], State);
do_handle_call_exception(fun do_resource_limit/3, [lock, Lock, State], State);
handle_call({set_concurrency_limit, Lock, Limit, Kill}, _From, State) ->
do_set_concurrency_limit(Lock, Limit, Kill, State);
handle_call({token_rate, Token}, _From, State) ->
do_handle_call_exception(fun do_resource_limit/2, [Token, State], State);
do_handle_call_exception(fun do_resource_limit/3, [token, Token, State], State);
handle_call(token_info, _From, State) ->
do_handle_call_exception(fun do_get_type_info/2, [token, State], State);
handle_call({token_info, Token}, _From, State) ->
Expand Down Expand Up @@ -623,6 +623,7 @@ code_change(_OldVsn, State, _Extra) ->
%% @private
%% @doc We must have just gotten the table data back after a crash/restart.
%% Walk through the given resources and release any holds by dead processes.
%% Assumes TableId is always valid (called only after transfer)
validate_holds(State=#state{table_id=TableId}) ->
    %% Every object stored under a {given, _} key is handed to validate_hold/2,
    %% which decides whether that hold is kept or removed.
    Held = ets:match_object(TableId, {{given, '_'}, '_'}),
    lists:foreach(fun(Obj) -> validate_hold(Obj, TableId) end, Held),
    %% The state record itself is returned unchanged; only the table is touched.
    State.
Expand All @@ -632,14 +633,15 @@ validate_holds(State=#state{table_id=TableId}) ->
%% remove the hold from the ETS table. If it is alive, then we need
%% to re-monitor it and update the table with the new ref.
validate_hold({Key,Entry}=Obj, TableId) when ?e_type(Entry) == lock ->
%% If the process is not alive, release the lock
case is_process_alive(?e_pid(Entry)) of
true ->
%% Still alive. Re-monitor and update table
Ref = monitor(process, ?e_pid(Entry)),
Entry2 = Entry#resource_entry{ref=Ref},
{given,Resource} = Key,
update_given_entry(Resource, Entry2, TableId);
ets:delete_object(TableId, Obj),
ets:insert(TableId, {Key, Entry2});
false ->
%% Process is not alive - release the lock
ets:delete_object(TableId, Obj)
end;
validate_hold(_Obj, _TableId) -> %% tokens don't monitor processes
Expand Down Expand Up @@ -670,6 +672,8 @@ do_handle_call_exception(Function, Args, State) ->
end.

%% @doc Throws {unregistered, Resource} for unknown Lock.
do_disable_lock(_Lock, _Kill, State=#state{table_id=undefined}) ->
{noreply, State};
do_disable_lock(Lock, Kill, State) ->
Info = resource_info(Lock, State),
enforce_type_or_throw(Lock, lock, Info),
Expand All @@ -688,19 +692,30 @@ do_set_token_rate(Token, Rate, State) ->
State3 = maybe_unblock_blocked(Token, State2),
{reply, OldRate, State3}
catch
table_id_undefined ->
%% This could go into a queue to be played when the transfer happens.
{reply, {undefined, 0}, State};
{unregistered, Token} ->
{reply, {0, 0}, update_limit(Token, Rate, ?DEFAULT_TOKEN_INFO, State)};
{reply, {undefined, 0}, update_limit(Token, Rate, ?DEFAULT_TOKEN_INFO, State)};
{badtype, _Token}=Error ->
{reply, Error, State}
end.

%% @private
%% @doc Reply with one 3-tuple per registered resource of the given Type.
%% Each tuple is the 4-tuple returned by resource_info_tuple/2 with its
%% second element dropped. Always returns {reply, Infos, State} so the
%% result can be handed straight back from handle_call.
do_get_type_info(_Type, State=#state{table_id=undefined}) ->
    %% Table not transferred yet: nothing can be looked up, so reply with
    %% an empty list rather than returning a bare term to the caller.
    {reply, [], State};
do_get_type_info(Type, State) ->
    Strip = fun({R,_T,E,L}) -> {R,E,L} end,
    Resources = all_registered_resources(Type, State),
    Infos = [Strip(resource_info_tuple(Resource, State)) || Resource <- Resources],
    {reply, Infos, State}.

do_resource_limit(Resource, State) ->
%% Returns empty if the ETS table has not been transferred to us yet.
%% Without a table there is nothing to look up: answer with a zero limit
%% shaped for the requested resource type.
do_resource_limit(lock, _Resource, #state{table_id=undefined}=State) ->
    {reply, 0, State};
do_resource_limit(token, _Resource, #state{table_id=undefined}=State) ->
    {reply, {0,0}, State};
%% Otherwise reply with the limit stored in the resource's info;
%% resource_info/2 throws when the lookup cannot be satisfied.
do_resource_limit(_Type, Resource, State) ->
    {reply, ?resource_limit(resource_info(Resource, State)), State}.
Expand All @@ -720,6 +735,9 @@ do_set_concurrency_limit(Lock, Limit, Kill, State) ->
maybe_honor_limit(Kill, Lock, Limit, State2),
{reply, OldLimit, State2}
catch
table_id_undefined ->
%% This could go into a queue to be played when the transfer happens.
{reply, 0, State};
{unregistered, Lock} ->
{reply, 0, update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State)};
{badtype, _Lock}=Error ->
Expand Down Expand Up @@ -748,6 +766,8 @@ limit(#resource_info{type=token, limit={_Period,MaxCount}}) -> MaxCount.
%% @private
%% @doc Release the resource associated with the given reference. This is mostly
%% meaningful for locks.
release_resource(_Ref, State=#state{table_id=undefined}) ->
State;
release_resource(Ref, State=#state{table_id=TableId}) ->
%% There should only be one instance of the object, but we'll zap all that match.
Given = [Obj || Obj <- ets:match_object(TableId, {{given, '_'},'_'})],
Expand Down Expand Up @@ -804,6 +824,8 @@ update_resource_info(Resource, Fun, Default, State=#state{table_id=TableId}) ->
State.

%% @doc Throws unregistered for unknown Resource
resource_info(_Resource, #state{table_id=undefined}) ->
throw(table_id_undefined);
resource_info(Resource, #state{table_id=TableId}) ->
Key = {info,Resource},
case ets:lookup(TableId, Key) of
Expand Down Expand Up @@ -870,15 +892,22 @@ do_reply_timeout(Resource, From, State) ->
%% @doc
%% Get existing token type info from ETS table and schedule all for refill.
%% This is needed because we just reloaded our saved persistent state data
%% after a crash.
%% after a crash. Assumes table is available. Called only after Transfer.
reschedule_token_refills(State) ->
    %% One refill is scheduled per registered token; the list of results from
    %% schedule_refill_tokens/2 is returned, in registration-list order.
    Schedule = fun(Token) -> schedule_refill_tokens(Token, State) end,
    [Schedule(Token) || Token <- all_registered_resources(token, State)].

%% Schedule a timer event to refill tokens of given type
%% @private
%% @doc Schedule a {refill_tokens, Token} message to ourselves after the
%% token's refill period. Returns ok when nothing is scheduled (no table yet,
%% or the token's period is undefined); otherwise returns the timer reference
%% from erlang:send_after/3. Propagates whatever resource_info/2 throws.
schedule_refill_tokens(_Token, #state{table_id=undefined}) ->
    ok;
schedule_refill_tokens(Token, State) ->
    case ?resource_limit(resource_info(Token, State)) of
        {undefined, _Count} ->
            %% No period configured: erlang:send_after/3 would raise badarg
            %% on a non-integer time, so skip scheduling entirely.
            ok;
        {Period, _Count} ->
            erlang:send_after(Period, self(), {refill_tokens, Token})
    end.

%% Schedule a timer event to snapshot the current history
schedule_sample_history(State=#state{window_interval=Interval}) ->
Expand Down Expand Up @@ -921,12 +950,6 @@ remove_given_entries(Resource, #state{table_id=TableId}) ->
Key = {given, Resource},
ets:delete(TableId, Key).

%% Replace what is stored under {given, Resource} with Entry and return the
%% result of ets:insert/2.
%% NOTE(review): ets:delete/2 removes every object stored under the key, so if
%% the table allows several objects per key, all holds for Resource are dropped
%% before the single Entry is written back - confirm this is intended.
update_given_entry(Resource, Entry, TableId) ->
    %% TODO: lock? maybe use ets:update_element() instead?
    ets:delete(TableId, {given, Resource}),
    ets:insert(TableId, {{given, Resource}, Entry}).

%% @private
%% @doc Add a resource queue entry to our given set.
give_resource(Entry, State=#state{table_id=TableId}) ->
Expand Down Expand Up @@ -959,6 +982,10 @@ try_get_resource(true, Resource, Type, Pid, Meta, State) ->
%% @private
%% @doc reply now if resource is available. Returns max_concurrency
%% if resource not available or globally or specifically disabled.
do_get_resource(_Resource, _Type, _Pid, _Meta, State=#state{table_id=undefined}) ->
%% Table transfer has not occurred yet. Reply "max_concurrency" so that callers
%% will try back later, hopefully when we have our table back.
{reply, max_concurrency, State};
do_get_resource(_Resource, _Type, _Pid, _Meta, State=#state{enabled=false}) ->
{reply, max_concurrency, State};
do_get_resource(Resource, Type, Pid, Meta, State) ->
Expand All @@ -967,7 +994,8 @@ do_get_resource(Resource, Type, Pid, Meta, State) ->
Enabled = ?resource_enabled(Info),
Limit = limit(Info),
Given = length(resources_given(Resource, State)),
{Result, State2} = try_get_resource(Enabled andalso not (Given >= Limit), Resource, Type, Pid, Meta, State),
{Result, State2} = try_get_resource(Enabled andalso not (Given >= Limit),
Resource, Type, Pid, Meta, State),
{reply, Result, State2}.

%% @private
Expand Down Expand Up @@ -1002,8 +1030,12 @@ blocked_queue(Resource, #state{table_id=TableId}) ->
[{Key,Queue} | _Rest] -> Queue
end.

%% @private
%% @doc Return a fresh reference on every call. The value is used to create
%% unique entries in the "given" table, so it must not repeat, including
%% across restarts of this process, because our ETS table outlives the process.
%% make_ref/0 provides that; a random float does not guarantee it.
%% NOTE(review): uniqueness across a full node restart is not guaranteed by
%% make_ref/0 - confirm the table never survives one.
random_bogus_ref() ->
    make_ref().

%% @private
%% @doc Put a resource request on the blocked queue. We'll reply later when resources
Expand Down Expand Up @@ -1047,6 +1079,9 @@ fmt_live_entries(Entries) ->
[format_entry(Entry) || Entry <- Entries].

%% States :: [given | blocked], Types :: [lock | token]
do_query(_Resource, _States, _Types, #state{table_id=undefined}) ->
%% Table hasn't been transferred yet.
[];
do_query(all, States, Types, State) ->
E1 = case lists:member(given, States) of
true ->
Expand Down Expand Up @@ -1128,6 +1163,8 @@ do_clear_history(State=#state{window_tref=TRef}) ->
schedule_sample_history(State2).

%% Return stats history from head or tail of stats history queue
do_hist(_End, _TokenType, _Offset, _Count, #state{table_id=undefined}) ->
[];
do_hist(End, TokenType, Offset, Count, State) when Offset =< 0 ->
do_hist(End, TokenType, 1, Count, State);
do_hist(End, TokenType, Offset, Count, State) when Count =< 0 ->
Expand Down
2 changes: 1 addition & 1 deletion test/bg_manager_eqc.erl
Expand Up @@ -491,7 +491,7 @@ is_alive(#state{alive=Alive}) ->
%% Map a model value to the token rate the manager is expected to report:
%% an {unregistered, _} tuple passes through untouched, a count of 0 maps to
%% {undefined, 0}, and any other count is paired with the maximum period.
mk_token_rate({unregistered, _}=Unreg) ->
    Unreg;
mk_token_rate(0) ->
    {undefined, 0};
mk_token_rate(Count) ->
    %% 4294967295 is erlang:send_after max which is used for token refilling
    {4294967295, Count}.
Expand Down
2 changes: 1 addition & 1 deletion test/bg_manager_tests.erl
Expand Up @@ -277,7 +277,7 @@ bg_mgr_test_() ->
%% Trying to set the rate on a token of the wrong type looks
%% like an unregistered token.
?assertEqual({unregistered, token_a}, riak_core_bg_manager:get_token(token_a)),
?assertEqual({0,0}, riak_core_bg_manager:set_token_rate(token_a, {1,5})),
?assertEqual({undefined,0}, riak_core_bg_manager:set_token_rate(token_a, {1,5})),
?assertEqual({badtype, token_a},
riak_core_bg_manager:set_concurrency_limit(token_a, 42)),

Expand Down

0 comments on commit f67b60e

Please sign in to comment.