Permalink
Browse files

Add new partition claim function and claim simulator

Add riak_core_new_claim:new_wants_claim/2 and new_claim/2.
Merge in claim simulation code provided by Greg Nelson (grourk@dropcam.com).
Add pretty_print function to riak_core_ring.

The new claim function is designed to reduce the number of partition transfers
that occur when rebalancing the ring, aiming as close as possible to minimal
consistent hashing.
  • Loading branch information...
1 parent d2dec31 commit 86d506fbd98ff2fdaaf936458807c511c952c499 @jtuple jtuple committed Sep 20, 2011
Showing with 396 additions and 16 deletions.
  1. +1 −0 ebin/riak_core.app
  2. +28 −15 src/riak_core_claim.erl
  3. +189 −0 src/riak_core_new_claim.erl
  4. +56 −1 src/riak_core_ring.erl
  5. +122 −0 test/claim_simulation.erl
View
@@ -21,6 +21,7 @@
riak_core_bucket,
riak_core_cinfo_core,
riak_core_claim,
+ riak_core_new_claim,
riak_core_config,
riak_core_console,
riak_core_coverage_fsm,
View
@@ -56,7 +56,9 @@
never_wants_claim/1, random_choose_claim/1]).
-export([default_choose_claim/2,
default_wants_claim/2,
- claim_rebalance_n/2]).
+ claim_rebalance_n/2,
+ meets_target_n/2,
+ diagonal_stripe/2]).
-ifdef(TEST).
-ifdef(EQC).
@@ -71,22 +73,26 @@
default_wants_claim(Ring) ->
default_wants_claim(Ring, node()).
-default_wants_claim(Ring0, Node) ->
- Ring = riak_core_ring:upgrade(Ring0),
%% @doc Number of claiming members in Ring, counting Node as a member even
%% when it has not yet joined the ring.
get_member_count(Ring, Node) ->
    %% Determine how many nodes are involved with the ring; if the requested
    %% node is not yet part of the ring, include it in the count.
    AllMembers = riak_core_ring:claiming_members(Ring),
    case lists:member(Node, AllMembers) of
        true ->
            length(AllMembers);
        false ->
            length(AllMembers) + 1
    end.
%% @doc Expected number of partitions per node for a perfectly balanced
%% ring, counting Node as a member even if it has not joined yet.
get_expected_partitions(Ring, Node) ->
    riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).
+
+default_wants_claim(Ring0, Node) ->
+ Ring = riak_core_ring:upgrade(Ring0),
%% Calculate the expected # of partitions for a perfectly balanced ring. Use
%% this expectation to determine the relative balance of the ring. If the
%% ring isn't within +-2 partitions on all nodes, we need to rebalance.
- ExpParts = riak_core_ring:num_partitions(Ring) div Mval,
+ ExpParts = get_expected_partitions(Ring, Node),
PCounts = lists:foldl(fun({_Index, ANode}, Acc) ->
orddict:update_counter(ANode, 1, Acc)
end, [{Node, 0}], riak_core_ring:all_owners(Ring)),
@@ -235,8 +241,16 @@ find_biggest_hole(Mine) ->
claim_rebalance_n(Ring0, Node) ->
Ring = riak_core_ring:upgrade(Ring0),
- %% diagonal stripes guarantee most disperse data
Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+ Zipped = diagonal_stripe(Ring, Nodes),
+ lists:foldl(fun({P, N}, Acc) ->
+ riak_core_ring:transfer_node(P, N, Acc)
+ end,
+ Ring,
+ Zipped).
+
+diagonal_stripe(Ring, Nodes) ->
+ %% diagonal stripes guarantee most disperse data
Partitions = lists:sort([ I || {I, _} <- riak_core_ring:all_owners(Ring) ]),
Zipped = lists:zip(Partitions,
lists:sublist(
@@ -245,16 +259,15 @@ claim_rebalance_n(Ring0, Node) ->
1+(length(Partitions) div length(Nodes)),
Nodes)),
1, length(Partitions))),
- lists:foldl(fun({P, N}, Acc) ->
- riak_core_ring:transfer_node(P, N, Acc)
- end,
- Ring,
- Zipped).
+ Zipped.
+
%% @doc Transfer a randomly chosen partition index to node() (arity-1 form)
%% or to the given Node (arity-2 form) and return the updated ring.
%% NOTE(review): presumably random_other_index/1 picks an index not
%% currently owned by the calling node -- confirm in riak_core_ring.
random_choose_claim(Ring) ->
    random_choose_claim(Ring, node()).

random_choose_claim(Ring0, Node) ->
    %% upgrade/1 converts rings written by older riak_core versions.
    Ring = riak_core_ring:upgrade(Ring0),
    riak_core_ring:transfer_node(riak_core_ring:random_other_index(Ring),
                                 Node, Ring).
