Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Sink type" that can provide backpressure #59

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
215 changes: 214 additions & 1 deletion src/riak_pipe.erl
Expand Up @@ -94,6 +94,7 @@
-type fitting_spec() :: #fitting_spec{}.
-type exec_opts() :: [exec_option()].
-type exec_option() :: {sink, fitting()}
| {sink_type, riak_pipe_sink:sink_type()}
| {trace, all | list() | set() | ordsets:ordset()}
| {log, sink | sasl}.
-type stat() :: {atom(), term()}.
Expand Down Expand Up @@ -158,6 +159,25 @@
%% sink (all output, logging, and trace messages). If specified,
%% `Sink' should be a `#fitting{}' record, filled with the pid of
%% the process prepared to receive these messages.
%%</dd<dt>
%% `{sink_type, Type}'
%%</dt><dd>
%% Specifies the way in which messages are delivered to the
%% sink. If `Type' is the atom `raw', messages are delivered as
%% plain Erlang messages. If `Type' is the tuple `{fsm,
%% Period, Timeout}', messages are delivered by calling {@link
%% gen_fsm:send_event/2} `Period` times with the sink's pid and
%% the result message, then calling {@link
%% gen_fsm:sync_send_event/3} once with the sink's pid, the
%% result message, and the specified timeout. If no `sink_type'
%% option is provided, `Type' defaults to `raw'.
%%
%% Some simple `fsm' period examples: `{fsm, 0, T}` will send
%% every message synchronously, `{fsm, infinity, T}` will send
%% every message asynchronously, `{fsm, 1, T}` will alternate
%% every message, `{fsm, 10, T}` sends ten messages
%% asynchronously then one synchronously. For every period except
%% `infinity' the first message is always sent synchrounously.
%%</dd><dt>
%% `{trace, TraceMatches}'
%%</dt><dd>
Expand Down Expand Up @@ -197,7 +217,9 @@
{ok, Pipe::pipe()}.
exec(Spec, Options) ->
[ riak_pipe_fitting:validate_fitting(F) || F <- Spec ],
CorrectOptions = correct_trace(ensure_sink(Options)),
CorrectOptions = correct_trace(
validate_sink_type(
ensure_sink(Options))),
riak_pipe_builder_sup:new_pipeline(Spec, CorrectOptions).

%% @doc Ensure that the `{sink, Sink}' exec/2 option is defined
Expand Down Expand Up @@ -231,6 +253,16 @@ ensure_sink(Options) ->
throw({invalid_sink, not_fitting})
end.

%% @doc Make sure that the `sink_type' option is valid, if it is set.
-spec validate_sink_type(exec_opts()) -> exec_opts().
validate_sink_type(Options) ->
case riak_pipe_sink:valid_sink_type(Options) of
true ->
Options;
{false, Invalid} ->
throw({invalid_sink_type, Invalid})
end.

%% @doc Validate the trace option. Converts `{trace, list()}' to
%% `{trace, set()}' or `{trace, ordset()}' (as supported by the
%% cluster) for easier comparison later.
Expand Down Expand Up @@ -1691,6 +1723,187 @@ limits_test_() ->
]
}.

sink_type_test_() ->
{foreach,
prepare_runtime(),
teardown_runtime(),
[
fun(_) ->
{"raw",
fun() ->
%% the basics: 'raw' is the default with
%% nothing specified (so all other tests should
%% have covered it), but try specifying it
%% explicitly here
Spec = [#fitting_spec{name=r,
module=riak_pipe_w_pass}],
{ok, P} = riak_pipe:exec(
Spec,
[{sink_type, raw}]),
riak_pipe:queue_work(P, 1),
riak_pipe:eoi(P),
Result = riak_pipe:collect_results(P, 1000),
?assertEqual({eoi, [{r, 1}], []}, Result)
end}
end,
fun(_) ->
{"fsm",
fun() ->
%% riak_pipe_test_sink *only* accepts results
%% delivered as gen_fsm events that are tagged
%% as sync vs async
PipeRef = make_ref(),
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef),
Spec = [#fitting_spec{name=fs,
module=riak_pipe_w_pass}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{sink, Sink},
{sink_type, {fsm, 0, 5000}}]),
riak_pipe:queue_work(P, {sync, 1}),
riak_pipe:eoi(P),
Result = riak_pipe_test_sink_fsm:get_results(SinkPid),
?assertEqual({eoi, [{fs, {sync, 1}}], []}, Result)
end}
end,
fun(_) ->
{"fsm timeout",
fun() ->
%% purposefully disable acking one output, to
%% trigger the timeout on the
%% gen_fsm:sync_send_event
PipeRef = make_ref(),
SinkOpts = [{skip_ack, [{fst,{sync, 2}}]}],
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef, SinkOpts),
Spec = [#fitting_spec{name=fst,
module=riak_pipe_w_pass}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{log, sink},
{trace, [error]},
{sink, Sink},
%% a very short timeout, to fit eunit
{sink_type, {fsm, 0, 10}}]),
riak_pipe:queue_work(P, {sync, 1}),
riak_pipe:queue_work(P, {sync, 2}),
riak_pipe:queue_work(P, {sync, 3}),
riak_pipe:eoi(P),
{eoi, Results, Logs} =
riak_pipe_test_sink_fsm:get_results(SinkPid),

