Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Add binary format for counters #563

Merged
merged 4 commits into from

2 participants

@russelldb
Owner

May not be optimal, but adds Tag and Vsn fields to make upgrade / downgrade simpler in future.

src/riak_kv_counter.erl
((14 lines not shown))
+from_binary(<<?TAG:8/integer,?V1_VERS:8/integer,CounterBin/binary>>) ->
+ riak_kv_pncounter:from_binary(CounterBin).
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+roundtrip_bin_test() ->
+ PN = riak_kv_pncounter:new(),
+ PN1 = riak_kv_pncounter:update({increment, 2}, <<"a1">>, PN),
+ PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ ?debugFmt("Bin ~p t2b ~p", [byte_size(Bin),
@jrwest
jrwest added a note

missed a debug print. although i found it kind of useful to quickly verify the size difference.

@russelldb Owner

oops, actually, that test does nothing, except print that size diff

Wait, it tests it too, ignore me, it's late for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jrwest

riak_test/eqc/eunit tests still pass, booted it up and used the http api a bit, all looks good. A debug print (which I commented on) shows a few bytes of savings and having versioned binaries is always better. +1!

@russelldb russelldb merged commit 1e32b2a into rdb-kv-counter
@seancribbs seancribbs deleted the rdb-kv-counter_bin branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 23, 2013
  1. @russelldb

    Tentative addition of custom binary format for counters

    russelldb authored
    Counters all the way down.
Commits on May 24, 2013
  1. @russelldb

    Update binary format to use t2b for gcounter

    russelldb authored
    Fix eqc test ot work with binary encoded counters.
  2. @russelldb
  3. @russelldb

    Remove dbg statement

    russelldb authored
This page is out of date. Refresh to see the latest.
View
50 src/riak_kv_counter.erl
@@ -36,9 +36,17 @@
-module(riak_kv_counter).
--export([update/3, merge/1, value/1]).
+-export([update/3, merge/1, value/1, new/2, to_binary/1, from_binary/1]).
-include("riak_kv_wm_raw.hrl").
+-include_lib("riak_kv_types.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-define(TAG, 69).
+-define(V1_VERS, 1).
%% @doc Update `Actor's entry by `Amt' and store it in `RObj'.
-spec update(riak_object:riak_object(), binary(), integer()) ->
@@ -85,9 +93,13 @@ merge_contents(Contents) ->
Contents).
%% worker for `merge_contents/1'
-merge_value({MD, {riak_kv_pncounter, Counter}}, {undefined, undefined, NonCounterSiblings}) ->
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {undefined, undefined, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{MD, Counter, NonCounterSiblings};
-merge_value({MD, {riak_kv_pncounter, Counter}}, {MergedMeta, Mergedest, NonCounterSiblings}) ->
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {MergedMeta, Mergedest, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{merge_meta(MD, MergedMeta), riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
merge_value(NonCounter, {MD, Mergedest, NonCounterSiblings}) ->
{MD, Mergedest, [NonCounter | NonCounterSiblings]}.
@@ -111,12 +123,12 @@ counter_op(Amt) ->
update_object(RObj, _, undefined, _Siblings) ->
RObj;
update_object(RObj, Meta, Counter, []) ->
- RObj2 = riak_object:update_value(RObj, {riak_kv_pncounter, Counter}),
+ RObj2 = riak_object:update_value(RObj, to_binary(Counter)),
RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)),
riak_object:apply_updates(RObj3);
update_object(RObj, Meta, Counter, SiblingValues) ->
%% keep non-counter siblings, too
- riak_object:set_contents(RObj, [{counter_meta(Meta), {riak_kv_pncounter, Counter}} | SiblingValues]).
+ riak_object:set_contents(RObj, [{counter_meta(Meta), to_binary(Counter)} | SiblingValues]).
counter_meta(undefined) ->
Now = os:timestamp(),
@@ -145,3 +157,31 @@ later(TS1, TS2) ->
_ ->
true
end.
+
+new(B, K) ->
+ Bin = to_binary(riak_kv_pncounter:new()),
+ Doc0 = riak_object:new(B, K, Bin, ?COUNTER_TYPE),
+ riak_object:set_vclock(Doc0, vclock:fresh()).
+
+to_binary(Counter) ->
+ CounterBin = riak_kv_pncounter:to_binary(Counter),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>.
+
+from_binary(<<?TAG:8/integer,?V1_VERS:8/integer,CounterBin/binary>>) ->
+ riak_kv_pncounter:from_binary(CounterBin).
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+roundtrip_bin_test() ->
+ PN = riak_kv_pncounter:new(),
+ PN1 = riak_kv_pncounter:update({increment, 2}, <<"a1">>, PN),
+ PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ ?assert(byte_size(Bin) < term_to_binary({riak_kv_pncounter, PN4})).
+
+-endif.
View
36 src/riak_kv_gcounter.erl
@@ -36,7 +36,7 @@
-module(riak_kv_gcounter).
--export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
%% EQC API
-ifdef(EQC).
@@ -119,6 +119,19 @@ increment_by(Amount, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
end,
[{Actor, Ctr}|NewGCnt].
+-define(TAG, 70).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of a `gcounter()'
+-spec to_binary(gcounter()) -> binary().
+to_binary(GCnt) ->
+ EntriesBin = term_to_binary(GCnt),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>.
+
+%% @doc Decode binary G-Counter
+-spec from_binary(binary()) -> gcounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>) ->
+ binary_to_term(EntriesBin).
%% ===================================================================
%% EUnit tests
@@ -218,5 +231,26 @@ usage_test() ->
?assertEqual([{a1, 3}, {a2, 1}, {a3, 3}, {a4, 1}],
lists:sort(merge(GC3_2, GC2_2))).
+roundtrip_bin_test() ->
+ GC = new(),
+ GC1 = update({increment, 2}, <<"a1">>, GC),
+ GC2 = update({increment, 4}, a2, GC1),
+ GC3 = update(increment, "a4", GC2),
+ GC4 = update({increment, 10000000000000000000000000000000000000000}, {complex, "actor", [<<"term">>, 2]}, GC3),
+ Bin = to_binary(GC4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(GC4, Decoded)).
+
+lots_of_actors_test() ->
+ GC = lists:foldl(fun(_, GCnt) ->
+ ActorLen = crypto:rand_uniform(1, 1000),
+ Actor = crypto:rand_bytes(ActorLen),
+ Cnt = crypto:rand_uniform(1, 10000),
+ riak_kv_gcounter:update({increment, Cnt}, Actor, GCnt) end,
+ new(),
+ lists:seq(1, 1000)),
+ Bin = to_binary(GC),
+ Decoded = from_binary(Bin),
+ ?assert(equal(GC, Decoded)).
-endif.
View
5 src/riak_kv_pb_counter.erl
@@ -41,7 +41,6 @@
-include_lib("riak_pb/include/riak_kv_pb.hrl").
-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
--include_lib("riak_kv_types.hrl").
-behaviour(riak_api_pb_service).
@@ -97,8 +96,8 @@ process(#rpbcounterupdatereq{bucket=B, key=K, w=W0, dw=DW0, pw=PW0, amount=Coun
#state{client=C} = State) ->
case allow_mult(B) of
true ->
- O0 = riak_object:new(B, K, ?NEW_COUNTER, ?COUNTER_TYPE),
- O = riak_object:set_vclock(O0, vclock:fresh()),
+ O = riak_kv_counter:new(B, K),
+
%% erlang_protobuffs encodes as 1/0/undefined
W = decode_quorum(W0),
DW = decode_quorum(DW0),
View
35 src/riak_kv_pncounter.erl
@@ -34,7 +34,7 @@
-module(riak_kv_pncounter).
--export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
%% EQC API
-ifdef(EQC).
@@ -90,6 +90,7 @@ update({decrement, By}, Actor, {Incr, Decr}) when is_integer(By), By > 0 ->
%% @doc Merge two `pncounter()'s to a single `pncounter()'. This is the Least Upper Bound
%% function described in the literature.
+-spec merge(pncounter(), pncounter()) -> pncounter().
merge({Incr1, Decr1}, {Incr2, Decr2}) ->
MergedIncr = riak_kv_gcounter:merge(Incr1, Incr2),
MergedDecr = riak_kv_gcounter:merge(Decr1, Decr2),
@@ -98,9 +99,31 @@ merge({Incr1, Decr1}, {Incr2, Decr2}) ->
%% @doc Are two `pncounter()'s structurally equal? This is not `value/1' equality.
%% Two counters might represent the total `-42', and not be `equal/2'. Equality here is
%% that both counters represent exactly the same information.
+-spec equal(pncounter(), pncounter()) -> boolean().
equal({Incr1, Decr1}, {Incr2, Decr2}) ->
riak_kv_gcounter:equal(Incr1, Incr2) andalso riak_kv_gcounter:equal(Decr1, Decr2).
+-define(TAG, 71).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of `pncounter()'
+-spec to_binary(pncounter()) -> binary().
+to_binary({P, N}) ->
+ PBin = riak_kv_gcounter:to_binary(P),
+ NBin = riak_kv_gcounter:to_binary(N),
+ PBinLen = byte_size(PBin),
+ NBinLen = byte_size(NBin),
+ <<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>.
+
+%% @doc Decode a binary encoded PN-Counter
+-spec from_binary(binary()) -> pncounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>) ->
+ {riak_kv_gcounter:from_binary(PBin), riak_kv_gcounter:from_binary(NBin)}.
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -209,4 +232,14 @@ usage_test() ->
?assertEqual({[{a1, 3}, {a4, 1}, {a2, 1}, {a3, 3}], [{a5, 2}, {a2, 1}]},
merge(PNCnt3_3, PNCnt2_3)).
+roundtrip_bin_test() ->
+ PN = new(),
+ PN1 = update({increment, 2}, <<"a1">>, PN),
+ PN2 = update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(PN4, Decoded)).
+
-endif.
View
17 src/riak_kv_vnode.erl
@@ -948,7 +948,7 @@ prepare_put(#state{idx=Idx,
false ->
IndexSpecs = []
end,
- ObjToStore = prepare_coord_put(Coord, RObj, VId, StartTime, CounterOp),
+ ObjToStore = prepare_new_put(Coord, RObj, VId, StartTime, CounterOp),
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
{ok, Val} ->
OldObj = object_from_binary(Bucket, Key, Val),
@@ -981,17 +981,15 @@ prepare_put(#state{idx=Idx,
end.
%% @Doc in the case that this a co-ordinating put, prepare the object.
-prepare_coord_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
+prepare_new_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
VClockUp = riak_object:increment_vclock(RObj, VId, StartTime),
- %% coordinating a counter operation means
+ %% coordinating a _NEW_ counter operation means
%% creating + incrementing the counter.
%% Make a new counter, stuff it in the riak_object
- Counter = riak_kv_pncounter:new(VId, CounterOp),
- Incremented = riak_object:update_value(VClockUp, {riak_kv_pncounter, Counter}),
- riak_object:apply_updates(Incremented);
-prepare_coord_put(true, RObj, VId, StartTime, _CounterOp) ->
+ riak_kv_counter:update(VClockUp, VId, CounterOp);
+prepare_new_put(true, RObj, VId, StartTime, _CounterOp) ->
riak_object:increment_vclock(RObj, VId, StartTime);
-prepare_coord_put(false, RObj, _VId, _StartTime, _CounterOp) ->
+prepare_new_put(false, RObj, _VId, _StartTime, _CounterOp) ->
RObj.
handle_counter(true, CounterOp, VId, RObj) when is_integer(CounterOp) ->
@@ -1000,7 +998,8 @@ handle_counter(false, CounterOp, _Vid, RObj) when is_integer(CounterOp) ->
%% non co-ord put, merge the values if there are siblings
%% 'cos that is the point of CRDTs / counters: no siblings
riak_kv_counter:merge(RObj);
-handle_counter(_Coord, __CounterOp, _VId, RObj) ->
+handle_counter(_Coord, _CounterOp, _VId, RObj) ->
+ %% i.e. not a counter op
RObj.
perform_put({false, Obj},
View
8 src/riak_kv_wm_counter.erl
@@ -110,7 +110,6 @@
-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").
--include_lib("riak_kv_types.hrl").
%% @spec init(proplist()) -> {ok, context()}
%% @doc Initialize this resource. This function extracts the
@@ -254,15 +253,14 @@ accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C,
counter_op=CounterOp}) ->
case allow_mult(B) of
true ->
- Doc0 = riak_object:new(B, K, ?NEW_COUNTER, ?COUNTER_TYPE),
- VclockDoc = riak_object:set_vclock(Doc0, vclock:fresh()),
+ Doc = riak_kv_counter:new(B, K),
Options = [{counter_op, CounterOp}] ++ return_value(RD),
- case C:put(VclockDoc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
+ case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
Options]) of
{error, Reason} ->
handle_common_error(Reason, RD, Ctx);
ok ->
- {true, RD, Ctx#ctx{doc={ok, VclockDoc}}};
+ {true, RD, Ctx#ctx{doc={ok, Doc}}};
{ok, RObj} ->
Body = produce_doc_body(RObj),
{true, wrq:append_to_resp_body(Body, RD), Ctx#ctx{doc={ok, RObj}}}
View
47 test/kv_counter_eqc.erl
@@ -30,6 +30,9 @@
-compile(export_all).
+-define(TAG, 69).
+-define(V1_VERS, 1).
+
-define(BUCKET, <<"b">>).
-define(KEY, <<"k">>).
-define(NUMTESTS, 500).
@@ -120,7 +123,7 @@ prop_update() ->
end,
MergeSeed = case Amt of
0 -> undefined;
- _ -> riak_kv_pncounter:new(Actor, Amt)
+ _ -> riak_kv_pncounter:new(Actor, Amt)
end,
?WHENFAIL(
@@ -163,7 +166,7 @@ latest_meta(RObj, MergedMeta) ->
latest_counter_meta([], Latest) ->
Latest;
-latest_counter_meta([{MD, {riak_kv_pncounter, _}}|Rest], Latest) ->
+latest_counter_meta([{MD, <<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>}|Rest], Latest) ->
latest_counter_meta(Rest, get_latest_meta(MD, Latest));
latest_counter_meta([_|Rest], Latest) ->
latest_counter_meta(Rest, Latest).
@@ -187,26 +190,42 @@ counters_equal(_C1, undefined) ->
false;
counters_equal(undefined, _C2) ->
false;
+counters_equal(C1B, C2B) when is_binary(C1B), is_binary(C2B) ->
+ C1 = riak_kv_counter:from_binary(C1B),
+ C2 = riak_kv_counter:from_binary(C2B),
+ riak_kv_pncounter:equal(C1, C2);
+counters_equal(C1B, C2) when is_binary(C1B) ->
+ C1 = riak_kv_counter:from_binary(C1B),
+ counters_equal(C1, C2);
+counters_equal(C1, C2B) when is_binary(C2B) ->
+ C2 = riak_kv_counter:from_binary(C2B),
+ counters_equal(C1, C2);
counters_equal(C1, C2) ->
riak_kv_pncounter:equal(C1, C2).
+
%% Extract a single {meta, counter} value
single_counter(Merged) ->
Contents = riak_object:get_contents(Merged),
case [begin
- {riak_kv_pncounter, Counter} = Val,
+ <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>> = Val,
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
{Meta, Counter}
end || {Meta, Val} <- Contents,
- is_tuple(Val),
- riak_kv_pncounter =:= element(1, Val)] of
+ is_counter(Val)] of
[Single] ->
Single;
_Many -> {undefined, undefined}
end.
+is_counter(<<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>) ->
+ true;
+is_counter(_) ->
+ false.
+
non_counter_siblings(RObj) ->
Contents = riak_object:get_contents(RObj),
- {_Counters, NonCounters} = lists:partition(fun({_Md, {riak_kv_pncounter, _C}}) ->
+ {_Counters, NonCounters} = lists:partition(fun({_Md, <<?TAG:8/integer, ?V1_VERS:8/integer, _CounterBin/binary>>}) ->
true;
({_MD, _Val}) ->
false
@@ -218,14 +237,14 @@ non_counter_siblings(RObj) ->
num_counters(RObj) ->
Values = riak_object:get_values(RObj),
length([ok || Val <- Values,
- is_tuple(Val),
- riak_kv_pncounter =:= element(1, Val)]).
+ is_counter(Val)]).
merge_object(RObj, Seed) ->
Values = riak_object:get_values(RObj),
- lists:foldl(fun({riak_kv_pncounter, Counter}, undefined) ->
- Counter;
- ({riak_kv_pncounter, Counter}, Mergedest) ->
+ lists:foldl(fun(<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>, undefined) ->
+ riak_kv_pncounter:from_binary(CounterBin);
+ (<<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>, Mergedest) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
riak_kv_pncounter:merge(Counter, Mergedest);
(_Bin, Mergedest) ->
Mergedest end,
@@ -277,8 +296,12 @@ metadatum() ->
gcounter() ->
list(clock()).
+pncounterds() ->
+ {gcounter(), gcounter()}.
+
pncounter() ->
- {riak_kv_pncounter, {gcounter(), gcounter()}}.
+ ?LET(PNCounter, pncounterds(),
+ riak_kv_counter:to_binary(PNCounter)).
clock() ->
{int(), nat()}.
Something went wrong with that request. Please try again.