Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add a background lock and task manager module to riak core.

* The goal is to allow riak sub-systems to coordinate use of shared resources,
* e.g. protect from concurrent vnode folds on the same partition.
* Locks and tokens have a settable maximum concurrency limit.
* "Taken" locks and tokens are tracked in an ETS table.
* max_concurrency is returned when the set limits are reached.
* Processes that take locks are monitored.
* Locks are released when the taking processes terminate.
* Tokens are refreshed at a specified periodic rate.
* Token processes are not monitored because tokens never "release".
* A table manager is introduced to add persistence across process crashes,
* and to allow proper table transfer to occur without losing the table.
* An EQC test exercises the majority of the API. See test/bg_manager_eqc.erl
* See the original PR for background manager here: #364
  • Loading branch information...
commit cfb482fd3b570148d102d27e9868d0444222e022 1 parent 155429b
@buddhisthead buddhisthead authored
View
88 include/riak_core_bg_manager.hrl
@@ -0,0 +1,88 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% NOTES:
+%% The background manager allows tokens and locks to be "acquired" by
+%% competing processes in a way that limits the total load on the cluster.
+%%
+%% The model is different than your typical semaphore. Here, we are
+%% interested in coordinating background jobs that start, run, and die.
+%%
+%%
+%% The term "given" is a general version of "held", "acquired", or
+%% "allocated" for both locks and tokens. Held doesn't make sense for
+%% tokens since they aren't held. So, "given" applies to both locks
+%% and tokens, but you can think "held" for locks if that's more fun.
+%%
+%% Resources are defined by their "names", which is the same as "type"
+%% or "kind". A lock name might be the atom 'aae_hashtree_lock' or the
+%% tuple '{my_ultimate_lock, 42}'.
+%%
+%% Usage:
+%% 1. register your lock/token and set its max concurrency/rate.
+%% 2. "get" a lock/token by its resource type/name
+%% 3. do stuff
+%% 4. let your process die, which gives back a lock.
+%% -------------------------------------------------------------------
+-type bg_lock() :: any().
+-type bg_token() :: any().
+-type bg_resource() :: bg_token() | bg_lock().
+-type bg_resource_type() :: lock | token.
+
+-type bg_meta() :: {atom(), any()}. %% meta data to associate with a lock/token
+-type bg_period() :: pos_integer(). %% token refill period in milliseconds
+-type bg_count() :: pos_integer(). %% number of tokens handed out at each refill period
+-type bg_rate() :: undefined | {bg_period(), bg_count()}. %% token refill rate
+-type bg_concurrency_limit() :: non_neg_integer() | infinity. %% max lock concurrency allowed
+-type bg_state() :: given | blocked | failed. %% state of an instance of a resource.
+
+%% Results of a "ps" of live given or blocked locks/tokens
+-record(bg_stat_live,
+ {
+ resource :: bg_resource(), %% resource name, e.g. 'aae_hashtree_lock'
+ type :: bg_resource_type(), %% resource type, e.g. 'lock'
+ consumer :: pid(), %% process asking for token
+ meta :: [bg_meta()], %% associated meta data
+ state :: bg_state() %% result of last request, e.g. 'given'
+ }).
+-type bg_stat_live() :: #bg_stat_live{}.
+
+%% Results of a "head" or "tail", per resource. Historical query result.
+-record(bg_stat_hist,
+ {
+ type :: undefined | bg_resource_type(), %% undefined only on default
+ %% 'undefined' only on the default entry (see ?BG_DEFAULT_STAT_HIST below),
+ %% otherwise the maximum available, defined by token rate during interval
+ limit :: undefined | non_neg_integer(),
+ refills :: non_neg_integer(), %% number of times a token was refilled during interval. 0 if lock
+ given :: non_neg_integer(), %% number of times this resource was handed out within interval
+ blocked :: non_neg_integer() %% number of blocked processes waiting for a token
+ }).
+-type bg_stat_hist() :: #bg_stat_hist{}.
+-define(BG_DEFAULT_STAT_HIST,
+ #bg_stat_hist{type=undefined, limit=undefined, refills=0, given=0, blocked=0}).
+
+-define(BG_DEFAULT_WINDOW_INTERVAL, 60*1000). %% in milliseconds
+-define(BG_DEFAULT_OUTPUT_SAMPLES, 20). %% default number of sample windows displayed
+-define(BG_DEFAULT_KEPT_SAMPLES, 10000). %% number of history samples to keep
+
+-define(BG_INFO_ETS_TABLE, background_mgr_info_table). %% name of private lock/token manager info ETS table
+-define(BG_INFO_ETS_OPTS, [private, set]). %% creation time properties of info ETS table
+
+-define(BG_ENTRY_ETS_TABLE, background_mgr_entry_table). %% name of private lock/token manager entry ETS table
+-define(BG_ENTRY_ETS_OPTS, [private, bag]). %% creation time properties of entry ETS table
+
View
1,142 src/riak_core_bg_manager.erl
@@ -0,0 +1,1142 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% @doc
+%% We use two ETS tables to store critical data. In the event this process crashes,
+%% the tables will be given back to the table manager and we can reclaim them when
+%% we restart. Thus, limits and states are maintained across restarts of the
+%% module, but not of the application. Since we are supervised by riak_core_sup,
+%% that's fine.
+%%
+%% === Info Table ===
+%% The table must be a set and is best if private. See ?BG_INFO_ETS_OPTS in MODULE.hrl.
+%% Table Schema...
+%% KEY Data Notes
+%% --- ---- -----
+%% {info, Resource} #resource_info One token object per key.
+%% bypassed boolean()
+%% enabled boolean()
+%%
+%% === Entries Table ===
+%% The table must be a bag and is best if private. See ?BG_ENTRY_ETS_OPTS in MODULE.hrl.
+%% KEY Data Notes
+%% --- ---- -----
+%% {given, Resource} #resource_entry Multiple objects per key.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_bg_manager).
+
+-behaviour(gen_server).
+
+-include("riak_core_bg_manager.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% API
+-export([
+ %% Universal
+ start_link/0,
+ bypass/1,
+ bypassed/0,
+ enabled/0,
+ enabled/1,
+ enable/0,
+ enable/1,
+ disable/0,
+ disable/1,
+ disable/2,
+ query_resource/3,
+ all_resources/0,
+ all_given/0,
+ %% Locks
+ concurrency_limit/1,
+ set_concurrency_limit/2,
+ set_concurrency_limit/3,
+ concurrency_limit_reached/1,
+ get_lock/1,
+ get_lock/2,
+ get_lock/3,
+ lock_info/0,
+ lock_info/1,
+ lock_count/1,
+ all_locks/0,
+ locks_held/0,
+ locks_held/1,
+ %% Tokens
+ set_token_rate/2,
+ token_rate/1,
+ get_token/1,
+ get_token/2,
+ get_token/3,
+ token_info/0,
+ token_info/1,
+ all_tokens/0,
+ tokens_given/0,
+ tokens_given/1,
+ %% Testing
+ start/1
+ ]).
+
+%% reporting
+-export([clear_history/0,
+ head/0,
+ head/1,
+ head/2,
+ head/3,
+ tail/0,
+ tail/1,
+ tail/2,
+ tail/3,
+ ps/0,
+ ps/1
+ ]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-define(NOT_TRANSFERED(S), S#state.info_table == undefined orelse S#state.entry_table == undefined).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Starts the server registered locally as ?SERVER.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%% @doc Test entry point to start stand-alone (unlinked) server with a
+%% custom history-window interval in milliseconds.
+-spec start(bg_period()) -> {ok, pid()} | ignore | {error, term()}.
+start(Interval) ->
+ gen_server:start({local, ?SERVER}, ?MODULE, [Interval], []).
+
+%% @doc Global kill switch - causes all locks/tokens to be given out freely without limits.
+%% Nothing will be tracked or recorded.
+-spec bypass(boolean()) -> ok.
+bypass(Switch) ->
+ gen_server:cast(?SERVER, {bypass, Switch}).
+
+%% @doc Return bypass state as boolean.
+-spec bypassed() -> boolean().
+bypassed() ->
+ gen_server:call(?SERVER, bypassed).
+
+%% @doc Enable handing out of all locks and tokens
+-spec enable() -> enabled | bypassed.
+enable() ->
+ gen_server:call(?SERVER, enable).
+
+%% @doc Disable handing out of all locks and tokens
+-spec disable() -> disabled | bypassed.
+disable() ->
+ gen_server:call(?SERVER, disable).
+
+%% @doc Return global enabled status. Bypass takes precedence over
+%% enabled/disabled (see status_of/2).
+-spec enabled() -> enabled | disabled | bypassed.
+enabled() ->
+ gen_server:call(?SERVER, enabled).
+
+%% @doc Enable handing out resources of the kind specified. If the resource
+%% has not already been registered, this will have no effect.
+-spec enable(bg_resource()) -> enabled | unregistered | bypassed.
+enable(Resource) ->
+ gen_server:call(?SERVER, {enable, Resource}).
+
+%% @doc Disable handing out resource of the given kind.
+-spec disable(bg_resource()) -> disabled | unregistered | bypassed.
+disable(Resource) ->
+ gen_server:call(?SERVER, {disable, Resource}).
+
+%% @doc Return the enabled status of a single named resource.
+-spec enabled(bg_resource()) -> enabled | disabled | bypassed.
+enabled(Resource) ->
+ gen_server:call(?SERVER, {enabled, Resource}).
+
+%% @doc Disable handing out resource of the given kind. If kill == true,
+%% processes that currently hold the given resource will be killed.
+-spec disable(bg_resource(), boolean()) -> disabled | unregistered | bypassed.
+disable(Resource, Kill) ->
+ gen_server:call(?SERVER, {disable, Resource, Kill}).
+
+%% @doc Query the current set of registered resources by name, states, and types.
+%% The special atom 'all' queries all resources. A list of states and a list
+%% of types allows selective query.
+-spec query_resource(bg_resource() | all, [bg_state()], [bg_resource_type()]) -> [bg_stat_live()].
+query_resource(Resource, States, Types) ->
+ gen_server:call(?SERVER, {query_resource, Resource, States, Types}, infinity).
+
+%% @doc Get a list of all resources of all types in the 'given' state.
+%% NOTE(review): as written this is identical to all_given/0 — it queries
+%% only [given], not all of [given, blocked, failed] as the original
+%% comment ("in all states") suggested. Confirm intended state list.
+-spec all_resources() -> [bg_stat_live()].
+all_resources() ->
+ query_resource(all, [given], [token, lock]).
+
+%% @doc Get a list of all resources of all kinds in the given state
+-spec all_given() -> [bg_stat_live()].
+all_given() ->
+ query_resource(all, [given], [token, lock]).
+
+%%%%%%%%%%%
+%% Lock API
+%%%%%%%%%%%
+
+%% NOTE(review): the lock API calls use ?MODULE as the server name while the
+%% universal/token APIs use ?SERVER. They expand to the same atom here, but
+%% consider standardizing on one for consistency.
+
+%% @doc Get the current maximum concurrency for the given lock type.
+-spec concurrency_limit(bg_lock()) -> bg_concurrency_limit().
+concurrency_limit(Lock) ->
+ gen_server:call(?MODULE, {concurrency_limit, Lock}, infinity).
+
+%% @doc same as `set_concurrency_limit(Type, Limit, false)'
+-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit()) -> bg_concurrency_limit().
+set_concurrency_limit(Lock, Limit) ->
+ set_concurrency_limit(Lock, Limit, false).
+
+%% @doc Set a new maximum concurrency for the given lock type and return
+%% the previous maximum or default. If more locks are held than the new
+%% limit how they are handled depends on the value of `Kill'. If `true',
+%% then the extra locks are released by killing processes with reason `max_concurrency'.
+%% If `false', then the processes holding the extra locks are allowed to do so until they
+%% are released.
+-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit(), boolean()) -> bg_concurrency_limit().
+set_concurrency_limit(Lock, Limit, Kill) ->
+ gen_server:call(?MODULE, {set_concurrency_limit, Lock, Limit, Kill}, infinity).
+
+%% @doc Returns true if the number of held locks is at the limit for the given lock type
+-spec concurrency_limit_reached(bg_lock()) -> boolean().
+concurrency_limit_reached(Lock) ->
+ gen_server:call(?MODULE, {lock_limit_reached, Lock}, infinity).
+
+%% @doc Acquire a concurrency lock of the given name, if available,
+%% and associate the lock with the calling process. Returns the
+%% reference to the monitored process or max_concurrency.
+-spec get_lock(bg_lock()) -> {ok, reference()} | max_concurrency.
+get_lock(Lock) ->
+ get_lock(Lock, self()).
+
+%% @doc Acquire a concurrency lock, if available, and associate the
+%% lock with the provided pid or metadata. If metadata
+%% is provided the lock is associated with the calling process
+%% If no locks are available, max_concurrency is returned.
+-spec get_lock(bg_lock(), pid() | [{atom(), any()}]) -> {ok, reference()} | max_concurrency.
+get_lock(Lock, Pid) when is_pid(Pid) ->
+ get_lock(Lock, Pid, []);
+get_lock(Lock, Opts) when is_list(Opts)->
+ get_lock(Lock, self(), Opts).
+
+%% @doc Acquire a concurrency lock, if available, and associate
+%% the lock with the provided pid and metadata.
+-spec get_lock(bg_lock(), pid(), [{atom(), any()}]) -> {ok, reference()} | max_concurrency.
+get_lock(Lock, Pid, Meta) ->
+ gen_server:call(?MODULE, {get_lock, Lock, Pid, Meta}, infinity).
+
+%% @doc Return the current concurrency count of the given lock type.
+-spec lock_count(bg_lock()) -> integer() | unregistered.
+lock_count(Lock) ->
+ gen_server:call(?MODULE, {lock_count, Lock}, infinity).
+
+%% @doc Return list of lock types and associated info. To be returned in this list
+%% a lock type must have had its concurrency set or have been enabled/disabled.
+-spec lock_info() -> [{bg_lock(), boolean(), bg_concurrency_limit()}].
+lock_info() ->
+ gen_server:call(?MODULE, lock_info, infinity).
+
+%% @doc Return the registration info for the named Lock
+-spec lock_info(bg_lock()) -> {boolean(), bg_concurrency_limit()} | unregistered.
+lock_info(Lock) ->
+ gen_server:call(?MODULE, {lock_info, Lock}, infinity).
+
+%% @doc Returns all locks.
+-spec all_locks() -> [bg_stat_live()].
+all_locks() ->
+ query_resource(all, [given], [lock]).
+
+
+%% @doc Returns all currently held locks or those that match Lock
+-spec locks_held() -> [bg_stat_live()].
+locks_held() ->
+ locks_held(all).
+
+%% @doc Returns held locks filtered by name, or all of them for 'all'.
+-spec locks_held(bg_lock() | all) -> [bg_stat_live()].
+locks_held(Lock) ->
+ query_resource(Lock, [given], [lock]).
+
+%%%%%%%%%%%%
+%% Token API
+%%%%%%%%%%%%
+
+%% @doc Set the refill rate of tokens. Return previous value.
+%% Setting an 'undefined' rate is a no-op and returns 'undefined'
+%% without contacting the server.
+-spec set_token_rate(bg_token(), bg_rate()) -> bg_rate().
+set_token_rate(_Token, undefined) -> undefined;
+set_token_rate(Token, Rate={_Period, _Count}) ->
+ gen_server:call(?SERVER, {set_token_rate, Token, Rate}, infinity).
+
+%% @doc Get the current refill rate of named token.
+-spec token_rate(bg_token()) -> bg_rate().
+token_rate(Token) ->
+ gen_server:call(?SERVER, {token_rate, Token}, infinity).
+
+%% @doc Get a token without blocking.
+%% Associate token with provided pid or metadata. If metadata
+%% is provided the token is associated with the calling process.
+%% Returns "max_concurrency" if empty.
+%% NOTE(review): unlike get_lock/2, the second clause has no is_list(Meta)
+%% guard, so any non-pid second argument is treated as metadata — confirm
+%% this asymmetry is intentional.
+-spec get_token(bg_token(), pid() | [{atom(), any()}]) -> ok | max_concurrency.
+get_token(Token, Pid) when is_pid(Pid) ->
+ get_token(Token, Pid, []);
+get_token(Token, Meta) ->
+ get_token(Token, self(), Meta).
+
+%% @doc Get a token for the calling process without blocking.
+-spec get_token(bg_token()) -> ok | max_concurrency.
+get_token(Token) ->
+ get_token(Token, self()).
+
+%% @doc Get a token for the given pid with associated metadata, without blocking.
+-spec get_token(bg_token(), pid(), [{atom(), any()}]) -> ok | max_concurrency.
+get_token(Token, Pid, Meta) ->
+ gen_server:call(?SERVER, {get_token, Token, Pid, Meta}, infinity).
+
+%% @doc Return list of token kinds and associated info. To be returned in this list
+%% a token must have had its rate set.
+-spec token_info() -> [{bg_token(), boolean(), bg_rate()}].
+token_info() ->
+ gen_server:call(?MODULE, token_info, infinity).
+
+%% @doc Return the registration info for the named Token
+-spec token_info(bg_token()) -> {boolean(), bg_rate()}.
+token_info(Token) ->
+ gen_server:call(?MODULE, {token_info, Token}, infinity).
+
+%% @doc Return all tokens currently given out.
+-spec all_tokens() -> [bg_stat_live()].
+all_tokens() ->
+ query_resource(all, [given], [token]).
+
+%% @doc Get a list of token resources in the given state.
+tokens_given() ->
+ tokens_given(all).
+-spec tokens_given(bg_token() | all) -> [bg_stat_live()].
+tokens_given(Token) ->
+ query_resource(Token, [given], [token]).
+
+%% Stats/Reporting
+
+%% @doc Discard all accumulated history samples (asynchronous).
+clear_history() ->
+ gen_server:cast(?SERVER, clear_history).
+
+%% List history of token manager
+%% @doc show history of token request/grants over default and custom intervals.
+%% offset is forwards-relative to the oldest sample interval
+-spec head() -> [[bg_stat_hist()]].
+head() ->
+ head(all).
+-spec head(bg_token()) -> [[bg_stat_hist()]].
+head(Token) ->
+ head(Token, ?BG_DEFAULT_OUTPUT_SAMPLES).
+-spec head(bg_token(), non_neg_integer()) -> [[bg_stat_hist()]].
+head(Token, NumSamples) ->
+ head(Token, 0, NumSamples).
+-spec head(bg_token(), non_neg_integer(), bg_count()) -> [[bg_stat_hist()]].
+head(Token, Offset, NumSamples) ->
+ gen_server:call(?SERVER, {head, Token, Offset, NumSamples}, infinity).
+
+%% @doc return history of token request/grants over default and custom intervals.
+%% offset is backwards-relative to the newest sample interval
+-spec tail() -> [[bg_stat_hist()]].
+tail() ->
+ tail(all).
+-spec tail(bg_token()) -> [[bg_stat_hist()]].
+tail(Token) ->
+ tail(Token, ?BG_DEFAULT_OUTPUT_SAMPLES).
+%% tail/2 offsets backwards by NumSamples so the result is the last
+%% NumSamples windows — presumably intentional; confirm against do_hist.
+-spec tail(bg_token(), bg_count()) -> [[bg_stat_hist()]].
+tail(Token, NumSamples) ->
+ tail(Token, NumSamples, NumSamples).
+-spec tail(bg_token(), bg_count(), bg_count()) -> [[bg_stat_hist()]].
+tail(Token, Offset, NumSamples) ->
+ gen_server:call(?SERVER, {tail, Token, Offset, NumSamples}, infinity).
+
+%% @doc List most recent requests/grants for all tokens and locks
+-spec ps() -> [bg_stat_live()].
+ps() ->
+ ps(all).
+%% @doc List most recent requests/grants for named resource or one of
+%% either 'token' or 'lock'. The latter two options will list all
+%% resources of that type currently in the given state.
+-spec ps(bg_resource() | token | lock) -> [bg_stat_live()].
+ps(Arg) ->
+ gen_server:call(?SERVER, {ps, Arg}, infinity).
+
+%%%===================================================================
+%%% Data Structures
+%%%===================================================================
+
+-type bg_limit() :: bg_concurrency_limit() | bg_rate().
+
+%% General settings of a lock type.
+-record(resource_info,
+ {type :: bg_resource_type(),
+ limit :: bg_limit(),
+ enabled :: boolean()}).
+
+-define(resource_type(X), (X)#resource_info.type).
+-define(resource_limit(X), (X)#resource_info.limit).
+-define(resource_enabled(X), (X)#resource_info.enabled).
+
+-define(DEFAULT_CONCURRENCY, 0). %% DO NOT CHANGE. DEFAULT SET TO 0 TO ENFORCE "REGISTRATION"
+-define(DEFAULT_RATE, undefined).%% DO NOT CHANGE. DEFAULT SET TO undefined TO ENFORCE "REGISTRATION"
+-define(DEFAULT_LOCK_INFO, #resource_info{type=lock, enabled=true, limit=?DEFAULT_CONCURRENCY}).
+-define(DEFAULT_TOKEN_INFO, #resource_info{type= token, enabled=true, limit=?DEFAULT_RATE}).
+
+%% An instance of a resource entry in "given"
+-record(resource_entry,
+ {resource :: bg_resource(),
+ type :: bg_resource_type(),
+ pid :: pid(), %% owning process
+ meta :: bg_meta(), %% associated metadata
+ ref :: reference(), %% optional monitor reference to owning process
+ state :: bg_state() %% state of item on given
+ }).
+
+-define(RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, State),
+ #resource_entry{resource=Resource, type=Type, pid=Pid, meta=Meta, ref=Ref, state=State}).
+-define(e_resource(X), (X)#resource_entry.resource).
+-define(e_type(X), (X)#resource_entry.type).
+-define(e_pid(X), (X)#resource_entry.pid).
+-define(e_meta(X), (X)#resource_entry.meta).
+-define(e_ref(X), (X)#resource_entry.ref).
+-define(e_state(X), (X)#resource_entry.state).
+
+%%%
+%%% Gen Server State record
+%%%
+
+-record(state,
+ {info_table:: ets:tid(), %% TableID of ?BG_INFO_ETS_TABLE
+ entry_table:: ets:tid(), %% TableID of ?BG_ENTRY_ETS_TABLE
+ %% NOTE: None of the following data is persisted across process crashes.
+ enabled :: boolean(), %% Global enable/disable switch, true at startup
+ bypassed:: boolean(), %% Global kill switch. false at startup
+ %% stats
+ window :: orddict:orddict(), %% bg_resource() -> bg_stat_hist()
+ history :: queue(), %% bg_resource() -> queue of bg_stat_hist(); NOTE(review): bare queue() type is removed in modern OTP — use queue:queue()
+ window_interval :: bg_period(), %% history window size in milliseconds
+ window_tref :: reference() %% reference to history window sampler timer
+ }).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% @private
+%% @doc Initializes the server. Claims both ETS tables from the table
+%% manager; the table ids arrive later as 'ETS-TRANSFER' messages, so
+%% info_table/entry_table start out undefined and state restoration is
+%% deferred to the handle_info('ETS-TRANSFER', ...) clause.
+-spec init([]) -> {ok, #state{}} |
+ {ok, #state{}, non_neg_integer() | infinity} |
+ ignore |
+ {stop, term()}.
+init([]) ->
+ init([?BG_DEFAULT_WINDOW_INTERVAL]);
+init([Interval]) ->
+ lager:debug("Background Manager starting up."),
+ %% Claiming a table will result in a handle_info('ETS-TRANSFER', ...) message.
+ %% We have two to claim...
+ ok = riak_core_table_manager:claim_table(?BG_INFO_ETS_TABLE),
+ ok = riak_core_table_manager:claim_table(?BG_ENTRY_ETS_TABLE),
+ State = #state{info_table=undefined, %% resolved in the ETS-TRANSFER handler
+ entry_table=undefined, %% resolved in the ETS-TRANSFER handler
+ window=orddict:new(),
+ enabled=true,
+ bypassed=false,
+ window_interval=Interval,
+ history=queue:new()},
+ State2 = schedule_sample_history(State),
+ {ok, State2}.
+
+%% @private
+%% @doc Handling call messages. Most clauses delegate to do_* helpers via
+%% do_handle_call_exception/3, which converts thrown errors (e.g.
+%% {unregistered, Resource}) into the reply.
+-spec handle_call(term(), {pid(), term()}, #state{}) ->
+ {reply, term(), #state{}} |
+ {reply, term(), #state{}, non_neg_integer()} |
+ {noreply, #state{}} |
+ {noreply, #state{}, non_neg_integer()} |
+ {stop, term(), term(), #state{}} |
+ {stop, term(), #state{}}.
+
+handle_call(bypassed, _From, State=#state{bypassed=Bypassed}) ->
+ {reply, Bypassed, State};
+handle_call({enabled, Resource}, _From, State) ->
+ do_handle_call_exception(fun do_enabled/2, [Resource, State], State);
+handle_call({enable, Resource}, _From, State) ->
+ do_handle_call_exception(fun do_enable_resource/3, [Resource, true, State], State);
+handle_call({disable, Resource}, _From, State) ->
+ do_handle_call_exception(fun do_enable_resource/3, [Resource, false, State], State);
+handle_call({disable, Lock, Kill}, _From, State) ->
+ do_handle_call_exception(fun do_disable_lock/3, [Lock, Kill, State], State);
+handle_call(enabled, _From, State) ->
+ {reply, status_of(true, State), State};
+handle_call(enable, _From, State) ->
+ State2 = update_enabled(true, State),
+ {reply, status_of(true, State2), State2};
+handle_call(disable, _From, State) ->
+ %% NOTE(review): status_of(true, State2) still reports 'disabled' here
+ %% because State2#state.enabled is now false, but passing 'true' reads
+ %% oddly — consider status_of(false, State2) for clarity.
+ State2 = update_enabled(false, State),
+ {reply, status_of(true, State2), State2};
+handle_call({query_resource, Resource, States, Types}, _From, State) ->
+ Result = do_query(Resource, States, Types, State),
+ {reply, Result, State};
+handle_call({get_lock, Lock, Pid, Meta}, _From, State) ->
+ do_handle_call_exception(fun do_get_resource/5, [Lock, lock, Pid, Meta, State], State);
+handle_call({lock_count, Lock}, _From, State) ->
+ {reply, held_count(Lock, State), State};
+handle_call({lock_limit_reached, Lock}, _From, State) ->
+ do_handle_call_exception(fun do_lock_limit_reached/2, [Lock, State], State);
+handle_call(lock_info, _From, State) ->
+ do_handle_call_exception(fun do_get_type_info/2, [lock, State], State);
+handle_call({lock_info, Lock}, _From, State) ->
+ do_handle_call_exception(fun do_resource_info/2, [Lock, State], State);
+handle_call({concurrency_limit, Lock}, _From, State) ->
+ do_handle_call_exception(fun do_resource_limit/3, [lock, Lock, State], State);
+handle_call({set_concurrency_limit, Lock, Limit, Kill}, _From, State) ->
+ do_set_concurrency_limit(Lock, Limit, Kill, State);
+handle_call({token_rate, Token}, _From, State) ->
+ do_handle_call_exception(fun do_resource_limit/3, [token, Token, State], State);
+handle_call(token_info, _From, State) ->
+ do_handle_call_exception(fun do_get_type_info/2, [token, State], State);
+handle_call({token_info, Token}, _From, State) ->
+ do_handle_call_exception(fun do_resource_info/2, [Token, State], State);
+handle_call({set_token_rate, Token, Rate}, _From, State) ->
+ do_handle_call_exception(fun do_set_token_rate/3, [Token, Rate, State], State);
+handle_call({get_token, Token, Pid, Meta}, _From, State) ->
+ do_handle_call_exception(fun do_get_resource/5, [Token, token, Pid, Meta, State], State);
+handle_call({head, Token, Offset, Count}, _From, State) ->
+ Result = do_hist(head, Token, Offset, Count, State),
+ {reply, Result, State};
+handle_call({tail, Token, Offset, Count}, _From, State) ->
+ Result = do_hist(tail, Token, Offset, Count, State),
+ {reply, Result, State};
+handle_call({ps, lock}, _From, State) ->
+ Result = do_query(all, [given], [lock], State),
+ {reply, Result, State};
+handle_call({ps, token}, _From, State) ->
+ Result = do_query(all, [given], [token], State),
+ {reply, Result, State};
+handle_call({ps, Resource}, _From, State) ->
+ Result = do_query(Resource, [given], [token, lock], State),
+ {reply, Result, State}.
+
+%% @private
+%% @doc Handling cast messages. Only boolean bypass values are honored;
+%% anything else is silently ignored (third bypass clause).
+-spec handle_cast(term(), #state{}) -> {noreply, #state{}} |
+ {noreply, #state{}, non_neg_integer()} |
+ {stop, term(), #state{}}.
+handle_cast({bypass, false}, State) ->
+ {noreply, update_bypassed(false,State)};
+handle_cast({bypass, true}, State) ->
+ {noreply, update_bypassed(true,State)};
+handle_cast({bypass, _Other}, State) ->
+ {noreply, State};
+handle_cast(clear_history, State) ->
+ State2 = do_clear_history(State),
+ {noreply, State2}.
+
+%% @private
+%% @doc Handling all non call/cast messages
+-spec handle_info(term(), #state{}) -> {noreply, #state{}} |
+ {noreply, #state{}, non_neg_integer()} |
+ {stop, term(), #state{}}.
+%% Handle transfer of ETS table from table manager
+handle_info({'ETS-TRANSFER', TableId, Pid, TableName}, State) ->
+ lager:debug("table_mgr (~p) -> bg_mgr (~p) receiving ownership of TableId: ~p", [Pid, self(), TableId]),
+ State2 = case TableName of
+ ?BG_INFO_ETS_TABLE -> State#state{info_table=TableId};
+ ?BG_ENTRY_ETS_TABLE -> State#state{entry_table=TableId}
+ end,
+ %% Restoration only proceeds once BOTH tables have arrived.
+ case {State2#state.info_table, State2#state.entry_table} of
+ {undefined, _E} -> {noreply, State2};
+ {_I, undefined} -> {noreply, State2};
+ {_I, _E} ->
+ %% Got both tables, we can proceed with reviving ourself
+ State3 = validate_holds(State2),
+ State4 = restore_enabled(true, State3),
+ State5 = restore_bypassed(false, State4),
+ reschedule_token_refills(State5),
+ {noreply, State5}
+ end;
+%% A monitored lock holder died: release its lock entry.
+handle_info({'DOWN', Ref, _, _, _}, State) ->
+ State2 = release_resource(Ref, State),
+ {noreply, State2};
+handle_info(sample_history, State) ->
+ State2 = schedule_sample_history(State),
+ State3 = do_sample_history(State2),
+ {noreply, State3};
+handle_info({refill_tokens, Type}, State) ->
+ %% NOTE(review): the return of schedule_refill_tokens/2 is discarded
+ %% (State2 is kept) — presumably it is side-effect only, as at the
+ %% do_set_token_rate call site; confirm.
+ State2 = do_refill_tokens(Type, State),
+ schedule_refill_tokens(Type, State2),
+ {noreply, State2};
+%% Drain any unexpected messages so the mailbox cannot accumulate.
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored. No explicit cleanup is
+%% needed: the ETS tables revert to the table manager (their heir).
+-spec terminate(term(), #state{}) -> term().
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+%% @doc Convert process state when code is changed
+-spec code_change(term() | {down, term()}, #state{}, term()) -> {ok, #state{}}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% @doc Report the externally visible status of the manager.
+%% Bypass always wins over the enable/disable switch, so that clause
+%% must remain first; 'enabled' is reported only when both the request
+%% flag and the state's enabled flag are true.
+status_of(_Enabled, #state{bypassed=true}) -> bypassed;
+status_of(true, #state{enabled=true}) -> enabled;
+status_of(_Enabled, _State) -> disabled.
+
+%% @private
+%% @doc We must have just gotten the table data back after a crash/restart.
+%% Walk through the given resources and release any holds by dead processes.
+%% Assumes TableId is always valid (called only after transfer)
+validate_holds(State=#state{entry_table=TableId}) ->
+ [validate_hold(Obj, TableId) || Obj <- ets:match_object(TableId, {{given, '_'},'_'})],
+ State.
+
+%% @private
+%% @doc If the given entry has no alive process associated with it,
+%% remove the hold from the ETS table. If it is alive, then we need
+%% to re-monitor it and update the table with the new ref.
+%% Only lock entries are checked; token entries pass through untouched
+%% (second clause) because token holders are never monitored.
+validate_hold({Key,Entry}=Obj, TableId) when ?e_type(Entry) == lock ->
+ case is_process_alive(?e_pid(Entry)) of
+ true ->
+ %% Still alive. Re-monitor and update table
+ Ref = monitor(process, ?e_pid(Entry)),
+ Entry2 = Entry#resource_entry{ref=Ref},
+ %% Bag table: must delete the old object before inserting the
+ %% updated one, or both would remain.
+ ets:delete_object(TableId, Obj),
+ ets:insert(TableId, {Key, Entry2});
+ false ->
+ %% Process is not alive - release the lock
+ ets:delete_object(TableId, Obj)
+ end;
+validate_hold(_Obj, _TableId) -> %% tokens don't monitor processes
+ ok.
+
+%% @doc Update state with bypassed status and store to ETS.
+%% A no-op until both tables have been transferred to us.
+update_bypassed(_Bypassed, State) when ?NOT_TRANSFERED(State) ->
+ State;
+update_bypassed(Bypassed, State=#state{info_table=TableId}) ->
+ ets:insert(TableId, {bypassed, Bypassed}),
+ State#state{bypassed=Bypassed}.
+
+%% @doc Update state with enabled status and store to ETS.
+%% A no-op until both tables have been transferred to us.
+update_enabled(_Enabled, State) when ?NOT_TRANSFERED(State) ->
+ State;
+update_enabled(Enabled, State=#state{info_table=TableId}) ->
+ ets:insert(TableId, {enabled, Enabled}),
+ State#state{enabled=Enabled}.
+
+%% @doc Read a persisted boolean from the info table, writing (and
+%% returning) Default when the key is absent. Assumes tables have been
+%% transferred.
+restore_boolean(Key, Default, #state{info_table=TableId}) ->
+ case ets:lookup(TableId, Key) of
+ [] ->
+ ets:insert(TableId, {Key, Default}),
+ Default;
+ [{_Key,Value} | _Rest] ->
+ Value
+ end.
+
+%% @doc Restore the bypassed flag from ETS. Assumes tables have been transferred.
+restore_bypassed(Default, State) ->
+ State#state{bypassed=restore_boolean(bypassed, Default, State)}.
+
+%% @doc Restore the enabled flag from ETS. Assumes tables have been transferred.
+restore_enabled(Default, State) ->
+ State#state{enabled=restore_boolean(enabled, Default, State)}.
+
+%% @private
+%% @doc Wrap a call, to a function with args, with a try/catch that handles
+%% thrown exceptions, namely '{unregistered, Resource}' and return the
+%% failed error response for a gen server call. Note: only catches
+%% throws, not errors/exits, so genuine crashes still surface.
+do_handle_call_exception(Function, Args, State) ->
+ try apply(Function, Args)
+ catch
+ Error ->
+ lager:error("Exception: ~p in function ~p", [Error, Function]),
+ {reply, Error, State}
+ end.
+
+%% @doc Throws {unregistered, Resource} for unknown Lock.
+%% NOTE(review): the untransferred clause returns {noreply, State} up
+%% through handle_call, which never replies — with the API's 'infinity'
+%% timeout the caller would block forever. Confirm this is intended.
+do_disable_lock(_Lock, _Kill, State) when ?NOT_TRANSFERED(State) ->
+ {noreply, State};
+do_disable_lock(Lock, Kill, State) ->
+ Info = resource_info(Lock, State),
+ enforce_type_or_throw(Lock, lock, Info),
+ maybe_honor_limit(Kill, Lock, 0, State),
+ do_enable_resource(Lock, false, State).
+
+%% @doc Set a token's refill rate, returning the previous rate.
+%% An unregistered Token is registered on the fly with default info.
+%% Throws unregistered for unknown Token
+do_set_token_rate(Token, Rate, State) ->
+ try
+ Info = resource_info(Token, State),
+ OldRate = Info#resource_info.limit,
+ enforce_type_or_throw(Token, token, Info),
+ State2 = update_limit(Token, Rate, Info, State),
+ schedule_refill_tokens(Token, State2),
+ {reply, OldRate, State2}
+ catch
+ table_id_undefined ->
+ %% This could go into a queue to be played when the transfer happens.
+ {reply, undefined, State};
+ {unregistered, Token} ->
+ {reply, undefined, update_limit(Token, Rate, ?DEFAULT_TOKEN_INFO, State)};
+ {badtype, _Token}=Error ->
+ %% Resource exists but is a lock, not a token.
+ {reply, Error, State}
+ end.
+
+do_get_type_info(_Type, State) when ?NOT_TRANSFERED(State) ->
+ %% Table not transferred yet. NOTE(review): this clause returns a bare [] while the clause below returns {reply, Infos, State} — confirm the caller handles both shapes.
+ [];
+do_get_type_info(Type, State) ->
+ S = fun({R,_T,E,L}) -> {R,E,L} end,
+ Resources = all_registered_resources(Type, State),
+ Infos = [S(resource_info_tuple(Resource, State)) || Resource <- Resources],
+ {reply, Infos, State}.
+
+%% Returns empty if the ETS table has not been transferred to us yet.
+do_resource_limit(lock, _Resource, State) when ?NOT_TRANSFERED(State) ->
+ {reply, 0, State};
+do_resource_limit(token, _Resource, State) when ?NOT_TRANSFERED(State) ->
+ {reply, {0,0}, State};
+do_resource_limit(_Type, Resource, State) ->
+ Info = resource_info(Resource, State),
+ Rate = ?resource_limit(Info),
+ {reply, Rate, State}.
+
+enforce_type_or_throw(Resource, Type, Info) ->
+ case ?resource_type(Info) of
+ Type -> ok;
+ _Other -> throw({badtype, Resource})
+ end.
+
+do_set_concurrency_limit(Lock, Limit, Kill, State) ->
+ try
+ Info = resource_info(Lock, State),
+ enforce_type_or_throw(Lock, lock, Info),
+ OldLimit = limit(Info),
+ State2 = update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State),
+ maybe_honor_limit(Kill, Lock, Limit, State2),
+ {reply, OldLimit, State2}
+ catch
+ table_id_undefined ->
+ %% This could go into a queue to be played when the transfer happens.
+ {reply, 0, State};
+ {unregistered, Lock} ->
+ {reply, 0, update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State)};
+ {badtype, _Lock}=Error ->
+ {reply, Error, State}
+ end.
+
+%% @doc Throws unregistered for unknown Lock
+do_resource_info(Lock, State) ->
+ {_R,_T,E,L} = resource_info_tuple(Lock, State),
+ {reply, {E,L}, State}.
+
+%% @doc Throws unregistered for unknown Lock
+do_lock_limit_reached(Lock, State) ->
+ Info = resource_info(Lock, State),
+ enforce_type_or_throw(Lock, lock, Info),
+ HeldCount = held_count(Lock, State),
+ Limit = limit(Info),
+ {reply, HeldCount >= Limit, State}.
+
+%% @private
+%% @doc Return the maximum allowed number of resources for the given
+%% info, which considers the type of resource, e.g. lock vs token.
+limit(#resource_info{type=lock, limit=Limit}) -> Limit;
+limit(#resource_info{type=token, limit={_Period,MaxCount}}) -> MaxCount.
+
+%% @private
+%% @doc Release the resource associated with the given reference. This is mostly
+%% meaningful for locks.
+release_resource(_Ref, State) when ?NOT_TRANSFERED(State) ->
+ State;
+release_resource(Ref, State=#state{entry_table=TableId}) ->
+ %% There should only be one instance of the object, but we'll zap all that match.
+ Given = [Obj || Obj <- ets:match_object(TableId, {{given, '_'},'_'})],
+ Matches = [Obj || {_Key,Entry}=Obj <- Given, ?e_ref(Entry) == Ref],
+ [ets:delete_object(TableId, Obj) || Obj <- Matches],
+ State.
+
+maybe_honor_limit(true, Lock, Limit, State) ->
+ Entries = all_given_entries(State),
+ Held = [Entry || Entry <- Entries, ?e_type(Entry) == lock, ?e_resource(Entry) == Lock],
+ case Limit < length(Held) of
+ true ->
+ {_Keep, Discards} = lists:split(Limit, Held),
+ %% killing of processes will generate 'DOWN' messages and release the locks
+ [erlang:exit(?e_pid(Discard), max_concurrency) || Discard <- Discards],
+ ok;
+ false ->
+ ok
+ end;
+maybe_honor_limit(false, _LockType, _Limit, _State) ->
+ ok.
+
+held_count(Resource, State) ->
+ length(resources_given(Resource, State)).
+
+do_enabled(Resource, State) ->
+ Info = resource_info(Resource, State),
+ {reply, status_of(?resource_enabled(Info), State), State}.
+
+do_enable_resource(Resource, Enabled, State) ->
+ Info = resource_info(Resource, State),
+ State2 = update_resource_enabled(Resource, Enabled, Info, State),
+ {reply, status_of(Enabled, State2), State2}.
+
+update_resource_enabled(Resource, Value, Default, State) ->
+ update_resource_info(Resource,
+ fun(Info) -> Info#resource_info{enabled=Value} end,
+ Default#resource_info{enabled=Value},
+ State).
+
+update_limit(Resource, Limit, Default, State) ->
+ update_resource_info(Resource,
+ fun(Info) -> Info#resource_info{limit=Limit} end,
+ Default#resource_info{limit=Limit},
+ State).
+
+update_resource_info(Resource, Fun, Default, State=#state{info_table=TableId}) ->
+ Key = {info, Resource},
+ NewInfo = case ets:lookup(TableId, Key) of
+ [] -> Default;
+ [{_Key,Info} | _Rest] -> Fun(Info)
+ end,
+ ets:insert(TableId, {Key, NewInfo}),
+ State.
+
+%% @doc Throws unregistered for unknown Resource
+resource_info(_Resource, State) when ?NOT_TRANSFERED(State) ->
+ throw(table_id_undefined);
+resource_info(Resource, #state{info_table=TableId}) ->
+ Key = {info,Resource},
+ case ets:lookup(TableId, Key) of
+ [] -> throw({unregistered, Resource});
+ [{_Key,Info}] -> Info;
+ [{_Key,_Info} | _Rest] -> throw({too_many_info_objects, Resource})
+ end.
+
+%% @doc Throws unregistered for unknown Resource
+resource_info_tuple(Resource, State) ->
+ Info = resource_info(Resource, State),
+ {Resource, ?resource_type(Info), ?resource_enabled(Info), ?resource_limit(Info)}.
+
+%% @private
+%% @doc
+%% Get existing token type info from ETS table and schedule all for refill.
+%% This is needed because we just reloaded our saved persistent state data
+%% after a crash. Assumes table is available. Called only after Transfer.
+reschedule_token_refills(State) ->
+ Tokens = all_registered_resources(token, State),
+ [schedule_refill_tokens(Token, State) || Token <- Tokens].
+
+%% Schedule a timer event to refill tokens of given type
+schedule_refill_tokens(_Token, State) when ?NOT_TRANSFERED(State) ->
+ ok;
+schedule_refill_tokens(Token, State) ->
+ case ?resource_limit(resource_info(Token, State)) of
+ undefined ->
+ ok;
+ {Period, _Count} ->
+ erlang:send_after(Period, self(), {refill_tokens, Token})
+ end.
+
+%% Schedule a timer event to snapshot the current history
+schedule_sample_history(State=#state{window_interval=Interval}) ->
+ TRef = erlang:send_after(Interval, self(), sample_history),
+ State#state{window_tref=TRef}.
+
+%% @doc Update the "limit" history stat for all registered resources into current window.
+update_stat_all_limits(State) ->
+ lists:foldl(fun({Resource, Info}, S) ->
+ increment_stat_limit(Resource, ?resource_limit(Info), S)
+ end,
+ State,
+ all_resource_info(State)).
+
+do_sample_history(State) ->
+ %% Update window with current limits before copying it
+ State2 = update_stat_all_limits(State),
+ %% Move the current window of measurements onto the history queues.
+ %% Trim queue down to ?BG_DEFAULT_KEPT_SAMPLES if too big now.
+ Queue2 = queue:in(State2#state.window, State2#state.history),
+ Trimmed = case queue:len(Queue2) > ?BG_DEFAULT_KEPT_SAMPLES of
+ true ->
+ {_Discarded, Rest} = queue:out(Queue2),
+ Rest;
+ false ->
+ Queue2
+ end,
+ EmptyWindow = orddict:new(),
+ State2#state{window=EmptyWindow, history=Trimmed}.
+
+update_stat_window(Resource, Fun, Default, State=#state{window=Window}) ->
+ NewWindow = orddict:update(Resource, Fun, Default, Window),
+ State#state{window=NewWindow}.
+
+resources_given(Resource, #state{entry_table=TableId}) ->
+ [Entry || {{given,_R},Entry} <- ets:match_object(TableId, {{given, Resource},'_'})].
+
+%% Key = {given, Resource},
+%% [Given || {_K,Given} <- ets:lookup(TableId, Key)].
+
+%% @private
+%% @doc Add a Resource Entry to the "given" table. Here, we really do want
+%% to allow multiple entries because each Resource "name" can be given multiple
+%% times.
+add_given_entry(Resource, Entry, TableId) ->
+ Key = {given, Resource},
+ ets:insert(TableId, {Key, Entry}).
+
+remove_given_entries(Resource, State=#state{entry_table=TableId}) ->
+ Key = {given, Resource},
+ ets:delete(TableId, Key),
+ State.
+
+%% @private
+%% @doc Add a resource queue entry to our given set.
+give_resource(Entry, State=#state{entry_table=TableId}) ->
+ Resource = ?e_resource(Entry),
+ Type = ?e_type(Entry),
+ add_given_entry(Resource, Entry#resource_entry{state=given}, TableId),
+ %% update given stats
+ increment_stat_given(Resource, Type, State).
+
+%% @private
+%% @doc Add Resource to our given set.
+give_resource(Resource, Type, Pid, Ref, Meta, State) ->
+ Entry = ?RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, given),
+ give_resource(Entry, State).
+
+-spec try_get_resource(boolean(), bg_resource(), bg_resource_type(), pid(), [{atom(), any()}], #state{}) ->
+ {max_concurrency, #state{}}
+ | {ok, #state{}}
+ | {{ok, reference()}, #state{}}.
+try_get_resource(false, _Resource, _Type, _Pid, _Meta, State) ->
+ {max_concurrency, State};
+try_get_resource(true, Resource, Type, Pid, Meta, State) ->
+ case Type of
+ token ->
+ Ref = random_bogus_ref(),
+ {ok, give_resource(Resource, Type, Pid, Ref, Meta, State)};
+ lock ->
+ Ref = monitor(process, Pid),
+ {{ok,Ref}, give_resource(Resource, Type, Pid, Ref, Meta, State)}
+ end.
+
+%% @private
+%% @doc reply now if resource is available. Returns max_concurrency
+%% if resource not available or globally or specifically disabled.
+-spec do_get_resource(bg_resource(), bg_resource_type(), pid(), [{atom(), any()}], #state{}) ->
+ {reply, max_concurrency, #state{}}
+ | {reply, {ok, #state{}}}
+ | {reply, {{ok, reference()}, #state{}}}.
+do_get_resource(_Resource, _Type, _Pid, _Meta, State) when ?NOT_TRANSFERED(State) ->
+ %% Table transfer has not occurred yet. Reply "max_concurrency" so that callers
+ %% will try back later, hopefully when we have our table back.
+ {reply, max_concurrency, State};
+%% @doc When the API is bypassed, we ignore concurrency limits.
+do_get_resource(Resource, Type, Pid, Meta, State=#state{bypassed=true}) ->
+ {Result, State2} = try_get_resource(true, Resource, Type, Pid, Meta, State),
+ {reply, Result, State2};
+do_get_resource(_Resource, _Type, _Pid, _Meta, State=#state{enabled=false}) ->
+ {reply, max_concurrency, State};
+do_get_resource(Resource, Type, Pid, Meta, State) ->
+ Info = resource_info(Resource, State),
+ enforce_type_or_throw(Resource, Type, Info),
+ Enabled = ?resource_enabled(Info),
+ Limit = limit(Info),
+ Given = length(resources_given(Resource, State)),
+ {Result, State2} = try_get_resource(Enabled andalso not (Given >= Limit),
+ Resource, Type, Pid, Meta, State),
+ {reply, Result, State2}.
+
+%% @private
+%% @doc This should create a unique reference every time it's called; and should
+%% not repeat across process restarts since our ETS table lives across process
+%% lifetimes. This is needed to create unique entries in the "given" table.
+random_bogus_ref() ->
+ make_ref().
+
+all_resource_info(#state{info_table=TableId}) ->
+ [{Resource, Info} || {{info, Resource}, Info} <- ets:match_object(TableId, {{info, '_'},'_'})].
+
+all_registered_resources(Type, #state{info_table=TableId}) ->
+ [Resource || {{info, Resource}, Info} <- ets:match_object(TableId, {{info, '_'},'_'}),
+ ?resource_type(Info) == Type].
+
+all_given_entries(#state{entry_table=TableId}) ->
+ %% multiple entries per resource type, i.e. uses the "bag"
+ [Entry || {{given, _Resource}, Entry} <- ets:match_object(TableId, {{given, '_'},'_'})].
+
+format_entry(Entry) ->
+ #bg_stat_live
+ {
+ resource = ?e_resource(Entry),
+ type = ?e_type(Entry),
+ consumer = ?e_pid(Entry),
+ meta = ?e_meta(Entry),
+ state = ?e_state(Entry)
+ }.
+
+fmt_live_entries(Entries) ->
+ [format_entry(Entry) || Entry <- Entries].
+
+%% States :: [given], Types :: [lock | token]
+do_query(_Resource, _States, _Types, State) when ?NOT_TRANSFERED(State) ->
+ %% Table hasn't been transferred yet.
+ [];
+do_query(all, States, Types, State) ->
+ E1 = case lists:member(given, States) of
+ true ->
+ Entries = all_given_entries(State),
+ lists:flatten([Entry || Entry <- Entries,
+ lists:member(?e_type(Entry), Types)]);
+ false ->
+ []
+ end,
+ fmt_live_entries(E1);
+do_query(Resource, States, Types, State) ->
+ E1 = case lists:member(given, States) of
+ true ->
+ Entries = resources_given(Resource, State),
+ [Entry || Entry <- Entries, lists:member(?e_type(Entry), Types)];
+ false ->
+ []
+ end,
+ fmt_live_entries(E1).
+
+%% @private
+%% @doc Token refill timer event handler.
+%% Capture stats of what was given in the previous period,
+%% Clear all tokens of this type from the given set,
+do_refill_tokens(Token, State) ->
+ State2 = increment_stat_refills(Token, State),
+ remove_given_entries(Token, State2).
+
+default_refill(Token, State) ->
+ Limit = limit(resource_info(Token, State)),
+ ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=token, refills=1, limit=Limit}.
+
+default_given(Token, Type, State) ->
+ Limit = limit(resource_info(Token, State)),
+ ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=Type, given=1, limit=Limit}.
+
+increment_stat_limit(_Resource, undefined, State) ->
+ State;
+increment_stat_limit(Resource, Limit, State) ->
+ {Type, Count} = case Limit of
+ {_Period, C} -> {token, C};
+ N -> {lock, N}
+ end,
+ update_stat_window(Resource,
+ fun(Stat) -> Stat#bg_stat_hist{limit=Count} end,
+ ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=Type, limit=Count},
+ State).
+
+increment_stat_refills(Token, State) ->
+ update_stat_window(Token,
+ fun(Stat) -> Stat#bg_stat_hist{refills=1+Stat#bg_stat_hist.refills} end,
+ default_refill(Token, State),
+ State).
+
+increment_stat_given(Token, Type, State) ->
+ update_stat_window(Token,
+ fun(Stat) -> Stat#bg_stat_hist{given=1+Stat#bg_stat_hist.given} end,
+ default_given(Token, Type, State),
+ State).
+
+%% erase saved history
+do_clear_history(State=#state{window_tref=TRef}) ->
+ erlang:cancel_timer(TRef),
+ State2 = State#state{history=queue:new()},
+ schedule_sample_history(State2).
+
+%% Return stats history from head or tail of stats history queue
+do_hist(_End, _Resource, _Offset, _Count, State) when ?NOT_TRANSFERED(State) ->
+ [];
+do_hist(End, Resource, Offset, Count, State) when Offset < 0 ->
+ do_hist(End, Resource, 0, Count, State);
+do_hist(_End, _Resource, _Offset, Count, _State) when Count =< 0 ->
+ [];
+do_hist(End, Resource, Offset, Count, #state{history=HistQueue}) ->
+ QLen = queue:len(HistQueue),
+ First = max(1, case End of
+ head -> min(Offset+1, QLen);
+ tail -> QLen - Offset + 1
+ end),
+ Last = min(QLen, max(First + Count - 1, 1)),
+ H = case segment_queue(First, Last, HistQueue) of
+ empty -> [];
+ {ok, Hist } ->
+ case Resource of
+ all ->
+ StatsDictList = queue:to_list(Hist),
+ [orddict:to_list(Stat) || Stat <- StatsDictList];
+ _T ->
+ [[{Resource, stat_window(Resource, StatsDict)}]
+ || StatsDict <- queue:to_list(Hist), stat_window(Resource, StatsDict) =/= undefined]
+ end
+ end,
+ %% Remove empty windows
+ lists:filter(fun(S) -> S =/= [] end, H).
+
+segment_queue(First, Last, _Q) when Last < First ->
+ empty;
+segment_queue(First, Last, Queue) ->
+ QLen = queue:len(Queue),
+ case QLen >= Last andalso QLen > 0 of
+ true ->
+ %% trim off extra tail, then trim head
+ Front = case QLen == Last of
+ true -> Queue;
+ false ->
+ {QFirst, _QRest} = queue:split(Last, Queue),
+ QFirst
+ end,
+ case First == 1 of
+ true -> {ok, Front};
+ false ->
+ {_Skip, Back} = queue:split(First-1, Front),
+ {ok, Back}
+ end;
+ false ->
+ %% empty
+ empty
+ end.
+
+%% @private
+%% @doc Get stat history for given token type from sample set
+-spec stat_window(bg_resource(), orddict:orddict()) -> bg_stat_hist().
+stat_window(Resource, Window) ->
+ case orddict:find(Resource, Window) of
+ error -> undefined;
+ {ok, StatHist} -> StatHist
+ end.
View
18 src/riak_core_sup.erl
@@ -2,7 +2,7 @@
%%
%% riak_core: Core Riak Application
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@@ -24,6 +24,8 @@
-behaviour(supervisor).
+-include("riak_core_bg_manager.hrl").
+
%% API
-export([start_link/0]).
-export([ensembles_enabled/0]).
@@ -32,9 +34,17 @@
-export([init/1]).
%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type, Timeout, Args), {I, {I, start_link, Args}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type, Timeout), ?CHILD(I, Type, Timeout, [])).
-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
+%% ETS tables to be created and maintained by riak_core_table_manager, which is not linked
+%% to any processes except this supervisor. Please keep it that way so tables don't get lost
+%% when their user processes crash. Implement ETS-TRANSFER handler for user processes.
+-define(TBL_MGR_ARGS, [{?BG_INFO_ETS_TABLE, ?BG_INFO_ETS_OPTS},
+ {?BG_ENTRY_ETS_TABLE, ?BG_ENTRY_ETS_OPTS}
+ ]).
+
%% ===================================================================
%% API functions
%% ===================================================================
@@ -56,7 +66,9 @@ init([]) ->
permanent, 30000, supervisor, [riak_ensemble_sup]},
Children = lists:flatten(
- [?CHILD(riak_core_sysmon_minder, worker),
+ [?CHILD(riak_core_table_manager, worker, 5000, [?TBL_MGR_ARGS]),
+ ?CHILD(riak_core_bg_manager, worker),
+ ?CHILD(riak_core_sysmon_minder, worker),
?CHILD(riak_core_vnode_sup, supervisor, 305000),
?CHILD(riak_core_eventhandler_sup, supervisor),
[?CHILD(riak_core_dist_mon, worker) || DistMonEnabled],
View
187 src/riak_core_table_manager.erl
@@ -0,0 +1,187 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core_table_manager: ETS table ownership and crash protection
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc A gen_server process that creates and serves as heir to a
+%% ETS table; and coordinates with matching user processes. If a user
+%% process exits, this server will inherit the ETS table and hand it
+%% back to the user process when it restarts.
+%%
+%% For theory of operation, please see the web page:
+%% http://steve.vinoski.net/blog/2011/03/23/dont-lose-your-ets-tables/
+
+-module(riak_core_table_manager).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1,
+ claim_table/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {tables}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec start_link([term()]) -> {ok, Pid::pid()} | ignore | {error, Error::term()}.
+start_link(TableSpecs) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [TableSpecs], []).
+
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Gives the registration table away to the caller, which should be
+%% the registrar process.
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec claim_table(atom()) -> ok.
+claim_table(TableName) ->
+ gen_server:call(?SERVER, {claim_table, TableName}, infinity).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%% @end
+%%--------------------------------------------------------------------
+-spec init([term()]) -> {ok, undefined}.
+%% Tables :: [{TableName, [props]}]
+%% Table specs are provided by the process that creates this table manager,
+%% presumably a supervisor such as riak_core_sup.
+init([TableSpecs]) ->
+ lager:debug("Table Manager starting up with tables: ~p", [TableSpecs]),
+ Tables = lists:foldl(fun(Spec, TT) -> create_table(Spec, TT) end, [], TableSpecs),
+ {ok, #state{tables=Tables}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_call(Msg::term(), From::{pid(), term()}, State::term()) ->
+ {reply, Reply::term(), State::term()} |
+ {noreply, State::term()}.
+%% TableName :: atom()
+handle_call({claim_table, TableName}, {Pid, _Tag}, State) ->
+ %% The user process is (re-)claiming the table. Give it away.
+ %% We remain the heir in case the user process exits.
+ case lookup_table(TableName, State) of
+ undefined ->
+ %% Table does not exist, which is madness.
+ {reply, {undefined_table, TableName}, State};
+ TableId ->
+ lager:debug("Giving away table ~p (~p) to ~p", [TableName, TableId, Pid]),
+ ets:give_away(TableId, Pid, TableName),
+ Reply = ok,
+ {reply, Reply, State}
+ end;
+
+handle_call(_Msg, _From, State) ->
+ {noreply, State}.
+
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_cast(term(), term()) -> {noreply, State::term()}.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info({'ETS-TRANSFER', TableId, FromPid, _HeirData}, State) ->
+ %% The table's user process exited and transferred the table back to us.
+ lager:debug("Table user process ~p exited, ~p received table ~p", [FromPid, self(), TableId]),
+ {noreply, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec terminate(term(), term()) -> ok.
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec code_change(term(), term(), term()) -> {ok, term()}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Create the initial table based on a table name and properties.
+%% The table will eventually be given away by claim_table, but we
+%% want to remain the heir in case the claimer crashes.
+create_table({TableName, TableProps}, Tables) ->
+ TableId = ets:new(TableName, TableProps),
+ ets:setopts(TableId, [{heir, self(), undefined}]),
+ [{TableName, TableId} | Tables].
+
+-spec lookup_table(TableName::atom(), State::term()) -> ets:tid() | undefined.
+lookup_table(TableName, #state{tables=Tables}) ->
+ proplists:get_value(TableName, Tables).
View
1,003 test/bg_manager_eqc.erl
@@ -0,0 +1,1003 @@
+%%% @author Jordan West <>
+%%% @copyright (C) 2013, Jordan West
+%%% @doc
+%%%
+%%% @end
+%%% Created : 13 Nov 2013 by Jordan West <>
+
+-module(bg_manager_eqc).
+
+%%-ifdef(TEST).
+%% -ifdef(EQC).
+
+-include("riak_core_bg_manager.hrl").
+-include_lib("eqc/include/eqc.hrl").
+-include_lib("eqc/include/eqc_statem.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-type bg_eqc_type() :: atom().
+-type bg_eqc_limit() :: non_neg_integer().
+
+-record(state,{
+ %% whether or not the bgmgr is running
+ alive :: boolean(),
+ %% whether or not the global bypass switch is engaged
+ bypassed :: boolean(),
+ %% whether or not the global enable switch is engaged
+ enabled :: boolean(),
+ %% processes started by the test and the processes state
+ procs :: [{pid(), running | not_running}],
+ %% resources that are disabled are on the list
+ disabled :: [{bg_eqc_type()}],
+ %% concurrency limits for lock types
+ limits :: [{bg_eqc_type(), bg_eqc_limit()}],
+ %% max counts per "period" for token types
+ counts :: [{bg_eqc_type(), bg_eqc_limit()}],
+ %% locks held (or once held then released) for each lock type
+ %% and their state
+ locks :: [{bg_eqc_type(), [{reference(), pid(), [], held | released}]}],
+ %% number of tokens taken by type
+ tokens :: [{bg_eqc_type(), non_neg_integer()}],
+ %% current history samples accumulator: [{resource, limit, refills, given}]
+ samples :: [{bg_eqc_type(), non_neg_integer(), non_neg_integer(), non_neg_integer()}],
+ %% snapshot of samples on window interval
+ history :: [[{bg_eqc_type(), non_neg_integer(), non_neg_integer(), non_neg_integer()}]]
+ }).
+
+run_eqc() ->
+ run_eqc(100).
+
+run_eqc(Type) when is_atom(Type) ->
+ run_eqc(100, Type);
+run_eqc(N) ->
+ run_eqc(N, simple).
+
+run_eqc(N, simple) ->
+ run_eqc(N, prop_bgmgr());
+run_eqc(N, para) ->
+ run_eqc(N, parallel);
+run_eqc(N, parallel) ->
+ run_eqc(N, prop_bgmgr_parallel());
+run_eqc(N, Prop) ->
+ eqc:quickcheck(eqc:numtests(N, Prop)).
+
+run_check() ->
+ eqc:check(prop_bgmgr()).
+
+run_recheck() ->
+ eqc:recheck(prop_bgmgr()).
+
+
+%% @doc Returns the state in which each test case starts. (Unless a different
+%% initial state is supplied explicitly to, e.g. commands/2.)
+initial_state() ->
+ #state{
+ alive = true,
+ bypassed = false,
+ enabled = true,
+ disabled = [],
+ procs = [],
+ limits = [],
+ counts = [],
+ locks = [],
+ tokens = [],
+ samples = [],
+ history = []
+ }.
+
+%% ------ Grouped operator: set_concurrency_limit
+%% @doc set_concurrency_limit_command - Command generator
+set_concurrency_limit_args(_S) ->
+ %% TODO: change Kill (3rd arg, to boolean gen)
+ [lock_type(), lock_limit(), false].
+
+%% @doc set_concurrency_limit precondition
+set_concurrency_limit_pre(S) ->
+ is_alive(S).
+
+%% @doc set_concurreny_limit command
+set_concurrency_limit(Type, Limit, false) ->
+ riak_core_bg_manager:set_concurrency_limit(Type, Limit).
+
+%% @doc state transition for set_concurrency_limit command
+set_concurrency_limit_next(S=#state{limits=Limits}, _Value, [Type,Limit,_Kill]) ->
+ S#state{ limits = lists:keystore(Type, 1, Limits, {Type, Limit}) }.
+
+%% @doc set_concurrency_limit_post - Postcondition for set_concurrency_limit
+set_concurrency_limit_post(S, [Type,_Limit,_Kill], Res) ->
+ %% check returned value is equal to value we have in state prior to this call
+ %% since returned value is promised to be previous one that was set
+ eq(limit(Type, S), Res).
+
+%% ------ Grouped operator: concurrency_limit
+%% @doc concurrency_limit command arguments generator
+concurrency_limit_args(_S) ->
+ [lock_type()].
+
+%% @doc concurrency_limit precondition
+concurrency_limit_pre(S) ->
+ is_alive(S).
+
+%% @doc concurrency limit command
+concurrency_limit(Type) ->
+ riak_core_bg_manager:concurrency_limit(Type).
+
+%% @doc Postcondition for concurrency_limit
+concurrency_limit_post(S, [Type], Limit) ->
+ ExpectedLimit = limit(Type, {unregistered, Type}, S),
+ eq(ExpectedLimit, Limit).
+
+%% ------ Grouped operator: concurrency_limit_reached
+%% @doc concurrency_limit_reached command argument generator
+concurrency_limit_reached_args(_S) ->
+ [lock_type()].
+
+%% @doc concurrency_limit_reached precondition
+concurrency_limit_reached_pre(S) ->
+ is_alive(S).
+
+%% @doc concurrency_limit_reached command
+concurrency_limit_reached(Type) ->
+ riak_core_bg_manager:concurrency_limit_reached(Type).
+
+%% @doc concurrency_limit_reached_post - Postcondition for concurrency_limit_reached
+concurrency_limit_reached_post(S, [Type], {unregistered, Type}) ->
+ eq(limit(Type, undefined, S), undefined);
+concurrency_limit_reached_post(S, [Type], Res) ->
+ Limit = limit(Type, S),
+ ExistingCount = length(held_locks(Type, S)),
+ eq(ExistingCount >= Limit, Res).
+
+
+%% ------ Grouped operator: get_lock
+%% @doc argument generator for get_lock command
+get_lock_args(S) ->
+ %% TODO: test getting locks on behalf of calling process instead of other process
+ %% TODO: test trying to get lock on behalf of killed process?
+ [lock_type(), oneof(running_procs(S)), []].
+
+%% @doc Precondition for generation of get_lock command
+get_lock_pre(S) ->
+ %% need some running procs to get locks on behalf of
+ RunningProcs = length(running_procs(S)) > 0,
+ RunningProcs andalso is_alive(S).
+
+%% @doc Precondition for generation of get_lock command
+get_lock_pre(S, [Type, _Pid, _Meta]) ->
+ %% must call set_concurrency_limit at least once
+ %% TODO: we can probably remove and test this restriction instead
+ is_integer(limit(Type, unregistered, S)).
+
+get_lock(Type, Pid, Meta) ->
+ case riak_core_bg_manager:get_lock(Type, Pid, Meta) of
+ {ok, Ref} -> Ref;
+ Other -> Other
+ end.
+
+
+%% @doc State transition for get_lock command
+%% `Res' is either the lock reference or max_concurrency
+get_lock_next(S=#state{enabled=Enabled, bypassed=Bypassed}, Res, [Type, Pid, Meta]) ->
+ TypeLimit = limit(Type, S),
+ Held = held_locks(Type, S),
+ ReallyEnabled = Enabled andalso resource_enabled(Type, S),
+ case (ReallyEnabled andalso length(Held) < TypeLimit) orelse Bypassed of
+ %% got lock
+ true -> add_held_lock(Type, Res, Pid, Meta, S);
+ %% failed to get lock
+ false -> S
+ end.
+
+%% @doc Postcondition for get_lock
+%% We expect to get max_concurrency if globally disabled or we hit the limit.
+%% We expect to get ok if bypassed or under the limit.
+get_lock_post(#state{bypassed=true}, [_Type, _Pid, _Meta], max_concurrency) ->
+ 'max_concurrency returned when bypassed';
+get_lock_post(S=#state{enabled=Enabled}, [Type, _Pid, _Meta], max_concurrency) ->
+ %% Since S reflects the state before we check that it
+ %% was already at the limit.
+ Limit = limit(Type, S),
+ ExistingCount = length(held_locks(Type, S)),
+ %% check >= because we may have lowered limit *without*
+ %% forcing some processes to release their locks by killing them
+ ReallyEnabled = Enabled andalso resource_enabled(Type, S),
+ case (not ReallyEnabled) orelse ExistingCount >= Limit of
+ true -> true;
+ false ->
+ %% hack to get more informative post-cond failure (like eq)
+ {ExistingCount, 'not >=', Limit}
+ end;
+get_lock_post(S=#state{bypassed=Bypassed, enabled=Enabled}, [Type, _Pid, _Meta], _LockRef) ->
+ %% Since S reflects the state before we check that it
+ %% was not already at the limit.
+ Limit = limit(Type, S),
+ ExistingCount = length(held_locks(Type, S)),
+ ReallyEnabled = Enabled andalso resource_enabled(Type, S),
+ case (ReallyEnabled andalso ExistingCount < Limit) orelse Bypassed of
+ true -> true;
+ false ->
+ %% hack to get more informative post-cond failure (like eq)
+ {ExistingCount, 'not <', Limit}
+ end.
+
+%% ------ Grouped operator: start_process
+%% @doc args generator for start_process
+start_process_args(_S) ->
+ [].
+
+%% @doc start_process_pre - Precondition for generation
+start_process_pre(S) ->
+ %% limit the number of running processes in the test; we shouldn't need an unbounded number
+ %% TODO: move the "5" used below to a -define (this comment previously said "20", which did not match the code)
+ length(running_procs(S)) < 5.
+
+start_process() ->
+ spawn(fun() ->
+ receive die -> ok
+ %% this protects us against leaking too many processes when running
+ %% prop_bgmgr_parallel(), which doesn't clean up the processes it starts
+ after 360000 -> timeout
+ end
+ end).
+
+%% @doc state transition for start_process command
+start_process_next(S=#state{procs=Procs}, Value, []) ->
+ S#state{ procs = lists:keystore(Value, 1, Procs, {Value, running}) }.
+
+%% @doc postcondition for start_process
+start_process_post(_S, [], Pid)->
+ is_process_alive(Pid).
+
%% ------ Grouped operator: stop_process
%% @doc stop_process_args - Argument generator: pick one running process
stop_process_args(S) ->
    [oneof(running_procs(S))].

%% @doc stop_process_pre - Precondition for generation
stop_process_pre(S) ->
    %% need some running procs in order to kill one.
    length(running_procs(S)) > 0.
+
%% @doc stop_process_pre - Precondition for stop_process.
%% Only kill processes that currently hold locks in the model; killing a
%% lock-less process exercises nothing interesting.
stop_process_pre(S, [Pid]) ->
    %% lists:keymember/3 replaces the keyfind(...) /= false comparison: it
    %% answers the membership question directly and avoids the coercing /=
    %% operator. The pid lives in element 2 of each {Ref, Pid, Meta, Status}
    %% lock tuple.
    lists:keymember(Pid, 2, all_locks(S)).
+
%% @doc stop_process command: ask the process to exit, then wait for it
stop_process(Pid) ->
    Pid ! die,
    wait_for_pid(Pid).

%% @doc state transition for stop_process command
stop_process_next(S=#state{procs=Procs}, _Value, [Pid]) ->
    %% mark process as no longer running and release all locks held by the process
    UpdatedProcs = lists:keystore(Pid, 1, Procs, {Pid, not_running}),
    release_locks(Pid, S#state{procs = UpdatedProcs}).


%% @doc postcondition for stop_process: the process must be gone. On
%% timeout we return an informative tuple (instead of false) so EQC
%% shows which pid failed to exit.
stop_process_post(_S, [Pid], ok) ->
    not is_process_alive(Pid);
stop_process_post(_S, [Pid], {error, didnotexit}) ->
    {error, {didnotexit, Pid}}.
+
%% ------ Grouped operator: set_token_rate
%% @doc set_token_rate arguments generator
set_token_rate_args(_S) ->
    %% NOTE: change token_type() to lock_type() to provoke failure due to registration of lock/token under same name
    %% (waiting for fix in bg mgr).
    [token_type(), token_count()].

%% @doc set_token_rate precondition: manager must be running
set_token_rate_pre(S) ->
    is_alive(S).

%% @doc set_token_rate state transition
%% Note that set_token_rate takes a rate, which is {Period, Count},
%% but this test generates its own refill messages, so rate is not modeled.
set_token_rate_next(S=#state{counts=Counts}, _Value, [Type, Count]) ->
    S2 = update_sample(Type, Count, 0, 0, S),
    S2#state{ counts = lists:keystore(Type, 1, Counts, {Type, Count}) }.

%% @doc set_token_rate command
set_token_rate(Type, Count) ->
    %% we refill tokens as a command in the model so we use
    %% token rate to give us the biggest refill period we can get.
    %% no test should run longer than that or we have a problem.
    riak_core_bg_manager:set_token_rate(Type, mk_token_rate(Count)).

%% @doc Postcondition for set_token_rate
set_token_rate_post(S, [Type, _Count], Res) ->
    %% check returned value is equal to value we have in state prior to this call
    %% since returned value is promised to be previous one that was set
    eq(Res, mk_token_rate(max_num_tokens(Type, undefined, S))).
+
%% ------ Grouped operator: token_rate
%% @doc token_rate arguments generator
token_rate_args(_S) ->
    [token_type()].

%% @doc token_rate precondition: manager must be running
token_rate_pre(S) ->
    is_alive(S).

%% @doc token_rate command
token_rate(Type) ->
    riak_core_bg_manager:token_rate(Type).

%% @doc Postcondition for token_rate: the manager must report the rate we
%% modeled, or an {unregistered, Type} error for types never registered.
token_rate_post(S, [Type], Res) ->
    ExpectedRate = mk_token_rate(max_num_tokens(Type, {unregistered, Type}, S)),
    eq(ExpectedRate, Res).
+
%% ------ Grouped operator: get_token
%% @doc get_token args generator; the arity-2 form (on behalf of a pid) is
%% only generated when the model has running processes to hand out.
get_token_args(S) ->
    %% TODO: generate meta for future query tests
    ArityTwo = [[token_type(), oneof(running_procs(S))] || length(running_procs(S)) > 0],
    ArityOne = [[token_type()]],
    oneof(ArityTwo ++ ArityOne).

%% @doc Precondition for get_token; arity-2 delegates to arity-1
get_token_pre(S, [Type, _Pid]) ->
    get_token_pre(S, [Type]);
get_token_pre(S, [Type]) ->
    %% must call set_token_rate at least once
    %% TODO: we can probably remove and test this restriction instead
    is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S).

%% @doc get_token state transition: a token is only granted when the
%% resource and manager are enabled and we are under the max, or bypassed
get_token_next(S, Value, [Type, _Pid]) ->
    get_token_next(S, Value, [Type]);
get_token_next(S=#state{bypassed=Bypassed, enabled=Enabled}, _Value, [Type]) ->
    CurCount = num_tokens(Type, S),
    %% NOTE: this assumes the precondition requires we call set_token_rate at least once
    %% in case we don't we treat the max as 0
    Max = max_num_tokens(Type, unregistered, S),
    ReallyEnabled = Enabled andalso resource_enabled(Type, S),
    case (ReallyEnabled andalso CurCount < Max) orelse Bypassed of
        true -> increment_token_count(Type, S);
        false -> S
    end.

%% @doc get_token command (token taken on behalf of the calling process)
get_token(Type) ->
    riak_core_bg_manager:get_token(Type).

%% @doc get_token command (token taken on behalf of Pid)
get_token(Type, Pid) ->
    riak_core_bg_manager:get_token(Type, Pid).
+
%% @doc Postcondition for get_token.
%% We expect max_concurrency when the resource or the manager is disabled,
%% or when we hit the token limit. We expect ok when bypassed or under the
%% limit. The bypassed clause must come before the max_concurrency clause.
get_token_post(S, [Type, _Pid], Res) ->
    get_token_post(S, [Type], Res);
get_token_post(#state{bypassed=true}, [_Type], max_concurrency) ->
    %% informative atom instead of 'false' so EQC prints a useful failure
    'max_concurrency returned while bypassed';
get_token_post(S=#state{enabled=Enabled}, [Type], max_concurrency) ->
    CurCount = num_tokens(Type, S),
    %% NOTE: this assumes the precondition requires we call set_token_rate at least once
    %% in case we don't we treat the max as 0
    Max = max_num_tokens(Type, unregistered, S),
    ReallyEnabled = Enabled andalso resource_enabled(Type, S),
    case (not ReallyEnabled) orelse CurCount >= Max of
        true -> true;
        false ->
            %% hack to get more info out of postcond failure
            {CurCount, 'not >=', Max}
    end;
get_token_post(S=#state{bypassed=Bypassed, enabled=Enabled}, [Type], ok) ->
    CurCount = num_tokens(Type, S),
    %% NOTE: this assumes the precondition requires we call set_token_rate at least once
    %% in case we don't we treat the max as 0
    Max = max_num_tokens(Type, unregistered, S),
    ReallyEnabled = Enabled andalso resource_enabled(Type, S),
    case (ReallyEnabled andalso CurCount < Max) orelse Bypassed of
        true -> true;
        false ->
            {CurCount, 'not <', Max}
    end.
+
%% ------ Grouped operator: refill_tokens
%% @doc refill_tokens args generator
refill_tokens_args(_S) ->
    [token_type()].

%% @doc refill_tokens precondition
refill_tokens_pre(S, [Type]) ->
    %% only refill tokens if we have registered type (called set_token_rate at least once)
    is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S).

%% @doc refill_tokens state transition: all tokens of Type become available
refill_tokens_next(S, _Value, [Type]) ->
    reset_token_count(Type, S).

%% @doc refill_tokens command: poke the manager's refill timer message
%% directly, since the real timer period is maxed out (see mk_token_rate/1)
refill_tokens(Type) ->
    riak_core_bg_manager ! {refill_tokens, Type},
    %% TODO: find way to get rid of this timer sleep
    timer:sleep(100).
+
%% ------ Grouped operator: sample_history
%% @doc sample_history args generator
sample_history_args(_S) ->
    [].

%% @doc sample_history precondition
sample_history_pre(S, []) ->
    is_alive(S).

%% @doc sample_history next state function: snapshot samples into history
sample_history_next(S, _Value, []) ->
    do_sample_history(S).

%% @doc sample_history command: poke the manager's sampling timer message
%% directly, since the real window interval is maxed out
sample_history() ->
    riak_core_bg_manager ! sample_history,
    %% give the manager time to process the message before the next command
    timer:sleep(100).
+
%% ------ Grouped operator: crash
%% @doc crash args generator
crash_args(_S) ->
    [].

%% @doc precondition for crash command
crash_pre(#state{alive=Alive}) ->
    %% only crash if actually running
    Alive.

%% @doc state transition for crash command: manager is now down
crash_next(S, _Value, _Args) ->
    S#state{ alive = false }.

%% @doc crash command: kill the real background manager process
crash() ->
    stop_pid(whereis(riak_core_bg_manager)).

%% @doc crash command post condition
crash_post(_S, _Args, _Res) ->
    %% TODO: anything we want to validate here?
    true.
+
%% ------ Grouped operator: revive
%% @doc revive arguments generator
revive_args(_S) ->
    [].

%% @doc revive precondition
revive_pre(#state{alive=Alive}) ->
    %% only revive if we are in a crashed state
    not Alive.

%% @doc revive_next - Next state function. A freshly started manager has an
%% empty history, so clear the modeled history as well.
revive_next(S, _Value, _Args) ->
    S2 = S#state{ alive = true },
    clear_history(S2).

%% @doc revive command: restart the real background manager
revive() ->
    {ok, _BgMgr} = riak_core_bg_manager:start(window_interval()).

%% @doc revive_post - Postcondition for revive
revive_post(_S, _Args, _Res) ->
    %% TODO: what to validate here, if anything?
    true.
+
%% ------ Grouped operator: ps query
%% @doc ps arguments generator
ps_args(_S) ->
    [oneof([all, lock_type(), token_type()])].

%% @doc ps precondition
ps_pre(S) ->
    is_alive(S).

%% @doc ps next state function: queries don't change the model
ps_next(S, _Value, _Args) ->
    S.

%% @doc ps command
ps(Resource) ->
    riak_core_bg_manager:ps(Resource).

%% @doc ps postcondition: the entry count must equal the modeled held locks
%% plus taken tokens for the queried resource
ps_post(State, [Resource], Result) ->
    %% only one of these will have non-zero result unless Resource = all
    NumLocks = length(held_locks(Resource, State)),
    NumTokens = num_tokens_taken(Resource, State),
    %% TODO: could validate record entries in addition to correct counts
    eq(length(Result), NumLocks+NumTokens).
+
%% ------ Grouped operator: head query
%% @doc head arguments generator
head_args(_S) ->
    %% [Resource, Offset, NumSamples]
    %% NOTE: Offset is always generated as 0, which keeps the sublist
    %% arithmetic in head_post/3 in range.
    [oneof([all, lock_type(), token_type()]), 0, choose(0,5)].

%% @doc head precondition
head_pre(S) ->
    is_alive(S).

%% @doc head next state function: queries don't change the model
head_next(S, _Value, _Args) ->
    S.

%% @doc head command: query the oldest history samples for Resource
head(Resource, Offset, NumSamples) ->
    riak_core_bg_manager:head(Resource, Offset, NumSamples).

%% @doc head postcondition: compare sample-window counts against the model.
%% The modeled history is stored newest-first, hence the reverse; windows
%% are filtered by Resource and empty windows are dropped before counting.
head_post(#state{history=RevHistory}, [Resource, Offset, NumSamples], Result) ->
    History = lists:reverse(RevHistory),
    Start = Offset+1,
    Len = min(NumSamples, length(History) - Offset),
    Keep = lists:sublist(History, Start, Len),
    H2 = [lists:filter(fun({R,_L,_R,_G}) -> Resource == all orelse R == Resource end, Samples)
          || Samples <- Keep, Samples =/= []],
    H3 = lists:filter(fun(S) -> S =/= [] end, H2),
    eq(length(Result), length(H3)).
+
%% ------ Grouped operator: tail query
%% @doc tail arguments generator
tail_args(_S) ->
    %% [Resource, Offset, NumSamples]
    [oneof([all, lock_type(), token_type()]), 0, choose(0,5)].

%% @doc tail precondition
tail_pre(S) ->
    is_alive(S).

%% @doc tail next state function: queries don't change the model
tail_next(S, _Value, _Args) ->
    S.

%% @doc tail command: query the newest history samples for Resource
tail(Resource, Offset, NumSamples) ->
    riak_core_bg_manager:tail(Resource, Offset, NumSamples).

%% @doc tail postcondition.
%% NOTE(review): with Offset == 0 (the only generated value) Start becomes
%% HistLen + 1, so lists:sublist/3 starts past the end and Keep is always
%% [] -- confirm against riak_core_bg_manager:tail/3 whether Start should
%% instead be HistLen - Offset - Len + 1.
tail_post(#state{history=RevHistory}, [Resource, Offset, NumSamples], Result) ->
    History = lists:reverse(RevHistory),
    HistLen = length(History),
    Start = HistLen - Offset + 1,
    Len = min(NumSamples, HistLen - Offset),
    Keep = lists:sublist(History, Start, Len),
    H2 = [lists:filter(fun({R,_L,_R,_G}) -> Resource == all orelse R == Resource end, Samples)
          || Samples <- Keep, Samples =/= []],
    H3 = lists:filter(fun(S) -> S =/= [] end, H2),
    eq(length(Result), length(H3)).
+
%% ------ Grouped operator: bypass
%% @doc bypass arguments generator
bypass_args(_S) ->
    [oneof([true, false])].

%% @doc bypass precondition
bypass_pre(S) ->
    is_alive(S).

%% @doc bypass next state function
bypass_next(S, _Value, [Switch]) ->
    S#state{bypassed=Switch}.

%% @doc bypass command: flip the switch, then read it back so the
%% postcondition can check both the return value and the resulting state
bypass(Switch) ->
    Res = riak_core_bg_manager:bypass(Switch), %% expect 'ok'
    Value = riak_core_bg_manager:bypassed(), %% expect eq(Value, Switch)
    {Res, Value}.

%% @doc bypass postcondition
bypass_post(_S, [Switch], Result) ->
    eq(Result, {ok, Switch}).
+
%% ------ Grouped operator: bypassed
%% @doc bypassed arguments generator
bypassed_args(_S) ->
    [].

%% @doc bypassed precondition
bypassed_pre(S) ->
    is_alive(S).

%% @doc bypassed next state function: read-only query
bypassed_next(S, _Value, []) ->
    S.

%% @doc bypassed command
bypassed() ->
    riak_core_bg_manager:bypassed().

%% @doc bypassed postcondition: must agree with the modeled flag
bypassed_post(#state{bypassed=Bypassed}, _Value, Result) ->
    eq(Result, Bypassed).
+
%% ------ Grouped operator: enable
%% @doc enable arguments generator; [] selects the global (arity-0) form
enable_args(_S) ->
    [oneof([[], token_type(), lock_type()])].

%% @doc enable precondition
%% global enable
enable_pre(S) ->
    is_alive(S).

%% per resource enable: the resource must be registered
enable_pre(S,[Type]) ->
    is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S).

%% @doc enable next state function
%% global enable
enable_next(S, _Value, []) ->
    S#state{enabled=true};
%% per resource enable
enable_next(S, _Value, [Type]) ->
    enable_resource(Type, S).

%% @doc enable command
enable() ->
    riak_core_bg_manager:enable().

enable(Resource) ->
    riak_core_bg_manager:enable(Resource).

%% @doc enable postcondition: the manager reports the resulting status
%% global enable
enable_post(S, [], Result) ->
    eq(Result, status_of(true, S#state{enabled=true}));
%% per resource enable
enable_post(S, [_Resource], Result) ->
    ResourceEnabled = true,
    eq(Result, status_of(ResourceEnabled, S)).
+
%% ------ Grouped operator: disable
%% @doc disable arguments generator; [] selects the global (arity-0) form
disable_args(_S) ->
    [oneof([[], token_type(), lock_type()])].

%% @doc disable precondition
%% global disable
disable_pre(S) ->
    is_alive(S).

%% per resource disable: the resource must be registered
disable_pre(S,[Type]) ->
    is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S).

%% @doc disable next state function
%% global disable
disable_next(S, _Value, []) ->
    S#state{enabled=false};
%% per resource disable
disable_next(S, _Value, [Type]) ->
    disable_resource(Type, S).

%% @doc disable command
disable() ->
    riak_core_bg_manager:disable().

disable(Resource) ->
    riak_core_bg_manager:disable(Resource).

%% @doc disable postcondition: the manager reports the resulting status
%% global
disable_post(S, [], Result) ->
    %% first argument to status_of/2 is ignored for the global status
    Ignored = true,
    eq(Result, status_of(Ignored, S#state{enabled=false}));
%% per resource
disable_post(S, [_Resource], Result) ->
    ResourceEnabled = false,
    eq(Result, status_of(ResourceEnabled, S)).
+
%% ------ Grouped operator: enabled
%% @doc enabled arguments generator
enabled_args(_S) ->
    [].

%% @doc enabled precondition
enabled_pre(S) ->
    is_alive(S).

%% @doc enabled next state function: read-only query
enabled_next(S, _Value, []) ->
    S.

%% @doc enabled command
enabled() ->
    riak_core_bg_manager:enabled().

%% @doc enabled postcondition: must agree with the modeled global status
enabled_post(S, _Value, Result) ->
    eq(Result, status_of(true, S)).
+
%%------------ helpers -------------------------
%% @doc resources are disabled iff they appear on the "disabled" list
resource_enabled(Resource, #state{disabled=Disabled}) ->
    not lists:member(Resource, Disabled).

%% @doc enable the resource by removing it from the "disabled" list
enable_resource(Resource, State=#state{disabled=Disabled}) ->
    State#state{disabled=lists:delete(Resource, Disabled)}.

%% @doc disable the resource by adding it to the "disabled" list, making
%% sure it appears at most once
disable_resource(Resource, State=#state{disabled=Disabled}) ->
    State#state{disabled=[Resource | lists:delete(Resource, Disabled)]}.

%% @doc return status considering Resource status, enabled, and bypassed;
%% bypass takes precedence over enable/disable
status_of(_Enabled, #state{bypassed=true}) -> bypassed;
status_of(true, #state{enabled=true}) -> enabled;
status_of(_E,_S) -> disabled.
+
%% -- Generators

%% @doc lock resource names (kept to a small set to provoke collisions)
lock_type() ->
    oneof([a,b,c,d]). %%,e,f,g,h,i]).

%% @doc token resource names, disjoint from the lock names above
token_type() ->
    oneof(['A','B','C','D']). %%,'E','F','G','H','I']).

%% @doc concurrency limits for locks
lock_limit() ->
    choose(0, 5).

%% @doc token counts (max tokens available per refill period)
token_count() ->
    choose(0, 5).

%% NOTE(review): usage of ps_/0 is not visible in this chunk; possibly a
%% leftover generator
ps_() ->
    choose(0, 10).
+
%% @doc weight/2 - Distribution of calls; get_lock/get_token dominate.
%% NOTE(review): the two stop_process clauses below are identical and could
%% be collapsed into a single clause.
weight(_S, set_concurrency_limit) -> 3;
weight(_S, concurrency_limit) -> 3;
weight(_S, concurrency_limit_reached) -> 3;
weight(_S, start_process) -> 3;
weight(#state{alive=true}, stop_process) -> 3;
weight(#state{alive=false}, stop_process) -> 3;
weight(_S, get_lock) -> 20;
weight(_S, set_token_rate) -> 3;
weight(_S, token_rate) -> 0;
weight(_S, get_token) -> 20;
weight(_S, refill_tokens) -> 10;
weight(_S, sample_history) -> 10;
weight(_S, ps) -> 3;
weight(_S, head) -> 3;
weight(_S, tail) -> 3;
weight(_S, crash) -> 3;
weight(_S, revive) -> 1;
weight(_S, _Cmd) -> 1.
+
%% Other Functions

%% @doc Modeled concurrency limit for a lock Type; defaults to 0 when the
%% type was never given a limit.
limit(Type, State) ->
    limit(Type, 0, State).

%% @doc Modeled concurrency limit for Type, or Default when no limit is set.
limit(Type, Default, #state{limits=Limits}) ->
    case lists:keyfind(Type, 1, Limits) of
        {Type, Limit} -> Limit;
        false -> Default
    end.

%% @doc Number of tokens of Type currently given out; 0 when none taken yet.
num_tokens(Type, #state{tokens=Tokens}) ->
    case lists:keyfind(Type, 1, Tokens) of
        {Type, TakenCount} -> TakenCount;
        false -> 0
    end.

%% @doc Maximum token count modeled for Type (as set via set_token_rate),
%% or Default when the type was never registered.
max_num_tokens(Type, Default, #state{counts=Counts}) ->
    case lists:keyfind(Type, 1, Counts) of
        {Type, MaxCount} -> MaxCount;
        false -> Default
    end.
+
%% @doc Total number of tokens currently given out, either across every
%% resource ('all') or for one specific resource.
num_tokens_taken(all, #state{tokens=Tokens}) ->
    lists:sum([Count || {_Resource, Count} <- Tokens]);
num_tokens_taken(Resource, #state{tokens=Tokens}) ->
    lists:sum([Count || {R, Count} <- Tokens, R == Resource]).
+
%% @doc Forget all recorded history and any pending samples.
clear_history(State) ->
    State#state{history=[], samples=[]}.

%% @doc Snapshot the current samples onto the history (newest first) and
%% start a fresh, empty sample set.
do_sample_history(State=#state{limits=Limits, counts=Counts}) ->
    %% Touch every registered resource first so each one contributes a
    %% sample entry even if it saw no activity during this window.
    Refresh = fun({Resource, Limit}, StateAcc) ->
                      update_sample(Resource, Limit, 0, 0, StateAcc)
              end,
    S2 = lists:foldl(Refresh, State, Limits ++ Counts),
    S2#state{history=[S2#state.samples | S2#state.history], samples=[]}.
+
%% @doc Fold the supplied increments into the current sample for Resource.
%% Limit overwrites the stored value unless it is 'undefined'. It's not
%% expected to change too often, hopefully less often than the sampling
%% window (Nyquist!). This should be called roughly every time the API is.
update_sample(Resource, Limit, Refill, Given, State=#state{samples=Samples}) ->
    %% start from the existing sample for this resource, or a zeroed one
    {_R, OldLimit, OldRefills, OldGiven} =
        case lists:keyfind(Resource, 1, Samples) of
            false -> {Resource, 0, 0, 0};
            Found -> Found
        end,
    NewSample = {Resource,
                 defined_or_default(Limit, OldLimit),
                 Refill + OldRefills,
                 Given + OldGiven},
    State#state{samples=lists:keystore(Resource, 1, Samples, NewSample)}.

%% @doc Return Value unless it is 'undefined', in which case return Default.
defined_or_default(undefined, Default) -> Default;
defined_or_default(Value, _Default) -> Value.
+
%% @doc true iff the model believes the background manager is running
is_alive(#state{alive=Alive}) ->
    Alive.
+
%% @doc Build the rate argument for riak_core_bg_manager:set_token_rate/2.
%% 'undefined' and {unregistered, _} markers pass straight through so the
%% postconditions can compare against the manager's error returns.
mk_token_rate(undefined) ->
    undefined;
mk_token_rate({unregistered, _} = Unregistered) ->
    Unregistered;
mk_token_rate(TokenCount) ->
    %% use the largest period erlang:send_after accepts so refills only
    %% ever happen when the EQC test triggers them explicitly
    {max_send_after(), TokenCount}.

%% @doc History-sampling window; also maxed out so the test drives sampling.
window_interval() ->
    max_send_after().

%% @doc Largest timeout erlang:send_after/3 accepts (2^32 - 1 milliseconds).
max_send_after() ->
    4294967295.
+
%% @doc Pids of all processes the model believes are still running.
running_procs(#state{procs=Procs}) ->
    [Pid || {Pid, running} <- Procs].

%% @doc Every lock in the model, across all lock types.
all_locks(#state{locks=Locks}) ->
    lists:flatten([TypeLocks || {_Type, TypeLocks} <- Locks]).

%% @doc Locks for one lock type, or every lock when given 'all'.
all_locks(all, State) ->
    all_locks(State);
all_locks(Type, #state{locks=Locks}) ->
    case lists:keyfind(Type, 1, Locks) of
        {Type, TypeLocks} -> TypeLocks;
        false -> []
    end.
+
%% @doc Locks of Type (or 'all') that are currently in the 'held' state.
held_locks(Type, State) ->
    [Lock || {_Ref, _Pid, _Meta, held} = Lock <- all_locks(Type, State)].

%% @doc Replace the lock list stored for Type in the model.
update_locks(Type, TypeLocks, State=#state{locks=Locks}) ->
    State#state{locks = lists:keystore(Type, 1, Locks, {Type, TypeLocks})}.

%% @doc Record a newly-granted lock on Type and bump the 'given' sample.
add_held_lock(Type, Ref, Pid, Meta, State) ->
    Existing = all_locks(Type, State),
    S2 = update_sample(Type, undefined, 0, 1, State),
    update_locks(Type, [{Ref, Pid, Meta, held} | Existing], S2).
+
%% @doc Mark every lock held by Pid, in every lock type, as released.
release_locks(Pid, State=#state{locks=Locks}) ->
    Release = fun({Type, TypeLocks}, StateAcc) ->
                      update_locks(Type, mark_locks_released(Pid, TypeLocks), StateAcc)
              end,
    lists:foldl(Release, State, Locks).
+
%% @doc Rewrite every lock owned by Pid to the 'released' state. Rewritten
%% locks move to the front of the result; other pids' locks keep their
%% relative order at the back.
mark_locks_released(Pid, Locks) ->
    {Owned, Others} = lists:partition(fun(Lock) -> element(2, Lock) =:= Pid end, Locks),
    Released = [{Ref, Owner, Meta, released} || {Ref, Owner, Meta, _Status} <- Owned],
    Released ++ Others.
+
%% @doc Note that one more token of Type was given out: bump both the
%% taken-count and the 'given' sample.
increment_token_count(Type, State=#state{tokens=Tokens}) ->
    NewCount = num_tokens(Type, State) + 1,
    S2 = update_sample(Type, undefined, 0, 1, State),
    S2#state{tokens = lists:keystore(Type, 1, Tokens, {Type, NewCount})}.

%% @doc Model a refill: all tokens of Type become available again, and the
%% 'refills' sample is bumped.
reset_token_count(Type, State=#state{tokens=Tokens}) ->
    S2 = update_sample(Type, undefined, 1, 0, State),
    S2#state{tokens = lists:keystore(Type, 1, Tokens, {Type, 0})}.
+
%% @doc Shut down Pid (non-pids are silently ignored) and block until the
%% process is gone; crashes with a badmatch if it outlives the grace period.
stop_pid(Pid) when is_pid(Pid) ->
    unlink(Pid),
    exit(Pid, shutdown),
    ok = wait_for_pid(Pid);
stop_pid(_NotAPid) ->
    ok.

%% @doc Wait up to 5 seconds for Pid to terminate, using a monitor.
wait_for_pid(Pid) ->
    MonitorRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MonitorRef, process, _Object, _Info} ->
            ok
    after 5000 ->
            {error, didnotexit}
    end.
+
%% @doc Monitors held by the background manager process, or 'crashed' when
%% the manager name is not registered (i.e. the process is down).
bg_manager_monitors() ->
    bg_manager_monitors(whereis(riak_core_bg_manager)).

bg_manager_monitors(undefined) ->
    crashed;
bg_manager_monitors(BgMgrPid) ->
    process_info(BgMgrPid, monitors).
+
+prop_bgmgr() ->
+ ?FORALL(Cmds, commands(?MODULE),
+ aggregate(command_names(Cmds),
+ ?TRAPEXIT(
+ begin
+ stop_pid(whereis(riak_core_table_manager)),
+ stop_pid(whereis(riak_core_bg_manager)),
+ {ok, _TableMgr} = riak_core_table_manager:start_link([{?BG_INFO_ETS_TABLE,
+ [protected, set, named_table]},
+ {?BG_ENTRY_ETS_TABLE,
+ [protected, bag, named_table]}]),
+ {ok, _BgMgr} = riak_core_bg_manager:start(window_interval()),
+ {H, S, Res} = run_commands(?MODULE,Cmds),
+ InfoTable = ets:tab2list(?BG_INFO_ETS_TABLE),
+ EntryTable = ets:tab2list(?BG_ENTRY_ETS_TABLE),
+ Monitors = bg_manager_monitors(),
+ RunnngPids = running_procs(S),
+ %% cleanup processes not killed during test
+ [stop_pid(Pid) || Pid <- RunnngPids],
+ stop_pid(whereis(riak_core_table_manager)),
+ stop_pid(whereis(riak_core_bg_manager)),
+ ?WHENFAIL(
+ begin
+ io:format("~n~nFinal State: ~n"),
+ io:format("---------------~n"),
+ io:format("alive = ~p~n", [S#state.alive]),