%% @spec never_wants_claim(riak_core_ring()) -> no
%% @doc For use by nodes that should not claim any partitions.
View
@@ -0,0 +1,189 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_new_claim).
+-export([new_wants_claim/2, new_claim/2]).
+
%% @doc Decide whether Node should claim more partitions in Ring.
%% Compares Node's current partition count against the per-node average
%% (RingSize div NodeCount over the claiming members).  Returns
%% {yes, Shortfall} when Node owns fewer than the average, 'no' otherwise.
new_wants_claim(Ring, Node) ->
    Active = riak_core_ring:claiming_members(Ring),
    Owners = riak_core_ring:all_owners(Ring),
    Counts = get_counts(Active, Owners),
    NodeCount = erlang:length(Active),
    RingSize = riak_core_ring:num_partitions(Ring),
    Avg = RingSize div NodeCount,
    %% Counts only covers claiming members; a node not (yet) in that set
    %% has no entry and defaults to 0 owned partitions.
    Count = proplists:get_value(Node, Counts, 0),
    case Count < Avg of
        false ->
            no;
        true ->
            {yes, Avg - Count}
    end.
+
%% @doc Claim up to Node's fair share of partitions, preferring claims that
%% avoid target_n violations while minimizing the number of ownership
%% transfers.  Falls back to full re-diagonalization
%% (riak_core_claim:claim_rebalance_n/2) only when the incremental claim
%% fails to satisfy target_n despite there being enough nodes to do so.
new_claim(Ring, Node) ->
    Active = riak_core_ring:claiming_members(Ring),
    Owners = riak_core_ring:all_owners(Ring),
    Counts = get_counts(Active, Owners),
    RingSize = riak_core_ring:num_partitions(Ring),
    NodeCount = erlang:length(Active),
    Avg = RingSize div NodeCount,
    %% Delta > 0: the member owns fewer than its share; < 0: owns a surplus.
    Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
    {_, Want} = lists:keyfind(Node, 1, Deltas),
    TargetN = app_helper:get_env(riak_core, target_n_val),
    %% Pair each partition index with its ordinal ring position (Nth).
    AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
                           [Idx || {Idx, _} <- Owners]),

    EnoughNodes =
        (NodeCount > TargetN)
        or ((NodeCount == TargetN) and (RingSize rem TargetN =:= 0)),

    case EnoughNodes of
        true ->
            %% If we have enough nodes to meet target_n, then we prefer to
            %% claim indices that are currently causing violations, and then
            %% fallback to indices in linear order. The filtering steps below
            %% will ensure no new violations are introduced.
            Violated = lists:flatten(find_violations(Ring, TargetN)),
            Violated2 = [lists:keyfind(Idx, 2, AllIndices) || Idx <- Violated],
            Indices = Violated2 ++ (AllIndices -- Violated2);
        false ->
            %% If we do not have enough nodes to meet target_n, then we prefer
            %% claiming the same indices that would occur during a
            %% re-diagonalization of the ring with target_n nodes, falling
            %% back to linear offsets off these preferred indices when the
            %% number of indices desired is less than the computed set.
            Padding = lists:duplicate(TargetN, undefined),
            Expanded = lists:sublist(Active ++ Padding, TargetN),
            PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
            %% Map each preferred index that would belong to Node back to
            %% its ordinal ring position.
            PreferredNth = [begin
                                {Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
                                Nth
                            end || {Idx,Owner} <- PreferredClaim,
                                   Owner =:= Node],
            Offsets = lists:seq(0, RingSize div length(PreferredNth)),
            AllNth = lists:sublist([(X+Y) rem RingSize || Y <- Offsets,
                                                          X <- PreferredNth],
                                   RingSize),
            Indices = [lists:keyfind(Nth, 1, AllIndices) || Nth <- AllNth]
    end,

    %% Filter out indices that conflict with the node's existing ownership
    Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
                                    TargetN, RingSize),
    %% Claim indices from the remaining candidate set
    Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
    %% Never claim more than the node's computed shortfall.
    Claim2 = lists:sublist(Claim, Want),
    NewRing = lists:foldl(fun(Idx, Ring0) ->
                                  riak_core_ring:transfer_node(Idx, Node, Ring0)
                          end, Ring, Claim2),

    case {EnoughNodes, riak_core_claim:meets_target_n(NewRing, TargetN)} of
        {false, _} ->
            %% Cannot meet target_n regardless; accept the incremental claim.
            NewRing;
        {true, {true, _}} ->
            NewRing;
        {true, false} ->
            %% Last resort, fallback to re-diagonalization
            riak_core_claim:claim_rebalance_n(Ring, Node)
    end.
