Skip to content

Commit

Permalink
Subscriptions controller: Ignore JWTs that can't be validated
Browse files Browse the repository at this point in the history
Fixes #377.

- For subscription requests, JWTs that can't be validated are now
ignored. This allows to validate JWTs in an external service as
configurable via the SUBMISSION_CHECK and SUBSCRIPTION_CHECK environment
variables (which was the intention all along).

- Response code changed: when connecting and subscribing at the same
time, RIG replies with 403 (instead of 400) when not authorized to do
so.

- Ill-formed JWTs no longer cause subscription requests to fail.

- Fixed SUBMISSION_CHECK=jwt_validation - it failed the check anytime,
regardless of whether the JWT was valid.
  • Loading branch information
kevinbader committed May 23, 2021
1 parent d49dfa4 commit d4eafdd
Show file tree
Hide file tree
Showing 20 changed files with 475 additions and 68 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- make README smaller, easier to read and highlight features. [#284](https://github.com/Accenture/reactive-interaction-gateway/issues/284)
- Updated Phoenix LiveDashboard setup to show also metrics based on the Prometheus metrics (for now only proxy and events metrics). [#157](https://github.com/Accenture/reactive-interaction-gateway/issues/157)
- Updated [Channels Example](https://github.com/Accenture/reactive-interaction-gateway/tree/master/examples/channels-example) to use [Kafkajs](https://kafka.js.org/) and NodeJS 14. Updated [Smoke Tests](https://github.com/Accenture/reactive-interaction-gateway/tree/master/smoke_tests) to use NodeJS 14.
- Response code changed: when connecting and subscribing at the same time, RIG replies with 403 (instead of 400) when not authorized to do so.
- Ill-formed JWTs no longer cause subscription requests to fail.

### Fixed

- Fixed a bug where distributed set processes would crash when one of their peers has died but hasn't been removed yet from the pg2 group.
- Fixed wrong endpoint validation for reverse proxy. Now it should correctly check for `path` or `path_regex`. Before it would require `path` even with `path_regex` in place. [#334](https://github.com/Accenture/reactive-interaction-gateway/issues/334)
- For subscription requests, JWTs that can't be validated are now ignored. This allows to validate JWTs in an external service as configurable via the SUBMISSION_CHECK and SUBSCRIPTION_CHECK environment variables (which was the intention all along).
[#377](https://github.com/Accenture/reactive-interaction-gateway/issues/377)
- Fixed SUBMISSION_CHECK=jwt_validation - it failed the check anytime, regardless of whether the JWT was valid.

<!-- ### Deprecated -->

Expand Down
2 changes: 1 addition & 1 deletion docs/rig-ops-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Variable&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
`PROXY_KINESIS_REQUEST_REGION` | AWS region for Kinesis stream publishing events from proxy. | "eu-west-1"
`PROXY_NATS_RESPONSE_TIMEOUT` | If `response_from` is set to `nats`, this defines how long RIG waits for the response. | 60000
`REQUEST_LOG` | Type of loggers to use to log requests processed by API Proxy, delimited by comma. | []
`SUBMISSION_CHECK` | Select if and how submitting/publishing events should be denied. Can be either `no_check` (submissions are always allowed), `jwt_validation` (submissions are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the submissions. Such an external service is expected to accept POST requests. The CloudEvent is passed as a JSON map in the body. The original request's `Authorization` headers are reused for this request. The submission is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a submission request. | "NO_CHECK"
`SUBMISSION_CHECK` | Select if and how submitting/publishing events should be denied. _Applies only to the inbound (public) API._ Can be either `no_check` (submissions are always allowed), `jwt_validation` (submissions are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the submissions. Such an external service is expected to accept POST requests. The CloudEvent is passed as a JSON map in the body. The original request's `Authorization` headers are reused for this request. The submission is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a submission request. | "NO_CHECK"
`SUBSCRIPTION_CHECK` | Select if and how creating subscriptions should be denied. Can be either `no_check` (subscriptions are always allowed), `jwt_validation` (subscription are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the subscription. Such an external service is expected to accept POST requests. The subscription parameters are passed in the body. The original request's `Authorization` headers are reused for this request. The subscription is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a subscription request. | "NO_CHECK"

.
17 changes: 13 additions & 4 deletions lib/rig/authorization_check/external.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@ defmodule RIG.AuthorizationCheck.External do
true | false | {:error, url :: String.t(), error :: any()}
def check(url, request) do
case HTTPoison.post(url, request.body || "", http_headers(request)) do
{:ok, %HTTPoison.Response{status_code: status}} when status >= 200 and status < 300 -> true
{:ok, %HTTPoison.Response{status_code: status}} when status == 401 or status == 403 -> false
{:ok, unexpected_response} -> {:error, unexpected_response}
{:error, error} -> {:error, {url, error}}
{:ok, %HTTPoison.Response{status_code: status}} when status >= 200 and status < 300 ->
Logger.debug(fn -> "ALLOW via #{url} (status #{status})" end)
true

{:ok, %HTTPoison.Response{status_code: status}} when status == 401 or status == 403 ->
Logger.debug(fn -> "DENY via #{url} (status #{status})" end)
false

{:ok, unexpected_response} ->
{:error, unexpected_response}

{:error, error} ->
{:error, {url, error}}
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/rig/authorization_check/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule RIG.AuthorizationCheck.Request do
def from_plug_conn(conn) do
[content_type] = Conn.get_req_header(conn, "content-type")

%{
%__MODULE__{
auth_info: auth_info(conn),
query_params: conn.query_params,
content_type: content_type,
Expand All @@ -52,7 +52,7 @@ defmodule RIG.AuthorizationCheck.Request do
defp auth_info(%{assigns: %{auth_tokens: tokens}} = conn) do
auth_header = for({"authorization", val} <- conn.req_headers, do: val) |> Enum.join(", ")

%{
%AuthInfo{
auth_header: auth_header,
auth_tokens: tokens
}
Expand Down
2 changes: 1 addition & 1 deletion lib/rig/authorization_check/submission.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule RIG.AuthorizationCheck.Submission do
end

@spec check_authorization(Request.t(), CloudEvent.t()) :: :ok | {:error, :not_authorized}
def check_authorization(request, %CloudEvent{}) do
def check_authorization(%Request{} = request, %CloudEvent{}) do
%{validation_type: validation_type} = config()

case validation_type do
Expand Down
2 changes: 1 addition & 1 deletion lib/rig/authorization_check/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule RIG.AuthorizationCheck.Subscription do
# If body is nil, there are no subscriptions to authorize.
def check_authorization(%{body: nil}), do: :ok

def check_authorization(request) do
def check_authorization(%Request{} = request) do
%{validation_type: validation_type} = config()

case validation_type do
Expand Down
5 changes: 4 additions & 1 deletion lib/rig/sources/http/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule RIG.Sources.HTTP.Handler do
alias Jason
alias Plug.Conn

alias RIG.AuthorizationCheck.Request
alias RIG.AuthorizationCheck.Submission
alias Rig.EventFilter
alias RIG.Plug.BodyReader
Expand Down Expand Up @@ -169,7 +170,9 @@ defmodule RIG.Sources.HTTP.Handler do
defp handle_event(conn, %CloudEvent{} = cloud_event) do
authorized? =
if conn.assigns[:check_authorization?] do
Submission.check_authorization(conn, cloud_event) == :ok
conn
|> Request.from_plug_conn()
|> Submission.check_authorization(cloud_event) == :ok
else
true
end
Expand Down
10 changes: 4 additions & 6 deletions lib/rig_inbound_gateway_web/connection_init.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@ defmodule RigInboundGatewayWeb.ConnectionInit do
%{auth_tokens: [{"bearer", jwt}]} -> jwt
end

with {:ok, jwt_subs} <-
Subscriptions.from_token(jwt),
with :ok <- SubscriptionAuthZ.check_authorization(request),
{:ok, jwt_subs} <- Subscriptions.from_token(jwt),
true = String.starts_with?(request.content_type, "application/json"),
{:ok, query_subs} <-
Subscriptions.from_json(request.body),
{:ok, query_subs} <- Subscriptions.from_json(request.body),
subscriptions = Enum.uniq(jwt_subs ++ query_subs),
:ok <- SubscriptionAuthZ.check_authorization(request),
{:ok, _n_connections} <- ConnectionLimit.check_rate_limit() do
# If the JWT is valid and points to a session, we associate this connection with
# it. If that doesn't work out, we log a warning but don't tell the frontend -
Expand Down Expand Up @@ -86,7 +84,7 @@ defmodule RigInboundGatewayWeb.ConnectionInit do
"Cannot accept #{conn_type} connection #{pid}: #{msg}"
end)

on_error.("Subscription denied (not authorized).")
on_error.({403, "Subscription denied (not authorized)."})

{:error, %ConnectionLimit.MaxConnectionsError{n_connections: n_connections} = ex} ->
Logger.warn(fn ->
Expand Down
1 change: 1 addition & 0 deletions lib/rig_inbound_gateway_web/v1/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule RigInboundGatewayWeb.V1.EventController do
def publish(%{method: "POST"} = conn, _params) do
conn
|> with_allow_origin()
|> Plug.Conn.fetch_query_params()
|> Handler.handle_http_submission(check_authorization?: true)
end
end
3 changes: 2 additions & 1 deletion lib/rig_inbound_gateway_web/v1/sse.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule RigInboundGatewayWeb.V1.SSE do

alias Result

alias RIG.AuthorizationCheck.Request
alias Rig.EventFilter
alias RigCloudEvents.CloudEvent
alias RigInboundGateway.Events
Expand Down Expand Up @@ -39,7 +40,7 @@ defmodule RigInboundGatewayWeb.V1.SSE do

case ConnectionInit.subscriptions_query_param_to_body(query_params) do
{:ok, encoded_body_or_nil} ->
request = %{
request = %Request{
auth_info: auth_info,
query_params: "",
content_type: "application/json; charset=utf-8",
Expand Down
38 changes: 17 additions & 21 deletions lib/rig_inbound_gateway_web/v1/subscription_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ defmodule RigInboundGatewayWeb.V1.SubscriptionController do
do_set_subscriptions(conn, socket_pid, subscriptions)
else
{:error, :not_authorized} ->
conn |> put_status(:forbidden) |> text("Subscription denied.")
conn |> put_status(:forbidden) |> text("Subscription denied (not authorized).")

{:error, :not_base64} ->
Logger.warn(fn -> "Connection token #{connection_id} is not Base64 encoded." end)
Expand Down Expand Up @@ -189,10 +189,22 @@ defmodule RigInboundGatewayWeb.V1.SubscriptionController do
# ---

defp do_set_subscriptions(conn, socket_pid, subscriptions_param) do
with {:ok, jwt_subscriptions} <-
conn.req_headers
|> get_all_claims()
|> Result.and_then(fn claims -> Subscriptions.from_jwt_claims(claims) end),
jwt_subscriptions_result =
conn.req_headers
|> JWT.parse_http_header()
# We ignore any JWT that RIG can't validate:
|> Result.filter_and_unwrap()
|> Enum.map(fn claims -> Subscriptions.from_jwt_claims(claims) end)
# For each JWT, we now either have a list of subscriptions, or an error;
# turn this into either a list of subscription-lists, or a list of errors:
|> Result.list_to_result()
# Flatten into a list of subscriptions:
|> Result.map(&Enum.concat/1)
|> Result.map_err(fn errors ->
"cannot derive subscriptions from authorization tokens: #{inspect(errors)}"
end)

with {:ok, jwt_subscriptions} <- jwt_subscriptions_result,
{:ok, passed_subscriptions} <- parse_subscriptions(subscriptions_param) do
subscriptions = jwt_subscriptions ++ passed_subscriptions
send(socket_pid, {:set_subscriptions, subscriptions})
Expand Down Expand Up @@ -224,22 +236,6 @@ defmodule RigInboundGatewayWeb.V1.SubscriptionController do

# ---

# All claims, from all Authorization tokens. Returns a Result.
defp get_all_claims(headers) do
headers
|> JWT.parse_http_header()
|> Enum.reduce(%{}, fn
{:ok, claims}, acc -> Map.merge(acc, claims)
{:error, error}, _acc -> throw(error)
end)
|> Result.ok()
catch
%JWT.DecodeError{} = error ->
Result.err("invalid authorization header: #{Exception.message(error)}")
end

# ---

defp parse_subscriptions(subscriptions) when is_list(subscriptions) do
subscriptions
|> Enum.map(&Subscription.new!/1)
Expand Down
16 changes: 12 additions & 4 deletions lib/rig_inbound_gateway_web/v1/websocket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule RigInboundGatewayWeb.V1.Websocket do

alias Result

alias RIG.AuthorizationCheck.Request
alias Rig.EventFilter
alias RigCloudEvents.CloudEvent
alias RigInboundGateway.Events
Expand Down Expand Up @@ -50,7 +51,7 @@ defmodule RigInboundGatewayWeb.V1.Websocket do

case ConnectionInit.subscriptions_query_param_to_body(query_params) do
{:ok, encoded_body_or_nil} ->
request = %{
request = %Request{
auth_info: auth_info,
query_params: "",
content_type: "application/json; charset=utf-8",
Expand All @@ -73,15 +74,22 @@ defmodule RigInboundGatewayWeb.V1.Websocket do
{:reply, frame(Events.welcome_event()), state, :hibernate}
end

on_error = fn _reason ->
on_error = fn reason ->
Logger.warn(fn -> "websocket error: #{inspect(reason)}" end)
# WebSocket close frames may include a payload to indicate the error, but we found
# that error message must be really short; if it isn't, the `{:close, :normal,
# payload}` is silently converted to `{:close, :abnormal, nil}`. Since there is no
# limit mentioned in the spec (RFC-6455), we opt for consistent responses,
# omitting the detailed error.
reason = "Bad request."
reply =
case reason do
{403, _} -> "Not authorized."
{code, _} -> "#{code}: Bad request."
_ -> "Bad request."
end

# This will close the connection:
{:reply, closing_frame(reason), :no_state}
{:reply, closing_frame(reply), :no_state}
end

ConnectionInit.set_up(
Expand Down
17 changes: 5 additions & 12 deletions test/event_subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ defmodule RigInboundGateway.EventSubscriptionTest do
end
end

test "An invalid JWT causes the request to fail." do
test "An invalid JWT is ignored." do
invalid_jwt = "this is not a valid JWT"

for client <- @clients do
Expand All @@ -317,18 +317,11 @@ defmodule RigInboundGateway.EventSubscriptionTest do

{%{"data" => []}, ref} = client.read_subscriptions_set_event(ref)

error =
assert_raise SubscriptionError, fn ->
welcome_event
|> connection_id()
|> update_subscriptions([], invalid_jwt)
end
welcome_event
|> connection_id()
|> update_subscriptions([], invalid_jwt)

assert error.body =~ ~r/invalid authorization header/
assert error.code == 400

# The request has failed, so there should be no subscriptions_set event:
{:ok, ref} = client.refute_receive(ref)
assert {%{"data" => []}, ref} = client.read_subscriptions_set_event(ref)

client.disconnect(ref)
end
Expand Down
18 changes: 8 additions & 10 deletions test/longpolling_subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -296,26 +296,24 @@ defmodule RigInboundGateway.LongpollingSubscriptionTest do
assert error.body =~ ~r/could not parse given subscriptions/
end

test "An invalid JWT causes the request to fail." do
test "An invalid JWT is ignored." do
invalid_jwt = "this is not a valid JWT"

{:ok, cookies} = LongpollingClient.connect(subscriptions: [])
{:ok, events, _cookies} = LongpollingClient.read_events(cookies)
{:ok, events, cookies} = LongpollingClient.read_events(cookies)

subscriptions_set_event = Enum.at(events, 1)
%{"data" => []} = subscriptions_set_event

welcome_event = Enum.at(events, 0)

error =
assert_raise SubscriptionError, fn ->
welcome_event
|> connection_id()
|> update_subscriptions([], invalid_jwt)
end
welcome_event
|> connection_id()
|> update_subscriptions([], invalid_jwt)

assert error.code == 400
assert error.body =~ ~r/invalid authorization header/
{:ok, events, cookies} = LongpollingClient.read_events(cookies)
subscriptions_set_event = Enum.at(events, 0)
%{"data" => []} = subscriptions_set_event
end
end

Expand Down
61 changes: 61 additions & 0 deletions test/rig_inbound_gateway/event_submission/external_check_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule RigInboundGateway.EventSubmission.ExternalCheckTest do
@moduledoc """
An external service may be used to allow or deny publishing events.
"""
# Cannot be async because environment variables are modified:
use ExUnit.Case, async: false

import FakeServer
alias FakeServer.Response

alias HTTPoison

alias RIG.JWT

@hostname "localhost"
@eventhub_port Confex.fetch_env!(:rig, RigInboundGatewayWeb.Endpoint)[:http][:port]
@api_port Confex.fetch_env!(:rig, RigApi.Endpoint)[:http][:port]
@public_submission_url "http://#{@hostname}:#{@eventhub_port}/_rig/v1/events"
@private_submission_url "http://#{@hostname}:#{@api_port}/v3/messages"
@fake_validation_service_port 59_349

@event_json """
{ "id": "2", "source": "nil", "specversion": "0.2", "type": "greeting" }
"""

@var_name "SUBMISSION_CHECK"
@orig_val System.get_env(@var_name)
setup_all do
System.put_env(@var_name, "http://localhost:#{@fake_validation_service_port}")

on_exit(fn ->
case @orig_val do
nil -> System.delete_env(@var_name)
_ -> System.put_env(@var_name, @orig_val)
end
end)
end

test "The private API doesn't use the external service." do
headers = %{"content-type" => "application/json"}
assert %{status_code: 202} = HTTPoison.post!(@private_submission_url, @event_json, headers)
end

test_with_server "The public API allows publishing if the external service accepts.",
port: @fake_validation_service_port do
# The fake subscription-validation service accepts anything:
route("/", Response.ok!("Ok"))

headers = %{"content-type" => "application/json"}
assert %{status_code: 202} = HTTPoison.post!(@public_submission_url, @event_json, headers)
end

test_with_server "The public API denies publishing if the external service rejects.",
port: @fake_validation_service_port do
# The fake subscription-validation service rejects anything:
route("/", Response.forbidden!("Go away!"))

headers = %{"content-type" => "application/json"}
assert %{status_code: 403} = HTTPoison.post!(@public_submission_url, @event_json, headers)
end
end

0 comments on commit d4eafdd

Please sign in to comment.