Skip to content
Browse files

Drop indexes from counters

Add minimal required meta data to work with HTTP API
  • Loading branch information...
1 parent 9c6d99d commit dfab047ecdc4276900ffa0cd0745276f2aed6b77 @russelldb russelldb committed Apr 10, 2013
Showing with 107 additions and 117 deletions.
  1. +49 −37 src/riak_kv_counter.erl
  2. +1 −17 src/riak_kv_put_fsm.erl
  3. +12 −1 src/riak_kv_util.erl
  4. +6 −6 src/riak_kv_vnode.erl
  5. +39 −56 test/kv_counter_eqc.erl
View
86 src/riak_kv_counter.erl
@@ -21,7 +21,7 @@
%% -------------------------------------------------------------------
-module(riak_kv_counter).
--export([update/4, merge/2, value/1]).
+-export([update/3, merge/1, value/1]).
-include("riak_kv_wm_raw.hrl").
@@ -34,11 +34,10 @@
%% for later resolution by the user.
%%
%% @TODO How do we let callers now about the sibling values?
--spec update(riak_object:riak_object(), riak_object:index_specs(),
- binary(), integer()) ->
+-spec update(riak_object:riak_object(), binary(), integer()) ->
riak_object:riak_object().
-update(RObj, IndexSpecs, Actor, Amt) ->
- {Counter0, NonCounterSiblings, Meta} = merge_object(RObj, IndexSpecs),
+update(RObj, Actor, Amt) ->
+ {Meta, Counter0, NonCounterSiblings} = merge_object(RObj),
Counter = case Amt of
0 -> Counter0;
_ -> update_counter(Counter0, Actor, Amt)
@@ -48,59 +47,43 @@ update(RObj, IndexSpecs, Actor, Amt) ->
%% @doc Unlike regular, opaque `riak_object' values, conflicting
%% counter writes can be merged by Riak, thanks to their internal
%% CRDT PN-Counter structure.
--spec merge(riak_object:riak_object(), riak_object:index_specs()) ->
+-spec merge(riak_object:riak_object()) ->
riak_object:riak_object().
-merge(RObj, IndexSpecs) ->
- {Counter, NonCounterSiblings, Meta} = merge_object(RObj, IndexSpecs),
+merge(RObj) ->
+ {Meta, Counter, NonCounterSiblings} = merge_object(RObj),
update_object(RObj, Meta, Counter, NonCounterSiblings).
%% @doc Currently _IGNORES_ all non-counter sibling values
-spec value(riak_object:riak_object()) ->
integer().
value(RObj) ->
Contents = riak_object:get_contents(RObj),
- {Counter, _NonCounterSiblings} = merge_contents(Contents),
+ {_Meta, Counter, _NonCounterSiblings} = merge_contents(Contents),
case Counter of
undefined -> 0;
_ ->
riak_kv_pncounter:value(Counter)
end.
%% Merge contents _AND_ meta
-merge_object(RObj, IndexSpecs) ->
+merge_object(RObj) ->
Contents = riak_object:get_contents(RObj),
- {Counter, NonCounterSiblings} = merge_contents(Contents),
- Meta = merged_meta(IndexSpecs),
- {Counter, NonCounterSiblings, Meta}.
+ merge_contents(Contents).
%% Only merge the values of actual PN-Counters
%% If a non-CRDT datum is present, keep it as a sibling value
merge_contents(Contents) ->
lists:foldl(fun merge_value/2,
- {undefined, []},
+ {undefined, undefined, []},
Contents).
%% worker for `merge_contents/1'
-merge_value({_MD, {riak_kv_pncounter, Counter}}, {undefined, NonCounterSiblings}) ->
- {Counter, NonCounterSiblings};
-merge_value({_MD, {riak_kv_pncounter, Counter}}, {Mergedest, NonCounterSiblings}) ->
- {riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
-merge_value(NonCounter, {Mergedest, NonCounterSiblings}) ->
- {Mergedest, [NonCounter | NonCounterSiblings]}.
-
-%% Only indexes are allowed in counter
-%% meta data.
-%% The job of merging index meta data has
-%% already been done to get the indexspecs
-%% therefore create a meta that is
-%% only the index meta data we already know about
-merged_meta(IndexSpecs) ->
- case [{Index, Value} || {Op, Index, Value} <- IndexSpecs,
- Op =:= add] of
- [] -> dict:new();
- Indexes ->
- dict:store(?MD_INDEX, Indexes, dict:new())
- end.
+merge_value({MD, {riak_kv_pncounter, Counter}}, {undefined, undefined, NonCounterSiblings}) ->
+ {MD, Counter, NonCounterSiblings};
+merge_value({MD, {riak_kv_pncounter, Counter}}, {MergedMeta, Mergedest, NonCounterSiblings}) ->
+ {merge_meta(MD, MergedMeta), riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
+merge_value(NonCounter, {MD, Mergedest, NonCounterSiblings}) ->
+ {MD, Mergedest, [NonCounter | NonCounterSiblings]}.
update_counter(undefined, Actor, Amt) ->
update_counter(riak_kv_pncounter:new(), Actor, Amt);
@@ -116,14 +99,43 @@ counter_op(Amt) ->
%% This uses an exported but marked INTERNAL
%% function of `riak_object:set_contents' to preserve
%% non-counter sibling values and Metadata
-update_object(RObj, _Meta, undefined, _Siblings) ->
+%% NOTE: if `Meta' is `undefined' then this
+%% is a new counter.
+update_object(RObj, _, undefined, _Siblings) ->
RObj;
update_object(RObj, Meta, Counter, []) ->
RObj2 = riak_object:update_value(RObj, {riak_kv_pncounter, Counter}),
- RObj3 = riak_object:update_metadata(RObj2, Meta),
+ RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)),
riak_object:apply_updates(RObj3);
update_object(RObj, Meta, Counter, SiblingValues) ->
%% keep non-counter siblings, too
- riak_object:set_contents(RObj, [{Meta, {riak_kv_pncounter, Counter}} | SiblingValues]).
+ riak_object:set_contents(RObj, [{counter_meta(Meta), {riak_kv_pncounter, Counter}} | SiblingValues]).
+
+counter_meta(undefined) ->
+ Now = os:timestamp(),
+ M = dict:new(),
+ M2 = dict:store(?MD_LASTMOD, Now, M),
+ dict:store(?MD_VTAG, riak_kv_util:make_vtag(Now), M2);
+counter_meta(Meta) ->
+ Meta.
+
+%% Just a simple take the largest for meta values based on last mod
+merge_meta(Meta1, Meta2) ->
+ case later(lastmod(Meta1), lastmod(Meta2)) of
+ true ->
+ Meta1;
+ false ->
+ Meta2
+ end.
+lastmod(Meta) ->
+ dict:fetch(?MD_LASTMOD, Meta).
+
+later(TS1, TS2) ->
+ case timer:now_diff(TS1, TS2) of
+ Before when Before < 0 ->
+ false;
+ _ ->
+ true
+ end.
View
18 src/riak_kv_put_fsm.erl
@@ -663,14 +663,10 @@ update_last_modified(RObj) ->
%% objects with the same vclock on 0.14.2 if the same clientid was used in
%% the same second. It can be revisited post-1.0.0.
Now = os:timestamp(),
- NewMD = dict:store(?MD_VTAG, make_vtag(Now),
+ NewMD = dict:store(?MD_VTAG, riak_kv_util:make_vtag(Now),
dict:store(?MD_LASTMOD, Now, MD0)),
riak_object:update_metadata(RObj, NewMD).
-make_vtag(Now) ->
- <<HashAsNum:128/integer>> = crypto:md5(term_to_binary({node(), Now})),
- riak_core_util:integer_to_list(HashAsNum,62).
-
%% Invokes the hook and returns a tuple of
%% {Lang, Called, Result}
%% Where Called = {Mod, Fun} if Lang = erlang
@@ -866,15 +862,3 @@ atom2list(P) when is_pid(P)->
dtrace_errstr(Term) ->
io_lib:format("~P", [Term, 12]).
-
-%% ===================================================================
-%% EUnit tests
-%% ===================================================================
--ifdef(TEST).
-
-make_vtag_test() ->
- crypto:start(),
- ?assertNot(make_vtag(now()) =:=
- make_vtag(now())).
-
--endif.
View
13 src/riak_kv_util.erl
@@ -38,7 +38,8 @@
fix_incorrect_index_entries/1,
fix_incorrect_index_entries/0,
responsible_preflists/1,
- responsible_preflists/2]).
+ responsible_preflists/2,
+ make_vtag/1]).
-include_lib("riak_kv_vnode.hrl").
@@ -332,6 +333,11 @@ mark_indexes_reformatted(Idx, 0, ForUpgrade) ->
mark_indexes_reformatted(_Idx, _ErrorCount, _ForUpgrade) ->
undefined.
+%% @Doc vtag creation function
+-spec make_vtag(erlang:timestamp()) -> list().
+make_vtag(Now) ->
+ <<HashAsNum:128/integer>> = crypto:md5(term_to_binary({node(), Now})),
+ riak_core_util:integer_to_list(HashAsNum,62).
%% ===================================================================
%% EUnit tests
@@ -359,4 +365,9 @@ deleted_test() ->
O, dict:store(<<"X-Riak-Deleted">>, true, MD))),
true = is_x_deleted(O1).
+make_vtag_test() ->
+ crypto:start(),
+ ?assertNot(make_vtag(now()) =:=
+ make_vtag(now())).
+
-endif.
View
12 src/riak_kv_vnode.erl
@@ -989,19 +989,19 @@ prepare_put(#state{idx=Idx,
PruneTime,
BProps))
end,
- ObjToStore2 = handle_counter(Coord, CounterOp, VId, ObjToStore, IndexSpecs),
+ ObjToStore2 = handle_counter(Coord, CounterOp, VId, ObjToStore),
{{true, ObjToStore2},
PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}}
end
end.
-handle_counter(true, CounterOp, VId, RObj, IndexSpecs) when is_integer(CounterOp) ->
- riak_kv_counter:update(RObj, IndexSpecs, VId, CounterOp);
-handle_counter(false, CounterOp, _Vid, RObj, IndexSpecs) when is_integer(CounterOp) ->
+handle_counter(true, CounterOp, VId, RObj) when is_integer(CounterOp) ->
+ riak_kv_counter:update(RObj, VId, CounterOp);
+handle_counter(false, CounterOp, _Vid, RObj) when is_integer(CounterOp) ->
%% non co-ord put, merge the values if there are siblings
%% 'cos that is the point of CRDTs / counters: no siblings
- riak_kv_counter:merge(RObj, IndexSpecs);
-handle_counter(_Coord, __CounterOp, _VId, RObj, _IndexSpecs) ->
+ riak_kv_counter:merge(RObj);
+handle_counter(_Coord, __CounterOp, _VId, RObj) ->
RObj.
perform_put({false, Obj},
View
95 test/kv_counter_eqc.erl
@@ -84,9 +84,9 @@ prop_value() ->
equals(sumthem(RObj), riak_kv_counter:value(RObj))).
prop_merge() ->
- ?FORALL({RObj, IndexSpecs}, {riak_object(), index_specs()},
+ ?FORALL(RObj, riak_object(),
begin
- Merged = riak_kv_counter:merge(RObj, IndexSpecs),
+ Merged = riak_kv_counter:merge(RObj),
FExpectedCounters = fun(NumGeneratedCounters) ->
case NumGeneratedCounters of
0 -> 0;
@@ -98,21 +98,20 @@ prop_merge() ->
?WHENFAIL(
begin
io:format("Gen ~p\n", [RObj]),
- io:format("Merged ~p\n", [Merged]),
- io:format("Index Specs ~p~n", [IndexSpecs])
+ io:format("Merged ~p\n", [Merged])
end,
conjunction([
{value, equals(sumthem(RObj), riak_kv_counter:value(Merged))},
- {verify_merge, verify_merge(RObj, Merged, IndexSpecs, FExpectedCounters, MergeSeed)}
+ {verify_merge, verify_merge(RObj, Merged, FExpectedCounters, MergeSeed)}
]))
end).
prop_update() ->
- ?FORALL({RObj, IndexSpecs, Actor, Amt},
- {riak_object(), index_specs(), noshrink(binary(4)), int()},
+ ?FORALL({RObj, Actor, Amt},
+ {riak_object(), noshrink(binary(4)), int()},
begin
- Updated = riak_kv_counter:update(RObj, IndexSpecs, Actor, Amt),
+ Updated = riak_kv_counter:update(RObj, Actor, Amt),
FExpectedCounters = fun(NumGeneratedCounters) ->
case {NumGeneratedCounters, Amt} of
{0, 0} -> 0;
@@ -128,13 +127,12 @@ prop_update() ->
begin
io:format("Gen ~p~n", [RObj]),
io:format("Updated ~p~n", [Updated]),
- io:format("Index Specs ~p~n", [IndexSpecs]),
io:format("Amt ~p~n", [Amt])
end,
conjunction([
{counter_value, equals(sumthem(RObj) + Amt,
riak_kv_counter:value(Updated))},
- {verify_merge, verify_merge(RObj, Updated, IndexSpecs, FExpectedCounters, MergeSeed)}
+ {verify_merge, verify_merge(RObj, Updated, FExpectedCounters, MergeSeed)}
]))
end).
@@ -144,47 +142,42 @@ prop_update() ->
%% Update and Merge are the same, except for the
%% end value of the counter. Reuse the common properties.
-verify_merge(Generated, PostAction, IndexSpecs, FExpectedCounters, MergeSeed) ->
+verify_merge(Generated, PostAction, FExpectedCounters, MergeSeed) ->
NumGeneratedCounters = num_counters(Generated),
ExpectedCounters = FExpectedCounters(NumGeneratedCounters),
ExpectedCounter = merge_object(Generated, MergeSeed),
NumMergedCounters = num_counters(PostAction),
{MergedMeta, MergedCounter} = single_counter(PostAction),
- ExpectedIndexMeta = expected_indexes(IndexSpecs, ExpectedCounters),
ExpectedSiblings = non_counter_siblings(Generated),
ActualSiblings = non_counter_siblings(PostAction),
conjunction([{number_of_counters,
equals(ExpectedCounters, NumMergedCounters)},
{counter_structure,
counters_equal(ExpectedCounter, MergedCounter)},
{siblings, equals(ExpectedSiblings, ActualSiblings)},
- {index_meta, equals(ExpectedIndexMeta, sorted_index_meta(MergedMeta))}]).
-
-sorted_index_meta(undefined) ->
- undefined;
-sorted_index_meta(Meta) ->
- case dict:find(?MD_INDEX, Meta) of
- error ->
- undefined;
- {ok, Val} ->
- lists:sort(Val)
- end.
-
-expected_indexes(_IndexSpecs, 0) ->
- undefined;
-expected_indexes(IndexSpecs, _) ->
- %% Use fold just to differentiate the code
- %% under test and the testing code
- case lists:foldl(fun({add, Index, Val}, Acc) ->
- [{Index, Val} | Acc];
- (_, Acc) ->
- Acc
- end,
- [],
- IndexSpecs) of
- [] ->
- undefined;
- Indexes -> lists:sort(Indexes)
+ {meta, equals(latest_meta(Generated, MergedMeta), MergedMeta)}]).
+
+latest_meta(RObj, MergedMeta) ->
+ %% Get the largest last modified containing meta data
+ latest_counter_meta(riak_object:get_contents(RObj), MergedMeta).
+
+latest_counter_meta([], Latest) ->
+ Latest;
+latest_counter_meta([{MD, {riak_kv_pncounter, _}}|Rest], Latest) ->
+ latest_counter_meta(Rest, get_latest_meta(MD, Latest));
+latest_counter_meta([_|Rest], Latest) ->
+ latest_counter_meta(Rest, Latest).
+
+get_latest_meta(MD, undefined) ->
+ MD;
+get_latest_meta(MD1, MD2) ->
+ TS1 = dict:fetch(?MD_LASTMOD, MD1),
+ TS2 = dict:fetch(?MD_LASTMOD, MD2),
+ case timer:now_diff(TS1, TS2) of
+ N when N < 0 ->
+ MD2;
+ _ ->
+ MD1
end.
%% safe wrap of riak_kv_pncounter:equal/2
@@ -197,7 +190,6 @@ counters_equal(undefined, _C2) ->
counters_equal(C1, C2) ->
riak_kv_pncounter:equal(C1, C2).
-
%% Extract a single {meta, counter} value
single_counter(Merged) ->
Contents = riak_object:get_contents(Merged),
@@ -262,7 +254,13 @@ contents() ->
list(content()).
content() ->
- {metadata(), value()}.
+ oneof([{metadata(), binary()}, {counter_meta(), pncounter()}]).
+
+counter_meta() ->
+ %% generate a dict of metadata
+ ?LET({_Mega, _Sec, _Micro}=Now, {nat(), nat(), nat()},
+ dict:store(?MD_VTAG, riak_kv_util:make_vtag(Now),
+ dict:store(?MD_LASTMOD, Now, dict:new()))).
metadata() ->
%% generate a dict of metadata
@@ -276,9 +274,6 @@ metadatum() ->
%% just present
{binary(), binary()}.
-value() ->
- oneof([binary(), pncounter()]).
-
gcounter() ->
list(clock()).
@@ -288,17 +283,5 @@ pncounter() ->
clock() ->
{int(), nat()}.
-index_specs() ->
- list(index_spec()).
-
-index_spec() ->
- {index_op(), binary(), index_value()}.
-
-index_op() ->
- oneof([add, remove]).
-
-index_value() ->
- oneof([binary(), int()]).
-
-endif. % EQC

0 comments on commit dfab047

Please sign in to comment.
Something went wrong with that request. Please try again.