Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Catch exit with reason exhausted_preflist on the

call to schedule_input after a nodedown message
is received.

Fixes: az449 bz1113 bz1114

The schedule_input function in riak_kv_map_phase calls itself
in the case that it catches a nodedown exit while trying to
start mapper processes. If all the primary preference list
entries have been checked or their nodes are down this inner
call to schedule_input will result in an exit with reason
exhausted_preflist. Instead of just having the map phase
process exit, this change catches the exit and returns an
error to the caller of schedule_input so that notfound results
can be returned in the case of bz1113 or a more descriptive
error message can be return in the case of bz1114.

Do not expect an empty list for the ClaimList
parameter in the termination clause of
riak_kv_mapred_planner:claim_keys.

Add extra function clause for riak_kv_mapred_planner:claim_keys.
  • Loading branch information...
commit e7d09e8935956471c9091b121c09f941a895175f 1 parent 3f28c0d
@kellymclaughlin kellymclaughlin authored
Showing with 31 additions and 11 deletions.
  1. +29 −11 src/riak_kv_map_phase.erl
  2. +2 −0  src/riak_kv_mapred_planner.erl
View
40 src/riak_kv_map_phase.erl
@@ -43,9 +43,13 @@ handle_input(Inputs0, #state{fsms=FSMs0, qterm=QTerm, mapper_data=MapperData}=St
case length(Inputs1) > 0 of
true ->
ClaimLists = riak_kv_mapred_planner:plan_map(Inputs1),
- {NewFSMs, _ClaimLists1, FsmKeys} = schedule_input(Inputs1, ClaimLists, QTerm, FSMs0, State),
- MapperData1 = MapperData ++ FsmKeys,
- {no_output, State#state{fsms=NewFSMs, mapper_data=MapperData1}};
+ case schedule_input(Inputs1, ClaimLists, QTerm, FSMs0, State) of
+ {NewFSMs, _ClaimLists1, FsmKeys} ->
+ MapperData1 = MapperData ++ FsmKeys,
+ {no_output, State#state{fsms=NewFSMs, mapper_data=MapperData1}};
+ {error, exhausted_preflist} ->
+ {stop, {error, {no_candidate_nodes, exhausted_prefist, erlang:get_stacktrace(), MapperData}}, State}
+ end;
false ->
{no_output, State}
end.
@@ -104,9 +108,14 @@ handle_info({'EXIT', Pid, _Reason}, #state{mapper_data=MapperData, fsms=FSMs, qt
{_Partition, BadNode} = VNode,
NewKeys = prune_input_nodes(Keys, BadNode),
ClaimLists = riak_kv_mapred_planner:plan_map(NewKeys),
- {NewFSMs, _ClaimLists1, FsmKeys} = schedule_input(NewKeys, ClaimLists, QTerm, FSMs, State),
- MapperData1 = lists:keydelete(Id, 1, lists:keydelete(Pid, 1, MapperData ++ FsmKeys)),
- maybe_done(State#state{mapper_data=MapperData1, fsms=NewFSMs})
+ case schedule_input(NewKeys, ClaimLists, QTerm, FSMs, State) of
+ {NewFSMs, _ClaimLists1, FsmKeys} ->
+ MapperData1 = lists:keydelete(Id, 1, lists:keydelete(Pid, 1, MapperData ++ FsmKeys)),
+ maybe_done(State#state{mapper_data=MapperData1, fsms=NewFSMs});
+ {error, exhausted_preflist} ->
+ MapperData1 = lists:keydelete(Id, 1, lists:keydelete(Pid, 1, MapperData)),
+ maybe_done(State#state{mapper_data=MapperData1, fsms=FSMs})
+ end
catch
_:Error ->
{stop, {error, {no_candidate_nodes, Error, erlang:get_stacktrace(), MapperData}}, State}
@@ -141,8 +150,12 @@ schedule_input(Inputs1, ClaimLists, QTerm, FSMs0, State) ->
catch
exit:{{nodedown, Node}, _} ->
Inputs2 = prune_input_nodes(Inputs1, Node),
- ClaimLists2 = riak_kv_mapred_planner:plan_map(Inputs2),
- schedule_input(Inputs2, ClaimLists2, QTerm, FSMs0, State);
+ try riak_kv_mapred_planner:plan_map(Inputs2) of
+ ClaimLists2 ->
+ schedule_input(Inputs2, ClaimLists2, QTerm, FSMs0, State)
+ catch exit:exhausted_preflist ->
+ {error, exhausted_preflist}
+ end;
Error ->
throw(Error)
end.
@@ -266,9 +279,14 @@ handle_not_found_reply(VNode, BKey, Executor, #state{fsms=FSMs, mapper_data=Mapp
try riak_kv_mapred_planner:plan_map(NewKeys) of
ClaimLists ->
FSMs1 = dict:erase(Executor, FSMs),
- {NewFSMs, _ClaimLists1, FsmKeys} = schedule_input(NewKeys, ClaimLists, QTerm, FSMs1, State),
- MapperData1 = lists:keydelete(Executor, 1, MapperData ++ FsmKeys),
- maybe_done(State#state{mapper_data=MapperData1, fsms=NewFSMs})
+ case schedule_input(NewKeys, ClaimLists, QTerm, FSMs1, State) of
+ {NewFSMs, _ClaimLists1, FsmKeys} ->
+ MapperData1 = lists:keydelete(Executor, 1, MapperData ++ FsmKeys),
+ maybe_done(State#state{mapper_data=MapperData1, fsms=NewFSMs});
+ {error, exhausted_preflist} ->
+ MapperData1 = lists:keydelete(Executor, 1, MapperData),
+ maybe_done(State#state{fsms=FSMs1, mapper_data=MapperData1, pending=Pending++Reply})
+ end
catch
exit:exhausted_preflist ->
%% At this point the preflist has been exhausted
View
2  src/riak_kv_mapred_planner.erl
@@ -43,6 +43,8 @@ claim_keys([], [], _) ->
exit(exhausted_preflist);
claim_keys(_, ClaimList, []) ->
ClaimList;
+claim_keys([], _, _) ->
+ exit(exhausted_preflist);
claim_keys([H|T], ClaimList, Keys) ->
{P, PKeys} = H,
PKeys1 = lists:filter(fun(PK) ->

0 comments on commit e7d09e8

Please sign in to comment.
Something went wrong with that request. Please try again.