
Update staged clustering code after review/further testing

Update claim_v2 to compute claim deltas for non-active owning nodes, to
handle a corner case hit by the new staged clustering code.

Change riak_core_ring_manager:ring_trans to allow code to provide a reason
when ignoring the transaction.

Update riak_core_claimant plan/commit to return reasons for failures.

Fix bug with riak_core_claimant:compute_all_next_rings where two rings were
considered equal when they were not.

Fix bug with riak_core_ring:future_ring where leaving nodes that still owned
partitions would incorrectly be changed to the exiting state.

Add code that enables querying and setting the handoff concurrency limit.
1 parent 68d878f, commit 9ee2fcf9e3860896e9b72da516b04aecfb8d3659, committed by @jtuple on Jun 8, 2012
@@ -109,7 +109,7 @@
{vnode_inactivity_timeout, 60000},
%% Number of VNodes allowed to do handoff concurrently.
- {handoff_concurrency, 1},
+ {handoff_concurrency, 2},
%% Disable Nagle on HTTP sockets
{disable_http_nagle, false},
@@ -172,7 +172,8 @@ choose_claim_v2(Ring, Node) ->
RingSize = riak_core_ring:num_partitions(Ring),
NodeCount = erlang:length(Active),
Avg = RingSize div NodeCount,
- Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
+ ActiveDeltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
+ Deltas = add_default_deltas(Owners, ActiveDeltas, 0),
{_, Want} = lists:keyfind(Node, 1, Deltas),
TargetN = app_helper:get_env(riak_core, target_n_val),
AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
@@ -425,6 +426,13 @@ get_counts(Nodes, Ring) ->
dict:to_list(Counts).
%% @private
+add_default_deltas(IdxOwners, Deltas, Default) ->
+ {_, Owners} = lists:unzip(IdxOwners),
+ Owners2 = lists:usort(Owners),
+ Defaults = [{Member, Default} || Member <- Owners2],
+ lists:ukeysort(1, Deltas ++ Defaults).
+
+%% @private
get_expected_partitions(Ring, Node) ->
riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).
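
A quick worked example of the new delta merge (node names n1/n2/n3 are invented for illustration): add_default_deltas/3 gives every ring owner an explicit default delta, and lists:ukeysort/2 keeps the first tuple per key, so the real deltas for active members shadow the zero defaults while non-active owners still appear in the result:

    %% n3 owns partitions but is not an active member, so it is absent
    %% from ActiveDeltas; the merge adds it back with a delta of 0.
    ActiveDeltas = [{n1, 2}, {n2, -1}],
    IdxOwners = [{0, n1}, {1, n2}, {2, n3}, {3, n1}],
    [{n1, 2}, {n2, -1}, {n3, 0}] =
        add_default_deltas(IdxOwners, ActiveDeltas, 0).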
@@ -165,8 +165,14 @@ handle_call({stage, Node, Action}, _From, State) ->
handle_call(plan, _From, State) ->
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
- {Reply, State2} = generate_plan(Ring, State),
- {reply, Reply, State2};
+ case riak_core_ring:ring_ready(Ring) of
+ false ->
+ Reply = {error, ring_not_ready},
+ {reply, Reply, State};
+ true ->
+ {Reply, State2} = generate_plan(Ring, State),
+ {reply, Reply, State2}
+ end;
handle_call(commit, _From, State) ->
{Reply, State2} = commit_staged(State),
@@ -209,32 +215,43 @@ maybe_stage(Node, Action, Ring, State=#state{changes=Changes}) ->
%% @private
%% @doc Determine how the staged set of cluster changes will affect
%% the cluster. See {@link plan/0} for additional details.
-generate_plan(Ring, State=#state{changes=Changes, seed=Seed}) ->
+generate_plan(Ring, State=#state{changes=Changes}) ->
Changes2 = filter_changes(Changes, Ring),
Joining = [{Node, join} || Node <- riak_core_ring:members(Ring, [joining])],
AllChanges = lists:ukeysort(1, Changes2 ++ Joining),
- case compute_all_next_rings(Changes2, Seed, Ring) of
+ State2 = State#state{changes=Changes2},
+ generate_plan(AllChanges, Ring, State2).
+
+generate_plan([], _, State) ->
+ %% There are no changes to apply
+ {{ok, [], []}, State};
+generate_plan(Changes, Ring, State=#state{seed=Seed}) ->
+ case compute_all_next_rings(Changes, Seed, Ring) of
legacy ->
- {legacy, State#state{changes=Changes2}};
+ {{error, legacy}, State};
{ok, NextRings} ->
{_, NextRing} = hd(NextRings),
- State2 = State#state{next_ring=NextRing, changes=Changes2},
- Reply = {ok, AllChanges, NextRings},
+ State2 = State#state{next_ring=NextRing},
+ Reply = {ok, Changes, NextRings},
{Reply, State2}
end.
%% @private
%% @doc Commit the set of staged cluster changes. See {@link commit/0}
%% for additional details.
+commit_staged(State=#state{next_ring=undefined}) ->
+ {{error, nothing_planned}, State};
commit_staged(State) ->
case maybe_commit_staged(State) of
{ok, _} ->
State2 = State#state{next_ring=undefined,
changes=[],
seed=erlang:now()},
- {true, State2};
- _ ->
- {false, State}
+ {ok, State2};
+ not_changed ->
Vagabond (Contributor) commented Jun 8, 2012

What is the distinction here between this and the next clause, especially WRT return value?

jtuple (Contributor) commented Jun 8, 2012

One provides a reason, one doesn't. The {not_changed, Reason} option was added for use by the new staged clustering stuff to provide better error reporting when a commit fails (because commits happen within a ring transaction). I wasn't going to go and update every use of not_changed everywhere in Riak to provide a reason -- so we kept both returns.

You're the one that suggested this approach on Mumble. ;)

Vagabond (Contributor) commented Jun 8, 2012

Ok, didn't realize you weren't going to update all the return values to include a reason.
+ {error, State};
+ {not_changed, Reason} ->
+ {{error, Reason}, State}
end.
%% @private
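
With these changes a caller of riak_core_claimant:commit/0 can branch on a concrete failure reason. A minimal caller sketch (the right-hand atoms are invented; the return shapes come from the code above and the console handling below):

    case riak_core_claimant:commit() of
        ok                       -> committed;
        error                    -> failed;  %% bare not_changed, no reason given
        {error, nothing_planned} -> must_plan_first;
        {error, Reason}          -> {failed, Reason}
    end.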
@@ -246,7 +263,7 @@ maybe_commit_staged(Ring, State=#state{changes=Changes, seed=Seed}) ->
Changes2 = filter_changes(Changes, Ring),
case compute_next_ring(Changes2, Seed, Ring) of
{legacy, _} ->
- ignore;
+ {ignore, legacy};
{ok, NextRing} ->
maybe_commit_staged(Ring, NextRing, State)
end.
@@ -257,10 +274,14 @@ maybe_commit_staged(Ring, NextRing, #state{next_ring=PlannedRing}) ->
IsReady = riak_core_ring:ring_ready(Ring),
IsClaimant = (Claimant == node()),
IsSamePlan = same_plan(PlannedRing, NextRing),
- case IsReady and IsClaimant and IsSamePlan of
- false ->
+ case {IsReady, IsClaimant, IsSamePlan} of
+ {false, _, _} ->
+ {ignore, ring_not_ready};
+ {_, false, _} ->
ignore;
- true ->
+ {_, _, false} ->
+ {ignore, plan_changed};
+ _ ->
NewRing = riak_core_ring:increment_vclock(Claimant, NextRing),
{new_ring, NewRing}
end.
@@ -439,9 +460,7 @@ compute_all_next_rings(Changes, Seed, Ring, Acc) ->
legacy;
{ok, NextRing} ->
Acc2 = [{Ring, NextRing}|Acc],
- OwnersChanged = (riak_core_ring:all_owners(Ring) /= riak_core_ring:all_owners(NextRing)),
- HasPending = (riak_core_ring:pending_changes(NextRing) /= []),
- case OwnersChanged or HasPending of
+ case not same_plan(Ring, NextRing) of
true ->
FutureRing = riak_core_ring:future_ring(NextRing),
compute_all_next_rings([], Seed, FutureRing, Acc2);
@@ -21,7 +21,8 @@
-module(riak_core_console).
-export([member_status/1, ring_status/1, print_member_status/2,
stage_leave/1, stage_remove/1, stage_replace/1,
- stage_force_replace/1, print_staged/1, commit_staged/1, clear_staged/1]).
+ stage_force_replace/1, print_staged/1, commit_staged/1,
+ clear_staged/1, transfer_limit/1]).
member_status([]) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
@@ -330,14 +331,18 @@ is_claimant_error(Node, Action) ->
print_staged([]) ->
case riak_core_claimant:plan() of
- legacy ->
+ {error, legacy} ->
io:format("The cluster is running in legacy mode and does not "
"support plan/commit.~n");
+ {error, ring_not_ready} ->
+ io:format("Cannot plan until cluster state has converged.~n"
+ "Check 'Ring Ready' in 'riak-admin ring_status'~n");
{ok, Changes, NextRings} ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
%% The last next ring is always the final ring after all changes,
%% which is uninteresting to show. Only print N-1 rings.
- NextRings2 = lists:sublist(NextRings, length(NextRings)-1),
+ NextRings2 = lists:sublist(NextRings,
+ erlang:max(0, length(NextRings)-1)),
print_plan(Changes, Ring, NextRings2),
ok
end.
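
The erlang:max/2 guard matters because a plan with no staged changes now yields NextRings = [] (see generate_plan above), and lists:sublist/2 rejects a negative length:

    %% lists:sublist/2 accepts a length of 0 but not a negative length:
    [] = lists:sublist([], 0),
    %% lists:sublist([], -1) raises an exception, which is exactly what
    %% the unguarded length(NextRings)-1 produced when NextRings was [].
    [] = lists:sublist([], erlang:max(0, length([]) - 1)).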
@@ -452,10 +457,60 @@ tally(Changes) ->
commit_staged([]) ->
case riak_core_claimant:commit() of
- true ->
+ ok ->
io:format("Cluster changes committed~n");
- false ->
- io:format("Unable to commit cluster changes. Plan may have "
- "changed, please verify the plan and try to commit "
- "again~n")
+ {error, legacy} ->
+ io:format("The cluster is running in legacy mode and does not "
+ "support plan/commit.~n");
+ {error, nothing_planned} ->
+ io:format("You must verify the plan with "
+ "'riak-admin cluster plan' before committing~n");
+ {error, ring_not_ready} ->
+ io:format("Cannot commit until cluster state has converged.~n"
+ "Check 'Ring Ready' in 'riak-admin ring_status'~n");
+ {error, plan_changed} ->
+ io:format("The plan has changed. Verify with "
+ "'riak-admin cluster plan' before committing~n");
+ _ ->
+ io:format("Unable to commit cluster changes. Plan "
+ "may have changed, please verify the~n"
+ "plan and try to commit again~n")
end.
+
+transfer_limit([]) ->
+ {Limits, Down} =
+ riak_core_util:rpc_every_member_ann(riak_core_handoff_manager,
+ get_concurrency, [], 5000),
+ io:format("~s~n", [string:centre(" Transfer Limit ", 79, $=)]),
+ io:format("Limit Node~n"),
+ io:format("~79..-s~n", [""]),
+ lists:foreach(fun({Node, Limit}) ->
+ io:format("~5b ~p~n", [Limit, Node])
+ end, Limits),
+ lists:foreach(fun(Node) ->
+ io:format("(offline) ~p~n", [Node])
+ end, Down),
+ io:format("~79..-s~n", [""]),
+ io:format("Note: You can change transfer limits with "
+ "'riak-admin transfer_limit <limit>'~n"
+ " and 'riak-admin transfer_limit <node> <limit>'~n"),
+ ok;
+transfer_limit([LimitStr]) ->
+ Limit = list_to_integer(LimitStr),
+ io:format("Setting transfer limit to ~b across the cluster~n", [Limit]),
+ {_, Down} =
+ riak_core_util:rpc_every_member_ann(riak_core_handoff_manager,
+ set_concurrency, [Limit], 5000),
+ (Down == []) orelse
+ io:format("Failed to set limit for: ~p~n", [Down]),
+ ok;
+transfer_limit([NodeStr, LimitStr]) ->
+ Node = list_to_atom(NodeStr),
+ Limit = list_to_integer(LimitStr),
+ case rpc:call(Node, riak_core_handoff_manager, set_concurrency, [Limit]) of
+ {badrpc, _} ->
+ io:format("Failed to set transfer limit for ~p~n", [Node]);
+ _ ->
+ io:format("Set transfer limit for ~p to ~b~n", [Node, Limit])
+ end,
+ ok.
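
Since riak-admin passes command-line tokens through as strings, the new clauses can also be exercised directly from an attached console (the node name is invented for illustration):

    %% Same as `riak-admin transfer_limit 4`:
    riak_core_console:transfer_limit(["4"]),
    %% Same as `riak-admin transfer_limit dev2@127.0.0.1 4`:
    riak_core_console:transfer_limit(["dev2@127.0.0.1", "4"]).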
@@ -39,6 +39,7 @@
status/1,
status_update/2,
set_concurrency/1,
+ get_concurrency/0,
kill_handoffs/0
]).
@@ -104,6 +105,9 @@ status_update(ModIdx, Stats) ->
set_concurrency(Limit) ->
gen_server:call(?MODULE,{set_concurrency,Limit}).
+get_concurrency() ->
+ gen_server:call(?MODULE, get_concurrency).
+
kill_handoffs() ->
set_concurrency(0).
@@ -159,7 +163,11 @@ handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
{reply, ok, State};
false ->
{reply, ok, State}
- end.
+ end;
+
+handle_call(get_concurrency, _From, State) ->
+ Concurrency = get_concurrency_limit(),
+ {reply, Concurrency, State}.
handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
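
get_concurrency/0 is a plain synchronous gen_server round trip that reads the node's current limit; the console's transfer_limit([]) gathers the same value from every member over RPC. A local sketch:

    %% Returns this node's current handoff concurrency limit.
    Limit = riak_core_handoff_manager:get_concurrency(),
    %% The cluster-wide view uses the same call via annotated RPC:
    {Limits, Down} =
        riak_core_util:rpc_every_member_ann(riak_core_handoff_manager,
                                            get_concurrency, [], 5000).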
@@ -763,7 +763,12 @@ future_ring(State) ->
Leaving = get_members(FutureState?CHSTATE.members, [leaving]),
FutureState2 =
lists:foldl(fun(Node, StateAcc) ->
- riak_core_ring:exit_member(Node, StateAcc, Node)
+ case indices(StateAcc, Node) of
+ [] ->
+ riak_core_ring:exit_member(Node, StateAcc, Node);
+ _ ->
+ StateAcc
+ end
end, FutureState, Leaving),
FutureState2?CHSTATE{next=[]}.
@@ -256,6 +256,8 @@ handle_call({ring_trans, Fun, Args}, _From, State=#state{raw_ring=Ring}) ->
{reply, {ok, NewRing}, State#state{raw_ring=NewRing}};
ignore ->
{reply, not_changed, State};
+ {ignore, Reason} ->
+ {reply, {not_changed, Reason}, State};
Other ->
lager:error("ring_trans: invalid return value: ~p",
[Other]),
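
With the new contract, a transaction fun can explain why it made no change; ring_trans maps {ignore, Reason} to {not_changed, Reason} for the caller. A minimal sketch (the fun body is invented for illustration):

    riak_core_ring_manager:ring_trans(
      fun(Ring, _Args) ->
              case riak_core_ring:ring_ready(Ring) of
                  false -> {ignore, ring_not_ready};
                  true  -> {new_ring,
                            riak_core_ring:increment_vclock(node(), Ring)}
              end
      end, []).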
