-
Notifications
You must be signed in to change notification settings - Fork 64
/
connection_init.ex
143 lines (120 loc) · 4.69 KB
/
connection_init.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
defmodule RigInboundGatewayWeb.ConnectionInit do
@moduledoc """
Cowboy WebSocket handler.
As soon as Phoenix pulls in Cowboy 2 this will have to be rewritten using the
:cowboy_websocket behaviour.
"""
require Logger
alias RIG.AuthorizationCheck.Request
alias RIG.AuthorizationCheck.Subscription, as: SubscriptionAuthZ
alias RIG.JWT
alias RIG.Session
alias Rig.Subscription
alias RIG.Subscriptions
alias RigInboundGatewayWeb.ConnectionLimit
# ---
@type handler_response :: any
@type on_success :: ([Subscription.t()] -> handler_response)
# What `on_error` is called with: a plain message for most rejections, or
# `{429, message}` when the connection limit has been exceeded (see the `else`
# clauses of `set_up/6`).
@type error_reason :: String.t() | {pos_integer(), String.t()}
@type on_error :: (reason :: error_reason -> handler_response)

@doc """
Validates a new connection request and, if accepted, wires up the calling process.

Call this from the process that owns the connection: the heartbeat, the initial
subscriptions and the periodic subscription refresh are all delivered as
messages to `self()`.

The effective subscriptions are the de-duplicated concatenation of those derived
from the JWT (if any) and those read from the JSON request body. The request is
then checked for authorization and against the connection limit.

  * On success, `:heartbeat` is scheduled after `heartbeat_interval_ms`,
    `{:set_subscriptions, subscriptions}` is sent right away,
    `:refresh_subscriptions` is scheduled after
    `subscription_refresh_interval_ms`, and the result of
    `on_success.(subscriptions)` is returned. Failing to associate the
    connection with a session only logs a warning.
  * On rejection, a warning is logged and the result of `on_error` is returned.
    `on_error` receives a message string, or `{429, message}` when the
    connection limit has been exceeded.

Raises if `request.auth_info` is neither `nil` nor a map with exactly one
`"bearer"` token, and raises a `MatchError` if the content-type check is reached
with a content type that does not start with `"application/json"`.
"""
@spec set_up(String.t(), Request.t(), on_success, on_error, pos_integer(), pos_integer()) ::
        handler_response
def set_up(
      conn_type,
      request,
      on_success,
      on_error,
      heartbeat_interval_ms,
      subscription_refresh_interval_ms
    ) do
  Logger.debug(fn ->
    "new #{conn_type} connection (pid=#{inspect(self())}, params=#{inspect(request)})"
  end)

  # Only "no auth info" and "exactly one bearer token" are handled here; any
  # other shape fails the match and crashes the calling process.
  # NOTE(review): confirm that callers can never pass other token schemes.
  jwt =
    case request.auth_info do
      nil -> nil
      %{auth_tokens: [{"bearer", jwt}]} -> jwt
    end

  with {:ok, jwt_subs} <- Subscriptions.from_token(jwt),
       # Note the `=` (not `<-`): a non-JSON content type raises a MatchError
       # instead of falling through to the `else` clauses below.
       true = String.starts_with?(request.content_type, "application/json"),
       {:ok, query_subs} <- Subscriptions.from_json(request.body),
       subscriptions = Enum.uniq(jwt_subs ++ query_subs),
       :ok <- SubscriptionAuthZ.check_authorization(request),
       {:ok, _n_connections} <- ConnectionLimit.check_rate_limit() do
    # If the JWT is valid and points to a session, we associate this connection with
    # it. If that doesn't work out, we log a warning but don't tell the frontend -
    # it's not the frontend's fault anyway.
    jwt
    |> check_and_register_session()
    |> Result.or_else(fn error ->
      Logger.warn(fn ->
        pid = inspect(self())
        "Failed to associate the #{conn_type} connection #{pid} to its session: #{inspect(error)}"
      end)
    end)

    # We're going to accept the connection, so let's set up the heartbeat too:
    Process.send_after(self(), :heartbeat, heartbeat_interval_ms)

    # We register subscriptions:
    send(self(), {:set_subscriptions, subscriptions})

    # ..and schedule periodic refresh:
    Process.send_after(self(), :refresh_subscriptions, subscription_refresh_interval_ms)

    on_success.(subscriptions)
  else
    {:error, %Subscriptions.Error{} = e} ->
      Logger.warn(fn ->
        pid = inspect(self())
        msg = Exception.message(e)
        "Cannot accept #{conn_type} connection #{pid}: #{msg}"
      end)

      on_error.(Exception.message(e))

    {:error, :not_authorized} ->
      Logger.warn(fn ->
        pid = inspect(self())
        msg = "not authorized"
        "Cannot accept #{conn_type} connection #{pid}: #{msg}"
      end)

      on_error.("Subscription denied (not authorized).")

    {:error, %ConnectionLimit.MaxConnectionsError{n_connections: n_connections} = ex} ->
      Logger.warn(fn ->
        pid = inspect(self())
        msg = "#{Exception.message(ex)}=#{n_connections} per minute"
        "Cannot accept #{conn_type} connection #{pid}: #{msg}"
      end)

      # Unlike the other branches, this one hands `on_error` a tuple: the number
      # 429 (presumably the HTTP "Too Many Requests" status) plus the message.
      on_error.({429, Exception.message(ex)})
  end
