Skip to content

Commit

Permalink
migrate handoff manager to bg manager
Browse files Browse the repository at this point in the history
  • Loading branch information
jrwest committed Jul 17, 2013
1 parent 230c914 commit 6368585
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 42 deletions.
46 changes: 9 additions & 37 deletions src/riak_core_handoff_manager.erl
Expand Up @@ -72,6 +72,7 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
riak_core_bg_manager:set_concurrency_limit(handoff, ?HANDOFF_CONCURRENCY),
{ok, #state{excl=sets:new(), handoffs=[]}}.

-spec add_outbound(ho_type(),atom(),integer(),term(),pid(),[{atom(),term()}]) ->
Expand Down Expand Up @@ -128,10 +129,11 @@ status_update(ModSrcTgt, Stats) ->
gen_server:cast(?MODULE, {status_update, ModSrcTgt, Stats}).

set_concurrency(Limit) ->
gen_server:call(?MODULE,{set_concurrency,Limit}, infinity).
riak_core_bg_manager:set_concurrency_limit(handoff, Limit, true),
ok.

get_concurrency() ->
gen_server:call(?MODULE, get_concurrency, infinity).
riak_core_bg_manager:concurrency_limit(handoff).

%% @doc Kill the transfer of `ModSrcTarget' with `Reason'.
-spec kill_xfer(node(), tuple(), any()) -> ok.
Expand Down Expand Up @@ -201,27 +203,7 @@ handle_call({xfer_status, Xfer}, _From, State=#state{handoffs=HS}) ->

handle_call({status, Filter}, _From, State=#state{handoffs=HS}) ->
Status = lists:filter(filter(Filter), [build_status(HO) || HO <- HS]),
{reply, Status, State};

handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
application:set_env(riak_core,handoff_concurrency,Limit),
case Limit < erlang:length(HS) of
true ->
%% Note: we don't update the state with the handoffs that we're
%% keeping because we'll still get the 'DOWN' messages with
%% a reason of 'max_concurrency' and we want to be able to do
%% something with that if necessary.
{_Keep,Discard}=lists:split(Limit,HS),
[erlang:exit(Pid,max_concurrency) ||
#handoff_status{transport_pid=Pid} <- Discard],
{reply, ok, State};
false ->
{reply, ok, State}
end;

handle_call(get_concurrency, _From, State) ->
Concurrency = get_concurrency_limit(),
{reply, Concurrency, State}.
{reply, Status, State}.

handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
Excl2 = sets:del_element({Mod, Idx}, Excl),
Expand Down Expand Up @@ -440,16 +422,8 @@ record_seen_index(Ring, Shrinking, NValMap, DefaultN, Mod, Src, Key, Seen) ->
FutureIndex -> ordsets:add_element(FutureIndex, Seen)
end.

get_concurrency_limit () ->
app_helper:get_env(riak_core,handoff_concurrency,?HANDOFF_CONCURRENCY).

%% true if handoff_concurrency (inbound + outbound) hasn't yet been reached
handoff_concurrency_limit_reached () ->
Receivers=supervisor:count_children(riak_core_handoff_receiver_sup),
Senders=supervisor:count_children(riak_core_handoff_sender_sup),
ActiveReceivers=proplists:get_value(active,Receivers),
ActiveSenders=proplists:get_value(active,Senders),
get_concurrency_limit() =< (ActiveReceivers + ActiveSenders).
riak_core_bg_manager:concurrency_limit_reached(handoff).

send_handoff(HOType, ModSrcTarget, Node, Pid, HS,Opts) ->
send_handoff(HOType, ModSrcTarget, Node, Pid, HS, {none, none}, none, Opts).
Expand Down Expand Up @@ -544,11 +518,9 @@ send_handoff(HOType, {Mod, Src, Target}, Node, Vnode, HS, {Filter, FilterModFun}

%% spawn a receiver process
receive_handoff (SSLOpts) ->
case handoff_concurrency_limit_reached() of
true ->
{error, max_concurrency};
false ->
{ok,Pid}=riak_core_handoff_receiver_sup:start_receiver(SSLOpts),
case riak_core_handoff_receiver_sup:start_receiver(SSLOpts) of
{error, max_concurrency} -> {error, max_concurrency};
{ok, Pid} ->
PidM = monitor(process, Pid),

%% successfully started up a new receiver
Expand Down
16 changes: 11 additions & 5 deletions src/riak_core_handoff_receiver.erl
Expand Up @@ -51,11 +51,17 @@ set_socket(Pid, Socket) ->
riak_core_gen_server:call(Pid, {set_socket, Socket}).

init([SslOpts]) ->
{ok, #state{ssl_opts = SslOpts,
tcp_mod = if SslOpts /= [] -> ssl;
true -> gen_tcp
end,
timeout_len = app_helper:get_env(riak_core, handoff_receive_timeout, ?RECV_TIMEOUT)}}.
LockMeta = [{direction, inbound}],
case riak_core_bg_manager:get_lock(handoff, LockMeta) of
max_concurrency -> {stop, max_concurrency};
ok ->
TimeoutLen = app_helper:get_env(riak_core, handoff_receive_timeout, ?RECV_TIMEOUT),
{ok, #state{ssl_opts = SslOpts,
tcp_mod = if SslOpts /= [] -> ssl;
true -> gen_tcp
end,
timeout_len = TimeoutLen}}
end.

handle_call({set_socket, Socket0}, _From, State = #state{ssl_opts = SslOpts}) ->
SockOpts = [{active, once}, {packet, 4}, {header, 1}],
Expand Down
11 changes: 11 additions & 0 deletions src/riak_core_handoff_sender.erl
Expand Up @@ -90,6 +90,17 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
SrcNode = node(),
SrcPartition = get_src_partition(Opts),
TargetPartition = get_target_partition(Opts),
LockMeta = [{direction, outbound},
{handoff_type, Type},
{target_node, TargetNode},
{source_node, SrcNode},
{target_partition, TargetPartition},
{source_partition, SrcPartition}],
case riak_core_bg_manager:get_lock(handoff, LockMeta) of
max_concurrency -> exit(max_concurrency);
ok -> ok
end,

try
Filter = get_filter(Opts),
[_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
Expand Down

0 comments on commit 6368585

Please sign in to comment.