Permalink
Browse files

Tentative addition of custom binary format for counters

Counters all the way down.
  • Loading branch information...
1 parent 1d7cbd9 commit 17feb8c47c8aae3dbe15484b83036248fea1af5c @russelldb russelldb committed May 23, 2013
View
@@ -36,9 +36,17 @@
-module(riak_kv_counter).
--export([update/3, merge/1, value/1]).
+-export([update/3, merge/1, value/1, new/2]).
-include("riak_kv_wm_raw.hrl").
+-include_lib("riak_kv_types.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-define(TAG, 69).
+-define(V1_VERS, 1).
%% @doc Update `Actor's entry by `Amt' and store it in `RObj'.
-spec update(riak_object:riak_object(), binary(), integer()) ->
@@ -85,9 +93,13 @@ merge_contents(Contents) ->
Contents).
%% worker for `merge_contents/1'
-merge_value({MD, {riak_kv_pncounter, Counter}}, {undefined, undefined, NonCounterSiblings}) ->
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {undefined, undefined, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{MD, Counter, NonCounterSiblings};
-merge_value({MD, {riak_kv_pncounter, Counter}}, {MergedMeta, Mergedest, NonCounterSiblings}) ->
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {MergedMeta, Mergedest, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{merge_meta(MD, MergedMeta), riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
merge_value(NonCounter, {MD, Mergedest, NonCounterSiblings}) ->
{MD, Mergedest, [NonCounter | NonCounterSiblings]}.
@@ -111,12 +123,12 @@ counter_op(Amt) ->
update_object(RObj, _, undefined, _Siblings) ->
RObj;
update_object(RObj, Meta, Counter, []) ->
- RObj2 = riak_object:update_value(RObj, {riak_kv_pncounter, Counter}),
+ RObj2 = riak_object:update_value(RObj, encode(Counter)),
RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)),
riak_object:apply_updates(RObj3);
update_object(RObj, Meta, Counter, SiblingValues) ->
%% keep non-counter siblings, too
- riak_object:set_contents(RObj, [{counter_meta(Meta), {riak_kv_pncounter, Counter}} | SiblingValues]).
+ riak_object:set_contents(RObj, [{counter_meta(Meta), encode(Counter)} | SiblingValues]).
counter_meta(undefined) ->
Now = os:timestamp(),
@@ -145,3 +157,33 @@ later(TS1, TS2) ->
_ ->
true
end.
+
+new(B, K) ->
+ Bin = encode(riak_kv_pncounter:new()),
+ Doc0 = riak_object:new(B, K, Bin, ?COUNTER_TYPE),
+ riak_object:set_vclock(Doc0, vclock:fresh()).
+
+encode(Counter) ->
+ CounterBin = riak_kv_pncounter:to_binary(Counter),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>.
+
+%% decode(<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>) ->
+%% riak_kv_pncounter:from_binary(CounterBin).
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+roundtrip_bin_test() ->
+ PN = riak_kv_pncounter:new(),
+ PN1 = riak_kv_pncounter:update({increment, 2}, <<"a1">>, PN),
+ PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3),
+ Bin = encode(PN4),
+ ?debugFmt("Bin ~p t2b ~p", [byte_size(Bin),
+ byte_size(term_to_binary({riak_kv_pncounter, PN4}))]),
+ ?assert(byte_size(Bin) < term_to_binary({riak_kv_pncounter, PN4})).
+
+-endif.
View
@@ -36,7 +36,7 @@
-module(riak_kv_gcounter).
--export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
%% EQC API
-ifdef(EQC).
@@ -119,6 +119,61 @@ increment_by(Amount, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
end,
[{Actor, Ctr}|NewGCnt].
+-define(TAG, 70).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of a `gcounter()'
+-spec to_binary(gcounter()) -> binary().
+to_binary(GCnt) ->
+ ActorCnt = length(GCnt),
+ EntriesBin = encode_entries(GCnt),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, ActorCnt:32/integer, EntriesBin/binary>>.
+
+%% @private
+encode_entries(GCnt) ->
+ F = fun({Actor, Cnt}, Acc) ->
+ ActorBin = encode_actor(Actor),
+ ActorLen = byte_size(ActorBin),
+ CntBin = binary:encode_unsigned(Cnt),
+ CntLen = byte_size(CntBin),
+ <<Acc/binary, ActorLen:32/integer, ActorBin:ActorLen/binary,
+ CntLen:32/integer, CntBin:CntLen/binary>>
+ end,
+ lists:foldl(F, <<>>, GCnt).
+
+%% @private Actor ids in riak are vnode ids, so should always be binaries
+encode_actor(Actor) when is_binary(Actor) ->
+ <<1, Actor/binary>>;
+encode_actor(Actor) ->
+ <<0, (term_to_binary(Actor))/binary>>.
+
+%% @doc Decode binary G-Counter
+-spec from_binary(binary()) -> gcounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, ActorCnt:32/integer, EntriesBin/binary>>) ->
+ decode_entries(ActorCnt, EntriesBin, []).
+
+%% @private
+decode_entries(0, <<>>, GCnt) ->
+ GCnt;
+decode_entries(0, _NotEmpty, _GCnt) ->
+ {error, corrupt_contents};
+decode_entries(Cnt, EntriesBin, GCnt) ->
+ {Entry, Entries} = decode_entry(EntriesBin),
+ decode_entries(Cnt-1, Entries, [Entry | GCnt]).
+
+%% @private
+decode_entry(<<ActorLen:32/integer, ActorBin:ActorLen/binary,
+ CntLen:32/integer, CntBin:CntLen/binary, Rest/binary>>) ->
+ Cnt = binary:decode_unsigned(CntBin),
+ {{decode_actor(ActorBin), Cnt}, Rest}.
+
+%% @private
+decode_actor(<<1, Bin/binary>>) ->
+ Bin;
+decode_actor(<<0, Bin/binary>>) ->
+ binary_to_term(Bin).
+
+
%% ===================================================================
%% EUnit tests
@@ -218,5 +273,15 @@ usage_test() ->
?assertEqual([{a1, 3}, {a2, 1}, {a3, 3}, {a4, 1}],
lists:sort(merge(GC3_2, GC2_2))).
+roundtrip_bin_test() ->
+ GC = new(),
+ GC1 = update({increment, 2}, <<"a1">>, GC),
+ GC2 = update({increment, 4}, a2, GC1),
+ GC3 = update(increment, "a4", GC2),
+ GC4 = update({increment, 10000000000000000000000000000000000000000}, {complex, "actor", [<<"term">>, 2]}, GC3),
+ Bin = to_binary(GC4),
+ Decoded = from_binary(Bin),
+ ?debugFmt("Bin size ~p, T2B size ~p", [byte_size(Bin), byte_size(term_to_binary(GC4))]),
+ ?assert(equal(GC4, Decoded)).
-endif.
@@ -41,7 +41,6 @@
-include_lib("riak_pb/include/riak_kv_pb.hrl").
-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
--include_lib("riak_kv_types.hrl").
-behaviour(riak_api_pb_service).
@@ -97,8 +96,8 @@ process(#rpbcounterupdatereq{bucket=B, key=K, w=W0, dw=DW0, pw=PW0, amount=Coun
#state{client=C} = State) ->
case allow_mult(B) of
true ->
- O0 = riak_object:new(B, K, ?NEW_COUNTER, ?COUNTER_TYPE),
- O = riak_object:set_vclock(O0, vclock:fresh()),
+ O = riak_kv_counter:new(B, K),
+
%% erlang_protobuffs encodes as 1/0/undefined
W = decode_quorum(W0),
DW = decode_quorum(DW0),
View
@@ -34,7 +34,7 @@
-module(riak_kv_pncounter).
--export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
%% EQC API
-ifdef(EQC).
@@ -90,6 +90,7 @@ update({decrement, By}, Actor, {Incr, Decr}) when is_integer(By), By > 0 ->
%% @doc Merge two `pncounter()'s to a single `pncounter()'. This is the Least Upper Bound
%% function described in the literature.
+-spec merge(pncounter(), pncounter()) -> pncounter().
merge({Incr1, Decr1}, {Incr2, Decr2}) ->
MergedIncr = riak_kv_gcounter:merge(Incr1, Incr2),
MergedDecr = riak_kv_gcounter:merge(Decr1, Decr2),
@@ -98,9 +99,31 @@ merge({Incr1, Decr1}, {Incr2, Decr2}) ->
%% @doc Are two `pncounter()'s structurally equal? This is not `value/1' equality.
%% Two counters might represent the total `-42', and not be `equal/2'. Equality here is
%% that both counters represent exactly the same information.
+-spec equal(pncounter(), pncounter()) -> boolean().
equal({Incr1, Decr1}, {Incr2, Decr2}) ->
riak_kv_gcounter:equal(Incr1, Incr2) andalso riak_kv_gcounter:equal(Decr1, Decr2).
+-define(TAG, 71).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of `pncounter()'
+-spec to_binary(pncounter()) -> binary().
+to_binary({P, N}) ->
+ PBin = riak_kv_gcounter:to_binary(P),
+ NBin = riak_kv_gcounter:to_binary(N),
+ PBinLen = byte_size(PBin),
+ NBinLen = byte_size(NBin),
+ <<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>.
+
+%% @doc Decode a binary encoded PN-Counter
+-spec from_binary(binary()) -> pncounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>) ->
+ {riak_kv_gcounter:from_binary(PBin), riak_kv_gcounter:from_binary(NBin)}.
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -209,4 +232,16 @@ usage_test() ->
?assertEqual({[{a1, 3}, {a4, 1}, {a2, 1}, {a3, 3}], [{a5, 2}, {a2, 1}]},
merge(PNCnt3_3, PNCnt2_3)).
+roundtrip_bin_test() ->
+ PN = new(),
+ PN1 = update({increment, 2}, <<"a1">>, PN),
+ PN2 = update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(PN4, Decoded)),
+ ?debugFmt("Binsize ~p T2B size ~p", [byte_size(Bin),
+ byte_size(term_to_binary(PN4))]).
+
-endif.
View
@@ -948,7 +948,7 @@ prepare_put(#state{idx=Idx,
false ->
IndexSpecs = []
end,
- ObjToStore = prepare_coord_put(Coord, RObj, VId, StartTime, CounterOp),
+ ObjToStore = prepare_new_put(Coord, RObj, VId, StartTime, CounterOp),
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
{ok, Val} ->
OldObj = object_from_binary(Bucket, Key, Val),
@@ -981,17 +981,15 @@ prepare_put(#state{idx=Idx,
end.
%% @Doc in the case that this a co-ordinating put, prepare the object.
-prepare_coord_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
+prepare_new_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
VClockUp = riak_object:increment_vclock(RObj, VId, StartTime),
- %% coordinating a counter operation means
+ %% coordinating a _NEW_ counter operation means
%% creating + incrementing the counter.
%% Make a new counter, stuff it in the riak_object
- Counter = riak_kv_pncounter:new(VId, CounterOp),
- Incremented = riak_object:update_value(VClockUp, {riak_kv_pncounter, Counter}),
- riak_object:apply_updates(Incremented);
-prepare_coord_put(true, RObj, VId, StartTime, _CounterOp) ->
+ riak_kv_counter:update(VClockUp, VId, CounterOp);
+prepare_new_put(true, RObj, VId, StartTime, _CounterOp) ->
riak_object:increment_vclock(RObj, VId, StartTime);
-prepare_coord_put(false, RObj, _VId, _StartTime, _CounterOp) ->
+prepare_new_put(false, RObj, _VId, _StartTime, _CounterOp) ->
RObj.
handle_counter(true, CounterOp, VId, RObj) when is_integer(CounterOp) ->
@@ -1000,7 +998,8 @@ handle_counter(false, CounterOp, _Vid, RObj) when is_integer(CounterOp) ->
%% non co-ord put, merge the values if there are siblings
%% 'cos that is the point of CRDTs / counters: no siblings
riak_kv_counter:merge(RObj);
-handle_counter(_Coord, __CounterOp, _VId, RObj) ->
+handle_counter(_Coord, _CounterOp, _VId, RObj) ->
+ %% i.e. not a counter op
RObj.
perform_put({false, Obj},
@@ -110,7 +110,6 @@
-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").
--include_lib("riak_kv_types.hrl").
%% @spec init(proplist()) -> {ok, context()}
%% @doc Initialize this resource. This function extracts the
@@ -254,15 +253,14 @@ accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C,
counter_op=CounterOp}) ->
case allow_mult(B) of
true ->
- Doc0 = riak_object:new(B, K, ?NEW_COUNTER, ?COUNTER_TYPE),
- VclockDoc = riak_object:set_vclock(Doc0, vclock:fresh()),
+ Doc = riak_kv_counter:new(B, K),
Options = [{counter_op, CounterOp}] ++ return_value(RD),
- case C:put(VclockDoc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
+ case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
Options]) of
{error, Reason} ->
handle_common_error(Reason, RD, Ctx);
ok ->
- {true, RD, Ctx#ctx{doc={ok, VclockDoc}}};
+ {true, RD, Ctx#ctx{doc={ok, Doc}}};
{ok, RObj} ->
Body = produce_doc_body(RObj),
{true, wrq:append_to_resp_body(Body, RD), Ctx#ctx{doc={ok, RObj}}}

0 comments on commit 17feb8c

Please sign in to comment.