Permalink
Browse files

Add a bunch of terrible, terrible hacks for debugging deletes

DO NOT EVER MERGE THIS BRANCH INTO ANYTHING EVER!
  • Loading branch information...
1 parent 0d6537d commit f51c48895f7eb8b4a50301d6c4d0acce90b73f92 @Vagabond Vagabond committed May 6, 2011
Showing with 139 additions and 27 deletions.
  1. +6 −1 include/riak_kv_vnode.hrl
  2. +33 −3 src/riak_client.erl
  3. +20 −8 src/riak_kv_delete.erl
  4. +22 −4 src/riak_kv_get_fsm.erl
  5. +1 −1 src/riak_kv_put_fsm.erl
  6. +57 −10 src/riak_kv_vnode.erl
@@ -29,6 +29,11 @@
bkey :: {binary(), binary()},
req_id :: non_neg_integer()}).
+-record(riak_kv_delete_req_v2, {
+ bkey :: {binary(), binary()},
+ vclock :: term(),
+ req_id :: non_neg_integer()}).
+
-record(riak_kv_map_req_v1, {
bkey :: {binary(), binary()},
qterm :: term(),
@@ -43,6 +48,6 @@
-define(KV_GET_REQ, #riak_kv_get_req_v1).
-define(KV_MGET_REQ, #riak_kv_mget_req_v1).
-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v2).
--define(KV_DELETE_REQ, #riak_kv_delete_req_v1).
+-define(KV_DELETE_REQ, #riak_kv_delete_req_v2).
-define(KV_MAP_REQ, #riak_kv_map_req_v1).
-define(KV_VCLOCK_REQ, #riak_kv_vclock_req_v1).
View
@@ -231,7 +231,20 @@ get(Bucket, Key) ->
%% have responded with a value or error.
get(Bucket, Key, Options) when is_list(Options) ->
Me = self(),
- ReqId = mk_reqid(),
+ ReqIdIn = case get(iteration) of
+ undefined ->
+ mk_reqid();
+ N ->
+ 100+N
+ end,
+
+ case get(delete) of
+ true ->
+ ReqId = ReqIdIn + 6000000000;
+ _ ->
+ ReqId = ReqIdIn
+ end,
+
riak_kv_get_fsm_sup:start_get_fsm(Node, [{raw, ReqId, Me}, Bucket, Key, Options]),
%% TODO: Investigate adding a monitor here and eliminating the timeout.
Timeout = recv_timeout(Options),
@@ -292,7 +305,19 @@ put(RObj) -> THIS:put(RObj, []).
put(RObj, Options) when is_list(Options) ->
UpdObj = riak_object:increment_vclock(RObj, ClientId),
Me = self(),
- ReqId = mk_reqid(),
+ ReqIdIn = case get(iteration) of
+ undefined ->
+ mk_reqid();
+ N ->
+ 200+N
+ end,
+
+ case get(delete) of
+ true ->
+ ReqId = ReqIdIn + 8000000000;
+ _ ->
+ ReqId = ReqIdIn
+ end,
riak_kv_put_fsm_sup:start_put_fsm(Node, [{raw, ReqId, Me}, UpdObj, Options]),
%% TODO: Investigate adding a monitor here and eliminating the timeout.
Timeout = recv_timeout(Options),
@@ -378,7 +403,12 @@ delete(Bucket,Key,RW) -> delete(Bucket,Key,RW,?DEFAULT_TIMEOUT).
%% nodes have responded with a value or error, or TimeoutMillisecs passes.
delete(Bucket,Key,RW,Timeout) ->
Me = self(),
- ReqId = mk_reqid(),
+ case get(iteration) of
+ undefined ->
+ ReqId = mk_reqid();
+ N ->
+ ReqId = N
+ end,
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, RW, Timeout, Me]),
wait_for_reqid(ReqId, Timeout).
View
@@ -44,36 +44,48 @@ delete(ReqId,Bucket,Key,RW0,Timeout,Client) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
BucketProps = riak_core_bucket:get_bucket(Bucket, Ring),
N = proplists:get_value(n_val,BucketProps),
+ put(iteration, ReqId),
case riak_kv_util:expand_rw_value(rw, RW0, BucketProps, N) of
error ->
Client ! {ReqId, {error, {rw_val_violation, RW0}}};
RW ->
{ok,C} = riak:local_client(),
+ put(delete, true),
case C:get(Bucket,Key,RW,Timeout) of
{ok, OrigObj} ->
RemainingTime = Timeout - (riak_core_util:moment() - RealStartTime),
OrigMD = hd([MD || {MD,_V} <- riak_object:get_contents(OrigObj)]),
NewObj = riak_object:update_metadata(OrigObj,
- dict:store(<<"X-Riak-Deleted">>, "true", OrigMD)),
+ dict:store(<<"X-Riak-Deleted-Ref">>,
+ term_to_binary(make_ref()),
+ dict:store(<<"X-Riak-Deleted">>,
+ "true",
+ OrigMD))),
Reply = C:put(NewObj, RW, RW, RemainingTime),
+ %io:format("delete response is ~p~n", [Reply]),
Client ! {ReqId, Reply},
case Reply of
- ok -> reap(Bucket,Key,RemainingTime);
+ ok ->
+ C:get(Bucket, Key, N, RemainingTime);
_ -> nop
end;
+ %Client ! {ReqId, Reply};
{error, notfound} ->
Client ! {ReqId, {error, notfound}};
X ->
Client ! {ReqId, X}
end
end.
-reap(Bucket, Key, Timeout) ->
- {ok,C} = riak:local_client(),
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- BucketProps = riak_core_bucket:get_bucket(Bucket, Ring),
- N = proplists:get_value(n_val,BucketProps),
- C:get(Bucket,Key,N,Timeout).
+%reap(Bucket, Key, Timeout) ->
+ %io:format("reap start~n"),
+ %{ok,C} = riak:local_client(),
+ %{ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ %BucketProps = riak_core_bucket:get_bucket(Bucket, Ring),
+ %N = proplists:get_value(n_val,BucketProps),
+ %Res = C:get(Bucket,Key,N,Timeout),
+ %io:format("reap done~n"),
+ %Res.
%% ===================================================================
View
@@ -247,6 +247,7 @@ handle_sync_event(_Event, _From, _StateName, StateData) ->
%% @private
handle_info(request_timeout, StateName, StateData) ->
+ %io:format("get FSM timeout ~p~n", [StateName]),
?MODULE:StateName(request_timeout, StateData);
%% @private
handle_info(_Info, _StateName, StateData) ->
@@ -343,6 +344,12 @@ maybe_finalize_delete(_StateData=#state{replied_notfound=NotFound,n=N,
replied_r=RepliedR,
preflist2=Sent,req_id=ReqId,
bkey=BKey}) ->
+ %Uniq = lists:usort([RObj || {RObj,_Idx} <- RepliedR]),
+ %io:format("replied_r length ~p; ~p unique~n", [length(RepliedR), length(Uniq)]),
+ %io:format("replied_r ~p~n", [RepliedR]),
+ DelObj = riak_object:reconcile([RObj || {RObj,_Idx} <- RepliedR], false),
+ VClock = dict:fetch(<<"X-Riak-Deleted-Ref">>,
+ riak_object:get_metadata(DelObj)),
IdealNodes = [{I,Node} || {{I,Node},primary} <- Sent],
case length(IdealNodes) of
N -> % this means we sent to a perfect preflist
@@ -351,7 +358,10 @@ maybe_finalize_delete(_StateData=#state{replied_notfound=NotFound,n=N,
case lists:all(fun(X) -> riak_kv_util:is_x_deleted(X) end,
[O || {O,_I} <- RepliedR]) of
true -> % and every response was X-Deleted, go!
- riak_kv_vnode:del(IdealNodes, BKey, ReqId);
+ Res = riak_kv_vnode:del(IdealNodes, BKey, VClock,
+ ReqId + 2000000000),
+ %io:format("vnode del ~p~n", [Res]),
+ Res;
_ -> nop
end;
_ -> nop
@@ -365,6 +375,7 @@ maybe_do_read_repair(Sent,Final,RepliedR,NotFound,BKey,ReqId,StartTime,BucketPro
case Targets of
[] -> nop;
_ ->
+ %io:format("read repair~n"),
RepairPreflist = [{Idx, Node} || {{Idx,Node},_Type} <- Sent,
lists:member(Idx, Targets)],
riak_kv_vnode:readrepair(RepairPreflist, BKey, FinalRObj, ReqId,
@@ -404,6 +415,7 @@ respond(VResponses,AllowMult) ->
Merged = merge(VResponses, AllowMult),
case Merged of
tombstone ->
+ %io:format("tombstone!~n"),
Reply = {error,notfound};
{error, notfound} ->
Reply = Merged;
@@ -463,26 +475,32 @@ client_info([vnodes | Rest], StateData = #state{num_r = NumOks,
[{vnode_errors, Errors} | Oks]
end,
client_info(Rest, StateData, Info ++ Acc);
+client_info([vclock | Rest], #state{replied_r=[]} = StateData, Acc) ->
+ client_info(Rest, StateData, Acc);
+client_info([vclock | Rest], #state{replied_r=RepliedR} = StateData, Acc) ->
+ Obj = riak_object:reconcile([RObj || {RObj,_Idx} <- RepliedR], false),
+ client_info(Rest, StateData, [{vclock, riak_object:vclock(Obj)} | Acc]);
client_info([Unknown | Rest], StateData, Acc) ->
client_info(Rest, StateData, [{Unknown, unknown_detail} | Acc]).
details() ->
[timing,
- vnodes].
+ vnodes,
+ vclock].
-ifdef(TEST).
-define(expect_msg(Exp,Timeout),
?assertEqual(Exp, receive Exp -> Exp after Timeout -> timeout end)).
get_fsm_test_() ->
- {spawn, [{ setup,
+ {timeout, 30, {spawn, [{ setup,
fun setup/0,
fun cleanup/1,
[
fun happy_path_case/0,
fun n_val_violation_case/0
]
- }]}.
+ }]}}.
setup() ->
%% Set infinity timeout for the vnode inactivity timer so it does not
View
@@ -621,7 +621,7 @@ client_info([timing | Rest], StateData = #state{timing = Timing}, Info) ->
{stages, Stages} | Info]).
default_details() ->
- [timing].
+ [timing, vclock].
%% Add timing information to the state
View
@@ -30,7 +30,7 @@
-export([start_vnode/1,
get/3,
mget/3,
- del/3,
+ del/4,
put/6,
readrepair/6,
list_keys/4,
@@ -105,9 +105,10 @@ mget(Preflist, BKeys, ReqId) ->
Req,
riak_kv_vnode_master).
-del(Preflist, BKey, ReqId) ->
+del(Preflist, BKey, VClock, ReqId) ->
riak_core_vnode_master:command(Preflist,
?KV_DELETE_REQ{bkey=BKey,
+ vclock=VClock,
req_id=ReqId},
riak_kv_vnode_master).
@@ -168,12 +169,14 @@ handle_command(?KV_PUT_REQ{bkey=BKey,
start_time=StartTime,
options=Options},
Sender, State=#state{idx=Idx}) ->
+ io:format("~p Backend command put ~p ~n", [self(), ReqId]),
riak_kv_mapred_cache:eject(BKey),
riak_core_vnode:reply(Sender, {w, Idx, ReqId}),
do_put(Sender, BKey, Object, ReqId, StartTime, Options, State),
{noreply, State};
handle_command(?KV_GET_REQ{bkey=BKey,req_id=ReqId},Sender,State) ->
+ io:format("~p Backend command get ~p ~n", [self(), ReqId]),
do_get(Sender, BKey, ReqId, State);
handle_command(?KV_MGET_REQ{bkeys=BKeys, req_id=ReqId, from=From}, _Sender, State) ->
do_mget(From, BKeys, ReqId, State);
@@ -184,16 +187,48 @@ handle_command(?KV_LISTKEYS_REQ{bucket=Bucket, req_id=ReqId, caller=Caller}, _Se
State=#state{mod=Mod, modstate=ModState, idx=Idx}) ->
do_list_keys(Caller,ReqId,Bucket,Idx,Mod,ModState),
{noreply, State};
-
-handle_command(?KV_DELETE_REQ{bkey=BKey, req_id=ReqId}, _Sender,
+handle_command(#riak_kv_delete_req_v1{bkey=BKey, req_id=ReqId}, _Sender,
State=#state{mod=Mod, modstate=ModState,
idx=Idx}) ->
riak_kv_mapred_cache:eject(BKey),
- case Mod:delete(ModState, BKey) of
- ok ->
- {reply, {del, Idx, ReqId}, State};
- {error, _Reason} ->
- {reply, {fail, Idx, ReqId}, State}
+ Res = do_delete(BKey, Mod, ModState),
+ {reply, {Res, Idx, ReqId}, State};
+handle_command(?KV_DELETE_REQ{bkey=BKey, req_id=ReqId, vclock=VClock}, _Sender,
+ State=#state{mod=Mod, modstate=ModState,
+ idx=Idx}) ->
+ io:format("~p Backend command delete ~p ~n", [self(), ReqId]),
+ case do_get_term(BKey, Mod, ModState) of
+ {ok, Obj} ->
+ {ObjVClock, Printable} = case dict:find(<<"X-Riak-Deleted-Ref">>,
+ riak_object:get_metadata(Obj)) of
+ {ok, X} ->
+ {X, binary_to_term(X)};
+ _ ->
+ {undefined, undefined}
+ end,
+ io:format("~p Asked to delete ~p, got ~p ~p~n",
+ [self(), binary_to_term(VClock), Printable, ReqId]),
+ %% Is this an object an ancestor of the deleted object?
+ case VClock == ObjVClock of
+ true ->
+ %io:format("Vclock: ~p ObjVclock ~p~n",
+ %[binary_to_term(VClock), binary_to_term(ObjVClock)]),
+ %io:format("is a descendant ~p~n", [VClock == ObjVClock]),
+ %io:format("object ~p~n",
+ %[binary_to_term(riak_object:get_value(Obj))]),
+ %% Object is the same or older, we can delete
+ Res = do_delete(BKey, Mod, ModState),
+ %io:format("~p Delete result is ~p ~p~n", [self(), Res, ReqId]),
+ {reply, {Res, Idx, ReqId}, State};
+ false ->
+ io:format("~p Delete ref did not match ~p ~p ~p~n",
+ [self(), binary_to_term(VClock), Printable, ReqId]),
+ %io:format("is NOT a descendant~n"),
+ {reply, {changed, Idx, ReqId}, State}
+ end;
+ _ ->
+ io:format("~p Object not present ~p~n", [self(), ReqId]),
+ {reply, {notfound, Idx, ReqId}, State}
end;
handle_command(?KV_VCLOCK_REQ{bkeys=BKeys}, _Sender, State) ->
{reply, do_get_vclocks(BKeys, State), State};
@@ -232,6 +267,7 @@ handle_command({mapexec_reply, JobId, Result}, _Sender, #state{mrjobs=Jobs}=Stat
handle_handoff_command(Req=?FOLD_REQ{}, Sender, State) ->
handle_command(Req, Sender, State);
handle_handoff_command(Req={backend_callback, _Ref, _Msg}, Sender, State) ->
+ io:format("~p Backend command ~p ~n", [self(), element(1, Req)]),
handle_command(Req, Sender, State);
handle_handoff_command(_Req, _Sender, State) -> {forward, State}.
@@ -345,7 +381,7 @@ perform_put({false, _Obj}, #state{idx=Idx}, #putargs{returnbody=false,reqid=ReqI
perform_put({true, Obj}, #state{idx=Idx,mod=Mod,modstate=ModState},
#putargs{returnbody=RB, bkey=BKey, reqid=ReqID}) ->
Val = term_to_binary(Obj),
- case Mod:put(ModState, BKey, Val) of
+ case Mod:put(ModState, BKey, Val) of
ok ->
case RB of
true -> {dw, Idx, Obj, ReqID};
@@ -392,6 +428,8 @@ syntactic_put_merge(Mod, ModState, BKey, Obj1, ReqId, StartTime) ->
Obj0 = binary_to_term(Val0),
ResObj = riak_object:syntactic_merge(
Obj0,Obj1,term_to_binary(ReqId), StartTime),
+ io:format("VClock compare ~p ~n ~p ~n ~p ~n", [riak_object:vclock(Obj0),
+ riak_object:vclock(Obj1), riak_object:vclock(ResObj)]),
case riak_object:vclock(ResObj) =:= riak_object:vclock(Obj0) of
true -> {oldobj, ResObj};
false -> {newobj, ResObj}
@@ -475,6 +513,15 @@ do_list_keys(Caller,ReqId,Bucket,Idx,Mod,ModState) ->
Caller ! {ReqId, Idx, done}.
%% @private
+do_delete(BKey, Mod, ModState) ->
+ case Mod:delete(ModState, BKey) of
+ ok ->
+ del;
+ {error, _Reason} ->
+ fail
+ end.
+
+%% @private
process_keys(Caller, ReqId, Idx, '_', {Bucket, _K}, Acc) ->
%% Bucket='_' means "list buckets" instead of "list keys"
buffer_key_result(Caller, ReqId, Idx, [Bucket|Acc]);

0 comments on commit f51c488

Please sign in to comment.