Skip to content

Commit

Permalink
PB Updates for HLL Datatypes
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Apr 29, 2016
1 parent 1a43334 commit dd194ce
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 21 deletions.
2 changes: 1 addition & 1 deletion dialyzer.ignore-warnings
Expand Up @@ -60,4 +60,4 @@ riak_yokozuna_pb.erl:221: The pattern <Types, [{FNum, Bytes} | Tail], Acc> can n
## Insufficient typing on record in generated code
riak_pb_codec.erl:349: Invalid type specification for function riak_pb_codec:decode_commit_hooks/1. The success typing is ([any()]) -> [{atom(),atom() | [any(),...]} | {'modfun',atom(),atom() | [any(),...]}]
## Internal calls only pass empty-list, but calls from other libraries pass proplists
riak_pb_dt_codec.erl:181: The pattern {AtomType, _} can never match the type 'false'
riak_pb_dt_codec.erl:187: The pattern {AtomType, _} can never match the type 'false'
3 changes: 3 additions & 0 deletions src/riak.proto
Expand Up @@ -157,6 +157,9 @@ message RpbBucketProps {

// KV fast path
optional bool write_once = 28;

// Hyperlolog DT Precision
optional uint32 hll_precision = 29;
}

// Authentication request
Expand Down
25 changes: 23 additions & 2 deletions src/riak_dt.proto
Expand Up @@ -22,7 +22,8 @@
*/

/*
** Revision: 2.0
%% TODO: Set for whichever release version we use
** Revision: 2.3.0
*/

// Java package specifiers
Expand Down Expand Up @@ -107,6 +108,11 @@ message DtValue {
optional sint64 counter_value = 1;
repeated bytes set_value = 2;
repeated MapEntry map_value = 3;

/* We return an estimated cardinality of the Hyperloglog set
* on fetch.
*/
optional sint64 hll_value = 4;
}


Expand All @@ -124,6 +130,7 @@ message DtFetchResp {
COUNTER = 1;
SET = 2;
MAP = 3;
HLL = 4;
}

optional bytes context = 1;
Expand Down Expand Up @@ -154,6 +161,14 @@ message SetOp {
repeated bytes removes = 2;
}

/*
* An operation to update a Hyperloglog Set, a top-level DT.
* You can only add to a HllSet.
*/
message HllOp {
repeated bytes adds = 1;
}

/*
* An operation to be applied to a value stored in a Map -- the
* contents of an UPDATE operation. The operation field that is
Expand Down Expand Up @@ -205,6 +220,11 @@ message DtOp {
optional CounterOp counter_op = 1;
optional SetOp set_op = 2;
optional MapOp map_op = 3;

/* Adding values to a hyperloglog (set) is just like adding values
* to a set.
*/
optional HllOp hll_op = 4;
}

