Skip to content

Commit

Permalink
Merge pull request #1405 from basho/feature-zl-add_expanded_is_crdt
Browse files Browse the repository at this point in the history
Feature zl add expanded is crdt [JIRA: RIAK-2531]

Reviewed-by: bsparrow435
  • Loading branch information
borshop committed May 3, 2016
2 parents 330cb06 + 0a1155c commit 4a82445
Showing 1 changed file with 95 additions and 2 deletions.
97 changes: 95 additions & 2 deletions src/riak_kv_crdt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
-export([log_merge_errors/4, meta/2, merge_value/2]).
%% MR helper funs
-export([value/1, counter_value/1, set_value/1, map_value/1]).
%% Other helper funs
-export([is_crdt/1, is_crdt/2]).

-include("riak_kv_wm_raw.hrl").
-include("riak_object.hrl").
Expand Down Expand Up @@ -128,6 +130,38 @@ map_value(RObj) ->
{{_Ctx, Map}, _Stats} = value(RObj, ?MAP_TYPE),
Map.

%% @doc convenience function for (e.g.) Yokozuna. Checks the bucket props for
%% the object, if it has a supported datatype entry, returns true; otherwise
%% false if not a 2.0 CRDT.
-spec is_crdt(riak_object:riak_object()) -> boolean().
is_crdt(RObj) ->
Bucket = riak_object:bucket(RObj),
case riak_core_bucket:get_bucket(Bucket) of
BProps when is_list(BProps) ->
is_crdt(RObj, BProps);
{error, _}=Err ->
Err;
_ ->
false
end.

-spec is_crdt(riak_object:riak_object(), riak_kv_bucket:props()) -> boolean().
is_crdt(RObj, BProps) when is_list(BProps) ->
Type = proplists:get_value(datatype, BProps),
Mod = riak_kv_crdt:to_mod(Type),
supported(Mod) andalso is_crdt_object(RObj);
is_crdt(_RObj, _BProps) ->
false.

-spec is_crdt_object(riak_object:riak_object()) -> boolean()|{error,_}.
is_crdt_object(RObj) ->
ObjVal = riak_object:get_value(RObj),
case ObjVal of
<<?TAG, _Rest/binary>> when is_binary(ObjVal) ->
true;
_ -> false
end.

%% @TODO in riak_dt change value to query allow query to take an
%% argument, (so as to query subfields of map, or set membership etc)
-spec crdt_value(module(), error | {ok, {dict(), crdt()}}) ->
Expand Down Expand Up @@ -337,6 +371,8 @@ later(TS1, TS2) ->
true
end.

-spec new(riak_object:bucket(), riak_object:key(), DT_MOD::module())
-> riak_object:riak_object().
new(B, K, Mod) ->
CRDT=#crdt{ctype=CType} = to_record(Mod, Mod:new()),
Bin = to_binary(CRDT),
Expand Down Expand Up @@ -488,6 +524,63 @@ get_context(Type, Value) ->
%% ===================================================================
-ifdef(TEST).

is_crdt_test_() ->
{setup,
fun() ->
meck:new(riak_core_bucket),
meck:new(riak_core_capability, []),
meck:expect(riak_core_capability, get,
fun({riak_kv, crdt}, []) ->
[pncounter,riak_dt_pncounter,riak_dt_orswot,
riak_dt_map];
(X, Y) -> meck:passthrough([X, Y]) end),
ok
end,
fun(_) ->
meck:unload(riak_core_capability),
meck:unload(riak_core_bucket)
end,
[
?_test(begin
meck:expect(riak_core_bucket, get_bucket,
fun(_Bucket) -> [{datatype, foo}] end),
Bucket = {<<"counterz">>, <<"crdt">>},
BTProps = riak_core_bucket:get_bucket(Bucket),
?assertEqual(foo, proplists:get_value(datatype, BTProps)),
?assertNot(is_crdt(riak_object:new(Bucket, <<"k1">>, hello)))
end),
?_test(begin
Bucket = {<<"t">>, <<"bucketjumpy">>},
?assertNot(is_crdt(riak_object:new(Bucket, <<"k1">>, hi)))
end),
?_test(begin
meck:expect(riak_core_bucket, get_bucket,
fun({<<"maps">>, _Name}) -> [{datatype, map}];
({<<"sets">>, _Name}) -> [{datatype, set}];
({<<"counters">>, _Name}) ->
[{datatype, counter}];
({<<"mappyz">>, _Name}) -> [];
({X, Y}) -> meck:passthrough([X, Y]) end),
Bucket1 = {<<"maps">>, <<"crdt">>},
Bucket2 = {<<"sets">>, <<"crdt">>},
Bucket3 = {<<"counters">>, <<"crdt">>},
Bucket4 = {<<"mappyz">>, <<"crdt">>},
BTPropsMap = riak_core_bucket:get_bucket(Bucket1),
BTPropsSet = riak_core_bucket:get_bucket(Bucket2),
BTPropsCounter = riak_core_bucket:get_bucket(Bucket3),
?assertEqual(map, proplists:get_value(datatype, BTPropsMap)),
?assertEqual(set, proplists:get_value(datatype, BTPropsSet)),
?assertEqual(counter,
proplists:get_value(datatype, BTPropsCounter)),
[?assert(is_crdt(riak_kv_crdt:new(B, K, Mod)))
|| {B, K, Mod} <- [{Bucket1, <<"k1">>, riak_dt_map},
{Bucket2, <<"k2">>, riak_dt_orswot},
{Bucket3, <<"k3">>, riak_dt_pncounter}]],
?assertNot(is_crdt(riak_kv_crdt:new(Bucket4, <<"k5">>,
riak_dt_map))),
?assertNot(is_crdt(riak_object:new(Bucket1, <<"k6">>,
<<"classic">>)))
end)]}.

-ifdef(EQC).
-define(QC_OUT(P),
Expand All @@ -503,13 +596,13 @@ eqc_test_() ->
?_test(?TIMED_QC(prop_binary_roundtrip()))}.

prop_binary_roundtrip() ->
?FORALL({_Type, Mod}, oneof(?MOD_MAP),
?FORALL({_Type, Mod}, oneof(?MOD_MAP),
begin
{ok, ?CRDT{mod=SMod, value=SValue}} = from_binary(to_binary(?CRDT{mod=Mod, value=Mod:new()})),
conjunction([{module, equals(Mod, SMod)},
{value, Mod:equal(SValue, Mod:new())}])
end).


-endif.
-endif.

0 comments on commit 4a82445

Please sign in to comment.