Skip to content
Browse files

Adjustments from code review and a bug fix.

  • Loading branch information...
1 parent 65001dd commit 15ed44100dc54febc7559f8cd6200f0e7ea7e883 @massung massung committed Dec 21, 2011
Showing with 35 additions and 39 deletions.
  1. +0 −9 include/riak_core_handoff.hrl
  2. +35 −30 src/riak_core_handoff_manager.erl
View
9 include/riak_core_handoff.hrl
@@ -3,12 +3,3 @@
-define(PT_MSG_OLDSYNC, 2).
-define(PT_MSG_SYNC, 3).
-define(PT_MSG_CONFIGURE, 4).
-
-%% external information for handoffs
--record(handoff,
- { module :: atom(),
- index :: integer(),
- node :: atom(),
- type :: primary | fallback
- }).
-
View
65 src/riak_core_handoff_manager.erl
@@ -43,8 +43,12 @@
-include_lib("riak_core/include/riak_core_handoff.hrl").
-include_lib("eunit/include/eunit.hrl").
+-type mod() :: atom().
+-type index() :: integer().
+-type node_() :: atom().
+
-record(handoff_status,
- { handoff :: #handoff{},
+ { handoff :: {mod(),index(),node_()},
direction :: inbound | outbound,
transport_pid :: pid(),
timestamp :: tuple(),
@@ -56,7 +60,8 @@
handoffs :: [#handoff_status{}]
}).
--define(CONCURRENCY_LIMIT,1).
+%% this can be overridden with riak_core handoff_concurrency
+-define(HANDOFF_CONCURRENCY,1).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -76,15 +81,10 @@ handoff_status() ->
gen_server:call(?MODULE,handoff_status).
set_concurrency(Limit) ->
- CurrentLimit=get_concurrency_limit(),
- application:set_env(riak_core,handoff_concurrency,Limit),
- case Limit < CurrentLimit of
- true -> gen_server:call(?MODULE,kill_old_handoffs);
- false -> ok
- end.
+ gen_server:call(?MODULE,{set_concurrency,Limit}).
kill_handoffs() ->
- gen_server:call(?MODULE,kill_handoffs).
+ set_concurrency(0).
add_exclusion(Module, Index) ->
gen_server:cast(?MODULE, {add_exclusion, {Module, Index}}).
@@ -103,32 +103,34 @@ handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
handle_call({add_outbound,Mod,Idx,Node,Pid},_From,State=#state{handoffs=HS}) ->
case send_handoff(Mod,Idx,Node,Pid) of
{ok,Handoff=#handoff_status{transport_pid=Sender}} ->
- {reply,{ok,Sender},State#state{handoffs=[Handoff|HS]}};
+ {reply,{ok,Sender},State#state{handoffs=HS ++ [Handoff]}};
Error ->
{reply,Error,State}
end;
handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) ->
case receive_handoff(SSLOpts) of
{ok,Handoff=#handoff_status{transport_pid=Receiver}} ->
- {reply,{ok,Receiver},State#state{handoffs=[Handoff|HS]}};
+ {reply,{ok,Receiver},State#state{handoffs=HS ++ [Handoff]}};
Error ->
{reply,Error,State}
end;
handle_call(handoff_status,_From,State=#state{handoffs=HS}) ->
Handoffs=[{H,D,active,S} || #handoff_status{ handoff=H,direction=D,status=S } <- HS],
{reply, {ok, Handoffs}, State};
-handle_call(kill_handoffs,_From,State=#state{handoffs=HS}) ->
- [erlang:exit(Pid,kill) || #handoff_status{transport_pid=Pid} <- HS],
- {reply, ok, State};
-handle_call(kill_old_handoffs,_From,State=#state{handoffs=HS}) ->
- Limit=get_concurrency_limit(),
- case length(HS) < Limit of
- true -> {reply, ok, State};
- false ->
- {Keep,Discard}=lists:split(Limit,HS),
+handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
+ application:set_env(riak_core,handoff_concurrency,Limit),
+ case Limit < erlang:length(HS) of
+ true ->
+ %% Note: we don't update the state with the handoffs that we're
+ %% keeping because we'll still get the 'DOWN' messages with
+ %% a reason of 'max_concurrency' and we want to be able to do
+ %% something with that if necessary.
+ {_Keep,Discard}=lists:split(Limit,HS),
[erlang:exit(Pid,max_concurrency) ||
#handoff_status{transport_pid=Pid} <- Discard],
- {reply, ok, State#state{handoffs=Keep}}
+ {reply, ok, State};
+ false ->
+ {reply, ok, State}
end.
@@ -140,9 +142,17 @@ handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.
-handle_info({'DOWN',_Ref,process,Pid,_Reason},State=#state{handoffs=HS}) ->
+handle_info({'DOWN',_Ref,process,Pid,normal},State=#state{handoffs=HS}) ->
NewHS=lists:keydelete(Pid,#handoff_status.transport_pid,HS),
{noreply, State#state{handoffs=NewHS}};
+handle_info({'DOWN',_Ref,process,Pid,Reason},State=#state{handoffs=HS}) ->
+ case lists:keytake(Pid,#handoff_status.transport_pid,HS) of
+ {value,#handoff_status{handoff={Mod,Index,_},direction=Dir},NewHS} ->
+ lager:error("An ~w handoff of partition ~w ~w was terminated for reason: ~w~n", [Dir,Mod,Index,Reason]),
+ {noreply, State#state{handoffs=NewHS}};
+ false ->
+ {noreply, State}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -159,7 +169,7 @@ code_change(_OldVsn, State, _Extra) ->
%%
get_concurrency_limit () ->
- app_helper:get_env(riak_core,handoff_concurrency,?CONCURRENCY_LIMIT).
+ app_helper:get_env(riak_core,handoff_concurrency,?HANDOFF_CONCURRENCY).
%% true if handoff_concurrency (inbound + outbound) hasn't yet been reached
handoff_concurrency_limit_reached () ->
@@ -185,11 +195,7 @@ send_handoff (Module,Index,TargetNode,VnodePid) ->
{ok, #handoff_status{ transport_pid=Pid,
direction=outbound,
timestamp=now(),
- handoff=#handoff{ module=Module,
- index=Index,
- node=TargetNode,
- type=undefined
- }
+ handoff={Module,Index,TargetNode}
}
}
end.
@@ -207,8 +213,7 @@ receive_handoff (SSLOpts) ->
{ok, #handoff_status{ transport_pid=Pid,
direction=inbound,
timestamp=now(),
- handoff=#handoff{ type=undefined
- }
+ handoff={undefined,undefined,undefined}
}
}
end.

0 comments on commit 15ed441

Please sign in to comment.
Something went wrong with that request. Please try again.