Permalink
Browse files

Move blocked and given lists to ETS.

  • Loading branch information...
1 parent 19abe9a commit 3cf55bccfa6c4119852bf4d1ea701d1298f34f34 @buddhisthead buddhisthead committed Sep 5, 2013
Showing with 66 additions and 44 deletions.
  1. +1 −1 include/riak_core_token_manager.hrl
  2. +64 −41 src/riak_core_token_manager.erl
  3. +1 −2 test/token_manager_tests.erl
@@ -50,4 +50,4 @@
-define(DEFAULT_TM_OUTPUT_SAMPLES, 20). %% default number of sample windows displayed
-define(DEFAULT_TM_KEPT_SAMPLES, 10000). %% number of history samples to keep
-define(TM_ETS_TABLE, token_mgr_table). %% name of private token manager ETS table
--define(TM_ETS_OPTS, [private, set]). %% creation time properties of token manager ETS table
+-define(TM_ETS_OPTS, [private, bag]). %% creation time properties of token manager ETS table
@@ -66,9 +66,22 @@
-define(SERVER, ?MODULE).
--record(state, {table_id:: ets:tid(), %% TableID of ?TM_ETS_TABLE
- blocked :: orddict:orddict(), %% tm_token() -> queue of token_entry()
- given :: orddict:orddict(), %% tm_token() -> [token_entry()]
+%% We use an ETS table to store critical data. In the event this process crashes,
+%% the table will be given back to the table manager and we can reclaim it when
+%% we restart. Thus, token rates and states are maintained across restarts of the
+%% module, but not of the application. Since we are supervised by riak_core_sup,
+%% that's fine.
+%%
+%% The table must be a bag and is best if private. See ?TM_ETS_OPTS in MODULE.hrl.
+%% Table Schema...
+%% KEY Data Notes
+%% --- ---- -----
+%% {info, TokenType} #token_info One object per key.
+%% {given, TokenType} #token_entry Multiple objects per key.
+%% {blocked, TokenType} queue of #token_entry(s) One queue object per key.
+
+-record(state, {table_id:: ets:tid(), %% TableID of ?TM_ETS_TABLE
+ %% None of the following data is persisted across process crashes.
enabled :: boolean(), %% Global enable/disable switch
%% stats
window :: orddict:orddict(), %% tm_token() -> tm_stat_hist()
@@ -230,8 +243,6 @@ init([Interval]) ->
%% claiming the table will result in a handle_info('ETS-TRANSFER', ...) message.
ok = riak_core_table_manager:claim_table(?TM_ETS_TABLE),
State = #state{table_id=undefined, %% resolved in the ETS-TRANSFER handler
- given=orddict:new(),
- blocked=orddict:new(),
window=orddict:new(),
history=queue:new(),
enabled=true,
@@ -399,7 +410,6 @@ do_hist(End, TokenType, Offset, Count, #state{history=HistQueue}) ->
segment_queue(First, Last, Queue) ->
QLen = queue:len(Queue),
-%% ?debugFmt("First: ~p, Last: ~p, QLen: ~p", [First, Last, QLen]),
case QLen >= Last andalso QLen > 0 of
true ->
%% trim off extra tail, then trim head
@@ -422,7 +432,6 @@ segment_queue(First, Last, Queue) ->
format_entry(Entry) ->
-%% ?debugFmt("Entry: ~p~n", [Entry]),
#tm_stat_live
{
token = Entry#token_entry.token,
@@ -431,20 +440,41 @@ format_entry(Entry) ->
state = Entry#token_entry.state
}.
+update_token_queue(TokenType, TokenQueue, State=#state{table_id=TableId}) ->
+ Key = {blocked, TokenType},
+ Object = {Key, TokenQueue},
+ %% replace existing queue. Must delete existing one since we're using a bag table
+ ets:delete(TableId, Key),
+ ets:insert(TableId, Object),
+ State.
+
+token_queue(TokenType, #state{table_id=TableId}) ->
+ Key = {blocked, TokenType},
+ case ets:lookup(TableId, Key) of
+ [] -> queue:new();
+ [{Key,TokenQueue}] -> TokenQueue
+ end.
+
+all_blocked_queues(#state{table_id=TableId}) ->
+ %% there is just one queue per token type. More like a "set". The queue is in the table!
+ [Queue || {{blocked, _Token}, Queue} <- ets:match_object(TableId, {{blocked, '_'},'_'})].
+
fmt_live_tokens(Entries) ->
[format_entry(Entry) || Entry <- Entries].
-do_ps(all, Status, #state{given=Given, blocked=Blocked}) ->
+%% Status :: [given | blocked]
+do_ps(all, Status, State) ->
E1 = case lists:member(given, Status) of
- true ->
- lists:flatten([Entries || {_T, Entries} <- orddict:to_list(Given)]);
+ true ->
+ lists:flatten(all_given_entries(State));
false ->
[]
end,
E2 = case lists:member(blocked, Status) of
true ->
+ Queues = all_blocked_queues(State),
E1 ++ lists:flatten(
- [[Entry || Entry <- queue:to_list(Q)] || {_T, Q} <- orddict:to_list(Blocked)]);
+ [[Entry || Entry <- queue:to_list(Q)] || Q <- Queues]);
false ->
E1
end,
@@ -473,9 +503,8 @@ give_available_tokens(Type, NumAvailable, TokenQueue, State) ->
case queue:out(TokenQueue) of
{empty, _Q} ->
%% no more blocked entries
- State;
+ update_token_queue(Type, TokenQueue, State);
{{value, Entry}, TokenQueue2} ->
-%% ?debugFmt("Entry: ~p", [Entry]),
%% queue entry to unblock
Pid = ?pid(Entry),
Meta = ?meta(Entry),
@@ -520,7 +549,6 @@ do_sample_history(State=#state{window=Window, history=Histories}) ->
false ->
Queue2
end,
-%% ?debugFmt("Storing a sample history, Queue[~p]: ~p", [queue:len(Trimmed), Trimmed]),
EmptyWindow = orddict:new(),
State#state{window=EmptyWindow, history=Trimmed}.
@@ -529,12 +557,10 @@ update_stat_window(TokenType, Fun, Default, State=#state{window=Window}) ->
State#state{window=NewWindow}.
default_refill(Token, State) ->
-%% ?debugFmt("default_refill: ~p", [Token]),
{_Rate, Limit} = ?rate(token_info(Token, State)),
?DEFAULT_TM_STAT_HIST#tm_stat_hist{refills=1, limit=Limit}.
default_given(Token, State) ->
-%% ?debugFmt("default_given: ~p", [Token]),
{_Rate, Limit} = ?rate(token_info(Token, State)),
?DEFAULT_TM_STAT_HIST#tm_stat_hist{given=1, limit=Limit}.
@@ -561,11 +587,10 @@ increment_stat_blocked(Token, State) ->
%% Capture stats of what was given in the previous period,
%% Clear all tokens of this type from the given set,
%% Unblock blocked processes if possible.
-do_refill_tokens(Type, State=#state{given=Given}) ->
-%% ?debugFmt("refilling tokens for type: ~p", [Type]),
+do_refill_tokens(Type, State) ->
State2 = increment_stat_refills(Type, State),
- NewGiven = orddict:erase(Type, Given),
- maybe_unblock_blocked(Type, State2#state{given=NewGiven}).
+ remove_given_entries(Type, State),
+ maybe_unblock_blocked(Type, State2).
token_info(TokenType, #state{table_id=TableId}) ->
Key = {info,TokenType},
@@ -577,16 +602,6 @@ token_info(TokenType, #state{table_id=TableId}) ->
First %% try to keep going
end.
-update_token_queue(TokenType, TokenQueue, State=#state{blocked=Orddict1}) ->
- Fun = fun(_Val) -> TokenQueue end,
- State#state{blocked=orddict:update(TokenType, Fun, TokenQueue, Orddict1)}.
-
-token_queue(TokenType, #state{blocked=Orddict1}) ->
- case orddict:find(TokenType, Orddict1) of
- error -> queue:new();
- {ok, TokenQueue} -> TokenQueue
- end.
-
enable_token(TokenType, State) ->
update_token_enabled(TokenType, true, State).
@@ -614,20 +629,28 @@ update_token_info(TokenType, Fun, Default, State=#state{table_id=TableId}) ->
ets:insert(TableId, {Key, NewInfo}),
State.
-tokens_given(Type, #state{given=AllGiven}) ->
- case orddict:find(Type, AllGiven) of
- error -> [];
- {ok, Given} -> Given
- end.
+all_given_entries(#state{table_id=TableId}) ->
+ %% multiple entries per token type, i.e. uses the "bag"
+ [Entry || {{given, _Token}, Entry} <- ets:match_object(TableId, {{given, '_'},'_'})].
+
+tokens_given(Type, #state{table_id=TableId}) ->
+ Key = {given, Type},
+ [Given || {_K,Given} <- ets:lookup(TableId, Key)].
+
+add_given_entry(TokenType, Entry, TableId) ->
+ Key = {given, TokenType},
+ ets:insert(TableId, {Key, Entry}).
+
+remove_given_entries(TokenType, #state{table_id=TableId}) ->
+ Key = {given, TokenType},
+ ets:delete(TableId, Key).
-%% Add a token of type to our given set and remove it from blocked set if present
-give_token(Type, Pid, Meta, State=#state{given=Given, blocked=Blocked}) ->
+%% Add a token of type to our given set and remove from blocked set
+give_token(Type, Pid, Meta, State=#state{table_id=TableId}) ->
Entry = ?TOKEN_ENTRY(Type, Pid, Meta, undefined, given),
- NewGiven = orddict:append(Type, Entry, Given),
- NewBlocked = orddict:erase(Type, Blocked),
+ add_given_entry(Type, Entry, TableId),
%% update given stats
- State2 = increment_stat_given(Type, State),
- State2#state{given=NewGiven, blocked=NewBlocked}.
+ increment_stat_given(Type, State).
%% Put a token request on the blocked queue. We'll reply later when a token
%% becomes available
@@ -11,7 +11,7 @@
token_mgr_test_() ->
{timeout, 60000, %% Seconds to finish all of the tests
{setup, fun() ->
- riak_core_table_manager:start_link([{?TM_ETS_TABLE, [private, set]}]),
+ riak_core_table_manager:start_link([{?TM_ETS_TABLE, ?TM_ETS_OPTS}]),
?TOK_MGR:start_link(1) %% setup with history window to 1 seconds
end,
fun(_) -> ok end, %% cleanup
@@ -126,7 +126,6 @@ token_mgr_test_() ->
fun() ->
%%
Hist = ?TOK_MGR:head(),
-%% ?debugFmt("History: ~p", [Hist]),
?assertNot([] == Hist)
end},

0 comments on commit 3cf55bc

Please sign in to comment.