diff --git a/ebin/riak_core.app b/ebin/riak_core.app index 01cbb91c5..624a5ca1a 100644 --- a/ebin/riak_core.app +++ b/ebin/riak_core.app @@ -28,6 +28,7 @@ riak_core_coverage_plan, riak_core_eventhandler_guard, riak_core_eventhandler_sup, + riak_core_format, riak_core_gossip, riak_core_handoff_listener, riak_core_handoff_listener_sup, diff --git a/include/riak_core_handoff.hrl b/include/riak_core_handoff.hrl index ad27f3a7c..00dacb567 100644 --- a/include/riak_core_handoff.hrl +++ b/include/riak_core_handoff.hrl @@ -3,3 +3,13 @@ -define(PT_MSG_OLDSYNC, 2). -define(PT_MSG_SYNC, 3). -define(PT_MSG_CONFIGURE, 4). + +-record(ho_stats, + { + interval_end :: erlang:timestamp(), + last_update :: erlang:timestamp(), + objs=0 :: non_neg_integer(), + bytes=0 :: non_neg_integer() + }). + +-type ho_stats() :: #ho_stats{}. diff --git a/src/riak_core_format.erl b/src/riak_core_format.erl new file mode 100644 index 000000000..0556dbae6 --- /dev/null +++ b/src/riak_core_format.erl @@ -0,0 +1,79 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Functions for formatting data. + +-module(riak_core_format). +-export([fmt/2, + human_size_fmt/2, + human_time_fmt/2]). 
+
+%% @doc Create a string `Str' based on the format string `FmtStr' and
+%%      list of args `Args'.
+-spec fmt(string(), list()) -> Str::string().
+fmt(FmtStr, Args) ->
+    lists:flatten(io_lib:format(FmtStr, Args)).
+
+%% @doc Create a human friendly string `Str' for number of bytes
+%%      `Bytes' and format based on format string `Fmt'.
+-spec human_size_fmt(string(), non_neg_integer()) -> Str::string().
+human_size_fmt(Fmt, Bytes) ->
+    Fmt2 = Fmt ++ " ~s",
+    {Value, Units} = human_size(Bytes, ["B","KB","MB","GB","TB","PB"]),
+    fmt(Fmt2, [Value, Units]).
+
+%% @doc Create a human friendly string `Str' for the given time in
+%%      microseconds `Micros'.  Format according to format string
+%%      `Fmt'.
+-spec human_time_fmt(string(), non_neg_integer()) -> Str::string().
+human_time_fmt(Fmt, Micros) ->
+    Fmt2 = Fmt ++ " ~s",
+    {Value, Units} = human_time(Micros),
+    fmt(Fmt2, [Value, Units]).
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+%% @private
+%%
+%% @doc Scales a byte size down by 1024 per unit; returns `{Value, Units}'.
+%%      Thanks StackOverflow:
+%%      http://stackoverflow.com/questions/2163691/simpler-way-to-format-bytesize-in-a-human-readable-way
+-spec human_size(number(), [string(),...]) -> {Value::float(), Units::string()}.
+human_size(S, [_|[_|_] = L]) when S >= 1024 -> human_size(S/1024, L);
+human_size(S, [M|_]) ->
+    {float(S), M}.
+
+%% @private
+%%
+%% @doc Convert `Micros' to a human friendly `{Value, Units}' duration.
+%%      Pairs are `{UnitsPerNextLargerUnit, Name}'; the last divisor is unused.
+-spec human_time(non_neg_integer()) -> {Value::number(), Units::string()}.
+human_time(Micros) ->
+    human_time(Micros, {1000, "us"}, [{1000, "ms"}, {60, "s"}, {60, "min"}, {24, "hr"}, {7, "d"}]).
+
+-spec human_time(number(), {pos_integer(), string()},
+                 [{pos_integer(), string()}]) ->
+                        {number(), string()}.
+human_time(T, {Divisor, Unit}, Units) when T < Divisor orelse Units == [] ->
+    {float(T), Unit};
+human_time(T, {Divisor, _}, [Next|Units]) ->
+    human_time(T / Divisor, Next, Units).
diff --git a/src/riak_core_handoff_manager.erl b/src/riak_core_handoff_manager.erl
index b6a73664c..821dd488a 100644
--- a/src/riak_core_handoff_manager.erl
+++ b/src/riak_core_handoff_manager.erl
@@ -36,6 +36,8 @@
 -export([add_outbound/4,
          add_inbound/1,
          status/0,
+         status/1,
+         status_update/2,
          set_concurrency/1,
          kill_handoffs/0
         ]).
@@ -46,17 +48,21 @@
 -include_lib("eunit/include/eunit.hrl").
 -endif.
 
--type mod()   :: atom().
 -type index() :: integer().
+-type modindex() :: {module(), index()}.
 
 -record(handoff_status,
-        { modindex      :: {mod(),index()},
-          node          :: atom(),
+        { modindex      :: modindex(),
+          src_node      :: node(),
+          target_node   :: node(),
           direction     :: inbound | outbound,
           transport_pid :: pid(),
           timestamp     :: tuple(),
           status        :: any(),
-          vnode_pid     :: pid() | undefined
+          stats         :: dict(),
+          vnode_pid     :: pid() | undefined,
+          %% atoms must match what send_handoff/5 stores in this field
+          type          :: ownership_handoff | hinted_handoff
         }).
 
 -record(state,
@@ -67,14 +72,17 @@
 %% this can be overridden with riak_core handoff_concurrency
 -define(HANDOFF_CONCURRENCY,2).
 
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 init([]) ->
     {ok, #state{excl=ordsets:new(), handoffs=[]}}.
 
-%% handoff_manager API
-
 add_outbound(Module,Idx,Node,VnodePid) ->
     gen_server:call(?MODULE,{add_outbound,Module,Idx,Node,VnodePid}).
 
@@ -82,7 +90,16 @@ add_inbound(SSLOpts) ->
     gen_server:call(?MODULE,{add_inbound,SSLOpts}).
 
 status() ->
-    gen_server:call(?MODULE,status).
+    status(none).
+
+status(Filter) ->
+    gen_server:call(?MODULE, {status, Filter}).
+
+%% @doc Send status updates `Stats' to the handoff manager for a
+%%      particular handoff identified by `ModIdx'.
+-spec status_update(modindex(), ho_stats()) -> ok.
+status_update(ModIdx, Stats) ->
+    gen_server:cast(?MODULE, {status_update, ModIdx, Stats}).
 
 set_concurrency(Limit) ->
     gen_server:call(?MODULE,{set_concurrency,Limit}).
@@ -99,7 +116,10 @@ remove_exclusion(Module, Index) ->
 get_exclusions(Module) ->
     gen_server:call(?MODULE, {get_exclusions, Module}, infinity).
 
-%% gen_server API
+
+%%%===================================================================
+%%% Callbacks
+%%%===================================================================
 
 handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
     Reply = [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
@@ -120,10 +140,11 @@ handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) ->
         Error ->
             {reply,Error,State}
     end;
-handle_call(status,_From,State=#state{handoffs=HS}) ->
-    Handoffs=[{M,N,D,active,S} ||
-                 #handoff_status{modindex=M,node=N,direction=D,status=S} <- HS],
-    {reply, Handoffs, State};
+
+handle_call({status, Filter}, _From, State=#state{handoffs=HS}) ->
+    Status = lists:filter(filter(Filter), [build_status(HO) || HO <- HS]),
+    {reply, Status, State};
+
 handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
     application:set_env(riak_core,handoff_concurrency,Limit),
     case Limit < erlang:length(HS) of
@@ -140,7 +161,6 @@ handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
             {reply, ok, State}
     end.
 
-
 handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
     {noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
 handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
@@ -152,8 +172,19 @@ handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
         _ ->
             ok
     end,
-    {noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.
+    {noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}};
 
+handle_cast({status_update, ModIdx, StatsUpdate}, State=#state{handoffs=HS}) ->
+    case lists:keyfind(ModIdx, #handoff_status.modindex, HS) of
+        false ->
+            lager:error("status_update for non-existing handoff ~p", [ModIdx]),
+            {noreply, State};
+        HO ->
+            Stats2 = update_stats(StatsUpdate, HO#handoff_status.stats),
+            HO2 = HO#handoff_status{stats=Stats2},
+            HS2 = lists:keyreplace(ModIdx, #handoff_status.modindex, HS, HO2),
+            {noreply, State#state{handoffs=HS2}}
+    end.
 
 handle_info({'DOWN',_Ref,process,Pid,Reason},State=#state{handoffs=HS}) ->
     case lists:keytake(Pid,#handoff_status.transport_pid,HS) of
@@ -201,9 +232,74 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%
-%% @private functions
-%%
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+%% @private
+%%
+%% @doc Convert a `#handoff_status{}' record into the
+%%      `{status_v2, Proplist}' term returned by the `status' call.
+build_status(HO) ->
+    #handoff_status{modindex={Mod, Partition},
+                    src_node=SrcNode,
+                    target_node=TargetNode,
+                    direction=Dir,
+                    status=Status,
+                    timestamp=StartTS,
+                    transport_pid=TPid,
+                    stats=Stats,
+                    type=Type}=HO,
+    {status_v2, [{mod, Mod},
+                 {partition, Partition},
+                 {src_node, SrcNode},
+                 {target_node, TargetNode},
+                 {direction, Dir},
+                 {status, Status},
+                 {start_ts, StartTS},
+                 {sender_pid, TPid},
+                 {stats, calc_stats(Stats, StartTS)},
+                 {type, Type}]}.
+
+%% @private
+%%
+%% @doc Turn the accumulated `Stats' dict into a proplist of totals and
+%%      per-second rates measured from `StartTS' to the last update.
+%%      Returns `no_stats' when no `last_update' has been stored yet.
+calc_stats(Stats, StartTS) ->
+    case dict:find(last_update, Stats) of
+        error ->
+            no_stats;
+        {ok, LastUpdate} ->
+            Objs = dict:fetch(objs, Stats),
+            Bytes = dict:fetch(bytes, Stats),
+            ElapsedS = timer:now_diff(LastUpdate, StartTS) / 1000000,
+            [{objs_total, Objs},
+             {objs_per_s, stats_per_second(Objs, ElapsedS)},
+             {bytes_per_s, stats_per_second(Bytes, ElapsedS)},
+             {last_update, LastUpdate}]
+    end.
+
+%% @private
+%%
+%% @doc Rate of `Count' per second over `ElapsedS' seconds.  Yields 0
+%%      when no time has measurably elapsed (or the two timestamps
+%%      disagree), instead of raising `badarith' inside the manager.
+stats_per_second(Count, ElapsedS) when ElapsedS > 0 ->
+    round(Count / ElapsedS);
+stats_per_second(_Count, _ElapsedS) ->
+    0.
+ +filter(none) -> + fun(_) -> true end; +filter({Key, Value}=_Filter) -> + fun({status_v2, Status}) -> + case proplists:get_value(Key, Status) of + Value -> true; + _ -> false + end + end. get_concurrency_limit () -> app_helper:get_env(riak_core,handoff_concurrency,?HANDOFF_CONCURRENCY). @@ -226,7 +305,7 @@ send_handoff (Mod,Idx,Node,Vnode,HS) -> case lists:keyfind({Mod,Idx},#handoff_status.modindex,HS) of false -> true; - Handoff=#handoff_status{node=Node,vnode_pid=Vnode} -> + Handoff=#handoff_status{target_node=Node,vnode_pid=Vnode} -> {false,Handoff}; #handoff_status{transport_pid=Sender} -> %% found a running handoff with a different vnode @@ -238,6 +317,13 @@ send_handoff (Mod,Idx,Node,Vnode,HS) -> case ShouldHandoff of true -> + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + %% assumes local node is doing the sending + Primary = riak_core_ring:is_primary(Ring, {Idx, node()}), + HOType = if Primary -> ownership_handoff; + true -> hinted_handoff + end, + %% start the sender process {ok,Pid}=riak_core_handoff_sender_sup:start_sender(Node, Mod, @@ -249,10 +335,13 @@ send_handoff (Mod,Idx,Node,Vnode,HS) -> {ok, #handoff_status{ transport_pid=Pid, direction=outbound, timestamp=now(), - node=Node, + src_node=node(), + target_node=Node, modindex={Mod,Idx}, vnode_pid=Vnode, - status=[] + status=[], + stats=dict:new(), + type=HOType } }; @@ -276,15 +365,24 @@ receive_handoff (SSLOpts) -> direction=inbound, timestamp=now(), modindex={undefined,undefined}, - node=undefined, - status=[] + src_node=undefined, + target_node=undefined, + status=[], + stats=dict:new() } } end. -%% -%% EUNIT tests... -%% +update_stats(StatsUpdate, Stats) -> + #ho_stats{last_update=LU, objs=Objs, bytes=Bytes}=StatsUpdate, + Stats2 = dict:update_counter(objs, Objs, Stats), + Stats3 = dict:update_counter(bytes, Bytes, Stats2), + dict:store(last_update, LU, Stats3). 
+ + +%%%=================================================================== +%%% Tests +%%%=================================================================== -ifdef (TEST_BROKEN_AZ_TICKET_1042). diff --git a/src/riak_core_handoff_sender.erl b/src/riak_core_handoff_sender.erl index 93f704151..e9e31a31f 100644 --- a/src/riak_core_handoff_sender.erl +++ b/src/riak_core_handoff_sender.erl @@ -29,13 +29,16 @@ -define(ACK_COUNT, 1000). %% can be set with env riak_core, handoff_timeout -define(TCP_TIMEOUT, 60000). +%% can be set with env riak_core, handoff_status_interval +%% note this is in seconds +-define(STATUS_INTERVAL, 2). -start_link(TargetNode, Module, Partition, VnodePid) -> +start_link(TargetNode, Module, Partition, Vnode) -> SslOpts = get_handoff_ssl_options(), Pid = spawn_link(fun()->start_fold(TargetNode, Module, Partition, - VnodePid, + Vnode, SslOpts) end), {ok, Pid}. @@ -85,13 +88,16 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) -> M = <>, ok = TcpMod:send(Socket, M), - StartFoldTime = now(), + StartFoldTime = os:timestamp(), MRef = monitor(process, ParentPid), process_flag(trap_exit, true), SPid = self(), + Stats = #ho_stats{interval_end=future_now(get_status_interval())}, + Req = ?FOLD_REQ{foldfun=fun visit_item/3, - acc0={Socket, ParentPid, Module, TcpMod, 0,0, ok}}, + acc0={Socket, ParentPid, Module, TcpMod, + 0, 0, Stats, Partition, ok}}, HPid = spawn_link( @@ -99,7 +105,7 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) -> %% match structure here because otherwise %% you'll end up in infinite loop below if you %% return something other than what it expects. 
- R = {Socket,ParentPid,Module,TcpMod,_Ack,_,_} = + R = {Socket,ParentPid,Module,TcpMod,_Ack,_,_,_,_} = riak_core_vnode_master:sync_command({Partition, node()}, Req, VMaster, infinity), @@ -121,7 +127,7 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) -> [Module, Partition, node(), TargetNode, From, E]), error; - {MRef, {Socket,ParentPid,Module,TcpMod,_Ack,SentCount,ErrStatus}} -> + {MRef, {Socket,ParentPid,Module,TcpMod,_Ack,SentCount,_,_,ErrStatus}} -> case ErrStatus of ok -> @@ -175,39 +181,59 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) -> "TCP recv timeout in handoff of partition ~p ~p from ~p to ~p", [Module, Partition, node(), TargetNode]); Err:Reason -> - lager:error("Handoff of partition ~p ~p from ~p to ~p failed ~p:~p", + lager:error("Handoff of partition ~p ~p from ~p to ~p failed ~p:~p ~p", [Module, Partition, node(), TargetNode, - Err, Reason]), + Err, Reason, erlang:get_stacktrace()]), gen_fsm:send_event(ParentPid, {handoff_error, Err, Reason}) end. %% When a tcp error occurs, the ErrStatus argument is set to {error, Reason}. %% Since we can't abort the fold, this clause is just a no-op. 
-visit_item(_K, _V, {Socket, ParentPid, Module, TcpMod, Ack, Total, - {error, Reason}}) -> - {Socket, ParentPid, Module, TcpMod, Ack, Total, {error, Reason}}; -visit_item(K, V, {Socket, ParentPid, Module, TcpMod, ?ACK_COUNT, Total, _Err}) -> +%% +%% TODO: turn accumulator into record, this tuple is out of control +visit_item(_K, _V, {Socket, Parent, Module, TcpMod, Ack, Total, Stats, + Partition, {error, Reason}}) -> + {Socket, Parent, Module, TcpMod, Ack, Total, Stats, Partition, + {error, Reason}}; +visit_item(K, V, {Socket, Parent, Module, TcpMod, ?ACK_COUNT, Total, + Stats, Partition, _Err}) -> RecvTimeout = get_handoff_receive_timeout(), M = <>, + NumBytes = byte_size(M), + + Stats2 = incr_bytes(Stats, NumBytes), + Stats3 = maybe_send_status({Module, Partition}, Stats2), + case TcpMod:send(Socket, M) of ok -> case TcpMod:recv(Socket, 0, RecvTimeout) of {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> - visit_item(K, V, {Socket, ParentPid, Module, TcpMod, 0, Total, ok}); + visit_item(K, V, {Socket, Parent, Module, TcpMod, 0, + Total, Stats3, Partition, ok}); {error, Reason} -> - {Socket, ParentPid, Module, TcpMod, 0, Total, {error, Reason}} + {Socket, Parent, Module, TcpMod, 0, Total, + Stats3, Partition, {error, Reason}} end; {error, Reason} -> - {Socket, ParentPid, Module, TcpMod, 0, Total, {error, Reason}} + {Socket, Parent, Module, TcpMod, 0, Total, Stats3, + Partition, {error, Reason}} end; -visit_item(K, V, {Socket, ParentPid, Module, TcpMod, Ack, Total, _ErrStatus}) -> +visit_item(K, V, {Socket, Parent, Module, TcpMod, Ack, Total, Stats, + Partition, _ErrStatus}) -> BinObj = Module:encode_handoff_item(K, V), M = <>, + NumBytes = byte_size(M), + + Stats2 = incr_bytes(incr_objs(Stats), NumBytes), + Stats3 = maybe_send_status({Module, Partition}, Stats2), + case TcpMod:send(Socket, M) of ok -> - {Socket, ParentPid, Module, TcpMod, Ack+1, Total+1, ok}; + {Socket, Parent, Module, TcpMod, Ack+1, Total+1, Stats3, + Partition, ok}; {error, Reason} -> - {Socket, ParentPid, 
Module, TcpMod, Ack, Total, {error, Reason}}
+            {Socket, Parent, Module, TcpMod, Ack, Total, Stats3,
+             Partition, {error, Reason}}
     end.
 
 get_handoff_port(Node) when is_atom(Node) ->
@@ -250,5 +276,55 @@ get_handoff_receive_timeout() ->
     app_helper:get_env(riak_core, handoff_timeout, ?TCP_TIMEOUT).
 
 end_fold_time(StartFoldTime) ->
-    EndFoldTime = now(),
+    EndFoldTime = os:timestamp(),
     timer:now_diff(EndFoldTime, StartFoldTime) / 1000000.
+
+%% @private
+%%
+%% @doc Produce the value `os:timestamp/0' would return `S' seconds in
+%%      the future, normalized so the seconds field stays below 1000000.
+-spec future_now(pos_integer()) -> erlang:timestamp().
+future_now(S) ->
+    {Megas, Secs, Micros} = os:timestamp(),
+    {Megas + (Secs + S) div 1000000, (Secs + S) rem 1000000, Micros}.
+
+%% @private
+%%
+%% @doc Check if the given timestamp `TS' has elapsed.
+-spec is_elapsed(erlang:timestamp()) -> boolean().
+is_elapsed(TS) ->
+    os:timestamp() >= TS.
+
+%% @private
+%%
+%% @doc Increment `Stats' byte count by `NumBytes'.
+-spec incr_bytes(ho_stats(), non_neg_integer()) -> NewStats::ho_stats().
+incr_bytes(Stats=#ho_stats{bytes=Bytes}, NumBytes) ->
+    Stats#ho_stats{bytes=Bytes + NumBytes}.
+
+%% @private
+%%
+%% @doc Increment `Stats' object count by 1.
+-spec incr_objs(ho_stats()) -> NewStats::ho_stats().
+incr_objs(Stats=#ho_stats{objs=Objs}) ->
+    Stats#ho_stats{objs=Objs+1}.
+
+%% @private
+%%
+%% @doc Check if the interval has elapsed and if so send handoff stats
+%%      for `ModIdx' to the manager and return a new stats record
+%%      `NewStats'; otherwise return `Stats' unchanged.
+-spec maybe_send_status({module(), non_neg_integer()}, ho_stats()) ->
+                               NewStats::ho_stats().
+maybe_send_status(ModIdx, Stats=#ho_stats{interval_end=IntervalEnd}) ->
+    case is_elapsed(IntervalEnd) of
+        true ->
+            Stats2 = Stats#ho_stats{last_update=os:timestamp()},
+            riak_core_handoff_manager:status_update(ModIdx, Stats2),
+            #ho_stats{interval_end=future_now(get_status_interval())};
+        false ->
+            Stats
+    end.
+
+get_status_interval() ->
+    app_helper:get_env(riak_core, handoff_status_interval, ?STATUS_INTERVAL).
diff --git a/src/riak_core_handoff_sender_sup.erl b/src/riak_core_handoff_sender_sup.erl index e4eb7bc0c..4830e9ec4 100644 --- a/src/riak_core_handoff_sender_sup.erl +++ b/src/riak_core_handoff_sender_sup.erl @@ -43,5 +43,6 @@ init ([]) -> ]}}. %% start a sender process -start_sender (TargetNode,Module,Idx,VnodePid) -> - supervisor:start_child(?MODULE,[TargetNode,Module,Idx,VnodePid]). +-spec start_sender(node(), module(), any(), pid()) -> {ok, Sender::pid()}. +start_sender(TargetNode, Module, Idx, Vnode) -> + supervisor:start_child(?MODULE, [TargetNode, Module, Idx, Vnode]). diff --git a/src/riak_core_ring.erl b/src/riak_core_ring.erl index c4a7eb4a4..d349ed124 100644 --- a/src/riak_core_ring.erl +++ b/src/riak_core_ring.erl @@ -91,7 +91,8 @@ ring_changed/2, set_cluster_name/2, reconcile_names/2, - reconcile_members/2]). + reconcile_members/2, + is_primary/2]). -export_type([riak_core_ring/0]). @@ -230,6 +231,13 @@ nearly_equal(RingA, RingB) -> TestRing = (RingA2 =:= RingB2), TestVC and TestRing. +%% @doc Determine if a given Index/Node `IdxNode' combination is a +%% primary. +-spec is_primary(chstate(), {integer(), node()}) -> boolean(). +is_primary(Ring, IdxNode) -> + Owners = all_owners(Ring), + lists:member(IdxNode, Owners). + %% @doc Produce a list of all nodes that are members of the cluster -spec all_members(State :: chstate()) -> [Node :: term()]. all_members(?CHSTATE{members=Members}) -> diff --git a/src/riak_core_status.erl b/src/riak_core_status.erl index ea0823618..5474b838d 100644 --- a/src/riak_core_status.erl +++ b/src/riak_core_status.erl @@ -20,7 +20,10 @@ %% %% ------------------------------------------------------------------- -module(riak_core_status). --export([ringready/0, transfers/0, ring_status/0]). +-export([ringready/0, + all_active_transfers/0, + transfers/0, + ring_status/0]). -spec(ringready() -> {ok, [atom()]} | {error, any()}). ringready() -> @@ -64,6 +67,16 @@ transfers() -> end, {Down, lists:foldl(F, [], Rings)}. 
+%% @doc Produce status for all active transfers in the cluster. +-spec all_active_transfers() -> {Xfers::list(), Down::list()}. +all_active_transfers() -> + {Xfers, Down} = + riak_core_util:rpc_every_member(riak_core_handoff_manager, + status, + [{direction, outbound}], + 5000), + {Xfers, Down}. + ring_status() -> %% Determine which nodes are reachable as well as what vnode modules %% are running on each node. @@ -140,21 +153,12 @@ rings_match(R1hash, [{N2, R2} | Rest]) -> {false, N2} end. - %% Get a list of active partition numbers - regardless of vnode type active_partitions(Node) -> - lists:foldl(fun({_,P}, Ps) -> + VNodes = gen_server:call({riak_core_vnode_manager, Node}, all_vnodes, 30000), + lists:foldl(fun({_, P, _}, Ps) -> ordsets:add_element(P, Ps) - end, [], running_vnodes(Node)). - -%% Get a list of running vnodes for a node -running_vnodes(Node) -> - Pids = vnode_pids(Node), - [rpc:call(Node, riak_core_vnode, get_mod_index, [Pid], 30000) || Pid <- Pids]. - -%% Get a list of vnode pids for a node -vnode_pids(Node) -> - [Pid || {_,Pid,_,_} <- supervisor:which_children({riak_core_vnode_sup, Node})]. + end, [], VNodes). %% Return a list of active primary partitions, active secondary partitions (to be handed off) %% and stopped partitions that should be started