Skip to content

Commit

Permalink
Merge pull request #59 from bwf-sink-backpressure
Browse files Browse the repository at this point in the history
Allows the sink to receive result/log/eoi messages as gen_fsm events,
and to choose how often they should be synchronous events, allowing the
sink to provide backpressure to the pipe.

Squashed commit of the following:

commit 300088a
Author: Bryan Fink <bryan@basho.com>
Date:   Sun Nov 11 12:00:42 2012 -0500

    rename sink type fsm_sync to fsm

    Using Period=='infinity' is exactly what an 'fsm_async' sink type would
    have been, so use the shorter name (and stop the confusing "sync sink"
    typos).

commit ce90a17
Author: Bryan Fink <bryan@basho.com>
Date:   Sun Nov 11 11:50:44 2012 -0500

    send first fsm sink result syncronously, unless Period==infinity

    Sending the first result synchronously prevents worker restart from
    overwhelming the sink with "first" result messages (because the period
    counter is kept in the process dictionary, which will clear during
    worker restart).

commit e9819b8
Author: Bryan Fink <bryan@basho.com>
Date:   Sat Nov 10 22:36:59 2012 -0500

    only use gen_fsm:sync_send_event periodically

    Waiting for an ack on *every* message to the sink adds a lot of
    overhead. Change {fsm_sink, Timeout} to {fsm_sync, Period, Timeout},
    where Period represents the number of times to use gen_fsm:send_event
    before using gen_fsm:sync_send_event once.

commit eb80d30
Author: Bryan Fink <bryan@basho.com>
Date:   Tue Nov 6 13:56:28 2012 -0500

    fix state name typo in test sink doc

commit cbd15b5
Author: Bryan Fink <bryan@basho.com>
Date:   Fri Nov 2 23:39:12 2012 -0400

    deliver logs and eoi as gen_fsm sync events as well

    Logs have overwhelmed our sinks too, and doing eoi the same way just
    makes the interface more consistent.

commit ae0e1fe
Author: Bryan Fink <bryan@basho.com>
Date:   Fri Nov 2 10:43:59 2012 -0400

    sink backpressure via gen_fsm sync events

    This patch allows a Pipe user to specify the "type" of the sink process.
    This type is used to decide how to send it #pipe_result{} messages.

    If the sink type is 'raw' or unspecified, result messages are delivered
    as regular erlang messages, as they have been before this commit.

    If the sink type is '{fsm_sync, Timeout}', result messages are delivered
    as calls to gen_fsm:sync_send_message/3. The biggest benefit of this
    sink type is that it will provide backpressure to the pipe, to prevent
    the sink's mailbox from being overwhelmed.
  • Loading branch information
beerriot committed Nov 12, 2012
1 parent bb45837 commit 807c632
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 20 deletions.
215 changes: 214 additions & 1 deletion src/riak_pipe.erl
Expand Up @@ -94,6 +94,7 @@
-type fitting_spec() :: #fitting_spec{}.
-type exec_opts() :: [exec_option()].
-type exec_option() :: {sink, fitting()}
| {sink_type, riak_pipe_sink:sink_type()}
| {trace, all | list() | set() | ordsets:ordset()}
| {log, sink | sasl}.
-type stat() :: {atom(), term()}.
Expand Down Expand Up @@ -158,6 +159,25 @@
%% sink (all output, logging, and trace messages). If specified,
%% `Sink' should be a `#fitting{}' record, filled with the pid of
%% the process prepared to receive these messages.
%%</dd<dt>
%% `{sink_type, Type}'
%%</dt><dd>
%% Specifies the way in which messages are delivered to the
%% sink. If `Type' is the atom `raw', messages are delivered as
%% plain Erlang messages. If `Type' is the tuple `{fsm,
%% Period, Timeout}', messages are delivered by calling {@link
%% gen_fsm:send_event/2} `Period` times with the sink's pid and
%% the result message, then calling {@link
%% gen_fsm:sync_send_event/3} once with the sink's pid, the
%% result message, and the specified timeout. If no `sink_type'
%% option is provided, `Type' defaults to `raw'.
%%
%% Some simple `fsm' period examples: `{fsm, 0, T}` will send
%% every message synchronously, `{fsm, infinity, T}` will send
%% every message asynchronously, `{fsm, 1, T}` will alternate
%% every message, `{fsm, 10, T}` sends ten messages
%% asynchronously then one synchronously. For every period except
%% `infinity' the first message is always sent synchrounously.
%%</dd><dt>
%% `{trace, TraceMatches}'
%%</dt><dd>
Expand Down Expand Up @@ -197,7 +217,9 @@
{ok, Pipe::pipe()}.
exec(Spec, Options) ->
[ riak_pipe_fitting:validate_fitting(F) || F <- Spec ],
CorrectOptions = correct_trace(ensure_sink(Options)),
CorrectOptions = correct_trace(
validate_sink_type(
ensure_sink(Options))),
riak_pipe_builder_sup:new_pipeline(Spec, CorrectOptions).

%% @doc Ensure that the `{sink, Sink}' exec/2 option is defined
Expand Down Expand Up @@ -231,6 +253,16 @@ ensure_sink(Options) ->
throw({invalid_sink, not_fitting})
end.

