Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

* lock & type querying

* kill processes holding locks when lowering concurrency limit or disabling
  • Loading branch information...
commit 7033a3bf1a876aa82c14dcfd063541bae25893c4 1 parent b21903f
@jrwest jrwest authored
Showing with 177 additions and 47 deletions.
  1. +177 −47 src/riak_core_bg_manager.erl
View
224 src/riak_core_bg_manager.erl
@@ -28,27 +28,38 @@
get_lock/3,
lock_count/0,
lock_count/1,
+ lock_types/0,
+ all_locks/0,
+ query_locks/1,
+ enable/0,
enable/1,
+ disable/0,
disable/1,
+ disable/2,
concurrency_limit/1,
- set_concurrency_limit/2]).
+ set_concurrency_limit/2,
+ set_concurrency_limit/3,
+ concurrency_limit_reached/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {held :: ordict:orddict(),
- info :: orddict:orddict()}).
+-record(state, {held :: ordict:orddict(),
+ info :: orddict:orddict(),
+ enabled :: boolean()}).
-record(lock_info, {concurrency_limit :: non_neg_integer(),
enabled :: boolean()}).
-define(SERVER, ?MODULE).
--define(DEFAULT_CONCURRENCY, 2).
+-define(DEFAULT_CONCURRENCY, 0). %% DO NOT CHANGE. DEFAULT SET TO 0 TO ENFORCE "REGISTRATION"
-define(limit(X), (X)#lock_info.concurrency_limit).
-define(enabled(X), (X)#lock_info.enabled).
-define(DEFAULT_LOCK_INFO, #lock_info{enabled=true, concurrency_limit=?DEFAULT_CONCURRENCY}).
+-type concurrency_limit() :: non_neg_integer() | infinity.
+
%%%===================================================================
%%% API
%%%===================================================================
@@ -65,17 +76,19 @@ get_lock(Type) ->
get_lock(Type, self()).
%% @doc Acquire a concurrency lock of the given type, if available,
-%% and associate the lock with the provided pid.
--spec get_lock(any(), pid()) -> ok | max_concurrency.
-get_lock(Type, Pid) ->
- get_lock(Type, Pid, []).
+%% and associate the lock with the provided pid or metadata. If metadata
+%% is provided the lock is associated with the calling process
+-spec get_lock(any(), pid() | [{atom(), any()}]) -> ok | max_concurrency.
+get_lock(Type, Pid) when is_pid(Pid) ->
+ get_lock(Type, Pid, []);
+get_lock(Type, Opts) when is_list(Opts)->
+ get_lock(Type, self(), Opts).
%% @doc Acquire a concurrency lock of the given type, if available,
-%% and associate the lock with the provided pid.
-%% TODO: info on options, but there are none right now
+%% and associate the lock with the provided pid and metadata.
-spec get_lock(any(), pid(), [{atom(), any()}]) -> ok | max_concurrency.
-get_lock(Type, Pid, Opts) ->
- gen_server:call(?MODULE, {get_lock, Type, Pid, Opts}, infinity).
+get_lock(Type, Pid, Info) ->
+ gen_server:call(?MODULE, {get_lock, Type, Pid, Info}, infinity).
%% @doc Return the current concurrency count for all lock types
-spec lock_count() -> integer().
@@ -87,29 +100,80 @@ lock_count() ->
lock_count(Type) ->
gen_server:call(?MODULE, {lock_count, Type}, infinity).
+%% @doc Return list of lock types and associated info. To be returned in this list
+%% a lock type must have had its concurrency set or have been enabled/disabled.
+-spec lock_types() -> [{any(), boolean(), concurrency_limit()}].
+lock_types() ->
+ gen_server:call(?MODULE, lock_types, infinity).
+
+%% @doc Returns all currently held locks
+-spec all_locks() -> [{any(), pid(), reference(), [{atom(), any()}]}].
+all_locks() ->
+ query_locks([]).
+
+%% @doc Queries the currently held locks returning any locks that match the given criteria.
+%% If no criteria is present then all held locks are returned. The query is a proplist of
+%% 2-tuples. The keys 'pid' and 'type', have special meaning. If they are keys in the
+%% query proplists, only locks matching the corresponding pid or lock type are
+%% returned. All other pairs are compared for equality against the proplist passed as the third
+%% argument to get_lock/3. The returned value is a list of 4-tuples. The first element
+%% is the lock type; the second, the pid holding the lock; the third, the lock refernce,
+%% and the fourth is the metadata passed to get_lock/3.
+-spec query_locks([{atom(), any()}]) -> [{any(), pid(), reference(), [{atom(), any()}]}].
+query_locks(Query) ->
+ gen_server:call(?MODULE, {query_locks, Query}, infinity).
+
+%% @doc Enable handing out of any locks
+-spec enable() -> ok.
+enable() ->
+ gen_server:cast(?MODULE, enable).
+
+%% @doc Disable handing out of any locks
+-spec disable() -> ok.
+disable() ->
+ gen_server:cast(?MODULE, disable).
+
%% @doc Enable handing out of locks of the given type.
-spec enable(any()) -> ok.
enable(Type) ->
- %% TODO: should this be a cast?
- gen_server:call(?MODULE, {enable, Type}, infinity).
+ gen_server:cast(?MODULE, {enable, Type}).
+
-%% @doc Disable handing out of locks of the given type.
+%% @doc same as `disable(Type, false)'
-spec disable(any()) -> ok.
disable(Type) ->
- %% TODO: should this be a cast?
- gen_server:call(?MODULE, {disable, Type}, infinity).
+ disable(Type, false).
+
+%% @doc Disable handing out of locks of the given type. If `Kill' is `true' any processes
+%% holding locks for the given type will be killed with reaseon `max_concurrency'
+-spec disable(any(), boolean()) -> ok.
+disable(Type, Kill) ->
+ gen_server:cast(?MODULE, {disable, Type, Kill}).
%% @doc Get the current maximum concurrency for the given lock type.
--spec concurrency_limit(any()) -> non_neg_integer().
+-spec concurrency_limit(any()) -> concurrency_limit().
concurrency_limit(Type) ->
gen_server:call(?MODULE, {concurrency_limit, Type}, infinity).
-%% @doc Set a new maximum concurrency for the given lock type;
-%% and return the previous maximum or default.
--spec set_concurrency_limit(any(), non_neg_integer()) -> non_neg_integer().
+%% @doc same as `set_concurrency_limit(Type, Limit, false)'
+-spec set_concurrency_limit(any(), concurrency_limit()) -> concurrency_limit().
set_concurrency_limit(Type, Limit) ->
- gen_server:call(?MODULE, {set_concurrency_limit, Type, Limit}, infinity).
-
+ set_concurrency_limit(Type, Limit, false).
+
+%% @doc Set a new maximum concurrency for the given lock type and return
+%% the previous maximum or default. If more locks are held than the new
+%% limit how they are handled depends on the value of `Kill'. If `true',
+%% then the extra locks are released by killing processes with reason `max_concurrency'.
+%% If `false', then the processes holding the extra locks are aloud to do so until they
+%% are released.
+-spec set_concurrency_limit(any(), concurrency_limit(), boolean()) -> concurrency_limit().
+set_concurrency_limit(Type, Limit, Kill) ->
+ gen_server:call(?MODULE, {set_concurrency_limit, Type, Limit, Kill}, infinity).
+
+%% @doc Returns true if the number of held locks is at the limit for the given lock type
+-spec concurrency_limit_reached(any()) -> boolean().
+concurrency_limit_reached(Type) ->
+ gen_server:call(?MODULE, {lock_limit_reached, Type}, infinity).
%%%===================================================================
%%% gen_server callbacks
@@ -122,7 +186,9 @@ set_concurrency_limit(Type, Limit) ->
ignore |
{stop, term()}.
init([]) ->
- {ok, #state{info=orddict:new(), held=orddict:new()}}.
+ {ok, #state{info=orddict:new(),
+ held=orddict:new(),
+ enabled=true}}.
%% @private
%% @doc Handling call messages
@@ -133,8 +199,8 @@ init([]) ->
{noreply, #state{}, non_neg_integer()} |
{stop, term(), term(), #state{}} |
{stop, term(), #state{}}.
-handle_call({get_lock, LockType, Pid, Opts}, _From, State) ->
- {Reply, State2} = try_lock(LockType, Pid, Opts, State),
+handle_call({get_lock, LockType, Pid, Info}, _From, State) ->
+ {Reply, State2} = try_lock(LockType, Pid, Info, State),
{reply, Reply, State2};
handle_call({lock_count, LockType}, _From, State) ->
{reply, held_count(LockType, State), State};
@@ -142,27 +208,44 @@ handle_call(lock_count, _From, State=#state{held=Locks}) ->
Count = orddict:fold(fun(_, Held, Total) -> Total + length(Held) end,
0, Locks),
{reply, Count, State};
-handle_call({enable, LockType}, _From, State) ->
- State2 = enable_lock(LockType, State),
- {reply, ok, State2};
-handle_call({disable, LockType}, _From, State) ->
- State2 = disable_lock(LockType, State),
- {reply, ok, State2};
+handle_call({lock_limit_reached, LockType}, _From, State) ->
+ HeldCount = held_count(LockType, State),
+ Limit = ?limit(lock_info(LockType, State)),
+ {reply, HeldCount >= Limit, State};
+handle_call(lock_types, _From, State=#state{info=Info}) ->
+ Types = [{Type, ?enabled(LI), ?limit(LI)} || {Type, LI} <- orddict:to_list(Info)],
+ {reply, Types, State};
+handle_call({query_locks, Query}, _From, State) ->
+ Results = query_locks(Query, State),
+ {reply, Results, State};
handle_call({concurrency_limit, LockType}, _From, State) ->
Limit = ?limit(lock_info(LockType, State)),
{reply, Limit, State};
-handle_call({set_concurrency_limit, LockType, Limit}, _From, State) ->
+handle_call({set_concurrency_limit, LockType, Limit, Kill}, _From, State) ->
OldLimit = ?limit(lock_info(LockType, State)),
- NewState = update_concurrency_limit(LockType, Limit, State),
- {reply, OldLimit, NewState}.
+ State2 = update_concurrency_limit(LockType, Limit, State),
+ maybe_honor_limit(Kill, LockType, Limit, State2),
+ {reply, OldLimit, State2}.
+
%% @private
%% @doc Handling cast messages
-spec handle_cast(term(), #state{}) -> {noreply, #state{}} |
{noreply, #state{}, non_neg_integer()} |
{stop, term(), #state{}}.
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast({enable, LockType}, State) ->
+ State2 = enable_lock(LockType, State),
+ {noreply, State2};
+handle_cast({disable, LockType, Kill}, State) ->
+ State2 = disable_lock(LockType, State),
+ maybe_honor_limit(Kill, LockType, 0, State),
+ {noreply, State2};
+handle_cast(enable, State) ->
+ State2 = State#state{enabled=true},
+ {noreply, State2};
+handle_cast(disable, State) ->
+ State2 = State#state{enabled=false},
+ {noreply, State2}.
%% @private
%% @doc Handling all non call/cast messages
@@ -194,18 +277,49 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-try_lock(LockType, Pid, Opts, State) ->
+query_locks(FullQuery, State=#state{held=Locks}) ->
+ Base = case proplists:get_value(type, FullQuery) of
+ undefined -> Locks;
+ LockType -> orddict:from_list([{LockType, held_locks(LockType, State)}])
+ end,
+ Query = proplists:delete(type, FullQuery),
+ Matching = orddict:fold(fun(Type, Held, Matching) ->
+ [matching_locks(Type, Held, Query) | Matching]
+ end,
+ [], Base),
+ lists:flatten(Matching).
+
+matching_locks(Type, Held, FullQuery) ->
+ QueryPid = proplists:get_value(pid, FullQuery),
+ Query = proplists:delete(pid, FullQuery),
+ [{Type, Pid, Ref, Info} || {Pid, Ref, Info} <- Held,
+ matches_pid(QueryPid, Pid),
+ matches_query(Info, Query)].
+
+matches_pid(undefined, _) ->
+ true;
+matches_pid(QueryPid, QueryPid) ->
+ true;
+matches_pid(_, _) ->
+ false.
+
+matches_query(Info, Query) ->
+ SortedInfo = lists:ukeysort(1, Info),
+ SortedQuery = lists:ukeysort(1, Query),
+ (SortedQuery -- SortedInfo) =:= [].
+
+try_lock(LockType, Pid, Info, State=#state{enabled=GlobalEnabled}) ->
LockInfo = lock_info(LockType, State),
- Enabled = ?enabled(LockInfo),
+ Enabled = GlobalEnabled andalso ?enabled(LockInfo),
Limit = ?limit(LockInfo),
Held = held_count(LockType, State),
- try_lock(Enabled andalso not (Held >= Limit), LockType, Pid, Opts, State).
+ try_lock(Enabled andalso not (Held >= Limit), LockType, Pid, Info, State).
-try_lock(false, _LockType, _Pid, _Opts, State) ->
+try_lock(false, _LockType, _Pid, _Info, State) ->
{max_concurrency, State};
-try_lock(true, LockType, Pid, _Opts, State=#state{held=Locks}) ->
+try_lock(true, LockType, Pid, Info, State=#state{held=Locks}) ->
Ref = monitor(process, Pid),
- NewLocks = orddict:append(LockType, {Pid, Ref}, Locks),
+ NewLocks = orddict:append(LockType, {Pid, Ref, Info}, Locks),
{ok, State#state{held=NewLocks}}.
release_lock(Ref, State=#state{held=Locks}) ->
@@ -216,12 +330,28 @@ release_lock(Ref, State=#state{held=Locks}) ->
release_lock(Ref, _LockType, Held) ->
lists:keydelete(Ref, 2, Held).
-
-held_count(LockType, #state{held=Locks}) ->
+maybe_honor_limit(true, LockType, Limit, State) ->
+ Held = held_locks(LockType, State),
+ case Limit < length(Held) of
+ true ->
+ {_Keep, Discard} = lists:split(Limit, Held),
+ %% killing of processes will generate down messages and release the locks
+ [erlang:exit(Pid, max_concurrency) || {Pid, _, _} <- Discard],
+ ok;
+ false ->
+ ok
+ end;
+maybe_honor_limit(false, _LockType, _Limit, _State) ->
+ ok.
+
+held_count(LockType, State) ->
+ length(held_locks(LockType, State)).
+
+held_locks(LockType, #state{held=Locks}) ->
case orddict:find(LockType, Locks) of
- error -> 0;
- {ok, Held} -> length(Held)
+ error -> [];
+ {ok, Held} -> Held
end.
enable_lock(LockType, State) ->
Please sign in to comment.
Something went wrong with that request. Please try again.