Merge pull request #91 from basho/az447-choose-claim-1.0

commit 827a5a582a5c8228a210301688922e80b19ccdf1 (parents: d2dec31, c0e29ad)
authored by @jtuple
ebin/riak_core.app (1 addition)
@@ -21,6 +21,7 @@
riak_core_bucket,
riak_core_cinfo_core,
riak_core_claim,
+ riak_core_new_claim,
riak_core_config,
riak_core_console,
riak_core_coverage_fsm,
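
Registering riak_core_new_claim in the app file makes the module available, but nothing routes claim decisions through it by default: riak_core looks the claim functions up via application environment variables, the same keys the claim_simulation test below sets in run/1. A minimal sketch of opting in (the target_n_val of 4 is illustrative):

    %% Sketch: select the new claim algorithm via riak_core's app env.
    %% These are the same keys claim_simulation.erl sets in run/1.
    application:load(riak_core),
    application:set_env(riak_core, wants_claim_fun,
                        {riak_core_new_claim, new_wants_claim}),
    application:set_env(riak_core, choose_claim_fun,
                        {riak_core_new_claim, new_choose_claim}),
    application:set_env(riak_core, target_n_val, 4).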
src/riak_core_claim.erl (43 changes)
@@ -56,7 +56,9 @@
never_wants_claim/1, random_choose_claim/1]).
-export([default_choose_claim/2,
default_wants_claim/2,
- claim_rebalance_n/2]).
+ claim_rebalance_n/2,
+ meets_target_n/2,
+ diagonal_stripe/2]).
-ifdef(TEST).
-ifdef(EQC).
@@ -71,22 +73,26 @@
default_wants_claim(Ring) ->
default_wants_claim(Ring, node()).
-default_wants_claim(Ring0, Node) ->
- Ring = riak_core_ring:upgrade(Ring0),
+get_member_count(Ring, Node) ->
%% Determine how many nodes are involved with the ring; if the requested
%% node is not yet part of the ring, include it in the count.
AllMembers = riak_core_ring:claiming_members(Ring),
case lists:member(Node, AllMembers) of
true ->
- Mval = length(AllMembers);
+ length(AllMembers);
false ->
- Mval = length(AllMembers) + 1
- end,
+ length(AllMembers) + 1
+ end.
+get_expected_partitions(Ring, Node) ->
+ riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).
+
+default_wants_claim(Ring0, Node) ->
+ Ring = riak_core_ring:upgrade(Ring0),
%% Calculate the expected # of partitions for a perfectly balanced ring. Use
%% this expectation to determine the relative balance of the ring. If the
%% ring isn't within +-2 partitions on all nodes, we need to rebalance.
- ExpParts = riak_core_ring:num_partitions(Ring) div Mval,
+ ExpParts = get_expected_partitions(Ring, Node),
PCounts = lists:foldl(fun({_Index, ANode}, Acc) ->
orddict:update_counter(ANode, 1, Acc)
end, [{Node, 0}], riak_core_ring:all_owners(Ring)),
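
The hunk above extracts get_member_count/2 and get_expected_partitions/2 without changing the balance rule: the ring is considered balanced when every node is within two partitions of the expected share. A worked example with illustrative numbers:

    %% Illustrative: 64 partitions, 5 claiming members (requesting node
    %% included). ExpParts = 64 div 5 = 12, so each node may own 10..14
    %% partitions; a node owning 9 or 15 means the ring wants rebalancing.
    ExpParts = 64 div 5.   %% => 12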
@@ -235,8 +241,16 @@ find_biggest_hole(Mine) ->
claim_rebalance_n(Ring0, Node) ->
Ring = riak_core_ring:upgrade(Ring0),
- %% diagonal stripes guarantee most disperse data
Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+ Zipped = diagonal_stripe(Ring, Nodes),
+ lists:foldl(fun({P, N}, Acc) ->
+ riak_core_ring:transfer_node(P, N, Acc)
+ end,
+ Ring,
+ Zipped).
+
+diagonal_stripe(Ring, Nodes) ->
+ %% diagonal stripes guarantee most disperse data
Partitions = lists:sort([ I || {I, _} <- riak_core_ring:all_owners(Ring) ]),
Zipped = lists:zip(Partitions,
lists:sublist(
lists:flatten(
@@ -245,16 +259,15 @@ claim_rebalance_n(Ring0, Node) ->
lists:duplicate(
1+(length(Partitions) div length(Nodes)),
Nodes)),
1, length(Partitions))),
- lists:foldl(fun({P, N}, Acc) ->
- riak_core_ring:transfer_node(P, N, Acc)
- end,
- Ring,
- Zipped).
+ Zipped.
+
+random_choose_claim(Ring) ->
+ random_choose_claim(Ring, node()).
-random_choose_claim(Ring0) ->
+random_choose_claim(Ring0, Node) ->
Ring = riak_core_ring:upgrade(Ring0),
riak_core_ring:transfer_node(riak_core_ring:random_other_index(Ring),
- node(), Ring).
+ Node, Ring).
%% @spec never_wants_claim(riak_core_ring()) -> no
%% @doc For use by nodes that should not claim any partitions.
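
Exporting diagonal_stripe/2 (and meets_target_n/2) lets riak_core_new_claim reuse the striping logic factored out above. A sketch of what the stripe looks like on a small ring; node names are made up:

    %% Sketch: round-robin assignment of nodes to sorted partition indices.
    Ring = riak_core_ring:fresh(8, 'a@host'),
    riak_core_claim:diagonal_stripe(Ring, ['a@host', 'b@host', 'c@host']).
    %% => [{Idx0,'a@host'}, {Idx1,'b@host'}, {Idx2,'c@host'},
    %%     {Idx3,'a@host'}, ...]  -- owners repeat every 3 partitions,
    %% which is what yields target_n spacing when enough nodes exist.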
src/riak_core_new_claim.erl (new file, 194 lines)
@@ -0,0 +1,194 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_new_claim).
+-export([new_wants_claim/2, new_choose_claim/2]).
+
+new_wants_claim(Ring, Node) ->
+ Active = riak_core_ring:claiming_members(Ring),
+ Owners = riak_core_ring:all_owners(Ring),
+ Counts = get_counts(Active, Owners),
+ NodeCount = erlang:length(Active),
+ RingSize = riak_core_ring:num_partitions(Ring),
+ Avg = RingSize div NodeCount,
+ Count = proplists:get_value(Node, Counts, 0),
+ case Count < Avg of
+ false ->
+ no;
+ true ->
+ {yes, Avg - Count}
+ end.
+
+new_choose_claim(Ring, Node) ->
+ Active = riak_core_ring:claiming_members(Ring),
+ Owners = riak_core_ring:all_owners(Ring),
+ Counts = get_counts(Active, Owners),
+ RingSize = riak_core_ring:num_partitions(Ring),
+ NodeCount = erlang:length(Active),
+ Avg = RingSize div NodeCount,
+ Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
+ {_, Want} = lists:keyfind(Node, 1, Deltas),
+ TargetN = app_helper:get_env(riak_core, target_n_val),
+ AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
+ [Idx || {Idx, _} <- Owners]),
+
+ EnoughNodes =
+ (NodeCount > TargetN)
+ or ((NodeCount == TargetN) and (RingSize rem TargetN =:= 0)),
+
+ case EnoughNodes of
+ true ->
+ %% If we have enough nodes to meet target_n, then we prefer to
+ %% claim indices that are currently causing violations, and then
+ %% fallback to indices in linear order. The filtering steps below
+ %% will ensure no new violations are introduced.
+ Violated = lists:flatten(find_violations(Ring, TargetN)),
+ Violated2 = [lists:keyfind(Idx, 2, AllIndices) || Idx <- Violated],
+ Indices = Violated2 ++ (AllIndices -- Violated2);
+ false ->
+ %% If we do not have enough nodes to meet target_n, then we prefer
+ %% claiming the same indices that would occur during a
+ %% re-diagonalization of the ring with target_n nodes, falling
+ %% back to linear offsets off these preferred indices when the
+ %% number of indices desired is less than the computed set.
+ Padding = lists:duplicate(TargetN, undefined),
+ Expanded = lists:sublist(Active ++ Padding, TargetN),
+ PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
+ PreferredNth = [begin
+ {Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
+ Nth
+ end || {Idx,Owner} <- PreferredClaim,
+ Owner =:= Node],
+ Offsets = lists:seq(0, RingSize div length(PreferredNth)),
+ AllNth = lists:sublist([(X+Y) rem RingSize || Y <- Offsets,
+ X <- PreferredNth],
+ RingSize),
+ Indices = [lists:keyfind(Nth, 1, AllIndices) || Nth <- AllNth]
+ end,
+
+ %% Filter out indices that conflict with the node's existing ownership
+ Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
+ TargetN, RingSize),
+ %% Claim indices from the remaining candidate set
+ Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
+ Claim2 = lists:sublist(Claim, Want),
+ NewRing = lists:foldl(fun(Idx, Ring0) ->
+ riak_core_ring:transfer_node(Idx, Node, Ring0)
+ end, Ring, Claim2),
+
+ RingChanged = ([] /= Claim2),
+ RingMeetsTargetN = riak_core_claim:meets_target_n(NewRing, TargetN),
+ case {RingChanged, EnoughNodes, RingMeetsTargetN} of
+ {false, _, _} ->
+ %% Unable to claim, fallback to re-diagonalization
+ riak_core_claim:claim_rebalance_n(Ring, Node);
+ {_, true, false} ->
+ %% Failed to meet target_n, fallback to re-diagonalization
+ riak_core_claim:claim_rebalance_n(Ring, Node);
+ _ ->
+ NewRing
+ end.
+
+%% Counts up the number of partitions owned by each node
+get_counts(Nodes, Ring) ->
+ Empty = [{Node, 0} || Node <- Nodes],
+ Counts = lists:foldl(fun({_Idx, Node}, Counts) ->
+ case lists:member(Node, Nodes) of
+ true ->
+ dict:update_counter(Node, 1, Counts);
+ false ->
+ Counts
+ end
+ end, dict:from_list(Empty), Ring),
+ dict:to_list(Counts).
+
+%% Filter out candidate indices that would violate target_n given a node's
+%% current partition ownership.
+prefilter_violations(Ring, Node, AllIndices, Indices, TargetN, RingSize) ->
+ CurrentIndices = riak_core_ring:indices(Ring, Node),
+ CurrentNth = [lists:keyfind(Idx, 2, AllIndices) || Idx <- CurrentIndices],
+ [{Nth, Idx} || {Nth, Idx} <- Indices,
+ lists:all(fun({CNth, _}) ->
+ spaced_by_n(CNth, Nth, TargetN, RingSize)
+ end, CurrentNth)].
+
+%% Select indices from a given candidate set, according to two goals.
+%% 1. Ensure greedy/local target_n spacing between indices. Note that this
+%% goal intentionally does not reject overall target_n violations.
+%%
+%% 2. Select indices based on the delta between current ownership and
+%% expected ownership. In other words, if A owns 5 partitions and
+%% the desired ownership is 3, then we try to claim at most 2 partitions
+%% from A.
+select_indices(_Owners, _Deltas, [], _TargetN, _RingSize) ->
+ [];
+select_indices(Owners, Deltas, Indices, TargetN, RingSize) ->
+ OwnerDT = dict:from_list(Owners),
+ {FirstNth, _} = hd(Indices),
+ {Claim, _, _} =
+ lists:foldl(fun({Nth, Idx}, {Out, LastNth, DeltaDT}) ->
+ Owner = dict:fetch(Idx, OwnerDT),
+ Delta = dict:fetch(Owner, DeltaDT),
+ First = (LastNth =:= Nth),
+ MeetsTN = spaced_by_n(LastNth, Nth, TargetN,
+ RingSize),
+ case (Delta < 0) and (First or MeetsTN) of
+ true ->
+ NextDeltaDT =
+ dict:update_counter(Owner, 1, DeltaDT),
+ {[Idx|Out], Nth, NextDeltaDT};
+ false ->
+ {Out, LastNth, DeltaDT}
+ end
+ end,
+ {[], FirstNth, dict:from_list(Deltas)},
+ Indices),
+ lists:reverse(Claim).
+
+%% Determines indices that violate the given target_n spacing property.
+find_violations(Ring, TargetN) ->
+ Owners = riak_core_ring:all_owners(Ring),
+ Suffix = lists:sublist(Owners, TargetN-1),
+ Owners2 = Owners ++ Suffix,
+ %% Use a sliding window to determine violations
+ {Bad, _} = lists:foldl(fun(P={Idx, Owner}, {Out, Window}) ->
+ Window2 = lists:sublist([P|Window], TargetN-1),
+ case lists:keyfind(Owner, 2, Window) of
+ {PrevIdx, Owner} ->
+ {[[PrevIdx, Idx] | Out], Window2};
+ false ->
+ {Out, Window2}
+ end
+ end, {[], []}, Owners2),
+ lists:reverse(Bad).
+
+%% Determine if two positions in the ring meet target_n spacing.
+spaced_by_n(NthA, NthB, TargetN, RingSize) ->
+ case NthA > NthB of
+ true ->
+ NFwd = NthA - NthB,
+ NBack = NthB - NthA + RingSize;
+ false ->
+ NFwd = NthA - NthB + RingSize,
+ NBack = NthB - NthA
+ end,
+ (NFwd >= TargetN) and (NBack >= TargetN).
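
spaced_by_n/4 treats positions as points on a circle: two indices satisfy target_n only when both the forward and backward gaps are at least TargetN. Some worked values (ring size and positions are illustrative, not from the patch):

    %% RingSize = 8, TargetN = 4:
    %% spaced_by_n(0, 4, 4, 8) -> NFwd = 4, NBack = 4 -> true
    %% spaced_by_n(1, 6, 4, 8) -> NFwd = 3, NBack = 5 -> false
    %%                            (only 3 steps apart going one way)
    %% Likewise, new_wants_claim on a 32-partition ring with 8 members
    %% computes Avg = 32 div 8 = 4 and answers {yes, 4 - Count} for any
    %% node owning fewer than 4 partitions.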
src/riak_core_ring.erl (57 changes)
@@ -60,6 +60,7 @@
downgrade/2,
claimant/1,
member_status/2,
+ pretty_print/2,
all_member_status/1,
update_member_meta/5,
get_member_meta/3,
@@ -643,6 +644,58 @@ handoff_complete(State, Idx, Mod) ->
ring_changed(Node, State) ->
internal_ring_changed(Node, State).
+pretty_print(Ring, Opts) ->
+ OptNumeric = lists:member(numeric, Opts),
+ OptLegend = lists:member(legend, Opts),
+ Out = proplists:get_value(out, Opts, standard_io),
+ TargetN = proplists:get_value(target_n, Opts,
+ app_helper:get_env(riak_core, target_n_val)),
+
+ Owners = riak_core_ring:all_members(Ring),
+ Indices = riak_core_ring:all_owners(Ring),
+ RingSize = length(Indices),
+ Numeric = OptNumeric or (length(Owners) > 26),
+ case Numeric of
+ true ->
+ Ids = [integer_to_list(N) || N <- lists:seq(1, length(Owners))];
+ false ->
+ Ids = [[Letter] || Letter <- lists:seq(97, 96+length(Owners))]
+ end,
+ Names = lists:zip(Owners, Ids),
+ case OptLegend of
+ true ->
+ io:format(Out, "~36..=s Nodes ~36..=s~n", ["", ""]),
+ [begin
+ NodeIndices = [Idx || {Idx,Owner} <- Indices,
+ Owner =:= Node],
+ RingPercent = length(NodeIndices) * 100 / RingSize,
+ io:format(Out, "Node ~s: ~5.1f% ~s~n",
+ [Name, RingPercent, Node])
+ end || {Node, Name} <- Names],
+ io:format(Out, "~36..=s Ring ~37..=s~n", ["", ""]);
+ false ->
+ ok
+ end,
+
+ case Numeric of
+ true ->
+ Ownership =
+ [orddict:fetch(Owner, Names) || {_Idx, Owner} <- Indices],
+ io:format(Out, "~p~n", [Ownership]);
+ false ->
+ lists:foldl(fun({_, Owner}, N) ->
+ Name = orddict:fetch(Owner, Names),
+ case N rem TargetN of
+ 0 ->
+ io:format(Out, "~s|", [[Name]]);
+ _ ->
+ io:format(Out, "~s", [[Name]])
+ end,
+ N+1
+ end, 1, Indices),
+ io:format(Out, "~n", [])
+ end.
+
%% ===================================================================
%% Legacy reconciliation
%% ===================================================================
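
pretty_print/2 gives a compact per-partition view of ownership: one letter per partition (digits with the numeric option or more than 26 nodes), with a bar every target_n positions. A sketch of calling it on a fresh single-node ring; the output shape shown is approximate:

    Ring = riak_core_ring:fresh(16, 'a@host'),
    riak_core_ring:pretty_print(Ring, [legend, {target_n, 4}]).
    %% Prints roughly:
    %% ==================== Nodes ====================
    %% Node a: 100.0% a@host
    %% ==================== Ring =====================
    %% aaaa|aaaa|aaaa|aaaa|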
@@ -895,7 +948,9 @@ update_ring(CNode, CState) ->
%% Rebalance the ring as necessary
Next3 = rebalance_ring(CNode, CState4),
-
+ lager:debug("Pending ownership transfers: ~b~n",
+ [length(pending_changes(CState4))]),
+
%% Remove transfers to/from down nodes
Next4 = handle_down_nodes(CState4, Next3),
test/claim_simulation.erl (new file, 135 lines)
@@ -0,0 +1,135 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% Based on an earlier edition provided by Greg Nelson and Ryan Zezeski:
+%% https://gist.github.com/992317
+
+-module(claim_simulation).
+-compile(export_all).
+
+%%-define(SIMULATE,1).
+-ifdef(SIMULATE).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+-define(node(N), list_to_atom(lists:flatten(io_lib:format("riak@node~w",[N])))).
+-define(get(K, PL, D), proplists:get_value(K, PL, D)).
+
+basic_test_() ->
+ {timeout, 60000, [fun basic_default/0,
+ fun basic_new/0]}.
+
+basic_default() ->
+ Opts = [{suffix, "_default"},
+ {wc_mf, {riak_core_claim, default_wants_claim}},
+ {cc_mf, {riak_core_claim, default_choose_claim}},
+ {target_n_val, 4},
+ {ring_size, 32},
+ {node_count, 8},
+ {node_capacity, 24}
+ ],
+ run(Opts).
+
+basic_new() ->
+ Opts = [{suffix, "_new"},
+ {wc_mf, {riak_core_new_claim, new_wants_claim}},
+ {cc_mf, {riak_core_new_claim, new_choose_claim}},
+ {target_n_val, 4},
+ {ring_size, 32},
+ {node_count, 8},
+ {node_capacity, 24}
+ ],
+ run(Opts).
+
+run(Opts) ->
+ application:load(riak_core),
+
+ WCMod = ?get(wc_mf, Opts, {riak_core_claim, default_wants_claim}),
+ CCMod = ?get(cc_mf, Opts, {riak_core_claim, default_choose_claim}),
+ TargetN = ?get(target_n_val, Opts, 4),
+ Suffix = ?get(suffix, Opts, ""),
+
+ application:set_env(riak_core, wants_claim_fun, WCMod),
+ application:set_env(riak_core, choose_claim_fun, CCMod),
+ application:set_env(riak_core, target_n_val, TargetN),
+
+ RingSize = ?get(ring_size, Opts, 2048),
+ NodeCount = ?get(node_count, Opts, 100),
+ NodeCapacity = ?get(node_capacity, Opts, 24), %% in TB
+
+ Ring1 = riak_core_ring:fresh(RingSize, 'riak@node1'),
+ {Rings, _} = lists:mapfoldl(
+ fun(N, Prev) ->
+ Node = ?node(N),
+ R = riak_core_ring:add_member(Node, Prev, Node),
+ Next =
+ riak_core_gossip:claim_until_balanced(R,
+ Node),
+ {Next, Next}
+ end, Ring1, lists:seq(2, NodeCount)),
+
+ Owners1 = riak_core_ring:all_owners(Ring1),
+ Owners = lists:map(fun riak_core_ring:all_owners/1, Rings),
+
+ {Movers, _} =
+ lists:mapfoldl(
+ fun(Curr, Prev) ->
+ Sum = length(lists:filter(fun not_same_node/1,
+ lists:zip(Prev, Curr))),
+ {Sum, Curr}
+ end, Owners1, Owners),
+
+ MeetTargetN = [riak_core_claim:meets_target_n(R, TargetN) || R <- Rings],
+
+ FName = io_lib:format("/tmp/rings_~w_~w~s.txt",
+ [RingSize, NodeCount, Suffix]),
+ {ok, Out} = file:open(FName, [write]),
+ [print_info(Out, O, N, M, lists:nth(N - 1, MeetTargetN),
+ RingSize, NodeCapacity)
+ || {O, M, N} <- lists:zip3(Owners, Movers, lists:seq(2, NodeCount))],
+ lists:foreach(fun(RingOut) ->
+ riak_core_ring:pretty_print(RingOut,
+ [{out, Out},
+ {target_n, TargetN}])
+ end, [Ring1|Rings]).
+
+not_same_node({{P,N}, {P,N}}) -> false;
+not_same_node({{P,_N}, {P,_M}}) -> true.
+
+print_info(Out, Owners, NodeCount, PartitionsMoved, MeetsTargetN,
+ RingSize, NodeCapacity) ->
+ Expect = round(RingSize / NodeCount),
+ ActualPercent = 100 * (PartitionsMoved / RingSize),
+ Terabytes = 0.8 * NodeCount * NodeCapacity * (PartitionsMoved / RingSize),
+ %% Terabytes = Total Amount of Data * Fraction Moved
+
+ F = fun({_,Node}, Acc) ->
+ dict:update_counter(Node, 1, Acc)
+ end,
+ C = lists:keysort(1, dict:to_list(lists:foldl(F, dict:new(), Owners))),
+ io:format(Out,
+ "add node ~p, expect=~p, actual=~p, "
+ "percent=~p, terabytes=~p, meets_target_n=~p~n"
+ "counts: ~p~n~n",
+ [NodeCount, Expect, PartitionsMoved, ActualPercent,
+ Terabytes, MeetsTargetN,C]).
+
+-endif.
+-endif.
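
The simulation is compiled out unless the -define(SIMULATE,1) line near the top is uncommented and the module is built with TEST defined. A sketch of running it and reading the results (the compiler flags are one plausible way to define both macros):

    %% e.g. erlc -DSIMULATE -DTEST -o ebin test/claim_simulation.erl
    eunit:test(claim_simulation, [verbose]).
    %% Output lands in /tmp/rings_32_8_default.txt and /tmp/rings_32_8_new.txt,
    %% one add-node step per line. With node_count 8 and node_capacity 24 TB,
    %% moving 8 of 32 partitions corresponds to roughly
    %% 0.8 * 8 * 24 * (8/32) = 38.4 TB shuffled between nodes.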