Skip to content

Loading…

Switch to an RPC call (with timeout) to start remote put fsm on preflist node. #301

Merged
merged 1 commit into from

3 participants

@massung

Fixes #300

@jonmeredith
Basho Technologies member

Code looks good - will test independently.

@jonmeredith
Basho Technologies member

+1 merge - managed to provoke errors during testing.


Shell got [{26,{error,{coord_handoff_failed,nodedown}}},
           {25,{error,{coord_handoff_failed,timeout}}},
           {24,{error,{coord_handoff_failed,timeout}}},
@seancribbs

Not sure why this was never merged.

@seancribbs seancribbs merged commit 4b9d0b6 into master
@jonmeredith
Basho Technologies member

I think it was merged against the 1.1 series - interesting 1.1 hadn't been merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 9, 2012
  1. @massung
Showing with 14 additions and 13 deletions.
  1. +14 −13 src/riak_kv_put_fsm.erl
View
27 src/riak_kv_put_fsm.erl
@@ -133,7 +133,7 @@ start_link(From, Object, PutOptions) ->
%% bkey - Bucket / Key
%% bucket_props - bucket properties
%% preflist2 - [{{Idx,Node},primary|fallback}] preference list
-%%
+%%
%% As test, but linked to the caller
test_link(From, Object, PutOptions, StateProps) ->
gen_fsm:start_link(?MODULE, {test, [From, Object, PutOptions], StateProps}, []).
@@ -148,7 +148,7 @@ test_link(From, Object, PutOptions, StateProps) ->
%% @private
init([From, RObj, Options]) ->
StateData = add_timing(prepare, #state{from = From,
- robj = RObj,
+ robj = RObj,
options = Options}),
{ok, prepare, StateData, 0};
init({test, Args, StateProps}) ->
@@ -190,11 +190,12 @@ prepare(timeout, StateData0 = #state{from = From, robj = RObj,
%% This node is not in the preference list
%% forward on to the first node
[{{_Idx, CoordNode},_Type}|_] = Preflist2,
- case riak_kv_put_fsm_sup:start_put_fsm(CoordNode, [From, RObj, Options]) of
+ Timeout = get_option(timeout, Options, ?DEFAULT_TIMEOUT),
+ case rpc:call(CoordNode,riak_kv_put_fsm_sup,start_put_fsm,[CoordNode,[From,RObj,Options]],Timeout) of
{ok, _Pid} ->
riak_kv_stat:update(coord_redir),
{stop, normal, StateData0};
- {error, Reason} ->
+ {_, Reason} -> % {error,_} or {badrpc,_}
lager:error("Unable to forward put for ~p to ~p - ~p\n",
[BKey, CoordNode, Reason]),
process_reply({error, {coord_handoff_failed, Reason}}, StateData0)
@@ -353,7 +354,7 @@ waiting_local_vnode(Result, StateData = #state{putcore = PutCore}) ->
end.
%% @private
-%% Send the put requests to any remote nodes if necessary and decided if
+%% Send the put requests to any remote nodes if necessary and decided if
%% enough responses have been received yet (i.e. if W/DW=1)
%% N.B. Not actually a state - here in the source to make reading the flow easier
execute_remote(StateData=#state{robj=RObj, req_id = ReqId,
@@ -466,7 +467,7 @@ process_reply(Reply, StateData = #state{postcommit = PostCommit,
[] ->
StateData1;
_ ->
- %% If postcommits defined, calculate final object
+ %% If postcommits defined, calculate final object
%% before any replies received after responding to
%% the client for a consistent view.
{_, UpdPutCore} = riak_kv_put_core:final(PutCore),
@@ -506,7 +507,7 @@ handle_options([{update_last_modified, true}|T], State = #state{robj = RObj}) ->
handle_options(T, State#state{robj = update_last_modified(RObj)});
handle_options([{returnbody, true}|T], State) ->
VnodeOpts = [{returnbody, true} | State#state.vnode_options],
- %% Force DW>0 if requesting return body to ensure the dw event
+ %% Force DW>0 if requesting return body to ensure the dw event
%% returned by the vnode includes the object.
handle_options(T, State#state{vnode_options=VnodeOpts,
dw=erlang:max(1,State#state.dw),
@@ -515,7 +516,7 @@ handle_options([{returnbody, false}|T], State = #state{postcommit = Postcommit})
case Postcommit of
[] ->
handle_options(T, State#state{returnbody=false});
-
+
_ ->
%% We have post-commit hooks, we'll need to get the body back
%% from the vnode, even though we don't plan to return that to the
@@ -530,7 +531,7 @@ handle_options([{_,_}|T], State) -> handle_options(T, State).
init_putcore(State = #state{n = N, w = W, dw = DW, allowmult = AllowMult,
returnbody = ReturnBody}) ->
- PutCore = riak_kv_put_core:init(N, W, DW,
+ PutCore = riak_kv_put_core:init(N, W, DW,
N-W+1, % cannot ever get W replies
N-DW+1, % cannot ever get DW replies
AllowMult,
@@ -570,7 +571,7 @@ update_last_modified(RObj) ->
%% be an external interface concern and are only used for sibling selection
%% and if-modified type tests so they could be generated on retrieval instead.
%% This changes from being a hash on the value to a likely-to-be-unique value
- %% which should serve the same purpose. It was possible to generate two
+ %% which should serve the same purpose. It was possible to generate two
%% objects with the same vclock on 0.14.2 if the same clientid was used in
%% the same second. It can be revisited post-1.0.0.
Now = erlang:now(),
@@ -701,7 +702,7 @@ get_hooks(HookType, BucketProps) ->
Hooks when is_list(Hooks) ->
Hooks
end.
-
+
get_option(Name, Options, Default) ->
proplists:get_value(Name, Options, Default).
@@ -750,7 +751,7 @@ default_details() ->
add_timing(Stage, State = #state{timing = Timing}) ->
State#state{timing = [{Stage, os:timestamp()} | Timing]}.
-%% Calc timing information - stored as {Stage, StageStart} in reverse order.
+%% Calc timing information - stored as {Stage, StageStart} in reverse order.
%% ResponseUsecs is calculated as time from reply to start.
calc_timing([{Stage, Now} | Timing]) ->
ReplyNow = case Stage of
@@ -764,7 +765,7 @@ calc_timing([{Stage, Now} | Timing]) ->
%% Each timing stage has start time.
calc_timing([], StageEnd, ReplyNow, Stages) ->
%% StageEnd is prepare time
- {timer:now_diff(ReplyNow, StageEnd), Stages};
+ {timer:now_diff(ReplyNow, StageEnd), Stages};
calc_timing([{reply, ReplyNow}|_]=Timing, StageEnd, undefined, Stages) ->
%% Populate ReplyNow then handle normally.
calc_timing(Timing, StageEnd, ReplyNow, Stages);
Something went wrong with that request. Please try again.