Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

* added enable/disable

* added (set_)concurrency_limit functions
* renamed and reorganized some things
  • Loading branch information...
commit b21903f95d2c788c2c92bcdfeb6a268710e5bf90 1 parent dc3c631
@jrwest jrwest authored
Showing with 75 additions and 28 deletions.
  1. +75 −28 src/riak_core_bg_manager.erl
View
103 src/riak_core_bg_manager.erl
@@ -30,20 +30,24 @@
lock_count/1,
enable/1,
disable/1,
- concurrency/1,
- set_concurrency/2
- ]).
+ concurrency_limit/1,
+ set_concurrency_limit/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--define(DEFAULT_CONCURRENCY, 2).
+-record(state, {held :: ordict:orddict(),
+ info :: orddict:orddict()}).
--define(SERVER, ?MODULE).
+-record(lock_info, {concurrency_limit :: non_neg_integer(),
+ enabled :: boolean()}).
--record(state, { locks :: ordict:orddict(),
- limits :: orddict:orddict() }).
+-define(SERVER, ?MODULE).
+-define(DEFAULT_CONCURRENCY, 2).
+-define(limit(X), (X)#lock_info.concurrency_limit).
+-define(enabled(X), (X)#lock_info.enabled).
+-define(DEFAULT_LOCK_INFO, #lock_info{enabled=true, concurrency_limit=?DEFAULT_CONCURRENCY}).
%%%===================================================================
%%% API
@@ -86,23 +90,25 @@ lock_count(Type) ->
%% @doc Enable handing out of locks of the given type.
-spec enable(any()) -> ok.
enable(Type) ->
+ %% TODO: should this be a cast?

Agreed. This should be a cast.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
gen_server:call(?MODULE, {enable, Type}, infinity).
%% @doc Disable handing out of locks of the given type.
-spec disable(any()) -> ok.
disable(Type) ->
+ %% TODO: should this be a cast?

Yes. A cast.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
gen_server:call(?MODULE, {disable, Type}, infinity).
%% @doc Get the current maximum concurrency for the given lock type.
--spec concurrency(any()) -> integer().
-concurrency(Type) ->
- gen_server:call(?MODULE, {get_max_concurrency, Type}, infinity).
+-spec concurrency_limit(any()) -> non_neg_integer().
+concurrency_limit(Type) ->
+ gen_server:call(?MODULE, {concurrency_limit, Type}, infinity).
%% @doc Set a new maximum concurrency for the given lock type;
%% and return the previous maximum or default.
--spec set_concurrency(any(), integer()) -> integer().
-set_concurrency(Type, Max) ->
- gen_server:call(?MODULE, {set_max_concurrency, Type, Max}, infinity).
+-spec set_concurrency_limit(any(), non_neg_integer()) -> non_neg_integer().
+set_concurrency_limit(Type, Limit) ->
+ gen_server:call(?MODULE, {set_concurrency_limit, Type, Limit}, infinity).
%%%===================================================================
@@ -116,7 +122,7 @@ set_concurrency(Type, Max) ->
ignore |
{stop, term()}.
init([]) ->
- {ok, #state{limits=orddict:new(), locks=orddict:new()}}.
+ {ok, #state{info=orddict:new(), held=orddict:new()}}.
%% @private
%% @doc Handling call messages
@@ -132,10 +138,23 @@ handle_call({get_lock, LockType, Pid, Opts}, _From, State) ->
{reply, Reply, State2};
handle_call({lock_count, LockType}, _From, State) ->
{reply, held_count(LockType, State), State};
-handle_call(lock_count, _From, State=#state{locks=Locks}) ->
+handle_call(lock_count, _From, State=#state{held=Locks}) ->
Count = orddict:fold(fun(_, Held, Total) -> Total + length(Held) end,
0, Locks),
- {reply, Count, State}.
+ {reply, Count, State};
+handle_call({enable, LockType}, _From, State) ->
+ State2 = enable_lock(LockType, State),
+ {reply, ok, State2};
+handle_call({disable, LockType}, _From, State) ->
+ State2 = disable_lock(LockType, State),
+ {reply, ok, State2};
+handle_call({concurrency_limit, LockType}, _From, State) ->
+ Limit = ?limit(lock_info(LockType, State)),
+ {reply, Limit, State};
+handle_call({set_concurrency_limit, LockType, Limit}, _From, State) ->
+ OldLimit = ?limit(lock_info(LockType, State)),
+ NewState = update_concurrency_limit(LockType, Limit, State),
+ {reply, OldLimit, NewState}.
%% @private
%% @doc Handling cast messages
@@ -176,35 +195,63 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
try_lock(LockType, Pid, Opts, State) ->
- Limit = lock_limit(LockType, State),
+ LockInfo = lock_info(LockType, State),
+ Enabled = ?enabled(LockInfo),
+ Limit = ?limit(LockInfo),
Held = held_count(LockType, State),
- try_lock(Held >= Limit, LockType, Pid, Opts, State).
+ try_lock(Enabled andalso not (Held >= Limit), LockType, Pid, Opts, State).
-try_lock(true, _LockType, _Pid, _Opts, State) ->
+try_lock(false, _LockType, _Pid, _Opts, State) ->
{max_concurrency, State};
-try_lock(false, LockType, Pid, _Opts, State=#state{locks=Locks}) ->
+try_lock(true, LockType, Pid, _Opts, State=#state{held=Locks}) ->
Ref = monitor(process, Pid),
NewLocks = orddict:append(LockType, {Pid, Ref}, Locks),
- {ok, State#state{locks=NewLocks}}.
+ {ok, State#state{held=NewLocks}}.
-release_lock(Ref, State=#state{locks=Locks}) ->
+release_lock(Ref, State=#state{held=Locks}) ->
%% TODO: this makes me (jordan) :(
Released = orddict:map(fun(Type, Held) -> release_lock(Ref, Type, Held) end,
Locks),
- State#state{locks=Released}.
+ State#state{held=Released}.
release_lock(Ref, _LockType, Held) ->
lists:keydelete(Ref, 2, Held).
-held_count(LockType, #state{locks=Locks}) ->
+held_count(LockType, #state{held=Locks}) ->
case orddict:find(LockType, Locks) of
error -> 0;
{ok, Held} -> length(Held)
end.
-lock_limit(LockType, #state{limits=Limits}) ->
- case orddict:find(LockType, Limits) of
- error -> ?DEFAULT_CONCURRENCY;
- {ok, Limit} -> Limit
+enable_lock(LockType, State) ->
+ update_lock_enabled(LockType, true, State).
+
+disable_lock(LockType, State) ->
+ %% TODO: should we also kill all processes that hold the lock/release all locks?

I think not. Disable should allow existing things to complete.
Maybe another API that would terminate processes is needed, but seems harsh.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ update_lock_enabled(LockType, false, State).
+
+update_lock_enabled(LockType, Value, State) ->
+ update_lock_info(LockType,
+ fun(LockInfo) -> LockInfo#lock_info{enabled=Value} end,
+ ?DEFAULT_LOCK_INFO#lock_info{enabled=Value},
+ State).
+
+update_concurrency_limit(LockType, Limit, State) ->
+ %% TODO: if Limit < Number of Currently held locks, should we kill # Held - Limit

Interesting. But again, it seems harsh to be killing processes. It would be hard to guess which ones to kill.
Probably would want a different API to force the limit to be honored, which could kill processes. Probably more useful once we have some idea of "progress" and "how long until it's done?" so that we don't kill useful things that are nearing completion. It would be easy to thrash.

@jrwest
jrwest added a note

hmm, so this is how it works for handoff manager. perhaps an lock option passed to disable? https://github.com/basho/riak_core/blob/master/src/riak_core_handoff_manager.erl#L206-L220

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ %% processes and release their locks
+ update_lock_info(LockType,
+ fun(LockInfo) -> LockInfo#lock_info{concurrency_limit=Limit} end,
+ ?DEFAULT_LOCK_INFO#lock_info{concurrency_limit=Limit},
+ State).
+
+update_lock_info(LockType, Fun, Default, State=#state{info=Info}) ->
+ NewInfo = orddict:update(LockType, Fun, Default, Info),
+ State#state{info=NewInfo}.
+
+
+lock_info(LockType, #state{info=Info}) ->
+ case orddict:find(LockType, Info) of
+ error -> ?DEFAULT_LOCK_INFO;
+ {ok, LockInfo} -> LockInfo
end.
Please sign in to comment.
Something went wrong with that request. Please try again.