Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add support for weighted ring claim and add claim dry-run logic

  • Loading branch information...
commit 386f00014e2d8674f567a192499f395040c92e7a 1 parent 864de1e
@jtuple jtuple authored
View
1  ebin/riak_core.app
@@ -22,6 +22,7 @@
riak_core_cinfo_core,
riak_core_claim,
riak_core_new_claim,
+ riak_core_claim_util,
riak_core_config,
riak_core_console,
riak_core_coverage_fsm,
View
30 src/riak_core_claim.erl
@@ -124,6 +124,22 @@ wants_claim_v1(Ring0, Node) ->
no
end.
+calc_expected_claim(Ring, Active, Node) ->
+ RingSize = riak_core_ring:num_partitions(Ring),
+ Weights0 =
+ [case riak_core_ring:get_member_meta(Ring, Member, claim_weight) of
+ undefined ->
+ {Member, 1};
+ Weight ->
+ {Member, Weight}
+ end || Member <- Active],
+ Weights = orddict:from_list(Weights0),
+ Total = lists:foldl(fun({_, Weight}, Total) ->
+ Total + Weight
+ end, 0, Weights),
+ NodeWeight = orddict:fetch(Node, Weights),
+ RingSize * NodeWeight div Total.
+
wants_claim_v2(Ring) ->
wants_claim_v2(Ring, node()).
@@ -131,15 +147,13 @@ wants_claim_v2(Ring, Node) ->
Active = riak_core_ring:claiming_members(Ring),
Owners = riak_core_ring:all_owners(Ring),
Counts = get_counts(Active, Owners),
- NodeCount = erlang:length(Active),
- RingSize = riak_core_ring:num_partitions(Ring),
- Avg = RingSize div NodeCount,
+ Expected = calc_expected_claim(Ring, Active, Node),
Count = proplists:get_value(Node, Counts, 0),
- case Count < Avg of
+ case Count < Expected of
false ->
no;
true ->
- {yes, Avg - Count}
+ {yes, Expected - Count}
end.
%% @deprecated
@@ -171,8 +185,10 @@ choose_claim_v2(Ring, Node) ->
Counts = get_counts(Active, Owners),
RingSize = riak_core_ring:num_partitions(Ring),
NodeCount = erlang:length(Active),
- Avg = RingSize div NodeCount,
- Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
+ Deltas = [begin
+ Expected = calc_expected_claim(Ring, Active, Member),
+ {Member, Expected - Count}
+ end || {Member, Count} <- Counts],
{_, Want} = lists:keyfind(Node, 1, Deltas),
TargetN = app_helper:get_env(riak_core, target_n_val),
AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
View
122 src/riak_core_claim_util.erl
@@ -0,0 +1,122 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_claim_util).
+-export([dryrun/1, dryrun_new_weights/1, update_weights/1]).
+
+update_weights(Weights) ->
+ riak_core_ring_manager:ring_trans(
+ fun(Ring, _) ->
+ ThisNode = node(),
+ case riak_core_ring:claimant(Ring) of
+ ThisNode ->
+ NewRing =
+ lists:foldl(fun({Member, Weight}, Ring0) ->
+ set_weight(Member, Weight, Ring0)
+ end, Ring, Weights),
+ {new_ring, NewRing};
+ Claimant ->
+ io:format("You can only change weights from the "
+ "claimant node: ~p~n", [Claimant]),
+ ignore
+ end
+ end, []),
+ ok.
+
+dryrun(Weights) when is_list(Weights) ->
+ Labels = lists:seq(1, length(Weights)),
+ Nodes = lists:zip(Labels, Weights),
+ do_dryrun(Nodes);
+dryrun(NumToAdd) when is_integer(NumToAdd) ->
+ Labels = lists:seq(1, NumToAdd),
+ Nodes = [{Label, 1} || Label <- Labels],
+ do_dryrun(Nodes).
+
+do_dryrun(Nodes) ->
+ ThisNode = node(),
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ case riak_core_ring:claimant(Ring) of
+ ThisNode ->
+ do_dryrun(Nodes, Ring);
+ Claimant ->
+ io:format("Dry runs should be run on the claimant node: ~p~n",
+ [Claimant]),
+ ok
+ end.
+
+do_dryrun(Nodes, Ring0) ->
+ case riak_core_ring:pending_changes(Ring0) of
+ [] ->
+ io:format("Current ring:~n"),
+ riak_core_ring:pretty_print(Ring0, [legend]),
+ lists:foldl(
+ fun({N, Weight}, Ring) ->
+ Node =
+ list_to_atom(lists:flatten(io_lib:format("sim~b@basho.com",[N]))),
+ io:format("~nAdding ~p~n", [Node]),
+ Ring2 = riak_core_ring:add_member(Node, Ring, Node),
+ Ring3 = set_weight(Node, Weight, Ring2),
+ Ring4 = dryrun_claim(Ring3),
+ Ring4
+ end, Ring0, Nodes);
+ _ ->
+ io:format("Cannot perform a dryrun on a cluster with pending "
+ "transfers~n")
+ end,
+ ok.
+
+dryrun_new_weights(Weights) ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ Ring2 = lists:foldl(fun({Node, Weight}, Ring0) ->
+ set_weight(Node, Weight, Ring0)
+ end, Ring, Weights),
+ io:format("Current ring:~n"),
+ riak_core_ring:pretty_print(Ring, [legend]),
+ dryrun_claim(Ring2),
+ ok.
+
+dryrun_claim(Ring) ->
+ Members = riak_core_ring:claiming_members(Ring),
+ Ring2 = lists:foldl(
+ fun(Node, Ring0) ->
+ riak_core_gossip:claim_until_balanced(Ring0, Node)
+ end, Ring, Members),
+ Owners1 = riak_core_ring:all_owners(Ring),
+ Owners2 = riak_core_ring:all_owners(Ring2),
+ Owners3 = lists:zip(Owners1, Owners2),
+ Next = [{Idx, PrevOwner, NewOwner}
+ || {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3,
+ PrevOwner /= NewOwner],
+ Tally =
+ lists:foldl(fun({_, PrevOwner, NewOwner}, Tally) ->
+ dict:update_counter({PrevOwner, NewOwner}, 1, Tally)
+ end, dict:new(), Next),
+ riak_core_ring:pretty_print(Ring2, [legend]),
+ io:format("Pending: ~p~n", [length(Next)]),
+ io:format("Check: ~p~n", [riak_core_ring_util:check_ring(Ring2)]),
+ [io:format("~b transfers from ~p to ~p~n", [Count, PrevOwner, NewOwner])
+ || {{PrevOwner, NewOwner}, Count} <- dict:to_list(Tally)],
+ Ring2.
+
+set_weight(Member, Weight, Ring) ->
+ riak_core_ring:update_member_meta(node(), Ring, Member, claim_weight,
+ Weight).
View
20 src/riak_core_gossip.erl
@@ -464,15 +464,27 @@ log_membership_changes(OldRing, NewRing) ->
ok.
claim_until_balanced(Ring, Node) ->
+ claim_until_balanced(Ring, Node, 5).
+
+claim_until_balanced(Ring, Node, Retries) ->
{WMod, WFun} = app_helper:get_env(riak_core, wants_claim_fun),
NeedsIndexes = apply(WMod, WFun, [Ring, Node]),
case NeedsIndexes of
no ->
Ring;
- {yes, _NumToClaim} ->
- {CMod, CFun} = app_helper:get_env(riak_core, choose_claim_fun),
- NewRing = CMod:CFun(Ring, Node),
- claim_until_balanced(NewRing, Node)
+ {yes, NumToClaim} ->
+ case Retries of
+ 0 ->
+ lager:warning("~p wanted to claim ~b more partitions, but "
+ "was unable to does so safely",
+ [Node, NumToClaim]),
+ Ring;
+ _ ->
+ {CMod, CFun} =
+ app_helper:get_env(riak_core, choose_claim_fun),
+ NewRing = CMod:CFun(Ring, Node),
+ claim_until_balanced(NewRing, Node, Retries-1)
+ end
end.
remove_from_cluster(Ring, ExitingNode) ->
Please sign in to comment.
Something went wrong with that request. Please try again.