%% make sure that all results did make it to the sink
?assertEqual([{fst, {sync, 1}},
{fst, {sync, 2}},
{fst, {sync, 3}}],
lists:sort(Results)),
%% but that we also logged an error...
[{fst,{trace,[error],{error,Props}}}] = Logs,
%% ...about the input "2"...
?assertEqual({sync, 2},
proplists:get_value(input, Props)),
%% ...timing out on its way to the sink
?assertEqual({badmatch,{error,timeout}},
proplists:get_value(error, Props))
end}
end,
fun(_) ->
{"fsm sync period",
fun() ->
%% make sure that the sink messages are sent
%% synchronously on the Period, and
%% asynchronously otherwise
PipeRef = make_ref(),
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef, []),
%% force a single worker, to make it easy to
%% test the sync period
ConstantChash = fun(_) -> <<0:160/integer>> end,
Spec = [#fitting_spec{name=fst,
module=riak_pipe_w_pass,
chashfun=ConstantChash}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{log, sink},
{trace, [error]},
{sink, Sink},
{sink_type, {fsm, 2, 1000}}]),
riak_pipe:queue_work(P, {sync, 1}),
riak_pipe:queue_work(P, {async, 2}),
riak_pipe:queue_work(P, {async, 3}),
riak_pipe:queue_work(P, {sync, 4}),
riak_pipe:eoi(P),
{eoi, Results, []} =
riak_pipe_test_sink_fsm:get_results(SinkPid),

%% make sure that all results did make it to the sink
%% ('async' sorts before 'sync')
?assertEqual([{fst, {async, 2}},
{fst, {async, 3}},
{fst, {sync, 1}},
{fst, {sync, 4}}],
lists:sort(Results))
end}
end,
fun(_) ->
{"fsm infinity sync period",
fun() ->
%% infinite period means sink results are
%% always delivered asynchronously
PipeRef = make_ref(),
{ok, SinkPid} = riak_pipe_test_sink_fsm:start_link(
PipeRef, []),
%% force a single worker, to make it easy to
%% test the sync period
ConstantChash = fun(_) -> <<0:160/integer>> end,
Spec = [#fitting_spec{name=fst,
module=riak_pipe_w_pass,
chashfun=ConstantChash}],
Sink = #fitting{pid=SinkPid, ref=PipeRef},
{ok, P} = riak_pipe:exec(
Spec,
[{log, sink},
{trace, [error]},
{sink, Sink},
{sink_type, {fsm, infinity, 1000}}]),
riak_pipe:queue_work(P, {async, 1}),
riak_pipe:queue_work(P, {async, 2}),
riak_pipe:queue_work(P, {async, 3}),
riak_pipe:queue_work(P, {async, 4}),
riak_pipe:eoi(P),
{eoi, Results, []} =
riak_pipe_test_sink_fsm:get_results(SinkPid),

%% make sure that all results did make it to the sink
?assertEqual([{fst, {async, 1}},
{fst, {async, 2}},
{fst, {async, 3}},
{fst, {async, 4}}],
lists:sort(Results))
end}
end,
fun(_) ->
{"invalid",
fun() ->
try
Spec = [#fitting_spec{module=riak_pipe_w_pass}],
{ok, P} = riak_pipe:exec(
Spec,
[{sink_type, invalid}]),
riak_pipe:destroy(P),
?assert(false)
catch throw:{invalid_sink_type,
{sink_type, invalid}} ->
ok
end
end}
end
]}.