end
# ---
# Registers the calling process as a connection of the session referenced by
# the given JWT.
#
# `jwt` is the token taken from the request, or `nil` when the request carried
# no auth info. Without a token there is nothing to register, which counts as
# success (`{:ok, nil}`). Otherwise the token is parsed, a session name is
# derived from its claims and `self()` is registered with that session. An error
# from parsing the token or from deriving the session is returned wrapped as
# `{:failed_to_associate_to_session, reason}`.
@spec check_and_register_session(String.t() | nil) ::
        Result.t(
          any,
          {:failed_to_associate_to_session, %RIG.JWT.DecodeError{} | String.t()}
        )
defp check_and_register_session(jwt)

defp check_and_register_session(nil), do: {:ok, nil}

defp check_and_register_session(jwt) do
  jwt
  |> JWT.parse_token()
  |> Result.and_then(fn claims -> Session.from_claims(claims) end)
  # `self()` is evaluated in the caller, i.e. the process that owns the connection.
  |> Result.map(fn session_name -> Session.register_connection(session_name, self()) end)
  |> Result.map_err(fn err -> {:failed_to_associate_to_session, err} end)
end
# ---
@doc """
Turns the `"subscriptions"` query parameter into a JSON request body.

The parameter is expected to hold a JSON document. It is decoded and re-encoded
as the value of a `"subscriptions"` key in a JSON object.

Returns:

  * `{:ok, body}` with the encoded JSON string on success;
  * `{:ok, nil}` when `query_params` has no `"subscriptions"` entry, or when that
    entry is not a non-empty binary;
  * `{:error, message}` when the parameter cannot be decoded as JSON, or when
    re-encoding fails (in which case the details are logged as a warning and
    only a generic message is returned).
"""
@spec subscriptions_query_param_to_body(map) ::
        {:ok, String.t() | nil} | {:error, String.t()}
def subscriptions_query_param_to_body(query_params)

def subscriptions_query_param_to_body(%{"subscriptions" => json_list})
    when byte_size(json_list) > 0 do
  case Jason.decode(json_list) do
    {:ok, list} ->
      %{"subscriptions" => list}
      |> Jason.encode()
      |> Result.map_err(fn ex ->
        msg = "Failed to encode subscriptions body from query parameters"
        # Keep the exception details in the server log; the caller only gets
        # the generic message.
        Logger.warn("#{msg}: #{Exception.message(ex)}")
        "#{msg} (please see server logs for details)."
      end)

    {:error, %Jason.DecodeError{} = ex} ->
      {:error, "Failed to decode subscription list: #{Exception.message(ex)}"}
  end
end

# Nothing usable in the query parameters: not an error, there is simply no body.
def subscriptions_query_param_to_body(_), do: {:ok, nil}
end