Permalink
Browse files

Add visibility into transfers/handoff

Purpose
----------

Visibility into handoff is really poor.  The typical method used to
discover handoff information is `riak-admin transfers` but that gives
hardly any useful information, as shown below.

    'dev3@127.0.0.1' waiting to handoff 7 partitions
    'dev2@127.0.0.1' waiting to handoff 4 partitions
    'dev1@127.0.0.1' waiting to handoff 5 partitions

This PR adds visibility transfers/handoff by tracking various stats on
active transfers and displaying this information in a human friendly
way, as shown below.

    ./dev/dev1/bin/riak-admin transfers
    'dev3@127.0.0.1' waiting to handoff 6 partitions
    'dev2@127.0.0.1' waiting to handoff 4 partitions
    'dev1@127.0.0.1' waiting to handoff 6 partitions

    Active Transfers:

    transfer type: ownership_handoff
    vnode type: riak_kv_vnode
    partition: 365375409332725729550921208179070754913983135744
    started: 2012-04-24 18:43:44 [5.96 s ago]
    last update: 2012-04-24 18:43:48 [1.91 s ago]
    objects transferred: 8651

                           2135 Objs/s
      dev3@127.0.0.1 =======================>   dev1@127.0.0.1
                            17.62 MB/s

This PR also gets rid of the annoying side effect of resetting the
inactivity timeout when calling `riak-admin transfers`.  This would
often cause users to wonder why handoffs were never occurring.

Implementation Details
----------

One issue with handoff is that it uses vnode folds to do all it's
work.  This has the one nice benefit that it avoids a local copy of
data (1) but has bad side effect of using uninterruptable fold.  That
is, the vnode fold does the work as fast as it can and doesn't stop
until it's done (2).

In order to get status updates about the handoff the accumulator keeps
some local stats and _approximately_ every 2 seconds sends those stats
to the handoff manager via the `status_update/2` API.  I say the
timing is approximate because expiration of the interval is only
checked during a sender/receiver sync phase (determined by
`ACK_COUNT`).  If the receiver can't keep up or the sender fold is
slow then the status updates could take longer.  Essentially, this
code assumes that `ACK_COUNT` objects can be transferred in less than
2s.  **N.B.** The duration of the status update interval will not
invalidate the stats since they are based on start time and time of
last sync (see `riak_core_handoff_manager:update_stats/2`).

The reason the sender only sends a status update every 2s and only
checks if this interval has expired on sender/receiver sync is because
the vnode fold is a tight loop.  Sending an update for every object
would be too chatty and checking the interval every object could
potentially slow from overhead of getting time and doing math.

There are two types of transfer currently, _ownership handoff_ and
_hinted handoff_.  Soon there will be another type, _repair_.  In
order to disambiguate the two types of handoff I have to determine if
the source vnode is primary or not.  In the case of ownership handoff
it is a `primary -> secondary` handoff (where the secondary becomes
primary after handoff completes) and for hinted handoff it's
`secondary -> primary`.

In order to make the stats a little easier to read I added a little
human friendly formatting.  I decided to put the code to support this
in Core rather than KV.  I stole and modified the code from
@seancribbs.

One aspect of this PR I'm not wild about is the fact that in order to
get the status a msg must be sent to each handoff manager on each each
node for every time `riak-admin transfers` is called (3).  I'd rather
see a push system where all active status data is collated at a
particular node, like the claimant node in ownership, and the status
call simply reads that.  The pull system is probably fine for now but
could cause trouble on larger clusters, especially if some script
accidentally calls it in a tight loop.

I'm wondering if I should have make use of the stats API for the
collection of data in the handoff manager rather than a dict?

Footnotes
----------

1: That is, if the handoff sender process itself was running the
handoff then the vnode data would have to be copied from vnode heap to
sender heap.

2: In the future I think an iterator/cursor based approach to handoff
that is async, interruptable, and rate limited would be good.

