Permalink
Browse files

first fitting pulse test to produce an error

If a fitting has no active workers when eoi arrives, it may evaluate
the worker behavior in-process. This might cause the fitting to
exit non-normal for any number of reasons, but PULSE found specifically
that the sink exiting while a synchronous message is being sent to it
(fsm-type sink, on the sync period), the gen_fsm call will exit
abnormally, but not necessarily in any of the ways already caught.
The specific exit seen here was {normal, {gen...}}.
  • Loading branch information...
Bryan Fink
Bryan Fink committed Apr 18, 2013
1 parent 7d8e638 commit 1bf171f64d51c22cf5ca4df4bbdf29c6af492221
View
@@ -2,6 +2,7 @@ REPO ?= riak_pipe
RIAK_TAG = $(shell git describe --tags)
REVISION ?= $(shell echo $(RIAK_TAG) | sed -e 's/^$(REPO)-//')
PKG_VERSION ?= $(shell echo $(REVISION) | tr - .)
+PULSE_TESTS = reduce_fitting_pulse
.PHONY: deps
@@ -22,6 +23,12 @@ distclean: clean ballclean
test: all
./rebar skip_deps=true eunit
+# You should 'clean' before your first run of this target
+# so that deps get built with PULSE where needed.
+pulse:
+ ./rebar compile -D PULSE
+ ./rebar eunit -D PULSE skip_deps=true suite=$(PULSE_TESTS)
+
docs:
./rebar skip_deps=true doc
@@ -46,6 +46,14 @@
-include("riak_pipe.hrl").
-include("riak_pipe_debug.hrl").
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_replace_module,[{gen_fsm,pulse_gen_fsm}]}).
+-endif.
+
-record(state, {options :: riak_pipe:exec_opts(),
pipe :: #pipe{},
alive :: [{#fitting{}, reference()}], % monitor ref
@@ -38,6 +38,14 @@
-include("riak_pipe.hrl").
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_replace_module,[{supervisor,pulse_supervisor}]}).
+-endif.
+
-define(SERVER, ?MODULE).
%%%===================================================================
@@ -52,6 +52,14 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_replace_module,[{gen_fsm,pulse_gen_fsm}]}).
+-endif.
+
-record(worker, {partition :: riak_pipe_vnode:partition(),
pid :: pid(),
monitor :: reference()}).
@@ -34,6 +34,14 @@
-include("riak_pipe.hrl").
-include("riak_pipe_debug.hrl").
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_replace_module,[{supervisor,pulse_supervisor}]}).
+-endif.
+
-define(SERVER, ?MODULE).
%%%===================================================================
View
@@ -33,6 +33,14 @@
-include("riak_pipe.hrl").
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_replace_module,[{gen_fsm,pulse_gen_fsm}]}).
+-endif.
+
-export_type([sink_type/0]).
-type sink_type() :: raw
| {fsm, Period::integer(), Timeout::timeout()}.
@@ -0,0 +1,269 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Test riak_pipe_fitting for no-input, reduce-once-anyway
+%% behavior, related to riak_pipe#48 and riak_pipe#49.
+-module(reduce_fitting_pulse).
+
+-include("riak_pipe.hrl").
+-include("riak_pipe_log.hrl").
+
+-behaviour(riak_pipe_vnode_worker).
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-endif.
+-include_lib("eunit/include/eunit.hrl").
+
+%% riak_pipe_worker behavior
+-export([no_input_run_reduce_once/0,
+ init/2,
+ process/3,
+ done/1]).
+
+%% console debugging convenience
+-compile(export_all).
+
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_skip,[{death_test_,0}]}).
+-endif.
+
+%%% Worker Definition
+
+no_input_run_reduce_once() ->
+ true.
+
+init(Partition, #fitting_details{}=Details) ->
+ maybe_send_trace(Details),
+ {ok, {Partition, Details}}.
+
+process(_Input, _Last, State) ->
+ %% we're only concerned with the zero-worker case, which is
+ %% easiest to replicate with zero inputs
+ throw(this_should_not_be_part_of_the_test),
+ {ok, State}.
+
+done(State) ->
+ maybe_send_output(State),
+ ok.
+
+maybe_send_trace({_, #fitting_details{arg=Arg}=Details}) ->
+ case lists:member(send_trace, Arg) of
+ true ->
+ ?T(Details, [maybe_send_trace], sending_trace);
+ false ->
+ ok
+ end.
+
+maybe_send_output({Partition, #fitting_details{arg=Arg}=Details}) ->
+ case lists:member(send_output, Arg) of
+ true ->
+ riak_pipe_vnode_worker:send_output(
+ an_output, Partition, Details);
+ false ->
+ ok
+ end.
+
+%% none of these tests make sense if PULSE is not used
+-ifdef(PULSE).
+
+%% @doc Nothing should ever cause the fitting to exit abnormally
+prop_fitting_dies_normal() ->
+ ?FORALL({Seed, Trace, Output,
+ {Eoi, Destroy, AlsoDestroySink, ExitPoint}},
+ {pulse:seed(), bool(), bool(),
+ ?LET({Eoi, Destroy},
+ {bool(), bool()},
+ {Eoi, Destroy,
+ oneof([Destroy, false]),
+ oneof([before_eoi]
+ ++ [ after_eoi || Eoi]
+ ++ [ after_destroy || Destroy]
+ ++ [ never || Eoi or Destroy])})},
+ collect({Trace, Output, Eoi, Destroy, AlsoDestroySink, ExitPoint},
+ begin
+ ExitReasons =
+ fitting_exit_reason(
+ Seed, Trace, Output,
+ Eoi, Destroy, AlsoDestroySink, ExitPoint),
+ ?WHENFAIL(
+ io:format(user, "Exit Reasons: ~p~n", [ExitReasons]),
+ exit_reason_is_normalish(
+ proplists:get_value(fitting, ExitReasons)) andalso
+ proplists:get_value(client, ExitReasons) == ExitPoint)
+ end)).
+
+exit_reason_is_normalish(normal) ->
+ true;
+exit_reason_is_normalish(shutdown) ->
+ true.
+
+fitting_exit_reason(Seed, Trace, Output,
+ Eoi, Destroy, AlsoDestroySink, ExitPoint) ->
+ pulse:run_with_seed(
+ fun() ->
+ fitting_exit_reason(Trace, Output,
+ Eoi, Destroy, AlsoDestroySink, ExitPoint)
+ end,
+ Seed).
+
+fitting_exit_reason(Trace, Output,
+ Eoi, Destroy, AlsoDestroySink, ExitPoint) ->
+ riak_pipe_builder_sup:start_link(),
+ riak_pipe_fitting_sup:start_link(),
+ reduce_fitting_pulse_sink_sup:start_link(),
+
+ erlang:process_flag(trap_exit, true),
+ ClientRef = make_ref(),
+ Self = self(),
+ Client = spawn_link(
+ fun() ->
+ pipe_client({ClientRef, Self}, Trace, Output,
+ Eoi, Destroy, AlsoDestroySink, ExitPoint)
+ end),
+
+ Pipe = receive
+ {ClientRef, PipeDef} ->
+ PipeDef
+ end,
+
+ %% monitor before the client proceeds with the test, to
+ %% ensure we get the actual exit reason, instead of a
+ %% bogus noproc
+ FittingMonitor = monitor(process, fitting_process(Pipe)),
+ BuilderMonitor = monitor(process, builder_process(Pipe)),
+ SinkMonitor = monitor(process, sink_process(Pipe)),
+ Client ! {ClientRef, ok},
+
+ Reasons = receive_exits([{fitting, FittingMonitor},
+ {builder, BuilderMonitor},
+ {sink, SinkMonitor},
+ {client, Client}]),
+
+ unlink(whereis(reduce_fitting_pulse_sink_sup)),
+ exit(whereis(reduce_fitting_pulse_sink_sup), kill),
+ unlink(whereis(riak_pipe_fitting_sup)),
+ exit(whereis(riak_pipe_fitting_sup), kill),
+ unlink(whereis(riak_pipe_builder_sup)),
+ exit(whereis(riak_pipe_builder_sup), kill),
+ Reasons.
+
+fitting_process(#pipe{fittings=[{fake_reduce, #fitting{pid=Pid}}]}) ->
+ Pid.
+
+builder_process(#pipe{builder=Builder}) ->
+ Builder.
+
+sink_process(#pipe{sink=#fitting{pid=Pid}}) ->
+ Pid.
+
+receive_exits([]) ->
+ [];
+receive_exits(Waiting) ->
+ {Tag, Reason} = receive
+ {'DOWN', Monitor, process, _Pid, R} ->
+ {Monitor, R};
+ {'EXIT', Pid, R} ->
+ {Pid, R}
+ end,
+ case lists:keytake(Tag, 2, Waiting) of
+ {value, {Name, Tag}, Rest} ->
+ [{Name, Reason}|receive_exits(Rest)];
+ false ->
+ receive_exits(Waiting)
+ end.
+
+pipe_client({Ref, Test}, Trace, Output,
+ Eoi, Destroy, AlsoDestroySink, ExitPoint) ->
+ %% mimicking riak_kv here, using a supervisor for the sink instead
+ %% of bare linking, in case that makes a difference (though it
+ %% doesn't seem to)
+ {ok, Sink} = reduce_fitting_pulse_sink_sup:start_sink(self(), Ref),
+
+ {ok, Pipe} = riak_pipe:exec(
+ [#fitting_spec{name=fake_reduce,
+ module=?MODULE,
+ arg=[send_trace || Trace]++
+ [send_output || Output]}],
+ [{sink, #fitting{pid=Sink, ref=Ref, chashfun=sink}},
+ {sink_type, {fsm, 1, infinity}}]),
+
+ reduce_fitting_pulse_sink:use_pipe(Sink, Ref, Pipe),
+
+ Test ! {Ref, Pipe},
+ receive {Ref, ok} -> ok end,
+
+ maybe_exit(before_eoi, ExitPoint),
+
+ case Eoi of
+ true ->
+ riak_pipe:eoi(Pipe),
+ maybe_exit(after_eoi, ExitPoint);
+ false -> ok
+ end,
+
+ case Destroy of
+ true ->
+ riak_pipe:destroy(Pipe),
+ case AlsoDestroySink of
+ true ->
+ reduce_fitting_pulse_sink_sup:terminate_sink(Sink);
+ false ->
+ ok
+ end,
+ maybe_exit(after_destroy, ExitPoint);
+ false ->
+ %% we don't care if this call fails, just that it doesn't
+ %% return until the sink has finished its work; mimicking
+ %% riak_kv endpoints that don't wait for results if they
+ %% destroy
+ catch reduce_fitting_pulse_sink:all_results(Sink, Ref)
+ end,
+
+ %% we can't let the exit be 'normal', because the test process
+ %% will never see it (it's linking not monitoring)
+ exit(ExitPoint).
+
+maybe_exit(Now, Now) ->
+ exit(Now);
+maybe_exit(_Now, _NotNow) ->
+ ok.
+
+death_test_() ->
+ {setup,
+ fun() ->
+ error_logger:tty(false),
+ pulse:start()
+ end,
+ fun(_) ->
+ pulse:stop(),
+ error_logger:tty(true)
+ end,
+ {timeout, 60,
+ [
+ ?_assert(eqc:quickcheck(
+ eqc:numtests(5000, prop_fitting_dies_normal())))
+ ]}
+ }.
+
+-endif.
Oops, something went wrong.

0 comments on commit 1bf171f

Please sign in to comment.