Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Intermediate refactoring: isolate Mod:put() and Mod:get()

  • Loading branch information...
commit 44167e7c2752798041212a37aa58512af09bf60f 1 parent c836bca
@slfritchie slfritchie authored
Showing with 79 additions and 53 deletions.
  1. +79 −53 src/riak_kv_vnode.erl
View
132 src/riak_kv_vnode.erl
@@ -480,7 +480,7 @@ handle_command({hashtree_pid, Node}, _, State=#state{hashtrees=HT}) ->
{reply, {error, wrong_node}, State}
end;
handle_command({rehash, Bucket, Key}, _, State=#state{mod=Mod, modstate=ModState}) ->
- case do_get_binary({Bucket, Key}, Mod, ModState) of
+ case do_get_binary(Bucket, Key, Mod, ModState) of
{ok, Bin, _UpdModState} ->
update_hashtree(Bucket, Key, Bin, State);
_ ->
@@ -937,7 +937,7 @@ prepare_put(#state{idx=Idx,
prunetime=PruneTime},
IndexBackend) ->
GetReply =
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
ok;
% NOTE: bad_crc is NOT an official backend response. It is
@@ -947,8 +947,8 @@ prepare_put(#state{idx=Idx,
{error, bad_crc, _UpdModState} ->
lager:info("Bad CRC detected while reading Partition=~p, Bucket=~p, Key=~p", [Idx, Bucket, Key]),
ok;
- {ok, GetVal, _UpdModState} ->
- {ok, GetVal}
+ {ok, TheOldObj, _UpdModState} ->
+ {ok, TheOldObj}
end,
case GetReply of
ok ->
@@ -965,8 +965,7 @@ prepare_put(#state{idx=Idx,
RObj
end,
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
- {ok, Val} ->
- OldObj = object_from_binary(Bucket, Key, Val),
+ {ok, OldObj} ->
case put_merge(Coord, LWW, OldObj, RObj, VId, StartTime) of
{oldobj, OldObj1} ->
{{false, OldObj1}, PutArgs};
@@ -1014,32 +1013,29 @@ perform_put({true, Obj},
bkey={Bucket, Key},
reqid=ReqID,
index_specs=IndexSpecs}) ->
- ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
- Val = riak_object:to_binary(ObjFmt, Obj),
- case Mod:put(Bucket, Key, IndexSpecs, Val, ModState) of
- {ok, UpdModState} ->
- update_hashtree(Bucket, Key, Val, State),
+ case encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of
+ {{ok, UpdModState}, EncodedVal} ->
+ update_hashtree(Bucket, Key, EncodedVal, State),
case RB of
true ->
Reply = {dw, Idx, Obj, ReqID};
false ->
Reply = {dw, Idx, ReqID}
end;
- {error, _Reason, UpdModState} ->
+ {{error, _Reason, UpdModState}, _EncodedVal} ->
Reply = {fail, Idx, ReqID}
end,
{Reply, State#state{modstate=UpdModState}}.
do_reformat({Bucket, Key}=BKey, State=#state{mod=Mod, modstate=ModState}) ->
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
Reply = {error, not_found},
UpdState = State;
- {ok, ObjBin, _UpdModState} ->
+ {ok, RObj, _UpdModState} ->
%% since it is assumed capabilities have been properly set
%% to the desired version, to reformat, all we need to do
%% is submit a new write
- RObj = riak_object:from_binary(Bucket, Key, ObjBin),
PutArgs = #putargs{returnbody=false,
bkey=BKey,
reqid=undefined,
@@ -1118,10 +1114,10 @@ do_get(_Sender, BKey, ReqID,
{reply, {r, Retval, Idx, ReqID}, State}.
%% @private
-do_get_term(BKey, Mod, ModState) ->
- case do_get_binary(BKey, Mod, ModState) of
- {ok, Bin, _UpdModState} ->
- {ok, object_from_binary(BKey, Bin)};
+do_get_term({Bucket, Key}, Mod, ModState) ->
+ case do_get_object(Bucket, Key, Mod, ModState) of
+ {ok, Obj, _UpdModState} ->
+ {ok, Obj};
%% @TODO Eventually it would be good to
%% make the use of not_found or notfound
%% consistent throughout the code.
@@ -1133,8 +1129,31 @@ do_get_term(BKey, Mod, ModState) ->
Err
end.
-do_get_binary({Bucket, Key}, Mod, ModState) ->
- Mod:get(Bucket, Key, ModState).
+do_get_binary(Bucket, Key, Mod, ModState) ->
+ case mod_capability_uses_r_object(Mod, ModState, Bucket) of
+ true ->
+ Mod:get_object(Bucket, Key, true, ModState);
+ false ->
+ Mod:get(Bucket, Key, ModState)
+ end.
+
+do_get_object(Bucket, Key, Mod, ModState) ->
+ case mod_capability_uses_r_object(Mod, ModState, Bucket) of
+ true ->
+ Mod:get_object(Bucket, Key, false, ModState);
+ false ->
+ case do_get_binary(Bucket, Key, Mod, ModState) of
+ {ok, ObjBin, _UpdModState} ->
+ case riak_object:from_binary(Bucket, Key, ObjBin) of
+ {error, R} ->
+ throw(R);
+ RObj ->
+ {ok, RObj, _UpdModState}
+ end;
+ Else ->
+ Else
+ end
+ end.
%% @private
%% @doc This is a generic function for operations that involve
@@ -1289,9 +1308,9 @@ do_get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) ->
[{BKey, do_get_vclock(BKey,Mod,ModState)} || BKey <- KeyList].
%% @private
do_get_vclock({Bucket, Key}, Mod, ModState) ->
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} -> vclock:fresh();
- {ok, Val, _UpdModState} -> riak_object:vclock(object_from_binary(Bucket, Key, Val))
+ {ok, Obj, _UpdModState} -> riak_object:vclock(Obj)
end.
%% @private
@@ -1303,7 +1322,7 @@ do_diffobj_put({Bucket, Key}, DiffObj,
StartTS = os:timestamp(),
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
IndexBackend = lists:member(indexes, Capabilities),
- case Mod:get(Bucket, Key, ModState) of
+ case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
case IndexBackend of
true ->
@@ -1311,19 +1330,17 @@ do_diffobj_put({Bucket, Key}, DiffObj,
false ->
IndexSpecs = []
end,
- ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
- Val = riak_object:to_binary(ObjFmt, DiffObj),
- Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState),
- case Res of
- {ok, _UpdModState} ->
- update_hashtree(Bucket, Key, Val, StateData),
+ case encode_and_Mod_put(DiffObj, Mod, Bucket, Key,
+ IndexSpecs, ModState) of
+ {{ok, _UpdModState} = InnerRes, EncodedVal} ->
+ update_hashtree(Bucket, Key, EncodedVal, StateData),
update_index_write_stats(IndexBackend, IndexSpecs),
- update_vnode_stats(vnode_put, Idx, StartTS);
- _ -> nop
- end,
- Res;
- {ok, Val0, _UpdModState} ->
- OldObj = object_from_binary(Bucket, Key, Val0),
+ update_vnode_stats(vnode_put, Idx, StartTS),
+ InnerRes;
+ {InnerRes, _Val} ->
+ InnerRes
+ end;
+ {ok, OldObj, _UpdModState} ->
%% Merge handoff values with the current - possibly discarding
%% if out of date. Ok to set VId/Starttime undefined as
%% they are not used for non-coordinating puts.
@@ -1338,18 +1355,16 @@ do_diffobj_put({Bucket, Key}, DiffObj,
false ->
IndexSpecs = []
end,
- ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
- Val = riak_object:to_binary(ObjFmt, AMObj),
- Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState),
- case Res of
- {ok, _UpdModState} ->
- update_hashtree(Bucket, Key, Val, StateData),
+ case encode_and_Mod_put(AMObj, Mod, Bucket, Key,
+ IndexSpecs, ModState) of
+ {{ok, _UpdModState} = InnerRes, EncodedVal} ->
+ update_hashtree(Bucket, Key, EncodedVal, StateData),
update_index_write_stats(IndexBackend, IndexSpecs),
- update_vnode_stats(vnode_put, Idx, StartTS);
- _ ->
- nop
- end,
- Res
+ update_vnode_stats(vnode_put, Idx, StartTS),
+ InnerRes;
+ {InnerRes, _EncodedVal} ->
+ InnerRes
+ end
end
end.
@@ -1526,14 +1541,25 @@ object_info({Bucket, _Key}=BKey) ->
Hash = riak_core_util:chash_key(BKey),
{Bucket, Hash}.
-object_from_binary({B,K}, ValBin) ->
- object_from_binary(B, K, ValBin).
-object_from_binary(B, K, ValBin) ->
- case riak_object:from_binary(B, K, ValBin) of
- {error, R} -> throw(R);
- Obj -> Obj
+-spec encode_and_Mod_put(
+ Obj::riak_object:object(), Mod::term(), Bucket::riak_object:bucket(),
+ Key::riak_object:key(), IndexSpecs::list(), ModState::term()) ->
+ {{ok, UpdModState::term()}, EncodedObj::binary()} |
+ {{error, Reason::term(), UpdModState::term()}, EncodedObj::binary()}.
+
+encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) ->
+ case mod_capability_uses_r_object(Mod, ModState, Bucket) of
+ true ->
+ Mod:put_object(Bucket, Key, IndexSpecs, Obj, ModState);
+ false ->
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ EncodedVal = riak_object:to_binary(ObjFmt, Obj),
+ {Mod:put(Bucket, Key, IndexSpecs, EncodedVal, ModState), EncodedVal}
end.
+mod_capability_uses_r_object(Mod, ModState, Bucket) ->
+ {ok, Capabilities} = Mod:capabilities(Bucket, ModState),
+ lists:member(uses_r_object, Capabilities).
-ifdef(TEST).
Please sign in to comment.
Something went wrong with that request. Please try again.