-
Notifications
You must be signed in to change notification settings - Fork 64
/
longpolling_controller.ex
133 lines (111 loc) · 3.84 KB
/
longpolling_controller.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
defmodule RigInboundGatewayWeb.V1.LongpollingController do
@moduledoc """
Handling of Longpolling functionalitiy
"""
require Logger
use Rig.Config, [:cors]
use RigInboundGatewayWeb, :controller
alias Rig.Connection
alias RigInboundGatewayWeb.ConnectionLimit
alias RigInboundGatewayWeb.Session
alias RigOutboundGateway
# ---
@doc false
def handle_preflight(%{method: "OPTIONS"} = conn, _params) do
conn
|> with_allow_origin()
|> put_resp_header("access-control-allow-methods", "GET")
|> put_resp_header("access-control-allow-headers", "*")
|> send_resp(:no_content, "")
end
# ---
@doc false
def handle_connection(%{method: "GET"} = conn, _params) do
conn = conn |> fetch_cookies |> fetch_query_params
conn.req_cookies["connection_token"] |> is_new_session? |> process_request(conn)
end
# -----
# Helpers
# -----
# validates if a connection_token was given.
# If yes, it validates if corresponding session processes are still alive
# ignoring invalid/timed out cookies
defp is_new_session?(connection_token)
defp is_new_session?(nil), do: true
defp is_new_session?(connection_token) do
case Connection.Codec.deserialize(connection_token) do
{:ok, session_pid} -> !Process.alive?(session_pid)
_ -> false
end
end
# ---
defp process_request(is_new_session, conn)
# starts a new session
defp process_request(true, conn) do
with {:ok, _n_connections} <- ConnectionLimit.check_rate_limit(),
{:ok, session_pid} <- Session.start(conn.query_params) do
Logger.debug(fn ->
"new Longpolling connection (pid=#{inspect(session_pid)}, params=#{
inspect(conn.query_params)
})"
end)
conn
|> with_allow_origin()
|> put_resp_cookie("connection_token", session_pid |> Connection.Codec.serialize())
|> put_resp_cookie("last_event_id", Jason.encode!("first_event"))
|> put_resp_header("cache-control", "no-cache")
|> put_status(200)
|> json("ok")
else
{:error, {:bad_request, message}} ->
conn
|> with_allow_origin()
|> put_status(:bad_request)
|> text(message)
{:error, %ConnectionLimit.MaxConnectionsError{n_connections: n_connections} = ex} ->
Logger.warn(fn ->
pid = inspect(self())
msg = "#{Exception.message(ex)}=#{n_connections} per minute"
"Cannot accept long polling connection #{pid}: #{msg}"
end)
conn
|> put_status(:too_many_requests)
|> text(Exception.message(ex))
error ->
msg = "Failed to initialize long polling connection"
Logger.error(fn -> "#{msg}: #{inspect(error)}" end)
conn
|> with_allow_origin()
|> put_status(:internal_server_error)
|> text("Internal server error: #{msg}.")
end
end
# reconnect to existing session
defp process_request(false, conn) do
{:ok, session_pid} = Connection.Codec.deserialize(conn.req_cookies["connection_token"])
response =
Session.recv_events(
session_pid,
Jason.decode!(conn.req_cookies["last_event_id"] || "first_event")
)
conn
|> with_allow_origin()
|> put_resp_cookie("connection_token", session_pid |> Connection.Codec.serialize())
|> put_resp_cookie(
"last_event_id",
Jason.encode!(response[:last_event_id] || "first_event")
)
|> put_resp_header("content-type", "application/json; charset=utf-8")
|> put_resp_header("cache-control", "no-cache")
|> put_status(200)
|> text(
~s<{"last_event_id":"#{response.last_event_id}","events":[#{Enum.join(response.events, ",")}]}>
)
end
# ---
defp with_allow_origin(conn) do
%{cors: origins} = config()
put_resp_header(conn, "access-control-allow-origin", origins)
|> put_resp_header("access-control-allow-credentials", "true")
end
end