Permalink
Browse files

improve performance and fix several bugs in future index calculation

* make future index a constant time calculation (it was O(RingSize * FutureRingSize) before)
* for no possible reason the notsent acc was a linked list instead of a set
* during shrinking the n-value for a preflist could be, depending on ownership assignment and
  order of transfers, implicitly grown (e.g. N=3 -> N=5) leaving behind unreachable data. This
  is because primaries involved in shrinking may transfer data out after receiving data in. In
  this case it is necessary to identify keys that are owned by the source partition in the *future*
  ring. This is done by detecting two conditions, the first is when the position of the source in
  the current preflist is greater than or equal to the size of the new ring (an impossible N value
  in the new ring) and an optional N-value threshold which solves a specific case when halving the ring
  where the first condition is not triggered until data has already been copied where it shouldn't be

add eqc test for riak_core_ring:future_index
  • Loading branch information...
1 parent 262be06 commit 0f4c8ca8f8e697e696aeaa94a3a0772d5f66e847 @jrwest jrwest committed May 1, 2013
Showing with 236 additions and 42 deletions.
  1. +13 −0 src/riak_core_bucket.erl
  2. +26 −20 src/riak_core_handoff_manager.erl
  3. +68 −21 src/riak_core_ring.erl
  4. +1 −1 src/riak_core_vnode.erl
  5. +128 −0 test/riak_core_ring_eqc.erl