+
%% Counts up the number of partitions owned by each node
%%
%% Nodes: the members to tally.  Ring: the {Index, Owner} pairs from
%% riak_core_ring:all_owners/1.  Returns a proplist of {Node, Count};
%% every node in Nodes appears (with 0 if it owns nothing), while owners
%% not in Nodes are ignored.
get_counts(Nodes, Ring) ->
    Empty = [{Node, 0} || Node <- Nodes],
    Counts = lists:foldl(fun({_Idx, Node}, Counts) ->
                                 case lists:member(Node, Nodes) of
                                     true ->
                                         dict:update_counter(Node, 1, Counts);
                                     false ->
                                         Counts
                                 end
                         end, dict:from_list(Empty), Ring),
    dict:to_list(Counts).
+
%% Filter out candidate indices that would violate target_n given a node's
%% current partition ownership.
%%
%% AllIndices is a list of {Nth, Idx} pairs (ordinal ring position and
%% partition index).  A candidate {Nth, Idx} survives only when it is at
%% least TargetN positions away -- in both ring directions -- from every
%% partition the node already owns.
prefilter_violations(Ring, Node, AllIndices, Indices, TargetN, RingSize) ->
    CurrentIndices = riak_core_ring:indices(Ring, Node),
    %% keyfind/3 yields {Nth, Idx} tuples for each currently-owned index.
    CurrentNth = [lists:keyfind(Idx, 2, AllIndices) || Idx <- CurrentIndices],
    [{Nth, Idx} || {Nth, Idx} <- Indices,
                   %% Destructure the tuple so spaced_by_n/4 receives the
                   %% integer ring position; passing the whole {Nth, Idx}
                   %% tuple would crash with badarith on the subtraction.
                   lists:all(fun({CNth, _}) ->
                                     spaced_by_n(CNth, Nth, TargetN, RingSize)
                             end, CurrentNth)].
+
%% Select indices from a given candidate set, according to two goals.
%% 1. Ensure greedy/local target_n spacing between indices. Note that this
%%    goal intentionally does not reject overall target_n violations.
%%
%% 2. Select indices based on the delta between current ownership and
%%    expected ownership. In other words, if A owns 5 partitions and
%%    the desired ownership is 3, then we try to claim at most 2 partitions
%%    from A.
select_indices(_Owners, _Deltas, [], _TargetN, _RingSize) ->
    %% No candidates survived prefiltering; nothing can be claimed.
    %% (Without this clause, hd/1 below would crash on an empty list.)
    [];
select_indices(Owners, Deltas, Indices, TargetN, RingSize) ->
    OwnerDT = dict:from_list(Owners),
    %% Seed LastNth with the first candidate's own position so that the
    %% first candidate is accepted via the First check below whenever its
    %% owner has a surplus to give up.
    {FirstNth, _} = hd(Indices),
    {Claim, _, _} =
        lists:foldl(fun({Nth, Idx}, {Out, LastNth, DeltaDT}) ->
                            Owner = dict:fetch(Idx, OwnerDT),
                            Delta = dict:fetch(Owner, DeltaDT),
                            First = (LastNth =:= Nth),
                            MeetsTN = spaced_by_n(LastNth, Nth, TargetN,
                                                  RingSize),
                            case (Delta < 0) and (First or MeetsTN) of
                                true ->
                                    %% Owner owns a surplus: take Idx and
                                    %% credit the owner's delta toward 0.
                                    NextDeltaDT =
                                        dict:update_counter(Owner, 1, DeltaDT),
                                    {[Idx|Out], Nth, NextDeltaDT};
                                false ->
                                    {Out, LastNth, DeltaDT}
                            end
                    end,
                    {[], FirstNth, dict:from_list(Deltas)},
                    Indices),
    lists:reverse(Claim).
+
%% Determines indices that violate the given target_n spacing property.
%% Returns a list of [PrevIdx, Idx] pairs, one for each pair of nearby
%% partitions owned by the same node.  The first TargetN-1 owners are
%% appended to the end of the list so violations that wrap around the
%% ring are detected as well.
find_violations(Ring, TargetN) ->
    Owners = riak_core_ring:all_owners(Ring),
    Suffix = lists:sublist(Owners, TargetN-1),
    Owners2 = Owners ++ Suffix,
    %% Use a sliding window to determine violations
    {Bad, _} = lists:foldl(fun(P={Idx, Owner}, {Out, Window}) ->
                                   %% Window2 keeps the most recent entries
                                   %% (including P), capped at TargetN-1;
                                   %% the keyfind below inspects the window
                                   %% from *before* P was added.
                                   Window2 = lists:sublist([P|Window], TargetN-1),
                                   case lists:keyfind(Owner, 2, Window) of
                                       {PrevIdx, Owner} ->
                                           {[[PrevIdx, Idx] | Out], Window2};
                                       false ->
                                           {Out, Window2}
                                   end
                           end, {[], []}, Owners2),
    lists:reverse(Bad).
