Skip to content

Commit

Permalink
Beginning work of supervisor work for handoffs.
Browse files Browse the repository at this point in the history
  • Loading branch information
massung committed Dec 8, 2011
1 parent c330a65 commit 2762f75
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 5 deletions.
6 changes: 5 additions & 1 deletion ebin/riak_core.app
Expand Up @@ -30,9 +30,13 @@
riak_core_eventhandler_sup, riak_core_eventhandler_sup,
riak_core_gossip, riak_core_gossip,
riak_core_handoff_listener, riak_core_handoff_listener,
riak_core_handoff_listener_sup,
riak_core_handoff_manager, riak_core_handoff_manager,
riak_core_handoff_receiver, riak_core_handoff_receiver,
riak_core_handoff_receiver_sup,
riak_core_handoff_sender, riak_core_handoff_sender,
riak_core_handoff_sender_sup,
riak_core_handoff_sup,
riak_core_nodeid, riak_core_nodeid,
riak_core_node_watcher, riak_core_node_watcher,
riak_core_node_watcher_events, riak_core_node_watcher_events,
Expand Down Expand Up @@ -98,7 +102,7 @@
%% Vnode inactivity timeout (how often to check if fallback vnodes %% Vnode inactivity timeout (how often to check if fallback vnodes
%% should return their data) in ms. %% should return their data) in ms.
{vnode_inactivity_timeout, 60000}, {vnode_inactivity_timeout, 60000},

%% Number of VNodes allowed to do handoff concurrently. %% Number of VNodes allowed to do handoff concurrently.
{handoff_concurrency, 4}, {handoff_concurrency, 4},


Expand Down
2 changes: 1 addition & 1 deletion src/riak_core_handoff_listener.erl
Expand Up @@ -67,7 +67,7 @@ terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}.


new_connection(Socket, State = #state{ssl_opts = SslOpts}) -> new_connection(Socket, State = #state{ssl_opts = SslOpts}) ->
{ok, Pid} = riak_core_handoff_receiver:start_link(SslOpts), {ok, Pid} = riak_core_handoff_receiver_sup:start_receiver(SslOpts),
gen_tcp:controlling_process(Socket, Pid), gen_tcp:controlling_process(Socket, Pid),
ok = riak_core_handoff_receiver:set_socket(Pid, Socket), ok = riak_core_handoff_receiver:set_socket(Pid, Socket),
{ok, State}. {ok, State}.
Expand Down
39 changes: 39 additions & 0 deletions src/riak_core_handoff_listener_sup.erl
@@ -0,0 +1,39 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2011 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_core_handoff_listener_sup).
-behaviour(supervisor).

%% beahvior functions
-export([start_link/0,
init/1
]).

-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}).

%% begins the supervisor, init/1 will be called
start_link () ->
supervisor:start_link({local,?MODULE},?MODULE,[]).

%% @private
init ([]) ->
{ok,{{simple_one_for_one,10,10},
[?CHILD(riak_core_handoff_listener,worker)
]}}.
47 changes: 47 additions & 0 deletions src/riak_core_handoff_receiver_sup.erl
@@ -0,0 +1,47 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2011 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_core_handoff_receiver_sup).
-behaviour(supervisor).

%% beahvior functions
-export([start_link/0,
init/1
]).

%% public functions
-export([start_receiver/1
]).

-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}).

%% begins the supervisor, init/1 will be called
start_link () ->
supervisor:start_link({local,?MODULE},?MODULE,[]).

%% @private
init ([]) ->
{ok,{{simple_one_for_one,10,10},
[?CHILD(riak_core_handoff_receiver,worker)
]}}.

%% start a sender process
start_receiver (SSLOpts) ->
supervisor:start_child(?MODULE,[SSLOpts]).
47 changes: 47 additions & 0 deletions src/riak_core_handoff_sender_sup.erl
@@ -0,0 +1,47 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2011 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_core_handoff_sender_sup).
-behaviour(supervisor).

%% beahvior functions
-export([start_link/0,
init/1
]).

%% public functions
-export([start_sender/3
]).

-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}).

%% begins the supervisor, init/1 will be called
start_link () ->
supervisor:start_link({local,?MODULE},?MODULE,[]).

%% @private
init ([]) ->
{ok,{{simple_one_for_one,10,10},
[?CHILD(riak_core_handoff_sender,worker)
]}}.

%% start a sender process
start_sender (TargetNode,Module,Partition) ->
supervisor:start_child(?MODULE,[TargetNode,Module,Partition]).
50 changes: 50 additions & 0 deletions src/riak_core_handoff_sup.erl
@@ -0,0 +1,50 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2011 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_core_handoff_sup).
-behaviour(supervisor).

%% beahvior functions
-export([start_link/0,
init/1
]).

%% public functions
-export([start_sender/0
]).

-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}).

%% begins the supervisor, init/1 will be called
start_link () ->
supervisor:start_link({local,?MODULE},?MODULE,[]).

%% @private
init ([]) ->
{ok,{{one_for_all,10,10},
[?CHILD(riak_core_handoff_receiver_sup,supervisor),
?CHILD(riak_core_handoff_sender_sup,supervisor),
?CHILD(riak_core_handoff_listener_sup,supervisor),
?CHILD(riak_core_handoff_manager,worker)
]}}.

%% start a sender process
start_sender () ->
supervisor:start_child(?MODULE,[]).
3 changes: 1 addition & 2 deletions src/riak_core_sup.erl
Expand Up @@ -61,8 +61,7 @@ init([]) ->
?CHILD(riak_core_stat, worker), ?CHILD(riak_core_stat, worker),
?CHILD(riak_core_vnode_sup, supervisor), ?CHILD(riak_core_vnode_sup, supervisor),
?CHILD(riak_core_eventhandler_sup, supervisor), ?CHILD(riak_core_eventhandler_sup, supervisor),
?CHILD(riak_core_handoff_manager, worker), ?CHILD(riak_core_handoff_sup, supervisor),
?CHILD(riak_core_handoff_listener, worker),
?CHILD(riak_core_ring_events, worker), ?CHILD(riak_core_ring_events, worker),
?CHILD(riak_core_ring_manager, worker), ?CHILD(riak_core_ring_manager, worker),
?CHILD(riak_core_node_watcher_events, worker), ?CHILD(riak_core_node_watcher_events, worker),
Expand Down
4 changes: 3 additions & 1 deletion src/riak_core_vnode.erl
Expand Up @@ -528,7 +528,9 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) -
NewState = State#state{modstate=NewModState, NewState = State#state{modstate=NewModState,
handoff_token=HandoffToken, handoff_token=HandoffToken,
handoff_node=TargetNode}, handoff_node=TargetNode},
{ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx), {ok, HandoffPid} = riak_core_handoff_sender_sup:start_sender(TargetNode, Mod, Idx),
io:format(">>>>> ~w~n", [HandoffPid]),
%{ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
riak_core_handoff_manager:add_handoff(Mod, Idx, TargetNode), riak_core_handoff_manager:add_handoff(Mod, Idx, TargetNode),
continue(NewState#state{handoff_pid=HandoffPid}) continue(NewState#state{handoff_pid=HandoffPid})
end end
Expand Down

0 comments on commit 2762f75

Please sign in to comment.