Skip to content

Commit

Permalink
Merge branch 'feature/systest-read-sibling-handling'
Browse files Browse the repository at this point in the history
  • Loading branch information
kellymclaughlin committed Jun 18, 2014
2 parents c3597b9 + d045dfc commit 34a0489
Showing 1 changed file with 80 additions and 14 deletions.
94 changes: 80 additions & 14 deletions src/rt.erl
Expand Up @@ -1118,23 +1118,89 @@ systest_read(Node, Start, End, Bucket, R) ->
systest_read(Node, Start, End, Bucket, R, <<>>).

systest_read(Node, Start, End, Bucket, R, CommonValBin)
when is_binary(CommonValBin) ->
systest_read(Node, Start, End, Bucket, R, CommonValBin, false).

%% Read and verify the values of objects written with
%% `systest_write'. The `SquashSiblings' parameter exists to
%% optionally allow handling of siblings whose value and metadata are
%% identical except for the dot. This goal is to facilitate testing
%% with DVV enabled because siblings can be created internally by Riak
%% in cases where testing with DVV disabled would not. Such cases
%% include writes that happen during handoff when a vnode forwards
%% writes, but also performs them locally or when a put coordinator
%% fails to send an acknowledgment within the timeout window and
%% another put request is issued.
systest_read(Node, Start, End, Bucket, R, CommonValBin, SquashSiblings)
when is_binary(CommonValBin) ->
rt:wait_for_service(Node, riak_kv),
{ok, C} = riak:client_connect(Node),
F = fun(N, Acc) ->
case C:get(Bucket, <<N:32/integer>>, R) of
{ok, Obj} ->
case riak_object:get_value(Obj) of
<<N:32/integer, CommonValBin/binary>> ->
Acc;
WrongVal ->
[{N, {wrong_val, WrongVal}} | Acc]
end;
Other ->
[{N, Other} | Acc]
end
end,
lists:foldl(F, [], lists:seq(Start, End)).
lists:foldl(systest_read_fold_fun(C, Bucket, R, CommonValBin, SquashSiblings),
[],
lists:seq(Start, End)).

systest_read_fold_fun(C, Bucket, R, CommonValBin, SquashSiblings) ->
fun(N, Acc) ->
GetRes = C:get(Bucket, <<N:32/integer>>, R),
Val = object_value(GetRes, SquashSiblings),
update_acc(value_matches(Val, N, CommonValBin), Val, N, Acc)
end.

object_value({error, _}=Error, _) ->
Error;
object_value({ok, Obj}, SquashSiblings) ->
object_value(riak_object:value_count(Obj), Obj, SquashSiblings).

object_value(1, Obj, _SquashSiblings) ->
riak_object:get_value(Obj);
object_value(_ValueCount, Obj, false) ->
riak_object:get_value(Obj);
object_value(_ValueCount, Obj, true) ->
lager:debug("Siblings detected for ~p:~p", [riak_object:bucket(Obj), riak_object:key(Obj)]),
Contents = riak_object:get_contents(Obj),
case lists:foldl(fun sibling_compare/2, {true, undefined}, Contents) of
{true, {_, _, _, Value}} ->
lager:debug("Siblings determined to be a single value"),
Value;
{false, _} ->
{error, siblings}
end.

sibling_compare({MetaData, Value}, {true, undefined}) ->
Dot = case dict:find(<<"dot">>, MetaData) of
{ok, DotVal} ->
DotVal;
error ->
{error, no_dot}
end,
VTag = dict:fetch(<<"X-Riak-VTag">>, MetaData),
LastMod = dict:fetch(<<"X-Riak-Last-Modified">>, MetaData),
{true, {element(2, Dot), VTag, LastMod, Value}};
sibling_compare(_, {false, _}=InvalidMatch) ->
InvalidMatch;
sibling_compare({MetaData, Value}, {true, PreviousElements}) ->
Dot = case dict:find(<<"dot">>, MetaData) of
{ok, DotVal} ->
DotVal;
error ->
{error, no_dot}
end,
VTag = dict:fetch(<<"X-Riak-VTag">>, MetaData),
LastMod = dict:fetch(<<"X-Riak-Last-Modified">>, MetaData),
ComparisonElements = {element(2, Dot), VTag, LastMod, Value},
{ComparisonElements =:= PreviousElements, ComparisonElements}.

value_matches(<<N:32/integer, CommonValBin/binary>>, N, CommonValBin) ->
true;
value_matches(_WrongVal, _N, _CommonValBin) ->
false.

update_acc(true, _, _, Acc) ->
Acc;
update_acc(false, {error, _}=Val, N, Acc) ->
[{N, Val} | Acc];
update_acc(false, Val, N, Acc) ->
[{N, {wrong_val, Val}} | Acc].

% @doc Reads a single replica of a value. This issues a get command directly
% to the vnode handling the Nth primary partition of the object's preflist.
Expand Down

0 comments on commit 34a0489

Please sign in to comment.