+
%% Determine if two positions in the ring meet target_n spacing.
%% Both the forward and the backward distance around the circular ring
%% between the two ordinal positions must be at least TargetN.
spaced_by_n(NthA, NthB, TargetN, RingSize) ->
    {Fwd, Back} =
        if
            NthA > NthB ->
                {NthA - NthB, NthB - NthA + RingSize};
            true ->
                {NthA - NthB + RingSize, NthB - NthA}
        end,
    (Fwd >= TargetN) andalso (Back >= TargetN).
View
@@ -60,6 +60,7 @@
downgrade/2,
claimant/1,
member_status/2,
+ pretty_print/2,
all_member_status/1,
update_member_meta/5,
get_member_meta/3,
@@ -643,6 +644,58 @@ handoff_complete(State, Idx, Mod) ->
ring_changed(Node, State) ->
internal_ring_changed(Node, State).
%% @doc Print an ASCII visualization of ring ownership to an IO device.
%% Opts is a list that may contain:
%%   legend        -- also print a per-node legend with ownership percentages
%%   numeric       -- label nodes "1","2",... instead of letters
%%   {out, Dev}    -- IO device to print to (default: standard_io)
%%   {target_n, N} -- grouping width; defaults to riak_core's target_n_val
pretty_print(Ring, Opts) ->
    OptNumeric = lists:member(numeric, Opts),
    OptLegend = lists:member(legend, Opts),
    Out = proplists:get_value(out, Opts, standard_io),
    TargetN = proplists:get_value(target_n, Opts,
                                  app_helper:get_env(riak_core, target_n_val)),

    Owners = riak_core_ring:all_members(Ring),
    Indices = riak_core_ring:all_owners(Ring),
    RingSize = length(Indices),
    %% Letter labels only cover 26 nodes ($a..$z); force numeric beyond that.
    Numeric = OptNumeric or (length(Owners) > 26),
    case Numeric of
        true ->
            Ids = [integer_to_list(N) || N <- lists:seq(1, length(Owners))];
        false ->
            %% 97 is $a, so nodes are labelled "a", "b", "c", ...
            Ids = [[Letter] || Letter <- lists:seq(97, 96+length(Owners))]
    end,
    %% NOTE(review): Names is read with orddict:fetch/2 below, which requires
    %% a key-sorted list -- assumes all_members/1 returns sorted nodes; TODO
    %% confirm against riak_core_ring.
    Names = lists:zip(Owners, Ids),
    case OptLegend of
        true ->
            io:format(Out, "~36..=s Nodes ~36..=s~n", ["", ""]),
            [begin
                 NodeIndices = [Idx || {Idx,Owner} <- Indices,
                                       Owner =:= Node],
                 %% '/' yields a float, matching the ~5.1f format below.
                 RingPercent = length(NodeIndices) * 100 / RingSize,
                 io:format(Out, "Node ~s: ~5.1f% ~s~n",
                           [Name, RingPercent, Node])
             end || {Node, Name} <- Names],
            io:format(Out, "~36..=s Ring ~37..=s~n", ["", ""]);
        false ->
            ok
    end,

    case Numeric of
        true ->
            Ownership =
                [orddict:fetch(Owner, Names) || {_Idx, Owner} <- Indices],
            io:format(Out, "~p~n", [Ownership]);
        false ->
            %% One character per partition, with '|' after every TargetN-th
            %% position so spacing violations stand out visually.
            lists:foldl(fun({_, Owner}, N) ->
                                Name = orddict:fetch(Owner, Names),
                                case N rem TargetN of
                                    0 ->
                                        io:format(Out, "~s|", [[Name]]);
                                    _ ->
                                        io:format(Out, "~s", [[Name]])
                                end,
                                N+1
                        end, 1, Indices),
            io:format(Out, "~n", [])
    end.
+
%% ===================================================================
%% Legacy reconciliation
%% ===================================================================
@@ -895,7 +948,9 @@ update_ring(CNode, CState) ->
%% Rebalance the ring as necessary
Next3 = rebalance_ring(CNode, CState4),
-
+ lager:debug("Pending ownership transfers: ~b~n",
+ [length(pending_changes(CState4))]),
+
%% Remove transfers to/from down nodes
Next4 = handle_down_nodes(CState4, Next3),
Oops, something went wrong.

0 comments on commit 86d506f

Please sign in to comment.