defmodule Postgrex.Notifications do
  @moduledoc ~S"""
  API for notifications (pub/sub) in PostgreSQL.

  In order to use it, first you need to start the notification process.
  In your supervision tree:

      {Postgrex.Notifications, name: MyApp.Notifications}

  Then you can listen to certain channels:

      {:ok, listen_ref} = Postgrex.Notifications.listen(MyApp.Notifications, "channel")

  Now every time a message is broadcast on said channel, for example via
  PostgreSQL command line:

      NOTIFY "channel", 'Oh hai!';

  You will receive a message in the format:

      {:notification, notification_pid, listen_ref, channel, message}

  ## Async connect, auto-reconnects and missed notifications

  By default, the notification system establishes a connection to the
  database on initialization. You can configure the connection to happen
  asynchronously instead. You can also configure the connection to
  automatically reconnect.

  Note however that while the notification system is waiting for a connection,
  any notifications that occur during the disconnection period are not queued
  and cannot be recovered. Listen commands, on the other hand, are queued until
  the connection is up.
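
  `listen/3` reports a queued listen command by returning
  `{:eventually, reference}` instead of `{:ok, reference}`. A caller may want
  to treat the two cases differently. In the sketch below, `MyApp.Subscriber`
  and its tags `:active` and `:pending` are hypothetical names used only for
  illustration:

  ```elixir
  defmodule MyApp.Subscriber do
    # Hypothetical wrapper around Postgrex.Notifications.listen/3.
    def subscribe(notifications, channel) do
      notifications
      |> Postgrex.Notifications.listen(channel)
      |> classify()
    end

    # The notification process is connected and listening on the channel.
    def classify({:ok, ref}) when is_reference(ref), do: {:active, ref}

    # Not connected yet: the listen command is queued until the connection is up.
    def classify({:eventually, ref}) when is_reference(ref), do: {:pending, ref}
  end
  ```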

  There is a race condition between starting to listen and notifications being
  issued "at the same time", as explained [in the PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-listen.html).
  If your application needs to keep a consistent representation of data, follow
  the three-step approach of first subscribing, then obtaining the current
  state of data, then handling the incoming notifications.

  Beware that the same race condition applies to auto-reconnects. A simple way
  of dealing with this issue is not using the auto-reconnect feature directly,
  but monitoring and re-starting the Notifications process, then subscribing to
  channel messages over again, using the same three-step approach.
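
  As a minimal sketch of the three-step approach (not part of Postgrex itself),
  the process below subscribes first, loads the current state second, and only
  then handles notifications. The names `MyApp.PostsCache`, `MyApp.Notifications`,
  `MyApp.DB`, the `posts` table and the `"posts_changed"` channel are all
  hypothetical, and the sketch assumes the notification process is already
  connected, so that `listen!/3` returns a reference:

  ```elixir
  defmodule MyApp.PostsCache do
    use GenServer

    # Hypothetical names: MyApp.Notifications is a Postgrex.Notifications
    # process and MyApp.DB is a regular Postgrex connection.
    @channel "posts_changed"

    def start_link(opts) do
      GenServer.start_link(__MODULE__, opts, name: __MODULE__)
    end

    @impl true
    def init(_opts) do
      # Step 1: subscribe, so notifications start queuing in our mailbox.
      ref = Postgrex.Notifications.listen!(MyApp.Notifications, @channel)
      # Step 2: read the current state once the subscription is in place.
      {:ok, %{ref: ref, posts: load_posts()}}
    end

    @impl true
    def handle_info({:notification, _pid, ref, @channel, _payload}, %{ref: ref} = state) do
      # Step 3: react to each notification. Reloading is idempotent, so a
      # notification that raced with step 2 is harmless.
      {:noreply, %{state | posts: load_posts()}}
    end

    def handle_info(_other, state), do: {:noreply, state}

    defp load_posts do
      Postgrex.query!(MyApp.DB, "SELECT id, title FROM posts", []).rows
    end
  end
  ```

  Reloading the whole state on every notification keeps the sketch short. An
  application with larger data sets would more likely apply the payload as an
  incremental change.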

  ## A note on casing

  While PostgreSQL seems to behave as case-insensitive, it actually has a very
  peculiar behaviour on casing. When you write:

      SELECT * FROM POSTS

  PostgreSQL actually converts `POSTS` into the lowercase `posts`. That's why
  both `SELECT * FROM POSTS` and `SELECT * FROM posts` feel equivalent.
  However, if you wrap the table name in quotes, then the casing in quotes
  will be preserved.
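
  The folding rule can be sketched in a few lines. `MyApp.PgIdentifier` is a
  hypothetical helper, not part of Postgrex or PostgreSQL, and it only
  approximates the real rules (for instance, it ignores doubled quotes inside
  quoted identifiers):

  ```elixir
  defmodule MyApp.PgIdentifier do
    # Hypothetical helper: approximates how PostgreSQL resolves an identifier.
    # Quoted identifiers keep their casing, unquoted ones are lowercased.
    def fold(identifier) when is_binary(identifier) do
      size = byte_size(identifier)

      if size >= 2 and String.starts_with?(identifier, ~s(")) and
           String.ends_with?(identifier, ~s(")) do
        # Strip the surrounding quotes and keep the casing untouched.
        binary_part(identifier, 1, size - 2)
      else
        String.downcase(identifier, :ascii)
      end
    end
  end
  ```

  With this sketch, `MyApp.PgIdentifier.fold("POSTS")` and
  `MyApp.PgIdentifier.fold("posts")` both give `"posts"`, while
  `MyApp.PgIdentifier.fold(~s("POSTS"))` gives `"POSTS"`.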

  These same rules apply to PostgreSQL notification channels. More importantly,
  whenever `Postgrex.Notifications` listens to a channel, it wraps the channel
  name in quotes. Therefore, if you listen to a channel named "fooBar" and
  you send a notification without quotes in the channel name, such as:

      NOTIFY fooBar, 'Oh hai!';

  The notification will not be received by Postgrex.Notifications because the
  notification will be effectively sent to `"foobar"` and not `"fooBar"`. Therefore,
  you must guarantee one of the two following properties:

    1. If you can wrap the channel name in quotes when sending a notification,
       then make sure the channel name has the exact same casing when listening
       and sending notifications

    2. If you cannot wrap the channel name in quotes when sending a notification,
       then make sure to give the lowercased channel name when listening
  """

  @typedoc since: "0.17.0"
  @type server :: :gen_statem.server_ref()

  alias Postgrex.SimpleConnection

  @behaviour SimpleConnection

  require Logger

  defstruct [
    :from,
    :ref,
    auto_reconnect: false,
    connected: false,
    listeners: %{},
    listener_channels: %{}
  ]

  @timeout 5000

  @doc false
  def child_spec(opts) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
  end

  @doc """
  Starts the notification connection process and connects to PostgreSQL.

  The options that this function accepts are the same as those accepted by
  `Postgrex.start_link/1`, as well as the extra options `:sync_connect`,
  `:auto_reconnect`, `:reconnect_backoff`, and `:configure`.

  ## Options

    * `:sync_connect` - controls if the connection should be established on boot
      or asynchronously right after boot. Defaults to `true`.

    * `:auto_reconnect` - automatically attempt to reconnect to the database
      in event of a disconnection. See the
      [note about async connect and auto-reconnects](#module-async-connect-auto-reconnects-and-missed-notifications)
      above. Defaults to `false`, which means the process terminates.

    * `:reconnect_backoff` - time (in ms) between reconnection attempts when
      `auto_reconnect` is enabled. Defaults to `500`.

    * `:idle_interval` - while also accepted on `Postgrex.start_link/1`, it has
      a default of `5000ms` in `Postgrex.Notifications` (instead of 1000ms).

    * `:configure` - A function to run before every connect attempt to dynamically
      configure the options as a `{module, function, args}`, where the current
      options will be prepended to `args`. Defaults to `nil`.
  """
  @spec start_link(Keyword.t()) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
  def start_link(opts) do
    args = Keyword.take(opts, [:auto_reconnect])
    SimpleConnection.start_link(__MODULE__, args, opts)
  end

  @doc """
  Listens to an asynchronous notification channel using the `LISTEN` command.
  A message `{:notification, connection_pid, ref, channel, payload}` will be
  sent to the calling process when a notification is received.

  It returns `{:ok, reference}`. It may also return `{:eventually, reference}`
  if the notification process is not currently connected to the database and
  it was started with `:sync_connect` set to false or `:auto_reconnect` set
  to true. The `reference` can be used to issue an `unlisten/3` command.

  ## Options

    * `:timeout` - Call timeout (default: `#{@timeout}`)
  """
  @spec listen(server, String.t(), Keyword.t()) ::
          {:ok, reference} | {:eventually, reference}
  def listen(pid, channel, opts \\ []) do
    SimpleConnection.call(pid, {:listen, channel}, Keyword.get(opts, :timeout, @timeout))
  end

  @doc """
  Listens to an asynchronous notification channel `channel` and returns the
  reference directly. See `listen/3`.

  Raises a `MatchError` if `listen/3` returns `{:eventually, reference}`.
  """
  @spec listen!(server, String.t(), Keyword.t()) :: reference
  def listen!(pid, channel, opts \\ []) do
    {:ok, ref} = listen(pid, channel, opts)
    ref
  end

  @doc """
  Stops listening on the given channel by passing the reference returned from
  `listen/2`.

  ## Options

    * `:timeout` - Call timeout (default: `#{@timeout}`)
  """
  @spec unlisten(server, reference, Keyword.t()) :: :ok | :error
  def unlisten(pid, ref, opts \\ []) do
    SimpleConnection.call(pid, {:unlisten, ref}, Keyword.get(opts, :timeout, @timeout))
  end

  @doc """
  Stops listening on the given channel by passing the reference returned from
  `listen/2`.

  Same as `unlisten/3`, but raises an `ArgumentError` if the reference is unknown.
  """
  @spec unlisten!(server, reference, Keyword.t()) :: :ok
  def unlisten!(pid, ref, opts \\ []) do
    case unlisten(pid, ref, opts) do
      :ok -> :ok
      :error -> raise ArgumentError, "unknown reference #{inspect(ref)}"
    end
  end

  ## CALLBACKS ##

  @impl true
  def init(args) do
    {:ok, struct!(__MODULE__, args)}
  end

  @impl true
  def notify(channel, payload, state) do
    for {ref, pid} <- Map.get(state.listener_channels, channel, []) do
      send(pid, {:notification, self(), ref, channel, payload})
    end

    :ok
  end

  @impl true
  def handle_connect(state) do
    state = %{state | connected: true}

    if map_size(state.listener_channels) > 0 do
      listen_statements =
        state.listener_channels
        |> Map.keys()
        |> Enum.map_join("\n", &~s(LISTEN "#{&1}";))

      query = "DO $$BEGIN #{listen_statements} END$$"

      {:query, query, state}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_disconnect(state) do
    state = %{state | connected: false}

    if state.auto_reconnect && state.from && state.ref do
      SimpleConnection.reply(state.from, {:eventually, state.ref})

      {:noreply, %{state | from: nil, ref: nil}}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_call({:listen, channel}, {pid, _} = from, state) do
    ref = Process.monitor(pid)

    state = put_in(state.listeners[ref], {channel, pid})
    state = update_in(state.listener_channels[channel], &Map.put(&1 || %{}, ref, pid))

    cond do
      not state.connected ->
        SimpleConnection.reply(from, {:eventually, ref})

        {:noreply, state}

      map_size(state.listener_channels[channel]) == 1 ->
        {:query, ~s(LISTEN "#{channel}"), %{state | from: from, ref: ref}}

      true ->
        SimpleConnection.reply(from, {:ok, ref})

        {:noreply, state}
    end
  end

  def handle_call({:unlisten, ref}, from, state) do
    case state.listeners do
      %{^ref => {channel, _pid}} ->
        Process.demonitor(ref, [:flush])

        {_, state} = pop_in(state.listeners[ref])
        {_, state} = pop_in(state.listener_channels[channel][ref])

        if map_size(state.listener_channels[channel]) == 0 do
          {_, state} = pop_in(state.listener_channels[channel])

          {:query, ~s(UNLISTEN "#{channel}"), %{state | from: from}}
        else
          from && SimpleConnection.reply(from, :ok)

          {:noreply, state}
        end

      _ ->
        from && SimpleConnection.reply(from, :error)

        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _, _}, state) do
    handle_call({:unlisten, ref}, nil, state)
  end

  def handle_info(msg, state) do
    Logger.info(fn ->
      context = " received unexpected message: "
      [inspect(__MODULE__), ?\s, inspect(self()), context | inspect(msg)]
    end)

    {:noreply, state}
  end

  @impl true
  def handle_result(_message, %{from: from, ref: ref} = state) do
    cond do
      from && ref ->
        SimpleConnection.reply(from, {:ok, ref})

        {:noreply, %{state | from: nil, ref: nil}}

      from ->
        SimpleConnection.reply(from, :ok)

        {:noreply, %{state | from: nil}}

      true ->
        {:noreply, state}
    end
  end
end