-
Notifications
You must be signed in to change notification settings - Fork 519
/
proxy.ex
56 lines (44 loc) · 1.28 KB
/
proxy.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
defmodule Absinthe.Subscription.Proxy do
  @moduledoc false

  # One proxy process per shard. On start it subscribes to the shard's proxy
  # topic on the given pubsub module. Every payload received on that topic whose
  # `node` differs from this pubsub's node name is handed to
  # `Absinthe.Subscription.Local.publish_mutation/3`, run as a child of the task
  # supervisor so the proxy itself never blocks on publishing.

  use GenServer

  alias Absinthe.Subscription

  defstruct [
    :pubsub,
    :node,
    :task_super
  ]

  # Milliseconds between forced garbage collections of the proxy process.
  @gc_interval 5_000

  # Supervisor child specification. `args` is `[task_super, pubsub, shard]`;
  # the id includes the shard, which keeps the ids of proxies for different
  # shards distinct.
  def child_spec([_, _, shard] = args) do
    %{
      id: {__MODULE__, shard},
      start: {__MODULE__, :start_link, [args]}
    }
  end

  # Starts an unnamed proxy process. `args` is passed to `init/1` unchanged.
  @spec start_link(list()) :: GenServer.on_start()
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  # Name of the pubsub topic the proxy for `shard` listens on.
  # `shard` is interpolated into the string, so it must implement `String.Chars`.
  @spec topic(term()) :: String.t()
  def topic(shard), do: "__absinthe__:proxy:#{shard}"

  # Subscribes to the shard topic (crashing if the subscription does not return
  # `:ok`), schedules the first periodic garbage collection and builds the state.
  @impl true
  def init([task_super, pubsub, shard]) do
    node_name = pubsub.node_name()
    :ok = pubsub.subscribe(topic(shard))
    schedule_gc()

    {:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super}}
  end

  # Periodic tick: collect this process's garbage and re-arm the timer.
  @impl true
  def handle_info(:gc, state) do
    :erlang.garbage_collect()
    schedule_gc()

    {:noreply, state}
  end

  # Proxied mutation payload. The result of starting the task is intentionally
  # not inspected.
  def handle_info(
        %{node: origin, mutation_result: mutation_result, subscribed_fields: subscribed_fields},
        state
      ) do
    # There's no meaningful form of backpressure to have here, and we can't
    # bottleneck execution inside each proxy process

    # NOTE(review): the node name is also stored in `state.node` at init; the
    # live lookup is kept here to preserve the existing behaviour.
    if origin != state.pubsub.node_name() do
      Task.Supervisor.start_child(state.task_super, Subscription.Local, :publish_mutation, [
        state.pubsub,
        mutation_result,
        subscribed_fields
      ])
    end

    {:noreply, state}
  end

  # Anything that is not a proxied payload (e.g. a stray message sent to this
  # pid) goes to the default `handle_info/2` injected by `use GenServer`, which
  # logs the unexpected message and keeps the process - and therefore its topic
  # subscription - alive instead of crashing on a failed field access.
  def handle_info(message, state), do: super(message, state)

  # Arms the timer that delivers the next `:gc` message to this process.
  defp schedule_gc do
    Process.send_after(self(), :gc, @gc_interval)
  end
end