Permalink
Browse files

Add trace statements to get/put FSMs & vnode

Also, add new 'trace' option to riak_client to write get/put traces to a
file.
  • Loading branch information...
1 parent fd48a7c commit f1d106a05eb2a6b104125fa18c5b0ebf46517059 @Vagabond Vagabond committed Oct 27, 2011
Showing with 154 additions and 29 deletions.
  1. +37 −2 src/riak_client.erl
  2. +24 −6 src/riak_kv_get_fsm.erl
  3. +16 −4 src/riak_kv_put_core.erl
  4. +61 −11 src/riak_kv_put_fsm.erl
  5. +16 −6 src/riak_kv_vnode.erl
View
@@ -234,10 +234,27 @@ get(Bucket, Key) ->
get(Bucket, Key, Options) when is_list(Options) ->
Me = self(),
ReqId = mk_reqid(),
+ %% Optional per-request tracing: {trace, true} writes to the default
+ %% "log/request.log", {trace, File} writes to File, and any other value
+ %% leaves tracing off. A failed lager:trace_file/2 still crashes the
+ %% caller with badmatch, exactly as before.
+ Trace = case proplists:get_value(trace, Options, false) of
+ true ->
+ {ok, T} = lager:trace_file("log/request.log", [{reqid, ReqId}]),
+ T;
+ File when is_list(File) ->
+ {ok, T} = lager:trace_file(File, [{reqid, ReqId}]),
+ T;
+ _ ->
+ undefined
+ end,
+ try
riak_kv_get_fsm_sup:start_get_fsm(Node, [{raw, ReqId, Me}, Bucket, Key, Options]),
%% TODO: Investigate adding a monitor here and eliminating the timeout.
Timeout = recv_timeout(Options),
- wait_for_reqid(ReqId, Timeout);
+ wait_for_reqid(ReqId, Timeout)
+ after
+ %% Remove the trace on every exit path, so an exception while starting
+ %% or waiting on the FSM cannot leave the trace installed.
+ case Trace of
+ undefined ->
+ ok;
+ _ ->
+ lager:stop_trace(Trace)
+ end
+ end;
%% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
%% {ok, riak_object:riak_object()} |
@@ -294,6 +311,15 @@ put(RObj) -> THIS:put(RObj, []).
put(RObj, Options) when is_list(Options) ->
Me = self(),
ReqId = mk_reqid(),
+ %% Optional per-request tracing: {trace, true} writes to the default
+ %% "log/request.log", {trace, File} writes to File, and any other value
+ %% leaves tracing off. A failed lager:trace_file/2 still crashes the
+ %% caller with badmatch, exactly as before.
+ Trace = case proplists:get_value(trace, Options, false) of
+ true ->
+ {ok, T} = lager:trace_file("log/request.log", [{reqid, ReqId}]),
+ T;
+ File when is_list(File) ->
+ {ok, T} = lager:trace_file(File, [{reqid, ReqId}]),
+ T;
+ _ ->
+ undefined
+ end,
+ try
case ClientId of
undefined ->
riak_kv_put_fsm_sup:start_put_fsm(Node, [{raw, ReqId, Me}, RObj, Options]);
@@ -303,7 +329,16 @@ put(RObj, Options) when is_list(Options) ->
end,
%% TODO: Investigate adding a monitor here and eliminating the timeout.
Timeout = recv_timeout(Options),
- wait_for_reqid(ReqId, Timeout);
+ wait_for_reqid(ReqId, Timeout)
+ after
+ %% Remove the trace on every exit path, so an exception while starting
+ %% or waiting on the FSM cannot leave the trace installed.
+ case Trace of
+ undefined ->
+ ok;
+ _ ->
+ lager:stop_trace(Trace)
+ end
+ end;
+
%% @spec put(RObj :: riak_object:riak_object(), W :: integer()) ->
%% ok |
View
@@ -145,7 +145,10 @@ init({test, Args, StateProps}) ->
{ok, validate, TestStateData, 0}.
%% @private
-prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key}}) ->
+prepare(timeout, StateData=#state{from={raw, ReqId, _Pid},
+ bkey=BKey={Bucket,_Key}}) ->
+ lager:debug([{reqid, ReqId},{bkey, BKey}], "Preparing get request ~p for ~p",
+ [ReqId, BKey]),
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
BucketProps = riak_core_bucket:get_bucket(Bucket, Ring),
DocIdx = riak_core_util:chash_key(BKey),
@@ -159,7 +162,10 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key}}) ->
%% @private
validate(timeout, StateData=#state{from = {raw, ReqId, _Pid}, options = Options,
- n = N, bucket_props = BucketProps, preflist2 = PL2}) ->
+ n = N, bucket_props = BucketProps,
+ bkey=BKey, preflist2 = PL2}) ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}], "Validating get request ~p for ~p",
+ [ReqId, BKey]),
Timeout = get_option(timeout, Options, ?DEFAULT_TIMEOUT),
R0 = get_option(r, Options, ?DEFAULT_R),
PR0 = get_option(pr, Options, ?DEFAULT_PR),
@@ -212,14 +218,20 @@ validate(timeout, StateData=#state{from = {raw, ReqId, _Pid}, options = Options,
execute(timeout, StateData0=#state{timeout=Timeout,req_id=ReqId,
bkey=BKey,
preflist2 = Preflist2}) ->
+ lager:debug([{reqid,ReqId},{bkey,BKey}], "Executing get request ~p for ~p",
+ [ReqId, BKey]),
TRef = schedule_timeout(Timeout),
Preflist = [IndexNode || {IndexNode, _Type} <- Preflist2],
riak_kv_vnode:get(Preflist, BKey, ReqId),
StateData = StateData0#state{tref=TRef},
{next_state,waiting_vnode_r,StateData}.
%% @private
-waiting_vnode_r({r, VnodeResult, Idx, _ReqId}, StateData = #state{get_core = GetCore}) ->
+waiting_vnode_r({r, VnodeResult, Idx, ReqId}, StateData = #state{get_core =
+ GetCore, bkey=BKey}) ->
+ lager:debug([{reqid,ReqId},{vnode,Idx},{bkey,BKey}],
+ "Request ~p got vnode get response from ~p for ~p",
+ [ReqId, Idx, BKey]),
UpdGetCore = riak_kv_get_core:add_result(Idx, VnodeResult, GetCore),
case riak_kv_get_core:enough(UpdGetCore) of
true ->
@@ -231,15 +243,21 @@ waiting_vnode_r({r, VnodeResult, Idx, _ReqId}, StateData = #state{get_core = Get
false ->
{next_state, waiting_vnode_r, StateData#state{get_core = UpdGetCore}}
end;
-waiting_vnode_r(request_timeout, StateData) ->
+waiting_vnode_r(request_timeout, StateData = #state{req_id = ReqId, bkey=BKey}) ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Request ~p timed out waiting for vnode get response for ~p",
+ [ReqId, BKey]),
S2 = update_timing(StateData),
update_stats(timeout, S2),
client_reply({error,timeout}, S2),
finalize(S2).
%% @private
-waiting_read_repair({r, VnodeResult, Idx, _ReqId},
- StateData = #state{get_core = GetCore}) ->
+%% A vnode reply received in the waiting_read_repair state: log it, add it to
+%% the get core, and hand the updated state to maybe_finalize/1.
+waiting_read_repair({r, VnodeResult, Idx, ReqId},
+ StateData = #state{get_core = GetCore, bkey=BKey}) ->
+ %% Tag the entry with {vnode, Idx}, as waiting_vnode_r does, so traces
+ %% filtered on a vnode index also capture read-repair replies.
+ lager:debug([{reqid, ReqId},{vnode,Idx},{bkey,BKey}],
+ "Request ~p got read-repair response from ~p for ~p",
+ [ReqId, Idx, BKey]),
UpdGetCore = riak_kv_get_core:add_result(Idx, VnodeResult, GetCore),
maybe_finalize(StateData#state{get_core = UpdGetCore});
waiting_read_repair(request_timeout, StateData) ->
View
@@ -66,20 +66,32 @@ init(N, W, DW, WFailThreshold, DWFailThreshold, AllowMult, ReturnBody) ->
%% Add a result from the vnode
-spec add_result(vput_result(), putcore()) -> putcore().
-add_result({w, Idx, _ReqId}, PutCore = #putcore{results = Results,
+add_result({w, Idx, ReqId}, PutCore = #putcore{results = Results,
num_w = NumW}) ->
+ lager:debug([{reqid, ReqId},{vnode,Idx},{bkey,get(bkey)}],
+ "Completed W put request ~p on vnode ~p for ~p",
+ [ReqId, Idx, get(bkey)]),
PutCore#putcore{results = [{Idx, w} | Results],
num_w = NumW + 1};
-add_result({dw, Idx, _ReqId}, PutCore = #putcore{results = Results,
+add_result({dw, Idx, ReqId}, PutCore = #putcore{results = Results,
num_dw = NumDW}) ->
+ lager:debug([{reqid, ReqId},{vnode,Idx},{bkey,get(bkey)}],
+ "Completed DW put request ~p on vnode ~p for ~p",
+ [ReqId, Idx, get(bkey)]),
PutCore#putcore{results = [{Idx, {dw, undefined}} | Results],
num_dw = NumDW + 1};
-add_result({dw, Idx, ResObj, _ReqId}, PutCore = #putcore{results = Results,
+add_result({dw, Idx, ResObj, ReqId}, PutCore = #putcore{results = Results,
num_dw = NumDW}) ->
+ lager:debug([{reqid, ReqId},{vnode,Idx},{bkey,get(bkey)}],
+ "Completed DW put request ~p on vnode ~p for ~p",
+ [ReqId, Idx, get(bkey)]),
PutCore#putcore{results = [{Idx, {dw, ResObj}} | Results],
num_dw = NumDW + 1};
-add_result({fail, Idx, _ReqId}, PutCore = #putcore{results = Results,
+add_result({fail, Idx, ReqId}, PutCore = #putcore{results = Results,
num_fail = NumFail}) ->
+ lager:debug([{reqid, ReqId},{vnode,Idx},{bkey,get(bkey)}],
+ "Request ~p failed to do put on vnode ~p for ~p",
+ [ReqId, Idx, get(bkey)]),
PutCore#putcore{results = [{Idx, {error, undefined}} | Results],
num_fail = NumFail + 1};
add_result(_Other, PutCore = #putcore{num_fail = NumFail}) ->
View
@@ -171,9 +171,13 @@ init({test, Args, StateProps}) ->
%% @private
prepare(timeout, StateData0 = #state{from = From, robj = RObj,
options = Options}) ->
+ {raw, ReqId, _Pid} = From,
+ BKey = {riak_object:bucket(RObj), riak_object:key(RObj)},
+ put(bkey, BKey),
+ lager:debug([{reqid, ReqId},{bkey,BKey}], "Preparing put request ~p for ~p",
+ [ReqId, BKey]),
{ok,Ring} = riak_core_ring_manager:get_my_ring(),
BucketProps = riak_core_bucket:get_bucket(riak_object:bucket(RObj), Ring),
- BKey = {riak_object:bucket(RObj), riak_object:key(RObj)},
DocIdx = riak_core_util:chash_key(BKey),
N = proplists:get_value(n_val,BucketProps),
UpNodes = riak_core_node_watcher:nodes(riak_kv),
@@ -195,8 +199,9 @@ prepare(timeout, StateData0 = #state{from = From, robj = RObj,
riak_kv_stat:update(coord_redir),
{stop, normal, StateData0};
{error, Reason} ->
- lager:error("Unable to forward put for ~p to ~p - ~p\n",
- [BKey, CoordNode, Reason]),
+ lager:error([{reqid,ReqId},{bkey,BKey}],
+ "Unable to forward put for ~p to ~p - ~p\n",
+ [BKey, CoordNode, Reason]),
process_reply({error, {coord_handoff_failed, Reason}}, StateData0)
end;
_ ->
@@ -223,7 +228,10 @@ prepare(timeout, StateData0 = #state{from = From, robj = RObj,
validate(timeout, StateData0 = #state{from = {raw, ReqId, _Pid},
options = Options0,
n=N, bucket_props = BucketProps,
+ bkey=BKey,
preflist2 = Preflist2}) ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}], "Validating put request ~p for ~p",
+ [ReqId, BKey]),
Timeout = get_option(timeout, Options0, ?DEFAULT_TIMEOUT),
PW0 = get_option(pw, Options0, default),
W0 = get_option(w, Options0, default),
@@ -293,14 +301,28 @@ validate(timeout, StateData0 = #state{from = {raw, ReqId, _Pid},
%% Run the precommit hooks
precommit(timeout, State = #state{precommit = []}) ->
execute(State);
-precommit(timeout, State = #state{precommit = [Hook | Rest], robj = RObj}) ->
+precommit(timeout, State = #state{precommit = [Hook | Rest], bkey=BKey,
+ req_id=ReqId, robj = RObj}) ->
+ %% Runs one hook per timeout event: the head of the precommit list is
+ %% invoked and, on success, the FSM re-enters this state with the rest.
+ %% The hook is named by reading <<"mod">>/<<"fun">> from element(2, Hook);
+ %% NOTE(review): this assumes a {Tag, Proplist}-shaped hook - confirm that
+ %% every hook definition reaching this clause has that shape.
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Running precommit hook ~s:~s on put request ~p for ~p",
+ [proplists:get_value(<<"mod">>, element(2, Hook)),
+ proplists:get_value(<<"fun">>, element(2, Hook)), ReqId, BKey]),
Result = decode_precommit(invoke_hook(Hook, RObj)),
case Result of
fail ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Precommit hook failed on put request ~p for ~p",
+ [ReqId, BKey]),
process_reply({error, precommit_fail}, State);
{fail, Reason} ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Precommit hook failed on put request ~p for ~p: ~p",
+ [ReqId, BKey, Reason]),
process_reply({error, {precommit_fail, Reason}}, State);
Result ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Precommit hook succeeded on put request ~p for ~p",
+ [ReqId, BKey]),
{next_state, precommit, State#state{robj = riak_object:apply_updates(Result),
precommit = Rest}, 0}
end.
@@ -320,9 +342,12 @@ execute(State=#state{coord_pl_entry = CPL}) ->
%% N.B. Not actually a state - here in the source to make reading the flow easier
execute_local(StateData=#state{robj=RObj, req_id = ReqId,
timeout=Timeout, bkey=BKey,
- coord_pl_entry = {_Index, _Node} = CoordPLEntry,
+ coord_pl_entry = {Index, _Node} = CoordPLEntry,
vnode_options=VnodeOptions,
starttime = StartTime}) ->
+ lager:debug([{reqid, ReqId},{vnode,Index},{bkey,BKey}],
+ "Coordinating put request ~p on vnode ~p for ~p",
+ [ReqId, Index, BKey]),
StateData1 = add_timing(execute_local, StateData),
TRef = schedule_timeout(Timeout),
riak_kv_vnode:coord_put(CoordPLEntry, BKey, RObj, ReqId, StartTime, VnodeOptions),
@@ -334,7 +359,10 @@ execute_local(StateData=#state{robj=RObj, req_id = ReqId,
%% @private
waiting_local_vnode(request_timeout, StateData) ->
process_reply({error,timeout}, StateData);
-waiting_local_vnode(Result, StateData = #state{putcore = PutCore}) ->
+waiting_local_vnode(Result, StateData = #state{putcore = PutCore, bkey=BKey,
+ req_id=ReqId}) ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}], "Local put request ~p for ~p",
+ [ReqId, BKey]),
UpdPutCore1 = riak_kv_put_core:add_result(Result, PutCore),
case Result of
{fail, _Idx, _ReqId} ->
@@ -365,6 +393,8 @@ execute_remote(StateData=#state{robj=RObj, req_id = ReqId,
StateData1 = add_timing(execute_remote, StateData),
Preflist = [IndexNode || {IndexNode, _Type} <- Preflist2,
IndexNode /= CoordPLEntry],
+ lager:debug([{reqid, ReqId},{bkey,BKey}], "Remote put request ~p for ~p",
+ [ReqId, BKey]),
riak_kv_vnode:put(Preflist, BKey, RObj, ReqId, StartTime, VnodeOptions),
case riak_kv_put_core:enough(PutCore) of
true ->
@@ -376,12 +406,20 @@ execute_remote(StateData=#state{robj=RObj, req_id = ReqId,
%% @private
-waiting_remote_vnode(request_timeout, StateData) ->
+waiting_remote_vnode(request_timeout, StateData =
+ #state{req_id=ReqId, bkey=BKey}) ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Request ~p timed out waiting for remote put response for ~p",
+ [ReqId, BKey]),
process_reply({error,timeout}, StateData);
-waiting_remote_vnode(Result, StateData = #state{putcore = PutCore}) ->
+waiting_remote_vnode(Result, StateData = #state{putcore = PutCore,
+ req_id=ReqId, bkey=BKey}) ->
UpdPutCore1 = riak_kv_put_core:add_result(Result, PutCore),
case riak_kv_put_core:enough(UpdPutCore1) of
true ->
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Enough responses received for request ~p for ~p",
+ [ReqId, BKey]),
{Reply, UpdPutCore2} = riak_kv_put_core:response(UpdPutCore1),
process_reply(Reply, StateData#state{putcore = UpdPutCore2});
false ->
@@ -392,17 +430,26 @@ waiting_remote_vnode(Result, StateData = #state{putcore = PutCore}) ->
postcommit(timeout, StateData = #state{postcommit = []}) ->
new_state_timeout(finish, StateData);
postcommit(timeout, StateData = #state{postcommit = [Hook | Rest],
- putcore = PutCore}) ->
+ putcore = PutCore, req_id=ReqId,
+ bkey=BKey}) ->
%% Process the next hook - gives sys:get_status messages a chance if hooks
%% take a long time. No checking error returns for postcommit hooks.
+ %% The hook is named by reading <<"mod">>/<<"fun">> from element(2, Hook);
+ %% NOTE(review): this assumes a {Tag, Proplist}-shaped hook - confirm that
+ %% every hook definition reaching this clause has that shape.
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Running postcommit hook ~s:~s on put request ~p for ~p",
+ [proplists:get_value(<<"mod">>, element(2, Hook)),
+ proplists:get_value(<<"fun">>, element(2, Hook)), ReqId, BKey]),
{ReplyObj, UpdPutCore} = riak_kv_put_core:final(PutCore),
invoke_hook(Hook, ReplyObj),
{next_state, postcommit, StateData#state{postcommit = Rest,
putcore = UpdPutCore}, 0};
postcommit(request_timeout, StateData) -> % still process hooks even if request timed out
{next_state, postcommit, StateData, 0};
-postcommit(Reply, StateData = #state{putcore = PutCore}) ->
+postcommit(Reply, StateData = #state{putcore = PutCore, req_id=ReqId,
+ bkey=BKey}) ->
%% late responses - add to state. *Does not* recompute finalobj
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Request ~p got late remote put response from vnode for ~p",
+ [ReqId, BKey]),
UpdPutCore = riak_kv_put_core:add_result(Reply, PutCore),
{next_state, postcommit, StateData#state{putcore = UpdPutCore}, 0}.
@@ -417,8 +464,11 @@ finish(timeout, StateData = #state{timing = Timing, reply = Reply}) ->
riak_kv_stat:update({put_fsm_time, Duration})
end,
{stop, normal, StateData};
-finish(Reply, StateData = #state{putcore = PutCore}) ->
+finish(Reply, StateData = #state{putcore = PutCore, req_id=ReqId, bkey=BKey}) ->
%% late responses - add to state. *Does not* recompute finalobj
+ lager:debug([{reqid, ReqId},{bkey,BKey}],
+ "Request ~p got late remote put response from vnode for ~p",
+ [ReqId, BKey]),
UpdPutCore = riak_kv_put_core:add_result(Reply, PutCore),
{next_state, finish, StateData#state{putcore = UpdPutCore}, 0}.
Oops, something went wrong.

0 comments on commit f1d106a

Please sign in to comment.