forked from basho/riak_pipe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
riak_pipe_w_crash.erl
137 lines (127 loc) · 4.91 KB
/
riak_pipe_w_crash.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2011 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc It's what we do: crash.
-module(riak_pipe_w_crash).
-behaviour(riak_pipe_vnode_worker).
-export([init/2,
process/3,
done/1]).
-include("riak_pipe.hrl").
-include("riak_pipe_log.hrl").
-record(state, {p :: riak_pipe_vnode:partition(),
fd :: riak_pipe_fitting:details()}).
-opaque state() :: #state{}.
-export_type([state/0]).
%% name of the table reMEMbering restarts
-define(MEM, ?MODULE).
%% @doc Initialization just stows the partition and fitting details in
%% the module's state, for sending outputs in {@link process/3}.
-spec init(riak_pipe_vnode:partition(),
riak_pipe_fitting:details()) ->
{ok, state()}.
init(Partition, FittingDetails) ->
case FittingDetails#fitting_details.arg of
init_exit ->
exit(crash);
init_badreturn ->
crash;
init_restartfail ->
case is_restart(Partition, FittingDetails) of
true ->
restart_crash;
false ->
{ok, #state{p=Partition, fd=FittingDetails}}
end;
_ ->
{ok, #state{p=Partition, fd=FittingDetails}}
end.
is_restart(Partition, FittingDetails) ->
Fitting = FittingDetails#fitting_details.fitting,
%% set the fitting coordinator as the heir, such that the ets
%% table survives when this worker exits, but gets cleaned
%% up when the pipeline shuts down
case (catch ets:new(?MEM, [set, {keypos, 1},
named_table, public,
{heir, Fitting#fitting.pid, ok}])) of
{'EXIT',{badarg,_}} ->
%% table was already created
ok;
?MEM ->
%% table is now created
ok
end,
case ets:lookup(?MEM, Partition) of
[] ->
%% no record - not restart;
%% make a record for the next start to check
ets:insert(?MEM, {Partition, true}),
false;
[{Partition, true}] ->
true
end.
%% @doc Process just sends `Input' directly to the next fitting. This
%% function also generates two trace messages: `{processing,
%% Input}' before sending the output, and `{processed, Input}' after
%% the blocking output send has returned. This can be useful for
%% dropping in another pipeline to watching data move through it.
-spec process(term(), boolean(), state()) -> {ok, state()}.
process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
?T(FittingDetails, [], {processing, Input}),
case FittingDetails#fitting_details.arg of
Input ->
if Input == init_restartfail ->
%% "worker restart failure, input forwarding" test in
%% riak_pipe exploits this timer:sleep to test both
%% moments of forwarding: those items left in a
%% failed-restart worker's queue, as well as those items
%% sent to a worker that is already forwarding
timer:sleep(1000);
true -> ok
end,
?T(FittingDetails, [], {crashing, Input}),
exit(process_input_crash);
{recurse_done_pause, _} ->
%% "restart after eoi" test in riak_pipe uses this
%% behavior see done/1 for more details
case Input of
[_] -> ok;
[_|More] ->
timer:sleep(100),
riak_pipe_vnode_worker:recurse_input(
More, Partition, FittingDetails)
end,
{ok, State};
_Other ->
riak_pipe_vnode_worker:send_output(Input, Partition, FittingDetails),
?T(FittingDetails, [], {processed, Input}),
{ok, State}
end.
%% @doc Unused.
-spec done(state()) -> ok.
done(#state{fd=FittingDetails}) ->
case FittingDetails#fitting_details.arg of
{recurse_done_pause, Time} ->
%% "restart after eoi" test in riak_pipe exploits this
%% sleep to get more input queued while the worker is
%% shutting down
timer:sleep(Time);
_ ->
ok
end.