3: Which calls `riak_core_status:all_active_transfers` where the RPC
is done.
  • Loading branch information...
1 parent 72a671a commit 577ac9aa3e2db1ea89193246fbefb16f2a2c2665 @rzezeski rzezeski committed Apr 18, 2012
View
@@ -28,6 +28,7 @@
riak_core_coverage_plan,
riak_core_eventhandler_guard,
riak_core_eventhandler_sup,
+ riak_core_format,
riak_core_gossip,
riak_core_handoff_listener,
riak_core_handoff_listener_sup,
@@ -3,3 +3,13 @@
-define(PT_MSG_OLDSYNC, 2).
-define(PT_MSG_SYNC, 3).
-define(PT_MSG_CONFIGURE, 4).
+
+-record(ho_stats,
+ {
+ interval_end :: erlang:timestamp(),
+ last_update :: erlang:timestamp(),
+ objs=0 :: non_neg_integer(),
+ bytes=0 :: non_neg_integer()
+ }).
+
+-type ho_stats() :: #ho_stats{}.
View
@@ -0,0 +1,79 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Functions for formatting data.
+
+-module(riak_core_format).
+-export([fmt/2,
+ human_size_fmt/2,
+ human_time_fmt/2]).
+
+%% @doc Created a string `Str' based on the format string `FmtStr' and
+%% list of args `Args'.
+-spec fmt(string(), list()) -> Str::string().
+fmt(FmtStr, Args) ->
+ lists:flatten(io_lib:format(FmtStr, Args)).
+
+%% @doc Create a human friendly string `Str' for number of bytes
+%% `Bytes' and format based on format string `Fmt'.
+-spec human_size_fmt(string(), non_neg_integer()) -> Str::string().
+human_size_fmt(Fmt, Bytes) ->
+ Fmt2 = Fmt ++ " ~s",
+ {Value, Units} = human_size(Bytes, ["B","KB","MB","GB","TB","PB"]),
+ fmt(Fmt2, [Value, Units]).
+
+%% @doc Create a human friendly string `Str' for the given time in
+%% microseconds `Micros'. Format according to format string
+%% `Fmt'.
+-spec human_time_fmt(string(), non_neg_integer()) -> Str::string().
+human_time_fmt(Fmt, Micros) ->
+ Fmt2 = Fmt ++ " ~s",
+ {Value, Units} = human_time(Micros),
+ fmt(Fmt2, [Value, Units]).
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+%% @private
+%%
+%% @doc Formats a byte size into a human-readable size with units.
+%% Thanks StackOverflow:
+%% http://stackoverflow.com/questions/2163691/simpler-way-to-format-bytesize-in-a-human-readable-way
+-spec human_size(non_neg_integer(), list()) -> iolist().
+human_size(S, [_|[_|_] = L]) when S >= 1024 -> human_size(S/1024, L);
+human_size(S, [M|_]) ->
+ {float(S), M}.
+
+%% @private
+%%
+%% @doc Given a number of `Micros' returns a human friendly time
+%% duration in the form of `{Value, Units}'.
+-spec human_time(non_neg_integer()) -> {Value::number(), Units::string()}.
+human_time(Micros) ->
+ human_time(Micros, {1000, "us"}, [{1000, "ms"}, {1000, "s"}, {60, "min"}, {60, "hr"}, {24, "d"}]).
+
+-spec human_time(non_neg_integer(), {pos_integer(), string()},
+ [{pos_integer(), string()}]) ->
+ {number(), string()}.
+human_time(T, {Divisor, Unit}, Units) when T < Divisor orelse Units == [] ->
+ {float(T), Unit};
+human_time(T, {Divisor, _}, [Next|Units]) ->
+ human_time(T / Divisor, Next, Units).
@@ -36,6 +36,8 @@
-export([add_outbound/4,
add_inbound/1,
status/0,
+ status/1,
+ status_update/2,
set_concurrency/1,
kill_handoffs/0
]).
@@ -46,17 +48,20 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
--type mod() :: atom().
-type index() :: integer().
+-type modindex() :: {module(), index()}.
-record(handoff_status,
- { modindex :: {mod(),index()},
- node :: atom(),
+ { modindex :: modindex(),
+ src_node :: node(),
+ target_node :: node(),
direction :: inbound | outbound,
transport_pid :: pid(),
timestamp :: tuple(),
status :: any(),
- vnode_pid :: pid() | undefined
+ stats :: dict(),
+ vnode_pid :: pid() | undefined,
+ type :: ownership | hinted_handoff
}).
-record(state,
@@ -67,22 +72,34 @@
%% this can be overridden with riak_core handoff_concurrency
-define(HANDOFF_CONCURRENCY,2).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #state{excl=ordsets:new(), handoffs=[]}}.
-%% handoff_manager API
-
add_outbound(Module,Idx,Node,VnodePid) ->
gen_server:call(?MODULE,{add_outbound,Module,Idx,Node,VnodePid}).
add_inbound(SSLOpts) ->
gen_server:call(?MODULE,{add_inbound,SSLOpts}).
status() ->
- gen_server:call(?MODULE,status).
+ status(none).
+
+status(Filter) ->
+ gen_server:call(?MODULE, {status, Filter}).
+
+%% @doc Send status updates `Stats' to the handoff manager for a
+%% particular handoff identified by `ModIdx'.
+-spec status_update(modindex(), proplists:proplist()) -> ok.
+status_update(ModIdx, Stats) ->
+ gen_server:cast(?MODULE, {status_update, ModIdx, Stats}).
set_concurrency(Limit) ->
gen_server:call(?MODULE,{set_concurrency,Limit}).
@@ -99,7 +116,10 @@ remove_exclusion(Module, Index) ->
get_exclusions(Module) ->
gen_server:call(?MODULE, {get_exclusions, Module}, infinity).
-%% gen_server API
+
+%%%===================================================================
+%%% Callbacks
+%%%===================================================================
handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
Reply = [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
@@ -120,10 +140,11 @@ handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) ->
Error ->
{reply,Error,State}
end;
-handle_call(status,_From,State=#state{handoffs=HS}) ->
- Handoffs=[{M,N,D,active,S} ||
- #handoff_status{modindex=M,node=N,direction=D,status=S} <- HS],
- {reply, Handoffs, State};
+
+handle_call({status, Filter}, _From, State=#state{handoffs=HS}) ->
+ Status = lists:filter(filter(Filter), [build_status(HO) || HO <- HS]),
+ {reply, Status, State};
+
handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
application:set_env(riak_core,handoff_concurrency,Limit),
case Limit < erlang:length(HS) of
@@ -140,7 +161,6 @@ handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
{reply, ok, State}
end.
-
handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
@@ -152,8 +172,19 @@ handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
_ ->
ok
end,
- {noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.
+ {noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}};
+handle_cast({status_update, ModIdx, StatsUpdate}, State=#state{handoffs=HS}) ->
+ case lists:keyfind(ModIdx, #handoff_status.modindex, HS) of
+ false ->
+ lager:error("status_update for non-existing handoff ~p", [ModIdx]),
+ {noreply, State};
+ HO ->
+ Stats2 = update_stats(StatsUpdate, HO#handoff_status.stats),
+ HO2 = HO#handoff_status{stats=Stats2},
+ HS2 = lists:keyreplace(ModIdx, #handoff_status.modindex, HS, HO2),
+ {noreply, State#state{handoffs=HS2}}
+ end.
handle_info({'DOWN',_Ref,process,Pid,Reason},State=#state{handoffs=HS}) ->
case lists:keytake(Pid,#handoff_status.transport_pid,HS) of
@@ -201,9 +232,57 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%
-%% @private functions
-%%
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+build_status(HO) ->
+ #handoff_status{modindex={Mod, Partition},
+ src_node=SrcNode,
+ target_node=TargetNode,
+ direction=Dir,
+ status=Status,
+ timestamp=StartTS,
+ transport_pid=TPid,
+ stats=Stats,
+ type=Type}=HO,
+ {status_v2, [{mod, Mod},
+ {partition, Partition},
+ {src_node, SrcNode},
+ {target_node, TargetNode},
+ {direction, Dir},
+ {status, Status},
+ {start_ts, StartTS},
+ {sender_pid, TPid},
+ {stats, calc_stats(Stats, StartTS)},
+ {type, Type}]}.
+
+calc_stats(Stats, StartTS) ->
+ case dict:find(last_update, Stats) of
+ error ->
+ no_stats;
+ {ok, LastUpdate} ->
+ Objs = dict:fetch(objs, Stats),
+ Bytes = dict:fetch(bytes, Stats),
+ ElapsedS = timer:now_diff(LastUpdate, StartTS) / 1000000,
+ ObjsS = round(Objs / ElapsedS),
+ BytesS = round(Bytes / ElapsedS),
+ [{objs_total, Objs},
+ {objs_per_s, ObjsS},
+ {bytes_per_s, BytesS},
+ {last_update, LastUpdate}]
+ end.
+
+filter(none) ->
+ fun(_) -> true end;
+filter({Key, Value}=_Filter) ->
+ fun({status_v2, Status}) ->
+ case proplists:get_value(Key, Status) of
+ Value -> true;
+ _ -> false
+ end
+ end.
get_concurrency_limit () ->
app_helper:get_env(riak_core,handoff_concurrency,?HANDOFF_CONCURRENCY).
@@ -226,7 +305,7 @@ send_handoff (Mod,Idx,Node,Vnode,HS) ->
case lists:keyfind({Mod,Idx},#handoff_status.modindex,HS) of
false ->
true;
- Handoff=#handoff_status{node=Node,vnode_pid=Vnode} ->
+ Handoff=#handoff_status{target_node=Node,vnode_pid=Vnode} ->
{false,Handoff};
#handoff_status{transport_pid=Sender} ->
%% found a running handoff with a different vnode
@@ -238,6 +317,13 @@ send_handoff (Mod,Idx,Node,Vnode,HS) ->
case ShouldHandoff of
true ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ %% assumes local node is doing the sending
+ Primary = riak_core_ring:is_primary(Ring, {Idx, node()}),
+ HOType = if Primary -> ownership_handoff;
+ true -> hinted_handoff
+ end,
+
%% start the sender process
{ok,Pid}=riak_core_handoff_sender_sup:start_sender(Node,
Mod,
@@ -249,10 +335,13 @@ send_handoff (Mod,Idx,Node,Vnode,HS) ->
{ok, #handoff_status{ transport_pid=Pid,
direction=outbound,
timestamp=now(),
- node=Node,
+ src_node=node(),
+ target_node=Node,
modindex={Mod,Idx},
vnode_pid=Vnode,
- status=[]
+ status=[],
+ stats=dict:new(),
+ type=HOType
}
};
@@ -276,15 +365,24 @@ receive_handoff (SSLOpts) ->
direction=inbound,
timestamp=now(),
modindex={undefined,undefined},
- node=undefined,
- status=[]
+ src_node=undefined,
+ target_node=undefined,
+ status=[],
+ stats=dict:new()
}
}
end.
-%%
-%% EUNIT tests...
-%%
+update_stats(StatsUpdate, Stats) ->
+ #ho_stats{last_update=LU, objs=Objs, bytes=Bytes}=StatsUpdate,
+ Stats2 = dict:update_counter(objs, Objs, Stats),
+ Stats3 = dict:update_counter(bytes, Bytes, Stats2),
+ dict:store(last_update, LU, Stats3).
+
+
+%%%===================================================================
+%%% Tests
+%%%===================================================================
-ifdef (TEST_BROKEN_AZ_TICKET_1042).
Oops, something went wrong.

0 comments on commit 577ac9a

Please sign in to comment.