Permalink
Browse files

Merge pull request #552 from basho/slf-backend-capability-enc+no_get

  • Loading branch information...
2 parents 50289fe + 15fb639 commit 146e3ca047139311ff705e74def7dee2ab97a6c4 @slfritchie slfritchie committed May 17, 2013
Showing with 159 additions and 75 deletions.
  1. +3 −1 rebar.config
  2. +80 −53 src/riak_kv_vnode.erl
  3. +76 −21 test/backend_eqc.erl
View
4 rebar.config
@@ -1,6 +1,8 @@
{cover_enabled, true}.
{edoc_opts, [{preprocess, true}]}.
-{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}.
+{erl_opts, [warnings_as_errors,
+ {parse_transform, lager_transform},
+ {d, 'TEST_FS2_BACKEND_IN_RIAK_KV'}]}.
{eunit_opts, [verbose]}.
{erl_first_files, [
View
133 src/riak_kv_vnode.erl
@@ -482,7 +482,7 @@ handle_command({hashtree_pid, Node}, _, State=#state{hashtrees=HT}) ->
{reply, {error, wrong_node}, State}
end;
handle_command({rehash, Bucket, Key}, _, State=#state{mod=Mod, modstate=ModState}) ->
- case do_get_binary({Bucket, Key}, Mod, ModState) of
+ case do_get_binary(Bucket, Key, Mod, ModState) of
{ok, Bin, _UpdModState} ->
update_hashtree(Bucket, Key, Bin, State);
_ ->
@@ -938,7 +938,7 @@ prepare_put(#state{idx=Idx,
prunetime=PruneTime},
IndexBackend) ->
GetReply =
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
ok;
% NOTE: bad_crc is NOT an official backend response. It is
@@ -948,8 +948,8 @@ prepare_put(#state{idx=Idx,
{error, bad_crc, _UpdModState} ->
lager:info("Bad CRC detected while reading Partition=~p, Bucket=~p, Key=~p", [Idx, Bucket, Key]),
ok;
- {ok, GetVal, _UpdModState} ->
- {ok, GetVal}
+ {ok, TheOldObj, _UpdModState} ->
+ {ok, TheOldObj}
end,
case GetReply of
ok ->
@@ -966,8 +966,7 @@ prepare_put(#state{idx=Idx,
RObj
end,
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
- {ok, Val} ->
- OldObj = object_from_binary(Bucket, Key, Val),
+ {ok, OldObj} ->
case put_merge(Coord, LWW, OldObj, RObj, VId, StartTime) of
{oldobj, OldObj1} ->
{{false, OldObj1}, PutArgs};
@@ -1015,32 +1014,29 @@ perform_put({true, Obj},
bkey={Bucket, Key},
reqid=ReqID,
index_specs=IndexSpecs}) ->
- ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
- Val = riak_object:to_binary(ObjFmt, Obj),
- case Mod:put(Bucket, Key, IndexSpecs, Val, ModState) of
- {ok, UpdModState} ->
- update_hashtree(Bucket, Key, Val, State),
+ case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of
+ {{ok, UpdModState}, EncodedVal} ->
+ update_hashtree(Bucket, Key, EncodedVal, State),
case RB of
true ->
Reply = {dw, Idx, Obj, ReqID};
false ->
Reply = {dw, Idx, ReqID}
end;
- {error, _Reason, UpdModState} ->
+ {{error, _Reason, UpdModState}, _EncodedVal} ->
Reply = {fail, Idx, ReqID}
end,
{Reply, State#state{modstate=UpdModState}}.
do_reformat({Bucket, Key}=BKey, State=#state{mod=Mod, modstate=ModState}) ->
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
Reply = {error, not_found},
UpdState = State;
- {ok, ObjBin, _UpdModState} ->
+ {ok, RObj, _UpdModState} ->
%% since it is assumed capabilities have been properly set
%% to the desired version, to reformat, all we need to do
%% is submit a new write
- RObj = riak_object:from_binary(Bucket, Key, ObjBin),
PutArgs = #putargs{returnbody=false,
bkey=BKey,
reqid=undefined,
@@ -1119,10 +1115,10 @@ do_get(_Sender, BKey, ReqID,
{reply, {r, Retval, Idx, ReqID}, State}.
%% @private
-do_get_term(BKey, Mod, ModState) ->
- case do_get_binary(BKey, Mod, ModState) of
- {ok, Bin, _UpdModState} ->
- {ok, object_from_binary(BKey, Bin)};
+do_get_term({Bucket, Key}, Mod, ModState) ->
+ case do_get_object(Bucket, Key, Mod, ModState) of
+ {ok, Obj, _UpdModState} ->
+ {ok, Obj};
%% @TODO Eventually it would be good to
%% make the use of not_found or notfound
%% consistent throughout the code.
@@ -1134,8 +1130,31 @@ do_get_term(BKey, Mod, ModState) ->
Err
end.
-do_get_binary({Bucket, Key}, Mod, ModState) ->
- Mod:get(Bucket, Key, ModState).
+do_get_binary(Bucket, Key, Mod, ModState) ->
+ case uses_r_object(Mod, ModState, Bucket) of
+ true ->
+ Mod:get_object(Bucket, Key, true, ModState);
+ false ->
+ Mod:get(Bucket, Key, ModState)
+ end.
+
+do_get_object(Bucket, Key, Mod, ModState) ->
+ case uses_r_object(Mod, ModState, Bucket) of
+ true ->
+ Mod:get_object(Bucket, Key, false, ModState);
+ false ->
+ case do_get_binary(Bucket, Key, Mod, ModState) of
+ {ok, ObjBin, _UpdModState} ->
+ case riak_object:from_binary(Bucket, Key, ObjBin) of
+ {error, R} ->
+ throw(R);
+ RObj ->
+ {ok, RObj, _UpdModState}
+ end;
+ Else ->
+ Else
+ end
+ end.
%% @private
%% @doc This is a generic function for operations that involve
@@ -1290,9 +1309,9 @@ do_get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) ->
[{BKey, do_get_vclock(BKey,Mod,ModState)} || BKey <- KeyList].
%% @private
do_get_vclock({Bucket, Key}, Mod, ModState) ->
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} -> vclock:fresh();
- {ok, Val, _UpdModState} -> riak_object:vclock(object_from_binary(Bucket, Key, Val))
+ {ok, Obj, _UpdModState} -> riak_object:vclock(Obj)
end.
%% @private
@@ -1304,27 +1323,25 @@ do_diffobj_put({Bucket, Key}, DiffObj,
StartTS = os:timestamp(),
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
IndexBackend = lists:member(indexes, Capabilities),
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
case IndexBackend of
true ->
IndexSpecs = riak_object:index_specs(DiffObj);
false ->
IndexSpecs = []
end,
- ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
- Val = riak_object:to_binary(ObjFmt, DiffObj),
- Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState),
- case Res of
- {ok, _UpdModState} ->
- update_hashtree(Bucket, Key, Val, StateData),
+ case encode_and_put(DiffObj, Mod, Bucket, Key,
+ IndexSpecs, ModState) of
+ {{ok, _UpdModState} = InnerRes, EncodedVal} ->
+ update_hashtree(Bucket, Key, EncodedVal, StateData),
update_index_write_stats(IndexBackend, IndexSpecs),
- update_vnode_stats(vnode_put, Idx, StartTS);
- _ -> nop
- end,
- Res;
- {ok, Val0, _UpdModState} ->
- OldObj = object_from_binary(Bucket, Key, Val0),
+ update_vnode_stats(vnode_put, Idx, StartTS),
+ InnerRes;
+ {InnerRes, _Val} ->
+ InnerRes
+ end;
+ {ok, OldObj, _UpdModState} ->
%% Merge handoff values with the current - possibly discarding
%% if out of date. Ok to set VId/Starttime undefined as
%% they are not used for non-coordinating puts.
@@ -1339,18 +1356,16 @@ do_diffobj_put({Bucket, Key}, DiffObj,
false ->
IndexSpecs = []
end,
- ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
- Val = riak_object:to_binary(ObjFmt, AMObj),
- Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState),
- case Res of
- {ok, _UpdModState} ->
- update_hashtree(Bucket, Key, Val, StateData),
+ case encode_and_put(AMObj, Mod, Bucket, Key,
+ IndexSpecs, ModState) of
+ {{ok, _UpdModState} = InnerRes, EncodedVal} ->
+ update_hashtree(Bucket, Key, EncodedVal, StateData),
update_index_write_stats(IndexBackend, IndexSpecs),
- update_vnode_stats(vnode_put, Idx, StartTS);
- _ ->
- nop
- end,
- Res
+ update_vnode_stats(vnode_put, Idx, StartTS),
+ InnerRes;
+ {InnerRes, _EncodedVal} ->
+ InnerRes
+ end
end
end.
@@ -1577,14 +1592,26 @@ encode_binary_object(Bucket, Key, Value) ->
return_encoded_binary_object(Method, EncodedObject) ->
term_to_binary({ Method, EncodedObject }).
-object_from_binary({B,K}, ValBin) ->
- object_from_binary(B, K, ValBin).
-object_from_binary(B, K, ValBin) ->
- case riak_object:from_binary(B, K, ValBin) of
- {error, R} -> throw(R);
- Obj -> Obj
+-spec encode_and_put(
+ Obj::riak_object:riak_object(), Mod::term(), Bucket::riak_object:bucket(),
+ Key::riak_object:key(), IndexSpecs::list(), ModState::term()) ->
+ {{ok, UpdModState::term()}, EncodedObj::binary()} |
+ {{error, Reason::term(), UpdModState::term()}, EncodedObj::binary()}.
+
+encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) ->
+ case uses_r_object(Mod, ModState, Bucket) of
+ true ->
+ Mod:put_object(Bucket, Key, IndexSpecs, Obj, ModState);
+ false ->
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ EncodedVal = riak_object:to_binary(ObjFmt, Obj),
+ {Mod:put(Bucket, Key, IndexSpecs, EncodedVal, ModState), EncodedVal}
end.
+uses_r_object(Mod, ModState, Bucket) ->
+ {ok, Capabilities} = Mod:capabilities(Bucket, ModState),
+ lists:member(uses_r_object, Capabilities).
+
-ifdef(TEST).
%% Check assigning a vnodeid twice in the same second
View
97 test/backend_eqc.erl
@@ -34,7 +34,12 @@
test/2,
test/3,
test/4,
- test/5]).
+ test/5,
+ property/1,
+ property/2,
+ property/3,
+ property/4,
+ property/5]).
%% eqc_fsm callbacks
-export([initial_state/0,
@@ -71,21 +76,40 @@
%% ====================================================================
test(Backend) ->
- test(Backend, false).
+ test2(property(Backend, false)).
test(Backend, Volatile) ->
- test(Backend, Volatile, []).
+ test2(property(Backend, Volatile, [])).
test(Backend, Volatile, Config) ->
- test(Backend, Volatile, Config, fun(BeState,_Olds) ->
- catch(Backend:stop(BeState)) end).
+ test2(property(Backend, Volatile, Config,
+ fun(BeState,_Olds) -> catch(Backend:stop(BeState)) end)).
test(Backend, Volatile, Config, Cleanup) ->
- test(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS).
+ test2(property(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS)).
test(Backend, Volatile, Config, Cleanup, NumTests) ->
- eqc:quickcheck(eqc:numtests(NumTests,
- prop_backend(Backend, Volatile, Config, Cleanup))).
+ test2(property(Backend, Volatile, Config, Cleanup, NumTests)).
+
+test2(Prop) ->
+ eqc:quickcheck(Prop).
+
+property(Backend) ->
+ property(Backend, false).
+
+property(Backend, Volatile) ->
+ property(Backend, Volatile, []).
+
+property(Backend, Volatile, Config) ->
+ property(Backend, Volatile, Config, fun(BeState,_Olds) ->
+ catch(Backend:stop(BeState)) end).
+
+property(Backend, Volatile, Config, Cleanup) ->
+ property(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS).
+
+property(Backend, Volatile, Config, Cleanup, NumTests) ->
+ eqc:numtests(NumTests,
+ prop_backend(Backend, Volatile, Config, Cleanup)).
%% ====================================================================
%% eqc property
@@ -122,10 +146,24 @@ prop_backend(Backend, Volatile, Config, Cleanup) ->
%%====================================================================
bucket() ->
- elements([<<"b1">>,<<"b2">>,<<"b3">>,<<"b4">>]).
+ elements([<<"b1">>,<<"b2">>]).
+
+bucket(#qcst{backend=Backend}) ->
+ try
+ Backend:backend_eqc_bucket()
+ catch error:undef ->
+ bucket()
+ end.
key() ->
- elements([<<"k1">>,<<"k2">>,<<"k3">>,<<"k4">>]).
+ elements([<<"k1">>,<<"k2">>]).
+
+key(#qcst{backend=Backend}) ->
+ try
+ Backend:backend_eqc_key()
+ catch error:undef ->
+ key()
+ end.
val() ->
%% The creation of the riak object and the call
@@ -140,8 +178,11 @@ val() ->
g_opts() ->
frequency([{5, [async_fold]}, {2, []}]).
-fold_keys_opts() ->
- frequency([{5, [async_fold]}, {2, []}, {2, [{index, bucket(), index_query()}]}, {2, [{bucket, bucket()}]}]).
+fold_keys_opts(Q) ->
+ frequency([{5, [async_fold]},
+ {2, []},
+ {2, [{index, bucket(Q), index_query(Q)}]},
+ {2, [{bucket, bucket(Q)}]}]).
index_specs() ->
?LET(L, list(index_spec()), lists:usort(L)).
@@ -154,9 +195,9 @@ index_spec() ->
{remove, int_index(), int_posting()}
]).
-index_query() ->
+index_query(Q) ->
oneof([
- {eq, <<"$bucket">>, bucket()}, %% the bucket() in this query is ignored/transformed
+ {eq, <<"$bucket">>, bucket(Q)}, %% the bucket() in this query is ignored/transformed
range_query(<<"$key">>, key(), key()),
{eq, <<"$key">>, key()},
eq_query(),
@@ -349,7 +390,15 @@ next_state_data(_From, _To, S, _R, {call, _M, put, [Bucket, Key, IndexSpecs, Val
S#qcst{d = orddict:store({Bucket, Key}, Val, S#qcst.d),
i = update_indexes(Bucket, Key, IndexSpecs, S#qcst.i)};
next_state_data(_From, _To, S, _R, {call, _M, delete, [Bucket, Key|_]}) ->
- S#qcst{d = orddict:erase({Bucket, Key}, S#qcst.d),
+ D1 = orddict:erase({Bucket, Key}, S#qcst.d),
+ D2 = try
+ Backend = S#qcst.backend,
+ BE_c = S#qcst.c,
+ Backend:backend_eqc_filter_orddict_on_delete(Bucket, Key, D1, BE_c)
+ catch error:undef ->
+ D1
+ end,
+ S#qcst{d = D2,
i = remove_indexes(Bucket, Key, S#qcst.i)};
next_state_data(_From, _To, S, _R, {call, ?MODULE, drop, _}) ->
@@ -367,13 +416,13 @@ stopped(#qcst{backend=Backend,
running(#qcst{backend=Backend,
s=State,
- i=Indexes}) ->
+ i=Indexes}=Q) ->
[
- {history, {call, Backend, put, [bucket(), key(), index_specs(), val(), State]}},
- {history, {call, Backend, get, [bucket(), key(), State]}},
- {history, {call, ?MODULE, delete, [bucket(), key(), Backend, State, Indexes]}},
+ {history, {call, Backend, put, [bucket(Q), key(Q), index_specs(), val(), State]}},
+ {history, {call, Backend, get, [bucket(Q), key(Q), State]}},
+ {history, {call, ?MODULE, delete, [bucket(Q), key(Q), Backend, State, Indexes]}},
{history, {call, Backend, fold_buckets, [fold_buckets_fun(), get_fold_buffer(), g_opts(), State]}},
- {history, {call, Backend, fold_keys, [fold_keys_fun(), get_fold_buffer(), fold_keys_opts(), State]}},
+ {history, {call, Backend, fold_keys, [fold_keys_fun(), get_fold_buffer(), fold_keys_opts(Q), State]}},
{history, {call, Backend, fold_objects, [fold_objects_fun(), get_fold_buffer(), g_opts(), State]}},
{history, {call, Backend, is_empty, [State]}},
{stopped, {call, ?MODULE, drop, [Backend, State]}},
@@ -545,7 +594,13 @@ postcondition(_From, _To, S,
{ok, Buffer} ->
finish_fold(Buffer, From)
end,
- R = receive_fold_results([]),
+ R0 = receive_fold_results([]),
+ R = try
+ (S#qcst.backend):backend_eqc_fold_objects_transform(R0)
+ catch
+ error:undef ->
+ R0
+ end,
lists:sort(Objects) =:= lists:sort(R);
postcondition(_From, _To, S,{call, _M, is_empty, [_BeState]}, R) ->
R =:= (orddict:size(S#qcst.d) =:= 0);

0 comments on commit 146e3ca

Please sign in to comment.