Improve MR Error Messages (AZ935) #252

Merged
merged 7 commits into from Dec 12, 2011
View
@@ -233,10 +233,14 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%% Diff hunk for define_invoke_anon_js/3. Always returns {Result, State}.
%% Removed (-) lines: the value of define_anon_js/2 was handed straight
%% to invoke_js/2.
%% Added (+) lines: define_anon_js/2 must now return {ok, JSFun} before
%% invoke_js/2 is called; anything else it returns is passed back to the
%% caller as {Error, State} without invoking the JS context.
define_invoke_anon_js(JS, Args, #state{ctx=Ctx}=State) ->
- JSFun = define_anon_js(JS, Args),
- case invoke_js(Ctx, JSFun) of
- {ok, R} ->
- {{ok, R}, State};
+ case define_anon_js(JS, Args) of
+ {ok, JSFun} ->
+ case invoke_js(Ctx, JSFun) of
+ {ok, R} ->
+ {{ok, R}, State};
+ Error ->
+ {Error, State}
+ end;
Error ->
{Error, State}
end.
@@ -270,25 +274,25 @@ invoke_js(Ctx, Js, Args) ->
catch
exit: {ucs, {bad_utf8_character_code}} ->
lager:error("Error JSON encoding arguments: ~p", [Args]),
- {error, bad_encoding};
+ {error, bad_utf8_character_code};
exit: {json_encode, _} ->
{error, bad_json};
throw:invalid_utf8 ->
- {error, bad_encoding}
+ {error, bad_utf8_character_code}
end.
%% Diff hunk for define_anon_js/2, which builds the binary source
%% "JS(ArgList);" from the argument list produced by build_arg_list/2.
%% Added (+) lines: success is now tagged as {ok, Binary}, and both
%% UTF-8 failure paths report {error, bad_utf8_character_code} instead
%% of {error, bad_encoding}. A json_encode exit still yields
%% {error, bad_json}.
define_anon_js(JS, Args) ->
try
ArgList = build_arg_list(Args, []),
- iolist_to_binary([JS, <<"(">>, ArgList, <<");">>])
+ {ok, iolist_to_binary([JS, <<"(">>, ArgList, <<");">>])}
catch
exit: {ucs, {bad_utf8_character_code}} ->
lager:error("Error JSON encoding arguments: ~p", [Args]),
- {error, bad_encoding};
+ {error, bad_utf8_character_code};
exit: {json_encode, _} ->
{error, bad_json};
throw:invalid_utf8 ->
- {error, bad_encoding}
+ {error, bad_utf8_character_code}
end.
new_context(ThreadStack, HeapSize) ->
@@ -30,6 +30,7 @@
-export([parse_request/1, parse_inputs/1, parse_query/1]).
-export([jsonify_not_found/1, dejsonify_not_found/1]).
+-export([jsonify_pipe_error/2]).
-define(QUERY_TOKEN, <<"query">>).
-define(INPUTS_TOKEN, <<"inputs">>).
@@ -448,6 +449,77 @@ erl_phase_error(StepDef) ->
" specifying a Riak object containing"
" Erlang function source\n"]}.
+%% @doc Produce a list of mochijson2 props from a pipe error log.
+%%
+%% `From' is the name of the fitting that logged the error and the
+%% second argument is the term carried by the error trace. Three
+%% shapes are recognised:
+%% <ul>
+%%   <li>`{Error, Input}': yields `phase', `error' and `input';</li>
+%%   <li>a list (treated as a proplist): yields `phase', `error',
+%%       `input', `type' and `stack', all read from the list;</li>
+%%   <li>anything else: yields `phase' and `error'.</li>
+%% </ul>
+-spec jsonify_pipe_error(integer() | {term(), integer()}, term()) ->
+          [{atom(), term()}].
+jsonify_pipe_error(From, {Error, Input}) ->
+    %% map function returned error tuple
+    [{phase, pipe_phase_index(From)},
+     {error, trunc_print(Error)},
+     {input, trunc_print(Input)}];
+jsonify_pipe_error(_From, Elist) when is_list(Elist) ->
+    %% generic pipe fitting error
+
+    %% phase pulled from Elist should be the same as From,
+    %% but just to dig into that info more, we use Elist here
+    [{phase, pipe_error_phase(Elist)},
+     {error, pipe_error_error(Elist)},
+     {input, pipe_error_input(Elist)},
+     {type, pipe_error_type(Elist)},
+     {stack, pipe_error_stack(Elist)}];
+jsonify_pipe_error(From, Other) ->
+    %% some other error
+    [{phase, pipe_phase_index(From)},
+     {error, trunc_print(Other)}].
+
+%% @doc Map a pipe fitting name onto its MapReduce phase index. A
+%% two-element tuple name yields its second element; any other name
+%% is returned unchanged.
+-spec pipe_phase_index(integer()|{term(),integer()}) -> integer().
+pipe_phase_index({_FittingType, PhaseIndex}) ->
+    PhaseIndex;
+pipe_phase_index(PhaseIndex) ->
+    PhaseIndex.
+
+%% @doc Render any term as a human-readable binary, asking
+%% lager_trunc_io to keep the output to roughly 500 characters.
+-spec trunc_print(term()) -> binary().
+trunc_print(Term) ->
+    {Printed, _Length} = lager_trunc_io:print(Term, 500),
+    iolist_to_binary(Printed).
+
+%% @doc Look up `Field' among the `{Key, Value}' pairs of `Proplist'.
+%% When the key is present its value is passed through `TrueFun'
+%% (the identity function for the arity-2 form); when it is absent
+%% `FalseVal' is returned (`null' for the arity-2 and arity-3 forms).
+pipe_error_field(Field, Proplist) ->
+    pipe_error_field(Field, Proplist, fun(Value) -> Value end, null).
+pipe_error_field(Field, Proplist, TrueFun) ->
+    pipe_error_field(Field, Proplist, TrueFun, null).
+pipe_error_field(Field, Proplist, TrueFun, FalseVal) ->
+    case lists:keyfind(Field, 1, Proplist) of
+        false ->
+            FalseVal;
+        {Field, Found} ->
+            TrueFun(Found)
+    end.
+
+%% @doc Fetch `Field' from the error proplist and render it with
+%% trunc_print/1; returns `null' when the field is missing.
+pipe_error_trunc_print(Field, Elist) ->
+    Formatter = fun trunc_print/1,
+    pipe_error_field(Field, Elist, Formatter, null).
+
+%% @doc Work out which phase logged this error: read the `name' entry
+%% of the `details' proplist (an empty list when `details' is absent)
+%% and convert it with pipe_phase_index/1. Returns `null' when no
+%% `name' entry is found.
+pipe_error_phase(Elist) ->
+    Identity = fun(Value) -> Value end,
+    FittingDetails = pipe_error_field(details, Elist, Identity, []),
+    pipe_error_field(name, FittingDetails, fun pipe_phase_index/1, null).
+
+%% @doc Return the `type' entry of the error proplist untouched, or
+%% `null' when there is none.
+pipe_error_type(Elist) ->
+    %% open question kept from the author: is this really useful?
+    pipe_error_field(type, Elist, fun(Type) -> Type end, null).
+
+%% @doc Describe what went wrong: the formatted `error' entry when it
+%% is present, otherwise the formatted `reason' entry, otherwise
+%% `null'.
+pipe_error_error(Elist) ->
+    %% Format `reason' only when `error' is missing, rather than always
+    %% rendering it up front as a fallback that is usually discarded.
+    %% trunc_print/1 returns a binary, so `null' here can only mean
+    %% that the `error' field was absent.
+    case pipe_error_trunc_print(error, Elist) of
+        null ->
+            pipe_error_trunc_print(reason, Elist);
+        Formatted ->
+            Formatted
+    end.
+
+%% @doc Return the `input' entry of the error proplist rendered with
+%% trunc_print/1, or `null' when it is absent.
+pipe_error_input(Elist) ->
+    %% possible refinement noted by the author: translate common
+    %% inputs, e.g. strip the 'ok' tuple from map input
+    pipe_error_field(input, Elist, fun trunc_print/1).
+
+%% @doc Return the `stack' entry of the error proplist rendered with
+%% trunc_print/1, or `null' when it is absent.
+pipe_error_stack(Elist) ->
+    %% possible refinement noted by the author: truncate stacks to
+    %% the "important" part
+    pipe_error_field(stack, Elist, fun trunc_print/1).
+
-ifdef(TEST).
bucket_input_test() ->
View
@@ -138,10 +138,14 @@ process(Input, _Last,
#state{fd=_FittingDetails, phase=Phase, arg=Arg}=State) ->
?T(_FittingDetails, [map], {mapping, Input}),
case map(Phase, Arg, Input) of
- {ok, Results} ->
+ {ok, Results} when is_list(Results) ->
?T(_FittingDetails, [map], {produced, Results}),
send_results(Results, State),
{ok, State};
+ {ok, _NonListResults} ->
+ ?T(_FittingDetails, [map, error],
+ {error, {non_list_result, Input}}),
+ {ok, State};
{forward_preflist, Reason} ->
?T(_FittingDetails, [map], {forward_preflist, Reason}),
{forward_preflist, State};
@@ -134,15 +134,17 @@ handle_info(#pipe_result{ref=ReqId, from=PhaseId, result=Res},
{noreply, send_msg(#rpbmapredresp{phase=PhaseId,
response=Response}, State)}
end;
-handle_info(#pipe_log{ref=ReqId, msg=Msg},
+handle_info(#pipe_log{ref=ReqId, from=From, msg=Msg},
State=#state{req=#rpbmapredreq{},
req_ctx=#pipe_ctx{ref=ReqId}=PipeCtx}) ->
case Msg of
- {trace, [error], {error, {Error, _Input}}} ->
+ {trace, [error], {error, Info}} ->
erlang:cancel_timer(PipeCtx#pipe_ctx.timer),
%% destroying the pipe will automatically kill the sender
riak_pipe:destroy(PipeCtx#pipe_ctx.pipe),
- NewState = send_error("~p", [{error, Error}], State),
+ JsonInfo = {struct, riak_kv_mapred_json:jsonify_pipe_error(
+ From, Info)},
+ NewState = send_error(mochijson2:encode(JsonInfo), [], State),
{noreply, NewState#state{req = undefined, req_ctx = undefined}};
_ ->
{noreply, State}
View
@@ -203,22 +203,13 @@ handoff(HandoffAcc, #state{inacc=OldInAcc}=State) ->
{ok, [term()]} | {error, {term(), term(), term()}}.
%% Diff hunk for reduce/3, which applies the reduce function stored in
%% the fitting details ({rct, Fun, Arg}) to Inputs.
%% Removed (-) lines: every exception was caught, logged through
%% error_logger, and the unmodified Inputs were returned.
%% Added (+) lines: nothing is caught any more. A list result is traced
%% (with ErrString included in the trace terms) and returned; any
%% non-list result terminates the process with exit(non_list_result).
reduce(Inputs, #state{fd=FittingDetails}, ErrString) ->
{rct, Fun, Arg} = FittingDetails#fitting_details.arg,
- try
- ?T(FittingDetails, [reduce], {reducing, length(Inputs)}),
- Outputs = Fun(Inputs, Arg),
- true = is_list(Outputs), %%TODO: nicer error
- ?T(FittingDetails, [reduce], {reduced, length(Outputs)}),
- Outputs
- catch Type:Error ->
- %% attempting to be helpful here by catching the error and
- %% preserving the inputs, in case trying the input again
- %% later (when a new input or eoi arrives) will be
- %% successful
- ?T(FittingDetails, [reduce], {reduce_error, Type, Error}),
- error_logger:error_msg(
- "~p:~p ~s:~n ~P~n ~P",
- [Type, Error, ErrString, Inputs, 15, erlang:get_stacktrace(), 15]),
- Inputs
+ ?T(FittingDetails, [reduce], {reducing, ErrString, length(Inputs)}),
+ case Fun(Inputs, Arg) of
+ Outputs when is_list(Outputs) ->
+ ?T(FittingDetails, [reduce], {reduced, ErrString, length(Outputs)}),
+ Outputs;
+ _NonListOutputs ->
+ exit(non_list_result)
end.
%% @doc Check that the arg is a valid arity-2 function. See {@link
@@ -285,12 +276,13 @@ js_runner(JS) ->
JSCall = {JS, [JSInputs, SafeArg]},
case riak_kv_js_manager:blocking_dispatch(
?JSPOOL_REDUCE, JSCall, ?DEFAULT_JS_RESERVE_ATTEMPTS) of
- {ok, Results0} ->
+ {ok, Results0} when is_list(Results0) ->
[riak_kv_mapred_json:dejsonify_not_found(R)
|| R <- Results0];
- {error, no_vms} ->
- %% will be caught by process/3, or will blow up done/1
- throw(no_js_vms)
+ {ok, NonlistResults} ->
+ NonlistResults; %% will blow up in reduce/3
+ {error, Error} ->
+ exit(Error)
end
end.
View
@@ -82,7 +82,6 @@ process_post(RD, State) ->
legacy_mapred(RD, State)
end.
-
%% Internal functions
%% Format Error with format_error/1 and install the result as the
%% response body of the request data RD.
send_error(Error, RD) ->
    Body = format_error(Error),
    wrq:set_resp_body(Body, RD).
@@ -190,8 +189,7 @@ pipe_mapred_nonchunked(RD, State, Pipe, NumKeeps, Sender) ->
riak_pipe:destroy(Pipe),
prevent_keepalive(),
{{halt, 500}, send_error({error, timeout}, RD), State};
- {error, {Error, _Input}} ->
- %% TODO: jsonify Input for error message
+ {error, Error} ->
riak_pipe:destroy(Pipe),
prevent_keepalive(),
{{halt, 500}, send_error({error, Error}, RD), State}
@@ -219,10 +217,11 @@ pipe_receive_output(Ref, {SenderPid, SenderRef}) ->
eoi;
#pipe_result{ref=Ref, from=From, result=Result} ->
{ok, {From, Result}};
- #pipe_log{ref=Ref, msg=Msg} ->
+ #pipe_log{ref=Ref, from=From, msg=Msg} ->
case Msg of
- {trace, [error], {error, {Error, Input}}} ->
- {error, {Error, Input}};
+ {trace, [error], {error, Info}} ->
+ {error, riak_kv_mapred_json:jsonify_pipe_error(
+ From, Info)};
_ ->
%% not a log message we're interested in
pipe_receive_output(Ref, {SenderPid, SenderRef})