Permalink
Browse files

WIP commit of counters-in-riak-object

  • Loading branch information...
1 parent c2f92dd commit d515125cca819e0103787573116c339f0f16afba @russelldb russelldb committed Feb 4, 2013
Showing with 629 additions and 22 deletions.
  1. +90 −0 src/riak_kv_counter.erl
  2. +187 −0 src/riak_kv_gcounter.erl
  3. +182 −0 src/riak_kv_pncounter.erl
  4. +3 −0 src/riak_kv_put_fsm.erl
  5. +44 −22 src/riak_kv_vnode.erl
  6. +123 −0 test/crdt_statem_eqc.erl
View
@@ -0,0 +1,90 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_counter: Counter logic to bridge riak_object and riak_kv_pncounter
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_kv_counter).
+
+-export([update/4, merge/2, value/1]).
+
+-include("riak_kv_wm_raw.hrl").
+
+%% A counter is a two tuple of a `riak_kv_pncounter' stored in a `riak_object'
+%% with the tag `riak_kv_pncounter' as the first element.
+%% Since counters can be stored with any name, in any bucket, there is a
+%% chance that some sibing value for a counter is
+%% not a `riak_kv_pncounter' in that case, we keep the sibling
+%% for later resolution by the user.
+-spec update(riak_object:riak_object(), term(), binary(), integer()) ->
+ riak_object:riak_object().
+update(RObj, IndexSpecs, Actor, Amt) ->
+ Values = riak_object:get_contents(RObj),
+ {Counter0, NonCounterSiblings} = merge_values(Values, riak_kv_pncounter:new(), []),
+ Meta = merged_meta(IndexSpecs),
+ Counter = update_counter(Counter0, Actor, Amt),
+ update_object(RObj, Meta, Counter, NonCounterSiblings).
+
+merge(RObj, IndexSpecs) ->
+ Values = riak_object:get_contents(RObj),
+ {Counter, NonCounterSiblings} = merge_values(Values, riak_kv_pncounter:new(), []),
+ Meta = merged_meta(IndexSpecs),
+ update_object(RObj, Meta, Counter, NonCounterSiblings).
+
+merge_values([], Mergedest, NonCounterSiblings) ->
+ {Mergedest, NonCounterSiblings};
+merge_values([{_MD, {riak_kv_pncounter, Value}} | Rest], Mergedest, NonCounterSiblings) ->
+ merge_values(Rest, do_merge(Value, Mergedest), NonCounterSiblings);
+merge_values([NotACounter|Rest], Mergedest, NonCounterSiblings) ->
+ merge_values(Rest, Mergedest, [NotACounter | NonCounterSiblings]).
+
+do_merge(C1, C2) ->
+ riak_kv_pncounter:merge(C1, C2).
+
+%% Only indexes are allowed in counter
+%% meta data.
+%% The job of merging index meta data has
+%% already been done to get the indexspecs
+%% therefore create a meta that is
+%% only the index meta data we already know about
+merged_meta(IndexSpecs) ->
+ Indexes = [{Index, Value} || {Op, Index, Value} <- IndexSpecs,
+ Op =:= add],
+ dict:store(?MD_INDEX, Indexes, dict:new()).
+
+update_counter(Counter, Actor, Amt) ->
+ Op = counter_op(Amt),
+ riak_kv_pncounter:update(Op, Actor, Counter).
+
+counter_op(Amt) when Amt < 0 ->
+ {decrement, Amt * -1};
+counter_op(Amt) ->
+ {increment, Amt}.
+
+update_object(RObj, Meta, Counter, []) ->
+ RObj2 = riak_object:update_value(RObj, {riak_kv_pncounter, Counter}),
+ RObj3 = riak_object:update_metadata(RObj2, Meta),
+ riak_object:apply_updates(RObj3);
+update_object(RObj, Meta, Counter, SiblingValues) ->
+ %% keep non-counter siblings, too
+ riak_object:set_contents(RObj, [{Meta, {riak_kv_pncounter, Counter}} | SiblingValues]).
+
+value(RObj) ->
+ Contents = riak_object:get_contents(RObj),
+ {Counter, _NonCounterSiblings} = merge_values(Contents, riak_kv_pncounter:new(), []),
+ riak_kv_pncounter:value(Counter).
View
@@ -0,0 +1,187 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_gcounter: A state based, grow only, convergent counter
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%%% @doc
+%%% a G-Counter CRDT, borrows liberally from argv0 and Justin Sheehy's vclock module
+%%% @end
+
+-module(riak_kv_gcounter).
+
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-export([gen_op/0, update_expected/3, eqc_state_value/1]).
+-endif.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% EQC generator
+-ifdef(EQC).
+gen_op() ->
+ oneof([increment, {increment, gen_pos()}]).
+
+gen_pos()->
+ ?LET(X, int(), 1+abs(X)).
+
+update_expected(_ID, increment, Prev) ->
+ Prev+1;
+update_expected(_ID, {increment, By}, Prev) ->
+ Prev+By;
+update_expected(_ID, _Op, Prev) ->
+ Prev.
+
+eqc_state_value(S) ->
+ S.
+-endif.
+
+new() ->
+ [].
+
+%% create a counter with an initial update
+new(Id, Count) when is_integer(Count), Count > 0 ->
+ update({increment, Count}, Id, new()).
+
+value(GCnt) ->
+ lists:sum([ Cnt || {_Act, Cnt} <- GCnt]).
+
+update(increment, Actor, GCnt) ->
+ increment_by(1, Actor, GCnt);
+update({increment, Amount}, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
+ increment_by(Amount, Actor, GCnt).
+
+merge(GCnt1, GCnt2) ->
+ merge(GCnt1, GCnt2, []).
+
+merge([], [], Acc) ->
+ lists:reverse(Acc);
+merge(LeftOver, [], Acc) ->
+ lists:reverse(Acc, LeftOver);
+merge([], LeftOver, Acc) ->
+ lists:reverse(Acc, LeftOver);
+merge([{Actor1, Cnt1}=AC1|Rest], Clock2, Acc) ->
+ case lists:keytake(Actor1, 1, Clock2) of
+ {value, {Actor1, Cnt2}, RestOfClock2} ->
+ merge(Rest, RestOfClock2, [{Actor1, max(Cnt1, Cnt2)}|Acc]);
+ false ->
+ merge(Rest, Clock2, [AC1|Acc])
+ end.
+
+equal(VA,VB) ->
+ lists:sort(VA) =:= lists:sort(VB).
+
+%% priv
+increment_by(Amount, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
+ {Ctr, NewGCnt} = case lists:keytake(Actor, 1, GCnt) of
+ false ->
+ {Amount, GCnt};
+ {value, {_N, C}, ModGCnt} ->
+ {C + Amount, ModGCnt}
+ end,
+ [{Actor, Ctr}|NewGCnt].
+
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+-ifdef(EQC).
+eqc_value_test_() ->
+ {timeout, 120, [?_assert(crdt_statem_eqc:prop_converge(0, 1000, ?MODULE))]}.
+-endif.
+
+new_test() ->
+ ?assertEqual([], new()).
+
+value_test() ->
+ GC1 = [{1, 1}, {2, 13}, {3, 1}],
+ GC2 = [],
+ ?assertEqual(15, value(GC1)),
+ ?assertEqual(0, value(GC2)).
+
+update_increment_test() ->
+ GC0 = new(),
+ GC1 = update(increment, 1, GC0),
+ GC2 = update(increment, 2, GC1),
+ GC3 = update(increment, 1, GC2),
+ ?assertEqual([{1, 2}, {2, 1}], GC3).
+
+update_increment_by_test() ->
+ GC0 = new(),
+ GC = update({increment, 7}, 1, GC0),
+ ?assertEqual([{1, 7}], GC).
+
+merge_test() ->
+ GC1 = [{<<"1">>, 1},
+ {<<"2">>, 2},
+ {<<"4">>, 4}],
+ GC2 = [{<<"3">>, 3},
+ {<<"4">>, 3}],
+ ?assertEqual([], merge(new(), new())),
+ ?assertEqual([{<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}],
+ lists:sort( merge(GC1, GC2))).
+
+merge_less_left_test() ->
+ GC1 = [{<<"5">>, 5}],
+ GC2 = [{<<"6">>, 6}, {<<"7">>, 7}],
+ ?assertEqual([{<<"5">>, 5},{<<"6">>,6}, {<<"7">>, 7}],
+ merge(GC1, GC2)).
+
+merge_less_right_test() ->
+ GC1 = [{<<"6">>, 6}, {<<"7">>,7}],
+ GC2 = [{<<"5">>, 5}],
+ ?assertEqual([{<<"5">>,5},{<<"6">>,6}, {<<"7">>, 7}],
+ lists:sort( merge(GC1, GC2)) ).
+
+merge_same_id_test() ->
+ GC1 = [{<<"1">>, 2},{<<"2">>,4}],
+ GC2 = [{<<"1">>, 3},{<<"3">>,5}],
+ ?assertEqual([{<<"1">>, 3},{<<"2">>,4},{<<"3">>,5}],
+ lists:sort( merge(GC1, GC2)) ).
+
+equal_test() ->
+ GC1 = [{1, 2}, {2, 1}, {4, 1}],
+ GC2 = [{1, 1}, {2, 4}, {3, 1}],
+ GC3 = [{1, 2}, {2, 1}, {4, 1}],
+ GC4 = [{4, 1}, {1, 2}, {2, 1}],
+ ?assertNot(equal(GC1, GC2)),
+ ?assert(equal(GC1, GC3)),
+ ?assert(equal(GC1, GC4)).
+
+usage_test() ->
+ GC1 = new(),
+ GC2 = new(),
+ ?assert(equal(GC1, GC2)),
+ GC1_1 = update({increment, 2}, a1, GC1),
+ GC2_1 = update(increment, a2, GC2),
+ GC3 = merge(GC1_1, GC2_1),
+ GC2_2 = update({increment, 3}, a3, GC2_1),
+ GC3_1 = update(increment, a4, GC3),
+ GC3_2 = update(increment, a1, GC3_1),
+ ?assertEqual([{a1, 3}, {a2, 1}, {a3, 3}, {a4, 1}],
+ lists:sort(merge(GC3_2, GC2_2))).
+
+
+-endif.
Oops, something went wrong.

0 comments on commit d515125

Please sign in to comment.