/
persistent_subscription.ex
186 lines (149 loc) · 5.7 KB
/
persistent_subscription.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
defmodule Volley.PersistentSubscription do
@moduledoc """
A GenStage/Broadway producer for persistent subscriptions.

Persistent subscriptions are a feature of EventStoreDB which offload the
responsibilities of tracking processed events, back-pressure, and subscriber
dispatch to the EventStoreDB server. This allows subscribers to more easily
implement complex subscription strategies, such as allowing multiple
subscribers across services without gossip, avoiding head-of-line blocking,
and enabling concurrent and batch processing schemes and rate-limiting.
Broadway features an acknowledgement interface which integrates well with
the persistent subscription `Spear.ack/3` and `Spear.nack/4` system. Consumers
intending to use this producer should prefer writing handlers with
`Broadway` (over `GenStage`) where possible.
## Configuration
* `:broadway?` - (default: `false`) whether to emit events as
`t:Broadway.Message.t/0` messages or as `t:Spear.Event.t/0` events.
`true` should be set for this option if this producer is being used
in a Broadway topology, `false` for use in a `GenStage` pipeline.
When set as `true`, this producer will set the ack interface for
each message to `Volley.PersistentSubscription.Acknowledger` with the
proper connection details for sending acks and nacks. When `false`, the
`Spear.Event.metadata.subscription` field will be replaced with a
`t:Volley.PersistentSubscription.Subscription.t/0` struct with any necessary
connection details.
* `:subscriptions` - (default: `[]`) a list of subscription configurations.
Broadway does not currently allow more than one producer in a topology,
however one may wish to subscribe to multiple persistent subscriptions,
potentially across EventStoreDBs. Since back-pressure is controlled
on the EventStoreDB side, a handler may specify multiple subscriptions
in a single producer without any special considerations. The schema of
each subscription is as follows
* `:connection` - (required) a `t:Spear.Connection.t/0` process which
can either be specified as a PID or any valid `t:GenServer.name/0`
* `:stream_name` - (required) the EventStoreDB stream
* `:group_name` - (required) the EventStoreDB group name
* `:opts` - (default: `[]`) options to pass to
`Spear.connect_to_persistent_subscription/5`. The main use of this
options field is to configure the `:buffer_size` option (default: `1`).
The `:buffer_size` option controls the number of events allowed to be
sent to this producer before any events are acknowledged.
Remaining options are passed to `GenStage.start_link/3` and the
`{:producer, state, opts}` tuple in `c:GenStage.init/1`.
## Examples
```elixir
defmodule MyHandler do
use Broadway
alias Broadway.Message
def start_link(_opts) do
subscription_opts = [
broadway?: true,
subscriptions: [
[
stream_name: "MyStream",
group_name: inspect(__MODULE__),
connection: MyApp.SpearClient,
opts: [
# 10 events allowed in-flight at a time
buffer_size: 10
]
]
]
]
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {Volley.PersistentSubscription, subscription_opts}
],
processors: [
default: [concurrency: 2]
],
batchers: [
default: [concurrency: 1, batch_size: 5]
]
)
end
@impl Broadway
def handle_message(:default, %Message{} = message, _context) do
message
|> Message.put_batcher(:default)
end
@impl Broadway
def handle_batch(:default, messages, _batch_info, _context) do
# do something batchy with messages...
messages
end
end
```
"""
use GenStage
import Volley
alias __MODULE__.Subscription
# Producer state:
#   * `config` - the options left over after `pop_producer_opts/1` in `init/1`,
#     converted to a map, with `:subscriptions` replaced by the parsed
#     subscription structs
#   * `subscriptions` - connected subscriptions, keyed by their `ref`
defstruct config: nil, subscriptions: %{}
# coveralls-ignore-start
@doc false
def start_link(opts) do
  # Separate the options destined for `GenStage.start_link/3` from the ones
  # handed to `init/1`. `pop_genserver_opts/1` is not defined in this module
  # (presumably it comes from `import Volley` - confirm there).
  {genserver_opts, init_arg} = pop_genserver_opts(opts)

  GenStage.start_link(__MODULE__, init_arg, genserver_opts)
end

# coveralls-ignore-stop
# Builds the initial producer state.
#
# Splits off the producer options, turns every entry of `:subscriptions`
# (default `[]`) into a subscription struct via `Subscription.from_config/1`,
# and sends this process one `{:connect, subscription}` message per entry so
# the connections are established in `handle_info/2` after `init/1` returns.
#
# Returns `{:producer, state, producer_opts}`.
@impl GenStage
def init(opts) do
  {producer_opts, remaining_opts} = pop_producer_opts(opts)

  sub_configs = Keyword.get(remaining_opts, :subscriptions, [])

  # Parse every configuration first; connect messages are only sent once all
  # of them have been converted.
  subscriptions =
    for sub_config <- sub_configs do
      Subscription.from_config(sub_config)
    end

  Enum.each(subscriptions, &send(self(), {:connect, &1}))

  # The remaining options become the config map; the raw subscription configs
  # are replaced by the parsed structs.
  config = Map.put(Map.new(remaining_opts), :subscriptions, subscriptions)
  state = %__MODULE__{config: config}

  {:producer, state, producer_opts}
end
# Message handling for the producer process.
#
#   * `{:connect, subscription}` - attempts `Subscription.connect/1`; on success
#     the connected subscription is stored in the state under its `ref`, on
#     `:error` the state is left untouched.
#   * `{:eos, ref, reason}` - removes the subscription stored under `ref` and
#     hands it to `Subscription.reconnect/1`. Crashes with a `MatchError` when
#     no subscription struct is stored under `ref`.
#   * `%Spear.Event{}` - maps the event with `map_event/2` and emits it.
@impl GenStage
def handle_info({:connect, subscription}, state) do
  case Subscription.connect(subscription) do
    {:ok, %Subscription{ref: ref} = connected} ->
      {:noreply, [], put_in(state.subscriptions[ref], connected)}

    :error ->
      {:noreply, [], state}
  end
end

def handle_info({:eos, ref, _reason}, state) do
  # Assertive match: the ref must belong to a tracked subscription.
  {%Subscription{} = ended, state} = pop_in(state.subscriptions[ref])

  Subscription.reconnect(ended)

  {:noreply, [], state}
end

def handle_info(%Spear.Event{} = event, state) do
  outgoing = map_event(event, state)

  {:noreply, [outgoing], state}
end
# Demand from consumers is ignored: this producer never emits events in
# response to demand, only when `%Spear.Event{}` messages reach
# `handle_info/2`. The state is returned unchanged.
@impl GenStage
def handle_demand(_demand, state), do: {:noreply, [], state}
# Converts an incoming event into the term emitted by this producer.
#
# The first clause sits inside `if_broadway`, which is not defined in this
# module (presumably a macro from `import Volley` that only compiles its body
# when Broadway is available - confirm there). When the config has
# `broadway?: true`, the event is wrapped in a `%Broadway.Message{}` whose
# acknowledger carries the subscription stored under the event's
# subscription ref.
if_broadway do
  defp map_event(event, %__MODULE__{config: %{broadway?: true}, subscriptions: subscriptions}) do
    ref = event.metadata.subscription
    acknowledger = {Volley.PersistentSubscription.Acknowledger, subscriptions[ref], %{}}

    %Broadway.Message{data: event, acknowledger: acknowledger}
  end
end

# Fallback: replaces the subscription ref in the event metadata with the
# subscription stored under that ref (`nil` when none is stored).
defp map_event(event, state) do
  update_in(event.metadata.subscription, fn ref -> state.subscriptions[ref] end)
end
end