Skip to content

Commit

Permalink
Update binary format to use t2b for gcounter
Browse files Browse the repository at this point in the history
Fix eqc test ot work with binary encoded counters.
  • Loading branch information
russelldb committed May 24, 2013
1 parent 17feb8c commit 723baae
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 66 deletions.
16 changes: 8 additions & 8 deletions src/riak_kv_counter.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@


-module(riak_kv_counter). -module(riak_kv_counter).


-export([update/3, merge/1, value/1, new/2]). -export([update/3, merge/1, value/1, new/2, to_binary/1, from_binary/1]).


-include("riak_kv_wm_raw.hrl"). -include("riak_kv_wm_raw.hrl").
-include_lib("riak_kv_types.hrl"). -include_lib("riak_kv_types.hrl").
Expand Down Expand Up @@ -123,12 +123,12 @@ counter_op(Amt) ->
update_object(RObj, _, undefined, _Siblings) -> update_object(RObj, _, undefined, _Siblings) ->
RObj; RObj;
update_object(RObj, Meta, Counter, []) -> update_object(RObj, Meta, Counter, []) ->
RObj2 = riak_object:update_value(RObj, encode(Counter)), RObj2 = riak_object:update_value(RObj, to_binary(Counter)),
RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)), RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)),
riak_object:apply_updates(RObj3); riak_object:apply_updates(RObj3);
update_object(RObj, Meta, Counter, SiblingValues) -> update_object(RObj, Meta, Counter, SiblingValues) ->
%% keep non-counter siblings, too %% keep non-counter siblings, too
riak_object:set_contents(RObj, [{counter_meta(Meta), encode(Counter)} | SiblingValues]). riak_object:set_contents(RObj, [{counter_meta(Meta), to_binary(Counter)} | SiblingValues]).


counter_meta(undefined) -> counter_meta(undefined) ->
Now = os:timestamp(), Now = os:timestamp(),
Expand Down Expand Up @@ -159,16 +159,16 @@ later(TS1, TS2) ->
end. end.


new(B, K) -> new(B, K) ->
Bin = encode(riak_kv_pncounter:new()), Bin = to_binary(riak_kv_pncounter:new()),
Doc0 = riak_object:new(B, K, Bin, ?COUNTER_TYPE), Doc0 = riak_object:new(B, K, Bin, ?COUNTER_TYPE),
riak_object:set_vclock(Doc0, vclock:fresh()). riak_object:set_vclock(Doc0, vclock:fresh()).


encode(Counter) -> to_binary(Counter) ->
CounterBin = riak_kv_pncounter:to_binary(Counter), CounterBin = riak_kv_pncounter:to_binary(Counter),
<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>. <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>.


%% decode(<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>) -> from_binary(<<?TAG:8/integer,?V1_VERS:8/integer,CounterBin/binary>>) ->
%% riak_kv_pncounter:from_binary(CounterBin). riak_kv_pncounter:from_binary(CounterBin).


