Permalink
Browse files

WIP commit

EQC test
Changes to counter because of it
Export riak_object:index_specs
  • Loading branch information...
1 parent 3bc205b commit 94a55e4be316d4986544482bf7ce61c975d21bda @russelldb russelldb committed Feb 11, 2013
Showing with 269 additions and 24 deletions.
  1. +49 −21 src/riak_kv_counter.erl
  2. +5 −3 src/riak_object.erl
  3. +215 −0 test/kv_counter_eqc.erl
View
@@ -25,36 +25,62 @@
-include("riak_kv_wm_raw.hrl").
-%% A counter is a two tuple of a `riak_kv_pncounter' stored in a `riak_object'
+%% @doc A counter is a two tuple of a `riak_kv_pncounter'
+%% stored in a `riak_object'
%% with the tag `riak_kv_pncounter' as the first element.
%% Since counters can be stored with any name, in any bucket, there is a
-%% chance that some sibing value for a counter is
+%% chance that some sibling value for a counter is
%% not a `riak_kv_pncounter' in that case, we keep the sibling
%% for later resolution by the user.
--spec update(riak_object:riak_object(), term(), binary(), integer()) ->
+%%
+%% @TODO How do we let callers now about the sibling values?
+-spec update(riak_object:riak_object(), riak_object:index_specs(),
+ binary(), integer()) ->
riak_object:riak_object().
update(RObj, IndexSpecs, Actor, Amt) ->
- Values = riak_object:get_contents(RObj),
- {Counter0, NonCounterSiblings} = merge_values(Values, riak_kv_pncounter:new(), []),
- Meta = merged_meta(IndexSpecs),
+ {Counter0, NonCounterSiblings, Meta} = merge_object(RObj, IndexSpecs,
+ riak_kv_pncounter:new()),
Counter = update_counter(Counter0, Actor, Amt),
update_object(RObj, Meta, Counter, NonCounterSiblings).
+%% @doc Unlike regular, opaque `riak_object' values, conflicting
+%% counter writes can be merged by Riak, thanks to their internal
+%% CRDT PN-Counter structure.
+-spec merge(riak_object:riak_object(), riak_object:index_specs()) ->
+ riak_object:riak_object().
merge(RObj, IndexSpecs) ->
- Values = riak_object:get_contents(RObj),
- {Counter, NonCounterSiblings} = merge_values(Values, riak_kv_pncounter:new(), []),
- Meta = merged_meta(IndexSpecs),
+ {Counter, NonCounterSiblings, Meta} = merge_object(RObj, IndexSpecs, undefined),
update_object(RObj, Meta, Counter, NonCounterSiblings).
-merge_values([], Mergedest, NonCounterSiblings) ->
- {Mergedest, NonCounterSiblings};
-merge_values([{_MD, {riak_kv_pncounter, Value}} | Rest], Mergedest, NonCounterSiblings) ->
- merge_values(Rest, do_merge(Value, Mergedest), NonCounterSiblings);
-merge_values([NotACounter|Rest], Mergedest, NonCounterSiblings) ->
- merge_values(Rest, Mergedest, [NotACounter | NonCounterSiblings]).
+%% @doc Currently _IGNORES_ all non-counter sibling values
+-spec value(riak_object:riak_object()) ->
+ integer().
+value(RObj) ->
+ Contents = riak_object:get_contents(RObj),
+ {Counter, _NonCounterSiblings} = merge_contents(Contents, riak_kv_pncounter:new()),
+ riak_kv_pncounter:value(Counter).
+
+%% Merge contents _AND_ meta
+merge_object(RObj, IndexSpecs, Seed) ->
+ Contents = riak_object:get_contents(RObj),
+ {Counter, NonCounterSiblings} = merge_contents(Contents, Seed),
+ Meta = merged_meta(IndexSpecs),
+ {Counter, NonCounterSiblings, Meta}.
-do_merge(C1, C2) ->
- riak_kv_pncounter:merge(C1, C2).
+%% Only merge the values of actual PN-Counters
+%% If a non-CRDT datum is present, keep it as a sibling value
+merge_contents(Contents, Seed) ->
+ lists:foldl(fun merge_value/2,
+ {Seed, []},
+ Contents).
+
+%% worker for `do_merge/1'
+merge_value({_MD, {riak_kv_pncounter, _Counter}}=PNCount, {undefined, NonCounterSiblings}) ->
+ merge_value(PNCount, {riak_kv_pncounter:new(), NonCounterSiblings});
+merge_value({_MD, {riak_kv_pncounter, Counter}}, {Mergedest, NonCounterSiblings}) ->
+ {riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
+merge_value(NonCounter, {Mergedest, NonCounterSiblings}) ->
+ {Mergedest, [NonCounter | NonCounterSiblings]}.
%% Only indexes are allowed in counter
%% meta data.
@@ -76,6 +102,11 @@ counter_op(Amt) when Amt < 0 ->
counter_op(Amt) ->
{increment, Amt}.
+%% This uses an exported but marked INTERNAL
+%% function of `riak_object:set_contents' to preserve
+%% non-counter sibling values and Metadata
+update_object(RObj, _Meta, undefined, _Siblings) ->
+ RObj;
update_object(RObj, Meta, Counter, []) ->
RObj2 = riak_object:update_value(RObj, {riak_kv_pncounter, Counter}),
RObj3 = riak_object:update_metadata(RObj2, Meta),
@@ -84,7 +115,4 @@ update_object(RObj, Meta, Counter, SiblingValues) ->
%% keep non-counter siblings, too
riak_object:set_contents(RObj, [{Meta, {riak_kv_pncounter, Counter}} | SiblingValues]).
-value(RObj) ->
- Contents = riak_object:get_contents(RObj),
- {Counter, _NonCounterSiblings} = merge_values(Contents, riak_kv_pncounter:new(), []),
- riak_kv_pncounter:value(Counter).
+
View
@@ -28,7 +28,7 @@
-endif.
-include("riak_kv_wm_raw.hrl").
--export_type([riak_object/0, bucket/0, key/0, value/0]).
+-export_type([riak_object/0, bucket/0, key/0, value/0, index_specs/0]).
-type key() :: binary().
-type bucket() :: binary().
@@ -53,6 +53,8 @@
-type index_op() :: add | remove.
-type index_value() :: integer() | binary().
+-type index_spec() :: {index_op(), binary(), index_value()}.
+-type index_specs() :: [index_spec()].
-define(MAX_KEY_SIZE, 65536).
@@ -333,7 +335,7 @@ increment_vclock(Object=#r_object{}, ClientId, Timestamp) ->
%% stored for a key and therefore no existing
%% index data.
-spec index_specs(riak_object()) ->
- [{index_op(), binary(), index_value()}].
+ index_specs().
index_specs(Obj) ->
Indexes = index_data(Obj),
assemble_index_specs(Indexes, add).
@@ -345,7 +347,7 @@ index_specs(Obj) ->
%% second parameter. If there are siblings only the unique new
%% indexes are added.
-spec diff_index_specs(riak_object(), riak_object()) ->
- [{index_op(), binary(), index_value()}].
+ index_specs().
diff_index_specs(Obj, OldObj) ->
OldIndexes = index_data(OldObj),
OldIndexSet = ordsets:from_list(OldIndexes),
View
@@ -0,0 +1,215 @@
+%% -------------------------------------------------------------------
+%%
+%% kv_counter_eqc: Quickcheck test for riak_kv_counter
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(kv_counter_eqc).
+
+%%-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-include_lib("eqc/include/eqc_statem.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("../src/riak_kv_wm_raw.hrl").
+
+-compile(export_all).
+
+-define(BUCKET, <<"b">>).
+-define(KEY, <<"k">>).
+-define(NUMTESTS, 1000).
+-define(QC_OUT(P),
+ eqc:on_output(fun(Str, Args) ->
+ io:format(user, Str, Args) end, P)).
+
+%%====================================================================
+%% Shell helpers
+%%====================================================================
+
+test() ->
+ test(100).
+
+test(N) ->
+ quickcheck(numtests(N, prop_value())).
+
+test_merge() ->
+ test_merge(100).
+
+test_merge(N) ->
+ quickcheck(numtests(N, prop_merge())).
+
+prop_value() ->
+ %% given any riak_object,
+ %% value will return the value
+ %% of merged PN-Counter payloads (zero if no counters)
+ ?FORALL(RObj, riak_object(),
+ collect(num_counters(riak_object:get_values(RObj)),
+ equals(sumthem(RObj), riak_kv_counter:value(RObj)))).
+
+prop_merge() ->
+ %% given any riak_object
+ %% merge will return a
+ %% riak_object where all counter siblings
+ %% are squashed to a single value
+ %% and all counter metadata is squashed to
+ %% a single dict of indexes
+ %% all other siblings must be maintained, untouched.
+ ?FORALL({RObj, IndexSpecs}, {riak_object(), index_specs()},
+ begin
+ Merged = riak_kv_counter:merge(RObj, IndexSpecs),
+ NumCounters = num_counters(riak_object:get_values(RObj)),
+ NumMergedCounters = num_counters(riak_object:get_values(Merged)),
+ ExpectedCounters = case NumCounters of
+ 0 -> 0;
+ _ -> 1
+ end,
+ %% Check the structure of the merged counter is correct
+ ExpectedCounter = merge_object(RObj, undefined),
+ MergedCounter = single_counter(Merged),
+ %% Check that the meta is correct
+ %% Check that non-sibling values and meta are untouched
+ ExpectedSiblings = non_counter_siblings(RObj),
+ ActualSiblings = non_counter_siblings(Merged),
+ ?WHENFAIL(
+ begin
+ io:format("Gen ~p\n", [RObj]),
+ io:format("Merged ~p\n", [Merged])
+ end,
+ collect(NumCounters,
+ conjunction([{number_of_counters,
+ equals(ExpectedCounters, NumMergedCounters)},
+ {counter_structure,
+ counters_equal(ExpectedCounter, MergedCounter)},
+ {siblings, equals(lists:sort(ExpectedSiblings), lists:sort(ActualSiblings))}
+ ])))
+ end).
+
+%%====================================================================
+%% Helpers
+%%====================================================================
+counters_equal(undefined, undefined) ->
+ true;
+counters_equal(_C1, undefined) ->
+ false;
+counters_equal(undefined, _C2) ->
+ false;
+counters_equal(C1, C2) ->
+ riak_kv_pncounter:equal(C1, C2).
+
+single_counter(Merged) ->
+ Values = riak_object:get_values(Merged),
+ case [begin
+ {riak_kv_pncounter, Counter} = Val,
+ Counter
+ end|| Val <- Values,
+ is_tuple(Val),
+ riak_kv_pncounter =:= element(1, Val)] of
+ [Single] ->
+ Single;
+ _Many -> undefined
+ end.
+
+non_counter_siblings(RObj) ->
+ Contents = riak_object:get_contents(RObj),
+ {_Counters, NonCounters} = lists:partition(fun({_Md, {riak_kv_pncounter, _C}}) ->
+ true;
+ ({_MD, _Val}) ->
+ false
+ end,
+ Contents),
+ NonCounters.
+
+num_counters(Values) ->
+ length([ok || Val <- Values,
+ is_tuple(Val),
+ riak_kv_pncounter =:= element(1, Val)]).
+
+merge_object(RObj, Seed) ->
+ Values = riak_object:get_values(RObj),
+ lists:foldl(fun({riak_kv_pncounter, Counter}, undefined) ->
+ Counter;
+ ({riak_kv_pncounter, Counter}, Mergedest) ->
+ riak_kv_pncounter:merge(Counter, Mergedest);
+ (_Bin, Mergedest) ->
+ Mergedest end,
+ Seed,
+ Values).
+
+%% Somewhat duplicates the logic under test
+%% but is a different implementation, at least
+sumthem(RObj) ->
+ Merged = merge_object(RObj, riak_kv_pncounter:new()),
+ riak_kv_pncounter:value(Merged).
+
+%%====================================================================
+%% Generators
+%%====================================================================
+
+riak_object() ->
+ ?LET({Contents, VClock},
+ {contents(), fsm_eqc_util:vclock()},
+ riak_object:set_contents(
+ riak_object:set_vclock(
+ riak_object:new(?BUCKET, ?KEY, <<>>),
+ VClock),
+ Contents)).
+
+contents() ->
+ list(content()).
+
+content() ->
+ {metadata(), value()}.
+
+metadata() ->
+ %% generate a dict of metadata
+ ?LET(Meta, metadatas(), dict:from_list(Meta)).
+
+metadatas() ->
+ list(metadatum()).
+
+metadatum() ->
+ %% doesn't need to be realistic, even
+ %% just present
+ {binary(), binary()}.
+
+value() ->
+ oneof([binary(), pncounter()]).
+
+gcounter() ->
+ list(clock()).
+
+pncounter() ->
+ {riak_kv_pncounter, {gcounter(), gcounter()}}.
+
+clock() ->
+ {int(), nat()}.
+
+index_specs() ->
+ list(index_spec()).
+
+index_spec() ->
+ {index_op(), binary(), index_value()}.
+
+index_op() ->
+ oneof([add, remove]).
+
+index_value() ->
+ oneof([binary(), int()]).
+
+%%-endif. % EQC

0 comments on commit 94a55e4

Please sign in to comment.