Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PB Updates for HLL Datatypes #183

Merged
merged 2 commits into from Aug 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion dialyzer.ignore-warnings
Expand Up @@ -7,5 +7,6 @@ riak_ts_pb.erl
riak_yokozuna_pb.erl
## Insufficient typing on record in generated code
Invalid type specification for function riak_pb_codec:decode_commit_hooks/1. The success typing is ([any()]) -> [{atom(),atom() | [any(),...]} | {'modfun',atom(),atom() | [any(),...]}]
riak_pb_codec.erl:353: Invalid type specification for function riak_pb_codec:decode_commit_hooks/1. The success typing is ([any()]) -> [{atom(),atom() | [any(),...]} | {'modfun',atom(),atom() | [any(),...]}]
## Internal calls only pass empty-list, but calls from other libraries pass proplists
riak_pb_dt_codec.erl:181: The pattern {AtomType, _} can never match the type 'false'
riak_pb_dt_codec.erl:187: The pattern {AtomType, _} can never match the type 'false'
3 changes: 3 additions & 0 deletions src/riak.proto
Expand Up @@ -157,6 +157,9 @@ message RpbBucketProps {

// KV fast path
optional bool write_once = 28;

// Hyperlolog DT Precision
optional uint32 hll_precision = 29;
}

// Authentication request
Expand Down
24 changes: 22 additions & 2 deletions src/riak_dt.proto
Expand Up @@ -22,7 +22,7 @@
*/

/*
** Revision: 2.0
** Revision: 2.2.0
*/

// Java package specifiers
Expand Down Expand Up @@ -107,6 +107,11 @@ message DtValue {
optional sint64 counter_value = 1;
repeated bytes set_value = 2;
repeated MapEntry map_value = 3;

/* We return an estimated cardinality of the Hyperloglog set
* on fetch.
*/
optional uint64 hll_value = 4;
}


Expand All @@ -124,6 +129,7 @@ message DtFetchResp {
COUNTER = 1;
SET = 2;
MAP = 3;
HLL = 4;
}

optional bytes context = 1;
Expand Down Expand Up @@ -154,6 +160,14 @@ message SetOp {
repeated bytes removes = 2;
}

/*
* An operation to update a Hyperloglog Set, a top-level DT.
* You can only add to a HllSet.
*/
message HllOp {
repeated bytes adds = 1;
}

/*
* An operation to be applied to a value stored in a Map -- the
* contents of an UPDATE operation. The operation field that is
Expand Down Expand Up @@ -205,6 +219,11 @@ message DtOp {
optional CounterOp counter_op = 1;
optional SetOp set_op = 2;
optional MapOp map_op = 3;

/* Adding values to a hyperloglog (set) is just like adding values
* to a set.
*/
optional HllOp hll_op = 4;
}