%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests
Expand All @@ -181,7 +181,7 @@ roundtrip_bin_test() ->
PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1), PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2), PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3), PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3),
Bin = encode(PN4), Bin = to_binary(PN4),
?debugFmt("Bin ~p t2b ~p", [byte_size(Bin), ?debugFmt("Bin ~p t2b ~p", [byte_size(Bin),
byte_size(term_to_binary({riak_kv_pncounter, PN4}))]), byte_size(term_to_binary({riak_kv_pncounter, PN4}))]),
?assert(byte_size(Bin) < term_to_binary({riak_kv_pncounter, PN4})). ?assert(byte_size(Bin) < term_to_binary({riak_kv_pncounter, PN4})).
Expand Down
50 changes: 4 additions & 46 deletions src/riak_kv_gcounter.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -125,55 +125,13 @@ increment_by(Amount, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
%% @doc Encode an effecient binary representation of a `gcounter()' %% @doc Encode an effecient binary representation of a `gcounter()'
-spec to_binary(gcounter()) -> binary(). -spec to_binary(gcounter()) -> binary().
to_binary(GCnt) -> to_binary(GCnt) ->
ActorCnt = length(GCnt), EntriesBin = term_to_binary(GCnt),
EntriesBin = encode_entries(GCnt), <<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>.
<<?TAG:8/integer, ?V1_VERS:8/integer, ActorCnt:32/integer, EntriesBin/binary>>.

%% @private
encode_entries(GCnt) ->
F = fun({Actor, Cnt}, Acc) ->
ActorBin = encode_actor(Actor),
ActorLen = byte_size(ActorBin),
CntBin = binary:encode_unsigned(Cnt),
CntLen = byte_size(CntBin),
<<Acc/binary, ActorLen:32/integer, ActorBin:ActorLen/binary,
CntLen:32/integer, CntBin:CntLen/binary>>
end,
lists:foldl(F, <<>>, GCnt).

%% @private Actor ids in riak are vnode ids, so should always be binaries
encode_actor(Actor) when is_binary(Actor) ->
<<1, Actor/binary>>;
encode_actor(Actor) ->
<<0, (term_to_binary(Actor))/binary>>.


%% @doc Decode binary G-Counter %% @doc Decode binary G-Counter
-spec from_binary(binary()) -> gcounter(). -spec from_binary(binary()) -> gcounter().
from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, ActorCnt:32/integer, EntriesBin/binary>>) -> from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>) ->
decode_entries(ActorCnt, EntriesBin, []). binary_to_term(EntriesBin).

%% @private
decode_entries(0, <<>>, GCnt) ->
GCnt;
decode_entries(0, _NotEmpty, _GCnt) ->
{error, corrupt_contents};
decode_entries(Cnt, EntriesBin, GCnt) ->
{Entry, Entries} = decode_entry(EntriesBin),
decode_entries(Cnt-1, Entries, [Entry | GCnt]).

%% @private
decode_entry(<<ActorLen:32/integer, ActorBin:ActorLen/binary,
CntLen:32/integer, CntBin:CntLen/binary, Rest/binary>>) ->
Cnt = binary:decode_unsigned(CntBin),
{{decode_actor(ActorBin), Cnt}, Rest}.

%% @private
decode_actor(<<1, Bin/binary>>) ->
Bin;
decode_actor(<<0, Bin/binary>>) ->
binary_to_term(Bin).




%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests
Expand Down
47 changes: 35 additions & 12 deletions test/kv_counter_eqc.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@


-compile(export_all). -compile(export_all).


-define(TAG, 69).
-define(V1_VERS, 1).

-define(BUCKET, <<"b">>). -define(BUCKET, <<"b">>).
-define(KEY, <<"k">>). -define(KEY, <<"k">>).
-define(NUMTESTS, 500). -define(NUMTESTS, 500).
Expand Down Expand Up @@ -120,7 +123,7 @@ prop_update() ->
end, end,
MergeSeed = case Amt of MergeSeed = case Amt of
0 -> undefined; 0 -> undefined;
_ -> riak_kv_pncounter:new(Actor, Amt) _ -> riak_kv_pncounter:new(Actor, Amt)
end, end,


