Skip to content

Commit

Permalink
port MR error msg improvements to PB interface
Browse files Browse the repository at this point in the history
PB error message is now a JSON object
  • Loading branch information
beerriot committed Dec 12, 2011
1 parent c7a0dc9 commit 642a927
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 65 deletions.
72 changes: 72 additions & 0 deletions src/riak_kv_mapred_json.erl
Expand Up @@ -30,6 +30,7 @@

-export([parse_request/1, parse_inputs/1, parse_query/1]).
-export([jsonify_not_found/1, dejsonify_not_found/1]).
-export([jsonify_pipe_error/2]).

-define(QUERY_TOKEN, <<"query">>).
-define(INPUTS_TOKEN, <<"inputs">>).
Expand Down Expand Up @@ -448,6 +449,77 @@ erl_phase_error(StepDef) ->
" specifying a Riak object containing"
" Erlang function source\n"]}.

%% @doc Produce a list of mochjson2 props from a pipe error log
jsonify_pipe_error(From, {Error, Input}) ->
%% map function returned error tuple
[{phase, pipe_phase_index(From)},
{error, trunc_print(Error)},
{input, trunc_print(Input)}];
jsonify_pipe_error(_From, Elist) when is_list(Elist) ->
%% generic pipe fitting error

%% phase pulled from Elist should be the same as From,
%% but just to dig into that info more, we use Elist here
[{phase, pipe_error_phase(Elist)},
{error, pipe_error_error(Elist)},
{input, pipe_error_input(Elist)},
{type, pipe_error_type(Elist)},
{stack, pipe_error_stack(Elist)}];
jsonify_pipe_error(From, Other) ->
%% some other error
[{phase, pipe_phase_index(From)},
{error, trunc_print(Other)}].

%% @doc Turn the pipe fitting name into a MapReduce phase index.
-spec pipe_phase_index(integer()|{term(),integer()}) -> integer().
pipe_phase_index({_Type, I}) -> I;
pipe_phase_index(I) -> I.

%% @doc convenience for formatting ~500chars of a term as a
%% human-readable string
-spec trunc_print(term()) -> binary().
trunc_print(Term) ->
{Msg, _Len} = lager_trunc_io:print(Term, 500),
iolist_to_binary(Msg).

%% @doc Pull a field out of a proplist, and possibly transform it.
pipe_error_field(Field, Proplist) ->
pipe_error_field(Field, Proplist, fun(X) -> X end).
pipe_error_field(Field, Proplist, TrueFun) ->
pipe_error_field(Field, Proplist, TrueFun, null).
pipe_error_field(Field, Proplist, TrueFun, FalseVal) ->
case lists:keyfind(Field, 1, Proplist) of
{Field, Value} -> TrueFun(Value);
false -> FalseVal
end.

%% @doc Pull a field out of a proplist, and format it as a reasonably
%% short binary if available.
pipe_error_trunc_print(Field, Elist) ->
pipe_error_field(Field, Elist, fun trunc_print/1).

%% @doc Determine the phase that this error came from.
pipe_error_phase(Elist) ->
Details = pipe_error_field(details, Elist, fun(X) -> X end, []),
pipe_error_field(name, Details, fun pipe_phase_index/1).

pipe_error_type(Elist) ->
%% is this really useful?
pipe_error_field(type, Elist).

pipe_error_error(Elist) ->
pipe_error_field(error, Elist, fun trunc_print/1,
pipe_error_trunc_print(reason, Elist)).

pipe_error_input(Elist) ->
%% translate common inputs?
%% e.g. strip 'ok' tuple from map input
pipe_error_trunc_print(input, Elist).

pipe_error_stack(Elist) ->
%% truncate stacks to "important" part?
pipe_error_trunc_print(stack, Elist).

-ifdef(TEST).