/*
Expand Down Expand Up @@ -251,4 +270,5 @@ message DtUpdateResp {
optional sint64 counter_value = 3;
repeated bytes set_value = 4;
repeated MapEntry map_value = 5;
}
optional uint64 hll_value = 6;
}
14 changes: 9 additions & 5 deletions src/riak_pb_codec.erl
Expand Up @@ -223,12 +223,14 @@ decode_bucket_props(#rpbbucketprops{n_val=N,
search_index=Index,
datatype=Datatype,
consistent=Consistent,
write_once=WriteOnce
write_once=WriteOnce,
hll_precision=HllPrecision
}) ->
%% Extract numerical properties
[ {P,V} || {P,V} <- [ {n_val, N}, {old_vclock, Old}, {young_vclock, Young},
{big_vclock, Big}, {small_vclock, Small} ],
V /= undefined ] ++
[ {P,V} || {P,V} <- [{n_val, N}, {old_vclock, Old}, {young_vclock, Young},
{big_vclock, Big}, {small_vclock, Small},
{hll_precision, HllPrecision}],
V /= undefined ] ++
%% Extract booleans
[ {BProp, decode_bool(Bool)} ||
{BProp, Bool} <- [{allow_mult, AM}, {last_write_wins, LWW},
Expand Down Expand Up @@ -328,6 +330,8 @@ encode_bucket_props([{consistent, S}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{consistent = encode_bool(S)});
encode_bucket_props([{write_once, S}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{write_once = encode_bool(S)});
encode_bucket_props([{hll_precision, Num}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{hll_precision = Num});
encode_bucket_props([_Ignore|Rest], Pb) ->
%% Ignore any properties not explicitly part of the PB message
encode_bucket_props(Rest, Pb).
Expand Down Expand Up @@ -383,7 +387,7 @@ encode_commit_hook({struct, Props}=Hook) ->
-spec decode_commit_hooks([ #rpbcommithook{} ]) -> [ commit_hook_property() ].
decode_commit_hooks(Hooks) ->
[ decode_commit_hook(Hook) || Hook <- Hooks,
Hook =/= #rpbcommithook{modfun=undefined, name=undefined} ].
Hook =/= #rpbcommithook{modfun=undefined, name=undefined} ].

decode_commit_hook(#rpbcommithook{modfun = Modfun}) when Modfun =/= undefined ->
decode_modfun(Modfun, commit_hook);
Expand Down
65 changes: 52 additions & 13 deletions src/riak_pb_dt_codec.erl
Expand Up @@ -48,29 +48,34 @@
-type context() :: binary() | undefined.
-type counter_value() :: integer().
-type set_value() :: [ binary() ].
-type hll_value() :: number().
-type register_value() :: binary().
-type flag_value() :: boolean().
-type map_entry() :: {map_field(), embedded_value()}.
-type map_field() :: {binary(), embedded_type()}.
-type map_value() :: [ map_entry() ].
-type embedded_value() :: counter_value() | set_value() | register_value() | flag_value() | map_value().
-type toplevel_value() :: counter_value() | set_value() | map_value() | undefined.
-type embedded_value() :: counter_value() | set_value() | register_value()
| flag_value() | map_value().
-type toplevel_value() :: counter_value() | set_value() | map_value()
| hll_value() | undefined.
-type fetch_response() :: {toplevel_type(), toplevel_value(), context()}.

%% Type names as atoms
-type embedded_type() :: counter | set | register | flag | map.
-type toplevel_type() :: counter | set | map.
-type toplevel_type() :: counter | set | map | hll.
-type all_type() :: toplevel_type() | embedded_type().

%% Operations
-type counter_op() :: increment | decrement | {increment | decrement, integer()}.
-type simple_set_op() :: {add, binary()} | {remove, binary()} | {add_all, [binary()]} | {remove_all, [binary()]}.
-type set_op() :: simple_set_op() | {update, [simple_set_op()]}.
-type hll_op() :: {add, binary()} | {add_all, [binary()]}.
-type flag_op() :: enable | disable.
-type register_op() :: {assign, binary()}.
-type simple_map_op() :: {remove, map_field()} | {update, map_field(), embedded_type_op()}.
-type map_op() :: simple_map_op() | {update, [simple_map_op()]}.
-type embedded_type_op() :: counter_op() | set_op() | register_op() | flag_op() | map_op().
-type toplevel_op() :: counter_op() | set_op() | map_op().
-type toplevel_op() :: counter_op() | set_op() | map_op() | hll_op().
-type update() :: {toplevel_type(), toplevel_op(), context()}.

%% Request options
Expand All @@ -89,7 +94,7 @@
include_context | {include_context, boolean()}.

%% Server-side type<->module mappings
-type type_mappings() :: [{embedded_type(), module()}].
-type type_mappings() :: [{all_type(), module()}].


%% =========================
Expand Down Expand Up @@ -165,10 +170,11 @@ decode_type(PBType, Mods) ->
AtomType = decode_type(PBType),
proplists:get_value(AtomType, Mods, AtomType).

%% @doc Decodes a PB message type name into an atom.
-spec decode_type(atom()) -> atom().
%% @doc Decodes a PB message type name into an atom type name.
-spec decode_type(atom()) -> all_type().
decode_type('COUNTER') -> counter;
decode_type('SET') -> set;
decode_type('HLL') -> hll;
decode_type('REGISTER') -> register;
decode_type('FLAG') -> flag;
decode_type('MAP') -> map.
Expand All @@ -185,9 +191,10 @@ encode_type(TypeOrMod, Mods) ->
end.

%% @doc Encodes an atom type name into the PB message equivalent.
-spec encode_type(atom()) -> atom().
-spec encode_type(all_type()) -> atom().
encode_type(counter) -> 'COUNTER';
encode_type(set) -> 'SET';
encode_type(hll) -> 'HLL';
encode_type(register) -> 'REGISTER';
encode_type(flag) -> 'FLAG';
encode_type(map) -> 'MAP'.
Expand All @@ -197,6 +204,7 @@ encode_flag_value(on) -> true;
encode_flag_value(off) -> false;
encode_flag_value(Other) -> Other.


%% ========================
%% FETCH REQUEST / RESPONSE
%% ========================
Expand Down Expand Up @@ -252,6 +260,9 @@ decode_fetch_response(#dtfetchresp{context=Context, type='COUNTER',
decode_fetch_response(#dtfetchresp{context=Context, type='SET',
value=#dtvalue{set_value=Val}}) ->
{set, Val, Context};
decode_fetch_response(#dtfetchresp{context=Context, type='HLL',
value=#dtvalue{hll_value=Val}}) ->
{hll, Val, Context};
decode_fetch_response(#dtfetchresp{context=Context, type='MAP',
value=#dtvalue{map_value=Val}}) ->
{map, [ decode_map_entry(Entry) || Entry <- Val ], Context}.
Expand All @@ -262,7 +273,8 @@ encode_fetch_response(Type, Value, Context) ->
encode_fetch_response(Type, Value, Context, []).

%% @doc Encodes the result of a fetch request into a FetchResponse message.
-spec encode_fetch_response(toplevel_type(), toplevel_value(), context(), type_mappings()) -> #dtfetchresp{}.
-spec encode_fetch_response(toplevel_type(), toplevel_value(), context(),
type_mappings()) -> #dtfetchresp{}.
encode_fetch_response(Type, undefined, _Context, _Mods) ->
#dtfetchresp{type=encode_type(Type)};
encode_fetch_response(Type, Value, Context, Mods) ->
Expand All @@ -272,6 +284,8 @@ encode_fetch_response(Type, Value, Context, Mods) ->
Response#dtfetchresp{value=#dtvalue{counter_value=Value}};
set ->
Response#dtfetchresp{value=#dtvalue{set_value=Value}};
hll ->
Response#dtfetchresp{value=#dtvalue{hll_value=Value}};
map ->
Response#dtfetchresp{value=#dtvalue{map_value=[encode_map_entry(Entry, Mods) || Entry <- Value]}}
end.
Expand Down Expand Up @@ -338,6 +352,18 @@ decode_flag_op('DISABLE') -> disable.
encode_flag_op(enable) -> 'ENABLE';
encode_flag_op(disable) -> 'DISABLE'.

%% @doc Decodes a HllOp message into a hll operation.
-spec decode_hll_op(#hllop{}) -> hll_op().
decode_hll_op(#hllop{adds=A}) ->
{add_all, A}.

%% @doc Encodes an hll(set) update into the HllOp message.
-spec encode_hll_op(hll_op()) -> #hllop{}.
encode_hll_op({add, Member}) when is_binary(Member) ->
#hllop{adds=[Member]};
encode_hll_op({add_all, Members}) when is_list(Members) ->
#hllop{adds=Members}.

%% @doc Decodes a MapUpdate message into a map field operation.
-spec decode_map_update(#mapupdate{}, type_mappings()) -> {map_field(), embedded_type_op()}.
decode_map_update(#mapupdate{field=#mapfield{name=N, type='COUNTER'=Type}, counter_op=#counterop{}=Op}, Mods) ->
Expand Down Expand Up @@ -412,6 +438,8 @@ decode_operation(#dtop{counter_op=#counterop{}=Op}, _) ->
decode_counter_op(Op);
decode_operation(#dtop{set_op=#setop{}=Op}, _) ->
decode_set_op(Op);
decode_operation(#dtop{hll_op=#hllop{}=Op}, _) ->
decode_hll_op(Op);
decode_operation(#dtop{map_op=#mapop{}=Op}, Mods) ->
decode_map_op(Op, Mods).

Expand All @@ -421,6 +449,8 @@ encode_operation(Op, counter) ->
#dtop{counter_op=encode_counter_op(Op)};
encode_operation(Op, set) ->
#dtop{set_op=encode_set_op(Op)};
encode_operation(Op, hll) ->
#dtop{hll_op=encode_hll_op(Op)};
encode_operation(Op, map) ->
#dtop{map_op=encode_map_op(Op)}.

Expand All @@ -431,6 +461,8 @@ operation_type(#dtop{counter_op=#counterop{}}) ->
counter;
operation_type(#dtop{set_op=#setop{}}) ->
set;
operation_type(#dtop{hll_op=#hllop{}}) ->
hll;
operation_type(#dtop{map_op=#mapop{}}) ->
map.

Expand Down Expand Up @@ -488,6 +520,9 @@ decode_update_response(#dtupdateresp{key=K}, _, false) ->
end;
decode_update_response(#dtupdateresp{counter_value=C, context=Ctx}=Resp, counter, true) ->
maybe_wrap_key({counter, C, Ctx}, Resp);
decode_update_response(#dtupdateresp{hll_value=Hll, context=Ctx}=Resp, hll,
true) ->
maybe_wrap_key({hll, Hll, Ctx}, Resp);
decode_update_response(#dtupdateresp{set_value=S, context=Ctx}=Resp, set, true) ->
maybe_wrap_key({set, S, Ctx}, Resp);
decode_update_response(#dtupdateresp{map_value=M, context=Ctx}=Resp, map, true) ->
Expand All @@ -497,17 +532,21 @@ maybe_wrap_key(Term, #dtupdateresp{key=undefined}) -> Term;
maybe_wrap_key(Term, #dtupdateresp{key=K}) -> {K, Term}.

%% @doc Encodes an update response into a DtUpdateResp message.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context()) -> #dtupdateresp{}.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(),
context()) -> #dtupdateresp{}.
encode_update_response(Type, Value, Key, Context) ->
encode_update_response(Type, Value, Key, Context, []).

%% @doc Encodes an update response into a DtUpdateResp message.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context(), type_mappings()) -> #dtupdateresp{}.
-spec encode_update_response(toplevel_type(), toplevel_value(), binary(),
context(), type_mappings()) -> #dtupdateresp{}.
encode_update_response(counter, Value, Key, Context, _Mods) ->
#dtupdateresp{key=Key, context=Context, counter_value=Value};
encode_update_response(set, Value, Key, Context, _Mods) ->
#dtupdateresp{key=Key, context=Context, set_value=Value};
encode_update_response(hll, Value, Key, Context, _Mods) ->
#dtupdateresp{key=Key, context=Context, hll_value=Value};
encode_update_response(map, Value, Key, Context, Mods) when is_list(Value) ->
#dtupdateresp{key=Key, context=Context,
map_value=[ encode_map_entry(Entry, Mods) || Value /= undefined,
Entry <- Value ]}.
map_value=[encode_map_entry(Entry, Mods)
|| Value /= undefined, Entry <- Value]}.
2 changes: 1 addition & 1 deletion test/bucket_props_codec_eqc.erl
Expand Up @@ -137,7 +137,7 @@ linkfun() ->
?LET({M,F}, {atom(), atom()}, {linkfun, {modfun, M, F}}).

datatype() ->
{datatype, elements([counter, set, map])}.
{datatype, elements([counter, set, map, hll])}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had no idea we had this duplicate list here. I guess we can't depend on riak_kv_types for this. Eugh. OK.


yz_index() ->
{search_index, non_empty(binary())}.
Expand Down