Refactor'd token/lock manager. #412

Closed

@buddhisthead
Contributor

This is a refactor'd branch of cet-bg-mgr and previous PR #364

The integration with the handoff manager has been removed and will appear in a separate PR.

The work here merges the lock and token managers and addresses comments from the previous PR.
It also adds many more tests.

@jrwest
Contributor
jrwest commented Nov 11, 2013

@buddhisthead the function below illustrates the case I mentioned on mumble where a lock is held indefinitely if a process holding the lock dies while the background manager is dead.

hold_forever() ->
    {ok, P1} = riak_core_table_manager:start_link([{background_mgr_table,
                                                  [protected, named_table, bag]}]),
    unlink(P1),
    {ok, P2} = riak_core_bg_manager:start_link(),
    unlink(P2),
    riak_core_bg_manager:set_concurrency_limit(a, 1),
    P3 = spawn(fun() ->
                       riak_core_bg_manager:get_lock(a),
                       receive
                           kill ->
                               io:format("I'm Dead~n")
                       end
               end),
    io:format("REACHED: ~p~n", [riak_core_bg_manager:concurrency_limit_reached(a)]),
    io:format("TABLE: ~p~n", [ets:tab2list(background_mgr_table)]),
    exit(whereis(riak_core_bg_manager), kill),
    io:format("After Kill~n~n"),
    io:format("TABLE: ~p~n", [ets:tab2list(background_mgr_table)]),
    P3 ! kill,
    timer:sleep(1000),
    io:format("TABLE: ~p~n", [ets:tab2list(background_mgr_table)]),
    riak_core_bg_manager:start_link(),
    io:format("REACHED: ~p~n", [riak_core_bg_manager:concurrency_limit_reached(a)]),
    io:format("TABLE: ~p~n", [ets:tab2list(background_mgr_table)]).
@jrwest jrwest and 1 other commented on an outdated diff Nov 12, 2013
src/riak_core_bg_manager.erl
+        {{value, Entry}, Queue2} ->
+            %% account for given resource
+            State2 = give_resource(Entry, State),
+            %% send reply to blocked caller, unblocking them.
+            gen_server:reply(?e_from(Entry), ok),
+            %% unblock next blocked in queue
+            give_available_resources(Resource, NumAvailable-1, Queue2, State2)
+    end.
+
+%% @private
+%% @doc
+%% For the given type, check the current given count and if less
+%% than the rate limit, give out as many tokens as are available
+%% to callers on the blocked list. They need a reply because they
+%% made a gen_server:call() that we have not replied to yet.
+maybe_unblock_blocked(Resource, State) ->
@jrwest
jrwest Nov 12, 2013 Contributor

There are no calls to this function for lock resources, so processes blocked waiting for locks do so indefinitely.

Running block_simple() below, you will never see the output from P2.

block_simple() ->
    {ok, S1} = riak_core_table_manager:start_link([{background_mgr_table, 
                                                   [protected, named_table, bag]}]),
    {ok, S2} = riak_core_bg_manager:start_link(),
    unlink(S1),
    unlink(S2),
    riak_core_bg_manager:set_concurrency_limit(a, 1),
    P1 = spawn(fun() ->
                       riak_core_bg_manager:get_lock_blocking(a, 10000),
                       io:format("P1: Got Lock, Waiting to Die~n"),
                       receive kill -> io:format("P1: I'm Dead~n") end
               end),
    P2 = spawn(fun() ->
                       timer:sleep(1000),
                       case riak_core_bg_manager:get_lock_blocking(a, 1000000) of
                           ok ->
                               io:format("P2: Got Lock, Waiting to Die~n"),
                               receive kill -> io:format("P2: I'm Dead~n") end;
                           timeout ->
                               io:format("P2: Timed out waiting for kill~n")
                       end
               end),
    io:format("P1: ~p, P2: ~p~n", [P1, P2]),
    timer:sleep(2000),
    P1 ! kill,
    timer:sleep(10000),
    io:format("TABLE: ~p~n", [ets:tab2list(background_mgr_table)]),
    exit(whereis(riak_core_bg_manager), kill),
    exit(whereis(riak_core_table_manager), kill).

loop_forever() ->
    timer:sleep(10000),
    loop_forever().
@buddhisthead
buddhisthead Nov 25, 2013 Contributor

This issue will be addressed by removing the blocking API, which we will do last in order to salvage as much of it as possible. However, we have decided that a better way to provide a locking API is to put callers on a queue and send them a message when the resource becomes available. This avoids blocking the caller in the middle of a gen_server:call and lets the manager keep responding to status requests and other messages. Credit to @jtuple for that idea.
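The queue-and-reply idea above can be sketched roughly like this (module, state, and message names here are invented for illustration; this is not the actual riak_core_bg_manager code). The server parks the caller's `From` instead of blocking in its own loop, so it can still answer status requests while waiters are queued:

```erlang
-module(bg_queue_sketch).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

init([Limit]) ->
    {ok, #{limit => Limit, held => 0, waiters => queue:new()}}.

handle_call(get_lock, _From, S = #{limit := L, held := H}) when H < L ->
    {reply, ok, S#{held := H + 1}};
handle_call(get_lock, From, S = #{waiters := Q}) ->
    %% No reply yet: the caller stays parked in its gen_server:call,
    %% but this server loop remains free to serve other messages.
    {noreply, S#{waiters := queue:in(From, Q)}};
handle_call(status, _From, S) ->
    %% Still answerable while lock waiters are queued.
    {reply, S, S}.

handle_cast(release, S = #{held := H, waiters := Q}) ->
    case queue:out(Q) of
        {{value, From}, Q2} ->
            %% Hand the freed resource to the oldest waiter.
            gen_server:reply(From, ok),
            {noreply, S#{waiters := Q2}};
        {empty, _} ->
            {noreply, S#{held := H - 1}}
    end;
handle_cast(_, S) -> {noreply, S}.

handle_info(_, S) -> {noreply, S}.
```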

@jrwest jrwest commented on an outdated diff Nov 12, 2013
src/riak_core_bg_manager.erl
+%% @private
+%% @doc Return the queue of blocked resources named 'Resource'.
+blocked_queue(Resource, #state{table_id=TableId}) ->
+    Key = {blocked, Resource},
+    case ets:lookup(TableId, Key) of
+        [] -> queue:new();
+        [{Key, Queue} | _Rest] -> Queue
+    end.
+
+%% @private
+%% @doc Put a resource request on the blocked queue. We'll reply later when resources
+%% of that type become available.
+enqueue_request(Resource, Type, Pid, Meta, From, Timeout, State) ->
+%% ?debugFmt("queueing ~p~n", [Resource]),
+    OldQueue = blocked_queue(Resource, State),
+    Ref = monitor(process, Pid),
@jrwest
jrwest Nov 12, 2013 Contributor

Because get_lock_blocking/3 can take a lock on behalf of another process, this monitor isn't sufficient. If process A requests a lock using the pid of process B, and A dies before the request is unblocked, we get ambiguous behavior unless A is linked to B.

EDIT: @buddhisthead and I discussed removing the ability to block when taking locks on behalf of other processes to avoid this case

@jrwest jrwest and 1 other commented on an outdated diff Nov 12, 2013
src/riak_core_bg_manager.erl
+               window_interval=Interval,
+               history=queue:new()},
+    State2 = schedule_sample_history(State),
+    {ok, State2}.
+
+%% @private
+%% @doc Handling call messages
+-spec handle_call(term(), {pid(), term()}, #state{}) ->
+    {reply, term(), #state{}} |
+    {reply, term(), #state{}, non_neg_integer()} |
+    {noreply, #state{}} |
+    {noreply, #state{}, non_neg_integer()} |
+    {stop, term(), term(), #state{}} |
+    {stop, term(), #state{}}.
+
+handle_call({query_resource, Resource, States, Types}, _From, State) ->
@jrwest
jrwest Nov 12, 2013 Contributor

most of the operations here assume that the ETS-TRANSFER message has already been received by the background manager, which may not be the case.

@buddhisthead
buddhisthead Nov 25, 2013 Contributor

Addressed by f67b60e, which guards the API prior to ETS table transfer.

@jrwest
Contributor
jrwest commented Nov 12, 2013

The use of bag for the bg manager's ets table makes sense for some of its data. In many places, however, there are non-atomic delete-then-insert operations (e.g. the blocked queue, resource info). These are probably good candidates to move to a second ets table of type set. We already try to enforce their uniqueness outside of ets, and our method for doing so may introduce race conditions if we move operations like querying resources out of the bg manager process into the calling one.
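As a small illustration of the difference (table and key names here are invented, not the PR's actual schema): with a bag table, replacing a value under a key requires a delete followed by an insert, leaving a window where the key is absent, whereas with a set table a single insert atomically replaces the previous object.

```erlang
%% Sketch: replacing one value per key in a bag vs a set table.
demo() ->
    Bag = ets:new(bag_tab, [bag]),
    Set = ets:new(set_tab, [set]),
    Key = {blocked, my_lock},
    %% bag: must delete the old object(s) first, or duplicates accumulate;
    %% between delete and insert the key is momentarily missing.
    ets:insert(Bag, {Key, queue:new()}),
    ets:delete(Bag, Key),
    ets:insert(Bag, {Key, queue:in(waiter, queue:new())}),
    %% set: one insert call atomically replaces the previous value.
    ets:insert(Set, {Key, queue:new()}),
    ets:insert(Set, {Key, queue:in(waiter, queue:new())}),
    {ets:lookup(Bag, Key), ets:lookup(Set, Key)}.
```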

@jrwest
Contributor
jrwest commented Nov 12, 2013

See https://gist.github.com/jrwest/1001c05e427582516e34 for dialyzer errors I get when running against this branch. It would be great to resolve these before merging.

+1 to that - @buddhisthead

Resolved by bb6ae81

@jrwest jrwest and 1 other commented on an outdated diff Nov 12, 2013
include/riak_core_bg_manager.hrl
+%% tuple '{my_ultimate_lock, 42}'.
+%%
+%% Usage:
+%% 1. register your lock/token and set its max concurrency/rate.
+%% 2. "get" a lock/token by its resource type/name
+%% 3. do stuff
+%% 4. let your process die, which gives back a lock.
+%% -------------------------------------------------------------------
+-type bg_lock() :: any().
+-type bg_token() :: any().
+-type bg_resource() :: bg_token() | bg_lock().
+-type bg_resource_type() :: lock | token.
+
+-type bg_meta() :: {atom(), any()}. %% meta data to associate with a lock/token
+-type bg_period() :: pos_integer(). %% token refill period in seconds
+-type bg_count() :: pos_integer(). %% token refill tokens to count at each refill period
@jrwest
jrwest Nov 12, 2013 Contributor

This is one of the sources of the dialyzer errors. Should it be non_neg_integer() instead? Below in BG_DEFAULT_STAT_HIST we initialize some bg_count() typed values to 0.

@buddhisthead
buddhisthead Nov 29, 2013 Contributor

Yes. Resolved by bb6ae81

@jrwest jrwest and 1 other commented on an outdated diff Nov 12, 2013
src/riak_core_bg_manager.erl
+%% meaningful for locks.
+release_resource(Ref, State=#state{table_id=TableId}) ->
+    %% There should only be one instance of the object, but we'll zap all that match.
+    Entries = all_given_entries(State),
+    Matches = [Entry || {{given, _Resource}, Entry} <- Entries, ?e_ref(Entry) == Ref],
+    [ets:delete_object(TableId, Obj) || Obj <- Matches],
+    State.
+
+maybe_honor_limit(true, Lock, Limit, State) ->
+    Entries = all_given_entries(State),
+    Held = [Entry || Entry <- Entries, ?e_type(Entry) == lock, ?e_resource(Entry) == Lock],
+    case Limit < length(Held) of
+        true ->
+            {_Keep, Discards} = lists:split(Limit, Held),
+            %% killing of processes will generate 'DOWN' messages and release the locks
+            [erlang:exit(Pid, max_concurrency) || Discard <- Discards, Pid = ?e_pid(Discard)],
@jrwest
jrwest Nov 12, 2013 Contributor

Dialyzer caught this. Guessing this got mangled in the transition to this PR. Think this should be:

[erlang:exit(?e_pid(Discard), max_concurrency) || Discard <- Discards]
@buddhisthead
buddhisthead Nov 29, 2013 Contributor

Resolved by bb6ae81

@jrwest jrwest commented on an outdated diff Nov 13, 2013
src/riak_core_bg_manager.erl
+    Limit = limit(Info),
+    {reply, HeldCount >= Limit, State}.
+
+%% @private
+%% @doc Return the maximum allowed number of resources for the given
+%% info, which considers the type of resource, e.g. lock vs token.
+limit(#resource_info{type=lock, limit=Limit}) -> Limit;
+limit(#resource_info{type=token, limit={_Period,MaxCount}}) -> MaxCount.
+
+%% @private
+%% @doc Release the resource associated with the given resource. This is mostly
+%% meaningful for locks.
+release_resource(Ref, State=#state{table_id=TableId}) ->
+    %% There should only be one instance of the object, but we'll zap all that match.
+    Entries = all_given_entries(State),
+    Matches = [Entry || {{given, _Resource}, Entry} <- Entries, ?e_ref(Entry) == Ref],
@jrwest
jrwest Nov 13, 2013 Contributor

Because Entries is already filtered in all_given_entries, Matches is an empty list and thus no locks are released here. The following crashes because concurrency_limit_reached continues to return true when it shouldn't:

locks_never_release() ->
    catch exit(whereis(riak_core_bg_manager), kill),
    catch exit(whereis(riak_core_table_manager), kill),
    timer:sleep(100),
    {ok, S1} = riak_core_table_manager:start_link([{background_mgr_table, [protected, named_table, bag]}]),
    {ok, S2} = riak_core_bg_manager:start_link(),
    unlink(S1),
    unlink(S2),
    riak_core_bg_manager:set_concurrency_limit(a, 1),
    spawn(fun() ->
                  ok = riak_core_bg_manager:get_lock(a)
          end),
    timer:sleep(2000),
    false = riak_core_bg_manager:concurrency_limit_reached(a),
    exit(whereis(riak_core_bg_manager), kill),
    exit(whereis(riak_core_table_manager), kill).
@jrwest
jrwest Nov 14, 2013 Contributor

When this gets fixed it's going to introduce a new bug...

Processes getting tokens are monitored just like those getting locks, which results in a {'DOWN', ...} message and a call to this function. Tokens should only be refilled on a timer, however, so we don't want to "release" them.
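The needed distinction might look roughly like this (a sketch, not the PR's actual handler; entry_type/2 is a hypothetical helper for looking up the entry behind a monitor ref): the 'DOWN' clause releases lock entries only, while given tokens are left for the refill timer to reclaim.

```erlang
%% 'DOWN' should release locks only; tokens expire when the refill
%% timer fires, not when the holding process dies.
handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
    State2 = case entry_type(Ref, State) of   %% entry_type/2: hypothetical
                 lock  -> release_resource(Ref, State);
                 token -> State               %% leave for the refill timer
             end,
    {noreply, State2};
handle_info({refill_tokens, Token}, State) ->
    {noreply, do_refill_tokens(Token, State)}.
```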

@jrwest jrwest and 1 other commented on an outdated diff Nov 14, 2013
src/riak_core_bg_manager.erl
+
+%% @doc Set the refill rate of tokens. Return previous value.
+-spec set_token_rate(bg_token(), bg_rate()) -> bg_rate().
+set_token_rate(Token, Rate={_Period, _Count}) ->
+    gen_server:call(?SERVER, {set_token_rate, Token, Rate}, infinity).
+
+%% @doc Get the current refill rate of named token.
+-spec token_rate(bg_token()) -> bg_rate().
+token_rate(Token) ->
+    gen_server:call(?SERVER, {token_rate, Token}, infinity).
+
+%% @doc Get a token without blocking.
+%% Associate token with provided pid or metadata. If metadata
+%% is provided the lock is associated with the calling process.
+%% Returns "max_tokens" if empty.
+-spec get_token(bg_token(), pid() | [{atom(), any()}]) -> ok | max_tokens.
@jrwest
jrwest Nov 14, 2013 Contributor

get_token/1,2,3 claim to return max_tokens when the limit is reached but in fact they return max_concurrency:

3> riak_core_bg_manager:set_token_rate(a, {10000, 2}).
0
4> riak_core_bg_manager:get_token(a).
ok
5> riak_core_bg_manager:get_token(a).
ok
6> riak_core_bg_manager:get_token(a).
max_concurrency
@buddhisthead
buddhisthead Nov 14, 2013 Contributor

That's a comment/spec error. I have changed everything to return max_concurrency so that it's easier to share code. I will fix up the specs.
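For illustration, the corrected spec would presumably read as follows (a sketch based on the behavior shown above, not the committed fix):

```erlang
%% Spec aligned with actual behavior: max_concurrency, not max_tokens.
-spec get_token(bg_token(), pid() | [{atom(), any()}]) -> ok | max_concurrency.
```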

@jrwest jrwest commented on the diff Nov 14, 2013
src/riak_core_bg_manager.erl
+    query_resource(Lock, [given], [lock]).
+
+%% @doc Returns all currently blocked locks.
+locks_blocked() ->
+    locks_blocked(all).
+
+-spec locks_blocked(bg_lock() | all) -> [bg_stat_live()].
+locks_blocked(Lock) ->
+    query_resource(Lock, [blocked], [token]).
+
+%%%%%%%%%%%%
+%% Token API
+%%%%%%%%%%%%
+
+%% @doc Set the refill rate of tokens. Return previous value.
+-spec set_token_rate(bg_token(), bg_rate()) -> bg_rate().
@jrwest
jrwest Nov 14, 2013 Contributor

only the previous Count is returned from set_token_rate instead of {Period, Count}

@buddhisthead
buddhisthead Nov 19, 2013 Contributor

Addressed by 51675e5

@jrwest jrwest and 1 other commented on an outdated diff Nov 14, 2013
src/riak_core_bg_manager.erl
+    Queue2 = queue:filter(fun(Entry) -> ?e_from(Entry) /= From end, Queue),
+    update_blocked_queue(Resource, Queue2, State).
+
+%% @private
+%% @doc
+%% Get existing token type info from ETS table and schedule all for refill.
+%% This is needed because we just reloaded our saved persistent state data
+%% after a crash.
+reschedule_token_refills(State) ->
+    Tokens = all_registered_resources(token, State),
+    [schedule_refill_tokens(Token, State) || Token <- Tokens].
+
+%% Schedule a timer event to refill tokens of given type
+schedule_refill_tokens(Token, State) ->
+    {Period, _Count} = ?resource_limit(resource_info(Token, State)),
+    erlang:send_after(Period*1000, self(), {refill_tokens, Token}).
@jrwest
jrwest Nov 14, 2013 Contributor

I don't think Period should be in seconds. It makes the API less flexible, and many Erlang APIs (like erlang:send_after and timer:sleep) use millis.

@buddhisthead
buddhisthead Nov 15, 2013 Contributor

Hmm. I know it's non-standard, but it makes no sense to refill tokens that fast. But I will change it; and maybe add a warning message if the number seems too small.

@jrwest
jrwest Nov 15, 2013 Contributor

Values smaller than a second may be too fast, but I may want 1500ms instead of 1000 or 2000. Most things in Riak also tend to be configurable in millis (and exposed in different units via cuttlefish, now). I don't really think a warning message is needed; imo, it's better to document that set_token_rate expects the period in millis.

@jrwest
jrwest Nov 15, 2013 Contributor

Also, if you really want to prevent small values, use a guard (although I still don't think it's necessary).

@buddhisthead
buddhisthead Nov 19, 2013 Contributor

Addressed by 978e466. The API now uses milliseconds.

Updated API doc, bg-mgr module, and all tests to use milliseconds.
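With the period in milliseconds, the value can be handed straight to erlang:send_after/3 with no unit conversion; a sketch against the schedule_refill_tokens shown earlier (not necessarily the final code):

```erlang
%% Period is stored in milliseconds, so no *1000 scaling is needed and
%% intermediate rates such as 1500ms become expressible.
schedule_refill_tokens(Token, State) ->
    {PeriodMs, _Count} = ?resource_limit(resource_info(Token, State)),
    erlang:send_after(PeriodMs, self(), {refill_tokens, Token}).
```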

@jrwest jrwest and 1 other commented on an outdated diff Nov 14, 2013
src/riak_core_bg_manager.erl
+    [format_entry(Entry) || Entry <- Entries].
+
+%% States :: [given | blocked], Types :: [lock | token]
+do_query(all, States, Types, State) ->
+    E1 = case lists:member(given, States) of
+             true ->
+                 Entries = all_given_entries(State),
+                 lists:flatten([Entry || Entry <- Entries,
+                                         lists:member(?e_type(Entry), Types)]);
+             false ->
+                 []
+         end,
+    E2 = case lists:member(blocked, States) of
+             true ->
+                 Queues = all_blocked_queues(State),
+%% ?debugFmt("All blocked queues: ~p~n", [Queues]),
@jrwest
jrwest Nov 14, 2013 Contributor

remove

@buddhisthead
buddhisthead Nov 15, 2013 Contributor

I've blasted all the debug statements.

@jrwest jrwest and 1 other commented on an outdated diff Nov 14, 2013
src/riak_core_bg_manager.erl
+        true ->
+            %% Oh Erlang, why is your scoping so messed up?
+            Entries2 = queue:to_list(blocked_queue(Resource, State)),
+            E1 ++ [Entry || Entry <- Entries2, lists:member(?e_type(Entry), Types)];
+        false ->
+            E1
+    end,
+    fmt_live_entries(E2).
+
+%% @private
+%% @doc Token refill timer event handler.
+%% Capture stats of what was given in the previous period,
+%% Clear all tokens of this type from the given set,
+%% Unblock blocked processes if possible.
+do_refill_tokens(Token, State) ->
+%% ?debugFmt("Refilling ~p~n", [Token]),
@jrwest
jrwest Nov 14, 2013 Contributor

remove

@buddhisthead
buddhisthead Nov 29, 2013 Contributor

All debug statements have been removed.

@jrwest jrwest and 1 other commented on an outdated diff Nov 14, 2013
src/riak_core_bg_manager_sup.erl
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_bg_manager_sup).
@jrwest
jrwest Nov 14, 2013 Contributor

is this supervisor still necessary?

@buddhisthead
buddhisthead Nov 14, 2013 Contributor

No, we can directly supervise the background manager from riak_core_sup.erl.
I'll push a change for that.

@buddhisthead
buddhisthead Nov 14, 2013 Contributor

Addressed by 534523c

@jrwest
Contributor
jrwest commented Nov 14, 2013

set_token_rate/2 and set_concurrency_limit/2 may be called on the same resource with no errors, yet this leaves the bg manager in a confused state. See the output of background_mgr_table in the EQC output below:

Shrinking......(6 times)
[{set,{var,5},{call,bg_manager_eqc,set_token_rate,[h,1]}},
 {set,{var,8},{call,bg_manager_eqc,set_concurrency_limit,[h,0,false]}}]


Final State:
---------------
procs = []
limits = []
locks = []
---------------


background_mgr_table:
---------------
[{{info,h},{resource_info,token,0,true}}]
---------------

bg_manager_eqc:set_token_rate(h, 1) -> 0
bg_manager_eqc:set_concurrency_limit(h, 0, false) -> 1

Reason: {postcondition, {0, '/=', 1}}
false

After getting in this state, calling either get_token or get_lock causes the background manager to crash:

23> riak_core_bg_manager:set_token_rate(a, {10000, 2}).
0
24> riak_core_bg_manager:set_concurrency_limit(a, 10).
2
25> riak_core_bg_manager:get_token(a).

=ERROR REPORT==== 14-Nov-2013::14:34:54 ===
** Generic server riak_core_bg_manager terminating
** Last message in was {get_token,a,<0.68.0>,[]}
** When Server state == {state,background_mgr_table,true,[],
                               {[],[]},
                               60,#Ref<0.0.0.180>}
** Reason for termination ==
** {function_clause,
       [{riak_core_bg_manager,limit,
            [{resource_info,token,10,true}],
            [{file,"src/riak_core_bg_manager.erl"},{line,699}]},
        {riak_core_bg_manager,do_get_resource,5,
            [{file,"src/riak_core_bg_manager.erl"},{line,908}]},
        {riak_core_bg_manager,do_handle_call_exception,3,
            [{file,"src/riak_core_bg_manager.erl"},{line,632}]},
        {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,585}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}
** exception exit: function_clause
     in function  riak_core_bg_manager:limit/1
        called as riak_core_bg_manager:limit({resource_info,token,10,true})
     in call from riak_core_bg_manager:do_get_resource/5 (src/riak_core_bg_manager.erl, line 908)
     in call from riak_core_bg_manager:do_handle_call_exception/3 (src/riak_core_bg_manager.erl, line 632)
     in call from gen_server:handle_msg/5 (gen_server.erl, line 585)
     in call from proc_lib:init_p_do_apply/3 (proc_lib.erl, line 239)

---- RESTARTED STUFF ----

31> riak_core_bg_manager:set_token_rate(a, {10000, 2}).
0
32> riak_core_bg_manager:set_concurrency_limit(a, 1).
2
33> riak_core_bg_manager:get_lock(a).

=ERROR REPORT==== 14-Nov-2013::15:11:18 ===
** Generic server riak_core_bg_manager terminating
** Last message in was {get_lock,a,<0.80.0>,[]}
** When Server state == {state,background_mgr_table,true,[],
                               {[[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                 [],[],[]],
                                [[]]},
                               60,#Ref<0.0.0.257>}
** Reason for termination ==
** {function_clause,
       [{riak_core_bg_manager,limit,
            [{resource_info,token,1,true}],
            [{file,"src/riak_core_bg_manager.erl"},{line,699}]},
        {riak_core_bg_manager,do_get_resource,5,
            [{file,"src/riak_core_bg_manager.erl"},{line,908}]},
        {riak_core_bg_manager,do_handle_call_exception,3,
            [{file,"src/riak_core_bg_manager.erl"},{line,632}]},
        {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,585}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}
** exception exit: function_clause
     in function  riak_core_bg_manager:limit/1
        called as riak_core_bg_manager:limit({resource_info,token,1,true})
     in call from riak_core_bg_manager:do_get_resource/5 (src/riak_core_bg_manager.erl, line 908)
     in call from riak_core_bg_manager:do_handle_call_exception/3 (src/riak_core_bg_manager.erl, line 632)
     in call from gen_server:handle_msg/5 (gen_server.erl, line 585)
     in call from proc_lib:init_p_do_apply/3 (proc_lib.erl, line 239)

@jrwest
Contributor
jrwest commented Nov 14, 2013

Following up on the previous comment (since I hit "Comment" by accident):

Besides the state above, I can also call get_lock when in fact what I am "given" is a token:

(riak_core@jrw-desktop2013)133> riak_core_table_manager:start_link([{background_mgr_table, [private, bag]}]).
{ok,<0.3818.6>}
(riak_core@jrw-desktop2013)134> riak_core_bg_manager:start_link().
{ok,<0.3826.6>}
(riak_core@jrw-desktop2013)135> riak_core_bg_manager:set_token_rate(a, {10000, 2}).
0
(riak_core@jrw-desktop2013)136> riak_core_bg_manager:get_lock(a).
ok

NOTE: resolved by 3dc4ef6

@jrwest jrwest commented on an outdated diff Nov 14, 2013
ebin/riak_core.app
@@ -98,7 +98,10 @@
riak_core_metadata_hashtree,
riak_core_metadata_exchange_fsm,
riak_core_broadcast,
- riak_core_broadcast_handler
+ riak_core_broadcast_handler,
+ riak_core_bg_manager,
+ riak_core_table_manager,
+ riak_core_bg_manager_sup
@jrwest
jrwest Nov 14, 2013 Contributor

this should have been removed when the _sup was removed.

@jrwest jrwest commented on the diff Nov 14, 2013
src/riak_core_sup.erl
?CHILD(riak_core_gossip, worker),
?CHILD(riak_core_claimant, worker),
- ?CHILD(riak_core_stat_sup, supervisor)
+ ?CHILD(riak_core_stat_sup, supervisor),
+ ?CHILD(riak_core_table_manager, worker, 5000, [?TBL_MGR_ARGS]),
+ ?CHILD(riak_core_bg_manager, supervisor)
@jrwest
jrwest Nov 14, 2013 Contributor

supervisor -> worker

@jrwest jrwest and 1 other commented on an outdated diff Nov 15, 2013
src/riak_core_bg_manager.erl
+    increment_stat_given(Resource, Type, State).
+
+%% @private
+%% @doc Add Resource to our given set.
+give_resource(Resource, Type, Pid, Ref, Meta, State) ->
+    From = undefined,
+    Entry = ?RESOURCE_ENTRY(Resource, Type, Pid, Meta, From, Ref, given),
+    give_resource(Entry, State).
+
+
+try_get_resource(false, _Resource, _Type, _Pid, _Meta, State) ->
+    {max_concurrency, State};
+try_get_resource(true, Resource, Type, Pid, Meta, State) ->
+    case Type of
+        token ->
+            {ok, give_resource(Resource, Type, Pid, undefined, Meta, State)};
@jrwest
jrwest Nov 15, 2013 Contributor

The change made here to not monitor the Pid and not carry a ref in the resource_entry record allows a process to obtain more tokens per period than the limit. Looking at the final state of background_mgr_table in the EQC output below, this is because the resource_entry for separate get_token calls from the same process (or for the same given pid) is no longer unique. One fix would be to use duplicate_bag, but I think that may open up a whole new can of worms. I would just create a dummy reference instead and stick it back in the call to give_resource.

EQC Output:

Shrinking.......(7 times)
[{set,{var,2},{call,bg_manager_eqc,set_token_rate,['C',2]}},
 {set,{var,6},{call,bg_manager_eqc,get_token,['C']}},
 {set,{var,8},{call,bg_manager_eqc,get_token,['C']}},
 {set,{var,12},{call,bg_manager_eqc,get_token,['C']}}]


Final State:
---------------
procs = []
limits = []
locks = []
---------------


background_mgr_table:
---------------
[{{given,'C'},
  {resource_entry,'C',token,<0.2327.0>,[],undefined,undefined,given}},
 {{info,'C'},{resource_info,token,{4294967,2},true}}]
---------------

bg_manager_eqc:set_token_rate('C', 2) -> 0
bg_manager_eqc:get_token('C') -> ok
bg_manager_eqc:get_token('C') -> ok
bg_manager_eqc:get_token('C') -> ok

Reason: {postcondition, {2, 'not <', 2}}
@buddhisthead
buddhisthead Nov 15, 2013 Contributor

Yeah, I like the dummy ref idea better. Thanks.

@buddhisthead
buddhisthead Nov 15, 2013 Contributor

Addressed with 8a2b5dd by creating a random unique Ref and associating it with the Token.

@jrwest jrwest commented on an outdated diff Nov 15, 2013
src/riak_core_bg_manager.erl
+    Object = {Key, Queue},
+    %% replace existing queue. Must delete existing one since we're using a bag table
+    ets:delete(TableId, Key),
+    ets:insert(TableId, Object),
+    State.
+
+%% @private
+%% @doc Return the queue of blocked resources named 'Resource'.
+blocked_queue(Resource, #state{table_id=TableId}) ->
+    Key = {blocked, Resource},
+    case ets:lookup(TableId, Key) of
+        [] -> queue:new();
+        [{Key, Queue} | _Rest] -> Queue
+    end.
+
+random_bogus_ref() ->
@jrwest
jrwest Nov 15, 2013 Contributor

There is no need for this. Just use make_ref/0.
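make_ref/0 returns a reference guaranteed unique on the running node, so there is no seed to manage; a minimal illustration:

```erlang
%% Two calls to make_ref/0 never yield the same reference on a node,
%% unlike a PRNG value, which can repeat if the seed recurs.
unique_refs() ->
    Ref1 = make_ref(),
    Ref2 = make_ref(),
    true = Ref1 =/= Ref2,
    {Ref1, Ref2}.
```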

@jrwest
jrwest Nov 15, 2013 Contributor

Already commented that we should change this, but I just found it interesting that EQC found a bug in it as well (if the seed does not change across restarts, it's easy to get the same number generated twice). The third get_token call doesn't change the state of the ets table at all because the "ref" is the same.

Failed!
[{set,{var,5},{call,bg_manager_eqc,set_token_rate,['B',3]}},
 {set,{var,6},{call,bg_manager_eqc,get_token,['B']}},
 {set,{var,8},{call,bg_manager_eqc,get_token,['B']}},
 {set,{var,10},{call,bg_manager_eqc,crash,[]}},
 {set,{var,11},{call,bg_manager_eqc,revive,[]}},
 {set,{var,12},{call,bg_manager_eqc,get_token,['B']}},
 {set,{var,13},{call,bg_manager_eqc,get_token,['B']}}]


Final State:
---------------
alive = true
procs = []
limits = []
locks = []
counts = [{'B',3}]
tokens = [{'B',3}]
---------------


background_mgr_table:
---------------
[{{info,'B'},{resource_info,token,{4294967,3},true}},
 {{given,'B'},
  {resource_entry,'B',token,<0.10102.2>,[],undefined,0.4435846174457203,
                  given}},
 {{given,'B'},
  {resource_entry,'B',token,<0.10102.2>,[],undefined,0.7230402056221108,
                  given}}]
---------------

bg_manager_eqc:set_token_rate('B', 3) -> 0
bg_manager_eqc:get_token('B') -> ok
bg_manager_eqc:get_token('B') -> ok
bg_manager_eqc:crash() -> ok
bg_manager_eqc:revive() -> true
bg_manager_eqc:get_token('B') -> ok
bg_manager_eqc:get_token('B') -> ok

Reason: {postcondition, {3, 'not <', 3}}
@jrwest jrwest commented on an outdated diff Nov 15, 2013
src/riak_core_bg_manager.erl
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% @private
+%% @doc We must have just gotten the table data back after a crash/restart.
+%% Walk through the given resources and release any holds by dead processes.
+validate_holds(State=#state{table_id=TableId}) ->
+    [validate_hold(Obj, TableId) || Obj <- ets:match_object(TableId, {{given, '_'}, '_'})],
+    State.
+
+%% @private
+%% @doc If the given entry has no alive process associated with it,
+%% remove the hold from the ETS table.
+validate_hold({_Key,Entry}=Obj, TableId) ->
+    %% If the process is not alive, release the lock
@jrwest
jrwest Nov 15, 2013 Contributor

entry may be a token as well, which we don't want to release (only refilling should do that). Found by EQC:

Shrinking....................(20 times)
[{set,{var,1},{call,bg_manager_eqc,set_token_rate,['E',1]}},
 {set,{var,2},{call,bg_manager_eqc,start_process,[]}},
 {set,{var,3},{call,bg_manager_eqc,set_concurrency_limit,[a,1,false]}},
 {set,{var,4},{call,bg_manager_eqc,get_lock,[a,{var,2},[]]}},
 {set,{var,7},{call,bg_manager_eqc,get_token,['E',{var,2}]}},
 {set,{var,8},{call,bg_manager_eqc,stop_process,[{var,2}]}},
 {set,{var,18},{call,bg_manager_eqc,crash,[]}},
 {set,{var,19},{call,bg_manager_eqc,revive,[]}},
 {set,{var,23},{call,bg_manager_eqc,get_token,['E']}}]


Final State:
---------------
alive = true
procs = [{<0.16363.7>,not_running}]
limits = [{a,1}]
locks = [{a,[{#Ref<0.0.5.122064>,<0.16363.7>,[],released}]}]
counts = [{'E',1}]
tokens = [{'E',1}]
---------------


background_mgr_table:
---------------
[{{given,'E'},
  {resource_entry,'E',token,<0.16360.7>,[],undefined,#Ref<0.0.5.122073>,
                  given}},
 {{info,'E'},{resource_info,token,{4294967,1},true}},
 {{info,a},{resource_info,lock,1,true}}]
---------------

bg_manager_eqc:set_token_rate('E', 1) -> 0
bg_manager_eqc:start_process() -> <0.16363.7>
bg_manager_eqc:set_concurrency_limit(a, 1, false) -> 0
bg_manager_eqc:get_lock(a, <0.16363.7>, []) -> #Ref<0.0.5.122064>
bg_manager_eqc:get_token('E', <0.16363.7>) -> ok
bg_manager_eqc:stop_process(<0.16363.7>) -> ok
bg_manager_eqc:crash() -> ok
bg_manager_eqc:revive() -> true
bg_manager_eqc:get_token('E') -> ok

Reason: {postcondition, {1, 'not <', 1}}
@jrwest
jrwest Nov 15, 2013 Contributor

Here is a slightly better failure case generated by EQC (this one doesn't require you to notice that stop_process races with crash):

Shrinking..........(10 times)
[{set,{var,10},{call,bg_manager_eqc,set_concurrency_limit,[b,1,false]}},
 {set,{var,13},{call,bg_manager_eqc,start_process,[]}},
 {set,{var,14},{call,bg_manager_eqc,set_token_rate,['D',1]}},
 {set,{var,19},{call,bg_manager_eqc,get_token,['D',{var,13}]}},
 {set,{var,21},{call,bg_manager_eqc,get_lock,[b,{var,13},[]]}},
 {set,{var,23},{call,bg_manager_eqc,crash,[]}},
 {set,{var,24},{call,bg_manager_eqc,stop_process,[{var,13}]}},
 {set,{var,27},{call,bg_manager_eqc,revive,[]}},
 {set,{var,28},{call,bg_manager_eqc,get_token,['D']}}]


Final State:
---------------
alive = true
procs = [{<0.26915.1>,not_running}]
limits = [{b,1}]
locks = [{b,[{#Ref<0.0.0.138195>,<0.26915.1>,[],released}]}]
counts = [{'D',1}]
tokens = [{'D',1}]
---------------


background_mgr_table:
---------------
[{{given,'D'},
  {resource_entry,'D',token,<0.26912.1>,[],undefined,#Ref<0.0.0.138202>,
                  given}},
 {{info,'D'},{resource_info,token,{4294967,1},true}},
 {{info,b},{resource_info,lock,1,true}}]
---------------

bg_manager_eqc:set_concurrency_limit(b, 1, false) -> 0
bg_manager_eqc:start_process() -> <0.26915.1>
bg_manager_eqc:set_token_rate('D', 1) -> 0
bg_manager_eqc:get_token('D', <0.26915.1>) -> ok
bg_manager_eqc:get_lock(b, <0.26915.1>, []) -> #Ref<0.0.0.138195>
bg_manager_eqc:crash() -> ok
bg_manager_eqc:stop_process(<0.26915.1>) -> ok
bg_manager_eqc:revive() -> true
bg_manager_eqc:get_token('D') -> ok

Reason: {postcondition, {1, 'not <', 1}}
@jrwest jrwest commented on an outdated diff Nov 15, 2013
src/riak_core_bg_manager.erl
+
+%% @private
+%% @doc We must have just gotten the table data back after a crash/restart.
+%% Walk through the given resources and release any holds by dead processes.
+validate_holds(State=#state{table_id=TableId}) ->
+ [validate_hold(Obj, TableId) || Obj <- ets:match_object(TableId, {{given, '_'},'_'})],
+ State.
+
+%% @private
+%% @doc If the given entry has no alive process associated with it,
+%% remove the hold from the ETS table.
+validate_hold({_Key,Entry}=Obj, TableId) ->
+ %% If the process is not alive, release the lock
+ case is_process_alive(?e_pid(Entry)) of
+ true ->
+ ok;
@jrwest
jrwest Nov 15, 2013 Contributor

If the process is alive, we need to re-monitor it and update the ETS table with the new ref. EQC failure (see the bg_manager monitors output: there are no monitors in the final state because when the manager crashes we lose them):

EDIT: Oops, pasted the wrong failure. It's fixed now.

[{set,{var,1},{call,bg_manager_eqc,set_concurrency_limit,[d,1,false]}},
 {set,{var,2},{call,bg_manager_eqc,start_process,[]}},
 {set,{var,3},{call,bg_manager_eqc,get_lock,[d,{var,2},[]]}},
 {set,{var,10},{call,bg_manager_eqc,crash,[]}},
 {set,{var,11},{call,bg_manager_eqc,start_process,[]}},
 {set,{var,12},{call,bg_manager_eqc,revive,[]}},
 {set,{var,29},{call,bg_manager_eqc,stop_process,[{var,2}]}},
 {set,{var,30},{call,bg_manager_eqc,get_lock,[d,{var,11},[]]}}]


Final State:
---------------
alive = true
procs = [{<0.32107.4>,not_running},{<0.32108.4>,running}]
limits = [{d,1}]
locks = [{d,[{#Ref<0.0.1.168363>,<0.32107.4>,[],released}]}]
counts = []
tokens = []
---------------


background_mgr_table:
---------------
[{{given,d},
  {resource_entry,d,lock,<0.32107.4>,[],undefined,#Ref<0.0.1.168363>,given}},
 {{info,d},{resource_info,lock,1,true}}]
---------------


bg_manager monitors:
---------------
{monitors,[]}
---------------

bg_manager_eqc:set_concurrency_limit(d, 1, false) -> 0
bg_manager_eqc:start_process() -> <0.32107.4>
bg_manager_eqc:get_lock(d, <0.32107.4>, []) -> #Ref<0.0.1.168363>
bg_manager_eqc:crash() -> ok
bg_manager_eqc:start_process() -> <0.32108.4>
bg_manager_eqc:revive() -> true
bg_manager_eqc:stop_process(<0.32107.4>) -> ok
bg_manager_eqc:get_lock(d, <0.32108.4>, []) -> max_concurrency

Reason: {postcondition, {0, 'not >=', 1}}
false
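The fix jrwest describes (re-monitor live holders after a restart, since the manager's old monitors died with the crashed process) might look roughly like this sketch. `?e_pid/1` comes from the diff above; the `set_entry_ref/2` helper is hypothetical, standing in for whatever updates the monitor-ref field of a resource entry:

```erlang
%% Sketch of the re-monitoring fix, not the merged code.
%% ?e_pid/1 is the entry accessor from the diff; set_entry_ref/2 is a
%% hypothetical helper returning the entry with a new monitor ref.
validate_hold({Key, Entry} = Obj, TableId) ->
    Pid = ?e_pid(Entry),
    case is_process_alive(Pid) of
        true ->
            %% Holder survived the manager crash: our old monitor died
            %% with us, so take out a fresh one and store the new ref.
            NewRef = erlang:monitor(process, Pid),
            ets:delete_object(TableId, Obj),
            ets:insert(TableId, {Key, set_entry_ref(Entry, NewRef)});
        false ->
            %% Holder died while the manager was down: release the hold.
            ets:delete_object(TableId, Obj)
    end.
```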
@jrwest jrwest and 1 other commented on an outdated diff Nov 18, 2013
src/riak_core_bg_manager.erl
+ Result = do_query(all, [given, blocked], [token], State),
+ {reply, Result, State};
+handle_call({ps, Resource}, _From, State) ->
+ Result = do_query(Resource, [given, blocked], [token, lock], State),
+ {reply, Result, State}.
+
+%% @private
+%% @doc Handling cast messages
+-spec handle_cast(term(), #state{}) -> {noreply, #state{}} |
+ {noreply, #state{}, non_neg_integer()} |
+ {stop, term(), #state{}}.
+handle_cast({enable, Resource}, State) ->
+ do_handle_cast_exception(fun do_enable_resource/3, [Resource, true, State], State);
+handle_cast({disable, Resource}, State) ->
+ do_handle_cast_exception(fun do_enable_resource/3, [Resource, false, State], State);
+handle_cast({disable, Lock, Kill}, State) ->
@jrwest
jrwest Nov 18, 2013 Contributor

There is no gen_server:call matching this message

@buddhisthead
buddhisthead Nov 19, 2013 Contributor

Addressed by fdaeeb5

@jrwest jrwest and 2 others commented on an outdated diff Nov 18, 2013
src/riak_core_bg_manager.erl
+ {noreply, State}
+ end.
+
+%% @private
+%% @doc Wrap a call, to a function with args, with a try/catch that handles
+%% thrown exceptions, namely '{unregistered, Resource}' and return the
+%% failed error response for a gen server call.
+do_handle_call_exception(Function, Args, State) ->
+ try apply(Function, Args)
+ catch
+ Error ->
+ lager:error("Exception: ~p in function ~p", [Error, Function]),
+ {reply, Error, State}
+ end.
+
+%% xyzzy
@jrwest
jrwest Nov 18, 2013 Contributor

accidental keyboarding?

@buddhisthead
buddhisthead Nov 18, 2013 Contributor

Yes. One of the first computer games I ever played. I must have left this in the code by accident; I often place it in code as a marker I can teleport back to later to continue my work.

@buddhisthead
Contributor

@jrwest I believe that the only unresolved comment at this point is the issue about the ETS table type, bag vs. set. I think you proposed splitting things into two tables so that we could use the more type-appropriate table for each specific use case. I think that's cool, but I have to weigh the tradeoff of then having two tables to claim, transfer, and guard. Your point about it being easier and safer for people adding new code is well taken, but I wonder if, at this point, we can defer that to a later PR. Since the table is always memory resident, we won't have any issues between different versions of Riak, and there is currently no public sharing of the tables. So, I propose that we defer this until we tackle doing queries outside of the gen_server, and focus our time on integration with the sub-systems. How does that feel to you?
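For concreteness, the two-table split under discussion could be sketched as below. Table names and exact options here are illustrative, not the ones from the eventual commit:

```erlang
%% Illustrative sketch of the two-table split. Per-resource info goes
%% in a 'set' (one row per {info, Resource} key), while given/blocked
%% entries stay in a 'bag' (many rows per key).
create_tables() ->
    Info    = ets:new(bg_mgr_info_table,  [set, protected, named_table]),
    Entries = ets:new(bg_mgr_entry_table, [bag, protected, named_table]),
    {Info, Entries}.

%% With a 'set', ets:insert/2 overwrites any existing row for the key,
%% so updating a resource's info needs no prior ets:delete/2.
update_info(InfoTab, Resource, Info) ->
    true = ets:insert(InfoTab, {{info, Resource}, Info}).
```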

@jrwest jrwest commented on the diff Dec 2, 2013
test/bg_manager_eqc.erl
@@ -0,0 +1,647 @@
+%%% @author Jordan West <>
+%%% @copyright (C) 2013, Jordan West
+%%% @doc
+%%%
+%%% @end
+%%% Created : 13 Nov 2013 by Jordan West <>
+
+-module(bg_manager_eqc).
+
+%%-ifdef(TEST).
@jrwest
jrwest Dec 2, 2013 Contributor

these got commented out by accident along the way

@jrwest jrwest commented on the diff Dec 2, 2013
test/bg_manager_eqc.erl
@@ -0,0 +1,647 @@
+%%% @author Jordan West <>
@jrwest
jrwest Dec 2, 2013 Contributor

more a note to myself, but while I'm here, we need to change this header.

@buddhisthead
Contributor

Making a note to ensure that we add a toggle switch to enable/disable this for the subsystems that integrate with BG manager.

buddhisthead added some commits Dec 4, 2013
@buddhisthead buddhisthead Add EQC tests for ps and head. Fix bugs found for those. f4ec30b
@buddhisthead buddhisthead Add tail tests and fix bugs. Remove redundant eunit tests. 7da7dfb
@buddhisthead buddhisthead Split bg mgr table into two tables.
* Info is a set (new).
* Token/Lock entries is a bag (unchanged).
* Update supervisor to start table manager with both tables, with correct properties.
* BG-MGR must now wait until both tables are claimed before servicing calls.
* Remove superfluous ETS delete on 'set' table.
* Update EQC and EUnit tests to use new tables.
* Fix discovered bug in EQC head/tail tests - don't reverse history until it's used.

a95759a
@buddhisthead buddhisthead Remove all blocking API and code from branch. No test changes. 799d635
@buddhisthead
Contributor

@jrwest This branch is now ready, IMHO, for a final review of the query API. It no longer has any blocking API, so it's on the glide path for rebasing and merge into develop. I will be starting work on integration with replication and core as separate PRs while you review these final changes. Thanks for your attention!

@buddhisthead
Contributor

Note the latest addition of a bypass API: a single call that stops the lock/token manager from limiting any subsystem. This is the "kill switch" that neutralizes the background manager. All locks and tokens will be given out when bypass is set to true via the bypass(boolean()) API call.
This is for you, @jonmeredith, and all beloved CSE's :-)

@buddhisthead
Contributor

Replaced by #484 which has merged :-)

@seancribbs seancribbs deleted the cet-bg-mgr-refactor branch Apr 1, 2015