Skip to content

Commit

Permalink
Merge branch 'rz-handoff-visibility'
Browse files Browse the repository at this point in the history
  • Loading branch information
rzezeski committed Apr 25, 2012
2 parents 72a671a + 577ac9a commit de05a91
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 60 deletions.
1 change: 1 addition & 0 deletions ebin/riak_core.app
Expand Up @@ -28,6 +28,7 @@
riak_core_coverage_plan,
riak_core_eventhandler_guard,
riak_core_eventhandler_sup,
riak_core_format,
riak_core_gossip,
riak_core_handoff_listener,
riak_core_handoff_listener_sup,
Expand Down
10 changes: 10 additions & 0 deletions include/riak_core_handoff.hrl
Expand Up @@ -3,3 +3,13 @@
-define(PT_MSG_OLDSYNC, 2).
-define(PT_MSG_SYNC, 3).
-define(PT_MSG_CONFIGURE, 4).

-record(ho_stats,
{
interval_end :: erlang:timestamp(),
last_update :: erlang:timestamp(),
objs=0 :: non_neg_integer(),
bytes=0 :: non_neg_integer()
}).

-type ho_stats() :: #ho_stats{}.
79 changes: 79 additions & 0 deletions src/riak_core_format.erl
@@ -0,0 +1,79 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Functions for formatting data.

-module(riak_core_format).
-export([fmt/2,
human_size_fmt/2,
human_time_fmt/2]).

%% @doc Created a string `Str' based on the format string `FmtStr' and
%% list of args `Args'.
-spec fmt(string(), list()) -> Str::string().
fmt(FmtStr, Args) ->
lists:flatten(io_lib:format(FmtStr, Args)).

%% @doc Create a human friendly string `Str' for number of bytes
%% `Bytes' and format based on format string `Fmt'.
-spec human_size_fmt(string(), non_neg_integer()) -> Str::string().
human_size_fmt(Fmt, Bytes) ->
Fmt2 = Fmt ++ " ~s",
{Value, Units} = human_size(Bytes, ["B","KB","MB","GB","TB","PB"]),
fmt(Fmt2, [Value, Units]).

