-
Notifications
You must be signed in to change notification settings - Fork 64
/
nats.ex
137 lines (110 loc) · 3.6 KB
/
nats.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
defmodule RigInboundGateway.ApiProxy.Handler.Nats do
  @moduledoc """
  Handles requests for NATS targets.

  The JSON encoded CloudEvent found in the HTTP body is extended with a `"rig"`
  field holding request metadata and is then sent to the NATS topic configured
  on the endpoint. When the endpoint's `"response_from"` is `"nats"`, the handler
  performs a NATS request and relays the reply body to the HTTP client; for any
  other value it publishes the message and answers with `202 Accepted`.
  """
  require Logger
  use Rig.Config, [:timeout, :cors]

  alias Plug.Conn
  alias Rig.Connection.Codec
  alias RigInboundGateway.ApiProxy.Handler
  alias RigMetrics.ProxyMetrics

  @behaviour Handler

  @help_text """
  Produce the request to a NATS topic and optionally wait for the response.
  Expects a JSON encoded CloudEvent in the HTTP body.
  """

  @impl Handler
  @doc @help_text
  def handle_http_request(conn, api, endpoint, request_path)

  # CORS response for preflight request.
  def handle_http_request(%{method: "OPTIONS"} = conn, _, %{"target" => "nats"} = _endpoint, _) do
    conn
    |> with_cors()
    |> Conn.send_resp(:no_content, "")
  end

  # Regular request: only endpoints that carry a non-empty topic are accepted.
  def handle_http_request(
        conn,
        _,
        %{"target" => "nats", "topic" => topic} = endpoint,
        request_path
      )
      when byte_size(topic) > 0 do
    conn = with_cors(conn)
    response_from = Map.get(endpoint, "response_from", "http")

    case Jason.decode(conn.assigns[:body]) do
      # The presence of "specversion" is the only check applied to decide
      # whether the decoded body is a CloudEvent.
      {:ok, %{"specversion" => _} = event} ->
        do_handle_http_request(conn, request_path, event, response_from, topic)

      {:ok, _not_a_cloud_event} = decode_result ->
        send_bad_request(
          conn,
          response_from,
          decode_result,
          "The body is a valid JSON object but does not look like a CloudEvent."
        )

      {:error, reason} = decode_result ->
        send_bad_request(
          conn,
          response_from,
          decode_result,
          "The body is not JSON encoded (#{inspect(reason)})."
        )
    end
  end

  # ---

  @doc """
  Sends `event` to the NATS `topic` and answers the HTTP request.

  The event is extended with request metadata and JSON encoded first (raises if
  it cannot be encoded). If `response_from` is `"nats"`, a NATS request is made
  using the configured timeout and the outcome is mapped to an HTTP response:

    * reply received - `200 OK` with the reply body as `application/json`
    * `{:error, :timeout}` - `504 Gateway Timeout`
    * any other `{:error, reason}` - `502 Bad Gateway`

  For every other `response_from` value the message is published without
  waiting and the response is `202 Accepted`.

  Returns the `Plug.Conn` after the response has been sent.
  """
  def do_handle_http_request(conn, request_path, event, response_from, topic) do
    message = event |> add_metadata(conn, request_path) |> Jason.encode!()

    if response_from == "nats" do
      request_and_reply(conn, response_from, topic, message)
    else
      publish(conn, response_from, topic, message)
    end
  end

  # ---

  # Performs a NATS request and turns the outcome into an HTTP response.
  defp request_and_reply(conn, response_from, topic, message) do
    %{timeout: timeout} = config()

    case Gnat.request(:nats, topic, message, receive_timeout: timeout) do
      {:ok, %{body: response}} ->
        count_request(conn, response_from, "ok")

        conn
        |> Conn.put_resp_content_type("application/json")
        |> Conn.send_resp(:ok, response)

      {:error, :timeout} ->
        count_request(conn, response_from, "response_timeout")
        Conn.send_resp(conn, :gateway_timeout, "Timed out while waiting for the response.")

      # Without this clause any other error tuple would raise a CaseClauseError,
      # leaving the client with an unhandled 500 and no metric recorded.
      {:error, reason} ->
        Logger.error(fn ->
          "NATS request to topic #{inspect(topic)} failed: #{inspect(reason)}"
        end)

        # NOTE(review): "response_error" is a new status label value - confirm
        # that dashboards/alerts based on this metric can cope with it.
        count_request(conn, response_from, "response_error")
        Conn.send_resp(conn, :bad_gateway, "Failed to obtain a response from NATS.")
    end
  end

  # ---

  # Publishes the message without waiting for a reply and acknowledges the request.
  defp publish(conn, response_from, topic, message) do
    Gnat.pub(:nats, topic, message)
    count_request(conn, response_from, "ok")
    Conn.send_resp(conn, :accepted, "Accepted.")
  end

  # ---

  # Logs the rejected decode result, counts the request as "bad_request" and
  # answers with 400 plus the usage text.
  defp send_bad_request(conn, response_from, decode_result, message) do
    response = """
    Bad request: #{message}
    # Usage
    #{@help_text}
    """

    Logger.debug(fn ->
      "Received invalid request to NATS proxy: #{inspect(decode_result)}"
    end)

    count_request(conn, response_from, "bad_request")
    Conn.send_resp(conn, :bad_request, response)
  end

  # ---

  # Adds a "rig" field describing the originating HTTP request to the event.
  defp add_metadata(event, conn, request_path) do
    Map.put(event, "rig", %{
      # Serialized pid of the process handling this request.
      correlation: Codec.serialize(self()),
      remoteip: to_string(:inet_parse.ntoa(conn.remote_ip)),
      host: conn.host,
      port: conn.port,
      scheme: conn.scheme,
      # Header tuples are converted to two-element lists before JSON encoding.
      headers: Enum.map(conn.req_headers, &Tuple.to_list/1),
      method: conn.method,
      path: request_path,
      query: conn.query_string
    })
  end

  # ---

  # Sets the CORS response headers; the allowed origin is taken from the config.
  defp with_cors(conn) do
    conn
    |> Conn.put_resp_header("access-control-allow-origin", config().cors)
    |> Conn.put_resp_header("access-control-allow-methods", "*")
    |> Conn.put_resp_header("access-control-allow-headers", "content-type,authorization")
  end

  # ---

  # Reports the request to the proxy metrics with "nats" as the target.
  defp count_request(conn, response_from, status) do
    ProxyMetrics.count_proxy_request(
      conn.method,
      conn.request_path,
      "nats",
      response_from,
      status
    )
  end
end