Permalink
Browse files

Add table, lock and token managers to core's supervision tree.

  • Loading branch information...
1 parent 7a2cec3 commit 68c3abef043b33c6acc4c30117bcdb4177f4e3cf @buddhisthead buddhisthead committed Sep 6, 2013
View
@@ -89,7 +89,8 @@
vclock,
riak_core_bg_manager,
riak_core_token_manager,
- riak_core_table_manager
+ riak_core_table_manager,
+ riak_core_bg_manager_sup
]},
{registered, []},
{included_applications, [folsom]},
@@ -0,0 +1,23 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-define(LM_ETS_TABLE, lock_mgr_table). %% name of private lock manager ETS table
+-define(LM_ETS_OPTS, [private, bag]). %% creation time properties of lock manager ETS table
+
+-type concurrency_limit() :: non_neg_integer() | infinity.
@@ -28,6 +28,7 @@
enable/0,
disable/0,
%% Locks
+ %% TODO: refactor the lock implementation to another module ala tokens
get_lock/1,
get_lock/2,
get_lock/3,
@@ -72,14 +73,12 @@
terminate/2, code_change/3]).
-record(state, {table_id:: ets:tid(), %% TableID of ?LM_ETS_TABLE
- held :: ordict:orddict(),
enabled :: boolean()}).
-record(lock_info, {concurrency_limit :: non_neg_integer(),
enabled :: boolean()}).
-define(SERVER, ?MODULE).
--define(TOKEN_MODULE, riak_core_token_manager).
-define(DEFAULT_CONCURRENCY, 0). %% DO NOT CHANGE. DEFAULT SET TO 0 TO ENFORCE "REGISTRATION"
-define(limit(X), (X)#lock_info.concurrency_limit).
-define(enabled(X), (X)#lock_info.enabled).
@@ -96,7 +95,6 @@
%% @doc Starts the server
-spec start_link() -> {ok, pid()} | ignore | {error, term}.
start_link() ->
- gen_server:start_link({local, ?TOKEN_MODULE}, ?TOKEN_MODULE, [], []),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Enable handing out of all background locks and tokens
@@ -314,8 +312,10 @@ concurrency_limit_reached(Type) ->
ignore |
{stop, term()}.
init([]) ->
- {ok, #state{
- held=orddict:new(),
+ lager:debug("Background Manager starting up."),
+ %% claiming the table will result in a handle_info('ETS-TRANSFER', ...) message.
+ ok = riak_core_table_manager:claim_table(?LM_ETS_TABLE),
+ {ok, #state{table_id=undefined,
enabled=true}}.
%% @private
@@ -332,9 +332,8 @@ handle_call({get_lock, LockType, Pid, Info}, _From, State) ->
{reply, Reply, State2};
handle_call({lock_count, LockType}, _From, State) ->
{reply, held_count(LockType, State), State};
-handle_call(lock_count, _From, State=#state{held=Locks}) ->
- Count = orddict:fold(fun(_, Held, Total) -> Total + length(Held) end,
- 0, Locks),
+handle_call(lock_count, _From, State) ->
+ Count = length(held_locks(State)),
{reply, Count, State};
handle_call({lock_limit_reached, LockType}, _From, State) ->
HeldCount = held_count(LockType, State),
@@ -411,7 +410,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-query_locks(FullQuery, State=#state{held=Locks}) ->
+query_locks(FullQuery, State) ->
+ Locks = held_locks(State),
Base = case proplists:get_value(type, FullQuery) of
undefined -> Locks;
LockType -> orddict:from_list([{LockType, held_locks(LockType, State)}])
@@ -451,19 +451,22 @@ try_lock(LockType, Pid, Info, State=#state{enabled=GlobalEnabled}) ->
try_lock(false, _LockType, _Pid, _Info, State) ->
{max_concurrency, State};
-try_lock(true, LockType, Pid, Info, State=#state{held=Locks}) ->
+try_lock(true, LockType, Pid, Info, State) ->
Ref = monitor(process, Pid),
- NewLocks = orddict:append(LockType, {Pid, Ref, Info}, Locks),
- {ok, State#state{held=NewLocks}}.
+ State2 = add_lock(LockType, {Pid, Ref, Info}, State),
+ {ok, State2}.
-release_lock(Ref, State=#state{held=Locks}) ->
- %% TODO: this makes me (jordan) :(
- Released = orddict:map(fun(Type, Held) -> release_lock(Ref, Type, Held) end,
- Locks),
- State#state{held=Released}.
+add_lock(LockType, Lock, State=#state{table_id=TableId}) ->
+ Key = {held, LockType},
+ ets:insert(TableId, {Key, Lock}),
+ State.
-release_lock(Ref, _LockType, Held) ->
- lists:keydelete(Ref, 2, Held).
+release_lock(Ref, State=#state{table_id=TableId}) ->
+ %% There should only be one instance of the object, but we'll zap all that match.
+ Pattern = {{held, '_'}, {'_', Ref, '_'}},
+ Matches = [Lock || {{held, _Type},Lock} <- ets:match_object(TableId, Pattern)],
+ [ets:delete_object(TableId, Obj) || Obj <- Matches],
+ State.
maybe_honor_limit(true, LockType, Limit, State) ->
Held = held_locks(LockType, State),
@@ -482,11 +485,11 @@ maybe_honor_limit(false, _LockType, _Limit, _State) ->
held_count(LockType, State) ->
length(held_locks(LockType, State)).
-held_locks(LockType, #state{held=Locks}) ->
- case orddict:find(LockType, Locks) of
- error -> [];
- {ok, Held} -> Held
- end.
+held_locks(#state{table_id=TableId}) ->
+ [Lock || {{held, _Type},Lock} <- ets:match_object(TableId, {{held, '_'},'_'})].
+
+held_locks(LockType, #state{table_id=TableId}) ->
+ [Lock || {{held, _Type},Lock} <- ets:match_object(TableId, {{held, LockType},'_'})].
enable_lock(LockType, State) ->
update_lock_enabled(LockType, true, State).
@@ -0,0 +1,47 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_bg_manager_sup).
+-behaviour(supervisor).
+
+%% beahvior functions
+-export([start_link/0,
+ init/1
+ ]).
+
+-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,5000,Type,[I]}).
+
+%% begins the supervisor, init/1 will be called
+start_link () ->
+ supervisor:start_link({local,?MODULE},?MODULE,[]).
+
+%% Supervisor of all background manager processes
+%%
+%% Lock manager (currently in bg_manager) and Token manager both use ETS
+%% tables to manage persistent state. The table_manager is started from
+%% our parent supervisor (riak_core_sup) and creates the tables used by
+%% background managers. So, riak_core_table_manager needs to be started
+%% before this supervisor.
+
+%% @private
+init ([]) ->
+ {ok,{{one_for_all,10,10},
+ [?CHILD(riak_core_bg_manager, worker),
+ ?CHILD(riak_core_token_manager, worker)]}}.
View
@@ -2,7 +2,7 @@
%%
%% riak_core: Core Riak Application
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@@ -24,15 +24,27 @@
-behaviour(supervisor).
+-include("riak_core_bg_manager.hrl").
+-include("riak_core_token_manager.hrl").
+
%% API
-export([start_link/0, stop_webs/0, restart_webs/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
--define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
+-define(DEFAULT_TIMEOUT, 5000).
+-define(CHILD(I, Type, Timeout, Args), {I, {I, start_link, Args}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type, Timeout), ?CHILD(I, Type, Timeout, [])).
+-define(CHILD(I, Type), ?CHILD(I, Type, ?DEFAULT_TIMEOUT)).
+
+%% ETS tables to be created and maintained by riak_core_table_manager, which is not linked
+%% to any processes except this supervisor. Please keep it that way so tables don't get lost
+%% when their user processes crash. Implement ETS-TRANSFER handler for user processes.
+-define(TBL_MGR_ARGS, [{?TM_ETS_TABLE, ?TM_ETS_OPTS},
+ {?LM_ETS_TABLE, ?LM_ETS_OPTS}
+ ]).
%% ===================================================================
%% API functions
@@ -56,7 +68,8 @@ restart_webs() ->
init([]) ->
Children = lists:flatten(
[?CHILD(riak_core_sysmon_minder, worker),
- ?CHILD(riak_core_bg_manager, worker),
+ ?CHILD(riak_core_table_manager, worker, ?DEFAULT_TIMEOUT, [?TBL_MGR_ARGS]),
+ ?CHILD(riak_core_bg_manager_sup, supervisor),
?CHILD(riak_core_vnode_sup, supervisor, 305000),
?CHILD(riak_core_eventhandler_sup, supervisor),
?CHILD(riak_core_ring_events, worker),
@@ -85,6 +85,7 @@ claim_table(TableName) ->
%% Table specs are provided by the process that creates this table manager,
%% presumably a supervisor such as riak_core_sup.
init([TableSpecs]) ->
+ lager:debug("Table Manager starting up with tables: ~p", [TableSpecs]),
Tables = lists:foldl(fun(Spec, TT) -> create_table(Spec, TT) end, [], TableSpecs),
{ok, #state{tables=Tables}}.
@@ -241,12 +241,16 @@ start(Interval) ->
%%%% Gen Server %%%%%%%
%% @private
-%% @doc Initializes the server with a history window interval in seconds
--spec init([tm_period()]) -> {ok, #state{}} |
+%% @doc Initializes the server with a history window interval in seconds,
+%% defaults to 1 minute if empty list supplied.
+-spec init([tm_period()] | []) -> {ok, #state{}} |
{ok, #state{}, non_neg_integer() | infinity} |
ignore |
{stop, term()}.
+init([]) ->
+ init([?DEFAULT_TM_SAMPLE_WINDOW]);
init([Interval]) ->
+ lager:debug("Token Manager starting up."),
%% claiming the table will result in a handle_info('ETS-TRANSFER', ...) message.
ok = riak_core_table_manager:claim_table(?TM_ETS_TABLE),
State = #state{table_id=undefined, %% resolved in the ETS-TRANSFER handler
@@ -340,7 +344,7 @@ handle_info({'ETS-TRANSFER', TableId, Pid, _Data}, State) ->
reschedule_token_refills(State2),
{noreply, State2};
handle_info({'DOWN', Ref, _, _, _}, State) ->
- lager:info("Linked process died with ref ~p: ", [Ref]),
+ lager:debug("Linked process died with ref ~p: ", [Ref]),
{noreply, State};
handle_info(Info, State) ->
lager:warning("Unhandled info: ~p", [Info]),

0 comments on commit 68c3abe

Please sign in to comment.