/
subscription.ex
255 lines (198 loc) · 7.28 KB
/
subscription.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
defmodule Absinthe.Subscription do
@moduledoc """
Real time updates via GraphQL
For a how to guide on getting started with Absinthe.Subscriptions in your phoenix
project see the `Absinthe.Phoenix` package.
Define in your schema via `Absinthe.Schema.subscription/2`
## Basic Usage
## Performance Characteristics
There are a couple of limitations to the beta release of subscriptions that
are worth keeping in mind if you want to use this in production:
By design, all subscription docs triggered by a mutation are run inside the
mutation process as a form of back pressure.
At the moment however database batching does not happen across the set of
subscription docs. Thus if you have a lot of subscription docs and they each
do a lot of extra DB lookups you're going to delay incoming mutation responses
by however long it takes to do all that work.
Before the final version of 1.4.0 we want
- Batching across subscriptions
- More user control over back pressure / async balance.
"""
alias __MODULE__
alias Absinthe.Subscription.PipelineSerializer
@doc """
Add Absinthe.Subscription to your process tree.
"""
@spec start_link(atom() | [opt()]) :: Supervisor.on_start()
defdelegate start_link(opts_or_pubsub), to: Subscription.Supervisor
@type opt() ::
{:pubsub, atom()} | {:compress_registry?, boolean()} | {:pool_size, pos_integer()}
@doc """
Build a child specification for subscriptions.
In order to use subscriptions in your application, you must add
`Absinthe.Subscription` to your supervision tree after your endpoint.
See `guides/subscriptions.md` for more information on how to get up and
running with subscriptions.
## Options
* `:pubsub` - (Required) The `Phoenix.Pubsub` that should be used to publish
subscriptions. Typically this will be your `Phoenix.Endpoint`.
* `:compress_registry?` - (Optional - default `true`) A boolean controlling
whether the Registry used to keep track of subscriptions will should be
compressed or not.
* `:pool_size` - (Optional - default `System.schedulers() * 2`) An integer
specifying the number of `Absinthe.Subscription.Proxy` processes to start.
"""
@spec child_spec(atom() | [opt()]) :: Supervisor.child_spec()
def child_spec(pubsub) when is_atom(pubsub) do
# child_spec/1 used to take a single argument - the pub-sub - so in order
# to maintain compatibility for existing users of the library we still
# accept this argument and transform it into a keyword list.
child_spec(pubsub: pubsub)
end
def child_spec(opts) when is_list(opts) do
%{
id: __MODULE__,
start: {Subscription.Supervisor, :start_link, [opts]},
type: :supervisor
}
end
@type subscription_field_spec :: {atom, term | (term -> term)}
@doc """
Publish a mutation
This function is generally used when trying to publish to one or more subscription
fields "out of band" from any particular mutation.
## Examples
Note: As with all subscription examples if you're using Absinthe.Phoenix `pubsub`
will be `MyAppWeb.Endpoint`.
```
Absinthe.Subscription.publish(pubsub, user, [new_users: user.account_id])
```
```
# publish to two subscription fields
Absinthe.Subscription.publish(pubsub, user, [
new_users: user.account_id,
other_user_subscription_field: user.id,
])
```
"""
@spec publish(
Absinthe.Subscription.Pubsub.t(),
term,
Absinthe.Resolution.t() | [subscription_field_spec]
) :: :ok
def publish(_pubsub, _mutation_result, []), do: :ok
def publish(pubsub, mutation_result, %Absinthe.Resolution{} = info) do
subscribed_fields = get_subscription_fields(info)
publish(pubsub, mutation_result, subscribed_fields)
end
def publish(pubsub, mutation_result, subscribed_fields) do
_ = publish_remote(pubsub, mutation_result, subscribed_fields)
_ = Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields)
:ok
end
defp get_subscription_fields(resolution_info) do
mutation_field = resolution_info.definition.schema_node
schema = resolution_info.schema
subscription = Absinthe.Schema.lookup_type(schema, :subscription) || %{fields: []}
subscription_fields = fetch_fields(subscription.fields, mutation_field.triggers)
for {sub_field_id, sub_field} <- subscription_fields do
triggers = Absinthe.Type.function(sub_field, :triggers)
config = Map.fetch!(triggers, mutation_field.identifier)
{sub_field_id, config}
end
end
# TODO: normalize the `.fields` type.
defp fetch_fields(fields, triggers) when is_map(fields) do
Map.take(fields, triggers)
end
defp fetch_fields(_, _), do: []
@doc false
def subscribe(pubsub, field_keys, doc_id, doc) do
field_keys = List.wrap(field_keys)
registry = pubsub |> registry_name
doc_value = %{
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}
pdict_add_fields(doc_id, field_keys)
for field_key <- field_keys do
{:ok, _} = Registry.register(registry, field_key, doc_id)
end
{:ok, _} = Registry.register(registry, doc_id, doc_value)
end
defp pdict_fields(doc_id) do
Process.get({__MODULE__, doc_id}, [])
end
defp pdict_add_fields(doc_id, field_keys) do
Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id))
end
defp pdict_delete_fields(doc_id) do
Process.delete({__MODULE__, doc_id})
end
@doc false
def unsubscribe(pubsub, doc_id) do
registry = pubsub |> registry_name
for field_key <- pdict_fields(doc_id) do
Registry.unregister(registry, field_key)
end
Registry.unregister(registry, doc_id)
pdict_delete_fields(doc_id)
:ok
end
@doc false
def get(pubsub, key) do
pubsub
|> registry_name
|> Registry.lookup(key)
|> then(fn doc_ids ->
pubsub
|> registry_name
|> Registry.select(
# We compose a list of match specs that basically mean "lookup all keys
# in the doc_ids list"
for {_, doc_id} <- doc_ids,
do: {{:"$1", :_, :"$2"}, [{:==, :"$1", doc_id}], [{{:"$1", :"$2"}}]}
)
end)
|> Map.new(fn {doc_id, doc} ->
doc = Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1)
{doc_id, doc}
end)
end
@doc false
def registry_name(pubsub) do
Module.concat([pubsub, :Registry])
end
@doc false
def publish_remote(pubsub, mutation_result, subscribed_fields) do
{:ok, pool_size} =
pubsub
|> registry_name
|> Registry.meta(:pool_size)
shard = :erlang.phash2(mutation_result, pool_size)
proxy_topic = Subscription.Proxy.topic(shard)
:ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields)
end
## Middleware callback
@doc false
def call(%{state: :resolved, errors: [], value: value} = res, _) do
with {:ok, pubsub} <- extract_pubsub(res.context) do
__MODULE__.publish(pubsub, value, res)
end
res
end
def call(res, _), do: res
@doc false
def extract_pubsub(context) do
with {:ok, pubsub} <- Map.fetch(context, :pubsub),
pid when is_pid(pid) <- Process.whereis(registry_name(pubsub)) do
{:ok, pubsub}
else
_ -> :error
end
end
@doc false
def add_middleware(middleware) do
middleware ++ [{__MODULE__, []}]
end
end