-
Notifications
You must be signed in to change notification settings - Fork 190
/
partition_dispatcher.ex
249 lines (204 loc) · 8.35 KB
/
partition_dispatcher.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
defmodule GenStage.PartitionDispatcher do
  @moduledoc """
  A dispatcher that sends events according to partitions.

  Keep in mind that, if partitions are not evenly distributed,
  a backed-up partition will slow all other ones.

  ## Options

  The partition dispatcher accepts the following options
  on initialization:

    * `:partitions` - the number of partitions to dispatch to. It may be
      a positive integer with a total number of partitions, where each
      partition is named from 0 up to `integer - 1`. For example,
      `partitions: 4` will contain 4 partitions named 0, 1, 2 and 3.

      It may also be an enumerable that specifies the name of every partition.
      For instance, `partitions: [:odd, :even]` will build two partitions,
      named `:odd` and `:even`.

    * `:hash` - the hashing algorithm, which receives the event and returns
      a tuple with two elements, containing the event and the partition.
      The partition must be one of the partitions specified in `:partitions`
      above; an event hashed to any other partition raises an `ArgumentError`.
      The default uses `&:erlang.phash2(&1, Enum.count(partitions))`
      on the event to select the partition. Since the default only produces
      the integers `0` up to `Enum.count(partitions) - 1`, `:hash` is required
      whenever a partition name is not an integer.

  ### Examples

  To start a producer with four partitions named 0, 1, 2 and 3:

      {:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}

  To start a producer with two partitions named `:odd` and `:even`:

      hash = fn event -> {event, if(rem(event, 2) == 0, do: :even, else: :odd)} end

      {:producer, state,
       dispatcher: {GenStage.PartitionDispatcher, partitions: [:odd, :even], hash: hash}}

  ## Subscribe options

  When subscribing to a `GenStage` with a partition dispatcher the following
  option is required:

    * `:partition` - the name of the partition. The partition must be one of
      the partitions specified in `:partitions` above.

  ### Examples

  The partition can be given either in `init`'s subscribe_to:

      {:consumer, :ok, subscribe_to: [{producer, partition: 0}]}

  Or when calling `sync_subscribe`:

      GenStage.sync_subscribe(consumer, to: producer, partition: 0)
  """

  @behaviour GenStage.Dispatcher

  # The dispatcher state is the tuple
  #
  #     {tag, hash, waiting, pending, partitions, references}
  #
  #   * `tag` - a unique reference (from `make_ref/0`) wrapping notifications
  #     stored in a partition queue as `{tag, msg}`, so they can be told apart
  #     from buffered events
  #   * `hash` - the function mapping an event to `{event, partition}`
  #   * `waiting` - how many incoming events `dispatch/3` may deliver right
  #     away; the surplus is returned from `dispatch/3` as "deliver later"
  #   * `pending` - demand left unused by cancelled subscriptions; later
  #     `ask/3` calls consume it before forwarding any new demand
  #   * `partitions` - partition name => `{pid | nil, ref | nil, demand_or_queue}`,
  #     where `demand_or_queue` is either an integer demand or an Erlang
  #     `:queue` of buffered events and `{tag, msg}` notifications
  #   * `references` - subscription ref => partition name
  #
  # Each partition name is also a process dictionary key. During `dispatch/3`
  # it holds that partition's events for the current batch, in reverse order.

  # Value of a partition that has no subscriber, no demand and nothing buffered.
  @init {nil, nil, 0}

  @doc false
  def init(opts) do
    partitions =
      case Keyword.get(opts, :partitions) do
        nil ->
          raise ArgumentError,
                "the enumerable of :partitions is required when using the partition dispatcher"

        # `0..(n - 1)` with n < 1 is a decreasing range (0..-1 enumerates 0 and -1),
        # which would silently create bogus partitions.
        partitions when is_integer(partitions) and partitions < 1 ->
          raise ArgumentError,
                ":partitions must be a positive integer or an enumerable, " <>
                  "got: #{inspect(partitions)}"

        partitions when is_integer(partitions) ->
          0..(partitions - 1)

        partitions ->
          partitions
      end

    # The default hash only ever produces integers, so named partitions could
    # never be selected without a custom :hash.
    hash_given = Keyword.has_key?(opts, :hash)

    partitions =
      for partition <- partitions, into: %{} do
        if not hash_given and not is_integer(partition) do
          raise ArgumentError,
                "the :hash option is required when :partitions contains partitions " <>
                  "that are not integers, got: #{inspect(partition)}"
        end

        # Per-partition event buffer filled and drained by dispatch/3.
        Process.put(partition, [])
        {partition, @init}
      end

    size = map_size(partitions)
    hash = Keyword.get(opts, :hash, &hash(&1, size))
    {:ok, {make_ref(), hash, 0, 0, partitions, %{}}}
  end

  defp hash(event, range) do
    {event, :erlang.phash2(event, range)}
  end

  @doc false
  def notify(msg, {tag, hash, waiting, pending, partitions, references}) do
    partitions =
      Map.new(partitions, fn {partition, value} ->
        {partition, notify_partition(value, msg, tag)}
      end)

    {:ok, {tag, hash, waiting, pending, partitions, references}}
  end

  # Unsubscribed partition with nothing buffered: start a queue holding the notification.
  defp notify_partition(@init, msg, tag),
    do: {nil, nil, :queue.in({tag, msg}, :queue.new())}

  # Events are buffered: queue the notification behind them to keep ordering.
  defp notify_partition({pid, ref, queue}, msg, tag) when not is_integer(queue),
    do: {pid, ref, :queue.in({tag, msg}, queue)}

  # Subscribed partition with nothing buffered: deliver the notification now.
  defp notify_partition({pid, ref, _demand} = value, msg, _tag) do
    Process.send(pid, {:"$gen_consumer", {self(), ref}, {:notification, msg}}, [:noconnect])
    value
  end

  @doc false
  def subscribe(opts, {pid, ref}, {tag, hash, waiting, pending, partitions, references}) do
    partition = Keyword.get(opts, :partition)

    case partitions do
      %{^partition => {nil, nil, demand_or_queue}} ->
        # Free partition: attach the subscriber and keep any buffered demand/queue.
        partitions = Map.put(partitions, partition, {pid, ref, demand_or_queue})
        references = Map.put(references, ref, partition)
        {:ok, 0, {tag, hash, waiting, pending, partitions, references}}

      %{^partition => {taken_by, _, _}} ->
        raise ArgumentError,
              "the partition #{inspect(partition)} is already taken by #{inspect(taken_by)}"

      _ when is_nil(partition) ->
        raise ArgumentError,
              "the :partition option is required when subscribing to a producer with partition dispatcher"

      _ ->
        keys = Map.keys(partitions)

        raise ArgumentError,
              ":partition must be one of #{inspect(keys)}, got: #{inspect(partition)}"
    end
  end

  @doc false
  def cancel({_, ref}, {tag, hash, waiting, pending, partitions, references}) do
    {partition, references} = Map.pop(references, ref)
    {_pid, _ref, demand_or_queue} = Map.get(partitions, partition)
    partitions = Map.put(partitions, partition, @init)

    case demand_or_queue do
      demand when is_integer(demand) ->
        # Bank the unused demand so later ask/3 calls reuse it.
        {:ok, 0, {tag, hash, waiting, pending + demand, partitions, references}}

      queue ->
        # Buffered events are dropped; their count (notifications excluded) is
        # returned as demand and added to `waiting`.
        length = count_from_queue(queue, tag, 0)
        {:ok, length, {tag, hash, waiting + length, pending, partitions, references}}
    end
  end

  @doc false
  def ask(counter, {_, ref}, {tag, hash, waiting, pending, partitions, references}) do
    partition = Map.fetch!(references, ref)
    {pid, ref, demand_or_queue} = Map.fetch!(partitions, partition)

    demand_or_queue =
      case demand_or_queue do
        demand when is_integer(demand) ->
          demand + counter

        queue ->
          # Serve buffered events first; leftover demand comes back as an integer.
          send_from_queue(queue, tag, pid, ref, counter, [])
      end

    partitions = Map.put(partitions, partition, {pid, ref, demand_or_queue})

    # Demand banked by cancelled subscriptions covers part of this request.
    already_sent = min(pending, counter)
    demand = counter - already_sent
    {:ok, demand, {tag, hash, waiting + demand, pending - already_sent, partitions, references}}
  end

  # Sends up to `counter` queued events to `pid`. Queued notifications are
  # flushed as they are reached, do not consume demand, and keep their place
  # relative to events. `acc` holds the current batch in reverse order.
  # Returns the remaining queue when demand runs out, or the remaining integer
  # demand when the queue runs out.
  defp send_from_queue(queue, _tag, pid, ref, 0, acc) do
    maybe_send(acc, pid, ref)

    # An exhausted queue is equivalent to zero demand. Returning 0 lets
    # notify/2 deliver the next notification immediately instead of queueing it.
    if :queue.is_empty(queue), do: 0, else: queue
  end

  defp send_from_queue(queue, tag, pid, ref, counter, acc) do
    case :queue.out(queue) do
      {{:value, {^tag, msg}}, queue} ->
        maybe_send(acc, pid, ref)
        Process.send(pid, {:"$gen_consumer", {self(), ref}, {:notification, msg}}, [:noconnect])
        send_from_queue(queue, tag, pid, ref, counter, [])

      {{:value, event}, queue} ->
        send_from_queue(queue, tag, pid, ref, counter - 1, [event | acc])

      {:empty, _queue} ->
        maybe_send(acc, pid, ref)
        counter
    end
  end

  # Counts the queued events, skipping notifications tagged with `tag`.
  defp count_from_queue(queue, tag, counter) do
    case :queue.out(queue) do
      {{:value, {^tag, _}}, queue} ->
        count_from_queue(queue, tag, counter)

      {{:value, _}, queue} ->
        count_from_queue(queue, tag, counter + 1)

      {:empty, _queue} ->
        counter
    end
  end

  # Important: events must be in reverse order
  defp maybe_send([], _pid, _ref),
    do: :ok

  defp maybe_send(events, pid, ref),
    do: Process.send(pid, {:"$gen_consumer", {self(), ref}, :lists.reverse(events)}, [:noconnect])

  @doc false
  def dispatch(events, _length, {tag, hash, waiting, pending, partitions, references}) do
    {deliver_now, deliver_later, waiting} = split_events(events, waiting, [])

    Enum.each(deliver_now, fn event ->
      {event, partition} = hash.(event)
      buffer_event(partitions, partition, event)
    end)

    partitions =
      partitions
      |> :maps.to_list()
      |> dispatch_per_partition()
      |> :maps.from_list()

    {:ok, deliver_later, {tag, hash, waiting, pending, partitions, references}}
  end

  # Prepends `event` to the partition's process-dictionary buffer. An unknown
  # partition raises: storing it would both lose the event and leak an
  # ever-growing improper list in the process dictionary.
  defp buffer_event(partitions, partition, event) do
    case partitions do
      %{^partition => _} ->
        Process.put(partition, [event | Process.get(partition)])

      %{} ->
        raise ArgumentError,
              "unknown partition #{inspect(partition)} computed for GenStage event " <>
                "#{inspect(event)}. The known partitions are #{inspect(Map.keys(partitions))}"
    end
  end

  # Takes up to `counter` events to deliver now.
  # Returns {deliver_now, deliver_later, counter_left}.
  defp split_events(events, 0, acc),
    do: {:lists.reverse(acc), events, 0}

  defp split_events([], counter, acc),
    do: {:lists.reverse(acc), [], counter}

  defp split_events([event | events], counter, acc),
    do: split_events(events, counter - 1, [event | acc])

  # Drains each partition's process-dictionary buffer. Events are sent up to the
  # partition's demand and the rest are queued; a partition that already has a
  # queue just appends to it.
  defp dispatch_per_partition([{partition, {pid, ref, demand_or_queue} = value} | rest]) do
    case Process.put(partition, []) do
      [] ->
        [{partition, value} | dispatch_per_partition(rest)]

      events ->
        events = :lists.reverse(events)

        {events, demand_or_queue} =
          case demand_or_queue do
            demand when is_integer(demand) ->
              split_into_queue(events, demand, [])

            queue ->
              {[], put_into_queue(events, queue)}
          end

        maybe_send(events, pid, ref)
        [{partition, {pid, ref, demand_or_queue}} | dispatch_per_partition(rest)]
    end
  end

  defp dispatch_per_partition([]) do
    []
  end

  # Splits `events` into those covered by `counter` (accumulated in reverse
  # order) and the rest. Returns either the leftover integer demand or a queue
  # of the events that did not fit. Matching `[]` first means exactly
  # consuming the demand yields 0 rather than an empty queue.
  defp split_into_queue([], counter, acc),
    do: {acc, counter}

  defp split_into_queue(events, 0, acc),
    do: {acc, put_into_queue(events, :queue.new())}

  defp split_into_queue([event | events], counter, acc),
    do: split_into_queue(events, counter - 1, [event | acc])

  defp put_into_queue(events, queue) do
    Enum.reduce(events, queue, &:queue.in/2)
  end
end