-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
stream.ex
171 lines (146 loc) · 4.46 KB
/
stream.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
defmodule GenEvent.Stream do
@moduledoc """
Defines a `GenEvent` stream.
This is a struct returned by `stream/2`. The struct is public and
contains the following fields:
* `:manager` - the manager reference given to `GenEvent.stream/2`
* `:timeout` - the timeout between events, defaults to `:infinity`
"""
defstruct manager: nil, timeout: :infinity
@type t :: %__MODULE__{
manager: GenEvent.manager,
timeout: timeout}
@doc false
def init({_pid, _ref} = state) do
{:ok, state}
end
@doc false
def handle_event(event, _state) do
# We do this to trick dialyzer to not complain about non-local returns.
case :erlang.phash2(1, 1) do
0 -> exit({:bad_event, event})
1 -> :remove_handler
end
end
@doc false
def handle_call(msg, _state) do
# We do this to trick dialyzer to not complain about non-local returns.
reason = {:bad_call, msg}
case :erlang.phash2(1, 1) do
0 -> exit(reason)
1 -> {:remove_handler, reason}
end
end
@doc false
def handle_info(_msg, state) do
{:ok, state}
end
@doc false
def terminate(_reason, _state) do
:ok
end
@doc false
def code_change(_old, state, _extra) do
{:ok, state}
end
end
defimpl Enumerable, for: GenEvent.Stream do
def reduce(stream, acc, fun) do
start_fun = fn() -> start(stream) end
next_fun = &next(stream, &1)
stop_fun = &stop(stream, &1)
Stream.resource(start_fun, next_fun, stop_fun).(acc, wrap_reducer(fun))
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _item) do
{:error, __MODULE__}
end
defp wrap_reducer(fun) do
fn
{:ack, manager, ref, event}, acc ->
send manager, {ref, :ok}
fun.(event, acc)
{:async, _manager, _ref, event}, acc ->
fun.(event, acc)
{:sync, manager, ref, event}, acc ->
try do
fun.(event, acc)
after
send manager, {ref, :ok}
end
end
end
defp start(%{manager: manager} = stream) do
try do
{:ok, {pid, ref}} = :gen.call(manager, self(),
{:add_process_handler, self(), self()}, :infinity)
mon_ref = Process.monitor(pid)
{pid, ref, mon_ref}
catch
:exit, reason -> exit({reason, {__MODULE__, :start, [stream]}})
end
end
defp next(%{timeout: timeout} = stream, {pid, ref, mon_ref} = acc) do
self = self()
receive do
# Got an async event.
{_from, {^pid, ^ref}, {:notify, event}} ->
{[{:async, pid, ref, event}], acc}
# Got a sync event.
{_from, {^pid, ^ref}, {:sync_notify, event}} ->
{[{:sync, pid, ref, event}], acc}
# Got an ack event.
{_from, {^pid, ^ref}, {:ack_notify, event}} ->
{[{:ack, pid, ref, event}], acc}
# The handler was removed. Stop iteration, resolve the
# event later. We need to demonitor now, otherwise DOWN
# appears with higher priority in the shutdown process.
{:gen_event_EXIT, {^pid, ^ref}, _reason} = event ->
Process.demonitor(mon_ref, [:flush])
send(self, event)
{:halt, {:removed, acc}}
# The manager died. Stop iteration, resolve the event later.
{:DOWN, ^mon_ref, _, _, _} = event ->
send(self, event)
{:halt, {:removed, acc}}
after
timeout ->
exit({:timeout, {__MODULE__, :next, [stream, acc]}})
end
end
# If we reach this branch, we know the handler was already
# removed, so we don't trigger a request for doing so.
defp stop(stream, {:removed, {pid, ref, mon_ref} = acc}) do
case wait_for_handler_removal(pid, ref, mon_ref) do
:ok ->
flush_events(ref)
{:error, reason} ->
exit({reason, {__MODULE__, :stop, [stream, acc]}})
end
end
# If we reach this branch, the handler was not removed yet,
# so we trigger a request for doing so.
defp stop(stream, {pid, ref, _} = acc) do
_ = GenEvent.remove_handler(pid, {pid, ref}, :shutdown)
stop(stream, {:removed, acc})
end
defp wait_for_handler_removal(pid, ref, mon_ref) do
receive do
{:gen_event_EXIT, {^pid, ^ref}, _reason} ->
Process.demonitor(mon_ref, [:flush])
:ok
{:DOWN, ^mon_ref, _, _, reason} ->
{:error, reason}
end
end
defp flush_events(ref) do
receive do
{_from, {_pid, ^ref}, {notify, _event}} when notify in [:notify, :ack_notify, :sync_notify] ->
flush_events(ref)
after
0 -> :ok
end
end
end