%% @doc Make sure that the `sink_type' option is valid, if it is set.
-spec validate_sink_type(exec_opts()) -> exec_opts().
validate_sink_type(Options) ->
case riak_pipe_sink:valid_sink_type(Options) of
true ->
Options;
{false, Invalid} ->
throw({invalid_sink_type, Invalid})
end.

%% @doc Validate the trace option. Converts `{trace, list()}' to
%% `{trace, set()}' or `{trace, ordset()}' (as supported by the
%% cluster) for easier comparison later.
Expand Down Expand Up @@ -1691,6 +1723,187 @@ limits_test_() ->
]
}.

sink_type_test_() ->
{foreach,
prepare_runtime(),
teardown_runtime(),
[
fun(_) ->
{"raw",
fun() ->
%% the basics: 'raw' is the default with
%% nothing specified (so all other tests should
%% have covered it), but try specifying it
%% explicitly here
Spec = [#fitting_spec{name=r,
module=riak_pipe_w_pass}],
{ok, P} = riak_pipe:exec(
Spec,
[{sink_type, raw}]),
riak_pipe:queue_work(P, 1),
riak_pipe:eoi(P),
Result = riak_pipe:collect_results(P, 1000),
?assertEqual({eoi, [{r, 1}], []}, Result)
end}
end,
fun(_) ->
{"fsm",
fun() ->
%% riak_pipe_test_sink *only* accepts results
%% delivered as gen_fsm events that are tagged
%% as sync vs async
PipeRef = make_ref(),
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef),
Spec = [#fitting_spec{name=fs,
module=riak_pipe_w_pass}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{sink, Sink},
{sink_type, {fsm, 0, 5000}}]),
riak_pipe:queue_work(P, {sync, 1}),
riak_pipe:eoi(P),
Result = riak_pipe_test_sink_fsm:get_results(SinkPid),
?assertEqual({eoi, [{fs, {sync, 1}}], []}, Result)
end}
end,
fun(_) ->
{"fsm timeout",
fun() ->
%% purposefully disable acking one output, to
%% trigger the timeout on the
%% gen_fsm:sync_send_event
PipeRef = make_ref(),
SinkOpts = [{skip_ack, [{fst,{sync, 2}}]}],
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef, SinkOpts),
Spec = [#fitting_spec{name=fst,
module=riak_pipe_w_pass}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{log, sink},
{trace, [error]},
{sink, Sink},
%% a very short timeout, to fit eunit
{sink_type, {fsm, 0, 10}}]),
riak_pipe:queue_work(P, {sync, 1}),
riak_pipe:queue_work(P, {sync, 2}),
riak_pipe:queue_work(P, {sync, 3}),
riak_pipe:eoi(P),
{eoi, Results, Logs} =
riak_pipe_test_sink_fsm:get_results(SinkPid),

