Skip to content
Browse files

merge az357-eunit into az362 branch

  • Loading branch information...
1 parent a9feb00 commit d8ccb90487f3ca4477273586e95fe5bd548931f4 Bryan Fink committed May 20, 2011
View
2 .gitignore
@@ -4,4 +4,4 @@ ebin/*
doc/*
deps/*
erl_crash.dump
-
+EUnit-SASL.log
View
2 dialyzer.ignore-warnings
@@ -71,5 +71,5 @@ riak_core_pb.erl:132: The pattern <Binary, 'string'> can never match the type <'
###
### Warnings from validation code, where user code might violate -spec
###
-riak_pipe_fitting.erl:479: The variable NVal can never match since previous clauses completely covered the type fun((_) -> pos_integer()) | pos_integer()
+riak_pipe_fitting.erl:496: The variable NVal can never match since previous clauses completely covered the type fun((_) -> pos_integer()) | pos_integer()
View
2 include/riak_pipe_log.hrl
@@ -1,4 +1,6 @@
-define(T(Details, Extra, Msg),
riak_pipe_log:trace(Details, [?MODULE|Extra], Msg)).
+-define(T_ERR(Details, Props),
+ riak_pipe_log:trace(Details, [?MODULE, error], {error, Props})).
-define(L(Details, Msg),
riak_pipe_log:log(Details, Msg)).
View
2 rebar.config
@@ -1,5 +1,5 @@
%% -*- mode: erlang -*-
-{erl_opts, [fail_on_warning, debug_info]}.
+{erl_opts, [warnings_as_errors, debug_info]}.
{edoc_opts, [{preprocess, true}]}.
{cover_enabled, true}.
{deps, [
View
702 src/riak_pipe.erl
@@ -32,7 +32,7 @@
%% chashfun=fun chash:key_of/1}],
%%
%% % start things up
-%% {ok, Head, Sink} = riak_pipe:exec(PipelineSpec),
+%% {ok, Head, Sink} = riak_pipe:exec(PipelineSpec, []),
%%
%% % send in some work
%% riak_pipe_vnode:queue_work(Head, "work item 1"),
@@ -55,7 +55,9 @@
%% client API
-export([exec/2,
receive_result/1,
- collect_results/1]).
+ receive_result/2,
+ collect_results/1,
+ collect_results/2]).
%% worker/fitting API
-export([result/3, eoi/1, log/3]).
%% examples
@@ -65,10 +67,19 @@
example_receive/1,
example_transform/0,
- example_reduce/0]).
+ generic_transform/4,
+ example_reduce/0,
+ example_tick/3,
+ example_tick/4]).
+-ifdef(TEST).
+-export([do_dep_apps/1, t/0]).
+-endif.
-include("riak_pipe.hrl").
-include("riak_pipe_debug.hrl").
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
-export_type([fitting/0,
fitting_spec/0,
@@ -260,15 +271,23 @@ eoi(#fitting{pid=Pid, ref=Ref}) ->
| {log, {From::term(), Message::term()}}
| eoi
| timeout.
-receive_result(#fitting{ref=Ref}) ->
+receive_result(Fitting) ->
+ receive_result(Fitting, 5000).
+
+-spec receive_result(Sink::fitting(), Timeout::integer() | 'infinity') ->
+ {result, {From::term(), Result::term()}}
+ | {log, {From::term(), Message::term()}}
+ | eoi
+ | timeout.
+receive_result(#fitting{ref=Ref}, Timeout) ->
receive
#pipe_result{ref=Ref, from=From, result=Result} ->
{result, {From, Result}};
#pipe_log{ref=Ref, from=From, msg=Msg} ->
{log, {From, Msg}};
#pipe_eoi{ref=Ref} ->
eoi
- after 5000 ->
+ after Timeout ->
timeout
end.
@@ -295,23 +314,31 @@ receive_result(#fitting{ref=Ref}) ->
Results::[{From::term(), Result::term()}],
Logs::[{From::term(), Message::term()}]}.
collect_results(#fitting{}=Fitting) ->
- collect_results(Fitting, [], []).
+ collect_results(Fitting, [], [], 5000).
+
+-spec collect_results(Sink::fitting(), Timeout::integer() | 'infinity') ->
+ {eoi | timeout,
+ Results::[{From::term(), Result::term()}],
+ Logs::[{From::term(), Message::term()}]}.
+collect_results(#fitting{}=Fitting, Timeout) ->
+ collect_results(Fitting, [], [], Timeout).
%% @doc Internal implementation of collect_results/1. Just calls
%% receive_result/1, and accumulates lists of result and log
%% messages.
-spec collect_results(Sink::fitting(),
ResultAcc::[{From::term(), Result::term()}],
- LogAcc::[{From::term(), Result::term()}]) ->
+ LogAcc::[{From::term(), Result::term()}],
+ Timeout::integer() | 'infinity') ->
{eoi | timeout,
Results::[{From::term(), Result::term()}],
Logs::[{From::term(), Message::term()}]}.
-collect_results(Fitting, ResultAcc, LogAcc) ->
- case receive_result(Fitting) of
+collect_results(Fitting, ResultAcc, LogAcc, Timeout) ->
+ case receive_result(Fitting, Timeout) of
{result, {From, Result}} ->
- collect_results(Fitting, [{From,Result}|ResultAcc], LogAcc);
+ collect_results(Fitting, [{From,Result}|ResultAcc], LogAcc, Timeout);
{log, {From, Result}} ->
- collect_results(Fitting, ResultAcc, [{From,Result}|LogAcc]);
+ collect_results(Fitting, ResultAcc, [{From,Result}|LogAcc], Timeout);
End ->
%% result order shouldn't matter,
%% but it's useful to have logging output in time order
@@ -370,22 +397,31 @@ example_receive(Sink) ->
%% '''
-spec example_transform() -> {eoi | timeout, list(), list()}.
example_transform() ->
- SumFun = fun(Input, Partition, FittingDetails) ->
- riak_pipe_vnode_worker:send_output(
- lists:sum(Input),
- Partition,
- FittingDetails),
- ok
- end,
+ MsgFun = fun lists:sum/1,
+ DriverFun = fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, lists:seq(1, 10)),
+ riak_pipe_fitting:eoi(Head),
+ ok
+ end,
+ generic_transform(MsgFun, DriverFun, [], 1).
+
+generic_transform(MsgFun, DriverFun, ExecOpts, NumFittings) ->
+ MsgFunThenSendFun = fun(Input, Partition, FittingDetails) ->
+ riak_pipe_vnode_worker:send_output(
+ MsgFun(Input),
+ Partition,
+ FittingDetails),
+ ok
+ end,
{ok, Head, Sink} =
riak_pipe:exec(
- [#fitting_spec{name="sum transform",
- module=riak_pipe_w_xform,
- arg=SumFun,
- chashfun=fun(_) -> <<0:160/integer>> end}],
- []),
- ok = riak_pipe_vnode:queue_work(Head, lists:seq(1, 10)),
- riak_pipe_fitting:eoi(Head),
+ lists:duplicate(NumFittings,
+ #fitting_spec{name="generic transform",
+ module=riak_pipe_w_xform,
+ arg=MsgFunThenSendFun,
+ chashfun=fun zero_part/1}),
+ ExecOpts),
+ ok = DriverFun(Head, Sink),
example_receive(Sink).
%% @doc Another example pipeline use. This one sets up a simple
@@ -423,3 +459,619 @@ example_reduce() ->
|| N <- lists:seq(16, 20) ],
riak_pipe_fitting:eoi(Head),
example_receive(Sink).
+
+example_tick(TickLen, NumTicks, ChainLen) ->
+ example_tick(TickLen, 1, NumTicks, ChainLen).
+
+example_tick(TickLen, BatchSize, NumTicks, ChainLen) ->
+ Specs = [#fitting_spec{name=list_to_atom("tick_pass" ++ integer_to_list(F_num)),
+ module=riak_pipe_w_pass,
+ chashfun = fun zero_part/1}
+ || F_num <- lists:seq(1, ChainLen)],
+ {ok, Head, Sink} = riak_pipe:exec(Specs, [{log, sink},
+ {trace, all}]),
+ [begin
+ [riak_pipe_vnode:queue_work(Head, {tick, {TickSeq, X}, now()}) ||
+ X <- lists:seq(1, BatchSize)],
+ if TickSeq /= NumTicks -> timer:sleep(TickLen);
+ true -> ok
+ end
+ end || TickSeq <- lists:seq(1, NumTicks)],
+ riak_pipe_fitting:eoi(Head),
+ example_receive(Sink).
+
+%% @doc dummy chashfun for tests and examples
+%% sends everything to partition 0
+zero_part(_) ->
+ riak_pipe_vnode:hash_for_partition(0).
+
+-ifdef(TEST).
+
+extract_trace_errors(Trace) ->
+ [Ps || {_, {trace, _, {error, Ps}}} <- Trace].
+
+%% extract_trace_vnode_failures(Trace) ->
+%% [Partition || {_, {trace, _, {vnode_failure, Partition}}} <- Trace].
+
+extract_fitting_died_errors(Trace) ->
+ [X || {_, {trace, _, {vnode, {fitting_died, _}} = X}} <- Trace].
+
+extract_queued(Trace) ->
+ [{Partition, X} ||
+ {_, {trace, _, {vnode, {queued, Partition, X}}}} <- Trace].
+
+extract_queue_full(Trace) ->
+ [Partition ||
+ {_, {trace, _, {vnode, {queue_full, Partition, _}}}} <- Trace].
+
+extract_unblocking(Trace) ->
+ [Partition ||
+ {_, {trace, _, {vnode, {unblocking, Partition}}}} <- Trace].
+
+kill_all_pipe_vnodes() ->
+ [exit(VNode, kill) ||
+ VNode <- riak_core_vnode_master:all_nodes(riak_pipe_vnode)].
+
+t() ->
+ eunit:test(?MODULE).
+
+dep_apps() ->
+ DelMe = "./EUnit-SASL.log",
+ KillDamnFilterProc = fun() ->
+ timer:sleep(5),
+ catch exit(whereis(riak_sysmon_filter), kill),
+ timer:sleep(5)
+ end,
+ [fun(start) ->
+ _ = application:stop(sasl),
+ _ = application:load(sasl),
+ put(old_sasl_l, app_helper:get_env(sasl, sasl_error_logger)),
+ ok = application:set_env(sasl, sasl_error_logger, {file, DelMe}),
+ ok = application:start(sasl),
+ error_logger:tty(false);
+ (stop) ->
+ ok = application:stop(sasl),
+ ok = application:set_env(sasl, sasl_error_logger, erase(old_sasl_l));
+ (fullstop) ->
+ _ = application:stop(sasl)
+ end,
+ %% public_key and ssl are not needed here but started by others so
+ %% stop them when we're done.
+ crypto, public_key, ssl,
+ fun(start) ->
+ ok = application:start(riak_sysmon);
+ (stop) ->
+ ok = application:stop(riak_sysmon),
+ KillDamnFilterProc();
+ (fullstop) ->
+ _ = application:stop(riak_sysmon),
+ KillDamnFilterProc()
+ end,
+ webmachine,
+ fun(start) ->
+ _ = application:load(riak_core),
+ put(old_hand_ip, app_helper:get_env(riak_core, handoff_ip)),
+ put(old_hand_port, app_helper:get_env(riak_core, handoff_port)),
+ ok = application:set_env(riak_core, handoff_ip, "0.0.0.0"),
+ ok = application:set_env(riak_core, handoff_port, 9183),
+ ok = application:start(riak_core);
+ (stop) ->
+ ok = application:stop(riak_core),
+ ok = application:set_env(riak_core, handoff_ip, get(old_hand_ip)),
+ ok = application:set_env(riak_core, handoff_port, get(old_hand_port));
+ (fullstop) ->
+ _ = application:stop(riak_core)
+ end,
+ riak_pipe].
+
+do_dep_apps(fullstop) ->
+ lists:map(fun(A) when is_atom(A) -> _ = application:stop(A);
+ (F) -> F(fullstop)
+ end, lists:reverse(dep_apps()));
+do_dep_apps(StartStop) ->
+ Apps = if StartStop == start -> dep_apps();
+ StartStop == stop -> lists:reverse(dep_apps())
+ end,
+ lists:map(fun(A) when is_atom(A) -> ok = application:StartStop(A);
+ (F) -> F(StartStop)
+ end, Apps).
+
+prepare_runtime() ->
+ fun() ->
+ do_dep_apps(fullstop),
+ timer:sleep(5),
+ do_dep_apps(start),
+ timer:sleep(5),
+ [foo1, foo2]
+ end.
+
+teardown_runtime() ->
+ fun(_PrepareThingie) ->
+ do_dep_apps(stop),
+ timer:sleep(5)
+ end.
+
+basic_test_() ->
+ AllLog = [{log, sink}, {trace, all}],
+ OrderFun = fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 1),
+ riak_pipe_fitting:eoi(Head),
+ ok
+ end,
+ MultBy2 = fun(X) -> 2 * X end,
+ {foreach,
+ prepare_runtime(),
+ teardown_runtime(),
+ [
+ fun(_) ->
+ {"example()",
+ fun() ->
+ {eoi, [{empty_pass, "hello"}], _Trc} =
+ ?MODULE:example()
+ end}
+ end,
+ fun(_) ->
+ {"example_transform()",
+ fun() ->
+ {eoi, [{"generic transform", 55}], []} =
+ ?MODULE:example_transform()
+ end}
+ end,
+ fun(_) ->
+ {"example_reduce()",
+ fun() ->
+ {eoi, Res, []} = ?MODULE:example_reduce(),
+ [{"sum reduce", {a, [55]}},
+ {"sum reduce", {b, [155]}}] = lists:sort(Res)
+ end}
+ end,
+ fun(_) ->
+ {"pipeline order",
+ fun() ->
+ {eoi, Res, Trace} =
+ generic_transform(MultBy2, OrderFun,
+ AllLog, 5),
+ [{_, 32}] = Res,
+ 0 = length(extract_trace_errors(Trace)),
+ Qed = extract_queued(Trace),
+ %% NOTE: The msg to the sink doesn't appear in Trace
+ [1,2,4,8,16] = [X || {_, X} <- Qed]
+ end}
+ end,
+ fun(_) ->
+ {"trace filtering",
+ fun() ->
+ {eoi, _Res, Trace1} =
+ generic_transform(MultBy2, OrderFun,
+ [{log,sink}, {trace, [eoi]}], 5),
+ {eoi, _Res, Trace2} =
+ generic_transform(MultBy2, OrderFun,
+ [{log,sink}, {trace, all}], 5),
+ %% Looking too deeply into the format of the trace
+ %% messages, since they haven't gelled yet, is madness.
+ length(Trace1) < length(Trace2)
+ end}
+ end
+ ]
+ }.
+
+exception_test_() ->
+ AllLog = [{log, sink}, {trace, all}],
+ ErrLog = [{log, sink}, {trace, [error]}],
+ DecrOrCrashFun = fun(0) -> exit(blastoff);
+ (N) -> N - 1
+ end,
+ Sleep1Fun = fun(X) ->
+ timer:sleep(1),
+ X
+ end,
+ XBad1 = fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, [1, 2, 3]),
+ ok = riak_pipe_vnode:queue_work(Head, [4, 5, 6]),
+ ok = riak_pipe_vnode:queue_work(Head, [7, 8, bummer]),
+ ok = riak_pipe_vnode:queue_work(Head, [10, 11, 12]),
+ riak_pipe_fitting:eoi(Head),
+ ok
+ end,
+ XBad2 =
+ fun(Head, _Sink) ->
+ [ok = riak_pipe_vnode:queue_work(Head, N) ||
+ N <- lists:seq(0,2)],
+ ok = riak_pipe_vnode:queue_work(Head, 500),
+ exit({success_so_far, collect_results(_Sink, 100)})
+ end,
+ TailWorkerCrash =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 100),
+ timer:sleep(100),
+ ok = riak_pipe_vnode:queue_work(Head, 1),
+ riak_pipe_fitting:eoi(Head),
+ ok
+ end,
+ VnodeCrash =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 100),
+ timer:sleep(100),
+ kill_all_pipe_vnodes(),
+ timer:sleep(100),
+ riak_pipe_fitting:eoi(Head),
+ ok
+ end,
+ HeadFittingCrash =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, [1, 2, 3]),
+ (catch riak_pipe_fitting:crash(Head, fun() -> exit(die) end)),
+ {error, worker_startup_failed} =
+ riak_pipe_vnode:queue_work(Head, [4, 5, 6]),
+ %% Again, just for fun ... still fails
+ {error, worker_startup_failed} =
+ riak_pipe_vnode:queue_work(Head, [4, 5, 6]),
+ exit({success_so_far, collect_results(_Sink, 100)})
+ end,
+ MiddleFittingNormal =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 20),
+ timer:sleep(100),
+ [{_, BuilderPid, _, _}] = riak_pipe_builder_sup:builder_pids(),
+ {ok, FittingPids} = riak_pipe_builder:fitting_pids(BuilderPid),
+
+ %% Aside: exercise riak_pipe_fitting:workers/1.
+ %% There's a single worker on vnode 0, whee.
+ {ok, [0]} = riak_pipe_fitting:workers(hd(FittingPids)),
+
+ %% Aside: send fitting bogus messages
+ gen_fsm:send_event(hd(FittingPids), bogus_message),
+ {error, unknown} =
+ gen_fsm:sync_send_event(hd(FittingPids), bogus_message),
+ gen_fsm:sync_send_all_state_event(hd(FittingPids), bogus_message),
+ hd(FittingPids) ! bogus_message,
+
+ %% Aside: send bogus done message
+ MyRef = Head#fitting.ref,
+ ok = gen_fsm:sync_send_event(hd(FittingPids),
+ {done, MyRef, asdf}),
+
+ Third = lists:nth(3, FittingPids),
+ (catch riak_pipe_fitting:crash(Third, fun() -> exit(normal) end)),
+ Fourth = lists:nth(4, FittingPids),
+ (catch riak_pipe_fitting:crash(Fourth, fun() -> exit(normal) end)),
+ %% This message will be lost in the middle of the pipe,
+ %% but we'll be able to notice it via extract_trace_errors/1.
+ ok = riak_pipe_vnode:queue_work(Head, 30),
+ exit({success_so_far, collect_results(_Sink, 100)})
+ end,
+ MiddleFittingCrash =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 20),
+ timer:sleep(100),
+ [{_, BuilderPid, _, _}] = riak_pipe_builder_sup:builder_pids(),
+ {ok, FittingPids} = riak_pipe_builder:fitting_pids(BuilderPid),
+ Third = lists:nth(3, FittingPids),
+ (catch riak_pipe_fitting:crash(Third, fun() -> exit(diedie) end)),
+ Fourth = lists:nth(4, FittingPids),
+ (catch riak_pipe_fitting:crash(Fourth, fun() -> exit(diedie) end)),
+ timer:sleep(100), %% try to avoid racing w/pipeline shutdown
+ {error,worker_startup_failed} =
+ riak_pipe_vnode:queue_work(Head, 30),
+ riak_pipe_fitting:eoi(Head),
+ exit({success_so_far, collect_results(_Sink, 100)})
+ end,
+ %% TODO: It isn't clear to me if TailFittingCrash is really any different
+ %% than MiddleFittingCrash. I'm trying to exercise the patch in
+ %% commit cb0447f3c46 but am not having much luck. {sigh}
+ TailFittingCrash =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 20),
+ timer:sleep(100),
+ [{_, BuilderPid, _, _}] = riak_pipe_builder_sup:builder_pids(),
+ {ok, FittingPids} = riak_pipe_builder:fitting_pids(BuilderPid),
+ Last = lists:last(FittingPids),
+ (catch riak_pipe_fitting:crash(Last, fun() -> exit(diedie) end)),
+ timer:sleep(100), %% try to avoid racing w/pipeline shutdown
+ {error,worker_startup_failed} =
+ riak_pipe_vnode:queue_work(Head, 30),
+ riak_pipe_fitting:eoi(Head),
+ exit({success_so_far, collect_results(_Sink, 100)})
+ end,
+ Send_one_100 =
+ fun(Head, _Sink) ->
+ ok = riak_pipe_vnode:queue_work(Head, 100),
+ %% Sleep so that we don't have workers being shutdown before
+ %% the above work item gets to the end of the pipe.
+ timer:sleep(100),
+ riak_pipe_fitting:eoi(Head)
+ end,
+ Send_onehundred_100 =
+ fun(Head, _Sink) ->
+ [ok = riak_pipe_vnode:queue_work(Head, 100) ||
+ _ <- lists:seq(1,100)],
+ %% Sleep so that we don't have workers being shutdown before
+ %% the above work item gets to the end of the pipe.
+ timer:sleep(100),
+ riak_pipe_fitting:eoi(Head)
+ end,
+ XFormDecrOrCrashFun = fun(Input, Partition, FittingDetails) ->
+ riak_pipe_vnode_worker:send_output(
+ DecrOrCrashFun(Input),
+ Partition,
+ FittingDetails),
+ ok
+ end,
+ {foreach,
+ prepare_runtime(),
+ teardown_runtime(),
+ [fun(_) ->
+ {"generic_transform(XBad1)",
+ fun() ->
+ {eoi, Res, Trace} =
+ generic_transform(fun lists:sum/1, XBad1, ErrLog, 1),
+ [{_, 6}, {_, 15}, {_, 33}] = lists:sort(Res),
+ [{_, {trace, [error], {error, Ps}}}] = Trace,
+ error = proplists:get_value(type, Ps),
+ badarith = proplists:get_value(error, Ps),
+ [7, 8, bummer] = proplists:get_value(input, Ps)
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(XBad2)",
+ fun() ->
+ {'EXIT', {success_so_far, {timeout, Res, Trace}}} =
+ (catch generic_transform(DecrOrCrashFun,
+ XBad2,
+ ErrLog, 3)),
+ [{_, 497}] = Res,
+ 3 = length(extract_trace_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(TailWorkerCrash)",
+ fun() ->
+ {eoi, Res, Trace} = generic_transform(DecrOrCrashFun,
+ TailWorkerCrash,
+ ErrLog, 2),
+ [{_, 98}] = Res,
+ 1 = length(extract_trace_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(VnodeCrash)",
+ fun() ->
+ {eoi, Res, Trace} = generic_transform(DecrOrCrashFun,
+ VnodeCrash,
+ ErrLog, 2),
+ [{_, 98}] = Res,
+ 0 = length(extract_trace_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(HeadFittingCrash)",
+ fun() ->
+ {'EXIT', {success_so_far, {timeout, Res, Trace}}} =
+ (catch generic_transform(fun lists:sum/1,
+ HeadFittingCrash,
+ ErrLog, 1)),
+ [{_, 6}] = Res,
+ 1 = length(extract_fitting_died_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(MiddleFittingNormal)",
+ fun() ->
+ {'EXIT', {success_so_far, {timeout, Res, Trace}}} =
+ (catch generic_transform(DecrOrCrashFun,
+ MiddleFittingNormal,
+ ErrLog, 5)),
+ [{_, 15}] = Res,
+ 2 = length(extract_fitting_died_errors(Trace)),
+ 1 = length(extract_trace_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(MiddleFittingCrash)",
+ fun() ->
+ {'EXIT', {success_so_far, {timeout, Res, Trace}}} =
+ (catch generic_transform(DecrOrCrashFun,
+ MiddleFittingCrash,
+ ErrLog, 5)),
+ [{_, 15}] = Res,
+ 5 = length(extract_fitting_died_errors(Trace)),
+ 0 = length(extract_trace_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"generic_transform(TailFittingCrash)",
+ fun() ->
+ {'EXIT', {success_so_far, {timeout, Res, Trace}}} =
+ (catch generic_transform(DecrOrCrashFun,
+ TailFittingCrash,
+ ErrLog, 5)),
+ [{_, 15}] = Res,
+ 5 = length(extract_fitting_died_errors(Trace)),
+ 0 = length(extract_trace_errors(Trace))
+ end}
+ end,
+ fun(_) ->
+ {"worker init crash 1",
+ fun() ->
+ {ok, Head, Sink} =
+ riak_pipe:exec(
+ [#fitting_spec{name="init crash",
+ module=riak_pipe_w_crash,
+ arg=init_exit,
+ chashfun=follow}], ErrLog),
+ {error, worker_startup_failed} =
+ riak_pipe_vnode:queue_work(Head, x),
+ riak_pipe_fitting:eoi(Head),
+ {eoi, [], []} = collect_results(Sink, 500)
+ end}
+ end,
+ fun(_) ->
+ {"worker init crash 2 (only init arg differs from #1 above)",
+ fun() ->
+ {ok, Head, Sink} =
+ riak_pipe:exec(
+ [#fitting_spec{name="init crash",
+ module=riak_pipe_w_crash,
+ arg=init_badreturn,
+ chashfun=follow}], ErrLog),
+ {error, worker_startup_failed} =
+ riak_pipe_vnode:queue_work(Head, x),
+ riak_pipe_fitting:eoi(Head),
+ {eoi, [], []} = collect_results(Sink, 500)
+ end}
+ end,
+ fun(_) ->
+ {"worker limit for one pipe",
+ fun() ->
+ PipeLen = 90,
+ {eoi, Res, Trace} = generic_transform(DecrOrCrashFun,
+ Send_one_100,
+ AllLog, PipeLen),
+ [] = Res,
+ Started = [x || {_, {trace, _,
+ {fitting, init_started}}} <- Trace],
+ PipeLen = length(Started),
+ [Ps] = extract_trace_errors(Trace), % exactly one error!
+ %% io:format(user, "Ps = ~p\n", [Ps]),
+ {badmatch,{error,worker_limit_reached}} =
+ proplists:get_value(error, Ps)
+ end}
+ end,
+ fun(_) ->
+ {"worker limit for multiple pipes",
+ fun() ->
+ PipeLen = 90,
+ Spec = lists:duplicate(
+ PipeLen,
+ #fitting_spec{name="worker limit mult pipes",
+ module=riak_pipe_w_xform,
+ arg=XFormDecrOrCrashFun,
+ chashfun=fun zero_part/1}),
+ {ok, Head1, Sink1} =
+ riak_pipe:exec(Spec, AllLog),
+ {ok, Head2, Sink2} =
+ riak_pipe:exec(Spec, AllLog),
+ ok = riak_pipe_vnode:queue_work(Head1, 100),
+ timer:sleep(100),
+ %% At worker limit, can't even start 1st worker @ Head2
+ {error, worker_limit_reached} =
+ riak_pipe_vnode:queue_work(Head2, 100),
+ {timeout, [], Trace1} = collect_results(Sink1, 500),
+ {timeout, [], Trace2} = collect_results(Sink2, 500),
+ [_] = extract_trace_errors(Trace1), % exactly one error!
+ [] = extract_queued(Trace2)
+ end}
+ end,
+ fun(_) ->
+ {"under per-vnode worker limit for 1 pipe + many vnodes",
+ fun() ->
+ %% 20 * Ring size > worker limit, if indeed the worker
+ %% limit were enforced per node instead of per vnode.
+ PipeLen = 20,
+ Spec = lists:duplicate(
+ PipeLen,
+ #fitting_spec{name="foo",
+ module=riak_pipe_w_xform,
+ arg=XFormDecrOrCrashFun,
+ chashfun=fun chash:key_of/1}),
+ {ok, Head1, Sink1} =
+ riak_pipe:exec(Spec, AllLog),
+ [ok = riak_pipe_vnode:queue_work(Head1, X) ||
+ X <- lists:seq(101, 200)],
+ riak_pipe_fitting:eoi(Head1),
+ {eoi, Res, Trace1} = collect_results(Sink1, 500),
+ 100 = length(Res),
+ [] = extract_trace_errors(Trace1)
+ end}
+ end,
+ fun(_) ->
+ {"Per worker queue limit enforcement",
+ fun() ->
+ {eoi, Res, Trace} =
+ generic_transform(Sleep1Fun,
+ Send_onehundred_100,
+ AllLog, 1),
+ 100 = length(Res),
+ Full = length(extract_queue_full(Trace)),
+ NoLongerFull = length(extract_unblocking(Trace)),
+ Full = NoLongerFull
+ end}
+ end
+ ]
+ }.
+
+validate_test_() ->
+ {foreach,
+ prepare_runtime(),
+ teardown_runtime(),
+ [
+ fun(_) ->
+ {"very bad fitting",
+ fun() ->
+ badarg = (catch
+ riak_pipe_fitting:validate_fitting(x))
+ end}
+ end,
+ fun(_) ->
+ {"bad fitting module",
+ fun() ->
+ badarg = (catch
+ riak_pipe_fitting:validate_fitting(
+ #fitting_spec{name=empty_pass,
+ module=does_not_exist,
+ chashfun=fun zero_part/1}))
+ end}
+ end,
+ fun(_) ->
+ {"bad fitting argument",
+ fun() ->
+ badarg = (catch
+ riak_pipe_fitting:validate_fitting(
+ #fitting_spec{name=empty_pass,
+ module=riak_pipe_w_reduce,
+ arg=bogus_arg,
+ chashfun=fun zero_part/1}))
+ end}
+ end,
+ fun(_) ->
+ {"good partfun",
+ fun() ->
+ ok = (catch
+ riak_pipe_fitting:validate_fitting(
+ #fitting_spec{name=empty_pass,
+ module=riak_pipe_w_pass,
+ chashfun=follow}))
+ end}
+ end,
+ fun(_) ->
+ {"bad partfun",
+ fun() ->
+ badarg = (catch
+ riak_pipe_fitting:validate_fitting(
+ #fitting_spec{name=empty_pass,
+ module=riak_pipe_w_pass,
+ chashfun=fun(_,_) -> 0 end}))
+ end}
+ end,
+ fun(_) ->
+ {"format_name coverage",
+ fun() ->
+ <<"foo">> = riak_pipe_fitting:format_name(<<"foo">>),
+ "foo" = riak_pipe_fitting:format_name("foo"),
+ "[foo]" = lists:flatten(
+ riak_pipe_fitting:format_name([foo]))
+ end}
+ end
+ ]}.
+
+should_be_the_very_last_test() ->
+ Leftovers = [{Pid, X} ||
+ Pid <- processes(),
+ {eunit, X} <- element(2, process_info(Pid, dictionary))],
+ [] = Leftovers.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% NOTE: Do not put any EUnit tests after should_be_the_very_last_test()
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-endif. % TEST
View
5 src/riak_pipe_builder.erl
@@ -95,6 +95,11 @@ get_first_fitting(BuilderPid, Ref) ->
init([Spec, Options]) ->
{sink, #fitting{ref=Ref}} = lists:keyfind(sink, 1, Options),
Fittings = start_fittings(Spec, Options),
+ put(eunit, [{module, ?MODULE},
+ {ref, Ref},
+ {spec, Spec},
+ {options, Options},
+ {fittings, Fittings}]),
{ok, wait_pipeline_shutdown,
#state{options=Options,
ref=Ref,
View
17 src/riak_pipe_fitting.erl
@@ -33,6 +33,9 @@
workers/1]).
-export([validate_fitting/1,
format_name/1]).
+-ifdef(TEST).
+-export([crash/2]).
+-endif.
%% gen_fsm callbacks
-export([init/1,
@@ -122,6 +125,13 @@ workers(Fitting) ->
gone
end.
+-ifdef(TEST).
+crash(#fitting{pid=Pid}, Fun) ->
+ gen_fsm:sync_send_all_state_event(Pid, {test_crash, Fun});
+crash(Pid, Fun) ->
+ gen_fsm:sync_send_all_state_event(Pid, {test_crash, Fun}).
+-endif.
+
%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================
@@ -151,6 +161,10 @@ init([Builder,
?T(Details, [], {fitting, init_finished}),
+ put(eunit, [{module, ?MODULE},
+ {fitting, Fitting},
+ {details, Details},
+ {builder, Builder}]),
{ok, wait_upstream_eoi,
#state{builder=Builder, details=Details, workers=[],
ref=Output#fitting.ref}}.
@@ -286,6 +300,9 @@ handle_event(_Event, StateName, State) ->
handle_sync_event(workers, _From, StateName, #state{workers=Workers}=State) ->
Partitions = [ P || #worker{partition=P} <- Workers ],
{reply, Partitions, StateName, State};
+handle_sync_event({test_crash, Fun},_,_,_) ->
+ %% Only test-enabled client sends this.
+ Fun();
handle_sync_event(_Event, _From, StateName, State) ->
Reply = ok,
{reply, Reply, StateName, State}.
View
2 src/riak_pipe_vnode.erl
@@ -567,6 +567,8 @@ enqueue_internal(#cmd_enqueue{fitting=Fitting, input=Input, timeout=TO,
end;
worker_limit_reached ->
%% TODO: log/trace this event
+ %% Except we don't have details here to associate with a trace
+ %% function: ?T_ERR(WhereToGetDetails, whatever_limit_hit_here),
{reply, {error, worker_limit_reached}, State};
worker_startup_failed ->
%% TODO: log/trace this event
View
60 src/riak_pipe_vnode_worker.erl
@@ -299,6 +299,10 @@ send_output(Output, FromPartition,
| {stop, {init_failed, term(), term()}}.
init([Partition, VnodePid, #fitting_details{module=Module}=FittingDetails]) ->
try
+ put(eunit, [{module, ?MODULE},
+ {partition, Partition},
+ {VnodePid, VnodePid},
+ {details, FittingDetails}]),
{ok, ModState} = Module:init(Partition, FittingDetails),
{ok, initial_input_request,
#state{partition=Partition,
@@ -412,19 +416,38 @@ process_input(Input, UsedPreflist,
#state{details=FD, modstate=ModState}=State) ->
Module = FD#fitting_details.module,
NVal = (FD#fitting_details.fitting)#fitting.nval,
- {Result, NewModState} = Module:process(Input,
- length(UsedPreflist) == NVal,
- ModState),
- case Result of
- ok ->
- ok;
- forward_preflist ->
- forward_preflist(Input, UsedPreflist, State);
- {error, Error} ->
- %% TODO: replace this with T_ERR when merging master
- ?T(FD, [error], {error, {Error, Input}})
- end,
- State#state{modstate=NewModState}.
+ try
+ {Result, NewModState} = Module:process(Input,
+ length(UsedPreflist) == NVal,
+ ModState),
+ case Result of
+ ok ->
+ ok;
+ forward_preflist ->
+ forward_preflist(Input, UsedPreflist, State);
+ {error, RError} ->
+ processing_error(
+ result, RError, FD, ModState, Module, State, Input)
+ end,
+ State#state{modstate=NewModState}
+ catch Type:Error ->
+ processing_error(Type, Error, FD, ModState, Module, State, Input),
+ State
+ end.
+
+%% @private
+processing_error(Type, Error, FD, ModState, Module, State, Input) ->
+ Fields = record_info(fields, fitting_details),
+ FieldPos = lists:zip(Fields, lists:seq(2, length(Fields)+1)),
+ DsList = [{Field, element(Pos, FD)} || {Field, Pos} <- FieldPos],
+ ?T_ERR(FD, [{module, Module},
+ {partition, State#state.partition},
+ {details, DsList},
+ {type, Type},
+ {error, Error},
+ {input, Input},
+ {modstate, ModState},
+ {stack, erlang:get_stacktrace()}]).
%% @doc Process a done (end-of-inputs) message - call the implementing
%% module's `done/1' function.
@@ -477,13 +500,14 @@ reply_archive(Archive, #state{vnode=Vnode, details=Details}) ->
%% input.
-spec forward_preflist(term(), riak_core_apl:preflist(), state()) -> ok.
forward_preflist(Input, UsedPreflist,
- #state{partition=Partition, details=FittingDetails}) ->
+ #state{partition=Partition,
+ details=FittingDetails}=State) ->
case recurse_input(Input, Partition, FittingDetails,
noblock, UsedPreflist) of
ok -> ok;
{error, Error} ->
- %% TODO: replace with T_ERRO when merging master
- ?T(FittingDetails,
- [forward_preflist],
- {error, Error, Input})
+ processing_error(forward_preflist, Error, FittingDetails,
+ State#state.modstate,
+ FittingDetails#fitting_details.module,
+ State, Input)
end.
View
73 src/riak_pipe_w_crash.erl
@@ -0,0 +1,73 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2011 Basho Technologies, Inc.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc It's what we do: crash.
+
+-module(riak_pipe_w_crash).
+-behaviour(riak_pipe_vnode_worker).
+
+-export([init/2,
+ process/3,
+ done/1]).
+
+-include("riak_pipe.hrl").
+-include("riak_pipe_log.hrl").
+
+-record(state, {p :: riak_pipe_vnode:partition(),
+ fd :: riak_pipe_fitting:details()}).
+-opaque state() :: #state{}.
+
+%% @doc Initialization just stows the partition and fitting details in
+%% the module's state, for sending outputs in {@link process/3}.
+-spec init(riak_pipe_vnode:partition(),
+ riak_pipe_fitting:details()) ->
+ {ok, state()}.
+init(Partition, FittingDetails) ->
+ case FittingDetails#fitting_details.arg of
+ init_exit ->
+ exit(crash);
+ init_badreturn ->
+ crash;
+ _ ->
+ {ok, #state{p=Partition, fd=FittingDetails}}
+ end.
+
+%% @doc Process just sends `Input' directly to the next fitting. This
+%% function also generates two trace messages: `{processing,
+%% Input}' before sending the output, and `{processed, Input}' after
+%% the blocking output send has returned. This can be useful for
+%% dropping in another pipeline to watching data move through it.
+-spec process(term(), boolean(), state()) -> {ok, state()}.
+process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
+ ?T(FittingDetails, [], {processing, Input}),
+ case FittingDetails#fitting_details.arg of
+ Input ->
+ ?T(FittingDetails, [], {crashing, Input}),
+ exit(process_input_crash);
+ _ ->
+ riak_pipe_vnode_worker:send_output(Input, Partition, FittingDetails),
+ ?T(FittingDetails, [], {processed, Input}),
+ {ok, State}
+ end.
+
+%% @doc Unused.
+-spec done(state()) -> ok.
+done(_State) ->
+ ok.
View
10 src/riak_pipe_w_xform.erl
@@ -50,6 +50,7 @@
validate_arg/1]).
-include("riak_pipe.hrl").
+-include("riak_pipe_log.hrl").
-record(state, {p :: riak_pipe_vnode:partition(),
fd :: riak_pipe_fitting:details()}).
@@ -65,14 +66,7 @@ init(Partition, FittingDetails) ->
-spec process(term(), boolean(), state()) -> {ok, state()}.
process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
Fun = FittingDetails#fitting_details.arg,
- try
- ok = Fun(Input, Partition, FittingDetails)
- catch Type:Error ->
- %%TODO: forward
- error_logger:info_msg("~p:~p xforming:~n ~P~n ~P",
- [Type, Error, Input, 3,
- erlang:get_stacktrace(), 5])
- end,
+ ok = Fun(Input, Partition, FittingDetails),
{ok, State}.
%% @doc Unused.

0 comments on commit d8ccb90

Please sign in to comment.
Something went wrong with that request. Please try again.