From bfab43db231726f678f3f1cbd28e01e8ed9d44da Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 30 Mar 2016 15:22:20 -0400 Subject: [PATCH 1/2] PB Updates for HLL Datatypes --- dialyzer.ignore-warnings | 3 +- src/riak.proto | 3 ++ src/riak_dt.proto | 25 ++++++++++++- src/riak_pb_codec.erl | 14 ++++--- src/riak_pb_dt_codec.erl | 65 ++++++++++++++++++++++++++------- test/bucket_props_codec_eqc.erl | 2 +- 6 files changed, 90 insertions(+), 22 deletions(-) diff --git a/dialyzer.ignore-warnings b/dialyzer.ignore-warnings index 0d60fa46..cf3a48c2 100644 --- a/dialyzer.ignore-warnings +++ b/dialyzer.ignore-warnings @@ -7,5 +7,6 @@ riak_ts_pb.erl riak_yokozuna_pb.erl ## Insufficient typing on record in generated code Invalid type specification for function riak_pb_codec:decode_commit_hooks/1. The success typing is ([any()]) -> [{atom(),atom() | [any(),...]} | {'modfun',atom(),atom() | [any(),...]}] +riak_pb_codec.erl:353: Invalid type specification for function riak_pb_codec:decode_commit_hooks/1. 
The success typing is ([any()]) -> [{atom(),atom() | [any(),...]} | {'modfun',atom(),atom() | [any(),...]}] ## Internal calls only pass empty-list, but calls from other libraries pass proplists -riak_pb_dt_codec.erl:181: The pattern {AtomType, _} can never match the type 'false' +riak_pb_dt_codec.erl:187: The pattern {AtomType, _} can never match the type 'false' diff --git a/src/riak.proto b/src/riak.proto index 19aa929f..6050c7c4 100644 --- a/src/riak.proto +++ b/src/riak.proto @@ -157,6 +157,9 @@ message RpbBucketProps { // KV fast path optional bool write_once = 28; + + // Hyperloglog DT Precision + optional uint32 hll_precision = 29; } // Authentication request diff --git a/src/riak_dt.proto b/src/riak_dt.proto index f6e31e47..3e344f2d 100644 --- a/src/riak_dt.proto +++ b/src/riak_dt.proto @@ -22,7 +22,8 @@ */ /* -** Revision: 2.0 +%% TODO: Set for whichever release version we use +** Revision: 2.3.0 */ // Java package specifiers @@ -107,6 +108,11 @@ message DtValue { optional sint64 counter_value = 1; repeated bytes set_value = 2; repeated MapEntry map_value = 3; + + /* We return an estimated cardinality of the Hyperloglog set + * on fetch. + */ + optional uint64 hll_value = 4; } @@ -124,6 +130,7 @@ message DtFetchResp { COUNTER = 1; SET = 2; MAP = 3; + HLL = 4; } optional bytes context = 1; @@ -154,6 +161,14 @@ message SetOp { repeated bytes removes = 2; } +/* + * An operation to update a Hyperloglog Set, a top-level DT. + * You can only add to a HllSet. + */ +message HllOp { + repeated bytes adds = 1; +} + /* * An operation to be applied to a value stored in a Map -- the * contents of an UPDATE operation. The operation field that is @@ -205,6 +220,11 @@ message DtOp { optional CounterOp counter_op = 1; optional SetOp set_op = 2; optional MapOp map_op = 3; + + /* Adding values to a hyperloglog (set) is just like adding values + * to a set. 
+ */ + optional HllOp hll_op = 4; } /* @@ -251,4 +271,5 @@ message DtUpdateResp { optional sint64 counter_value = 3; repeated bytes set_value = 4; repeated MapEntry map_value = 5; -} \ No newline at end of file + optional uint64 hll_value = 6; +} diff --git a/src/riak_pb_codec.erl b/src/riak_pb_codec.erl index 3b95f29b..92eef33e 100644 --- a/src/riak_pb_codec.erl +++ b/src/riak_pb_codec.erl @@ -223,12 +223,14 @@ decode_bucket_props(#rpbbucketprops{n_val=N, search_index=Index, datatype=Datatype, consistent=Consistent, - write_once=WriteOnce + write_once=WriteOnce, + hll_precision=HllPrecision }) -> %% Extract numerical properties - [ {P,V} || {P,V} <- [ {n_val, N}, {old_vclock, Old}, {young_vclock, Young}, - {big_vclock, Big}, {small_vclock, Small} ], - V /= undefined ] ++ + [ {P,V} || {P,V} <- [{n_val, N}, {old_vclock, Old}, {young_vclock, Young}, + {big_vclock, Big}, {small_vclock, Small}, + {hll_precision, HllPrecision}], + V /= undefined ] ++ %% Extract booleans [ {BProp, decode_bool(Bool)} || {BProp, Bool} <- [{allow_mult, AM}, {last_write_wins, LWW}, @@ -328,6 +330,8 @@ encode_bucket_props([{consistent, S}|Rest], Pb) -> encode_bucket_props(Rest, Pb#rpbbucketprops{consistent = encode_bool(S)}); encode_bucket_props([{write_once, S}|Rest], Pb) -> encode_bucket_props(Rest, Pb#rpbbucketprops{write_once = encode_bool(S)}); +encode_bucket_props([{hll_precision, Num}|Rest], Pb) -> + encode_bucket_props(Rest, Pb#rpbbucketprops{hll_precision = Num}); encode_bucket_props([_Ignore|Rest], Pb) -> %% Ignore any properties not explicitly part of the PB message encode_bucket_props(Rest, Pb). @@ -383,7 +387,7 @@ encode_commit_hook({struct, Props}=Hook) -> -spec decode_commit_hooks([ #rpbcommithook{} ]) -> [ commit_hook_property() ]. decode_commit_hooks(Hooks) -> [ decode_commit_hook(Hook) || Hook <- Hooks, - Hook =/= #rpbcommithook{modfun=undefined, name=undefined} ]. + Hook =/= #rpbcommithook{modfun=undefined, name=undefined} ]. 
decode_commit_hook(#rpbcommithook{modfun = Modfun}) when Modfun =/= undefined -> decode_modfun(Modfun, commit_hook); diff --git a/src/riak_pb_dt_codec.erl b/src/riak_pb_dt_codec.erl index 25131425..444a30f2 100644 --- a/src/riak_pb_dt_codec.erl +++ b/src/riak_pb_dt_codec.erl @@ -48,29 +48,34 @@ -type context() :: binary() | undefined. -type counter_value() :: integer(). -type set_value() :: [ binary() ]. +-type hll_value() :: number(). -type register_value() :: binary(). -type flag_value() :: boolean(). -type map_entry() :: {map_field(), embedded_value()}. -type map_field() :: {binary(), embedded_type()}. -type map_value() :: [ map_entry() ]. --type embedded_value() :: counter_value() | set_value() | register_value() | flag_value() | map_value(). --type toplevel_value() :: counter_value() | set_value() | map_value() | undefined. +-type embedded_value() :: counter_value() | set_value() | register_value() + | flag_value() | map_value(). +-type toplevel_value() :: counter_value() | set_value() | map_value() + | hll_value() | undefined. -type fetch_response() :: {toplevel_type(), toplevel_value(), context()}. %% Type names as atoms -type embedded_type() :: counter | set | register | flag | map. --type toplevel_type() :: counter | set | map. +-type toplevel_type() :: counter | set | map | hll. +-type all_type() :: toplevel_type() | embedded_type(). %% Operations -type counter_op() :: increment | decrement | {increment | decrement, integer()}. -type simple_set_op() :: {add, binary()} | {remove, binary()} | {add_all, [binary()]} | {remove_all, [binary()]}. -type set_op() :: simple_set_op() | {update, [simple_set_op()]}. +-type hll_op() :: {add, binary()} | {add_all, [binary()]}. -type flag_op() :: enable | disable. -type register_op() :: {assign, binary()}. -type simple_map_op() :: {remove, map_field()} | {update, map_field(), embedded_type_op()}. -type map_op() :: simple_map_op() | {update, [simple_map_op()]}. 
-type embedded_type_op() :: counter_op() | set_op() | register_op() | flag_op() | map_op(). --type toplevel_op() :: counter_op() | set_op() | map_op(). +-type toplevel_op() :: counter_op() | set_op() | map_op() | hll_op(). -type update() :: {toplevel_type(), toplevel_op(), context()}. %% Request options @@ -89,7 +94,7 @@ include_context | {include_context, boolean()}. %% Server-side type<->module mappings --type type_mappings() :: [{embedded_type(), module()}]. +-type type_mappings() :: [{all_type(), module()}]. %% ========================= @@ -165,10 +170,11 @@ decode_type(PBType, Mods) -> AtomType = decode_type(PBType), proplists:get_value(AtomType, Mods, AtomType). -%% @doc Decodes a PB message type name into an atom. --spec decode_type(atom()) -> atom(). +%% @doc Decodes a PB message type name into an atom type name. +-spec decode_type(atom()) -> all_type(). decode_type('COUNTER') -> counter; decode_type('SET') -> set; +decode_type('HLL') -> hll; decode_type('REGISTER') -> register; decode_type('FLAG') -> flag; decode_type('MAP') -> map. @@ -185,9 +191,10 @@ encode_type(TypeOrMod, Mods) -> end. %% @doc Encodes an atom type name into the PB message equivalent. --spec encode_type(atom()) -> atom(). +-spec encode_type(all_type()) -> atom(). encode_type(counter) -> 'COUNTER'; encode_type(set) -> 'SET'; +encode_type(hll) -> 'HLL'; encode_type(register) -> 'REGISTER'; encode_type(flag) -> 'FLAG'; encode_type(map) -> 'MAP'. @@ -197,6 +204,7 @@ encode_flag_value(on) -> true; encode_flag_value(off) -> false; encode_flag_value(Other) -> Other. 
+ %% ======================== %% FETCH REQUEST / RESPONSE %% ======================== @@ -252,6 +260,9 @@ decode_fetch_response(#dtfetchresp{context=Context, type='COUNTER', decode_fetch_response(#dtfetchresp{context=Context, type='SET', value=#dtvalue{set_value=Val}}) -> {set, Val, Context}; +decode_fetch_response(#dtfetchresp{context=Context, type='HLL', + value=#dtvalue{hll_value=Val}}) -> + {hll, Val, Context}; decode_fetch_response(#dtfetchresp{context=Context, type='MAP', value=#dtvalue{map_value=Val}}) -> {map, [ decode_map_entry(Entry) || Entry <- Val ], Context}. @@ -262,7 +273,8 @@ encode_fetch_response(Type, Value, Context) -> encode_fetch_response(Type, Value, Context, []). %% @doc Encodes the result of a fetch request into a FetchResponse message. --spec encode_fetch_response(toplevel_type(), toplevel_value(), context(), type_mappings()) -> #dtfetchresp{}. +-spec encode_fetch_response(toplevel_type(), toplevel_value(), context(), + type_mappings()) -> #dtfetchresp{}. encode_fetch_response(Type, undefined, _Context, _Mods) -> #dtfetchresp{type=encode_type(Type)}; encode_fetch_response(Type, Value, Context, Mods) -> @@ -272,6 +284,8 @@ encode_fetch_response(Type, Value, Context, Mods) -> Response#dtfetchresp{value=#dtvalue{counter_value=Value}}; set -> Response#dtfetchresp{value=#dtvalue{set_value=Value}}; + hll -> + Response#dtfetchresp{value=#dtvalue{hll_value=Value}}; map -> Response#dtfetchresp{value=#dtvalue{map_value=[encode_map_entry(Entry, Mods) || Entry <- Value]}} end. @@ -338,6 +352,18 @@ decode_flag_op('DISABLE') -> disable. encode_flag_op(enable) -> 'ENABLE'; encode_flag_op(disable) -> 'DISABLE'. +%% @doc Decodes a HllOp message into a hll operation. +-spec decode_hll_op(#hllop{}) -> hll_op(). +decode_hll_op(#hllop{adds=A}) -> + {add_all, A}. + +%% @doc Encodes an hll(set) update into the HllOp message. +-spec encode_hll_op(hll_op()) -> #hllop{}. 
+encode_hll_op({add, Member}) when is_binary(Member) -> + #hllop{adds=[Member]}; +encode_hll_op({add_all, Members}) when is_list(Members) -> + #hllop{adds=Members}. + %% @doc Decodes a MapUpdate message into a map field operation. -spec decode_map_update(#mapupdate{}, type_mappings()) -> {map_field(), embedded_type_op()}. decode_map_update(#mapupdate{field=#mapfield{name=N, type='COUNTER'=Type}, counter_op=#counterop{}=Op}, Mods) -> @@ -412,6 +438,8 @@ decode_operation(#dtop{counter_op=#counterop{}=Op}, _) -> decode_counter_op(Op); decode_operation(#dtop{set_op=#setop{}=Op}, _) -> decode_set_op(Op); +decode_operation(#dtop{hll_op=#hllop{}=Op}, _) -> + decode_hll_op(Op); decode_operation(#dtop{map_op=#mapop{}=Op}, Mods) -> decode_map_op(Op, Mods). @@ -421,6 +449,8 @@ encode_operation(Op, counter) -> #dtop{counter_op=encode_counter_op(Op)}; encode_operation(Op, set) -> #dtop{set_op=encode_set_op(Op)}; +encode_operation(Op, hll) -> + #dtop{hll_op=encode_hll_op(Op)}; encode_operation(Op, map) -> #dtop{map_op=encode_map_op(Op)}. @@ -431,6 +461,8 @@ operation_type(#dtop{counter_op=#counterop{}}) -> counter; operation_type(#dtop{set_op=#setop{}}) -> set; +operation_type(#dtop{hll_op=#hllop{}}) -> + hll; operation_type(#dtop{map_op=#mapop{}}) -> map. @@ -488,6 +520,9 @@ decode_update_response(#dtupdateresp{key=K}, _, false) -> end; decode_update_response(#dtupdateresp{counter_value=C, context=Ctx}=Resp, counter, true) -> maybe_wrap_key({counter, C, Ctx}, Resp); +decode_update_response(#dtupdateresp{hll_value=Hll, context=Ctx}=Resp, hll, + true) -> + maybe_wrap_key({hll, Hll, Ctx}, Resp); decode_update_response(#dtupdateresp{set_value=S, context=Ctx}=Resp, set, true) -> maybe_wrap_key({set, S, Ctx}, Resp); decode_update_response(#dtupdateresp{map_value=M, context=Ctx}=Resp, map, true) -> @@ -497,17 +532,21 @@ maybe_wrap_key(Term, #dtupdateresp{key=undefined}) -> Term; maybe_wrap_key(Term, #dtupdateresp{key=K}) -> {K, Term}. 
%% @doc Encodes an update response into a DtUpdateResp message. --spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context()) -> #dtupdateresp{}. +-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), + context()) -> #dtupdateresp{}. encode_update_response(Type, Value, Key, Context) -> encode_update_response(Type, Value, Key, Context, []). %% @doc Encodes an update response into a DtUpdateResp message. --spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context(), type_mappings()) -> #dtupdateresp{}. +-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), + context(), type_mappings()) -> #dtupdateresp{}. encode_update_response(counter, Value, Key, Context, _Mods) -> #dtupdateresp{key=Key, context=Context, counter_value=Value}; encode_update_response(set, Value, Key, Context, _Mods) -> #dtupdateresp{key=Key, context=Context, set_value=Value}; +encode_update_response(hll, Value, Key, Context, _Mods) -> + #dtupdateresp{key=Key, context=Context, hll_value=Value}; encode_update_response(map, Value, Key, Context, Mods) when is_list(Value) -> #dtupdateresp{key=Key, context=Context, - map_value=[ encode_map_entry(Entry, Mods) || Value /= undefined, - Entry <- Value ]}. + map_value=[encode_map_entry(Entry, Mods) + || Value /= undefined, Entry <- Value]}. diff --git a/test/bucket_props_codec_eqc.erl b/test/bucket_props_codec_eqc.erl index 6fc57a27..996d6436 100644 --- a/test/bucket_props_codec_eqc.erl +++ b/test/bucket_props_codec_eqc.erl @@ -137,7 +137,7 @@ linkfun() -> ?LET({M,F}, {atom(), atom()}, {linkfun, {modfun, M, F}}). datatype() -> - {datatype, elements([counter, set, map])}. + {datatype, elements([counter, set, map, hll])}. yz_index() -> {search_index, non_empty(binary())}. 
From 84ec1f4cbd8264af527a9b89212f37b90f7745a1 Mon Sep 17 00:00:00 2001 From: Nick Marino Date: Tue, 16 Aug 2016 13:53:58 -0400 Subject: [PATCH 2/2] Move Revision to 2.2.0 since HLL will be in 2.2 Ran this by Zeeshan, he said go ahead. --- src/riak_dt.proto | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/riak_dt.proto b/src/riak_dt.proto index 3e344f2d..06f19fba 100644 --- a/src/riak_dt.proto +++ b/src/riak_dt.proto @@ -22,8 +22,7 @@ */ /* -%% TODO: Set for whichever release version we use -** Revision: 2.3.0 +** Revision: 2.2.0 */ // Java package specifiers