diff --git a/ebin/riak_core.app b/ebin/riak_core.app index 505f1fa78..b1f2da40b 100644 --- a/ebin/riak_core.app +++ b/ebin/riak_core.app @@ -30,9 +30,13 @@ riak_core_eventhandler_sup, riak_core_gossip, riak_core_handoff_listener, + riak_core_handoff_listener_sup, riak_core_handoff_manager, riak_core_handoff_receiver, + riak_core_handoff_receiver_sup, riak_core_handoff_sender, + riak_core_handoff_sender_sup, + riak_core_handoff_sup, riak_core_nodeid, riak_core_node_watcher, riak_core_node_watcher_events, @@ -98,7 +102,7 @@ %% Vnode inactivity timeout (how often to check if fallback vnodes %% should return their data) in ms. {vnode_inactivity_timeout, 60000}, - + %% Number of VNodes allowed to do handoff concurrently. {handoff_concurrency, 4}, diff --git a/src/riak_core_handoff_listener.erl b/src/riak_core_handoff_listener.erl index ed5aef015..9d282167a 100644 --- a/src/riak_core_handoff_listener.erl +++ b/src/riak_core_handoff_listener.erl @@ -67,7 +67,7 @@ terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. new_connection(Socket, State = #state{ssl_opts = SslOpts}) -> - {ok, Pid} = riak_core_handoff_receiver:start_link(SslOpts), + {ok, Pid} = riak_core_handoff_receiver_sup:start_receiver(SslOpts), gen_tcp:controlling_process(Socket, Pid), ok = riak_core_handoff_receiver:set_socket(Pid, Socket), {ok, State}. diff --git a/src/riak_core_handoff_listener_sup.erl b/src/riak_core_handoff_listener_sup.erl new file mode 100644 index 000000000..bcdb935c8 --- /dev/null +++ b/src/riak_core_handoff_listener_sup.erl @@ -0,0 +1,39 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2011 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_handoff_listener_sup). +-behaviour(supervisor). + +%% beahvior functions +-export([start_link/0, + init/1 + ]). + +-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}). + +%% begins the supervisor, init/1 will be called +start_link () -> + supervisor:start_link({local,?MODULE},?MODULE,[]). + +%% @private +init ([]) -> + {ok,{{simple_one_for_one,10,10}, + [?CHILD(riak_core_handoff_listener,worker) + ]}}. diff --git a/src/riak_core_handoff_receiver_sup.erl b/src/riak_core_handoff_receiver_sup.erl new file mode 100644 index 000000000..014a1a609 --- /dev/null +++ b/src/riak_core_handoff_receiver_sup.erl @@ -0,0 +1,47 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2011 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_handoff_receiver_sup). +-behaviour(supervisor). + +%% beahvior functions +-export([start_link/0, + init/1 + ]). + +%% public functions +-export([start_receiver/1 + ]). + +-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}). + +%% begins the supervisor, init/1 will be called +start_link () -> + supervisor:start_link({local,?MODULE},?MODULE,[]). + +%% @private +init ([]) -> + {ok,{{simple_one_for_one,10,10}, + [?CHILD(riak_core_handoff_receiver,worker) + ]}}. + +%% start a sender process +start_receiver (SSLOpts) -> + supervisor:start_child(?MODULE,[SSLOpts]). diff --git a/src/riak_core_handoff_sender_sup.erl b/src/riak_core_handoff_sender_sup.erl new file mode 100644 index 000000000..90d6038fb --- /dev/null +++ b/src/riak_core_handoff_sender_sup.erl @@ -0,0 +1,47 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2011 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_handoff_sender_sup). +-behaviour(supervisor). + +%% beahvior functions +-export([start_link/0, + init/1 + ]). + +%% public functions +-export([start_sender/3 + ]). + +-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}). + +%% begins the supervisor, init/1 will be called +start_link () -> + supervisor:start_link({local,?MODULE},?MODULE,[]). + +%% @private +init ([]) -> + {ok,{{simple_one_for_one,10,10}, + [?CHILD(riak_core_handoff_sender,worker) + ]}}. + +%% start a sender process +start_sender (TargetNode,Module,Partition) -> + supervisor:start_child(?MODULE,[TargetNode,Module,Partition]). diff --git a/src/riak_core_handoff_sup.erl b/src/riak_core_handoff_sup.erl new file mode 100644 index 000000000..56f5e914e --- /dev/null +++ b/src/riak_core_handoff_sup.erl @@ -0,0 +1,50 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2011 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_handoff_sup). +-behaviour(supervisor). + +%% beahvior functions +-export([start_link/0, + init/1 + ]). + +%% public functions +-export([start_sender/0 + ]). + +-define(CHILD(I,Type), {I,{I,start_link,[]},permanent,brutal_kill,Type,[I]}). + +%% begins the supervisor, init/1 will be called +start_link () -> + supervisor:start_link({local,?MODULE},?MODULE,[]). + +%% @private +init ([]) -> + {ok,{{one_for_all,10,10}, + [?CHILD(riak_core_handoff_receiver_sup,supervisor), + ?CHILD(riak_core_handoff_sender_sup,supervisor), + ?CHILD(riak_core_handoff_listener_sup,supervisor), + ?CHILD(riak_core_handoff_manager,worker) + ]}}. + +%% start a sender process +start_sender () -> + supervisor:start_child(?MODULE,[]). diff --git a/src/riak_core_sup.erl b/src/riak_core_sup.erl index afc81b873..51db5380e 100644 --- a/src/riak_core_sup.erl +++ b/src/riak_core_sup.erl @@ -61,8 +61,7 @@ init([]) -> ?CHILD(riak_core_stat, worker), ?CHILD(riak_core_vnode_sup, supervisor), ?CHILD(riak_core_eventhandler_sup, supervisor), - ?CHILD(riak_core_handoff_manager, worker), - ?CHILD(riak_core_handoff_listener, worker), + ?CHILD(riak_core_handoff_sup, supervisor), ?CHILD(riak_core_ring_events, worker), ?CHILD(riak_core_ring_manager, worker), ?CHILD(riak_core_node_watcher_events, worker), diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index 3e2d08dfb..2d47e6c4b 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -528,7 +528,9 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) - NewState = State#state{modstate=NewModState, handoff_token=HandoffToken, handoff_node=TargetNode}, - {ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx), + {ok, HandoffPid} = riak_core_handoff_sender_sup:start_sender(TargetNode, Mod, Idx), + io:format(">>>>> ~w~n", [HandoffPid]), + %{ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx), riak_core_handoff_manager:add_handoff(Mod, Idx, TargetNode), continue(NewState#state{handoff_pid=HandoffPid}) end