Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' into pevm-coverage-timeout-refactor

Conflicts:
	src/riak_core_coverage_fsm.erl
  • Loading branch information...
commit 52a15dd89b43151f9686d6cf250e8a93e9310277 2 parents 1fe4059 + 0c8dd97
@evanmcc evanmcc authored
View
10 contrib/Makefile
@@ -0,0 +1,10 @@
+
+
+# Include paths handed to erlc: the riak_core headers under deps plus this project's include directory.
+INC=-I $(CURDIR)/../deps/riak_core/include/ -I $(CURDIR)/../include/
+# Erlang compiler executable.
+ERLC=erlc
+
+# Full compile command: the compiler followed by the include paths.
+BUILD=$(ERLC) $(INC)
+
+# Default target: print the working directory, then compile the perf tool module.
+all:
+ @echo $(CURDIR)
+ $(BUILD) handoff_perftool.erl
View
248 contrib/handoff_perftool.erl
@@ -0,0 +1,248 @@
+%%
+%% Helper program for testing and measuring handoff performance.
+%%
+%% Usage example (in the Erlang shell):
+%%
+%% code:add_path("/home/user/path-to-handoff_perftool"), l(handoff_perftool).
+%% handoff_perftool:go({10000, 1000}). % use 10000 objects of 1000 bytes each
+%% handoff_perftool:go(5, {10000, 1000}). % use 5 vnodes and 10000 objects of 1000 bytes each
+%% handoff_perftool:go(1, {10000, 1000}, strategy_roundrobin). % 1 vnode, 10k objects of 1k each, round-robin strategy
+%%
+
+-module(handoff_perftool).
+
+-export([
+ %% If we get any more options, we should use something associative rather than adding more options:
+ go/0, go/1, go/2, go/3,
+
+ get_ring_members/0,
+ get_ring_owners/0
+ ]).
+
+-include("riak_core_vnode.hrl").
+-include("riak_core_handoff.hrl").
+
+-ifndef(MD_VTAG).
+ -define(MD_VTAG, <<"X-Riak-VTag">>).
+-endif.
+-ifndef(MD_LASTMOD).
+ -define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
+-endif.
+
+-define(HARNESS, (rt:config(rt_harness))).
+
+%% JFW: hack until we can get this to play nicely with rebar:
+%% Log a plain message at info level, attributed to the calling process.
+log_info(Text) ->
+    lager:log(info, self(), Text).
+
+%% Log a formatted message at info level, attributed to the calling process.
+log_info(Format, Args) ->
+    lager:log(info, self(), Format, Args).
+
+%% Run the test with defaults: 1 vnode, 10000 objects of 1000 bytes each.
+go() ->
+    go(1, {10000, 1000}).
+
+%% Run against a single vnode with the given {ObjectCount, ValueSize}.
+go({_NObjs, _ValueSize} = ObjSpec) ->
+    go(1, ObjSpec).
+
+%% Run against NVnodes vnodes using the default gather strategy.
+go(NVnodes, {_NObjs, _ValueSize} = ObjSpec) ->
+    go(NVnodes, ObjSpec, strategy_other_owner).
+
+%% Run with an explicit gather strategy, leaving handoff concurrency untouched.
+go(NVnodes, {_NObjs, _ValueSize} = ObjSpec, GatherStrategy) ->
+    go(NVnodes, ObjSpec, GatherStrategy, use_existing_concurrency).
+
+%% Seed the selected vnodes with data, adjust handoff concurrency, force
+%% handoffs, then put the concurrency setting back. Always returns true.
+go(NVnodes, {NObjs, ValueSize} = ObjSpec, GatherStrategy, ConcurrencyN) ->
+    Targets = gather_targets(NVnodes, GatherStrategy),
+
+    log_info("Seeding ~p objects of size ~p to ~p nodes by strategy ~p...~n", [NObjs, ValueSize, NVnodes, GatherStrategy]),
+    lists:foreach(fun(Target) -> seed_data(ObjSpec, Target) end, Targets),
+    log_info("Done seeding.~n"),
+
+    PriorConcurrency = set_handoff_concurrency(ConcurrencyN),
+
+    log_info("Forcing handoff.~n"),
+    riak_core_vnode_manager:force_handoffs(),
+    log_info("Done forcing handoff.~n"),
+
+    %% Be a friendly citizen and restore the original concurrency settings:
+    set_handoff_concurrency(PriorConcurrency),
+
+    true.
+
+%%%%%%%%%%
+
+%%
+%% Different ways of gathering target vnodes for handoff:
+%%
+
+%% Resolve a gather strategy to a list of target vnode indices.
+%%
+%% NVnodes is the number of vnodes requested; GatherStrategy is
+%% strategy_other_owner or strategy_roundrobin. Any other strategy is logged
+%% and rejected by throwing invalid_gather_strategy.
+gather_targets(NVnodes, GatherStrategy) ->
+    log_info("Using gather strategy ~p.~n", [GatherStrategy]),
+    case GatherStrategy of
+        strategy_other_owner -> gather_vnodes_1(NVnodes);
+        strategy_roundrobin -> gather_vnodes_rr(NVnodes);
+        _ ->
+            %% Format with ~p rather than ++: the strategy is normally an atom,
+            %% and "string" ++ Atom yields an improper list, not a message.
+            log_info("Invalid gather strategy ~p", [GatherStrategy]),
+            erlang:throw(invalid_gather_strategy)
+    end.
+
+%% Construct a list of target vnodes such that we select from vnodes that we don't own.
+%%
+%% Returns the first NVnodes indices not owned by this node, or throws
+%% insufficient_vnodes when fewer than NVnodes exist.
+gather_vnodes_1(NVnodes) ->
+    %% Read the ring once so the count that is validated and the list that is
+    %% returned come from the same snapshot.
+    Secondaries = get_secondaries(),
+    Available = length(Secondaries),
+
+    case Available >= NVnodes of
+        true ->
+            %% Select the requested number of secondaries from the whole list:
+            lists:sublist(Secondaries, NVnodes);
+        false ->
+            log_info("Insufficient vnodes for requested test (have ~p secondaries, require ~p)", [Available, NVnodes]),
+            erlang:throw(insufficient_vnodes)
+    end.
+
+%% Construct a list of target vnodes by taking vnodes from the other
+%% partition-owning nodes in round-robin fashion.
+%% Note: This algorithm is surely inefficient, but N is expected to be small.
+%%
+%% Throws requested_too_many_vnodes when no other node owns partitions or when
+%% the smallest owner has fewer than NVnodes partitions.
+%% NOTE(review): the merge runs MinLen rounds over every member, so the result
+%% can hold more than NVnodes entries -- confirm whether NVnodes rounds (or a
+%% final truncation) was intended.
+gather_vnodes_rr(NVnodes) ->
+
+    %% Map owners to their vnode ids (not including ourselves):
+    HandoffMap = dict:erase(node(),
+                            lists:foldl(fun({NodeID, NodeName}, AccDict) ->
+                                                dict:append(NodeName, NodeID, AccDict)
+                                        end,
+                                        dict:new(), get_ring_owners())),
+
+    %% Keep only members that have an entry in the map. This removes this node
+    %% wherever it appears in the member list (dropwhile only stripped a
+    %% leading occurrence) and also skips members that own no partitions,
+    %% either of which would make dict:fetch/2 fail in pop_value/2.
+    HandoffMembers = [Member || Member <- get_ring_members(),
+                                dict:is_key(Member, HandoffMap)],
+
+    %% Find the smallest set in the group. An empty map leaves the length at
+    %% 'infinity', which is treated as "nothing available".
+    case dict:fold(fun shortest_bucket/3, { undef, infinity }, HandoffMap) of
+        {_MinKey, MinLen} when is_integer(MinLen), MinLen >= NVnodes ->
+            merge_values(MinLen, HandoffMembers, HandoffMap, []);
+        _ ->
+            log_info("Requested more vnodes than available in smallest target"),
+            erlang:throw(requested_too_many_vnodes)
+    end.
+
+%%
+%% Selection utilities:
+%%
+
+%% Indices of all ring partitions that are owned by some node other than us.
+get_secondaries() ->
+    get_secondaries(get_ring_owners(), node()).
+
+%% Indices from a list of {Index, Owner} pairs whose owner is not Node.
+%% Entries that are not 2-tuples are ignored.
+get_secondaries(RingOwners, Node) ->
+    lists:foldr(fun({Index, Owner}, Acc) when Owner =/= Node -> [Index | Acc];
+                   (_Other, Acc) -> Acc
+                end,
+                [],
+                RingOwners).
+
+%% All members of the current raw ring.
+get_ring_members() ->
+    riak_core_ring:all_members(fetch_raw_ring()).
+
+%% All owner entries of the current raw ring.
+get_ring_owners() ->
+    riak_core_ring:all_owners(fetch_raw_ring()).
+
+%% Fetch the raw ring from the ring manager; crashes with badmatch unless the
+%% manager answers {ok, Ring}.
+fetch_raw_ring() ->
+    { ok, Ring } = riak_core_ring_manager:get_raw_ring(),
+    Ring.
+
+%%
+%% Data object utilities:
+%%
+
+%% Build NEntries test objects of Size random bytes each and put them directly
+%% on the given vnode. Keys are the 64-bit encodings of NEntries down to 1,
+%% all in bucket <<"test_bucket">>. Returns ok.
+seed_data({0, _Size}, _SecondarySHA1) ->
+    ok;
+seed_data({NEntries, Size}, SecondarySHA1) ->
+    Key = <<NEntries:64/integer>>,
+    Value = random_binary(Size, <<>>),
+    Obj = finalize_object(riak_object:new(<<"test_bucket">>, Key, Value)),
+
+    riak_kv_vnode:local_put(SecondarySHA1, Obj),
+
+    seed_data({NEntries - 1, Size}, SecondarySHA1).
+
+%% Append N pseudo-random bytes to Bin and return the result.
+%% Each byte comes from random:uniform(255), so it lies in 1..255.
+random_binary(0, Bin) ->
+    Bin;
+random_binary(Remaining, Bin) ->
+    Byte = random:uniform(255),
+    Grown = <<Bin/binary, Byte:8/integer>>,
+    random_binary(Remaining - 1, Grown).
+
+%% Directly "inject" an object w/ metadata, vtags, etc.:
+%% picks the metadata to build on, stamps it with a last-modified time and a
+%% fresh vtag, and applies it to the object.
+finalize_object(RObj) ->
+    CleanFlag = dict:find(clean, riak_object:get_update_metadata(RObj)),
+    BaseMD = base_metadata(CleanFlag, RObj),
+    Now = os:timestamp(),
+    VTag = make_vtag(Now),
+    StampedMD = dict:store(?MD_LASTMOD, Now, BaseMD),
+    FinalMD = dict:store(?MD_VTAG, VTag, StampedMD),
+    riak_object:apply_updates(riak_object:update_metadata(RObj, FinalMD)).
+
+%% Choose the metadata dict that finalize_object/1 should extend.
+base_metadata({ok, true}, RObj) ->
+    %% There have been no changes to updatemetadata. If we stash the
+    %% last modified in this dict, it will cause us to lose existing
+    %% metadata (bz://508). If there is only one instance of metadata,
+    %% we can safely update that one, but in the case of multiple siblings,
+    %% it's hard to know which one to use. In that situation, use the update
+    %% metadata as is.
+    case riak_object:get_metadatas(RObj) of
+        [OnlyMD] -> OnlyMD;
+        _ -> riak_object:get_update_metadata(RObj)
+    end;
+base_metadata(_NotClean, RObj) ->
+    riak_object:get_update_metadata(RObj).
+
+%% Derive a vtag from this node's name and the given timestamp: the MD5 digest
+%% of the pair, read as a 128-bit integer and rendered in base 62.
+make_vtag(Now) ->
+    Digest = crypto:md5(term_to_binary({node(), Now})),
+    <<HashAsNum:128/integer>> = Digest,
+    riak_core_util:integer_to_list(HashAsNum, 62).
+
+%%
+%% Other handoff-helper functions:
+%%
+
+%% Fiddle with the cluster's handoff concurrency.
+%%
+%%  * integer: apply that limit on every connected node and return the prior
+%%    settings as a list of {Node, Limit} pairs;
+%%  * list of {Node, Limit} pairs (as returned above): restore each node to
+%%    its recorded limit;
+%%  * use_existing_concurrency: change nothing and return the atom unchanged.
+%%
+%% The prior settings used to be the raw {Results, BadNodes} tuple from
+%% rpc:multicall/3, which matched none of these clauses when handed back for
+%% restoration and could not be mapped to nodes anyway.
+set_handoff_concurrency(ConcurrencyN) when is_integer(ConcurrencyN) ->
+    OriginalSettings = get_handoff_concurrency(),
+    log_info("Prior concurrency setting ~p, setting to ~p.~n", [OriginalSettings, ConcurrencyN]),
+    rpc:multicall(riak_core_handoff_manager, set_concurrency, [ConcurrencyN]),
+    log_info("Done setting concurrency.~n"),
+    OriginalSettings;
+
+set_handoff_concurrency(ConcurrencySettings) when is_list(ConcurrencySettings) ->
+    log_info("Restoring concurrency settings to ~p.~n", [ConcurrencySettings]),
+    lists:foreach(fun({Node, Limit}) ->
+                          rpc:call(Node, riak_core_handoff_manager, set_concurrency, [Limit])
+                  end,
+                  ConcurrencySettings),
+    log_info("Done restoring concurrency settings.~n");
+
+set_handoff_concurrency(use_existing_concurrency) ->
+    use_existing_concurrency.
+
+%% Collect the current handoff concurrency limit of this node and every
+%% connected node as {Node, Limit} pairs. Nodes whose call does not yield an
+%% integer (for example {badrpc, _}) are left out.
+%% NOTE(review): assumes riak_core_handoff_manager:get_concurrency/0 returns
+%% the bare integer limit -- confirm against that module.
+get_handoff_concurrency() ->
+    [{Node, Limit} || Node <- [node() | nodes()],
+                      Limit <- [rpc:call(Node, riak_core_handoff_manager, get_concurrency, [])],
+                      is_integer(Limit)].
+
+%%
+%% General helper functions:
+%%
+
+%% N-way merge: for N rounds, pop one value per key (in SourceKeys order) from
+%% the map of lists and append it to the accumulator. Returns the accumulator.
+merge_values(0, _SourceKeys, _SourceMap, Acc) ->
+    Acc;
+
+merge_values(N, SourceKeys, SourceMap, Acc) ->
+    TakeOne = fun(Key, { Collected, Map }) ->
+                      { Taken, RemainingMap } = pop_value(Key, Map),
+                      { Collected ++ Taken, RemainingMap }
+              end,
+    { RoundAcc, RoundMap } = lists:foldl(TakeOne, { Acc, SourceMap }, SourceKeys),
+    merge_values(N - 1, SourceKeys, RoundMap, RoundAcc).
+
+%% Remove the first value stored under Key. Returns that value wrapped in a
+%% one-element list together with the map holding the remaining values.
+pop_value(Key, SourceMap) ->
+    { Popped, Remaining } = lists:split(1, dict:fetch(Key, SourceMap)),
+    { Popped, dict:store(Key, Remaining, SourceMap) }.
+
+%% Fold step that tracks the key with the shortest value list in a map of
+%% lists. The accumulator is {Key, Length}, seeded with a length of 'infinity'.
+%% On a tie the earlier key is kept.
+shortest_bucket(Key, ValueList, { MinKey, MinLen }) ->
+    case length(ValueList) of
+        Len when MinLen =:= infinity; Len < MinLen -> { Key, Len };
+        _ -> { MinKey, MinLen }
+    end.
View
1  ebin/riak_core.app
@@ -53,6 +53,7 @@
riak_core_ring_handler,
riak_core_ring_manager,
riak_core_ring_util,
+ riak_core_send_msg,
riak_core_stat,
riak_core_stat_cache,
riak_core_stat_calc_proc,
View
4 include/riak_core_handoff.hrl
@@ -3,6 +3,7 @@
-define(PT_MSG_OLDSYNC, 2).
-define(PT_MSG_SYNC, 3).
-define(PT_MSG_CONFIGURE, 4).
+-define(PT_MSG_BATCH, 5).
-record(ho_stats,
{
@@ -34,6 +35,7 @@
vnode_mon :: reference(),
type :: ho_type(),
req_origin :: node(),
- filter_mod_fun :: {module(), atom()}
+ filter_mod_fun :: {module(), atom()},
+ size :: {non_neg_integer(), bytes | objects}
}).
-type handoff_status() :: #handoff_status{}.
View
4 rebar.config
@@ -4,7 +4,7 @@
{edoc_opts, [{preprocess, true}]}.
{deps, [
- {lager, ".*", {git, "git://github.com/basho/lager", {tag, "1.2.2"}}},
+ {lager, "2.0.0", {git, "git://github.com/basho/lager", {tag, "2.0.0"}}},
{poolboy, ".*", {git, "git://github.com/basho/poolboy", {branch, "master"}}},
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs",
{branch, "master"}}},
@@ -12,7 +12,7 @@
{riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "master"}}},
{webmachine, ".*", {git, "git://github.com/basho/webmachine",
{tag, "64176ef9b"}}},
- {folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}},
+ {folsom, ".*", {git, "git://github.com/basho/folsom.git", {tag, "0.7.4p1"}}},
{ranch, "0.4.0-p1", {git, "git://github.com/basho/ranch.git", {tag, "0.4.0-p1"}}}
]}.
View
1  src/riak_core_app.erl
@@ -78,7 +78,6 @@ start(_StartType, _StartArgs) ->
riak_core:register(riak_core, [{stat_mod, riak_core_stat}]),
ok = riak_core_ring_events:add_guarded_handler(riak_core_ring_handler, []),
- %% Register capabilities
riak_core_capability:register({riak_core, vnode_routing},
[proxy, legacy],
legacy,
View
13 src/riak_core_connection_mgr_stats.erl
@@ -126,7 +126,18 @@ format_stat({{?APP, StatName, Addr, ProtocolId, total},N}) when is_atom(Protocol
format_stat({{?APP, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
{string_of_ipaddr(Addr)
++ "_" ++ atom_to_list(ProtocolId)
- ++ "_" ++ atom_to_list(StatName), N}.
+ ++ "_" ++ atom_to_list(StatName), N};
+format_stat({riak_conn_mgr_stats_stat_ts, S}) ->
+ UnivTime = riak_core_format:epoch_to_datetime(S),
+ {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:universal_time_to_local_time(UnivTime),
+ Fmt = riak_core_format:fmt("~4..0B-~2..0B-~2..0B ~2..0B:~2..0B:~2..0B",
+ [Year, Month, Day, Hour, Min, Sec]),
+ {"riak_conn_mgr_stats_stat_ts", Fmt};
+format_stat({StateName, N}) ->
+ {atom_to_list(StateName), N};
+format_stat(_) ->
+ [].
+
string_of_ipaddr({IP, Port}) when is_list(IP) ->
lists:flatten(io_lib:format("~s:~p", [IP, Port]));
View
202 src/riak_core_console.erl
@@ -23,7 +23,7 @@
stage_leave/1, stage_remove/1, stage_replace/1,
stage_force_replace/1, print_staged/1, commit_staged/1,
clear_staged/1, transfer_limit/1, pending_claim_percentage/2,
- pending_nodes_and_claim_percentages/1]).
+ pending_nodes_and_claim_percentages/1, transfers/1]).
%% @doc Return list of nodes, current and future claim.
pending_nodes_and_claim_percentages(Ring) ->
@@ -193,6 +193,206 @@ unreachable_status(Down) ->
"force-remove NODE) to allow the remaining nodes to settle.~n"),
ok.
+%% Print the cluster's transfer state: first the nodes with pending partition
+%% transfers (i.e. any secondary vnodes) and owned vnodes that are *not*
+%% running, then every active transfer, then the nodes that could not be
+%% contacted. A failure while gathering the pending summary is logged and
+%% reported on stdout; the active-transfer listing is printed regardless.
+-spec(transfers([string()]) -> ok).
+transfers([]) ->
+    try
+        {DownNodes, Pending} = riak_core_status:transfers(),
+        case DownNodes of
+            [] -> ok;
+            _ -> io:format("Nodes ~p are currently down.\n", [DownNodes])
+        end,
+        Report = fun({waiting_to_handoff, Node, Count}, Seen) ->
+                         io:format("~p waiting to handoff ~p partitions\n", [Node, Count]),
+                         Seen + 1;
+                    ({stopped, Node, Count}, Seen) ->
+                         io:format("~p does not have ~p primary partitions running\n", [Node, Count]),
+                         Seen + 1
+                 end,
+        case lists:foldl(Report, 0, Pending) of
+            0 ->
+                io:format("No transfers active\n"),
+                ok;
+            _ ->
+                error
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Transfers failed ~p:~p", [Exception,
+                                                   Reason]),
+            io:format("Transfers failed, see log for details~n"),
+            error
+    end,
+
+    %% Now display active transfers
+    {Xfers, Down} = riak_core_status:all_active_transfers(),
+
+    ShowXfer =
+        fun({{Mod, Partition}, Node, outbound, active, _Status}) ->
+                print_v1_status(Mod, Partition, Node);
+
+           ({status_v2, Status}) ->
+                Field = fun(Name) -> proplists:get_value(Name, Status) end,
+
+                %% Display base status
+                print_v2_status(Field(type), Field(mod),
+                                {Field(src_partition), Field(target_partition)},
+                                Field(start_ts)),
+
+                %% Get info about stats if there is any yet
+                print_stats(Field(src_node), Field(target_node), Field(stats)),
+                io:format("~n");
+
+           (_) ->
+                ignore
+        end,
+
+    io:format("~nActive Transfers:~n~n", []),
+    lists:foreach(ShowXfer, lists:flatten(Xfers)),
+
+    io:format("~n"),
+    lists:foreach(fun(Node) ->
+                          io:format("Node ~p could not be contacted~n", [Node])
+                  end,
+                  Down),
+    ok.
+
+%% Print the base description of a v2 transfer: its type, vnode module,
+%% partition(s) and start time with elapsed running time. Repair transfers
+%% show source and target partitions; all others show the target only.
+print_v2_status(Type, Mod, {SrcPartition, TargetPartition}, StartTS) ->
+    Started = datetime_str(StartTS),
+    Elapsed = riak_core_format:human_time_fmt("~.2f",
+                                              timer:now_diff(os:timestamp(), StartTS)),
+
+    io:format("transfer type: ~s~n", [Type]),
+    io:format("vnode type: ~p~n", [Mod]),
+    if
+        Type =:= repair ->
+            io:format("source partition: ~p~n", [SrcPartition]),
+            io:format("target partition: ~p~n", [TargetPartition]);
+        true ->
+            io:format("partition: ~p~n", [TargetPartition])
+    end,
+    io:format("started: ~s [~s ago]~n", [Started, Elapsed]).
+
+%% Print the description of a v1 (legacy-format) outbound transfer.
+print_v1_status(Mod, Partition, Node) ->
+    Rows = [{"vnode type: ~p~n", [Mod]},
+            {"partition: ~p~n", [Partition]},
+            {"target node: ~p~n~n", [Node]}],
+    lists:foreach(fun({Format, Args}) -> io:format(Format, Args) end, Rows).
+
+%% Print the statistics section of a transfer followed by its arrow box.
+%% With no_stats, placeholders are printed and progress is shown as 0.0.
+%% Otherwise Stats is a property list read for objs_per_s, bytes_per_s,
+%% last_update, objs_total, size and pct_done_decimal.
+print_stats(SrcNode, TargetNode, no_stats) ->
+    io:format("last update: no updates seen~n"),
+    print_size(undefined),
+    io:format("objects transferred: unknown~n~n"),
+    print_arrowbox(SrcNode, TargetNode, "unknown", "unknown", 0.0);
+print_stats(SrcNode, TargetNode, Stats) ->
+    Stat = fun(Name) -> proplists:get_value(Name, Stats) end,
+    LastUpdate = Stat(last_update),
+    SinceUpdate = riak_core_format:human_time_fmt("~.2f",
+                                                  timer:now_diff(os:timestamp(), LastUpdate)),
+    ObjRate = riak_core_format:fmt("~p Objs/s", [Stat(objs_per_s)]),
+    ByteRate = riak_core_format:human_size_fmt("~.2f", Stat(bytes_per_s)) ++ "/s",
+    UpdatedAt = datetime_str(LastUpdate),
+
+    io:format("last update: ~s [~s ago]~n", [UpdatedAt, SinceUpdate]),
+    print_size(Stat(size)),
+    io:format("objects transferred: ~p~n~n", [Stat(objs_total)]),
+    print_arrowbox(SrcNode, TargetNode, ObjRate, ByteRate, Stat(pct_done_decimal)).
+
+%% Print the total size of a transfer: a byte count, an object count, or
+%% "unknown" when no size was supplied.
+print_size({ByteCount, bytes}) ->
+    io:format("total size: ~p bytes~n", [ByteCount]);
+print_size({ObjectCount, objects}) ->
+    io:format("total size: ~p objects~n", [ObjectCount]);
+print_size(undefined) ->
+    io:format("total size: unknown~n").
+
+-define(ARROW, "=======> ").
+
+%% Print the "source =======> target" box for a transfer: the object rate on
+%% top, the node names joined by an arrow, up to two continuation rows for
+%% names longer than 25 characters, a progress bar, and the byte rate below.
+print_arrowbox(SrcAtom, TargetAtom, Objs, Bytes, Progress) ->
+    SrcName = atom_to_list(SrcAtom),
+    TargetName = atom_to_list(TargetAtom),
+    {SrcRest1, SrcRest2} = wrap(SrcName),
+    {TargetRest1, TargetRest2} = wrap(TargetName),
+
+    %% A name without a continuation row is centred in its column; one that
+    %% wraps is passed through as is.
+    FirstRow = fun(Name, "") -> string:centre(Name, 25);
+                  (Name, _Continued) -> Name
+               end,
+    Banner = riak_core_format:fmt("~25s ~10s ~25s",
+                                  [FirstRow(SrcName, SrcRest1),
+                                   ?ARROW,
+                                   FirstRow(TargetName, TargetRest1)]),
+    Width = length(Banner),
+
+    Bar = progress(Progress, 50),
+
+    %% A continuation row is printed only when either side has text left.
+    ContinuationRow = fun("", "") -> ok;
+                         (Left, Right) ->
+                              io:format("~-25s ~10s ~-25s~n", [Left, "", Right])
+                      end,
+
+    io:format("~s~n", [string:centre(Objs, Width)]),
+    io:format("~s~n", [Banner]),
+    ContinuationRow(SrcRest1, TargetRest1),
+    ContinuationRow(SrcRest2, TargetRest2),
+    io:format("~s~n", [string:centre(" "++Bar, Width)]),
+    io:format("~s~n", [string:centre(Bytes, Width)]).
+
+
+%% Split off the parts of a string that overflow a 25-character column.
+%% Returns {Second, Third}: characters 26-50 and 51-75 of String, each "" when
+%% the string does not reach that far. Anything past 75 characters is dropped.
+wrap(String) ->
+    Len = length(String),
+    if
+        Len > 50 ->
+            {lists:sublist(String, 26, 25), lists:sublist(String, 51, 25)};
+        Len >= 25 ->
+            {lists:sublist(String, 26, 25), ""};
+        true ->
+            {"", ""}
+    end.
+
+
+%% Render a textual progress bar that fits in MaxSize characters.
+%% PctDecimal is the completed fraction, or undefined when it is not known, in
+%% which case the bar is drawn full and labelled "N/A" (the original author
+%% flagged this case as wrong: "need - 6 refactor").
+progress(PctDecimal, MaxSize) ->
+    BarWidth = progress_size(MaxSize),
+    {Filled, Label} =
+        case PctDecimal of
+            undefined ->
+                {BarWidth, "N/A"};
+            _ ->
+                {trunc(PctDecimal * BarWidth),
+                 integer_to_list(trunc(PctDecimal * 100))}
+        end,
+    FormatStr = progress_fmt(Filled, BarWidth - Filled),
+    riak_core_format:fmt(FormatStr, ["", "", Label]).
+
+%% Width left for the bar itself once the fixed decoration is subtracted.
+progress_size(MaxSize) ->
+    MaxSize - 7. %% 7 fixed characters in progress bar
+
+%% Build the io format string for a progress bar: a field of ArrowSize
+%% characters padded with "=", a field of PadSize characters padded with
+%% spaces, and a three-character field for the percentage label.
+progress_fmt(ArrowSize, PadSize) ->
+    Template = "|~~~p..=s~~~p.. s| ~~3.. s%",
+    riak_core_format:fmt(Template, [ArrowSize, PadSize]).
+
+%% Format a point in time as "YYYY-MM-DD HH:MM:SS". Accepts either a
+%% {{Year, Month, Day}, {Hour, Min, Sec}} datetime or a three-element
+%% timestamp tuple, which is first converted with calendar:now_to_datetime/1.
+datetime_str({{Year, Month, Day}, {Hour, Min, Sec}}) ->
+    riak_core_format:fmt("~4..0B-~2..0B-~2..0B ~2..0B:~2..0B:~2..0B",
+                         [Year, Month, Day, Hour, Min, Sec]);
+datetime_str({_, _, _} = Timestamp) ->
+    datetime_str(calendar:now_to_datetime(Timestamp)).
+
stage_leave([]) ->
stage_leave(node());
stage_leave([NodeStr]) ->
View
47 src/riak_core_coverage_fsm.erl
@@ -112,7 +112,9 @@ behaviour_info(_) ->
required_responses :: pos_integer(),
response_count=0 :: non_neg_integer(),
timeout :: timeout(),
- vnode_master :: atom()
+ vnode_master :: atom(),
+ plan_fun :: function(),
+ process_fun :: function()
}).
%% ===================================================================
@@ -153,10 +155,13 @@ test_link(Mod, From, RequestArgs, _Options, StateProps) ->
init([Mod,
From={_, ReqId, _},
RequestArgs]) ->
+ Exports = Mod:module_info(exports),
{Request, VNodeSelector, NVal, PrimaryVNodeCoverage,
NodeCheckService, VNodeMaster, Timeout, ModState} =
Mod:init(From, RequestArgs),
gen_fsm:start_timer(Timeout, {timer_expired, Timeout}),
+ PlanFun = plan_callback(Mod, Exports),
+ ProcessFun = process_results_callback(Mod, Exports),
StateData = #state{mod=Mod,
mod_state=ModState,
node_check_service=NodeCheckService,
@@ -166,7 +171,9 @@ init([Mod,
request=Request,
req_id=ReqId,
timeout=infinity,
- vnode_master=VNodeMaster},
+ vnode_master=VNodeMaster,
+ plan_fun = PlanFun,
+ process_fun = ProcessFun},
{ok, initialize, StateData, 0};
init({test, Args, StateProps}) ->
%% Call normal init
@@ -195,7 +202,8 @@ initialize(timeout, StateData0=#state{mod=Mod,
request=Request,
req_id=ReqId,
timeout=Timeout,
- vnode_master=VNodeMaster}) ->
+ vnode_master=VNodeMaster,
+ plan_fun = PlanFun}) ->
CoveragePlan = riak_core_coverage_plan:create_plan(VNodeSelector,
NVal,
PVC,
@@ -205,13 +213,14 @@ initialize(timeout, StateData0=#state{mod=Mod,
{error, Reason} ->
Mod:finish({error, Reason}, ModState);
{CoverageVNodes, FilterVNodes} ->
+ {ok, UpModState} = PlanFun(CoverageVNodes, ModState),
Sender = {fsm, ReqId, self()},
riak_core_vnode_master:coverage(Request,
CoverageVNodes,
FilterVNodes,
Sender,
VNodeMaster),
- StateData = StateData0#state{coverage_vnodes=CoverageVNodes},
+ StateData = StateData0#state{coverage_vnodes=CoverageVNodes, mod_state=UpModState},
{next_state, waiting_results, StateData, Timeout}
end.
@@ -221,8 +230,9 @@ waiting_results({{ReqId, VNode}, Results},
mod=Mod,
mod_state=ModState,
req_id=ReqId,
- timeout=Timeout}) ->
- case Mod:process_results(Results, ModState) of
+ timeout=Timeout,
+ process_fun = ProcessFun}) ->
+ case ProcessFun(VNode, Results, ModState) of
{ok, UpdModState} ->
UpdStateData = StateData#state{mod_state=UpdModState},
{next_state, waiting_results, UpdStateData, Timeout};
@@ -275,3 +285,28 @@ terminate(Reason, _StateName, _State) ->
%% @private
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
+
+%% Build the fun invoked once the coverage plan is known. When the callback
+%% module exports plan/2 it is called with the coverage vnodes and the module
+%% state; otherwise the module state is passed through as {ok, ModState}.
+%% The check is arity-specific so that a module exporting some other plan/N
+%% is not mistaken for one implementing the optional callback.
+plan_callback(Mod, Exports) ->
+    case exports_arity(plan, 2, Exports) of
+        true ->
+            fun(CoverageVNodes, ModState) ->
+                    Mod:plan(CoverageVNodes, ModState) end;
+        false ->
+            fun(_, ModState) ->
+                    {ok, ModState} end
+    end.
+
+%% Build the fun invoked for each vnode result. Modules exporting
+%% process_results/3 also receive the reporting vnode; all others get the
+%% process_results/2 call.
+process_results_callback(Mod, Exports) ->
+    case exports_arity(process_results, 3, Exports) of
+        true ->
+            fun(VNode, Results, ModState) ->
+                    Mod:process_results(VNode, Results, ModState) end;
+        false ->
+            fun(_VNode, Results, ModState) ->
+                    Mod:process_results(Results, ModState) end
+    end.
+
+%% True when Function/Arity appears in an export list as returned by
+%% Mod:module_info(exports).
+exports_arity(Function, Arity, Exports) ->
+    lists:member({Function, Arity}, Exports).
View
10 src/riak_core_format.erl
@@ -23,7 +23,8 @@
-module(riak_core_format).
-export([fmt/2,
human_size_fmt/2,
- human_time_fmt/2]).
+ human_time_fmt/2,
+ epoch_to_datetime/1]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -52,6 +53,13 @@ human_time_fmt(Fmt, Micros) ->
{Value, Units} = human_time(Micros),
fmt(Fmt2, [Value, Units]).
+%% @doc Convert a folsom_utils:now_epoch() value (seconds since
+%% 1970-01-01 00:00:00) to a universal datetime.
+-spec epoch_to_datetime(non_neg_integer()) -> calendar:datetime().
+epoch_to_datetime(S) ->
+    %% Gregorian seconds at 1970-01-01 00:00:00, i.e.
+    %% calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}).
+    UnixEpochOffset = 62167219200,
+    calendar:gregorian_seconds_to_datetime(UnixEpochOffset + S).
+
%%%===================================================================
%%% Private
%%%===================================================================
View
55 src/riak_core_handoff_manager.erl
@@ -33,7 +33,7 @@
]).
%% handoff api
--export([add_outbound/4,
+-export([add_outbound/5,
add_inbound/1,
xfer/3,
kill_xfer/3,
@@ -73,12 +73,12 @@ start_link() ->
init([]) ->
{ok, #state{excl=ordsets:new(), handoffs=[]}}.
-add_outbound(Module,Idx,Node,VnodePid) ->
+add_outbound(Module,Idx,Node,VnodePid,Opts) ->
case application:get_env(riak_core, disable_outbound_handoff) of
{ok, true} ->
{error, max_concurrency};
_ ->
- gen_server:call(?MODULE,{add_outbound,Module,Idx,Node,VnodePid},infinity)
+ gen_server:call(?MODULE,{add_outbound,Module,Idx,Node,VnodePid,Opts},infinity)
end.
add_inbound(SSLOpts) ->
@@ -148,8 +148,8 @@ get_exclusions(Module) ->
handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
Reply = [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
{reply, {ok, Reply}, State};
-handle_call({add_outbound,Mod,Idx,Node,Pid},_From,State=#state{handoffs=HS}) ->
- case send_handoff(Mod,Idx,Node,Pid,HS) of
+handle_call({add_outbound,Mod,Idx,Node,Pid,Opts},_From,State=#state{handoffs=HS}) ->
+ case send_handoff(Mod,Idx,Node,Pid,HS,Opts) of
{ok,Handoff=#handoff_status{transport_pid=Sender}} ->
HS2 = HS ++ [Handoff],
{reply, {ok,Sender}, State#state{handoffs=HS2}};
@@ -247,7 +247,7 @@ handle_cast({send_handoff, Mod, {Src, Target}, ReqOrigin,
%% TODO: make a record?
{ok, VNode} = riak_core_vnode_manager:get_vnode_pid(Src, Mod),
case send_handoff({Mod, Src, Target}, ReqOrigin, VNode, HS,
- {Filter, FMF}, ReqOrigin) of
+ {Filter, FMF}, ReqOrigin, []) of
{ok, Handoff} ->
HS2 = HS ++ [Handoff],
{noreply, State#state{handoffs=HS2}};
@@ -348,7 +348,6 @@ build_status(HO) ->
status=Status,
timestamp=StartTS,
transport_pid=TPid,
- stats=Stats,
type=Type}=HO,
{status_v2, [{mod, Mod},
{src_partition, SrcP},
@@ -359,25 +358,41 @@ build_status(HO) ->
{status, Status},
{start_ts, StartTS},
{sender_pid, TPid},
- {stats, calc_stats(Stats, StartTS)},
+ {stats, calc_stats(HO)},
{type, Type}]}.
-calc_stats(Stats, StartTS) ->
+calc_stats(#handoff_status{stats=Stats,timestamp=StartTS,size=Size}) ->
case dict:find(last_update, Stats) of
error ->
no_stats;
{ok, LastUpdate} ->
Objs = dict:fetch(objs, Stats),
Bytes = dict:fetch(bytes, Stats),
+ CalcSize = get_size(Size),
+ Done = calc_pct_done(Objs, Bytes, CalcSize),
ElapsedS = timer:now_diff(LastUpdate, StartTS) / 1000000,
ObjsS = round(Objs / ElapsedS),
BytesS = round(Bytes / ElapsedS),
[{objs_total, Objs},
{objs_per_s, ObjsS},
{bytes_per_s, BytesS},
- {last_update, LastUpdate}]
+ {last_update, LastUpdate},
+ {size, CalcSize},
+ {pct_done_decimal, Done}]
end.
+%% Resolve a handoff size: a {Fun, dynamic} size is computed by calling Fun
+%% with no arguments; any other value is returned unchanged.
+get_size(Size) ->
+    case Size of
+        {SizeFun, dynamic} -> SizeFun();
+        Static -> Static
+    end.
+
+%% Fraction of a handoff that is complete, or undefined when it cannot be
+%% computed. The size is {Total, objects} or {Total, bytes}, and the matching
+%% counter (Objs or Bytes) is divided by Total.
+%%
+%% Sizes produced by a dynamic size fun are not validated, so a total of zero
+%% (e.g. an empty partition) can arrive here; it is reported as undefined
+%% rather than crashing the caller with a division by zero.
+calc_pct_done(_, _, undefined) ->
+    undefined;
+calc_pct_done(Objs, _, {Size, objects}) when Size > 0 ->
+    Objs / Size;
+calc_pct_done(_, Bytes, {Size, bytes}) when Size > 0 ->
+    Bytes / Size;
+calc_pct_done(_, _, {_Size, Unit}) when Unit =:= objects; Unit =:= bytes ->
+    undefined.
+
filter(none) ->
fun(_) -> true end;
filter({Key, Value}=_Filter) ->
@@ -399,8 +414,8 @@ handoff_concurrency_limit_reached () ->
ActiveSenders=proplists:get_value(active,Senders),
get_concurrency_limit() =< (ActiveReceivers + ActiveSenders).
-send_handoff(Mod, Partition, Node, Pid, HS) ->
- send_handoff({Mod, Partition, Partition}, Node, Pid, HS, {none, none}, none).
+send_handoff(Mod, Partition, Node, Pid, HS, Opts) ->
+ send_handoff({Mod, Partition, Partition}, Node, Pid, HS, {none, none}, none, Opts).
%% @private
%%
@@ -415,7 +430,7 @@ send_handoff(Mod, Partition, Node, Pid, HS) ->
{ok, handoff_status()}
| {error, max_concurrency}
| {false, handoff_status()}.
-send_handoff({Mod, Src, Target}, Node, Vnode, HS, {Filter, FilterModFun}, Origin) ->
+send_handoff({Mod, Src, Target}, Node, Vnode, HS, {Filter, FilterModFun}, Origin, Opts) ->
case handoff_concurrency_limit_reached() of
true ->
{error, max_concurrency};
@@ -466,6 +481,7 @@ send_handoff({Mod, Src, Target}, Node, Vnode, HS, {Filter, FilterModFun}, Origin
Node)
end,
PidM = monitor(process, Pid),
+ Size = validate_size(proplists:get_value(size, Opts)),
%% successfully started up a new sender handoff
{ok, #handoff_status{ transport_pid=Pid,
@@ -481,7 +497,8 @@ send_handoff({Mod, Src, Target}, Node, Vnode, HS, {Filter, FilterModFun}, Origin
stats=dict:new(),
type=HOType,
req_origin=Origin,
- filter_mod_fun=FilterModFun
+ filter_mod_fun=FilterModFun,
+ size=Size
}
};
@@ -521,6 +538,16 @@ update_stats(StatsUpdate, Stats) ->
Stats3 = dict:update_counter(bytes, Bytes, Stats2),
dict:store(last_update, LU, Stats3).
+%% Validate the `size' option supplied for an outbound handoff.
+%% Accepted forms are {N, bytes} or {N, objects} with N a positive number, and
+%% {Fun, dynamic} where Fun takes no arguments (it is later called as Fun()).
+%% Anything else, including a fun of another arity, yields undefined.
+validate_size(Size={N, U}) when is_number(N) andalso
+                                N > 0 andalso
+                                (U =:= bytes orelse U =:= objects) ->
+    Size;
+validate_size(Size={F, dynamic}) when is_function(F, 0) ->
+    Size;
+validate_size(_) ->
+    undefined.
+
+
%% @private
%%
%% @doc Kill and remove _each_ xfer associated with `ModSrcTarget'
View
6 src/riak_core_handoff_receiver.erl
@@ -110,6 +110,12 @@ process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod}) ->
{vnode_pid, VNode}],
riak_core_handoff_manager:set_recv_data(self(), Data),
State#state{partition=Partition, vnode=VNode};
+
+process_message(?PT_MSG_BATCH, MsgData, State) ->
+ lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end,
+ State,
+ binary_to_term(MsgData));
+
process_message(?PT_MSG_OBJ, MsgData, State=#state{vnode=VNode, count=Count}) ->
Msg = {handoff_data, MsgData},
case gen_fsm:sync_send_all_state_event(VNode, Msg, 60000) of
View
153 src/riak_core_handoff_sender.erl
@@ -43,16 +43,24 @@
%% Accumulator for the visit item HOF
-record(ho_acc,
{
- ack :: non_neg_integer(),
- error :: ok | {error, any()},
- filter :: function(),
- module :: module(),
- parent :: pid(),
- socket :: any(),
- src_target :: {non_neg_integer(), non_neg_integer()},
- stats :: #ho_stats{},
- tcp_mod :: module(),
- total :: non_neg_integer()
+ ack :: non_neg_integer(),
+ error :: ok | {error, any()},
+ filter :: function(),
+ module :: module(),
+ parent :: pid(),
+ socket :: any(),
+ src_target :: {non_neg_integer(), non_neg_integer()},
+ stats :: #ho_stats{},
+ tcp_mod :: module(),
+
+ total_objects :: non_neg_integer(),
+ total_bytes :: non_neg_integer(),
+
+ item_queue :: [binary()],
+ item_queue_length :: non_neg_integer(),
+ item_queue_byte_size :: non_neg_integer(),
+
+ acksync_threshold :: non_neg_integer()
}).
%%%===================================================================
@@ -114,6 +122,8 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
RecvTimeout = get_handoff_receive_timeout(),
+ AckSyncThreshold = app_helper:get_env(riak_core, handoff_acksync_threshold, 25),
+
%% Now that handoff_concurrency applies to both outbound and
%% inbound conns there is a chance that the receiver may
%% decide to reject the senders attempt to start a handoff.
@@ -137,37 +147,55 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
Stats = #ho_stats{interval_end=future_now(get_status_interval())},
Req = ?FOLD_REQ{foldfun=fun visit_item/3,
- acc0=#ho_acc{ack=0,
+ acc0=#ho_acc{
+ ack=0,
error=ok,
filter=Filter,
module=Module,
parent=ParentPid,
socket=Socket,
src_target={SrcPartition, TargetPartition},
- stats=Stats,
+ stats=Stats,
tcp_mod=TcpMod,
- total=0}},
+ total_bytes=0,
+ total_objects=0,
+
+ item_queue=[],
+ item_queue_length=0,
+ item_queue_byte_size=0,
+
+ acksync_threshold=AckSyncThreshold
+ }
+ },
%% IFF the vnode is using an async worker to perform the fold
%% then sync_command will return error on vnode crash,
%% otherwise it will wait forever but vnode crash will be
%% caught by handoff manager. I know, this is confusing, a
%% new handoff system will be written soon enough.
- R = riak_core_vnode_master:sync_command({SrcPartition, SrcNode},
- Req,
- VMaster, infinity),
- if R == {error, vnode_shutdown} ->
+
+ AccRecord0 = riak_core_vnode_master:sync_command({SrcPartition, SrcNode},
+ Req,
+ VMaster, infinity),
+
+ %% Send any straggler entries remaining in the buffer:
+ AccRecord = send_objects(AccRecord0#ho_acc.item_queue, AccRecord0),
+
+ if AccRecord == {error, vnode_shutdown} ->
?log_info("because the local vnode was shutdown", []),
throw({be_quiet, error, local_vnode_shutdown_requested});
true ->
ok % If not #ho_acc, get badmatch below
end,
- #ho_acc{error=ErrStatus,
+ #ho_acc{
+ error=ErrStatus,
module=Module,
parent=ParentPid,
tcp_mod=TcpMod,
- total=SentCount} = R,
+ total_objects=TotalObjects,
+ total_bytes=TotalBytes
+ } = AccRecord,
case ErrStatus of
ok ->
@@ -188,12 +216,14 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
end,
FoldTimeDiff = end_fold_time(StartFoldTime),
+ ThroughputBytes = TotalBytes/FoldTimeDiff,
lager:info("~p transfer of ~p from ~p ~p to ~p ~p"
- " completed: sent ~p objects in ~.2f seconds",
- [Type, Module, SrcNode, SrcPartition,
- TargetNode, TargetPartition, SentCount,
- FoldTimeDiff]),
+ " completed: sent ~s bytes in ~p objects"
+ " in ~.2f seconds (~s/second)",
+ [Type, Module, SrcNode, SrcPartition, TargetNode, TargetPartition,
+ riak_core_format:human_size_fmt("~.2f", TotalBytes), TotalObjects,
+ FoldTimeDiff, riak_core_format:human_size_fmt("~.2f", ThroughputBytes)]),
case Type of
repair -> ok;
@@ -232,7 +262,7 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
%% Since we can't abort the fold, this clause is just a no-op.
visit_item(_K, _V, Acc=#ho_acc{error={error, _Reason}}) ->
Acc;
-visit_item(K, V, Acc=#ho_acc{ack=?ACK_COUNT}) ->
+visit_item(K, V, Acc = #ho_acc{ack = _AccSyncThreshold, acksync_threshold = _AccSyncThreshold}) ->
#ho_acc{module=Module,
socket=Sock,
src_target={SrcPartition, TargetPartition},
@@ -260,34 +290,77 @@ visit_item(K, V, Acc=#ho_acc{ack=?ACK_COUNT}) ->
Acc#ho_acc{ack=0, error={error, Reason}, stats=Stats3}
end;
visit_item(K, V, Acc) ->
- #ho_acc{ack=Ack,
+ #ho_acc{
filter=Filter,
module=Module,
+ total_objects=TotalObjects,
+ item_queue=ItemQueue,
+ item_queue_length=ItemQueueLength,
+ item_queue_byte_size=ItemQueueByteSize
+ } = Acc,
+
+ case Filter(K) of
+ true ->
+ BinObj = Module:encode_handoff_item(K, V),
+
+ ItemQueue2 = [BinObj | ItemQueue],
+ ItemQueueLength2 = ItemQueueLength + 1,
+ ItemQueueByteSize2 = ItemQueueByteSize + byte_size(BinObj),
+
+ Acc2 = Acc#ho_acc{item_queue_length=ItemQueueLength2,
+ item_queue_byte_size=ItemQueueByteSize2},
+
+ %% Unit size is bytes:
+ HandoffBatchThreshold = app_helper:get_env(riak_core, handoff_batch_threshold, 1024*1024),
+
+ case ItemQueueByteSize2 =< HandoffBatchThreshold of
+ true -> Acc2#ho_acc{item_queue=ItemQueue2};
+ false -> send_objects(ItemQueue2, Acc2)
+ end;
+
+ false ->
+ Acc#ho_acc{error=ok, total_objects=TotalObjects+1}
+ end.
+
+send_objects([], Acc) ->
+ Acc;
+send_objects(ItemsReverseList, Acc) ->
+
+ Items = lists:reverse(ItemsReverseList),
+
+ #ho_acc{ack=Ack,
+ module=Module,
socket=Sock,
src_target={SrcPartition, TargetPartition},
stats=Stats,
tcp_mod=TcpMod,
- total=Total
+
+ total_objects=TotalObjects,
+ total_bytes=TotalBytes,
+ item_queue_length=NObjects
} = Acc,
- case Filter(K) of
- true ->
- BinObj = Module:encode_handoff_item(K, V),
- M = <<?PT_MSG_OBJ:8,BinObj/binary>>,
+ ObjectList = term_to_binary(Items),
+
+ M = <<?PT_MSG_BATCH:8, ObjectList/binary>>,
+
NumBytes = byte_size(M),
- Stats2 = incr_bytes(incr_objs(Stats), NumBytes),
+ Stats2 = incr_bytes(incr_objs(Stats, NObjects), NumBytes),
Stats3 = maybe_send_status({Module, SrcPartition, TargetPartition}, Stats2),
case TcpMod:send(Sock, M) of
ok ->
- Acc#ho_acc{ack=Ack+1, error=ok, stats=Stats3, total=Total+1};
+ Acc#ho_acc{ack=Ack+1, error=ok, stats=Stats3,
+ total_objects=TotalObjects+NObjects,
+ total_bytes=TotalBytes+NumBytes,
+
+ item_queue=[],
+ item_queue_length=0,
+ item_queue_byte_size=0};
{error, Reason} ->
Acc#ho_acc{error={error, Reason}, stats=Stats3}
- end;
- false ->
- Acc#ho_acc{error=ok, total=Total+1}
- end.
+ end.
get_handoff_ip(Node) when is_atom(Node) ->
case rpc:call(Node, riak_core_handoff_listener, get_handoff_ip, [],
@@ -366,10 +439,10 @@ incr_bytes(Stats=#ho_stats{bytes=Bytes}, NumBytes) ->
%% @private
%%
-%% @doc Increment `Stats' object count by 1.
--spec incr_objs(ho_stats()) -> NewStats::ho_stats().
-incr_objs(Stats=#ho_stats{objs=Objs}) ->
- Stats#ho_stats{objs=Objs+1}.
+%% @doc Increment `Stats' object count by `NObjs'.
+-spec incr_objs(ho_stats(), non_neg_integer()) -> NewStats::ho_stats().
+incr_objs(Stats=#ho_stats{objs=Objs}, NObjs) ->
+ Stats#ho_stats{objs=Objs+NObjs}.
%% @private
%%
View
57 src/riak_core_send_msg.erl
@@ -0,0 +1,57 @@
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc OTP equivalents for sending reply- and event-like things
+%% without blocking the sender.
+
+-module(riak_core_send_msg).
+
+-export([reply_unreliable/2,
+ cast_unreliable/2,
+ send_event_unreliable/2,
+ bang_unreliable/2]).
+
+-ifdef(TEST).
+-ifdef(PULSE).
+-compile(export_all).
+-compile({parse_transform, pulse_instrument}).
+-compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm},
+ {gen_server, pulse_gen_server}]}).
+-endif.
+-endif.
+
+%% NOTE: We've peeked inside gen_server.erl's guts to see its internals.
+reply_unreliable({To, Tag}, Reply) ->
+ bang_unreliable(To, {Tag, Reply}).
+
+cast_unreliable(Dest, Request) ->
+ bang_unreliable(Dest, {'$gen_cast', Request}).
+
+%% NOTE: We've peeked inside gen_fsm.erl's guts to see its internals.
+send_event_unreliable({global, _Name} = GlobalTo, Event) ->
+ erlang:error({unimplemented_send, GlobalTo, Event});
+send_event_unreliable({via, _Mod, _Name} = ViaTo, Event) ->
+ erlang:error({unimplemented_send, ViaTo, Event});
+send_event_unreliable(Name, Event) ->
+ bang_unreliable(Name, {'$gen_event', Event}),
+ ok.
+
+bang_unreliable(Dest, Msg) ->
+ catch erlang:send(Dest, Msg, [noconnect, nosuspend]),
+ Msg.
View
36 src/riak_core_ssl_util.erl
@@ -41,14 +41,14 @@ maybe_use_ssl(App) ->
{cacerts, load_certs(app_helper:get_env(App, cacertdir, undefined))},
{depth, app_helper:get_env(App, ssl_depth, 1)},
{verify_fun, {fun verify_ssl/3,
- get_my_common_name(app_helper:get_env(App, certfile,
- undefined))}},
+ {App, get_my_common_name(app_helper:get_env(App, certfile,
+ undefined))}}},
{verify, verify_peer},
{fail_if_no_peer_cert, true},
{secure_renegotiate, true} %% both sides are erlang, so we can force this
],
Enabled = app_helper:get_env(App, ssl_enabled, false) == true,
- case validate_ssl_config(Enabled, SSLOpts) of
+ case validate_ssl_config(App, Enabled, SSLOpts) of
true ->
SSLOpts;
{error, Reason} ->
@@ -60,45 +60,45 @@ maybe_use_ssl(App) ->
end.
-validate_ssl_config(false, _) ->
+validate_ssl_config(_App, false, _) ->
%% ssl is disabled
false;
-validate_ssl_config(true, []) ->
+validate_ssl_config(_App, true, []) ->
%% all options validated
true;
-validate_ssl_config(true, [{certfile, CertFile}|Rest]) ->
+validate_ssl_config(App, true, [{certfile, CertFile}|Rest]) ->
case filelib:is_regular(CertFile) of
true ->
- validate_ssl_config(true, Rest);
+ validate_ssl_config(App, true, Rest);
false ->
{error, lists:flatten(io_lib:format("Certificate ~p is not a file",
[CertFile]))}
end;
-validate_ssl_config(true, [{keyfile, KeyFile}|Rest]) ->
+validate_ssl_config(App, true, [{keyfile, KeyFile}|Rest]) ->
case filelib:is_regular(KeyFile) of
true ->
- validate_ssl_config(true, Rest);
+ validate_ssl_config(App, true, Rest);
false ->
{error, lists:flatten(io_lib:format("Key ~p is not a file",
[KeyFile]))}
end;
-validate_ssl_config(true, [{cacerts, CACerts}|Rest]) ->
+validate_ssl_config(App, true, [{cacerts, CACerts}|Rest]) ->
case CACerts of
undefined ->
{error, lists:flatten(
io_lib:format("CA cert dir ~p is invalid",
- [app_helper:get_env(riak_repl, cacertdir,
+ [app_helper:get_env(App, cacertdir,
undefined)]))};
[] ->
{error, lists:flatten(
io_lib:format("Unable to load any CA certificates from ~p",
- [app_helper:get_env(riak_repl, cacertdir,
+ [app_helper:get_env(App, cacertdir,
undefined)]))};
Certs when is_list(Certs) ->
- validate_ssl_config(true, Rest)
+ validate_ssl_config(App, true, Rest)
end;
-validate_ssl_config(true, [_|Rest]) ->
- validate_ssl_config(true, Rest).
+validate_ssl_config(App, true, [_|Rest]) ->
+ validate_ssl_config(App, true, Rest).
upgrade_client_to_ssl(Socket, App) ->
case maybe_use_ssl(App) of
@@ -164,10 +164,8 @@ verify_ssl(_, valid, UserState) ->
verify_ssl(_, valid_peer, undefined) ->
lager:error("Unable to determine local certificate's common name"),
{fail, bad_local_common_name};
-verify_ssl(Cert, valid_peer, MyCommonName) ->
-
+verify_ssl(Cert, valid_peer, {App, MyCommonName}) ->
CommonName = get_common_name(Cert),
-
case string:to_lower(CommonName) == string:to_lower(MyCommonName) of
true ->
lager:error("Peer certificate's common name matches local "
@@ -175,7 +173,7 @@ verify_ssl(Cert, valid_peer, MyCommonName) ->
{fail, duplicate_common_name};
_ ->
case validate_common_name(CommonName,
- app_helper:get_env(riak_repl, peer_common_name_acl, "*")) of
+ app_helper:get_env(App, peer_common_name_acl, "*")) of
{true, Filter} ->
lager:info("SSL connection from ~s granted by ACL ~s",
[CommonName, Filter]),
View
105 src/riak_core_tcp_mon.erl
@@ -23,6 +23,10 @@
-module(riak_core_tcp_mon).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-export([start_link/0, start_link/1, monitor/3, status/0, status/1, format/0, format/2]).
-export([default_status_funs/0, raw/2, diff/2, rate/2, kbps/2,
socket_status/1, format_socket_stats/2 ]).
@@ -95,20 +99,27 @@ format(Status, Stat) ->
format_header(Stat) ->
io_lib:format("~40w Value\n", [Stat]).
-format_entry({_Socket, Status}, Stat) ->
+format_entry(Status, Stat) ->
Tag = proplists:get_value(tag, Status),
Value = proplists:get_value(Stat, Status),
case Value of
Value when is_list(Value) ->
- [io_lib:format("~40s [", [Tag]),
- format_list(Value),
- "]\n"];
+ [format_tag(Tag),
+ " ",
+ format_list(Value),
+ "\n"];
_ ->
- [io_lib:format("~40s", [Tag]),
+ [format_tag(Tag),
+ " [",
format_value(Value),
"\n"]
end.
+format_tag(Tag) when is_list(Tag) ->
+ io_lib:format("~40s", [Tag]);
+format_tag(Tag) ->
+ io_lib:format("~40w", [Tag]).
+
format_value(Val) when is_float(Val) ->
io_lib:format("~7.1f", [Val]);
format_value(Val) ->
@@ -161,8 +172,8 @@ init(Props) ->
clear_after = proplists:get_value(clear_after, Props, ?DEFAULT_LIMIT)},
DistCtrl = erlang:system_info(dist_ctrl),
State = lists:foldl(fun({Node,Port}, DatState) ->
- add_dist_conn(Node, Port, DatState)
- end, State0, DistCtrl),
+ add_dist_conn(Node, Port, DatState)
+ end, State0, DistCtrl),
{ok, schedule_tick(State)}.
handle_call(status, _From, State = #state{conns = Conns,
@@ -198,9 +209,21 @@ handle_info({nodeup, Node, _InfoList}, State) ->
{noreply, add_dist_conn(Port, Node, State)}
end;
+handle_info({nodedown, Node, _InfoList}, State) ->
+ GbList = gb_trees:to_list(State#state.conns),
+ MaybePortConn = [{P, C} ||
+ {P, #conn{type = dist, tag = {node, MaybeNode}} = C} <- GbList,
+ MaybeNode =:= Node],
+ Conns2 = case MaybePortConn of
+ [{Port, Conn} | _] ->
+ erlang:send_after(State#state.clear_after, self(), {clear, Port}),
+ Conn2 = Conn#conn{type = error},
+ gb_trees:update(Port, Conn2, State#state.conns);
+ _ ->
+ State#state.conns
+ end,
+ {noreply, State#state{conns = Conns2}};
-handle_info({nodedown, _Node, _InfoList}, _State) ->
- {noreply, #state{}};
handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
opts = Opts, conns = Conns}) ->
schedule_tick(State),
@@ -215,7 +238,7 @@ handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
hist = Hist2}
catch
_E:_R ->
- %io:format("Error ~p: ~p\n", [E, R]),
+ %io:format("Error ~p: ~p\n", [_E, _R]),
 %% On any problem with getstat/getopts, mark the entry as being in error
erlang:send_after(State#state.clear_after,
self(),
@@ -239,7 +262,9 @@ code_change(_OldVsn, State, _Extra) ->
%% Add a distributed connection to the state
add_dist_conn(Node, Port, State) ->
- add_conn(Port, #conn{tag = {node, Node}, type = dist}, State).
+ add_conn(Port, #conn{tag = {node, Node},
+ type = dist,
+ transport = ranch_tcp}, State).
%% Add connection to the state
add_conn(Socket, Conn, State = #state{conns = Conns}) ->
@@ -317,3 +342,61 @@ format_socket_stats([{K,V}|T], Buf) when
format_socket_stats([{K,V}|T], Buf) ->
format_socket_stats(T, [{K, V} | Buf]).
+-ifdef(TEST).
+updown() ->
+ %% Set the stat gathering interval to 100ms
+ {ok, TCPMonPid} = riak_core_tcp_mon:start_link([{interval, 100}]),
+ {ok, LS} = gen_tcp:listen(0, [{active, true}, binary]),
+ {ok, Port} = inet:port(LS),
+ Pid = self(),
+ spawn(
+ fun () ->
+ %% server
+ {ok, S} = gen_tcp:accept(LS),
+ riak_core_tcp_mon:monitor(S, "test", gen_tcp),
+ timer:sleep(1000),
+ receive
+ {tcp, S, _Data} ->
+ %% only receive one packet, let the others build
+ %% up
+ ok;
+ _ ->
+ ?assert(fail)
+ after
+ 1000 ->
+ ?assert(fail)
+ end,
+ _Stat1 = riak_core_tcp_mon:status(),
+ MPid = whereis(riak_core_tcp_mon),
+ MPid ! {nodedown, 'foo', []},
+ Stat2 = riak_core_tcp_mon:status(),
+ MPid ! {nodeup, 'foo', []},
+ Stat3 = riak_core_tcp_mon:status(),
+ ?assert(proplists:is_defined(socket,hd(Stat2))),
+ ?assert(proplists:is_defined(socket,hd(Stat3))),
+ gen_tcp:close(S),
+ Pid ! finished
+ end),
+ %% client
+ {ok, Socket} = gen_tcp:connect("localhost",Port,
+ [binary, {active, true}]),
+ lists:foreach(
+ fun (_) ->
+ gen_tcp:send(Socket, "TEST")
+ end,
+ lists:seq(1,10000)),
+ receive
+ finished -> ok;
+ {'EXIT', _, normal} -> ok;
+ X -> io:format(user, "Unexpected message received ~p~n", [X]),
+ ?assert(fail)
+ end,
+ gen_tcp:close(Socket),
+ unlink(TCPMonPid),
+ exit(TCPMonPid, kill),
+ ok.
+
+nodeupdown_test_() ->
+ {timeout, 60, fun updown/0}.
+
+-endif.
View
36 src/riak_core_vnode.erl
@@ -331,8 +331,8 @@ vnode_handoff_command(Sender, Request, State=#state{index=Index,
riak_core_vnode_worker_pool:handle_work(Pool, Work, From),
continue(State, NewModState);
{forward, NewModState} ->
- riak_core_vnode_master:command({Index, HN}, Request, Sender,
- riak_core_vnode_master:reg_name(Mod)),
+ riak_core_vnode_master:command_unreliable(
+ {Index,HN}, Request, Sender, riak_core_vnode_master:reg_name(Mod)),
continue(State, NewModState);
{drop, NewModState} ->
continue(State, NewModState);
@@ -684,36 +684,40 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) -
{true, NewModState} ->
finish_handoff(State#state{modstate=NewModState,
handoff_node=TargetNode});
+ {false, Size, NewModState} ->
+ start_handoff(State#state{modstate=NewModState}, TargetNode, [{size, Size}]);
{false, NewModState} ->
- case riak_core_handoff_manager:add_outbound(Mod,Idx,TargetNode,self()) of
- {ok, Pid} ->
- NewState = State#state{modstate=NewModState,
- handoff_pid=Pid,
- handoff_node=TargetNode},
- continue(NewState);
- {error,_Reason} ->
- continue(State#state{modstate=NewModState})
- end
+ start_handoff(State#state{modstate=NewModState}, TargetNode, [])
end.
+start_handoff(State=#state{mod=Mod, index=Idx}, TargetNode, Opts) ->
+ case riak_core_handoff_manager:add_outbound(Mod,Idx,TargetNode,self(),Opts) of
+ {ok, Pid} ->
+ NewState = State#state{handoff_pid=Pid,
+ handoff_node=TargetNode},
+ continue(NewState);
+ {error,_Reason} ->
+ continue(State)
+ end.
%% @doc Send a reply to a vnode request. If
%% the Ref is undefined just send the reply
%% for compatibility with pre-0.12 requestors.
%% If Ref is defined, send it along with the
%% reply.
+%% NOTE: We *always* send the reply using unreliable delivery.
%%
-spec reply(sender(), term()) -> any().
reply({fsm, undefined, From}, Reply) ->
- gen_fsm:send_event(From, Reply);
+ riak_core_send_msg:send_event_unreliable(From, Reply);
reply({fsm, Ref, From}, Reply) ->
- gen_fsm:send_event(From, {Ref, Reply});
+ riak_core_send_msg:send_event_unreliable(From, {Ref, Reply});
reply({server, undefined, From}, Reply) ->
- gen_server:reply(From, Reply);
+ riak_core_send_msg:reply_unreliable(From, Reply);
reply({server, Ref, From}, Reply) ->
- gen_server:reply(From, {Ref, Reply});
+ riak_core_send_msg:reply_unreliable(From, {Ref, Reply});
reply({raw, Ref, From}, Reply) ->
- From ! {Ref, Reply};
+ riak_core_send_msg:bang_unreliable(From, {Ref, Reply});
reply(ignore, _Reply) ->
ok.
View
81 src/riak_core_vnode_master.erl
@@ -26,10 +26,12 @@
-include("riak_core_vnode.hrl").
-behaviour(gen_server).
-export([start_link/1, start_link/2, start_link/3, get_vnode_pid/2,
- start_vnode/2, command/3, command/4, sync_command/3,
+ start_vnode/2,
+ command/3, command/4,
+ command_unreliable/3, command_unreliable/4,
+ sync_command/3, sync_command/4,
coverage/5,
command_return_vnode/4,
- sync_command/4,
sync_spawn_command/3, make_request/3,
make_coverage_request/4, all_nodes/1, reg_name/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -64,23 +66,39 @@ get_vnode_pid(Index, VNodeMod) ->
riak_core_vnode_manager:get_vnode_pid(Index, VNodeMod).
command(Preflist, Msg, VMaster) ->
- command(Preflist, Msg, ignore, VMaster).
+ command2(Preflist, Msg, ignore, VMaster, normal).
+
+command_unreliable(Preflist, Msg, VMaster) ->
+ command2(Preflist, Msg, ignore, VMaster, unreliable).
+
+command(PrefListOrCmd, Msg, Sender, VMaster) ->
+ command2(PrefListOrCmd, Msg, Sender, VMaster, normal).
+
+command_unreliable(PrefListOrCmd, Msg, Sender, VMaster) ->
+ command2(PrefListOrCmd, Msg, Sender, VMaster, unreliable).
%% Send the command to the preflist given with responses going to Sender
-command([], _Msg, _Sender, _VMaster) ->
+command2([], _Msg, _Sender, _VMaster, _How) ->
ok;
-command([{Index, Pid}|Rest], Msg, Sender, VMaster) when is_pid(Pid) ->
+
+command2([{Index, Pid}|Rest], Msg, Sender, VMaster, How=normal)
+ when is_pid(Pid) ->
gen_fsm:send_event(Pid, make_request(Msg, Sender, Index)),
- command(Rest, Msg, Sender, VMaster);
-command([{Index,Node}|Rest], Msg, Sender, VMaster) ->
- proxy_cast({VMaster, Node}, make_request(Msg, Sender, Index)),
- command(Rest, Msg, Sender, VMaster);
+ command2(Rest, Msg, Sender, VMaster, How);
+
+command2([{Index, Pid}|Rest], Msg, Sender, VMaster, How=unreliable)
+ when is_pid(Pid) ->
+ riak_core_send_msg:send_event_unreliable(Pid, make_request(Msg, Sender,
+ Index)),
+ command2(Rest, Msg, Sender, VMaster, How);
+command2([{Index,Node}|Rest], Msg, Sender, VMaster, How) ->
+ proxy_cast({VMaster, Node}, make_request(Msg, Sender, Index), How),
+ command2(Rest, Msg, Sender, VMaster, How);
-%% Send the command to an individual Index/Node combination
-command({Index, Pid}, Msg, Sender, _VMaster) when is_pid(Pid) ->
- gen_fsm:send_event(Pid, make_request(Msg, Sender, Index));
-command({Index,Node}, Msg, Sender, VMaster) ->
- proxy_cast({VMaster, Node}, make_request(Msg, Sender, Index)).
+command2(DestTuple, Msg, Sender, VMaster, How) when is_tuple(DestTuple) ->
+ %% Final case, tuple = single destination ... so make a list and
+ %% resubmit to this function.
+ command2([DestTuple], Msg, Sender, VMaster, How).
%% Send a command to a covering set of vnodes
coverage(Msg, CoverageVNodes, Keyspaces, {Type, Ref, From}, VMaster)
@@ -161,26 +179,43 @@ init([Service, VNodeMod, LegacyMod, _RegName]) ->
vnode_mod=VNodeMod,
legacy=LegacyMod}}.
-proxy_cast({VMaster, Node}, Req) ->
+proxy_cast(Who, Req) ->
+ proxy_cast(Who, Req, normal).
+
+proxy_cast({VMaster, Node}, Req, How) ->
case riak_core_capability:get({riak_core, vnode_routing}, legacy) of
legacy ->
- gen_server:cast({VMaster, Node}, Req);
+ if How == normal ->
+ gen_server:cast({VMaster, Node}, Req);
+ How == unreliable ->
+ riak_core_send_msg:cast_unreliable({VMaster, Node}, Req)
+ end;
proxy ->
- do_proxy_cast({VMaster, Node}, Req)
+ do_proxy_cast({VMaster, Node}, Req, How)
end.
-do_proxy_cast({VMaster, Node}, Req=?VNODE_REQ{index=Idx}) ->
+do_proxy_cast({VMaster, Node}, Req=?VNODE_REQ{index=Idx}, How) ->
Mod = vmaster_to_vmod(VMaster),
Proxy = riak_core_vnode_proxy:reg_name(Mod, Idx, Node),
- gen_fsm:send_event(Proxy, Req),
+ send_an_event(Proxy, Req, How),
ok;
-do_proxy_cast({VMaster, Node}, Req=?COVERAGE_REQ{index=Idx}) ->
+do_proxy_cast({VMaster, Node}, Req=?COVERAGE_REQ{index=Idx}, How) ->
Mod = vmaster_to_vmod(VMaster),
Proxy = riak_core_vnode_proxy:reg_name(Mod, Idx, Node),
- gen_fsm:send_event(Proxy, Req),
+ send_an_event(Proxy, Req, How),
ok;
-do_proxy_cast({VMaster, Node}, Other) ->
- gen_server:cast({VMaster, Node}, Other).
+do_proxy_cast({VMaster, Node}, Other, How) ->
+ send_a_cast({VMaster, Node}, Other, How).
+
+send_an_event(Dest, Event, normal) ->
+ gen_fsm:send_event(Dest, Event);
+send_an_event(Dest, Event, unreliable) ->
+ riak_core_send_msg:send_event_unreliable(Dest, Event).
+
+send_a_cast(Dest, Msg, normal) ->
+ gen_server:cast(Dest, Msg);
+send_a_cast(Dest, Msg, unreliable) ->
+ riak_core_send_msg:cast_unreliable(Dest, Msg).
handle_cast({wait_for_service, Service}, State) ->
case Service of
Please sign in to comment.
Something went wrong with that request. Please try again.