Permalink
Browse files

Merge pull request #563 from basho/rdb-kv-counter_bin

Add binary format for counters
  • Loading branch information...
2 parents 1d7cbd9 + 52182a5 commit 1e32b2a0bff5878e6e22420feca303179e350a34 @russelldb russelldb committed May 24, 2013
@@ -36,9 +36,17 @@
-module(riak_kv_counter).
--export([update/3, merge/1, value/1]).
+-export([update/3, merge/1, value/1, new/2, to_binary/1, from_binary/1]).
-include("riak_kv_wm_raw.hrl").
+-include_lib("riak_kv_types.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-define(TAG, 69).
+-define(V1_VERS, 1).
%% @doc Update `Actor's entry by `Amt' and store it in `RObj'.
-spec update(riak_object:riak_object(), binary(), integer()) ->
@@ -85,9 +93,13 @@ merge_contents(Contents) ->
Contents).
%% worker for `merge_contents/1'
-merge_value({MD, {riak_kv_pncounter, Counter}}, {undefined, undefined, NonCounterSiblings}) ->
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {undefined, undefined, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{MD, Counter, NonCounterSiblings};
-merge_value({MD, {riak_kv_pncounter, Counter}}, {MergedMeta, Mergedest, NonCounterSiblings}) ->
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {MergedMeta, Mergedest, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{merge_meta(MD, MergedMeta), riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
merge_value(NonCounter, {MD, Mergedest, NonCounterSiblings}) ->
{MD, Mergedest, [NonCounter | NonCounterSiblings]}.
@@ -111,12 +123,12 @@ counter_op(Amt) ->
update_object(RObj, _, undefined, _Siblings) ->
RObj;
update_object(RObj, Meta, Counter, []) ->
- RObj2 = riak_object:update_value(RObj, {riak_kv_pncounter, Counter}),
+ RObj2 = riak_object:update_value(RObj, to_binary(Counter)),
RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)),
riak_object:apply_updates(RObj3);
update_object(RObj, Meta, Counter, SiblingValues) ->
%% keep non-counter siblings, too
- riak_object:set_contents(RObj, [{counter_meta(Meta), {riak_kv_pncounter, Counter}} | SiblingValues]).
+ riak_object:set_contents(RObj, [{counter_meta(Meta), to_binary(Counter)} | SiblingValues]).
counter_meta(undefined) ->
Now = os:timestamp(),
@@ -145,3 +157,31 @@ later(TS1, TS2) ->
_ ->
true
end.
+
+new(B, K) ->
+ Bin = to_binary(riak_kv_pncounter:new()),
+ Doc0 = riak_object:new(B, K, Bin, ?COUNTER_TYPE),
+ riak_object:set_vclock(Doc0, vclock:fresh()).
+
+to_binary(Counter) ->
+ CounterBin = riak_kv_pncounter:to_binary(Counter),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>.
+
+from_binary(<<?TAG:8/integer,?V1_VERS:8/integer,CounterBin/binary>>) ->
+ riak_kv_pncounter:from_binary(CounterBin).
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+roundtrip_bin_test() ->
+ PN = riak_kv_pncounter:new(),
+ PN1 = riak_kv_pncounter:update({increment, 2}, <<"a1">>, PN),
+ PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ ?assert(byte_size(Bin) < term_to_binary({riak_kv_pncounter, PN4})).
+
+-endif.
@@ -36,7 +36,7 @@
-module(riak_kv_gcounter).
--export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
%% EQC API
-ifdef(EQC).
@@ -119,6 +119,19 @@ increment_by(Amount, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
end,
[{Actor, Ctr}|NewGCnt].
+-define(TAG, 70).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of a `gcounter()'
+-spec to_binary(gcounter()) -> binary().
+to_binary(GCnt) ->
+ EntriesBin = term_to_binary(GCnt),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>.
+
+%% @doc Decode binary G-Counter
+-spec from_binary(binary()) -> gcounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>) ->
+ binary_to_term(EntriesBin).
%% ===================================================================
%% EUnit tests
@@ -218,5 +231,26 @@ usage_test() ->
?assertEqual([{a1, 3}, {a2, 1}, {a3, 3}, {a4, 1}],
lists:sort(merge(GC3_2, GC2_2))).
+roundtrip_bin_test() ->
+ GC = new(),
+ GC1 = update({increment, 2}, <<"a1">>, GC),
+ GC2 = update({increment, 4}, a2, GC1),
+ GC3 = update(increment, "a4", GC2),
+ GC4 = update({increment, 10000000000000000000000000000000000000000}, {complex, "actor", [<<"term">>, 2]}, GC3),
+ Bin = to_binary(GC4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(GC4, Decoded)).
+
+lots_of_actors_test() ->
+ GC = lists:foldl(fun(_, GCnt) ->
+ ActorLen = crypto:rand_uniform(1, 1000),
+ Actor = crypto:rand_bytes(ActorLen),
+ Cnt = crypto:rand_uniform(1, 10000),
+ riak_kv_gcounter:update({increment, Cnt}, Actor, GCnt) end,
+ new(),
+ lists:seq(1, 1000)),
+ Bin = to_binary(GC),
+ Decoded = from_binary(Bin),
+ ?assert(equal(GC, Decoded)).
-endif.
@@ -41,7 +41,6 @@
-include_lib("riak_pb/include/riak_kv_pb.hrl").
-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
--include_lib("riak_kv_types.hrl").
-behaviour(riak_api_pb_service).
@@ -97,8 +96,8 @@ process(#rpbcounterupdatereq{bucket=B, key=K, w=W0, dw=DW0, pw=PW0, amount=Coun
#state{client=C} = State) ->
case allow_mult(B) of
true ->
- O0 = riak_object:new(B, K, ?NEW_COUNTER, ?COUNTER_TYPE),
- O = riak_object:set_vclock(O0, vclock:fresh()),
+ O = riak_kv_counter:new(B, K),
+
%% erlang_protobuffs encodes as 1/0/undefined
W = decode_quorum(W0),
DW = decode_quorum(DW0),
@@ -34,7 +34,7 @@
-module(riak_kv_pncounter).
--export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
%% EQC API
-ifdef(EQC).
@@ -90,6 +90,7 @@ update({decrement, By}, Actor, {Incr, Decr}) when is_integer(By), By > 0 ->
%% @doc Merge two `pncounter()'s to a single `pncounter()'. This is the Least Upper Bound
%% function described in the literature.
+-spec merge(pncounter(), pncounter()) -> pncounter().
merge({Incr1, Decr1}, {Incr2, Decr2}) ->
MergedIncr = riak_kv_gcounter:merge(Incr1, Incr2),
MergedDecr = riak_kv_gcounter:merge(Decr1, Decr2),
@@ -98,9 +99,31 @@ merge({Incr1, Decr1}, {Incr2, Decr2}) ->
%% @doc Are two `pncounter()'s structurally equal? This is not `value/1' equality.
%% Two counters might represent the total `-42', and not be `equal/2'. Equality here is
%% that both counters represent exactly the same information.
+-spec equal(pncounter(), pncounter()) -> boolean().
equal({Incr1, Decr1}, {Incr2, Decr2}) ->
riak_kv_gcounter:equal(Incr1, Incr2) andalso riak_kv_gcounter:equal(Decr1, Decr2).
+-define(TAG, 71).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of `pncounter()'
+-spec to_binary(pncounter()) -> binary().
+to_binary({P, N}) ->
+ PBin = riak_kv_gcounter:to_binary(P),
+ NBin = riak_kv_gcounter:to_binary(N),
+ PBinLen = byte_size(PBin),
+ NBinLen = byte_size(NBin),
+ <<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>.
+
+%% @doc Decode a binary encoded PN-Counter
+-spec from_binary(binary()) -> pncounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>) ->
+ {riak_kv_gcounter:from_binary(PBin), riak_kv_gcounter:from_binary(NBin)}.
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -209,4 +232,14 @@ usage_test() ->
?assertEqual({[{a1, 3}, {a4, 1}, {a2, 1}, {a3, 3}], [{a5, 2}, {a2, 1}]},
merge(PNCnt3_3, PNCnt2_3)).
+roundtrip_bin_test() ->
+ PN = new(),
+ PN1 = update({increment, 2}, <<"a1">>, PN),
+ PN2 = update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(PN4, Decoded)).
+
-endif.
View
@@ -948,7 +948,7 @@ prepare_put(#state{idx=Idx,
false ->
IndexSpecs = []
end,
- ObjToStore = prepare_coord_put(Coord, RObj, VId, StartTime, CounterOp),
+ ObjToStore = prepare_new_put(Coord, RObj, VId, StartTime, CounterOp),
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
{ok, Val} ->
OldObj = object_from_binary(Bucket, Key, Val),
@@ -981,17 +981,15 @@ prepare_put(#state{idx=Idx,
end.
%% @Doc in the case that this a co-ordinating put, prepare the object.
-prepare_coord_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
+prepare_new_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
VClockUp = riak_object:increment_vclock(RObj, VId, StartTime),
- %% coordinating a counter operation means
+ %% coordinating a _NEW_ counter operation means
%% creating + incrementing the counter.
%% Make a new counter, stuff it in the riak_object
- Counter = riak_kv_pncounter:new(VId, CounterOp),
- Incremented = riak_object:update_value(VClockUp, {riak_kv_pncounter, Counter}),
- riak_object:apply_updates(Incremented);
-prepare_coord_put(true, RObj, VId, StartTime, _CounterOp) ->
+ riak_kv_counter:update(VClockUp, VId, CounterOp);
+prepare_new_put(true, RObj, VId, StartTime, _CounterOp) ->
riak_object:increment_vclock(RObj, VId, StartTime);
-prepare_coord_put(false, RObj, _VId, _StartTime, _CounterOp) ->
+prepare_new_put(false, RObj, _VId, _StartTime, _CounterOp) ->
RObj.
handle_counter(true, CounterOp, VId, RObj) when is_integer(CounterOp) ->
@@ -1000,7 +998,8 @@ handle_counter(false, CounterOp, _Vid, RObj) when is_integer(CounterOp) ->
%% non co-ord put, merge the values if there are siblings
%% 'cos that is the point of CRDTs / counters: no siblings
riak_kv_counter:merge(RObj);
-handle_counter(_Coord, __CounterOp, _VId, RObj) ->
+handle_counter(_Coord, _CounterOp, _VId, RObj) ->
+ %% i.e. not a counter op
RObj.
perform_put({false, Obj},
@@ -110,7 +110,6 @@
-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").
--include_lib("riak_kv_types.hrl").
%% @spec init(proplist()) -> {ok, context()}
%% @doc Initialize this resource. This function extracts the
@@ -254,15 +253,14 @@ accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C,
counter_op=CounterOp}) ->
case allow_mult(B) of
true ->
- Doc0 = riak_object:new(B, K, ?NEW_COUNTER, ?COUNTER_TYPE),
- VclockDoc = riak_object:set_vclock(Doc0, vclock:fresh()),
+ Doc = riak_kv_counter:new(B, K),
Options = [{counter_op, CounterOp}] ++ return_value(RD),
- case C:put(VclockDoc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
+ case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
Options]) of
{error, Reason} ->
handle_common_error(Reason, RD, Ctx);
ok ->
- {true, RD, Ctx#ctx{doc={ok, VclockDoc}}};
+ {true, RD, Ctx#ctx{doc={ok, Doc}}};
{ok, RObj} ->
Body = produce_doc_body(RObj),
{true, wrq:append_to_resp_body(Body, RD), Ctx#ctx{doc={ok, RObj}}}
@@ -30,6 +30,9 @@
-compile(export_all).
+-define(TAG, 69).
+-define(V1_VERS, 1).
+
-define(BUCKET, <<"b">>).
-define(KEY, <<"k">>).
-define(NUMTESTS, 500).
@@ -120,7 +123,7 @@ prop_update() ->
end,
MergeSeed = case Amt of
0 -> undefined;
- _ -> riak_kv_pncounter:new(Actor, Amt)
+ _ -> riak_kv_pncounter:new(Actor, Amt)
end,
?WHENFAIL(
@@ -163,7 +166,7 @@ latest_meta(RObj, MergedMeta) ->
latest_counter_meta([], Latest) ->
Latest;
-latest_counter_meta([{MD, {riak_kv_pncounter, _}}|Rest], Latest) ->
+latest_counter_meta([{MD, <<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>}|Rest], Latest) ->
latest_counter_meta(Rest, get_latest_meta(MD, Latest));
latest_counter_meta([_|Rest], Latest) ->
latest_counter_meta(Rest, Latest).
@@ -187,26 +190,42 @@ counters_equal(_C1, undefined) ->
false;
counters_equal(undefined, _C2) ->
false;
+counters_equal(C1B, C2B) when is_binary(C1B), is_binary(C2B) ->
+ C1 = riak_kv_counter:from_binary(C1B),
+ C2 = riak_kv_counter:from_binary(C2B),
+ riak_kv_pncounter:equal(C1, C2);
+counters_equal(C1B, C2) when is_binary(C1B) ->
+ C1 = riak_kv_counter:from_binary(C1B),
+ counters_equal(C1, C2);
+counters_equal(C1, C2B) when is_binary(C2B) ->
+ C2 = riak_kv_counter:from_binary(C2B),
+ counters_equal(C1, C2);
counters_equal(C1, C2) ->
riak_kv_pncounter:equal(C1, C2).
+
%% Extract a single {meta, counter} value
single_counter(Merged) ->
Contents = riak_object:get_contents(Merged),
case [begin
- {riak_kv_pncounter, Counter} = Val,
+ <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>> = Val,
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{Meta, Counter}
end || {Meta, Val} <- Contents,
- is_tuple(Val),
- riak_kv_pncounter =:= element(1, Val)] of
+ is_counter(Val)] of
[Single] ->
Single;
_Many -> {undefined, undefined}
end.
+is_counter(<<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>) ->
+ true;
+is_counter(_) ->
+ false.
+
non_counter_siblings(RObj) ->
Contents = riak_object:get_contents(RObj),
- {_Counters, NonCounters} = lists:partition(fun({_Md, {riak_kv_pncounter, _C}}) ->
+ {_Counters, NonCounters} = lists:partition(fun({_Md, <<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>}) ->
true;
({_MD, _Val}) ->
false
@@ -218,14 +237,14 @@ non_counter_siblings(RObj) ->
num_counters(RObj) ->
Values = riak_object:get_values(RObj),
length([ok || Val <- Values,
- is_tuple(Val),
- riak_kv_pncounter =:= element(1, Val)]).
+ is_counter(Val)]).
merge_object(RObj, Seed) ->
Values = riak_object:get_values(RObj),
- lists:foldl(fun({riak_kv_pncounter, Counter}, undefined) ->
- Counter;
- ({riak_kv_pncounter, Counter}, Mergedest) ->
+ lists:foldl(fun(<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>, undefined) ->
+ riak_kv_pncounter:from_binary(CounterBin);
+ (<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>, Mergedest) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
riak_kv_pncounter:merge(Counter, Mergedest);
(_Bin, Mergedest) ->
Mergedest end,
@@ -277,8 +296,12 @@ metadatum() ->
gcounter() ->
list(clock()).
+pncounterds() ->
+ {gcounter(), gcounter()}.
+
pncounter() ->
- {riak_kv_pncounter, {gcounter(), gcounter()}}.
+ ?LET(PNCounter, pncounterds(),
+ riak_kv_counter:to_binary(PNCounter)).
clock() ->
{int(), nat()}.

0 comments on commit 1e32b2a

Please sign in to comment.