diff --git a/Makefile b/Makefile index ad61e3dcc..204872ff4 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ PULSE_TESTS = worker_pool_pulse all: deps compile -compile: +compile: ./rebar compile deps: @@ -19,7 +19,7 @@ clean: distclean: clean ./rebar delete-deps -test: all +test: all ./rebar skip_deps=true eunit # You should 'clean' before your first run of this target diff --git a/ebin/riak_core.app b/ebin/riak_core.app index 6a5265844..0be3c781f 100644 --- a/ebin/riak_core.app +++ b/ebin/riak_core.app @@ -3,7 +3,7 @@ {application, riak_core, [ {description, "Riak Core"}, - {vsn, "1.4.0"}, + {vsn, "1.4.1"}, {modules, [ app_helper, bloom, @@ -87,7 +87,10 @@ riak_core_ssl_util, supervisor_pre_r14b04, vclock, - riak_core_bg_manager + riak_core_bg_manager, + riak_core_token_manager, + riak_core_table_manager, + riak_core_bg_manager_sup ]}, {registered, []}, {included_applications, [folsom]}, diff --git a/include/riak_core_bg_manager.hrl b/include/riak_core_bg_manager.hrl new file mode 100644 index 000000000..06dce6a2d --- /dev/null +++ b/include/riak_core_bg_manager.hrl @@ -0,0 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-define(LM_ETS_TABLE, lock_mgr_table). %% name of private lock manager ETS table +-define(LM_ETS_OPTS, [private, bag]). %% creation time properties of lock manager ETS table + +-type concurrency_limit() :: non_neg_integer() | infinity. diff --git a/include/riak_core_token_manager.hrl b/include/riak_core_token_manager.hrl new file mode 100644 index 000000000..2c50b51ca --- /dev/null +++ b/include/riak_core_token_manager.hrl @@ -0,0 +1,53 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-type tm_token() :: any(). +-type tm_meta() :: {atom(), any()}. %% meta data to associate with a token +-type tm_period() :: pos_integer(). %% refill period in milliseconds +-type tm_count() :: pos_integer(). %% refill tokens to count at each refill period +-type tm_rate() :: {tm_period(), tm_count()}. 
%% token refresh rate +-type tm_stat_event() :: refill_event | give_event. %% stat event type + +%% Results of a "ps" of live given or blocked tokens +-record(tm_stat_live, + { + token :: tm_token(), %% token type + consumer :: pid(), %% process asking for token + meta :: [tm_meta()], %% associated meta data + state :: given | blocked | failed %% result of last request + }). +-type tm_stat_live() :: #tm_stat_live{}. + +%% Results of a "head" or "tail", per token +-record(tm_stat_hist, + { + limit :: tm_count(), %% maximum available, defined by token rate during interval + refills :: tm_count(), %% number of times this token was refilled during interval + given :: tm_count(), %% number of this token type given in interval + blocked :: tm_count() %% number of blocked processes waiting for a token + }). +-type tm_stat_hist() :: #tm_stat_hist{}. +-define(DEFAULT_TM_STAT_HIST, + #tm_stat_hist{limit=0, refills=0, given=0, blocked=0}). + +-define(DEFAULT_TM_SAMPLE_WINDOW, 60). %% in seconds +-define(DEFAULT_TM_OUTPUT_SAMPLES, 20). %% default number of sample windows displayed +-define(DEFAULT_TM_KEPT_SAMPLES, 10000). %% number of history samples to keep +-define(TM_ETS_TABLE, token_mgr_table). %% name of private token manager ETS table +-define(TM_ETS_OPTS, [private, bag]). %% creation time properties of token manager ETS table diff --git a/rebar.config b/rebar.config index d6d5d03df..4465ac32e 100644 --- a/rebar.config +++ b/rebar.config @@ -6,14 +6,14 @@ {deps, [ {lager, "2.0.0", {git, "git://github.com/basho/lager", {tag, "2.0.0"}}}, - {poolboy, ".*", {git, "git://github.com/basho/poolboy", {branch, "master"}}}, + {poolboy, ".*", {git, "git://github.com/basho/poolboy", {branch, "develop"}}}, {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs", - {branch, "master"}}}, - {basho_stats, ".*", {git, "git://github.com/basho/basho_stats", "HEAD"}}, - {riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "master"}}}, + {tag, "0.8.1p1"}}}, + {basho_stats, ".*", {git, "git://github.com/basho/basho_stats", {branch, "develop"}}}, + {riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "develop"}}}, {webmachine, ".*", {git, "git://github.com/basho/webmachine", - {tag, "64176ef9b"}}}, - {folsom, ".*", {git, "git://github.com/basho/folsom.git", {tag, "0.7.4p1"}}}, + {tag, "1.10.4p1"}}}, + {folsom, ".*", {git, "git://github.com/basho/folsom.git", {tag, "0.7.4p2"}}}, {ranch, "0.4.0-p1", {git, "git://github.com/basho/ranch.git", {tag, "0.4.0-p1"}}} ]}. diff --git a/rebar.config.script b/rebar.config.script new file mode 100644 index 000000000..bd368568c --- /dev/null +++ b/rebar.config.script @@ -0,0 +1,11 @@ +case erlang:system_info(otp_release) =< "R15B01" of + true -> + HashDefine = [{d,old_hash}], + case lists:keysearch(erl_opts, 1, CONFIG) of + {value, {erl_opts, Opts}} -> + lists:keyreplace(erl_opts,1,CONFIG,{erl_opts,Opts++HashDefine}); + false -> + CONFIG ++ [{erl_opts, HashDefine}] + end; + false -> CONFIG +end. diff --git a/src/chash.erl b/src/chash.erl index 5a4d11e05..5a8ab8d69 100644 --- a/src/chash.erl +++ b/src/chash.erl @@ -97,12 +97,20 @@ lookup(IndexAsInt, CHash) -> {IndexAsInt, X} = proplists:lookup(IndexAsInt, Nodes), X. +-ifndef(old_hash). +sha(Bin) -> + crypto:hash(sha, Bin). +-else. +sha(Bin) -> + crypto:sha(Bin). +-endif. + %% @doc Given any term used to name an object, produce that object's key %% into the ring. Two names with the same SHA-1 hash value are %% considered the same name. -spec key_of(ObjectName :: term()) -> index(). 
key_of(ObjectName) -> - crypto:sha(term_to_binary(ObjectName)). + sha(term_to_binary(ObjectName)). %% @doc Return all Nodes that own any partitions in the ring. -spec members(CHash :: chash()) -> [chash_node()]. diff --git a/src/merkerl.erl b/src/merkerl.erl index dfa36fed4..f81e24102 100644 --- a/src/merkerl.erl +++ b/src/merkerl.erl @@ -323,8 +323,13 @@ contains_node(Tree,Node) -> getkids(Tree) -> [V || {_K,V} <- orddict:to_list(Tree#merk.children)]. +-ifndef(old_hash). +sha(X) -> + crypto:hash(sha, term_to_binary(X)). +-else. sha(X) -> crypto:sha(term_to_binary(X)). +-endif. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/riak_core_apl.erl b/src/riak_core_apl.erl index 3829aa911..cc143dbf6 100644 --- a/src/riak_core_apl.erl +++ b/src/riak_core_apl.erl @@ -325,7 +325,7 @@ six_node_test() -> ok. chbin_test_() -> - {timeout, 60, fun chbin_test_scenario/0}. + {timeout, 180, fun chbin_test_scenario/0}. chbin_test_scenario() -> [chbin_test_scenario(Size, NumNodes) diff --git a/src/riak_core_bg_manager.erl b/src/riak_core_bg_manager.erl index 9aa98a43a..33069823f 100644 --- a/src/riak_core_bg_manager.erl +++ b/src/riak_core_bg_manager.erl @@ -22,7 +22,13 @@ -behaviour(gen_server). %% API --export([start_link/0, +-export([ + %% Universal + start_link/0, + enable/0, + disable/0, + %% Locks + %% TODO: refactor the lock implementation to another module ala tokens get_lock/1, get_lock/2, get_lock/3, @@ -31,22 +37,42 @@ lock_types/0, all_locks/0, query_locks/1, - enable/0, - enable/1, - disable/0, - disable/1, - disable/2, + enable_locks/0, + enable_locks/1, + disable_locks/0, + disable_locks/1, + disable_locks/2, concurrency_limit/1, set_concurrency_limit/2, set_concurrency_limit/3, - concurrency_limit_reached/1]). + concurrency_limit_reached/1, + %% Tokens, all proxied to riak_core_token_manager + set_token_rate/2, + token_rate/1, + enable_tokens/0, + enable_tokens/1, + disable_tokens/0, + disable_tokens/1, + get_token/1, + get_token/2, + get_token/3, + get_token_sync/1, + get_token_sync/2, + get_token_sync/3, + token_types/0, + tokens_given/0, + tokens_given/1, + tokens_waiting/0, + tokens_waiting/1 + ]). + +-include("riak_core_token_manager.hrl"). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {held :: ordict:orddict(), - info :: orddict:orddict(), +-record(state, {table_id:: ets:tid(), %% TableID of ?LM_ETS_TABLE enabled :: boolean()}). -record(lock_info, {concurrency_limit :: non_neg_integer(), @@ -57,6 +83,8 @@ -define(limit(X), (X)#lock_info.concurrency_limit). -define(enabled(X), (X)#lock_info.enabled). -define(DEFAULT_LOCK_INFO, #lock_info{enabled=true, concurrency_limit=?DEFAULT_CONCURRENCY}). +-define(LM_ETS_TABLE, lock_mgr_table). %% name of private lock manager ETS table +-define(LM_ETS_OPTS, [private, bag]). %% creation time properties of lock manager ETS table -type concurrency_limit() :: non_neg_integer() | infinity. @@ -69,6 +97,105 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%% @doc Enable handing out of all background locks and tokens +-spec enable() -> ok. +enable() -> + enable_tokens(), + enable_locks(). + +%% @doc Disable handing out of all background locks and tokens +-spec disable() -> ok. +disable() -> + disable_tokens(), + disable_locks(). + +%%% Token API proxies to the token manager + +%% @doc Set the refill rate of tokens. +-spec set_token_rate(any(), riak_core_token_manager:tm_rate()) -> riak_core_token_manager:tm_rate(). 
+set_token_rate(Type, {Period, Count}) ->
+    riak_core_token_manager:set_token_rate(Type, {Period, Count}).
+
+-spec token_rate(any()) -> riak_core_token_manager:tm_rate().
+token_rate(Type) ->
+    riak_core_token_manager:token_rate(Type).
+
+%% @doc Asynchronously get a token of kind Type. Returns "max_tokens" if empty.
+-spec get_token(any()) -> ok | max_tokens.
+get_token(Type) ->
+    get_token(Type, self()).
+
+%% @doc Asynchronously get a token of kind Type.
+%%      Associate token with provided pid or metadata. If metadata
+%%      is provided the token is associated with the calling process.
+%%      Returns "max_tokens" if empty.
+-spec get_token(any(), pid() | [{atom(), any()}]) -> ok | max_tokens.
+get_token(Type, Pid) when is_pid(Pid) ->
+    get_token(Type, Pid, []);
+get_token(Type, Meta) ->
+    get_token(Type, self(), Meta).
+
+-spec get_token(any(), pid(), [{atom(), any()}]) -> ok | max_tokens.
+get_token(Type, Pid, Meta) ->
+    riak_core_token_manager:get_token_async(Type, Pid, Meta).
+
+%% @doc Synchronously get a token of type Type. Returns "max_tokens" if empty.
+-spec get_token_sync(any()) -> ok | max_tokens.
+get_token_sync(Type) ->
+    get_token_sync(Type, self()).
+
+%% @doc Synchronously get a token of kind Type.
+%%      Associate token with provided pid or metadata. If metadata
+%%      is provided the token is associated with the calling process.
+%%      Returns "max_tokens" if empty.
+-spec get_token_sync(any(), pid() | [{atom(), any()}]) -> ok | max_tokens.
+get_token_sync(Type, Pid) when is_pid(Pid) ->
+    get_token_sync(Type, Pid, []);
+get_token_sync(Type, Meta) ->
+    get_token_sync(Type, self(), Meta).
+
+-spec get_token_sync(any(), pid(), [{atom(), any()}]) -> ok | max_tokens.
+get_token_sync(Type, Pid, Meta) ->
+    riak_core_token_manager:get_token_sync(Type, Pid, Meta).
+
+token_types() ->
+    riak_core_token_manager:token_types().
+
+tokens_given() ->
+    riak_core_token_manager:tokens_given().
+
+tokens_given(Type) ->
+    riak_core_token_manager:tokens_given(Type).
+
+tokens_waiting() ->
+    riak_core_token_manager:tokens_blocked().
+
+tokens_waiting(Type) ->
+    riak_core_token_manager:tokens_blocked(Type).
+
+%% @doc Enable handing out of any tokens
+-spec enable_tokens() -> ok.
+enable_tokens() ->
+    riak_core_token_manager:enable().
+
+%% @doc Disable handing out of any tokens
+-spec disable_tokens() -> ok.
+disable_tokens() ->
+    riak_core_token_manager:disable().
+
+%% @doc Enable handing out of tokens of the given type.
+-spec enable_tokens(any()) -> ok.
+enable_tokens(Type) ->
+    riak_core_token_manager:enable(Type).
+
+
+%% @doc Disable handing out of tokens of the given type.
+-spec disable_tokens(any()) -> ok.
+disable_tokens(Type) ->
+    riak_core_token_manager:disable(Type).
+
+%%% Locks
+
 %% @doc Acquire a concurrency lock of the given type, if available,
 %%      and associate the lock with the calling process.
 -spec get_lock(any()) -> ok | max_concurrency.
@@ -124,30 +251,29 @@ query_locks(Query) ->
     gen_server:call(?MODULE, {query_locks, Query}, infinity).
 
 %% @doc Enable handing out of any locks
--spec enable() -> ok.
-enable() ->
+-spec enable_locks() -> ok.
+enable_locks() ->
     gen_server:cast(?MODULE, enable).
 
 %% @doc Disable handing out of any locks
--spec disable() -> ok.
-disable() ->
+-spec disable_locks() -> ok.
+disable_locks() ->
     gen_server:cast(?MODULE, disable).
 
 %% @doc Enable handing out of locks of the given type.
--spec enable(any()) -> ok.
-enable(Type) ->
+-spec enable_locks(any()) -> ok.
+enable_locks(Type) ->
     gen_server:cast(?MODULE, {enable, Type}).
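%% Illustrative aside, not part of the patch: one way a consumer might use the
%% proxied token API above. The module name, token type, metadata, and rate are
%% assumptions for the example; only the riak_core_bg_manager calls come from
%% this change. The refill period is treated as seconds by the token manager's
%% refill timer.
-module(example_token_user).
-export([run/1]).

run(Work) ->
    %% Refill 5 example_token tokens every 10 seconds.
    _ = riak_core_bg_manager:set_token_rate(example_token, {10, 5}),
    %% Try the non-blocking request first; if the bucket is empty, fall back to
    %% the blocking call, which queues this process until the next refill.
    case riak_core_bg_manager:get_token(example_token, self(), [{work, Work}]) of
        ok ->
            do_work(Work);
        max_tokens ->
            ok = riak_core_bg_manager:get_token_sync(example_token, self(), [{work, Work}]),
            do_work(Work)
    end.

do_work(_Work) ->
    ok.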
- -%% @doc same as `disable(Type, false)' --spec disable(any()) -> ok. -disable(Type) -> - disable(Type, false). +%% @doc same as `disable_locks(Type, false)' +-spec disable_locks(any()) -> ok. +disable_locks(Type) -> + disable_locks(Type, false). %% @doc Disable handing out of locks of the given type. If `Kill' is `true' any processes %% holding locks for the given type will be killed with reaseon `max_concurrency' --spec disable(any(), boolean()) -> ok. -disable(Type, Kill) -> +-spec disable_locks(any(), boolean()) -> ok. +disable_locks(Type, Kill) -> gen_server:cast(?MODULE, {disable, Type, Kill}). %% @doc Get the current maximum concurrency for the given lock type. @@ -186,8 +312,10 @@ concurrency_limit_reached(Type) -> ignore | {stop, term()}. init([]) -> - {ok, #state{info=orddict:new(), - held=orddict:new(), + lager:debug("Background Manager starting up."), + %% claiming the table will result in a handle_info('ETS-TRANSFER', ...) message. + ok = riak_core_table_manager:claim_table(?LM_ETS_TABLE), + {ok, #state{table_id=undefined, enabled=true}}. %% @private @@ -204,16 +332,16 @@ handle_call({get_lock, LockType, Pid, Info}, _From, State) -> {reply, Reply, State2}; handle_call({lock_count, LockType}, _From, State) -> {reply, held_count(LockType, State), State}; -handle_call(lock_count, _From, State=#state{held=Locks}) -> - Count = orddict:fold(fun(_, Held, Total) -> Total + length(Held) end, - 0, Locks), +handle_call(lock_count, _From, State) -> + Count = length(held_locks(State)), {reply, Count, State}; handle_call({lock_limit_reached, LockType}, _From, State) -> HeldCount = held_count(LockType, State), Limit = ?limit(lock_info(LockType, State)), {reply, HeldCount >= Limit, State}; -handle_call(lock_types, _From, State=#state{info=Info}) -> - Types = [{Type, ?enabled(LI), ?limit(LI)} || {Type, LI} <- orddict:to_list(Info)], +handle_call(lock_types, _From, State=#state{table_id=TableId}) -> + Infos = [{Type,Info} || {{info, Type},Info} <- ets:match_object(TableId, {{info, '_'},'_'})], + Types = [{Type, ?enabled(LI), ?limit(LI)} || {Type, LI} <- Infos], {reply, Types, State}; handle_call({query_locks, Query}, _From, State) -> Results = query_locks(Query, State), @@ -252,6 +380,11 @@ handle_cast(disable, State) -> -spec handle_info(term(), #state{}) -> {noreply, #state{}} | {noreply, #state{}, non_neg_integer()} | {stop, term(), #state{}}. 
+%% Handle transfer of ETS table from table manager +handle_info({'ETS-TRANSFER', TableId, Pid, _Data}, State) -> + lager:debug("table_mgr (~p) -> bg_mgr (~p) receiving ownership of TableId: ~p", [Pid, self(), TableId]), + State2 = State#state{table_id=TableId}, + {noreply, State2}; handle_info({'DOWN', Ref, _, _, _}, State) -> State2 = release_lock(Ref, State), {noreply, State2}; @@ -277,7 +410,8 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -query_locks(FullQuery, State=#state{held=Locks}) -> +query_locks(FullQuery, State) -> + Locks = held_locks(State), Base = case proplists:get_value(type, FullQuery) of undefined -> Locks; LockType -> orddict:from_list([{LockType, held_locks(LockType, State)}]) @@ -317,19 +451,22 @@ try_lock(LockType, Pid, Info, State=#state{enabled=GlobalEnabled}) -> try_lock(false, _LockType, _Pid, _Info, State) -> {max_concurrency, State}; -try_lock(true, LockType, Pid, Info, State=#state{held=Locks}) -> +try_lock(true, LockType, Pid, Info, State) -> Ref = monitor(process, Pid), - NewLocks = orddict:append(LockType, {Pid, Ref, Info}, Locks), - {ok, State#state{held=NewLocks}}. + State2 = add_lock(LockType, {Pid, Ref, Info}, State), + {ok, State2}. -release_lock(Ref, State=#state{held=Locks}) -> - %% TODO: this makes me (jordan) :( - Released = orddict:map(fun(Type, Held) -> release_lock(Ref, Type, Held) end, - Locks), - State#state{held=Released}. +add_lock(LockType, Lock, State=#state{table_id=TableId}) -> + Key = {held, LockType}, + ets:insert(TableId, {Key, Lock}), + State. -release_lock(Ref, _LockType, Held) -> - lists:keydelete(Ref, 2, Held). +release_lock(Ref, State=#state{table_id=TableId}) -> + %% There should only be one instance of the object, but we'll zap all that match. + Pattern = {{held, '_'}, {'_', Ref, '_'}}, + Matches = [Lock || {{held, _Type},Lock} <- ets:match_object(TableId, Pattern)], + [ets:delete_object(TableId, Obj) || Obj <- Matches], + State. maybe_honor_limit(true, LockType, Limit, State) -> Held = held_locks(LockType, State), @@ -348,11 +485,11 @@ maybe_honor_limit(false, _LockType, _Limit, _State) -> held_count(LockType, State) -> length(held_locks(LockType, State)). -held_locks(LockType, #state{held=Locks}) -> - case orddict:find(LockType, Locks) of - error -> []; - {ok, Held} -> Held - end. +held_locks(#state{table_id=TableId}) -> + [Lock || {{held, _Type},Lock} <- ets:match_object(TableId, {{held, '_'},'_'})]. + +held_locks(LockType, #state{table_id=TableId}) -> + [Lock || {{held, _Type},Lock} <- ets:match_object(TableId, {{held, LockType},'_'})]. enable_lock(LockType, State) -> update_lock_enabled(LockType, true, State). @@ -372,13 +509,21 @@ update_concurrency_limit(LockType, Limit, State) -> ?DEFAULT_LOCK_INFO#lock_info{concurrency_limit=Limit}, State). -update_lock_info(LockType, Fun, Default, State=#state{info=Info}) -> - NewInfo = orddict:update(LockType, Fun, Default, Info), - State#state{info=NewInfo}. - - -lock_info(LockType, #state{info=Info}) -> - case orddict:find(LockType, Info) of - error -> ?DEFAULT_LOCK_INFO; - {ok, LockInfo} -> LockInfo +update_lock_info(LockType, Fun, Default, State=#state{table_id=TableId}) -> + Key = {info, LockType}, + NewInfo = case ets:lookup(TableId, Key) of + [] -> Default; + [{_Key,LockInfo}] -> Fun(LockInfo) + end, + ets:insert(TableId, {Key, NewInfo}), + State. 
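%% Stand-alone sketch (an illustrative assumption, not part of the patch) of the
%% bag-table bookkeeping the lock code above now relies on: several {held, Type}
%% objects share one key, and release_lock/2 matches them out by monitor
%% reference regardless of lock type. Table and lock names are hypothetical.
-module(example_bag_locks).
-export([demo/0]).

demo() ->
    T = ets:new(demo_lock_table, [private, bag]),
    Ref1 = make_ref(),
    Ref2 = make_ref(),
    ets:insert(T, {{held, ping}, {self(), Ref1, []}}),
    ets:insert(T, {{held, ping}, {self(), Ref2, []}}),
    %% Both held locks of the same type live under one key.
    2 = length(ets:lookup(T, {held, ping})),
    %% Release by monitor reference, mirroring release_lock/2.
    Pattern = {{held, '_'}, {'_', Ref1, '_'}},
    [ets:delete_object(T, Obj) || Obj <- ets:match_object(T, Pattern)],
    1 = length(ets:lookup(T, {held, ping})),
    true = ets:delete(T),
    ok.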
+ +lock_info(LockType, #state{table_id=TableId}) -> + Key = {info,LockType}, + case ets:lookup(TableId, Key) of + [] -> ?DEFAULT_LOCK_INFO; + [{_Key,LockInfo}] -> LockInfo; + [First | _Rest] -> + lager:error("Unexpected multiple instances of key ~p in table", [{info, LockType}]), + First %% try to keep going end. diff --git a/src/riak_core_bg_manager_sup.erl b/src/riak_core_bg_manager_sup.erl new file mode 100644 index 000000000..739bf5a5e --- /dev/null +++ b/src/riak_core_bg_manager_sup.erl @@ -0,0 +1,47 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_bg_manager_sup). +-behaviour(supervisor). + +%% beahvior functions +-export([start_link/0, + init/1 + ]). + +-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,5000,Type,[I]}). + +%% begins the supervisor, init/1 will be called +start_link () -> + supervisor:start_link({local,?MODULE},?MODULE,[]). + +%% Supervisor of all background manager processes +%% +%% Lock manager (currently in bg_manager) and Token manager both use ETS +%% tables to manage persistent state. The table_manager is started from +%% our parent supervisor (riak_core_sup) and creates the tables used by +%% background managers. So, riak_core_table_manager needs to be started +%% before this supervisor. + +%% @private +init ([]) -> + {ok,{{one_for_all,10,10}, + [?CHILD(riak_core_bg_manager, worker), + ?CHILD(riak_core_token_manager, worker)]}}. diff --git a/src/riak_core_stat.erl b/src/riak_core_stat.erl index 1a1900f2d..6d44e59b8 100644 --- a/src/riak_core_stat.erl +++ b/src/riak_core_stat.erl @@ -179,7 +179,7 @@ vnodeq_stats() -> vnodeq_len(Pid) -> try element(2, erlang:process_info(Pid, message_queue_len)) - catch _ -> + catch _:_ -> 0 end. diff --git a/src/riak_core_sup.erl b/src/riak_core_sup.erl index 4115b41c4..c4d6655de 100644 --- a/src/riak_core_sup.erl +++ b/src/riak_core_sup.erl @@ -2,7 +2,7 @@ %% %% riak_core: Core Riak Application %% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -24,6 +24,9 @@ -behaviour(supervisor). +-include("riak_core_bg_manager.hrl"). +-include("riak_core_token_manager.hrl"). + %% API -export([start_link/0, stop_webs/0, restart_webs/0]). @@ -31,8 +34,17 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}). --define(CHILD(I, Type), ?CHILD(I, Type, 5000)). +-define(DEFAULT_TIMEOUT, 5000). +-define(CHILD(I, Type, Timeout, Args), {I, {I, start_link, Args}, permanent, Timeout, Type, [I]}). +-define(CHILD(I, Type, Timeout), ?CHILD(I, Type, Timeout, [])). 
+-define(CHILD(I, Type), ?CHILD(I, Type, ?DEFAULT_TIMEOUT)). + +%% ETS tables to be created and maintained by riak_core_table_manager, which is not linked +%% to any processes except this supervisor. Please keep it that way so tables don't get lost +%% when their user processes crash. Implement ETS-TRANSFER handler for user processes. +-define(TBL_MGR_ARGS, [{?TM_ETS_TABLE, ?TM_ETS_OPTS}, + {?LM_ETS_TABLE, ?LM_ETS_OPTS} + ]). %% =================================================================== %% API functions @@ -56,7 +68,8 @@ restart_webs() -> init([]) -> Children = lists:flatten( [?CHILD(riak_core_sysmon_minder, worker), - ?CHILD(riak_core_bg_manager, worker), + ?CHILD(riak_core_table_manager, worker, ?DEFAULT_TIMEOUT, [?TBL_MGR_ARGS]), + ?CHILD(riak_core_bg_manager_sup, supervisor), ?CHILD(riak_core_vnode_sup, supervisor, 305000), ?CHILD(riak_core_eventhandler_sup, supervisor), ?CHILD(riak_core_ring_events, worker), diff --git a/src/riak_core_table_manager.erl b/src/riak_core_table_manager.erl new file mode 100644 index 000000000..2b41c9994 --- /dev/null +++ b/src/riak_core_table_manager.erl @@ -0,0 +1,187 @@ +%% ------------------------------------------------------------------- +%% +%% riak_core_table_manager: ETS table ownership and crash protection +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc A gen_server process that creates and serves as heir to a +%% ETS table; and coordinates with matching user processes. If a user +%% process exits, this server will inherit the ETS table and hand it +%% back to the user process when it restarts. +%% +%% For theory of operation, please see the web page: +%% http://steve.vinoski.net/blog/2011/03/23/dont-lose-your-ets-tables/ + +-module(riak_core_table_manager). + +-behaviour(gen_server). + +%% API +-export([start_link/1, + claim_table/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {tables}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @end +%%-------------------------------------------------------------------- +-spec start_link([term()]) -> {ok, Pid::pid()} | ignore | {error, Error::term()}. +start_link(TableSpecs) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [TableSpecs], []). + + +%%-------------------------------------------------------------------- +%% @doc +%% Gives the registration table away to the caller, which should be +%% the registrar process. +%% +%% @end +%%-------------------------------------------------------------------- +-spec claim_table(atom()) -> ok. 
+claim_table(TableName) -> + gen_server:call(?SERVER, {claim_table, TableName}, infinity). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% @end +%%-------------------------------------------------------------------- +-spec init([term()]) -> {ok, undefined}. +%% Tables :: [{TableName, [props]}] +%% Table specs are provided by the process that creates this table manager, +%% presumably a supervisor such as riak_core_sup. +init([TableSpecs]) -> + lager:debug("Table Manager starting up with tables: ~p", [TableSpecs]), + Tables = lists:foldl(fun(Spec, TT) -> create_table(Spec, TT) end, [], TableSpecs), + {ok, #state{tables=Tables}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_call(Msg::term(), From::{pid(), term()}, State::term()) -> + {reply, Reply::term(), State::term()} | + {noreply, State::term()}. +%% TableName :: atom() +handle_call({claim_table, TableName}, {Pid, _Tag}, State) -> + %% The user process is (re-)claiming the table. Give it away. + %% We remain the heir in case the user process exits. + case lookup_table(TableName, State) of + undefined -> + %% Table does not exist, which is madness. + {reply, {undefined_table, TableName}, State}; + TableId -> + lager:debug("Giving away table ~p (~p) to ~p", [TableName, TableId, Pid]), + ets:give_away(TableId, Pid, undefined), + Reply = ok, + {reply, Reply, State} + end; + +handle_call(_Msg, _From, State) -> + {noreply, State}. + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_cast(term(), term()) -> {noreply, State::term()}. +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info({'ETS-TRANSFER', TableId, FromPid, _HeirData}, State) -> + %% The table's user process exited and transferred the table back to us. + lager:debug("Table user process ~p exited, ~p received table ~p", [FromPid, self(), TableId]), + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @end +%%-------------------------------------------------------------------- +-spec terminate(term(), term()) -> ok. +terminate(_Reason, _State) -> + ok. 
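%% Consumer-side sketch of the claim/transfer handshake implemented above (an
%% illustrative assumption; the real consumers in this patch are
%% riak_core_bg_manager and riak_core_token_manager). The module and table
%% names are hypothetical. The table id only becomes usable once the
%% 'ETS-TRANSFER' message arrives, so init/1 leaves it undefined.
-module(example_table_user).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {table_id :: ets:tid() | undefined}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    %% Assumes riak_core_table_manager was started with a spec for
    %% example_table; claim_table/1 replies ok and the table itself arrives
    %% later as an 'ETS-TRANSFER' message.
    ok = riak_core_table_manager:claim_table(example_table),
    {ok, #state{table_id=undefined}}.

handle_call(_Msg, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'ETS-TRANSFER', TableId, _FromPid, _HeirData}, State) ->
    %% We now own the table; the table manager remains its heir, so the table
    %% survives if this process crashes and is restarted.
    {noreply, State#state{table_id=TableId}};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.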
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @end +%%-------------------------------------------------------------------- +-spec code_change(term(), term(), term()) -> {ok, term()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +%% Create the initial table based on a table name and properties. +%% The table will eventually be given away by claim_table, but we +%% want to remain the heir in case the claimer crashes. +create_table({TableName, TableProps}, Tables) -> + TableId = ets:new(TableName, TableProps), + ets:setopts(TableId, [{heir, self(), undefined}]), + [{TableName, TableId} | Tables]. + +-spec lookup_table(TableName::atom(), State::term()) -> ets:tid() | undefined. +lookup_table(TableName, #state{tables=Tables}) -> + proplists:get_value(TableName, Tables). diff --git a/src/riak_core_tcp_mon.erl b/src/riak_core_tcp_mon.erl index f9922ac63..751b31d7e 100644 --- a/src/riak_core_tcp_mon.erl +++ b/src/riak_core_tcp_mon.erl @@ -206,7 +206,7 @@ handle_info({nodeup, Node, _InfoList}, State) -> lager:error("Could not get dist for ~p\n~p\n", [Node, DistCtrl]), {noreply, State}; Port -> - {noreply, add_dist_conn(Port, Node, State)} + {noreply, add_dist_conn(Node, Port, State)} end; handle_info({nodedown, Node, _InfoList}, State) -> diff --git a/src/riak_core_token_manager.erl b/src/riak_core_token_manager.erl new file mode 100644 index 000000000..e700fbd93 --- /dev/null +++ b/src/riak_core_token_manager.erl @@ -0,0 +1,705 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ./rebar skip_deps=true eunit suite=token_manager +%% +%% ------------------------------------------------------------------- +-module(riak_core_token_manager). + +-include("riak_core_token_manager.hrl"). + +-behaviour(gen_server). + + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +-export([start_link/0, + start_link/1, + enable/0, + enable/1, + disable/0, + disable/1, + token_rate/1, + set_token_rate/2, + get_token_async/3, + get_token_sync/3, + token_types/0, + tokens_given/0, + tokens_given/1, + tokens_blocked/0, + tokens_blocked/1 + ]). + +%% testing +-export([start/1]). + +%% reporting +-export([clear_history/0, + head/0, + head/1, + head/2, + head/3, + tail/0, + tail/1, + tail/2, + tail/3, + ps/0, + ps/1]). + +-define(SERVER, ?MODULE). + +%% We use an ETS table to store critical data. 
In the event this process crashes, +%% the table will be given back to the table manager and we can reclaim it when +%% we restart. Thus, token rates and states are maintained across restarts of the +%% module, but not of the application. Since we are supervised by riak_core_sup, +%% that's fine. +%% +%% The table must be a bag and is best if private. See ?TM_ETS_OPTS in MODULE.hrl. +%% Table Schema... +%% KEY Data Notes +%% --- ---- ----- +%% {info, TokenType} #token_info One object per key. +%% {given, TokenType} #token_entry Multiple objects per key. +%% {blocked, TokenType} queue of #token_entry(s) One queue object per key. + +-record(state, {table_id:: ets:tid(), %% TableID of ?TM_ETS_TABLE + %% None of the following data is persisted across process crashes. + enabled :: boolean(), %% Global enable/disable switch + %% stats + window :: orddict:orddict(), %% tm_token() -> tm_stat_hist() + history :: queue(), %% tm_token() -> queue of tm_stat_hist() + window_interval :: tm_period(), %% history window size in seconds + window_tref :: reference() %% reference to history window sampler timer + }). + +%% General settings of a token type. +-record(token_info, {rate :: tm_rate(), + enabled :: boolean()}). + +-define(rate(X), (X)#token_info.rate). +-define(enabled(X), (X)#token_info.enabled). +-define(DEFAULT_RATE, {0,0}). %% DO NOT CHANGE. DEFAULT SET TO ENFORCE "REGISTRATION" +-define(DEFAULT_TOKEN_INFO, #token_info{enabled=true, rate=?DEFAULT_RATE}). + +%% An instance of a token entry in "given" or "blocked" +-record(token_entry, {token :: tm_token(), + pid :: pid(), + meta :: tm_meta(), + from :: {pid(), term()}, + state :: given | blocked + }). %% undefined unless on queue + +-define(TOKEN_ENTRY(Type, Pid, Meta, From, Status), + #token_entry{token=Type, pid=Pid, meta=Meta, from=From, state=Status}). +-define(token(X), (X)#token_entry.token). +-define(pid(X), (X)#token_entry.pid). +-define(meta(X), (X)#token_entry.meta). +-define(from(X), (X)#token_entry.from). + +%% Stats + +clear_history() -> + gen_server:cast(?SERVER, clear_history). + +%% List history of token manager +%% @doc show history of token request/grants over default and custom intervals. +%% offset is forwards-relative to the oldest sample interval +-spec head() -> [[tm_stat_hist()]]. +head() -> + head(all). +-spec head(tm_token()) -> [[tm_stat_hist()]]. +head(Token) -> + head(Token, ?DEFAULT_TM_OUTPUT_SAMPLES). +-spec head(tm_token(), tm_count()) -> [[tm_stat_hist()]]. +head(Token, NumSamples) -> + head(Token, 0, NumSamples). +-spec head(tm_token(), tm_count(), tm_count()) -> [[tm_stat_hist()]]. +head(Token, Offset, NumSamples) -> + gen_server:call(?SERVER, {head, Token, Offset, NumSamples}, infinity). + +%% @doc return history of token request/grants over default and custom intervals. +%% offset is backwards-relative to the newest sample interval +-spec tail() -> [[tm_stat_hist()]]. +tail() -> + tail(all). +-spec tail(tm_token()) -> [[tm_stat_hist()]]. +tail(Token) -> + tail(Token, ?DEFAULT_TM_OUTPUT_SAMPLES). +-spec tail(tm_token(), tm_count()) -> [[tm_stat_hist()]]. +tail(Token, NumSamples) -> + tail(Token, NumSamples, NumSamples). +-spec tail(tm_token(), tm_count(), tm_count()) -> [[tm_stat_hist()]]. +tail(Token, Offset, NumSamples) -> + gen_server:call(?SERVER, {tail, Token, Offset, NumSamples}, infinity). + +%% @doc List most recent requests/grants for tokens of all token types +-spec ps() -> [tm_stat_live()]. +ps() -> + ps(all). 
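%% Assumed usage sketch of the reporting API above; it could live in any module
%% since it only uses exported functions, and the token type example_token is
%% hypothetical. head/3 counts forward from the oldest sample window, tail/2,3
%% counts backward from the newest, and ps/0 lists live given and blocked
%% entries.
report_example() ->
    Live   = riak_core_token_manager:ps(),
    Given  = riak_core_token_manager:tokens_given(),
    Newest = riak_core_token_manager:tail(all, 1),
    Oldest = riak_core_token_manager:head(example_token, 1, 1),
    {length(Live), length(Given), Newest, Oldest}.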
+%% @doc List most recent requests/grants for tokens for given token type +-spec ps(tm_token()) -> [tm_stat_live()]. +ps(Token) -> + gen_server:call(?SERVER, {ps, Token}, infinity). + +%% @doc Set the refill rate of tokens. +-spec set_token_rate(tm_token(), tm_rate()) -> ok. +set_token_rate(Type, Rate) -> + gen_server:call(?SERVER, {set_token_rate, Type, Rate}, infinity). + +-spec token_rate(tm_token()) -> tm_rate(). +token_rate(Type) -> + gen_server:call(?SERVER, {token_rate, Type}, infinity). + +token_types() -> + gen_server:call(?SERVER, token_types, infinity). + +%% @doc Return a list of all tokens in current given set +tokens_given() -> + gen_server:call(?SERVER, tokens_given, infinity). + +%% @doc Return a list of all tokens of type Type in current given set +tokens_given(Type) -> + gen_server:call(?SERVER, {tokens_given, Type}, infinity). + +%% @doc Return a list of all blocked tokens +tokens_blocked() -> + gen_server:call(?SERVER, tokens_blocked, infinity). + +%% @doc Return a list of all blocked tokens of type Type +tokens_blocked(Type) -> + gen_server:call(?SERVER, {tokens_blocked, Type}, infinity). + +%% @doc Asynchronously get a token of kind Type. +%% Associate token with provided pid and metadata. +%% Returns "max_tokens" if empty. +-spec get_token_async(tm_token(), pid(), [tm_meta()]) -> ok | max_tokens. +get_token_async(Type, Pid, Meta) -> + gen_server:call(?SERVER, {get_token_async, Type, Pid, Meta}, infinity). + +%% @doc Synchronously get a token of kind Type. +%% Associate token with provided pid and metadata. +%% Returns "max_tokens" if empty. +-spec get_token_sync(tm_token(), pid(), [tm_meta()]) -> ok | max_tokens. +get_token_sync(Type, Pid, Meta) -> + gen_server:call(?SERVER, {get_token_sync, Type, Pid, Meta}, infinity). + +%% @doc Enable handing out of any tokens +-spec enable() -> ok. +enable() -> + gen_server:cast(?SERVER, enable). + +%% @doc Disable handing out of any tokens +-spec disable() -> ok. +disable() -> + gen_server:cast(?SERVER, disable). + +%% @doc Enable handing out of tokens of the given type. +-spec enable(tm_token()) -> ok. +enable(Type) -> + gen_server:cast(?SERVER, {enable, Type}). + + +%% @doc Siable handing out any tokens of the given type. +-spec disable(tm_token()) -> ok. +disable(Type) -> + gen_server:cast(?SERVER, {disable, Type}). + +%% @doc Starts the server +-spec start_link() -> {ok, pid()} | ignore | {error, term}. +start_link() -> + start_link(?DEFAULT_TM_SAMPLE_WINDOW). + +-spec start_link(tm_period()) -> {ok, pid()} | ignore | {error, term}. +start_link(Interval) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Interval], []). + +%% Test entry point to start stand-alone server +start(Interval) -> + gen_server:start({local, ?SERVER}, ?MODULE, [Interval], []). + +%%%% Gen Server %%%%%%% + +%% @private +%% @doc Initializes the server with a history window interval in seconds, +%% defaults to 1 minute if empty list supplied. +-spec init([tm_period()] | []) -> {ok, #state{}} | + {ok, #state{}, non_neg_integer() | infinity} | + ignore | + {stop, term()}. +init([]) -> + init([?DEFAULT_TM_SAMPLE_WINDOW]); +init([Interval]) -> + lager:debug("Token Manager starting up."), + %% claiming the table will result in a handle_info('ETS-TRANSFER', ...) message. + ok = riak_core_table_manager:claim_table(?TM_ETS_TABLE), + State = #state{table_id=undefined, %% resolved in the ETS-TRANSFER handler + window=orddict:new(), + history=queue:new(), + enabled=true, + window_interval=Interval}, + State2 = schedule_sample_history(State), + {ok, State2}. 
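%% Start-up ordering sketch (illustrative; in production the riak_core_sup and
%% riak_core_bg_manager_sup changes elsewhere in this patch do the wiring).
%% The table manager must already own the token_mgr_table before init/1 runs,
%% because claim_table/1 is called during init. This mirrors the eunit setup in
%% test/token_manager_tests.erl; the 5-second sample window is arbitrary.
start_standalone() ->
    {ok, _TableMgr} = riak_core_table_manager:start_link(
                          [{token_mgr_table, [private, bag]}]),
    {ok, _TokenMgr} = riak_core_token_manager:start_link(5),
    ok.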
+ +%% @private +%% @doc Handling call messages +-spec handle_call(term(), {pid(), term()}, #state{}) -> + {reply, term(), #state{}} | + {reply, term(), #state{}, non_neg_integer()} | + {noreply, #state{}} | + {noreply, #state{}, non_neg_integer()} | + {stop, term(), term(), #state{}} | + {stop, term(), #state{}}. +handle_call({token_rate, TokenType}, _From, State) -> + Rate = ?rate(token_info(TokenType, State)), + {reply, Rate, State}; +handle_call({set_token_rate, TokenType, Rate}, _From, State) -> + {OldRate, State2} = do_set_token_rate(TokenType, Rate, State), + {reply, OldRate, State2}; +handle_call(token_types, _From, State) -> + Result = do_token_types(State), + {reply, Result, State}; +handle_call(tokens_given, _From, State) -> + Result = do_ps(all, [given], State), + {reply, Result, State}; +handle_call({tokens_given, Token}, _From, State) -> + Result = do_ps(Token, [given], State), + {reply, Result, State}; +handle_call(tokens_blocked, _From, State) -> + Result = do_ps(all, [blocked], State), + {reply, Result, State}; +handle_call({tokens_blocked, Token}, _From, State) -> + Result = do_ps(Token, [blocked], State), + {reply, Result, State}; +handle_call({get_token_async, Type, Pid, Meta}, _From, State) -> + do_get_token_async(Type, Pid, Meta, State); +handle_call({get_token_sync, Type, Pid, Meta}, From, State) -> + do_get_token_sync(Type, Pid, Meta, From, State); +handle_call({head, Token, Offset, Count}, _From, State) -> + Result = do_hist(head, Token, Offset, Count, State), + {reply, Result, State}; +handle_call({tail, Token, Offset, Count}, _From, State) -> + Result = do_hist(tail, Token, Offset, Count, State), + {reply, Result, State}; +handle_call({ps, Token}, _From, State) -> + Result = do_ps(Token, [given, blocked], State), + {reply, Result, State}; +handle_call(Call, _From, State) -> + lager:warning("Unhandled call: ~p", [Call]), + Reply = {unhanded_call, Call}, + {reply, Reply, State}. + +handle_cast(clear_history, State) -> + State2 = do_clear_history(State), + {noreply, State2}; +handle_cast({enable, Type}, State) -> + State2 = enable_token(Type, State), + {noreply, State2}; +handle_cast({disable, Type}, State) -> + State2 = disable_token(Type, State), + {noreply, State2}; +handle_cast(enable, State) -> + State2 = State#state{enabled=true}, + {noreply, State2}; +handle_cast(disable, State) -> + State2 = State#state{enabled=false}, + {noreply, State2}; +handle_cast(Cast, State) -> + lager:warning("Unhandled cast: ~p", [Cast]), + Reply = {unhandled_cast, Cast}, + {reply, Reply, State}. + +handle_info(sample_history, State) -> + State2 = schedule_sample_history(State), + State3 = do_sample_history(State2), + {noreply, State3}; +handle_info({refill_tokens, Type}, State) -> + State2 = do_refill_tokens(Type, State), + schedule_refill_tokens(Type, State2), + {noreply, State2}; +%% Handle transfer of ETS table from table manager +handle_info({'ETS-TRANSFER', TableId, Pid, _Data}, State) -> + lager:debug("table_mgr (~p) -> token_mgr (~p) receiving ownership of TableId: ~p", [Pid, self(), TableId]), + State2 = State#state{table_id=TableId}, + reschedule_token_refills(State2), + {noreply, State2}; +handle_info({'DOWN', Ref, _, _, _}, State) -> + lager:debug("Linked process died with ref ~p: ", [Ref]), + {noreply, State}; +handle_info(Info, State) -> + lager:warning("Unhandled info: ~p", [Info]), + {noreply, State}. + +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. 
It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec terminate(term(), #state{}) -> term(). +terminate(_Reason, _State) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec code_change(term() | {down, term()}, #state{}, term()) -> {ok, #state{}}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% Get stat history for given token type from sample set +-spec stat_window(tm_token(), orddict:orddict()) -> tm_stat_hist(). +stat_window(TokenType, Window) -> + case orddict:find(TokenType, Window) of + error -> ?DEFAULT_TM_STAT_HIST; + {ok, StatHist} -> StatHist + end. + +%% return list of registered TokenType(s) +do_token_types(#state{table_id=TableId}) -> + %% match against info objects in table and return just the TokenType + [TokenType || {{info, TokenType},_} <- ets:match_object(TableId, {{info, '_'},'_'})]. + +do_set_token_rate(TokenType, Rate, State) -> + OldRate = ?rate(token_info(TokenType, State)), + State2 = update_token_rate(TokenType, Rate, State), + schedule_refill_tokens(TokenType, State2), + %% maybe reschedule blocked callers + State3 = maybe_unblock_blocked(TokenType, State2), + {OldRate, State3}. + +%% erase saved history +do_clear_history(State=#state{window_tref=TRef}) -> + erlang:cancel_timer(TRef), + State2 = State#state{history=queue:new()}, + schedule_sample_history(State2). + +%% Return stats history from head or tail of stats history queue +do_hist(End, TokenType, Offset, Count, State) when Offset =< 0 -> + do_hist(End, TokenType, 1, Count, State); +do_hist(End, TokenType, Offset, Count, State) when Count =< 0 -> + do_hist(End, TokenType, Offset, ?DEFAULT_TM_OUTPUT_SAMPLES, State); +do_hist(End, TokenType, Offset, Count, #state{history=HistQueue}) -> + QLen = queue:len(HistQueue), + First = max(1, case End of + head -> Offset; + tail -> QLen - Offset + 1 + end), + Last = min(QLen, max(First + Count - 1, 1)), + case segment_queue(First, Last, HistQueue) of + empty -> []; + {ok, Hist } -> + case TokenType of + all -> + StatsDictList = queue:to_list(Hist), + [orddict:to_list(Stat) || Stat <- StatsDictList]; + _T -> + [[{TokenType, stat_window(TokenType, StatsDict)}] || StatsDict <- queue:to_list(Hist)] + end + end. + +segment_queue(First, Last, Queue) -> + QLen = queue:len(Queue), + case QLen >= Last andalso QLen > 0 of + true -> + %% trim off extra tail, then trim head + Front = case QLen == Last of + true -> Queue; + false -> + {QFirst, _QRest} = queue:split(Last, Queue), + QFirst + end, + case First == 1 of + true -> {ok, Front}; + false -> + {_Skip, Back} = queue:split(First-1, Front), + {ok, Back} + end; + false -> + %% empty + empty + end. + + +format_entry(Entry) -> + #tm_stat_live + { + token = Entry#token_entry.token, + consumer = Entry#token_entry.pid, + meta = Entry#token_entry.meta, + state = Entry#token_entry.state + }. + +update_token_queue(TokenType, TokenQueue, State=#state{table_id=TableId}) -> + Key = {blocked, TokenType}, + Object = {Key, TokenQueue}, + %% replace existing queue. Must delete existing one since we're using a bag table + ets:delete(TableId, Key), + ets:insert(TableId, Object), + State. 
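%% Worked example (assumed, for illustration) of the segment_queue/3 helper
%% above. Positions are 1-based and inclusive, so taking 2..3 of [a,b,c,d]
%% yields [b,c], while asking past the end of the queue yields 'empty'. This
%% function would have to live in this module because segment_queue/3 is not
%% exported.
segment_queue_example() ->
    Q = queue:from_list([a, b, c, d]),
    {ok, Sub} = segment_queue(2, 3, Q),
    [b, c] = queue:to_list(Sub),
    empty = segment_queue(2, 5, Q),
    ok.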
+ +token_queue(TokenType, #state{table_id=TableId}) -> + Key = {blocked, TokenType}, + case ets:lookup(TableId, Key) of + [] -> queue:new(); + [{Key,TokenQueue}] -> TokenQueue + end. + +all_blocked_queues(#state{table_id=TableId}) -> + %% there is just one queue per token type. More like a "set". The queue is in the table! + [Queue || {{blocked, _Token}, Queue} <- ets:match_object(TableId, {{blocked, '_'},'_'})]. + +fmt_live_tokens(Entries) -> + [format_entry(Entry) || Entry <- Entries]. + +%% Status :: [given | blocked] +do_ps(all, Status, State) -> + E1 = case lists:member(given, Status) of + true -> + lists:flatten(all_given_entries(State)); + false -> + [] + end, + E2 = case lists:member(blocked, Status) of + true -> + Queues = all_blocked_queues(State), + E1 ++ lists:flatten( + [[Entry || Entry <- queue:to_list(Q)] || Q <- Queues]); + false -> + E1 + end, + fmt_live_tokens(E2); +do_ps(TokenType, Status, State) -> + E1 = case lists:member(given, Status) of + true -> + tokens_given(TokenType, State); + false -> + [] + end, + E2 = case lists:member(blocked, Status) of + true -> + E1 ++ queue:to_list(token_queue(TokenType, State)); + false -> + E1 + end, + fmt_live_tokens(E2). + +%% Possibly send replies to processes blocked on tokens of Type. +%% Returns new State. +give_available_tokens(Type, 0, TokenQueue, State) -> + %% no more available tokens to give out + update_token_queue(Type, TokenQueue, State); +give_available_tokens(Type, NumAvailable, TokenQueue, State) -> + case queue:out(TokenQueue) of + {empty, _Q} -> + %% no more blocked entries + update_token_queue(Type, TokenQueue, State); + {{value, Entry}, TokenQueue2} -> + %% queue entry to unblock + Pid = ?pid(Entry), + Meta = ?meta(Entry), + From = ?from(Entry), + %% account for given token + State2 = give_token(Type, Pid, Meta, State), + %% send reply to blocked caller, unblocking them. + gen_server:reply(From, ok), + %% unblock next blocked in queue + give_available_tokens(Type, NumAvailable-1,TokenQueue2,State2) + end. + +%% For the given type, check the current given count and if less +%% than the rate limit, give out as many tokens as are available +%% to callers on the blocked list. They need a reply because they +%% made a gen_server:call() that we have not replied to yet. +maybe_unblock_blocked(Type, State) -> + Entries = tokens_given(Type, State), + {_Period, MaxCount} = ?rate(token_info(Type, State)), + PosNumAvailable = erlang:max(MaxCount - length(Entries), 0), + Queue = token_queue(Type, State), + give_available_tokens(Type, PosNumAvailable, Queue, State). + +%% Get existing token type info from ETS table and schedule all for refill. +%% This is needed because we just reloaded our saved persisent state data +%% after a crash. +reschedule_token_refills(State) -> + Types = do_token_types(State), + [schedule_refill_tokens(Type, State) || Type <- Types]. + +%% Schedule a timer event to refill tokens of given type +schedule_refill_tokens(Type, State) -> + {Period, _Count} = ?rate(token_info(Type, State)), + erlang:send_after(Period*1000, self(), {refill_tokens, Type}). + +%% Schedule a timer event to snapshot the current history +schedule_sample_history(State=#state{window_interval=Interval}) -> + TRef = erlang:send_after(Interval*1000, self(), sample_history), + State#state{window_tref=TRef}. + +do_sample_history(State=#state{window=Window, history=Histories}) -> + %% Move the current window of measurements onto the history queues. + %% Trim queue down to DEFAULT_TM_KEPT_SAMPLES if too big now. 
+ Queue2 = queue:in(Window, Histories), + Trimmed = case queue:len(Queue2) > ?DEFAULT_TM_KEPT_SAMPLES of + true -> + {_Discarded, Rest} = queue:out(Queue2), + Rest; + false -> + Queue2 + end, + EmptyWindow = orddict:new(), + State#state{window=EmptyWindow, history=Trimmed}. + +update_stat_window(TokenType, Fun, Default, State=#state{window=Window}) -> + NewWindow = orddict:update(TokenType, Fun, Default, Window), + State#state{window=NewWindow}. + +default_refill(Token, State) -> + {_Rate, Limit} = ?rate(token_info(Token, State)), + ?DEFAULT_TM_STAT_HIST#tm_stat_hist{refills=1, limit=Limit}. + +default_given(Token, State) -> + {_Rate, Limit} = ?rate(token_info(Token, State)), + ?DEFAULT_TM_STAT_HIST#tm_stat_hist{given=1, limit=Limit}. + +increment_stat_refills(Token, State) -> + update_stat_window(Token, + fun(Stat) -> Stat#tm_stat_hist{refills=1+Stat#tm_stat_hist.refills} end, + default_refill(Token, State), + State). + +increment_stat_given(Token, State) -> + update_stat_window(Token, + fun(Stat) -> Stat#tm_stat_hist{given=1+Stat#tm_stat_hist.given} end, + default_given(Token, State), + State). + +increment_stat_blocked(Token, State) -> + {_Rate, Limit} = ?rate(token_info(Token, State)), + update_stat_window(Token, + fun(Stat) -> Stat#tm_stat_hist{blocked=1+Stat#tm_stat_hist.blocked} end, + ?DEFAULT_TM_STAT_HIST#tm_stat_hist{blocked=1, limit=Limit}, + State). + +%% Token refill timer event handler. +%% Capture stats of what was given in the previous period, +%% Clear all tokens of this type from the given set, +%% Unblock blocked processes if possible. +do_refill_tokens(Type, State) -> + State2 = increment_stat_refills(Type, State), + remove_given_entries(Type, State), + maybe_unblock_blocked(Type, State2). + +token_info(TokenType, #state{table_id=TableId}) -> + Key = {info,TokenType}, + case ets:lookup(TableId, Key) of + [] -> ?DEFAULT_TOKEN_INFO; + [{_Key,TokenInfo}] -> TokenInfo; + [First | _Rest] -> + lager:error("Unexpected multiple instances of key ~p in table", [{info, TokenType}]), + First %% try to keep going + end. + +enable_token(TokenType, State) -> + update_token_enabled(TokenType, true, State). + +disable_token(TokenType, State) -> + update_token_enabled(TokenType, false, State). + +update_token_enabled(TokenType, Value, State) -> + update_token_info(TokenType, + fun(TokenInfo) -> TokenInfo#token_info{enabled=Value} end, + ?DEFAULT_TOKEN_INFO#token_info{enabled=Value}, + State). + +update_token_rate(TokenType, Rate, State) -> + update_token_info(TokenType, + fun(TokenInfo) -> TokenInfo#token_info{rate=Rate} end, + ?DEFAULT_TOKEN_INFO#token_info{rate=Rate}, + State). + +update_token_info(TokenType, Fun, Default, State=#state{table_id=TableId}) -> + Key = {info, TokenType}, + NewInfo = case ets:lookup(TableId, Key) of + [] -> Default; + [{_Key,TokenInfo}] -> Fun(TokenInfo) + end, + ets:insert(TableId, {Key, NewInfo}), + State. + +all_given_entries(#state{table_id=TableId}) -> + %% multiple entries per token type, i.e. uses the "bag" + [Entry || {{given, _Token}, Entry} <- ets:match_object(TableId, {{given, '_'},'_'})]. + +tokens_given(Type, #state{table_id=TableId}) -> + Key = {given, Type}, + [Given || {_K,Given} <- ets:lookup(TableId, Key)]. + +add_given_entry(TokenType, Entry, TableId) -> + Key = {given, TokenType}, + ets:insert(TableId, {Key, Entry}). + +remove_given_entries(TokenType, #state{table_id=TableId}) -> + Key = {given, TokenType}, + ets:delete(TableId, Key). 
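%% Behavioural sketch (an assumption, not a test in the patch) of the refill
%% cycle driven by do_refill_tokens/2 above: with a hypothetical rate of
%% {2, 1}, one token per 2-second period, a second request is refused until
%% the refill timer clears the "given" entries for that type. Timing is
%% approximate, so treat this as illustration rather than a reliable test.
refill_example() ->
    _ = riak_core_token_manager:set_token_rate(refill_demo, {2, 1}),
    ok = riak_core_token_manager:get_token_async(refill_demo, self(), []),
    max_tokens = riak_core_token_manager:get_token_async(refill_demo, self(), []),
    timer:sleep(2100),  %% allow one {refill_tokens, refill_demo} event to fire
    ok = riak_core_token_manager:get_token_async(refill_demo, self(), []).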
+ +%% Add a token of type to our given set and remove from blocked set +give_token(Type, Pid, Meta, State=#state{table_id=TableId}) -> + Entry = ?TOKEN_ENTRY(Type, Pid, Meta, undefined, given), + add_given_entry(Type, Entry, TableId), + %% update given stats + increment_stat_given(Type, State). + +%% Put a token request on the blocked queue. We'll reply later when a token +%% becomes available +enqueue_request(Type, Pid, Meta, From, State) -> + OldQueue = token_queue(Type, State), + NewQueue = queue:in(?TOKEN_ENTRY(Type, Pid, Meta, From, blocked), OldQueue), + %% update blocked stats + State2 = increment_stat_blocked(Type, State), + %% Put new queue back in state + update_token_queue(Type, NewQueue, State2). + +%% reply ok now if available or max_tokens if not. Non-blocking +do_get_token_async(Type, Pid, Meta, State) -> + Info = token_info(Type, State), + Entries = tokens_given(Type, State), + {_Period, MaxCount} = ?rate(Info), + case length(Entries) < MaxCount of + true -> + %% tokens are available + {reply, ok, give_token(Type, Pid, Meta, State)}; + false -> + {reply, max_tokens, State} + end. + +%% reply now if available or reply later if en-queued. Blocking +do_get_token_sync(Type, Pid, Meta, From, State) -> + case do_get_token_async(Type, Pid, Meta, State) of + {reply, max_tokens, _S} -> + {noreply, enqueue_request(Type, Pid, Meta, From, State)}; + {reply, ok, State2} -> + {reply, ok, State2} + end. diff --git a/src/riak_core_util.erl b/src/riak_core_util.erl index d26b50333..1047887e4 100644 --- a/src/riak_core_util.erl +++ b/src/riak_core_util.erl @@ -176,11 +176,19 @@ integer_to_list(I0, Base, R0) -> integer_to_list(I1, Base, R1) end. +-ifndef(old_hash). +sha(Bin) -> + crypto:hash(sha, Bin). +-else. +sha(Bin) -> + crypto:sha(Bin). +-endif. + %% @spec unique_id_62() -> string() %% @doc Create a random identifying integer, returning its string %% representation in base 62. unique_id_62() -> - Rand = crypto:sha(term_to_binary({make_ref(), os:timestamp()})), + Rand = sha(term_to_binary({make_ref(), os:timestamp()})), <> = Rand, integer_to_list(I, 62). diff --git a/test/token_manager_tests.erl b/test/token_manager_tests.erl new file mode 100644 index 000000000..66f36841c --- /dev/null +++ b/test/token_manager_tests.erl @@ -0,0 +1,292 @@ +-module(token_manager_tests). +-compile(export_all). + +-include_lib("riak_core_token_manager.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-define(TOK_MGR, riak_core_token_manager). 
+
+token_mgr_test_() ->
+    {timeout, 60000, %% Seconds to finish all of the tests
+     {setup, fun() ->
+                 riak_core_table_manager:start_link([{?TM_ETS_TABLE, ?TM_ETS_OPTS}]),
+                 start_token_mgr()
+             end,
+     fun(_) -> ok end, %% cleanup
+     fun(_) ->
+         [ %% Tests
+           { "set/get token rates + verify rates",
+             fun() ->
+                 setup_token_rates(),
+                 verify_token_rates()
+             end},
+
+           { "crash token manager + verify rates persist",
+             fun() ->
+                 crash_and_restart_token_manager(),
+                 verify_token_rates()
+             end},
+
+           { "head - empty",
+             fun() ->
+                 %% we haven't taken a sample yet, so history is empty
+                 Hist = ?TOK_MGR:head(),
+                 ?assertEqual([], Hist)
+             end},
+
+           { "token types",
+             fun() ->
+                 Types = ?TOK_MGR:token_types(),
+                 ?assertEqual(lists:sort(expected_token_types()), lists:sort(Types))
+             end},
+
+           { "no tokens given",
+             fun() ->
+                 Given = ?TOK_MGR:tokens_given(),
+                 ?assertEqual([], Given)
+             end},
+
+           { "get_token_async + tokens_given",
+             fun() ->
+                 Meta1 = [{foo,bar}],
+                 Meta2 = [{moo,tar,blar}],
+                 Pid = self(),
+                 ok = ?TOK_MGR:get_token_async(token1, Pid, Meta1),
+                 ok = ?TOK_MGR:get_token_async(token2, Pid, Meta2),
+                 AllGiven = ?TOK_MGR:tokens_given(),
+                 ?assertEqual(lists:sort([make_live_stat(token1, Pid, Meta1, given),
+                                          make_live_stat(token2, Pid, Meta2, given)]),
+                              lists:sort(AllGiven)),
+                 Given = ?TOK_MGR:tokens_given(token1),
+                 ?assertEqual([make_live_stat(token1, Pid, Meta1, given)], Given)
+             end},
+
+           { "get_token_async + tokens_given + max_tokens",
+             fun() ->
+                 ?assertNot([] == ?TOK_MGR:tokens_given()),
+                 %% let tokens refill and confirm available
+                 MaxPeriod = (max_token_period() * 1000) + 100,
+                 timer:sleep(MaxPeriod),
+                 ?assertEqual([], ?TOK_MGR:tokens_given()),
+
+                 Pid = self(),
+                 DoOne =
+                     fun(Token) ->
+                         %% Max out the tokens for this period
+                         N = expected_token_limit(Token),
+                         [ok = ?TOK_MGR:get_token_async(Token,Pid,[{meta,M}])
+                          || M <- lists:seq(1,N)],
+                         Expected = [make_live_stat(Token,Pid,[{meta,M}],given)
+                                     || M <- lists:seq(1,N)],
+                         %% try to get another, but it should fail
+                         ?assertEqual(max_tokens, ?TOK_MGR:get_token_async(Token,Pid,{meta,N+1})),
+                         Given = ?TOK_MGR:tokens_given(Token),
+                         ?assertEqual(Expected, Given)
+                     end,
+                 %% all of our test token types
+                 Tokens = expected_token_types(),
+                 %% can get requested and nothing should balk
+                 [DoOne(Token) || Token <- Tokens],
+                 %% Does the total number of given tokens add up to the expected limit?
+                 TotalLimits = lists:foldl(fun(Limit,Sum) -> Limit+Sum end, 0,
+                                           [expected_token_limit(Token) || Token <- Tokens]),
+                 AllGiven = ?TOK_MGR:tokens_given(),
+                 ?assertEqual(TotalLimits, erlang:length(AllGiven))
+             end},
+
+           { "get_token_sync blocking",
+             fun() ->
+                 %% all the tokens have been max'd out now.
+                 %% start sub-process that will block on a token
+                 Meta = [{i,am,blocked}],
+                 TestPid = self(),
+                 Pid = spawn(fun() ->
+                                 TestPid ! {blocking, self()},
+                                 ok = ?TOK_MGR:get_token_sync(token1, self(), Meta),
+                                 TestPid ! {unblocked, self()}
+                             end),
+                 receive
+                     {blocking, Pid} ->
+                         timer:sleep(10),
+                         %% check that we have one on the waiting list
+                         Blocked1 = ?TOK_MGR:tokens_blocked(),
+                         ?assertEqual([make_live_stat(token1, Pid, Meta, blocked)], Blocked1)
+                 end,
+                 %% let tokens refill; the blocked request should then be given
+                 MaxPeriod = (max_token_period() * 1000) + 100,
+                 receive
+                     {unblocked,Pid} ->
+                         %% check that our token was given and the blocked queue is empty
+                         Blocked2 = ?TOK_MGR:tokens_blocked(),
+                         Given2 = ?TOK_MGR:tokens_given(token1),
+                         ?assertEqual([make_live_stat(token1, Pid, Meta, given)], Given2),
+                         ?assertEqual([],Blocked2)
+                 after MaxPeriod ->
+                         ?assert(false)
+                 end
+             end},
+
+           { "head - with samples",
+             fun() ->
+                 %% history should now contain at least one sample
+                 Hist = ?TOK_MGR:head(),
+                 ?assertNot([] == Hist)
+             end},
+
+           { "clear history",
+             fun() ->
+                 ?TOK_MGR:clear_history(),
+                 Hist = ?TOK_MGR:head(),
+                 ?assertEqual([], Hist)
+             end},
+
+           { "clear all tokens given and confirm none given",
+             fun() ->
+                 %% wait a little longer than longest refill time.
+                 MaxPeriod = (max_token_period() * 1000) + 100,
+                 timer:sleep(MaxPeriod),
+                 %% confirm none given now
+                 ?assertEqual([], ?TOK_MGR:tokens_given())
+             end},
+
+           { "synchronous gets and ps/history",
+             fun() ->
+                 Tokens = expected_token_types(),
+                 %% clear history and let 1 sample be taken with empty stats
+                 %% Sample1: refill only, nothing given or blocked
+                 ?TOK_MGR:clear_history(),
+                 E1 = [{Token, make_hist_stat(expected_token_limit(Token),1,0,0)} || Token <- Tokens],
+                 timer:sleep(1000),
+                 %% Sample2: 1 given for each token
+                 E2 = [get_n_tokens(Token, self(), 1) || Token <- Tokens],
+                 timer:sleep(1000),
+                 %% Sample3: max given for each token, plus 2 blocked
+                 E3 = [max_out_plus(Token, self(), 2) || Token <- Tokens],
+
+                 %% let all spawned procs run
+                 timer:sleep(100),
+
+                 PS = ?TOK_MGR:ps(),
+                 Given = ?TOK_MGR:tokens_given(),
+                 Blocked = ?TOK_MGR:tokens_blocked(),
+                 ?assertNot(PS == []),
+                 ?assertEqual(PS, Given++Blocked),
+
+                 timer:sleep(1000-100),
+                 %% verify history
+                 %% TODO: refills is hard to calculate and needs a better solution
+                 ?assertEqual([E1,E2,E3], ?TOK_MGR:head()),
+                 ?assertEqual([E1], ?TOK_MGR:head(all,1)),
+                 ?assertEqual([E2], ?TOK_MGR:head(all,2,1)), %% head(offset, count)
+                 ?assertEqual([E3], ?TOK_MGR:head(all,3,1)),
+                 ?assertEqual([E1,E2], ?TOK_MGR:head(all,2)),
+                 ?assertEqual([E2,E3], ?TOK_MGR:head(all,2,2)),
+                 ?assertEqual([E1,E2,E3], ?TOK_MGR:head(all,3)),
+                 ?assertEqual([E3], ?TOK_MGR:tail(all,1)),
+                 ?assertEqual([E2,E3], ?TOK_MGR:tail(all,2)), %% tail(offset)
+                 ?assertEqual([E1,E2,E3], ?TOK_MGR:tail(all,3)),
+                 ?assertEqual([E2,E3], ?TOK_MGR:tail(all,2,2)), %% tail(offset,count)
+                 ?assertEqual([E2], ?TOK_MGR:tail(all,2,1)),
+                 %% test range guards on head/tail
+                 ?assertEqual([E1,E2,E3], ?TOK_MGR:head(all,0)),
+                 ?assertEqual([E1,E2,E3], ?TOK_MGR:head(all,-1)),
+                 ?assertEqual([E3], ?TOK_MGR:tail(all,0)),
+                 ?assertEqual([E3], ?TOK_MGR:tail(all,-1)),
+
+                 %% calling head on a specific token yields a list of lists,
+                 %% where each inner list holds just that token's entry.
+                 [check_head_token(Token, [E1,E2,E3]) || Token <- Tokens]
+
+             end}
+
+         ] end}
+    }.
+
+check_head_token(Token, StatsList) ->
+    F1 = fun(Stats) -> filter_stat(Token, Stats) end,
+    ?assertEqual(?TOK_MGR:head(Token), lists:map(F1, StatsList)).
+
+filter_stat(Token, Stats) ->
+    lists:filter(fun({T,_Stat}) -> Token==T end, Stats).
+
+spawn_sync_request(Token, Pid, Meta) ->
+    spawn(fun() -> ok = ?TOK_MGR:get_token_sync(Token, Pid, Meta) end).
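+
+%% Worked example for the helpers below (comment only), using token1 whose
+%% rate in some_token_rates/0 is {1, 5}: max_out_plus(token1, Pid, 2) calls
+%% get_n_tokens(token1, Pid, 7), spawning 7 blocking requests. 5 are given
+%% immediately and 2 block until the next refill, so by the end of the sample
+%% window the expected history entry is
+%%   #tm_stat_hist{limit=5, refills=1, given=7, blocked=2}
+%% which matches make_hist_stat(5, 1, 5+2, 2).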
+
+get_n_tokens(Token, Pid, N) ->
+    Limit = expected_token_limit(Token),
+    [spawn_sync_request(Token,Pid,[{meta,M}]) || M <- lists:seq(1,N)],
+    {Token, make_hist_stat(Limit, 1, N, 0)}.
+
+max_out_plus(Token, Pid, X) ->
+    %% Max out the tokens for this period and request X more than Max
+    N = expected_token_limit(Token),
+    get_n_tokens(Token, Pid, N+X),
+    %% Hack alert: given is N + X because we "know" we got a refill in this period.
+    {Token, make_hist_stat(N, 1, N+X, X)}.
+
+make_hist_stat(Limit, Refills, Given, Blocked) ->
+    #tm_stat_hist
+    {
+      limit=Limit,
+      refills=Refills,
+      given=Given,
+      blocked=Blocked
+    }.
+
+make_live_stat(Token, Pid, Meta, Status) ->
+    #tm_stat_live
+    {
+      token=Token,
+      consumer=Pid,
+      meta=Meta,
+      state=Status
+    }.
+
+-spec some_token_rates() -> [{tm_token(), {tm_period(), tm_count()}}].
+some_token_rates() ->
+    [ {token1, {1, 5}},
+      {token2, {1, 4}},
+      {{token3,stuff3}, {1, 3}}
+    ].
+
+expected_token_types() ->
+    [ Type || {Type, _Rate} <- some_token_rates()].
+
+max_token_period() ->
+    lists:foldl(fun({_T,{Period,_Limit}},Max) -> erlang:max(Period,Max) end, 0, some_token_rates()).
+
+expected_token_limit(Token) ->
+    {_Period, Limit} = proplists:get_value(Token, some_token_rates()),
+    Limit.
+
+setup_token_rates() ->
+    [?TOK_MGR:set_token_rate(Type, Rate) || {Type, Rate} <- some_token_rates()].
+
+verify_token_rate(Type, ExpectedRate) ->
+    Rate = ?TOK_MGR:token_rate(Type),
+    ?assertEqual(ExpectedRate, Rate).
+
+verify_token_rates() ->
+    [verify_token_rate(Type, Rate) || {Type, Rate} <- some_token_rates()],
+    %% check that an un-registered token is not set up
+    Rate = ?TOK_MGR:token_rate(bogusToken),
+    DefaultRate = {0,0},
+    ?assertEqual(DefaultRate, Rate).
+
+%% start a stand-alone server, not linked, so that when we crash it, it
+%% doesn't take down our test too.
+start_token_mgr() ->
+    %% set up with a history window of 1 second
+    ?TOK_MGR:start(1).
+
+crash_and_restart_token_manager() ->
+    Pid = erlang:whereis(?TOK_MGR),
+    ?assertNot(Pid == undefined),
+    erlang:exit(Pid, kill),
+    timer:sleep(100),
+    start_token_mgr(),
+    timer:sleep(100).
+
+-endif.