%% make sure that all results did make it to the sink
?assertEqual([{fst, {sync, 1}},
{fst, {sync, 2}},
{fst, {sync, 3}}],
lists:sort(Results)),
%% but that we also logged an error...
[{fst,{trace,[error],{error,Props}}}] = Logs,
%% ...about the input "2"...
?assertEqual({sync, 2},
proplists:get_value(input, Props)),
%% ...timing out on its way to the sink
?assertEqual({badmatch,{error,timeout}},
proplists:get_value(error, Props))
end}
end,
fun(_) ->
{"fsm sync period",
fun() ->
%% make sure that the sink messages are sent
%% synchronously on the Period, and
%% asynchronously otherwise
PipeRef = make_ref(),
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef, []),
%% force a single worker, to make it easy to
%% test the sync period
ConstantChash = fun(_) -> <<0:160/integer>> end,
Spec = [#fitting_spec{name=fst,
module=riak_pipe_w_pass,
chashfun=ConstantChash}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{log, sink},
{trace, [error]},
{sink, Sink},
{sink_type, {fsm, 2, 1000}}]),
riak_pipe:queue_work(P, {sync, 1}),
riak_pipe:queue_work(P, {async, 2}),
riak_pipe:queue_work(P, {async, 3}),
riak_pipe:queue_work(P, {sync, 4}),
riak_pipe:eoi(P),
{eoi, Results, []} =
riak_pipe_test_sink_fsm:get_results(SinkPid),

%% make sure that all results did make it to the sink
%% ('async' sorts before 'sync')
?assertEqual([{fst, {async, 2}},
{fst, {async, 3}},
{fst, {sync, 1}},
{fst, {sync, 4}}],
lists:sort(Results))
end}
end,
fun(_) ->
{"fsm infinity sync period",
fun() ->
%% infinite period means sink results are
%% always delivered asynchronously
PipeRef = make_ref(),
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef, []),
%% force a single worker, to make it easy to
%% test the sync period
ConstantChash = fun(_) -> <<0:160/integer>> end,
Spec = [#fitting_spec{name=fst,
module=riak_pipe_w_pass,
chashfun=ConstantChash}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{log, sink},
{trace, [error]},
{sink, Sink},
{sink_type, {fsm, infinity, 1000}}]),
riak_pipe:queue_work(P, {async, 1}),
riak_pipe:queue_work(P, {async, 2}),
riak_pipe:queue_work(P, {async, 3}),
riak_pipe:queue_work(P, {async, 4}),
riak_pipe:eoi(P),
{eoi, Results, []} =
riak_pipe_test_sink_fsm:get_results(SinkPid),

%% make sure that all results did make it to the sink
?assertEqual([{fst, {async, 1}},
{fst, {async, 2}},
{fst, {async, 3}},
{fst, {async, 4}}],
lists:sort(Results))
end}
end,
fun(_) ->
{"invalid",
fun() ->
try
Spec = [#fitting_spec{module=riak_pipe_w_pass}],
{ok, P} = riak_pipe:exec(
Spec,
[{sink_type, invalid}]),
riak_pipe:destroy(P),
?assert(false)
catch throw:{invalid_sink_type,
{sink_type, invalid}} ->
ok
end
end}
end
]}.