/*
Expand Down Expand Up @@ -251,4 +271,5 @@ message DtUpdateResp {
optional sint64 counter_value = 3;
repeated bytes set_value = 4;
repeated MapEntry map_value = 5;
}
optional sint64 hll_value = 6;
}
12 changes: 8 additions & 4 deletions src/riak_pb_codec.erl
Expand Up @@ -189,12 +189,14 @@ decode_bucket_props(#rpbbucketprops{n_val=N,
search_index=Index,
datatype=Datatype,
consistent=Consistent,
write_once=WriteOnce
write_once=WriteOnce,
hll_precision=HllPrecision
}) ->
%% Extract numerical properties
[ {P,V} || {P,V} <- [ {n_val, N}, {old_vclock, Old}, {young_vclock, Young},
{big_vclock, Big}, {small_vclock, Small} ],
V /= undefined ] ++
[ {P,V} || {P,V} <- [{n_val, N}, {old_vclock, Old}, {young_vclock, Young},
{big_vclock, Big}, {small_vclock, Small},
{hll_precision, HllPrecision}],
V /= undefined ] ++
%% Extract booleans
[ {BProp, decode_bool(Bool)} ||
{BProp, Bool} <- [{allow_mult, AM}, {last_write_wins, LWW},
Expand Down Expand Up @@ -294,6 +296,8 @@ encode_bucket_props([{consistent, S}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{consistent = encode_bool(S)});
encode_bucket_props([{write_once, S}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{write_once = encode_bool(S)});
encode_bucket_props([{hll_precision, Num}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{hll_precision = Num});
encode_bucket_props([_Ignore|Rest], Pb) ->
%% Ignore any properties not explicitly part of the PB message
encode_bucket_props(Rest, Pb).
Expand Down
65 changes: 52 additions & 13 deletions src/riak_pb_dt_codec.erl
Expand Up @@ -48,29 +48,34 @@
-type context() :: binary() | undefined.
-type counter_value() :: integer().
-type set_value() :: [ binary() ].
-type hll_value() :: number().
-type register_value() :: binary().
-type flag_value() :: boolean().
-type map_entry() :: {map_field(), embedded_value()}.
-type map_field() :: {binary(), embedded_type()}.
-type map_value() :: [ map_entry() ].
-type embedded_value() :: counter_value() | set_value() | register_value() | flag_value() | map_value().
-type toplevel_value() :: counter_value() | set_value() | map_value() | undefined.
-type embedded_value() :: counter_value() | set_value() | register_value()
| flag_value() | map_value().
-type toplevel_value() :: counter_value() | set_value() | map_value()
| hll_value() | undefined.
-type fetch_response() :: {toplevel_type(), toplevel_value(), context()}.

%% Type names as atoms
-type embedded_type() :: counter | set | register | flag | map.
-type toplevel_type() :: counter | set | map.
-type toplevel_type() :: counter | set | map | hll.
-type all_type() :: toplevel_type() | embedded_type().

%% Operations
-type counter_op() :: increment | decrement | {increment | decrement, integer()}.
-type simple_set_op() :: {add, binary()} | {remove, binary()} | {add_all, [binary()]} | {remove_all, [binary()]}.
-type set_op() :: simple_set_op() | {update, [simple_set_op()]}.
-type hll_op() :: {add, binary()} | {add_all, [binary()]}.
-type flag_op() :: enable | disable.
-type register_op() :: {assign, binary()}.
-type simple_map_op() :: {remove, map_field()} | {update, map_field(), embedded_type_op()}.
-type map_op() :: simple_map_op() | {update, [simple_map_op()]}.
-type embedded_type_op() :: counter_op() | set_op() | register_op() | flag_op() | map_op().
-type toplevel_op() :: counter_op() | set_op() | map_op().
-type toplevel_op() :: counter_op() | set_op() | map_op() | hll_op().
-type update() :: {toplevel_type(), toplevel_op(), context()}.

%% Request options
Expand All @@ -89,7 +94,7 @@
include_context | {include_context, boolean()}.

%% Server-side type<->module mappings
-type type_mappings() :: [{embedded_type(), module()}].
-type type_mappings() :: [{all_type(), module()}].


%% =========================
Expand Down Expand Up @@ -165,10 +170,11 @@ decode_type(PBType, Mods) ->
AtomType = decode_type(PBType),
proplists:get_value(AtomType, Mods, AtomType).

%% @doc Decodes a PB message type name into an atom.
-spec decode_type(atom()) -> atom().
%% @doc Decodes a PB message type name into an atom type name.
-spec decode_type(atom()) -> all_type().
decode_type('COUNTER') -> counter;
decode_type('SET') -> set;
decode_type('HLL') -> hll;
decode_type('REGISTER') -> register;
decode_type('FLAG') -> flag;
decode_type('MAP') -> map.
Expand All @@ -185,9 +191,10 @@ encode_type(TypeOrMod, Mods) ->
end.

%% @doc Encodes an atom type name into the PB message equivalent.
-spec encode_type(atom()) -> atom().
-spec encode_type(all_type()) -> atom().
encode_type(counter) -> 'COUNTER';
encode_type(set) -> 'SET';
encode_type(hll) -> 'HLL';
encode_type(register) -> 'REGISTER';
encode_type(flag) -> 'FLAG';
encode_type(map) -> 'MAP'.
Expand All @@ -197,6 +204,7 @@ encode_flag_value(on) -> true;
encode_flag_value(off) -> false;
encode_flag_value(Other) -> Other.


%% ========================
%% FETCH REQUEST / RESPONSE
%% ========================
Expand Down Expand Up @@ -252,6 +260,9 @@ decode_fetch_response(#dtfetchresp{context=Context, type='COUNTER',
decode_fetch_response(#dtfetchresp{context=Context, type='SET',
value=#dtvalue{set_value=Val}}) ->
{set, Val, Context};
decode_fetch_response(#dtfetchresp{context=Context, type='HLL',
value=#dtvalue{hll_value=Val}}) ->
{hll, Val, Context};
decode_fetch_response(#dtfetchresp{context=Context, type='MAP',
value=#dtvalue{map_value=Val}}) ->
{map, [ decode_map_entry(Entry) || Entry <- Val ], Context}.
Expand All @@ -262,7 +273,8 @@ encode_fetch_response(Type, Value, Context) ->
encode_fetch_response(Type, Value, Context, []).

%% @doc Encodes the result of a fetch request into a FetchResponse message.
-spec encode_fetch_response(toplevel_type(), toplevel_value(), context(), type_mappings()) -> #dtfetchresp{}.
-spec encode_fetch_response(toplevel_type(), toplevel_value(), context(),
type_mappings()) -> #dtfetchresp{}.
encode_fetch_response(Type, undefined, _Context, _Mods) ->
#dtfetchresp{type=encode_type(Type)};
encode_fetch_response(Type, Value, Context, Mods) ->
Expand All @@ -272,6 +284,8 @@ encode_fetch_response(Type, Value, Context, Mods) ->
Response#dtfetchresp{value=#dtvalue{counter_value=Value}};
set ->
Response#dtfetchresp{value=#dtvalue{set_value=Value}};
hll ->
Response#dtfetchresp{value=#dtvalue{hll_value=Value}};
map ->
Response#dtfetchresp{value=#dtvalue{map_value=[encode_map_entry(Entry, Mods) || Entry <- Value]}}
end.
Expand Down Expand Up @@ -338,6 +352,18 @@ decode_flag_op('DISABLE') -> disable.
encode_flag_op(enable) -> 'ENABLE';
encode_flag_op(disable) -> 'DISABLE'.

%% @doc Decodes a HllOp message into a hll operation.
-spec decode_hll_op(#hllop{}) -> hll_op().
decode_hll_op(#hllop{adds=A}) ->
{add_all, A}.

%% @doc Encodes an hll(set) update into the HllOp message.
-spec encode_hll_op(hll_op()) -> #hllop{}.
encode_hll_op({add, Member}) when is_binary(Member) ->
#hllop{adds=[Member]};
encode_hll_op({add_all, Members}) when is_list(Members) ->
#hllop{adds=Members}.

%% @doc Decodes a MapUpdate message into a map field operation.
-spec decode_map_update(#mapupdate{}, type_mappings()) -> {map_field(), embedded_type_op()}.
decode_map_update(#mapupdate{field=#mapfield{name=N, type='COUNTER'=Type}, counter_op=#counterop{}=Op}, Mods) ->
Expand Down Expand Up @@ -412,6 +438,8 @@ decode_operation(#dtop{counter_op=#counterop{}=Op}, _) ->
decode_counter_op(Op);
decode_operation(#dtop{set_op=#setop{}=Op}, _) ->
decode_set_op(Op);
decode_operation(#dtop{hll_op=#hllop{}=Op}, _) ->
decode_hll_op(Op);
decode_operation(#dtop{map_op=#mapop{}=Op}, Mods) ->
decode_map_op(Op, Mods).

Expand All @@ -421,6 +449,8 @@ encode_operation(Op, counter) ->
#dtop{counter_op=encode_counter_op(Op)};
encode_operation(Op, set) ->
#dtop{set_op=encode_set_op(Op)};
encode_operation(Op, hll) ->
#dtop{hll_op=encode_hll_op(Op)};
encode_operation(Op, map) ->
#dtop{map_op=encode_map_op(Op)}.

Expand All @@ -431,6 +461,8 @@ operation_type(#dtop{counter_op=#counterop{}}) ->
counter;
operation_type(#dtop{set_op=#setop{}}) ->
set;
operation_type(#dtop{hll_op=#hllop{}}) ->
hll;
operation_type(#dtop{map_op=#mapop{}}) ->
map.

Expand Down Expand Up @@ -488,6 +520,9 @@ decode_update_response(#dtupdateresp{key=K}, _, false) ->
end;
decode_update_response(#dtupdateresp{counter_value=C, context=Ctx}=Resp, counter, true) ->
maybe_wrap_key({counter, C, Ctx}, Resp);
decode_update_response(#dtupdateresp{hll_value=Hll, context=Ctx}=Resp, hll,
true) ->
maybe_wrap_key({hll, Hll, Ctx}, Resp);
decode_update_response(#dtupdateresp{set_value=S, context=Ctx}=Resp, set, true) ->
maybe_wrap_key({set, S, Ctx}, Resp);
decode_update_response(#dtupdateresp{map_value=M, context=Ctx}=Resp, map, true) ->
Expand All @@ -497,17 +532,21 @@ maybe_wrap_key(Term, #dtupdateresp{key=undefined}) -> Term;
maybe_wrap_key(Term, #dtupdateresp{key=K}) -> {K, Term}.

%% @doc Encodes an update response into a DtUpdateResp message.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context()) -> #dtupdateresp{}.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(),
context()) -> #dtupdateresp{}.
encode_update_response(Type, Value, Key, Context) ->
encode_update_response(Type, Value, Key, Context, []).

%% @doc Encodes an update response into a DtUpdateResp message.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context(), type_mappings()) -> #dtupdateresp{}.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(),
context(), type_mappings()) -> #dtupdateresp{}.
encode_update_response(counter, Value, Key, Context, _Mods) ->
#dtupdateresp{key=Key, context=Context, counter_value=Value};
encode_update_response(set, Value, Key, Context, _Mods) ->
#dtupdateresp{key=Key, context=Context, set_value=Value};
encode_update_response(hll, Value, Key, Context, _Mods) ->
#dtupdateresp{key=Key, context=Context, hll_value=Value};
encode_update_response(map, Value, Key, Context, Mods) when is_list(Value) ->
#dtupdateresp{key=Key, context=Context,
map_value=[ encode_map_entry(Entry, Mods) || Value /= undefined,
Entry <- Value ]}.
map_value=[encode_map_entry(Entry, Mods)
|| Value /= undefined, Entry <- Value]}.
2 changes: 1 addition & 1 deletion test/bucket_props_codec_eqc.erl
Expand Up @@ -137,7 +137,7 @@ linkfun() ->
?LET({M,F}, {atom(), atom()}, {linkfun, {modfun, M, F}}).

datatype() ->
{datatype, elements([counter, set, map])}.
{datatype, elements([counter, set, map, hll])}.

yz_index() ->
{search_index, non_empty(binary())}.
Expand Down

0 comments on commit dd194ce

Please sign in to comment.