View
@@ -31,6 +31,8 @@
get_bucket/2,
reset_bucket/1,
get_buckets/1,
+ bucket_nval_map/1,
+ default_object_nval/0,
merge_props/2,
name/1,
n_val/1]).
@@ -128,6 +130,17 @@ get_buckets(Ring) ->
Names = riak_core_ring:get_buckets(Ring),
[get_bucket(Name, Ring) || Name <- Names].
+%% @doc returns a proplist containing all buckets and their respective N values
+-spec bucket_nval_map(riak_core_ring:riak_core_ring()) -> [{binary(),integer()}].
+bucket_nval_map(Ring) ->
+ [{riak_core_bucket:name(B), riak_core_bucket:n_val(B)} ||
+ B <- riak_core_bucket:get_buckets(Ring)].
+
+%% @doc returns the default n value for buckets that have not explicitly set the property
+-spec default_object_nval() -> integer().
+default_object_nval() ->
+ riak_core_bucket:n_val(riak_core_config:default_bucket_props()).
+
%% @private
-spec validate_props(BucketProps::list({PropName::atom(), Value::any()}),
Validators::list(module()),
@@ -400,28 +400,34 @@ filter({Key, Value}=_Filter) ->
resize_transfer_filter(Ring, Mod, Src, Target) ->
fun(K) ->
{_, Hashed} = Mod:object_info(K),
- %% when the ring shrinks we may have data on this partition for which
- %% it is not possible to determine a future index because the position
- %% of Src in the current preflist is greater than the size of the future
- %% ring. This data shouldn't be sent anywhere so catch the exception and
- %% move on
- try riak_core_ring:is_future_index(Hashed,
- Src,
- Target,
- Ring)
- catch error:_ -> false
- end
+ riak_core_ring:is_future_index(Hashed,
+ Src,
+ Target,
+ Ring)
end.
resize_transfer_notsent_fun(Ring, Mod, Src) ->
- fun(Key, Acc) -> record_seen_index(Ring, Mod, Src, Key, Acc) end.
-
-record_seen_index(Ring, Mod, Src, Key, Seen) ->
- {_, Hashed} = Mod:object_info(Key),
- try riak_core_ring:future_index(Hashed, Src, Ring) of
- FutureIndex -> [FutureIndex | Seen]
- catch
- error:_ -> Seen
+ Shrinking = riak_core_ring:num_partitions(Ring) > riak_core_ring:future_num_partitions(Ring),
+ case Shrinking of
+ false -> NValMap = DefaultN = undefined;
+ true ->
+ NValMap = riak_core_bucket:bucket_nval_map(Ring),
+ DefaultN = riak_core_bucket:default_object_nval()
+ end,
+ fun(Key, Acc) -> record_seen_index(Ring, Shrinking, NValMap, DefaultN, Mod, Src, Key, Acc) end.
+
+record_seen_index(Ring, Shrinking, NValMap, DefaultN, Mod, Src, Key, Seen) ->
+ {Bucket, Hashed} = Mod:object_info(Key),
+ CheckNVal = case Shrinking of
+ false -> undefined;
+ true -> proplists:get_value(Bucket, NValMap, DefaultN)
+ end,
+ case riak_core_ring:future_index(Hashed, Src, CheckNVal, Ring) of
+ undefined -> Seen;
+ FutureIndex ->
+ lager:info("resize transfer from ~p recording ~p as seen for key ~p",
+ [Src, FutureIndex, Key]),
+ ordsets:add_element(FutureIndex, Seen)
end.
get_concurrency_limit () ->
@@ -483,7 +489,7 @@ send_handoff(HOType, {Mod, Src, Target}, Node, Vnode, HS, {Filter, FilterModFun}
resize_transfer ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
HOFilter = resize_transfer_filter(Ring, Mod, Src, Target),
- HOAcc0 = [],
+ HOAcc0 = ordsets:new(),
HONotSentFun = resize_transfer_notsent_fun(Ring, Mod, Src);
_ ->
HOFilter = none,
View
@@ -126,6 +126,7 @@
resized_ring/1,
set_resized_ring/2,
future_index/3,
+ future_index/4,
is_future_index/4,
future_owner/2,
future_num_partitions/1,
@@ -534,31 +535,77 @@ responsible_index(ChashKey, ?CHSTATE{chring=Ring}) ->
%% for `CHashKey' in the future ring. For regular transitions
%% the returned index will always be `OrigIdx'. If the ring is
%% resizing the index may be different
--spec future_index(chash:index(), integer(), chstate()) -> integer().
+-spec future_index(chash:index(),
+ integer(),
+ chstate()) -> integer() | undefined.
future_index(CHashKey, OrigIdx, State) ->
- FutureState = future_ring(State),
- Preflist = preflist(CHashKey, State),
- FuturePreflist = preflist(CHashKey, FutureState),
- Position = preflist_position(Preflist, OrigIdx),
- {FutureIdx, _} = lists:nth(Position, FuturePreflist),
- FutureIdx.
+ future_index(CHashKey, OrigIdx, undefined, State).
+
+-spec future_index(chash:index(),
+ integer(),
+ undefined | integer(),
+ chstate()) -> integer() | undefined.
+future_index(CHashKey, OrigIdx, NValCheck, State) ->
+ <<CHashInt:160/integer>> = CHashKey,
+ OrigCount = num_partitions(State),
+ NextCount = future_num_partitions(State),
+ OrigInc = chash:ring_increment(OrigCount),
+ NextInc = chash:ring_increment(NextCount),
+
+ %% Determine position in the ring of partition that owns key (head of preflist)
+ %% Position is 1-based starting from partition (0 + ring increment), e.g.
+ %% index 0 is always position N.
+ OwnerPos = ((CHashInt div OrigInc) + 1),
+
+ %% Determine position of the source partition in the ring
+ %% if OrigIdx is 0 we know the position is OrigCount (number of partitions)
+ OrigPos = case OrigIdx of
+ 0 -> OrigCount;
+ _ -> OrigIdx div OrigInc
+ end,
+
+ %% The distance between the key's owner (head of preflist) and the source partition
+ %% is the position of the source in the preflist, the distance may be negative
+ %% in which case we have wrapped around the ring. distance of zero means the source
+ %% is the head of the preflist.
+ OrigDist = case OrigPos - OwnerPos of
+ P when P < 0 -> OrigCount + P;
+ P -> P
+ end,
+
+ %% In the case that the ring is shrinking the future index for a key whose position
+ %% in the preflist is >= ring size may be calculated, any transfer is invalid in
+ %% this case, return undefined. The position may also be >= an optional N value for
+ %% the key, if this is true undefined is also returned
+ case check_invalid_future_index(OrigDist, NextCount, NValCheck) of
+ true -> undefined;
+ false ->
+ %% Determine the partition (head of preflist) that will own the key in the future ring
+ FuturePos = ((CHashInt div NextInc) + 1),
+ NextOwner = FuturePos * NextInc,
+
+ %% Determine the partition that the key should be transferred to (has same position
+ %% in future preflist as source partition does in current preflist)
+ RingTop = trunc(math:pow(2,160)-1),
+ case NextOwner + (NextInc * OrigDist) of
+ FutureIndex when FutureIndex >= RingTop -> FutureIndex - RingTop;
+ FutureIndex -> FutureIndex
+ end
+ end.
+
+check_invalid_future_index(OrigDist, NextCount, NValCheck) ->
+ OverRingSize = OrigDist >= NextCount,
+ OverNVal = case NValCheck of
+ undefined -> false;
+ _ -> OrigDist >= NValCheck
+ end,
+ OverRingSize orelse OverNVal.
-spec is_future_index(chash:index(), integer(), integer(), chstate()) -> boolean().
is_future_index(CHashKey, OrigIdx, TargetIdx, State) ->
- FutureIndex = future_index(CHashKey, OrigIdx, State),
+ FutureIndex = future_index(CHashKey, OrigIdx, undefined, State),
FutureIndex =:= TargetIdx.
-%% @private
-%% returns a 1-based index that is the position of `Idx' in `Preflist'
-preflist_position(Preflist, Idx) ->
- preflist_position(Preflist, Idx, 1).
-
-%% @private
-preflist_position([{Idx, _} | _], Idx, Acc) ->
- Acc;
-preflist_position([_ | Rest], Idx, Acc) ->
- preflist_position(Rest, Idx, Acc+1).
-
-spec transfer_node(Idx :: integer(), Node :: term(), MyState :: chstate()) ->
chstate().
transfer_node(Idx, Node, MyState) ->
@@ -1930,8 +1977,8 @@ resize_test() ->
Key = <<0:160/integer>>,
OrigIdx = element(1, hd(preflist(Key, Ring0))),
%% for non-resize transitions index should be the same
- ?assertEqual(OrigIdx, future_index(Key, OrigIdx, Ring0)),
- ?assertEqual(element(1, hd(preflist(Key, Ring2))), future_index(Key, OrigIdx, Ring3)).
+ ?assertEqual(OrigIdx, future_index(Key, OrigIdx, undefined, Ring0)),
+ ?assertEqual(element(1, hd(preflist(Key, Ring2))), future_index(Key, OrigIdx, undefined, Ring3)).
resize_xfer_test_() ->
{setup,
View
@@ -504,7 +504,7 @@ mark_handoff_complete(SrcIdx, Target, SeenIdxs, Mod, resize_transfer) ->
Source,
SeenIdx)
end,
- Ring2 = lists:foldl(F, Ring, SeenIdxs),
+ Ring2 = lists:foldl(F, Ring, ordsets:to_list(SeenIdxs)),
Ring3 = riak_core_ring:resize_transfer_complete(Ring2,
Source,
Target,
View
@@ -0,0 +1,128 @@
+%% -------------------------------------------------------------------
+%%
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_ring_eqc).
+
+-ifdef(EQC).
+-export([prop_future_index/0]).
+
+-include_lib("eqc/include/eqc.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TEST_ITERATIONS, 10000).
+-define(QC_OUT(P),
+ eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
+
+
+eqc_test_() ->
+ {inparallel,
+ [{spawn,
+ [{setup,
+ fun() -> ok end,
+ fun(_) -> ok end,
+ [
+ %% Run the quickcheck tests
+ {timeout, 60000, % timeout is in msec
+ %% Indicate the number of test iterations for each property here
+ ?_assertEqual(true,
+ quickcheck(numtests(?TEST_ITERATIONS,
+ ?QC_OUT(prop_future_index()))))
+ }]}]}]}.
+
+
+
+prop_future_index() ->
+ ?FORALL({CHashKey, OrigIdx, TargetIdx, N, Pos, Ring}=TransferItem, resize_item(),
+ ?WHENFAIL(prop_future_index_failed(TransferItem),
+ collect(N =/= undefined andalso Pos >= N,
+ begin
+ {Time, Val} = timer:tc(riak_core_ring, future_index,
+ [CHashKey, OrigIdx, N, Ring]),
+ measure(future_index_usec, Time, TargetIdx =:= Val)
+ end))).
+
+resize_item() ->
+ %% RingSize - Starting Ring Size
+ %% GrowthFactor - >1 expanding, <1 shrinking
+ %% IndexStart - first partition in preflist for key
+ %% Pos - position of source partition in preflist
+ %% N - optional known N-value for the key
+ ?LET({RingSize, GrowthF},
+ {current_size(), growth_factor()},
+ ?LET({IndexStart, Pos, N},
+ {index_in_current_ring(RingSize),
+ replica_position(RingSize),
+ check_nval(RingSize, GrowthF)},
+ begin
+ Ring0 = riak_core_ring:fresh(RingSize, node()),
+ Ring1 = riak_core_ring:resize(Ring0,trunc(GrowthF * RingSize)),
+ Ring = riak_core_ring:set_pending_resize(Ring1, Ring0),
+ CHashKey = <<(IndexStart-1):160/integer>>,
+ Preflist = riak_core_ring:preflist(CHashKey, Ring0),
+ FuturePreflist = riak_core_ring:preflist(CHashKey, Ring1),
+ {SourceIdx, _} = lists:nth(Pos+1, Preflist),
+
+ %% account for case where position is greater than
+ %% future ring size or possbly known N-value
+ %% (shrinking) we shouldn't have a target index in
+ %% that case since a transfer to it would be invalid
+ case Pos >= riak_core_ring:num_partitions(Ring1) orelse
+ (N =/= undefined andalso Pos >= N) of
+ true -> TargetIdx = undefined;
+ false -> {TargetIdx, _} = lists:nth(Pos+1, FuturePreflist)
+ end,
+ {CHashKey, SourceIdx, TargetIdx, N, Pos, Ring}
+ end)).
+
+index_in_current_ring(RingSize) ->
+ elements([I || {I,_} <- riak_core_ring:all_owners(riak_core_ring:fresh(RingSize, node()))]).
+
+current_size() ->
+ elements([16, 32, 64, 128]).
+
+growth_factor() ->
+ oneof([0.25, 0.5, 2, 4]).
+
+
+replica_position(RingSize) ->
+ %% use a max position that could be invalid for shrinking (greater than future size)
+ %% purposefully to generate negative cases
+ Max = RingSize,
+ choose(0, (Max - 1)).
+
+check_nval(RingSize, GrowthF) ->
+ case GrowthF > 1 of
+ %% while expanding the n-value doesn't matter
+ true -> undefined;
+ %% while shrinking provide a "realistic" n-value of the key
+ false -> choose(1, trunc((RingSize * GrowthF) / 2) - 1)
+ end.
+
+
+prop_future_index_failed({CHashKey, OrigIdx, TargetIdx, NValCheck, _, R}) ->
+ <<CHashInt:160/integer>> = CHashKey,
+ FoundTarget = riak_core_ring:future_index(CHashKey, OrigIdx, NValCheck, R),
+ io:format("key: ~p~nsource: ~p~ncurrsize: ~p~nfuturesize: ~p~nexpected: ~p~nactual: ~p~n",
+ [CHashInt, OrigIdx,
+ riak_core_ring:num_partitions(R), riak_core_ring:future_num_partitions(R),
+ TargetIdx, FoundTarget]).
+
+-endif.

0 comments on commit 0f4c8ca

Please sign in to comment.