%% @doc Create a human friendly string `Str' for the given time in
%% microseconds `Micros'. Format according to format string
%% `Fmt'.
-spec human_time_fmt(string(), non_neg_integer()) -> Str::string().
human_time_fmt(Fmt, Micros) ->
Fmt2 = Fmt ++ " ~s",
{Value, Units} = human_time(Micros),
fmt(Fmt2, [Value, Units]).

%%%===================================================================
%%% Private
%%%===================================================================

%% @private
%%
%% @doc Formats a byte size into a human-readable size with units.
%% Thanks StackOverflow:
%% http://stackoverflow.com/questions/2163691/simpler-way-to-format-bytesize-in-a-human-readable-way
-spec human_size(non_neg_integer(), list()) -> iolist().
human_size(S, [_|[_|_] = L]) when S >= 1024 -> human_size(S/1024, L);
human_size(S, [M|_]) ->
{float(S), M}.

%% @private
%%
%% @doc Given a number of `Micros' returns a human friendly time
%% duration in the form of `{Value, Units}'.
-spec human_time(non_neg_integer()) -> {Value::number(), Units::string()}.
human_time(Micros) ->
human_time(Micros, {1000, "us"}, [{1000, "ms"}, {1000, "s"}, {60, "min"}, {60, "hr"}, {24, "d"}]).

-spec human_time(non_neg_integer(), {pos_integer(), string()},
[{pos_integer(), string()}]) ->
{number(), string()}.
human_time(T, {Divisor, Unit}, Units) when T < Divisor orelse Units == [] ->
{float(T), Unit};
human_time(T, {Divisor, _}, [Next|Units]) ->
human_time(T / Divisor, Next, Units).
148 changes: 123 additions & 25 deletions src/riak_core_handoff_manager.erl
Expand Up @@ -36,6 +36,8 @@
-export([add_outbound/4,
add_inbound/1,
status/0,
status/1,
status_update/2,
set_concurrency/1,
kill_handoffs/0
]).
Expand All @@ -46,17 +48,20 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-type mod() :: atom().
-type index() :: integer().
-type modindex() :: {module(), index()}.

-record(handoff_status,
{ modindex :: {mod(),index()},
node :: atom(),
{ modindex :: modindex(),
src_node :: node(),
target_node :: node(),
direction :: inbound | outbound,
transport_pid :: pid(),
timestamp :: tuple(),
status :: any(),
vnode_pid :: pid() | undefined
stats :: dict(),
vnode_pid :: pid() | undefined,
type :: ownership | hinted_handoff
}).

-record(state,
Expand All @@ -67,22 +72,34 @@
%% this can be overridden with riak_core handoff_concurrency
-define(HANDOFF_CONCURRENCY,2).


%%%===================================================================
%%% API
%%%===================================================================

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
{ok, #state{excl=ordsets:new(), handoffs=[]}}.

%% handoff_manager API

add_outbound(Module,Idx,Node,VnodePid) ->
gen_server:call(?MODULE,{add_outbound,Module,Idx,Node,VnodePid}).

add_inbound(SSLOpts) ->
gen_server:call(?MODULE,{add_inbound,SSLOpts}).

status() ->
gen_server:call(?MODULE,status).
status(none).

status(Filter) ->
gen_server:call(?MODULE, {status, Filter}).

%% @doc Send status updates `Stats' to the handoff manager for a
%% particular handoff identified by `ModIdx'.
-spec status_update(modindex(), proplists:proplist()) -> ok.
status_update(ModIdx, Stats) ->
gen_server:cast(?MODULE, {status_update, ModIdx, Stats}).

set_concurrency(Limit) ->
gen_server:call(?MODULE,{set_concurrency,Limit}).
Expand All @@ -99,7 +116,10 @@ remove_exclusion(Module, Index) ->
get_exclusions(Module) ->
gen_server:call(?MODULE, {get_exclusions, Module}, infinity).

%% gen_server API

%%%===================================================================
%%% Callbacks
%%%===================================================================

handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
Reply = [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
Expand All @@ -120,10 +140,11 @@ handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) ->
Error ->
{reply,Error,State}
end;
handle_call(status,_From,State=#state{handoffs=HS}) ->
Handoffs=[{M,N,D,active,S} ||
#handoff_status{modindex=M,node=N,direction=D,status=S} <- HS],
{reply, Handoffs, State};

handle_call({status, Filter}, _From, State=#state{handoffs=HS}) ->
Status = lists:filter(filter(Filter), [build_status(HO) || HO <- HS]),
{reply, Status, State};

handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
application:set_env(riak_core,handoff_concurrency,Limit),
case Limit < erlang:length(HS) of
Expand All @@ -140,7 +161,6 @@ handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
{reply, ok, State}
end.


handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
Expand All @@ -152,8 +172,19 @@ handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
_ ->
ok
end,
{noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.
{noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}};

handle_cast({status_update, ModIdx, StatsUpdate}, State=#state{handoffs=HS}) ->
case lists:keyfind(ModIdx, #handoff_status.modindex, HS) of
false ->
lager:error("status_update for non-existing handoff ~p", [ModIdx]),
{noreply, State};
HO ->
Stats2 = update_stats(StatsUpdate, HO#handoff_status.stats),
HO2 = HO#handoff_status{stats=Stats2},
HS2 = lists:keyreplace(ModIdx, #handoff_status.modindex, HS, HO2),
{noreply, State#state{handoffs=HS2}}
end.

handle_info({'DOWN',_Ref,process,Pid,Reason},State=#state{handoffs=HS}) ->
case lists:keytake(Pid,#handoff_status.transport_pid,HS) of
Expand Down Expand Up @@ -201,9 +232,57 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%
%% @private functions
%%

%%%===================================================================
%%% Private
%%%===================================================================

build_status(HO) ->
#handoff_status{modindex={Mod, Partition},
src_node=SrcNode,
target_node=TargetNode,
direction=Dir,
status=Status,
timestamp=StartTS,
transport_pid=TPid,
stats=Stats,
type=Type}=HO,
{status_v2, [{mod, Mod},
{partition, Partition},
{src_node, SrcNode},
{target_node, TargetNode},
{direction, Dir},
{status, Status},
{start_ts, StartTS},
{sender_pid, TPid},
{stats, calc_stats(Stats, StartTS)},
{type, Type}]}.

calc_stats(Stats, StartTS) ->
case dict:find(last_update, Stats) of
error ->
no_stats;
{ok, LastUpdate} ->
Objs = dict:fetch(objs, Stats),
Bytes = dict:fetch(bytes, Stats),
ElapsedS = timer:now_diff(LastUpdate, StartTS) / 1000000,
ObjsS = round(Objs / ElapsedS),
BytesS = round(Bytes / ElapsedS),
[{objs_total, Objs},
{objs_per_s, ObjsS},
{bytes_per_s, BytesS},
{last_update, LastUpdate}]
end.

filter(none) ->
fun(_) -> true end;
filter({Key, Value}=_Filter) ->
fun({status_v2, Status}) ->
case proplists:get_value(Key, Status) of
Value -> true;
_ -> false
end
end.

get_concurrency_limit () ->
app_helper:get_env(riak_core,handoff_concurrency,?HANDOFF_CONCURRENCY).
Expand All @@ -226,7 +305,7 @@ send_handoff (Mod,Idx,Node,Vnode,HS) ->
case lists:keyfind({Mod,Idx},#handoff_status.modindex,HS) of
false ->
true;
Handoff=#handoff_status{node=Node,vnode_pid=Vnode} ->
Handoff=#handoff_status{target_node=Node,vnode_pid=Vnode} ->
{false,Handoff};
#handoff_status{transport_pid=Sender} ->
%% found a running handoff with a different vnode
Expand All @@ -238,6 +317,13 @@ send_handoff (Mod,Idx,Node,Vnode,HS) ->

case ShouldHandoff of
true ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
%% assumes local node is doing the sending
Primary = riak_core_ring:is_primary(Ring, {Idx, node()}),
HOType = if Primary -> ownership_handoff;
true -> hinted_handoff
end,

%% start the sender process
{ok,Pid}=riak_core_handoff_sender_sup:start_sender(Node,
Mod,
Expand All @@ -249,10 +335,13 @@ send_handoff (Mod,Idx,Node,Vnode,HS) ->
{ok, #handoff_status{ transport_pid=Pid,
direction=outbound,
timestamp=now(),
node=Node,
src_node=node(),
target_node=Node,
modindex={Mod,Idx},
vnode_pid=Vnode,
status=[]
status=[],
stats=dict:new(),
type=HOType
}
};

Expand All @@ -276,15 +365,24 @@ receive_handoff (SSLOpts) ->
direction=inbound,
timestamp=now(),
modindex={undefined,undefined},
node=undefined,
status=[]
src_node=undefined,
target_node=undefined,
status=[],
stats=dict:new()
}
}
end.

%%
%% EUNIT tests...
%%
update_stats(StatsUpdate, Stats) ->
#ho_stats{last_update=LU, objs=Objs, bytes=Bytes}=StatsUpdate,
Stats2 = dict:update_counter(objs, Objs, Stats),
Stats3 = dict:update_counter(bytes, Bytes, Stats2),
dict:store(last_update, LU, Stats3).


%%%===================================================================
%%% Tests
%%%===================================================================

-ifdef (TEST_BROKEN_AZ_TICKET_1042).

Expand Down

0 comments on commit de05a91

Please sign in to comment.