should_be_the_very_last_test() ->
Leftovers = [{Pid, X} ||
Pid <- processes(),
Expand Down
2 changes: 1 addition & 1 deletion src/riak_pipe_fitting.erl
Expand Up @@ -390,7 +390,7 @@ forward_eoi(#state{details=Details}) ->
?T(Details, [eoi], {fitting, send_eoi}),
case Details#fitting_details.output of
#fitting{chashfun=sink}=Sink ->
riak_pipe_sink:eoi(Sink);
riak_pipe_sink:eoi(Sink, Details#fitting_details.options);
#fitting{}=Fitting ->
riak_pipe_fitting:eoi(Fitting)
end.
Expand Down
4 changes: 2 additions & 2 deletions src/riak_pipe_log.erl
Expand Up @@ -43,9 +43,9 @@ log(#fitting_details{options=O, name=N}, Msg) ->
ok; %% no logging
sink ->
Sink = proplists:get_value(sink, O),
riak_pipe_sink:log(N, Sink, Msg);
riak_pipe_sink:log(N, Sink, Msg, O);
{sink, Sink} ->
riak_pipe_sink:log(N, Sink, Msg);
riak_pipe_sink:log(N, Sink, Msg, O);
lager ->
lager:info(
"~s: ~P",
Expand Down
100 changes: 88 additions & 12 deletions src/riak_pipe_sink.erl
Expand Up @@ -25,30 +25,106 @@
-module(riak_pipe_sink).

-export([
result/3,
log/3,
eoi/1
result/4,
log/4,
eoi/2,
valid_sink_type/1
]).

-include("riak_pipe.hrl").

-export_type([sink_type/0]).
-type sink_type() :: raw
| {fsm, Period::integer(), Timeout::timeout()}.

%% @doc Send a result to the sink (used by worker processes). The
%% result is delivered as a `#pipe_result{}' record in the sink
%% process's mailbox.
-spec result(term(), Sink::riak_pipe:fitting(), term()) -> #pipe_result{}.
result(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Output) ->
Pid ! #pipe_result{ref=Ref, from=From, result=Output}.
-spec result(term(), Sink::riak_pipe:fitting(), term(),
riak_pipe:exec_opts()) ->
ok.
result(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Output, Opts) ->
send_to_sink(Pid,
#pipe_result{ref=Ref, from=From, result=Output},
sink_type(Opts)).

%% @doc Send a log message to the sink (used by worker processes and
%% fittings). The message is delivered as a `#pipe_log{}' record
%% in the sink process's mailbox.
-spec log(term(), Sink::riak_pipe:fitting(), term()) -> #pipe_log{}.
log(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Msg) ->
Pid ! #pipe_log{ref=Ref, from=From, msg=Msg}.
-spec log(term(), Sink::riak_pipe:fitting(), term(), list()) -> #pipe_log{}.
log(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Msg, Opts) ->
send_to_sink(Pid, #pipe_log{ref=Ref, from=From, msg=Msg},
sink_type(Opts)).

%% @doc Send an end-of-inputs message to the sink (used by fittings).
%% The message is delivered as a `#pipe_eoi{}' record in the sink
%% process's mailbox.
-spec eoi(Sink::riak_pipe:fitting()) -> #pipe_eoi{}.
eoi(#fitting{pid=Pid, ref=Ref, chashfun=sink}) ->
Pid ! #pipe_eoi{ref=Ref}.
-spec eoi(Sink::riak_pipe:fitting(), list()) -> #pipe_eoi{}.
eoi(#fitting{pid=Pid, ref=Ref, chashfun=sink}, Opts) ->
send_to_sink(Pid, #pipe_eoi{ref=Ref},
sink_type(Opts)).

%% @doc Learn the type of sink we're dealing with from the execution
%% options.
-spec sink_type(riak_pipe:exec_opts()) -> sink_type().
sink_type(Opts) ->
case lists:keyfind(sink_type, 1, Opts) of
{_, Type} ->
Type;
false ->
raw
end.

%% @doc Validate the type of sink given in the execution
%% options. Returns `true' if the type is valid, or `{false, Type}' if
%% invalid, where `Type' is what was found.
-spec valid_sink_type(riak_pipe:exec_opts()) -> true | {false, term()}.
valid_sink_type(Opts) ->
case lists:keyfind(sink_type, 1, Opts) of
{_, {fsm, Period, Timeout}}
when (is_integer(Period) orelse Period == infinity),
(is_integer(Timeout) orelse Timeout == infinity) ->
true;
%% other types as needed (fsm_async, for example) can go here
{_, raw} ->
true;
false ->
true;
Other ->
{false, Other}
end.

%% @doc Do the right kind of communication, given the sink type.
-spec send_to_sink(pid(),
#pipe_result{} | #pipe_log{} | #pipe_eoi{},
sink_type()) ->
ok | {error, term()}.
send_to_sink(Pid, Msg, raw) ->
Pid ! Msg,
ok;
send_to_sink(Pid, Msg, {fsm, Period, Timeout}) ->
case get(sink_sync) of
undefined ->
%% never sync for an 'infinity' Period, but always sync
%% first send for any other Period, to prevent worker
%% restart from overwhelming the sink
send_to_sink_fsm(Pid, Msg, Timeout, Period /= infinity, 0);
Count ->
%% integer is never > than atom, so X is not > 'infinity'
send_to_sink_fsm(Pid, Msg, Timeout, Count >= Period, Count)
end.

send_to_sink_fsm(Pid, Msg, _Timeout, false, Count) ->
gen_fsm:send_event(Pid, Msg),
put(sink_sync, Count+1),
ok;
send_to_sink_fsm(Pid, Msg, Timeout, true, _Count) ->
try
gen_fsm:sync_send_event(Pid, Msg, Timeout),
put(sink_sync, 0),
ok
catch
exit:{timeout,_} -> {error, timeout};
exit:{noproc,_} -> {error, sink_died}
end.