/
broadcast_dispatcher.ex
197 lines (155 loc) · 5.74 KB
/
broadcast_dispatcher.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
defmodule GenStage.BroadcastDispatcher do
  @moduledoc """
  A dispatcher that accumulates demand from all consumers
  before broadcasting events to all of them.

  This dispatcher guarantees that events are dispatched to all
  consumers without exceeding the demand of any given consumer.

  If a producer uses `GenStage.BroadcastDispatcher`, its subscribers
  can specify an optional `:selector` function that receives the event
  and returns a boolean in the subscription options.

  Assume `producer` and `consumer` are stages exchanging events of type
  `%{:key => String.t, any => any}`, then by calling

      GenStage.sync_subscribe(consumer,
        to: producer,
        selector: fn %{key: key} -> String.starts_with?(key, "foo-") end)

  `consumer` will receive only the events broadcast from `producer`
  for which the selector function returns a truthy value.

  The `:selector` option can be specified in sync and async subscriptions,
  as well as in the `:subscribe_to` list in the return tuple of
  `c:GenStage.init/1`. For example:

      def init(:ok) do
        {:consumer, :ok, subscribe_to:
          [{producer, selector: fn %{key: key} -> String.starts_with?(key, "foo-") end}]}
      end
  """

  @behaviour GenStage.Dispatcher

  require Logger

  # Dispatcher state is the tuple `{demands, waiting, subscribed_processes}`:
  #
  #   * `demands` - list of `{counter, pid, ref, selector}`, one entry per
  #     subscription. `counter` is the demand of that subscription which has
  #     not yet been folded into `waiting`; it is negative for a consumer
  #     that subscribed while events were already pending.
  #   * `waiting` - number of events that `dispatch/3` may still broadcast
  #     before it has to hand events back to the caller.
  #   * `subscribed_processes` - `MapSet` of subscribed pids, used to reject
  #     a second subscription from the same process.

  # Starts with no subscriptions and nothing to broadcast.
  @doc false
  def init(_opts) do
    {:ok, {[], 0, MapSet.new()}}
  end

  # Forwards `msg` to the calling process right away; the state is untouched.
  @doc false
  def info(msg, state) do
    send(self(), msg)
    {:ok, state}
  end

  # Registers the subscription `{pid, ref}`.
  #
  # Raises `ArgumentError` when `:selector` is given but is not a function of
  # arity 1. Returns `{:error, :already_subscribed}` (and logs an error) when
  # `pid` already holds a subscription. Otherwise the subscription starts with
  # a counter of `-waiting`: events that are already pending will also be sent
  # to this consumer, so it has to ask for that many before it contributes
  # any demand of its own.
  @doc false
  def subscribe(opts, {pid, ref}, {demands, waiting, subscribed_processes}) do
    selector = validate_selector(opts)

    if subscribed?(subscribed_processes, pid) do
      Logger.error(fn ->
        "#{inspect(pid)} is already registered with #{inspect(self())}. " <>
          "This subscription has been discarded."
      end)

      {:error, :already_subscribed}
    else
      subscribed_processes = add_subscriber(subscribed_processes, pid)
      {:ok, 0, {add_demand(-waiting, pid, ref, selector, demands), waiting, subscribed_processes}}
    end
  end

  # Removes the subscription `{pid, ref}` and returns the demand released by
  # its removal as the second tuple element.
  @doc false
  def cancel({pid, ref}, {demands, waiting, subscribed_processes}) do
    subscribed_processes = delete_subscriber(subscribed_processes, pid)

    case delete_demand(ref, demands) do
      [] ->
        # Nobody is left to broadcast to. `waiting` must be reset, otherwise
        # `dispatch/3` would consider later events as delivered (to an empty
        # list of consumers) and silently drop them instead of returning them
        # to the caller.
        {:ok, 0, {[], 0, subscribed_processes}}

      demands ->
        # Since we may have removed the process we were waiting on,
        # cancellation may actually generate demand!
        new_min = get_min(demands)
        demands = adjust_demand(new_min, demands)
        {:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
    end
  end

  # Adds `counter` to the demand of `{pid, ref}`. The demand common to every
  # subscription (the minimum counter, never below zero) is subtracted from
  # all counters, added to `waiting` and returned as the second tuple element.
  @doc false
  def ask(counter, {pid, ref}, {demands, waiting, subscribed_processes}) do
    {current, selector, demands} = pop_demand(ref, demands)
    demands = add_demand(current + counter, pid, ref, selector, demands)
    new_min = get_min(demands)
    demands = adjust_demand(new_min, demands)
    {:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
  end

  # Broadcasts at most `waiting` events to every subscription and returns the
  # events that could not be delivered as the second tuple element.
  @doc false
  def dispatch(events, _length, {demands, 0, subscribed_processes}) do
    # Nothing may be broadcast right now: hand every event back.
    {:ok, events, {demands, 0, subscribed_processes}}
  end

  def dispatch(events, length, {demands, waiting, subscribed_processes}) do
    {deliver_now, deliver_later, waiting} = split_events(events, length, waiting)

    Enum.each(demands, fn {_counter, pid, ref, selector} ->
      {selected, discarded} = filter_and_count(deliver_now, selector)

      # Events rejected by the selector still used up this consumer's demand,
      # so ask again on its behalf for as many events as were discarded.
      if discarded > 0 do
        send(self(), {:"$gen_producer", {pid, ref}, {:ask, discarded}})
      end

      Process.send(pid, {:"$gen_consumer", {self(), ref}, selected}, [:noconnect])
    end)

    {:ok, deliver_later, {demands, waiting, subscribed_processes}}
  end

  # Returns `{kept, discarded_count}`, preserving the order of `messages`.
  # A `nil` selector keeps everything.
  defp filter_and_count(messages, nil) do
    {messages, 0}
  end

  defp filter_and_count(messages, selector) do
    filter_and_count(messages, selector, [], 0)
  end

  defp filter_and_count([message | messages], selector, acc, count) do
    if selector.(message) do
      filter_and_count(messages, selector, [message | acc], count)
    else
      filter_and_count(messages, selector, acc, count + 1)
    end
  end

  defp filter_and_count([], _selector, acc, count) do
    {Enum.reverse(acc), count}
  end

  # Fetches the optional `:selector`; raises `ArgumentError` unless it is
  # absent (`nil`) or a function of arity 1.
  defp validate_selector(opts) do
    case Keyword.get(opts, :selector) do
      nil ->
        nil

      selector when is_function(selector, 1) ->
        selector

      other ->
        raise ArgumentError,
              ":selector option must be passed a unary function, got: #{inspect(other)}"
    end
  end

  # Smallest counter among all subscriptions, clamped at zero because
  # counters may be negative (see `subscribe/3`).
  defp get_min([]), do: 0

  defp get_min([{acc, _, _, _} | demands]),
    do: demands |> Enum.reduce(acc, fn {val, _, _, _}, acc -> min(val, acc) end) |> max(0)

  # Splits `events` into what may be sent now and what has to wait, and
  # returns the remaining `waiting` budget as the third element.
  defp split_events(events, length, counter) when length <= counter do
    {events, [], counter - length}
  end

  defp split_events(events, _length, counter) do
    {now, later} = Enum.split(events, counter)
    {now, later, 0}
  end

  # Subtracts `min` from every counter.
  defp adjust_demand(0, demands) do
    demands
  end

  defp adjust_demand(min, demands) do
    Enum.map(demands, fn {counter, pid, key, selector} ->
      {counter - min, pid, key, selector}
    end)
  end

  defp add_demand(counter, pid, ref, selector, demands)
       when is_integer(counter) and is_pid(pid) and (is_nil(selector) or is_function(selector, 1)) do
    [{counter, pid, ref, selector} | demands]
  end

  # Removes the entry for `ref` and returns `{counter, selector, rest}`;
  # an unknown `ref` yields a zero counter and no selector.
  defp pop_demand(ref, demands) do
    case List.keytake(demands, ref, 2) do
      {{current, _pid, ^ref, selector}, rest} -> {current, selector, rest}
      nil -> {0, nil, demands}
    end
  end

  defp delete_demand(ref, demands) do
    List.keydelete(demands, ref, 2)
  end

  defp add_subscriber(subscribed_processes, pid) do
    MapSet.put(subscribed_processes, pid)
  end

  defp delete_subscriber(subscribed_processes, pid) do
    MapSet.delete(subscribed_processes, pid)
  end

  defp subscribed?(subscribed_processes, pid) do
    MapSet.member?(subscribed_processes, pid)
  end
end