Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

use capabilities to determine when to use new object format

* register {riak_kv, object_format} capability
* writes should only be serialized to v1 format when it is
  supported accross the cluster
* before sending data between nodes (handoff) the binary format
  should be changed to the current version supported accross the
  cluster. This is to ensure an older node does not receive data
  in the new format after being downgraded
* robj_to_binary -> to_binary. it also takes an explicit version
* binary_to_robj -> from_binary
* to_binary_version/4 naively converts between binary formats.
  If/when we have future versions this function can be "smarter"
  • Loading branch information...
commit 17815c28adde3dfd4485080fab9a5b26812116f8 1 parent efe5fac
@jrwest jrwest authored
View
4 src/riak_kv_app.erl
@@ -131,6 +131,10 @@ start(_Type, _StartArgs) ->
mapred_2i_pipe,
[{true, true}, {false, false}]}),
+ riak_core_capability:register({riak_kv, object_format},
+ [v1, v0],
+ v0),
+
%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_core_ring_handler blocks until all vnodes have been started
%% synchronously.
View
22 src/riak_kv_vnode.erl
@@ -528,8 +528,8 @@ handoff_finished(_TargetNode, State) ->
handle_handoff_data(BinObj, State) ->
PBObj = riak_core_pb:decode_riakobject_pb(zlib:unzip(BinObj)),
- BKey = {PBObj#riakobject_pb.bucket,PBObj#riakobject_pb.key},
- case do_diffobj_put(BKey, binary_to_term(PBObj#riakobject_pb.val), State) of
+ {B, K} = BKey = {PBObj#riakobject_pb.bucket,PBObj#riakobject_pb.key},
+ case do_diffobj_put(BKey, riak_object:from_binary(B, K, PBObj#riakobject_pb.val), State) of
{ok, UpdModState} ->
{reply, ok, State#state{modstate=UpdModState}};
{error, Reason, UpdModState} ->
@@ -539,8 +539,13 @@ handle_handoff_data(BinObj, State) ->
end.
encode_handoff_item({B, K}, V) ->
+ %% before sending data to another node change binary version
+ %% to one supported by the cluster. This way we don't send
+ %% unsupported formats to old nodes
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ Val = riak_object:to_binary_version(ObjFmt, B, K, V),
zlib:zip(riak_core_pb:encode_riakobject_pb(
- #riakobject_pb{bucket=B, key=K, val=V})).
+ #riakobject_pb{bucket=B, key=K, val=Val})).
is_empty(State=#state{mod=Mod, modstate=ModState}) ->
{Mod:is_empty(ModState), State}.
@@ -738,7 +743,8 @@ perform_put({true, Obj},
bkey={Bucket, Key},
reqid=ReqID,
index_specs=IndexSpecs}) ->
- Val = riak_object:robj_to_binary(Obj),
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ Val = riak_object:to_binary(ObjFmt, Obj),
case Mod:put(Bucket, Key, IndexSpecs, Val, ModState) of
{ok, UpdModState} ->
case RB of
@@ -1010,7 +1016,8 @@ do_diffobj_put({Bucket, Key}, DiffObj,
false ->
IndexSpecs = []
end,
- Val = riak_object:robj_to_binary(DiffObj),
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ Val = riak_object:to_binary(ObjFmt, DiffObj),
Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState),
case Res of
{ok, _UpdModState} ->
@@ -1035,7 +1042,8 @@ do_diffobj_put({Bucket, Key}, DiffObj,
false ->
IndexSpecs = []
end,
- Val = riak_object:robj_to_binary(AMObj),
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ Val = riak_object:to_binary(ObjFmt, AMObj),
Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState),
case Res of
{ok, _UpdModState} ->
@@ -1194,7 +1202,7 @@ object_info({Bucket, _Key}=BKey) ->
object_from_binary({B,K}, ValBin) ->
object_from_binary(B, K, ValBin).
object_from_binary(B, K, ValBin) ->
- case riak_object:binary_to_robj(B, K, ValBin) of
+ case riak_object:from_binary(B, K, ValBin) of
{error, R} -> throw(R);
Obj -> Obj
end.
View
25 src/riak_object.erl
@@ -71,6 +71,7 @@
-type r_object_bin() :: binary().
-type r_content_bin() :: binary().
+-type r_object_vsn() :: v0 | v1.
%% -type rfc1123_date() :: string(). % LastMod Date
-define(LASTMOD_LEN, 29). %% static length of rfc1123_date() type. Hard-coded in Erlang.
@@ -88,18 +89,28 @@
-export([to_json/1, from_json/1]).
-export([index_specs/1, diff_index_specs/2]).
-export([set_contents/2, set_vclock/2]). %% INTERNAL, only for riak_*
--export([robj_to_binary/1, binary_to_robj/3]).
+-export([to_binary/2, from_binary/3, to_binary_version/4]).
%% @doc Convert riak object to binary form
--spec robj_to_binary(#r_object{}) -> r_object_bin().
-robj_to_binary(#r_object{contents=Contents, vclock=VClock}) ->
+-spec to_binary(r_object_vsn(), #r_object{}) -> r_object_bin().
+to_binary(v0, RObj) ->
+ term_to_binary(RObj);
+to_binary(v1, #r_object{contents=Contents, vclock=VClock}) ->
new_v1(VClock, Contents).
+-spec to_binary_version(r_object_vsn(), bucket(), key(), r_object_bin()) -> r_object_bin().
+to_binary_version(v0, _, _, <<131,_/binary>>=Bin) ->
+ Bin;
+to_binary_version(v1, _, _, <<?MAGIC:8/integer, 1:8/integer, _/binary>>=Bin) ->
+ Bin;
+to_binary_version(Vsn, B, K, Bin) ->
+ to_binary(Vsn, from_binary(B, K, Bin)).
+
%% @doc Convert binary object to riak object
--spec binary_to_robj(bucket(),key(),binary()) -> #r_object{} | {error, atom()}.
-binary_to_robj(_B,_K,<<131, _Rest/binary>>=ObjTerm) ->
+-spec from_binary(bucket(),key(),binary()) -> #r_object{} | {error, atom()}.
+from_binary(_B,_K,<<131, _Rest/binary>>=ObjTerm) ->
binary_to_term(ObjTerm);
-binary_to_robj(B,K,<<?MAGIC:8/integer, 1:8/integer, Rest/binary>>=_ObjBin) ->
+from_binary(B,K,<<?MAGIC:8/integer, 1:8/integer, Rest/binary>>=_ObjBin) ->
%% Version 1 of binary riak object
case Rest of
<<VclockLen:32/integer, VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> ->
@@ -109,7 +120,7 @@ binary_to_robj(B,K,<<?MAGIC:8/integer, 1:8/integer, Rest/binary>>=_ObjBin) ->
_Other ->
{error, bad_object_format}
end;
-binary_to_robj(_B, _K, <<?MAGIC, _Ver, _Rest/binary>>=_ObjBin) ->
+from_binary(_B, _K, <<?MAGIC, _Ver, _Rest/binary>>=_ObjBin) ->
{error, unknown_version}.
sibs_of_binary(Count,SibsBin) ->
View
30 test/riak_object_eqc.erl
@@ -30,21 +30,29 @@
-compile(export_all).
-riak_object_bin() ->
- ?LET(Obj,
- fsm_eqc_util:riak_object(),
- {riak_object:bucket(Obj),
- riak_object:key(Obj),
- riak_object:robj_to_binary(Obj)}).
-
roundtrip_eqc_test_() ->
- Res = eqc:quickcheck(numtests(1000, ?QC_OUT(prop_serialize_deserialize()))),
+ Res = eqc:quickcheck(numtests(1000, ?QC_OUT(prop_roundtrip()))),
?_assertEqual(true, Res).
-prop_serialize_deserialize() ->
- ?FORALL({B,K,ObjBin},
+%% deserializing a binary representation of a riak_object and
+%% reserializing it for the same version should result in the same
+%% binary
+prop_roundtrip() ->
+ ?FORALL({B,K,ObjBin,BinVsn},
riak_object_bin(),
- ObjBin =:= riak_object:robj_to_binary(riak_object:binary_to_robj(B,K,ObjBin))).
+ collect(BinVsn,
+ ObjBin =:=
+ riak_object:to_binary(BinVsn, riak_object:from_binary(B,K,ObjBin)))).
+
+riak_object_bin() ->
+ ?LET({Obj, Vsn},
+ {fsm_eqc_util:riak_object(), binary_version()},
+ {riak_object:bucket(Obj),
+ riak_object:key(Obj),
+ riak_object:to_binary(Vsn, Obj),
+ Vsn}).
+binary_version() ->
+ oneof([v0, v1]).
-endif. %% EQC
Please sign in to comment.
Something went wrong with that request. Please try again.