Default to v2 claim, update QC tests, fix bug in select_indices
1. The default claim algorithm is now set to v2.

2. The semantics of wants_claim changed, so I had to update the wants_claim
   test.  Essentially, the old wants_claim was simply an indicator of whether
   the ring is imbalanced at all and would return `{yes,0}` if it is.  The new
   wants_claim is more true to its name in that it returns `{yes,N}`, meaning
   the node would like to claim `N` partitions (a sketch of the new arithmetic
   follows this list).

3. The unique nodes property exposed an edge case in the situation where there
   are 16 partitions and 15 nodes.  I'm not sure if this edge case would appear
   in other situations.  Anyway, the way select_indices was written, when the
   15th node went to claim it would determine that there was no safe partition
   it could claim and would then perform a rebalance (diagonalize).  However, a
   rebalance doesn't make any guarantee about keeping the target_n invariant on
   wrap-around, so you would end up with the last and first partition being
   owned by the same node.  The problem was that select_indices assumed the
   first owner could give up its partition, `First = (LastNth =:= Nth)`, but
   that wouldn't hold true, and then no other partition could be claimed because
   they would all be within target_n of LastNth/FirstNth.  My change is to pass
   an explicit flag in the accumulator that represents whether or not the node
   has claimed anything yet.  This makes the (possibly incorrect) assumption
   that the node never currently owns anything when `select_indices` is called.
   I was able to get a 500K-iteration run of the QC prop to pass, but I do
   wonder if things could be different in production.  After talking with Joe,
   he seemed to think the change was safe.  A sketch of the wrap-around check
   follows this list.
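
As a concrete illustration of point 2, here is a minimal standalone sketch of
the v2 arithmetic. The module name `wants_claim_sketch` is hypothetical; it
only mirrors the `Avg - Count` logic of wants_claim_v2 in the diff below and
is not the riak_core API itself.

```erlang
%% Hypothetical sketch: mirrors the wants_claim_v2 arithmetic using plain
%% integers instead of a ring structure.
-module(wants_claim_sketch).
-export([wants/3]).

%% RingSize: total partitions, NodeCount: claiming members,
%% Owned: partitions this node currently owns.
wants(RingSize, NodeCount, Owned) ->
    Avg = RingSize div NodeCount,
    case Owned < Avg of
        false -> no;
        true  -> {yes, Avg - Owned}
    end.
```

For example, `wants_claim_sketch:wants(64, 4, 10)` returns `{yes, 6}`, whereas
the old v1 behaviour was to answer `{yes, 0}` for any imbalance.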
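
Similarly, a minimal sketch of the wrap-around property behind point 3 (again
a hypothetical module; the real check is meets_target_n/2 in the diff below):
every window of target_n consecutive partitions, including the window that
wraps from the last partition back to the first, must have distinct owners.

```erlang
%% Hypothetical sketch of the wrap-around target_n check; the real
%% implementation is meets_target_n/2 in riak_core_claim.erl.
-module(wraparound_sketch).
-export([meets_target_n/2]).

%% Owners is the ring's owner list in partition order, e.g. [n1, n2, ...].
meets_target_n(Owners, TargetN) ->
    Len = length(Owners),
    %% Append the first TargetN-1 owners so wrapping windows are checked too.
    Extended = Owners ++ lists:sublist(Owners, TargetN - 1),
    Windows = [lists:sublist(lists:nthtail(I, Extended), TargetN)
               || I <- lists:seq(0, Len - 1)],
    lists:all(fun(W) -> length(lists:usort(W)) =:= TargetN end, Windows).
```

With 16 partitions, 15 nodes, and target_n of 4, a plain diagonal stripe yields
the owner list [n1, ..., n15, n1]; the wrapping window [n15, n1, n1, n2]
repeats n1, which is exactly the violation this commit guards against.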
rzezeski committed Dec 16, 2011
1 parent 56075a2 commit 1040fa1
229 changes: 218 additions & 11 deletions src/riak_core_claim.erl
@@ -56,7 +56,9 @@
         default_choose_claim/1, default_choose_claim/2,
         never_wants_claim/1, random_choose_claim/1]).
-export([wants_claim_v1/1, wants_claim_v1/2,
         wants_claim_v2/1, wants_claim_v2/2,
         choose_claim_v1/1, choose_claim_v1/2,
         choose_claim_v2/1, choose_claim_v2/2,
         claim_rebalance_n/2,
         meets_target_n/2,
         diagonal_stripe/2]).
@@ -79,19 +81,21 @@ default_choose_claim(Ring) ->
    default_choose_claim(Ring, node()).

default_choose_claim(Ring, Node) ->
    choose_claim_v1(Ring, Node).
    choose_claim_v2(Ring, Node).

%% @spec default_wants_claim(riak_core_ring()) -> {yes, integer()} | no
%% @doc Want a partition if we currently have less than floor(ringsize/nodes).
default_wants_claim(Ring) ->
    default_wants_claim(Ring, node()).

default_wants_claim(Ring, Node) ->
    wants_claim_v1(Ring, Node).
    wants_claim_v2(Ring, Node).

%% @deprecated
wants_claim_v1(Ring) ->
    wants_claim_v1(Ring, node()).

%% @deprecated
wants_claim_v1(Ring0, Node) ->
    Ring = riak_core_ring:upgrade(Ring0),
    %% Calculate the expected # of partitions for a perfectly balanced ring. Use
@@ -110,9 +114,29 @@ wants_claim_v1(Ring0, Node) ->
            no
    end.

wants_claim_v2(Ring) ->
    wants_claim_v2(Ring, node()).

wants_claim_v2(Ring, Node) ->
    Active = riak_core_ring:claiming_members(Ring),
    Owners = riak_core_ring:all_owners(Ring),
    Counts = get_counts(Active, Owners),
    NodeCount = erlang:length(Active),
    RingSize = riak_core_ring:num_partitions(Ring),
    Avg = RingSize div NodeCount,
    Count = proplists:get_value(Node, Counts, 0),
    case Count < Avg of
        false ->
            no;
        true ->
            {yes, Avg - Count}
    end.

%% @deprecated
choose_claim_v1(Ring) ->
    choose_claim_v1(Ring, node()).

%% @deprecated
choose_claim_v1(Ring0, Node) ->
    Ring = riak_core_ring:upgrade(Ring0),
    TargetN = app_helper:get_env(riak_core, target_n_val),
@@ -128,6 +152,78 @@ choose_claim_v1(Ring0, Node) ->
            claim_rebalance_n(Ring, Node)
    end.

choose_claim_v2(Ring) ->
    choose_claim_v2(Ring, node()).

choose_claim_v2(Ring, Node) ->
    Active = riak_core_ring:claiming_members(Ring),
    Owners = riak_core_ring:all_owners(Ring),
    Counts = get_counts(Active, Owners),
    RingSize = riak_core_ring:num_partitions(Ring),
    NodeCount = erlang:length(Active),
    Avg = RingSize div NodeCount,
    Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
    {_, Want} = lists:keyfind(Node, 1, Deltas),
    TargetN = app_helper:get_env(riak_core, target_n_val),
    AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
                           [Idx || {Idx, _} <- Owners]),

    EnoughNodes =
        (NodeCount > TargetN)
        or ((NodeCount == TargetN) and (RingSize rem TargetN =:= 0)),
    case EnoughNodes of
        true ->
            %% If we have enough nodes to meet target_n, then we prefer to
            %% claim indices that are currently causing violations, and then
            %% fallback to indices in linear order. The filtering steps below
            %% will ensure no new violations are introduced.
            Violated = lists:flatten(find_violations(Ring, TargetN)),
            Violated2 = [lists:keyfind(Idx, 2, AllIndices) || Idx <- Violated],
            Indices = Violated2 ++ (AllIndices -- Violated2);
        false ->
            %% If we do not have enough nodes to meet target_n, then we prefer
            %% claiming the same indices that would occur during a
            %% re-diagonalization of the ring with target_n nodes, falling
            %% back to linear offsets off these preferred indices when the
            %% number of indices desired is less than the computed set.
            Padding = lists:duplicate(TargetN, undefined),
            Expanded = lists:sublist(Active ++ Padding, TargetN),
            PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
            PreferredNth = [begin
                                {Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
                                Nth
                            end || {Idx,Owner} <- PreferredClaim,
                                   Owner =:= Node],
            Offsets = lists:seq(0, RingSize div length(PreferredNth)),
            AllNth = lists:sublist([(X+Y) rem RingSize || Y <- Offsets,
                                                          X <- PreferredNth],
                                   RingSize),
            Indices = [lists:keyfind(Nth, 1, AllIndices) || Nth <- AllNth]
    end,

    %% Filter out indices that conflict with the node's existing ownership
    Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
                                    TargetN, RingSize),
    %% Claim indices from the remaining candidate set
    Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
    Claim2 = lists:sublist(Claim, Want),
    NewRing = lists:foldl(fun(Idx, Ring0) ->
                                  riak_core_ring:transfer_node(Idx, Node, Ring0)
                          end, Ring, Claim2),

    RingChanged = ([] /= Claim2),
    RingMeetsTargetN = meets_target_n(NewRing, TargetN),
    case {RingChanged, EnoughNodes, RingMeetsTargetN} of
        {false, _, _} ->
            %% Unable to claim, fallback to re-diagonalization
            claim_rebalance_n(Ring, Node);
        {_, true, false} ->
            %% Failed to meet target_n, fallback to re-diagonalization
            claim_rebalance_n(Ring, Node);
        _ ->
            NewRing
    end.

meets_target_n(Ring, TargetN) ->
    Owners = lists:keysort(1, riak_core_ring:all_owners(Ring)),
    meets_target_n(Owners, TargetN, 0, [], []).
@@ -283,6 +379,41 @@ find_biggest_hole(Mine) ->
                none,
                lists:zip(Mine, tl(Mine)++[hd(Mine)])).

%% @private
%%
%% @doc Determines indices that violate the given target_n spacing
%% property.
find_violations(Ring, TargetN) ->
    Owners = riak_core_ring:all_owners(Ring),
    Suffix = lists:sublist(Owners, TargetN-1),
    Owners2 = Owners ++ Suffix,
    %% Use a sliding window to determine violations
    {Bad, _} = lists:foldl(fun(P={Idx, Owner}, {Out, Window}) ->
                               Window2 = lists:sublist([P|Window], TargetN-1),
                               case lists:keyfind(Owner, 2, Window) of
                                   {PrevIdx, Owner} ->
                                       {[[PrevIdx, Idx] | Out], Window2};
                                   false ->
                                       {Out, Window2}
                               end
                           end, {[], []}, Owners2),
    lists:reverse(Bad).

%% @private
%%
%% @doc Counts up the number of partitions owned by each node.
get_counts(Nodes, Ring) ->
    Empty = [{Node, 0} || Node <- Nodes],
    Counts = lists:foldl(fun({_Idx, Node}, Counts) ->
                             case lists:member(Node, Nodes) of
                                 true ->
                                     dict:update_counter(Node, 1, Counts);
                                 false ->
                                     Counts
                             end
                         end, dict:from_list(Empty), Ring),
    dict:to_list(Counts).

%% @private
get_expected_partitions(Ring, Node) ->
    riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).
@@ -299,17 +430,83 @@ get_member_count(Ring, Node) ->
            length(AllMembers) + 1
    end.

%% @private
%%
%% @doc Filter out candidate indices that would violate target_n given
%% a node's current partition ownership.
prefilter_violations(Ring, Node, AllIndices, Indices, TargetN, RingSize) ->
    CurrentIndices = riak_core_ring:indices(Ring, Node),
    CurrentNth = [lists:keyfind(Idx, 2, AllIndices) || Idx <- CurrentIndices],
    [{Nth, Idx} || {Nth, Idx} <- Indices,
                   lists:all(fun({CNth, _}) ->
                                 spaced_by_n(CNth, Nth, TargetN, RingSize)
                             end, CurrentNth)].

%% @private
%%
%% @doc Select indices from a given candidate set, according to two
%% goals.
%%
%% 1. Ensure greedy/local target_n spacing between indices. Note that this
%% goal intentionally does not reject overall target_n violations.
%%
%% 2. Select indices based on the delta between current ownership and
%% expected ownership. In other words, if A owns 5 partitions and
%% the desired ownership is 3, then we try to claim at most 2 partitions
%% from A.
select_indices(_Owners, _Deltas, [], _TargetN, _RingSize) ->
    [];
select_indices(Owners, Deltas, Indices, TargetN, RingSize) ->
    OwnerDT = dict:from_list(Owners),
    {FirstNth, _} = hd(Indices),
    %% The `First' symbol indicates whether or not this is the first
    %% partition to be claimed by this node. This assumes that the
    %% node doesn't already own any partitions. In that case it is
    %% _always_ safe to claim the first partition that another owner
    %% is willing to part with. It's the subsequent partitions
    %% claimed by this node that must not break the target_n invariant.
    {Claim, _, _, _} =
        lists:foldl(fun({Nth, Idx}, {Out, LastNth, DeltaDT, First}) ->
                        Owner = dict:fetch(Idx, OwnerDT),
                        Delta = dict:fetch(Owner, DeltaDT),
                        MeetsTN = spaced_by_n(LastNth, Nth, TargetN,
                                              RingSize),
                        case (Delta < 0) and (First or MeetsTN) of
                            true ->
                                NextDeltaDT =
                                    dict:update_counter(Owner, 1, DeltaDT),
                                {[Idx|Out], Nth, NextDeltaDT, false};
                            false ->
                                {Out, LastNth, DeltaDT, First}
                        end
                    end,
                    {[], FirstNth, dict:from_list(Deltas), true},
                    Indices),
    lists:reverse(Claim).

%% @private
%%
%% @doc Determine if two positions in the ring meet target_n spacing.
spaced_by_n(NthA, NthB, TargetN, RingSize) ->
    case NthA > NthB of
        true ->
            NFwd = NthA - NthB,
            NBack = NthB - NthA + RingSize;
        false ->
            NFwd = NthA - NthB + RingSize,
            NBack = NthB - NthA
    end,
    (NFwd >= TargetN) and (NBack >= TargetN).

%% ===================================================================
%% Unit tests
%% ===================================================================
-ifdef(TEST).

wants_claim_test() ->
    %% riak_core_ring_events:start_link(),
    %% riak_core_ring_manager:start_link(test),
    riak_core_test_util:setup_mockring1(),
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    ?assertEqual(no, default_wants_claim(Ring)),
    ?assertEqual({yes, 1}, default_wants_claim(Ring)),
    riak_core_ring_manager:stop().

find_biggest_hole_test() ->
@@ -348,23 +545,24 @@ test_nodes(Count) ->
    [node() | [list_to_atom(lists:concat(["n_", N])) || N <- lists:seq(1, Count-1)]].

prop_claim_ensures_unique_nodes_test_() ->
    Prop = eqc:numtests(500, ?QC_OUT(prop_claim_ensures_unique_nodes())),
    Prop = eqc:numtests(1000, ?QC_OUT(prop_claim_ensures_unique_nodes())),
    {timeout, 120, fun() -> ?assert(eqc:quickcheck(Prop)) end}.

prop_claim_ensures_unique_nodes() ->
    %% NOTE: We know that this doesn't work for the case of {_, 3}.
    ?FORALL({PartsPow, NodeCount}, {choose(4, 9), choose(4, 15)},
    ?FORALL({PartsPow, NodeCount}, {choose(4,9), choose(4,15)}, %{choose(4, 9), choose(4, 15)},
            begin
                Nval = 3,
                application:set_env(riak_core, target_n_val, Nval + 1),
                TNval = Nval + 1,
                application:set_env(riak_core, target_n_val, TNval),

                Partitions = ?POW_2(PartsPow),
                [Node0 | RestNodes] = test_nodes(NodeCount),

                R0 = riak_core_ring:fresh(Partitions, Node0),
                Rfinal = lists:foldl(fun(Node, Racc) ->
                                         Racc0 = riak_core_ring:add_member(Node0, Racc, Node),
                                         choose_claim_v1(Racc0, Node)
                                         default_choose_claim(Racc0, Node)
                                     end, R0, RestNodes),

                Preflists = riak_core_ring:all_preflists(Rfinal, Nval),
@@ -378,8 +576,17 @@ prop_claim_ensures_unique_nodes() ->
                                            ordsets:add_element(PL, Acc)
                                    end
                                end, [], Preflists)),
                ?assertEqual([], Counts),
                true
                ?WHENFAIL(
                    begin
                        io:format(user, "{Partitions, Nodes} {~p, ~p}~n",
                                  [Partitions, NodeCount]),
                        io:format(user, "Owners: ~p~n",
                                  [riak_core_ring:all_owners(Rfinal)])
                    end,
                    conjunction([{meets_target_n,
                                  equals({true,[]},
                                         meets_target_n(Rfinal, TNval))},
                                 {unique_nodes, equals([], Counts)}]))
            end).

-endif. % EQC