/
riak_pipe_sink.erl
130 lines (118 loc) · 4.54 KB
/
riak_pipe_sink.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2011 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Methods for sending messages to the sink.
%%
%% Sink messages are delivered as three record types:
%% `#pipe_result{}', `#pipe_log{}', and `#pipe_eoi{}'.
-module(riak_pipe_sink).
-export([
result/4,
log/4,
eoi/2,
valid_sink_type/1
]).
-include("riak_pipe.hrl").
-export_type([sink_type/0]).
-type sink_type() :: raw
| {fsm, Period::integer(), Timeout::timeout()}.
%% @doc Send a result to the sink (used by worker processes). The
%% result is delivered as a `#pipe_result{}' record in the sink
%% process's mailbox.
-spec result(term(), Sink::riak_pipe:fitting(), term(),
riak_pipe:exec_opts()) ->
ok.
result(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Output, Opts) ->
send_to_sink(Pid,
#pipe_result{ref=Ref, from=From, result=Output},
sink_type(Opts)).
%% @doc Send a log message to the sink (used by worker processes and
%% fittings). The message is delivered as a `#pipe_log{}' record
%% in the sink process's mailbox.
-spec log(term(), Sink::riak_pipe:fitting(), term(), list()) -> #pipe_log{}.
log(From, #fitting{pid=Pid, ref=Ref, chashfun=sink}, Msg, Opts) ->
send_to_sink(Pid, #pipe_log{ref=Ref, from=From, msg=Msg},
sink_type(Opts)).
%% @doc Send an end-of-inputs message to the sink (used by fittings).
%% The message is delivered as a `#pipe_eoi{}' record in the sink
%% process's mailbox.
-spec eoi(Sink::riak_pipe:fitting(), list()) -> #pipe_eoi{}.
eoi(#fitting{pid=Pid, ref=Ref, chashfun=sink}, Opts) ->
send_to_sink(Pid, #pipe_eoi{ref=Ref},
sink_type(Opts)).
%% @doc Learn the type of sink we're dealing with from the execution
%% options.
-spec sink_type(riak_pipe:exec_opts()) -> sink_type().
sink_type(Opts) ->
case lists:keyfind(sink_type, 1, Opts) of
{_, Type} ->
Type;
false ->
raw
end.
%% @doc Validate the type of sink given in the execution
%% options. Returns `true' if the type is valid, or `{false, Type}' if
%% invalid, where `Type' is what was found.
-spec valid_sink_type(riak_pipe:exec_opts()) -> true | {false, term()}.
valid_sink_type(Opts) ->
case lists:keyfind(sink_type, 1, Opts) of
{_, {fsm, Period, Timeout}}
when (is_integer(Period) orelse Period == infinity),
(is_integer(Timeout) orelse Timeout == infinity) ->
true;
%% other types as needed (fsm_async, for example) can go here
{_, raw} ->
true;
false ->
true;
Other ->
{false, Other}
end.
%% @doc Do the right kind of communication, given the sink type.
-spec send_to_sink(pid(),
#pipe_result{} | #pipe_log{} | #pipe_eoi{},
sink_type()) ->
ok | {error, term()}.
send_to_sink(Pid, Msg, raw) ->
Pid ! Msg,
ok;
send_to_sink(Pid, Msg, {fsm, Period, Timeout}) ->
case get(sink_sync) of
undefined ->
%% never sync for an 'infinity' Period, but always sync
%% first send for any other Period, to prevent worker
%% restart from overwhelming the sink
send_to_sink_fsm(Pid, Msg, Timeout, Period /= infinity, 0);
Count ->
%% integer is never > than atom, so X is not > 'infinity'
send_to_sink_fsm(Pid, Msg, Timeout, Count >= Period, Count)
end.
send_to_sink_fsm(Pid, Msg, _Timeout, false, Count) ->
gen_fsm:send_event(Pid, Msg),
put(sink_sync, Count+1),
ok;
send_to_sink_fsm(Pid, Msg, Timeout, true, _Count) ->
try
gen_fsm:sync_send_event(Pid, Msg, Timeout),
put(sink_sync, 0),
ok
catch
exit:{timeout,_} -> {error, timeout};
exit:{noproc,_} -> {error, sink_died}
end.