bucket_input_test() ->
Expand Down
8 changes: 5 additions & 3 deletions src/riak_kv_pb_socket.erl
Expand Up @@ -134,15 +134,17 @@ handle_info(#pipe_result{ref=ReqId, from=PhaseId, result=Res},
{noreply, send_msg(#rpbmapredresp{phase=PhaseId,
response=Response}, State)}
end;
handle_info(#pipe_log{ref=ReqId, msg=Msg},
handle_info(#pipe_log{ref=ReqId, from=From, msg=Msg},
State=#state{req=#rpbmapredreq{},
req_ctx=#pipe_ctx{ref=ReqId}=PipeCtx}) ->
case Msg of
{trace, [error], {error, {Error, _Input}}} ->
{trace, [error], {error, Info}} ->
erlang:cancel_timer(PipeCtx#pipe_ctx.timer),
%% destroying the pipe will automatically kill the sender
riak_pipe:destroy(PipeCtx#pipe_ctx.pipe),
NewState = send_error("~p", [{error, Error}], State),
JsonInfo = {struct, riak_kv_mapred_json:jsonify_pipe_error(
From, Info)},
NewState = send_error(mochijson2:encode(JsonInfo), [], State),
{noreply, NewState#state{req = undefined, req_ctx = undefined}};
_ ->
{noreply, State}
Expand Down
65 changes: 3 additions & 62 deletions src/riak_kv_wm_mapred.erl
Expand Up @@ -219,18 +219,9 @@ pipe_receive_output(Ref, {SenderPid, SenderRef}) ->
{ok, {From, Result}};
#pipe_log{ref=Ref, from=From, msg=Msg} ->
case Msg of
{trace, [error], {error, {Error, Input}}} ->
%% map function returned error tuple
{error, [{phase, pipe_phase_index(From)},
{error, trunc_print(Error)},
{input, trunc_print(Input)}]};
{trace, [error], {error, Elist}} when is_list(Elist) ->
%% generic pipe fitting error
{error, [{phase, pipe_error_phase(Elist)},
{error, pipe_error_error(Elist)},
{input, pipe_error_input(Elist)},
{type, pipe_error_type(Elist)},
{stack, pipe_error_stack(Elist)}]};
{trace, [error], {error, Info}} ->
{error, riak_kv_mapred_json:jsonify_pipe_error(
From, Info)};
_ ->
%% not a log message we're interested in
pipe_receive_output(Ref, {SenderPid, SenderRef})
Expand All @@ -246,56 +237,6 @@ pipe_receive_output(Ref, {SenderPid, SenderRef}) ->
{error, timeout}
end.

%% @doc Turn the pipe fitting name into a MapReduce phase index.
-spec pipe_phase_index(integer()|{term(),integer()}) -> integer().
pipe_phase_index({_Type, I}) -> I;
pipe_phase_index(I) -> I.

%% @doc convenience for formatting ~500chars of a term as a
%% human-readable string
-spec trunc_print(term()) -> binary().
trunc_print(Term) ->
{Msg, _Len} = lager_trunc_io:print(Term, 500),
iolist_to_binary(Msg).

%% @doc Pull a field out of a proplist, and possibly transform it.
pipe_error_field(Field, Proplist) ->
pipe_error_field(Field, Proplist, fun(X) -> X end).
pipe_error_field(Field, Proplist, TrueFun) ->
pipe_error_field(Field, Proplist, TrueFun, null).
pipe_error_field(Field, Proplist, TrueFun, FalseVal) ->
case lists:keyfind(Field, 1, Proplist) of
{Field, Value} -> TrueFun(Value);
false -> FalseVal
end.

%% @doc Pull a field out of a proplist, and format it as a reasonably
%% short binary if available.
pipe_error_trunc_print(Field, Elist) ->
pipe_error_field(Field, Elist, fun trunc_print/1).

%% @doc Determine the phase that this error came from.
pipe_error_phase(Elist) ->
Details = pipe_error_field(details, Elist, fun(X) -> X end, []),
pipe_error_field(name, Details, fun pipe_phase_index/1).

pipe_error_type(Elist) ->
%% is this really useful?
pipe_error_field(type, Elist).

pipe_error_error(Elist) ->
pipe_error_field(error, Elist, fun trunc_print/1,
pipe_error_trunc_print(reason, Elist)).

pipe_error_input(Elist) ->
%% translate common inputs?
%% e.g. strip 'ok' tuple from map input
pipe_error_trunc_print(input, Elist).

pipe_error_stack(Elist) ->
%% truncate stacks to "important" part?
pipe_error_trunc_print(stack, Elist).

pipe_mapred_chunked(RD, State, Pipe, Sender) ->
Boundary = riak_core_util:unique_id_62(),
CTypeRD = wrq:set_resp_header(
Expand Down

0 comments on commit 642a927

Please sign in to comment.