should_be_the_very_last_test() ->
Leftovers = [{Pid, X} ||
Pid <- processes(),
Expand Down
2 changes: 1 addition & 1 deletion src/riak_pipe_fitting.erl
Expand Up @@ -390,7 +390,7 @@ forward_eoi(#state{details=Details}) ->
?T(Details, [eoi], {fitting, send_eoi}),
case Details#fitting_details.output of
#fitting{chashfun=sink}=Sink ->
riak_pipe_sink:eoi(Sink);
riak_pipe_sink:eoi(Sink, Details#fitting_details.options);
#fitting{}=Fitting ->
riak_pipe_fitting:eoi(Fitting)
end.
Expand Down
4 changes: 2 additions & 2 deletions src/riak_pipe_log.erl
Expand Up @@ -43,9 +43,9 @@ log(#fitting_details{options=O, name=N}, Msg) ->
ok; %% no logging
sink ->
Sink = proplists:get_value(sink, O),
riak_pipe_sink:log(N, Sink, Msg);
riak_pipe_sink:log(N, Sink, Msg, O);
{sink, Sink} ->
riak_pipe_sink:log(N, Sink, Msg);
riak_pipe_sink:log(N, Sink, Msg, O);
lager ->
lager:info(
"~s: ~P",
Expand Down
100 changes: 88 additions & 12 deletions src/riak_pipe_sink.erl
Expand Up @@ -25,30 +25,106 @@
-module(riak_pipe_sink).

-export([
result/3,
log/3,
eoi/1
result/4,
log/4,
eoi/2,
valid_sink_type/1
]).

-include("riak_pipe.hrl").

-export_type([sink_type/0]).
-type sink_type() :: raw
| {fsm, Period::integer(), Timeout::timeout()}.

%% @doc Send a result to the sink (used by worker processes). The
%% result is delivered as a `#pipe_result{}' record in the sink
%% process's mailbox.
-spec result(term(), Sink::riak_pipe:fitting(), term()) -> #pipe_result{}.
result(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Output) ->
Pid ! #pipe_result{ref=Ref, from=From, result=Output}.
-spec result(term(), Sink::riak_pipe:fitting(), term(),
riak_pipe:exec_opts()) ->
ok.
result(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Output, Opts) ->
send_to_sink(Pid,
#pipe_result{ref=Ref, from=From, result=Output},
sink_type(Opts)).

%% @doc Send a log message to the sink (used by worker processes and
%% fittings). The message is delivered as a `#pipe_log{}' record
%% in the sink process's mailbox.
-spec log(term(), Sink::riak_pipe:fitting(), term()) -> #pipe_log{}.
log(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Msg) ->
Pid ! #pipe_log{ref=Ref, from=From, msg=Msg}.
-spec log(term(), Sink::riak_pipe:fitting(), term(), list()) -> #pipe_log{}.
log(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Msg, Opts) ->
send_to_sink(Pid, #pipe_log{ref=Ref, from=From, msg=Msg},
sink_type(Opts)).

%% @doc Send an end-of-inputs message to the sink (used by fittings).
%% The message is delivered as a `#pipe_eoi{}' record in the sink
%% process's mailbox.
-spec eoi(Sink::riak_pipe:fitting()) -> #pipe_eoi{}.
eoi(#fitting{pid=Pid, ref=Ref, chashfun=sink}) ->
Pid ! #pipe_eoi{ref=Ref}.
-spec eoi(Sink::riak_pipe:fitting(), list()) -> #pipe_eoi{}.
eoi(#fitting{pid=Pid, ref=Ref, chashfun=sink}, Opts) ->
send_to_sink(Pid, #pipe_eoi{ref=Ref},
sink_type(Opts)).

%% @doc Learn the type of sink we're dealing with from the execution
%% options.
-spec sink_type(riak_pipe:exec_opts()) -> sink_type().
sink_type(Opts) ->
case lists:keyfind(sink_type, 1, Opts) of
{_, Type} ->
Type;
false ->
raw
end.

%% @doc Validate the type of sink given in the execution
%% options. Returns `true' if the type is valid, or `{false, Type}' if
%% invalid, where `Type' is what was found.
-spec valid_sink_type(riak_pipe:exec_opts()) -> true | {false, term()}.
valid_sink_type(Opts) ->
case lists:keyfind(sink_type, 1, Opts) of
{_, {fsm, Period, Timeout}}
when (is_integer(Period) orelse Period == infinity),
(is_integer(Timeout) orelse Timeout == infinity) ->
true;
%% other types as needed (fsm_async, for example) can go here
{_, raw} ->
true;
false ->
true;
Other ->
{false, Other}
end.

%% @doc Do the right kind of communication, given the sink type.
-spec send_to_sink(pid(),
#pipe_result{} | #pipe_log{} | #pipe_eoi{},
sink_type()) ->
ok | {error, term()}.
send_to_sink(Pid, Msg, raw) ->
Pid ! Msg,
ok;
send_to_sink(Pid, Msg, {fsm, Period, Timeout}) ->
case get(sink_sync) of
undefined ->
%% never sync for an 'infinity' Period, but always sync
%% first send for any other Period, to prevent worker
%% restart from overwhelming the sink
send_to_sink_fsm(Pid, Msg, Timeout, Period /= infinity, 0);
Count ->
%% integer is never > than atom, so X is not > 'infinity'
send_to_sink_fsm(Pid, Msg, Timeout, Count >= Period, Count)
end.

send_to_sink_fsm(Pid, Msg, _Timeout, false, Count) ->
gen_fsm:send_event(Pid, Msg),
put(sink_sync, Count+1),
ok;
send_to_sink_fsm(Pid, Msg, Timeout, true, _Count) ->
try
gen_fsm:sync_send_event(Pid, Msg, Timeout),
put(sink_sync, 0),
ok
catch
exit:{timeout,_} -> {error, timeout};
exit:{noproc,_} -> {error, sink_died}
end.

0 comments on commit 807c632

Please sign in to comment.