?WHENFAIL( ?WHENFAIL(
Expand Down Expand Up @@ -163,7 +166,7 @@ latest_meta(RObj, MergedMeta) ->


latest_counter_meta([], Latest) -> latest_counter_meta([], Latest) ->
Latest; Latest;
latest_counter_meta([{MD, {riak_kv_pncounter, _}}|Rest], Latest) -> latest_counter_meta([{MD, <<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>}|Rest], Latest) ->
latest_counter_meta(Rest, get_latest_meta(MD, Latest)); latest_counter_meta(Rest, get_latest_meta(MD, Latest));
latest_counter_meta([_|Rest], Latest) -> latest_counter_meta([_|Rest], Latest) ->
latest_counter_meta(Rest, Latest). latest_counter_meta(Rest, Latest).
Expand All @@ -187,26 +190,42 @@ counters_equal(_C1, undefined) ->
false; false;
counters_equal(undefined, _C2) -> counters_equal(undefined, _C2) ->
false; false;
counters_equal(C1B, C2B) when is_binary(C1B), is_binary(C2B) ->
C1 = riak_kv_counter:from_binary(C1B),
C2 = riak_kv_counter:from_binary(C2B),
riak_kv_pncounter:equal(C1, C2);
counters_equal(C1B, C2) when is_binary(C1B) ->
C1 = riak_kv_counter:from_binary(C1B),
counters_equal(C1, C2);
counters_equal(C1, C2B) when is_binary(C2B) ->
C2 = riak_kv_counter:from_binary(C2B),
counters_equal(C1, C2);
counters_equal(C1, C2) -> counters_equal(C1, C2) ->
riak_kv_pncounter:equal(C1, C2). riak_kv_pncounter:equal(C1, C2).



%% Extract a single {meta, counter} value %% Extract a single {meta, counter} value
single_counter(Merged) -> single_counter(Merged) ->
Contents = riak_object:get_contents(Merged), Contents = riak_object:get_contents(Merged),
case [begin case [begin
{riak_kv_pncounter, Counter} = Val, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>> = Val,
Counter = riak_kv_pncounter:from_binary(CounterBin),
{Meta, Counter} {Meta, Counter}
end || {Meta, Val} <- Contents, end || {Meta, Val} <- Contents,
is_tuple(Val), is_counter(Val)] of
riak_kv_pncounter =:= element(1, Val)] of
[Single] -> [Single] ->
Single; Single;
_Many -> {undefined, undefined} _Many -> {undefined, undefined}
end. end.


is_counter(<<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>) ->
true;
is_counter(_) ->
false.

non_counter_siblings(RObj) -> non_counter_siblings(RObj) ->
Contents = riak_object:get_contents(RObj), Contents = riak_object:get_contents(RObj),
{_Counters, NonCounters} = lists:partition(fun({_Md, {riak_kv_pncounter, _C}}) -> {_Counters, NonCounters} = lists:partition(fun({_Md, <<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>}) ->
true; true;
({_MD, _Val}) -> ({_MD, _Val}) ->
false false
Expand All @@ -218,14 +237,14 @@ non_counter_siblings(RObj) ->
num_counters(RObj) -> num_counters(RObj) ->
Values = riak_object:get_values(RObj), Values = riak_object:get_values(RObj),
length([ok || Val <- Values, length([ok || Val <- Values,
is_tuple(Val), is_counter(Val)]).
riak_kv_pncounter =:= element(1, Val)]).


merge_object(RObj, Seed) -> merge_object(RObj, Seed) ->
Values = riak_object:get_values(RObj), Values = riak_object:get_values(RObj),
lists:foldl(fun({riak_kv_pncounter, Counter}, undefined) -> lists:foldl(fun(<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>, undefined) ->
Counter; riak_kv_pncounter:from_binary(CounterBin);
({riak_kv_pncounter, Counter}, Mergedest) -> (<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>, Mergedest) ->
Counter = riak_kv_pncounter:from_binary(CounterBin),
riak_kv_pncounter:merge(Counter, Mergedest); riak_kv_pncounter:merge(Counter, Mergedest);
(_Bin, Mergedest) -> (_Bin, Mergedest) ->
Mergedest end, Mergedest end,
Expand Down Expand Up @@ -277,8 +296,12 @@ metadatum() ->
gcounter() -> gcounter() ->
list(clock()). list(clock()).


pncounterds() ->
{gcounter(), gcounter()}.

pncounter() -> pncounter() ->
{riak_kv_pncounter, {gcounter(), gcounter()}}. ?LET(PNCounter, pncounterds(),
riak_kv_counter:to_binary(PNCounter)).


clock() -> clock() ->
{int(), nat()}. {int(), nat()}.
Expand Down

0 comments on commit 723baae

Please sign in to comment.