Skip to content
This repository
Browse code

Track handing off nodes and active nodes

  • Loading branch information...
commit 5d13b5ca2db28bbc2f010971155924ec4fd8b71b 1 parent 112c62e
Russell Brown authored August 22, 2011
32  src/riak_core_handoff_manager.erl
@@ -21,13 +21,17 @@
21 21
 -export([add_exclusion/2, get_handoff_lock/1, get_exclusions/1]).
22 22
 -export([remove_exclusion/2]).
23 23
 -export([release_handoff_lock/2]).
24  
--record(state, {excl}).
  24
+-export([add_handoff/3]).
  25
+-export([remove_handoff/2]).
  26
+-export([get_handoff/2]).
  27
+-export([all_handoffs/0]).
  28
+-record(state, {excl, handoffs}).
25 29
 
26 30
 start_link() ->
27 31
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
28 32
 
29 33
 init([]) ->
30  
-    {ok, #state{excl=ordsets:new()}}.
  34
+    {ok, #state{excl=ordsets:new(), handoffs=dict:new()}}.
31 35
 
32 36
 add_exclusion(Module, Index) ->
33 37
     gen_server:cast(?MODULE, {add_exclusion, {Module, Index}}).
@@ -38,6 +42,18 @@ remove_exclusion(Module, Index) ->
38 42
 get_exclusions(Module) ->
39 43
     gen_server:call(?MODULE, {get_exclusions, Module}, infinity).
40 44
 
  45
+add_handoff(Module, Index, TargetHost) ->
  46
+    gen_server:cast(?MODULE, {add_handoff, {Module, Index, TargetHost}}).
  47
+
  48
+remove_handoff(Module, Index) ->
  49
+    gen_server:cast(?MODULE, {del_handoff, {Module, Index}}).
  50
+
  51
+get_handoff(Module, Index) ->
  52
+    gen_server:call(?MODULE, {get_handoff, {Module, Index}}).
  53
+
  54
+all_handoffs() ->
  55
+    gen_server:call(?MODULE, all_handoffs).
  56
+
41 57
 get_handoff_lock(LockId) ->
42 58
     TokenCount = app_helper:get_env(riak_core, handoff_concurrency, 4),
43 59
     get_handoff_lock(LockId, TokenCount).
@@ -57,14 +73,22 @@ release_handoff_lock(LockId, Token) ->
57 73
     
58 74
 handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
59 75
     Reply =  [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
60  
-    {reply, {ok, Reply}, State}.
  76
+    {reply, {ok, Reply}, State};
  77
+handle_call({get_handoff, HandOff}, _From, State=#state{handoffs=HandOffs}) ->
  78
+    {reply, {ok, dict:find(HandOff, HandOffs)}, State};
  79
+handle_call(all_handoffs, _From, State=#state{handoffs=HandOffs}) ->
  80
+    {reply, dict:to_list(HandOffs), State}.
61 81
 
62 82
 handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
63 83
     {noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
64 84
 handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
65 85
     {ok, Ring} = riak_core_ring_manager:get_my_ring(),
66 86
     riak_core_ring_events:ring_update(Ring),
67  
-    {noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.    
  87
+    {noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}};
  88
+handle_cast({add_handoff, {Mod, Idx, TargetNode}}, State=#state{handoffs=HandOffs}) ->
  89
+    {noreply, State#state{handoffs=dict:append({Mod, Idx}, TargetNode, HandOffs)}};
  90
+handle_cast({del_handoff, HandOff}, State=#state{handoffs=HandOffs}) ->
  91
+    {noreply, State#state{handoffs=dict:erase(HandOff, HandOffs)}}.
68 92
 
69 93
 handle_info(_Info, State) ->
70 94
     {noreply, State}.
3  src/riak_core_vnode.erl
@@ -214,6 +214,7 @@ active(handoff_complete, State=#state{mod=Mod,
214 214
                                       handoff_token=HT}) ->
215 215
     %% ?debugFmt("Finished HO: ~p :: ~p -> ~p~n", [Idx, Prev, New]),
216 216
     riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
  217
+    riak_core_handoff_manager:remove_handoff(Mod, Idx),
217 218
     Mod:handoff_finished(HN, ModState),
218 219
     finish_handoff(State);
219 220
 active({handoff_error, _Err, _Reason}, State=#state{mod=Mod, 
@@ -221,6 +222,7 @@ active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
221 222
                                                     index=Idx, 
222 223
                                                     handoff_token=HT}) ->
223 224
     riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
  225
+    riak_core_handoff_manager:remove_handoff(Mod, Idx),
224 226
     %% it would be nice to pass {Err, Reason} to the vnode but the 
225 227
     %% API doesn't currently allow for that.
226 228
     Mod:handoff_cancelled(ModState),
@@ -361,6 +363,7 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) -
361 363
                                            handoff_token=HandoffToken,
362 364
                                            handoff_node=TargetNode},
363 365
                     {ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
  366
+                    riak_core_handoff_manager:add_handoff(Mod, Idx, TargetNode),
364 367
                     continue(NewState#state{handoff_pid=HandoffPid})
365 368
             end
366 369
     end.
13  src/riak_core_vnode_master.erl
@@ -25,7 +25,7 @@
25 25
 -module(riak_core_vnode_master).
26 26
 -include_lib("riak_core_vnode.hrl").
27 27
 -behaviour(gen_server).
28  
--export([start_link/1, start_link/2, get_vnode_pid/2,
  28
+-export([start_link/1, start_link/2, get_vnode_pid/2, is_vnode_pid/2,
29 29
          start_vnode/2, command/3, command/4, sync_command/3,
30 30
          coverage/5,
31 31
          command_return_vnode/4,
@@ -59,6 +59,10 @@ get_vnode_pid(Index, VNodeMod) ->
59 59
     RegName = reg_name(VNodeMod),
60 60
     gen_server:call(RegName, {Index, get_vnode}, infinity).
61 61
 
  62
+is_vnode_pid(Index, VNodeMod) ->
  63
+    RegName = reg_name(VNodeMod),
  64
+    gen_server:call(RegName, {Index, is_vnode}, infinity).
  65
+
62 66
 command(Preflist, Msg, VMaster) ->
63 67
     command(Preflist, Msg, ignore, VMaster).
64 68
 
@@ -206,6 +210,13 @@ handle_call(all_nodes, _From, State) ->
206 210
 handle_call({Partition, get_vnode}, _From, State) ->
207 211
     Pid = get_vnode(Partition, State),
208 212
     {reply, {ok, Pid}, State};
  213
+handle_call({Partition, is_vnode}, _From, State) ->
  214
+    Reply = case idx2vnode(Partition, State) of
  215
+                no_match ->
  216
+                    false;
  217
+                Pid -> {true, Pid}
  218
+            end,
  219
+    {reply, Reply, State};
209 220
 handle_call(Other, From, State=#state{legacy=Legacy}) when Legacy =/= undefined ->
210 221
     case catch Legacy:rewrite_call(Other, From) of
211 222
         {ok, ?VNODE_REQ{}=Req} ->

0 notes on commit 5d13b5c

Please sign in to comment.
Something went wrong with that request. Please try again.