diff --git a/CHANGELOG.md b/CHANGELOG.md index 08c8f961..4309ee6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,11 +44,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - make README smaller, easier to read and highlight features. [#284](https://github.com/Accenture/reactive-interaction-gateway/issues/284) - Updated Phoenix LiveDashboard setup to show also metrics based on the Prometheus metrics (for now only proxy and events metrics). [#157](https://github.com/Accenture/reactive-interaction-gateway/issues/157) - Updated [Channels Example](https://github.com/Accenture/reactive-interaction-gateway/tree/master/examples/channels-example) to use [Kafkajs](https://kafka.js.org/) and NodeJS 14. Updated [Smoke Tests](https://github.com/Accenture/reactive-interaction-gateway/tree/master/smoke_tests) to use NodeJS 14. +- Response code changed: when connecting and subscribing at the same time, RIG replies with 403 (instead of 400) when not authorized to do so. +- Ill-formed JWTs no longer cause subscription requests to fail. ### Fixed - Fixed a bug where distributed set processes would crash when one of their peers has died but hasn't been removed yet from the pg2 group. - Fixed wrong endpoint validation for reverse proxy. Now it should correctly check for `path` or `path_regex`. Before it would require `path` even with `path_regex` in place. [#334](https://github.com/Accenture/reactive-interaction-gateway/issues/334) +- For subscription requests, JWTs that can't be validated are now ignored. This allows to validate JWTs in an external service as configurable via the SUBMISSION_CHECK and SUBSCRIPTION_CHECK environment variables (which was the intention all along). + [#377](https://github.com/Accenture/reactive-interaction-gateway/issues/377) +- Fixed SUBMISSION_CHECK=jwt_validation - it failed the check anytime, regardless of whether the JWT was valid. diff --git a/docs/rig-ops-guide.md b/docs/rig-ops-guide.md index 0b96b995..a999483f 100644 --- a/docs/rig-ops-guide.md +++ b/docs/rig-ops-guide.md @@ -69,7 +69,7 @@ Variable             `PROXY_KINESIS_REQUEST_REGION` | AWS region for Kinesis stream publishing events from proxy. | "eu-west-1" `PROXY_NATS_RESPONSE_TIMEOUT` | If `response_from` is set to `nats`, this defines how long RIG waits for the response. | 60000 `REQUEST_LOG` | Type of loggers to use to log requests processed by API Proxy, delimited by comma. | [] -`SUBMISSION_CHECK` | Select if and how submitting/publishing events should be denied. Can be either `no_check` (submissions are always allowed), `jwt_validation` (submissions are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the submissions. Such an external service is expected to accept POST requests. The CloudEvent is passed as a JSON map in the body. The original request's `Authorization` headers are reused for this request. The submission is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a submission request. | "NO_CHECK" +`SUBMISSION_CHECK` | Select if and how submitting/publishing events should be denied. _Applies only to the inbound (public) API._ Can be either `no_check` (submissions are always allowed), `jwt_validation` (submissions are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the submissions. Such an external service is expected to accept POST requests. The CloudEvent is passed as a JSON map in the body. The original request's `Authorization` headers are reused for this request. The submission is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a submission request. | "NO_CHECK" `SUBSCRIPTION_CHECK` | Select if and how creating subscriptions should be denied. Can be either `no_check` (subscriptions are always allowed), `jwt_validation` (subscription are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the subscription. Such an external service is expected to accept POST requests. The subscription parameters are passed in the body. The original request's `Authorization` headers are reused for this request. The subscription is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a subscription request. | "NO_CHECK" . diff --git a/lib/rig/authorization_check/external.ex b/lib/rig/authorization_check/external.ex index 4dac54c5..aa383755 100644 --- a/lib/rig/authorization_check/external.ex +++ b/lib/rig/authorization_check/external.ex @@ -11,10 +11,19 @@ defmodule RIG.AuthorizationCheck.External do true | false | {:error, url :: String.t(), error :: any()} def check(url, request) do case HTTPoison.post(url, request.body || "", http_headers(request)) do - {:ok, %HTTPoison.Response{status_code: status}} when status >= 200 and status < 300 -> true - {:ok, %HTTPoison.Response{status_code: status}} when status == 401 or status == 403 -> false - {:ok, unexpected_response} -> {:error, unexpected_response} - {:error, error} -> {:error, {url, error}} + {:ok, %HTTPoison.Response{status_code: status}} when status >= 200 and status < 300 -> + Logger.debug(fn -> "ALLOW via #{url} (status #{status})" end) + true + + {:ok, %HTTPoison.Response{status_code: status}} when status == 401 or status == 403 -> + Logger.debug(fn -> "DENY via #{url} (status #{status})" end) + false + + {:ok, unexpected_response} -> + {:error, unexpected_response} + + {:error, error} -> + {:error, {url, error}} end end diff --git a/lib/rig/authorization_check/request.ex b/lib/rig/authorization_check/request.ex index 8e96c2f3..02bf5c07 100644 --- a/lib/rig/authorization_check/request.ex +++ b/lib/rig/authorization_check/request.ex @@ -37,7 +37,7 @@ defmodule RIG.AuthorizationCheck.Request do def from_plug_conn(conn) do [content_type] = Conn.get_req_header(conn, "content-type") - %{ + %__MODULE__{ auth_info: auth_info(conn), query_params: conn.query_params, content_type: content_type, @@ -52,7 +52,7 @@ defmodule RIG.AuthorizationCheck.Request do defp auth_info(%{assigns: %{auth_tokens: tokens}} = conn) do auth_header = for({"authorization", val} <- conn.req_headers, do: val) |> Enum.join(", ") - %{ + %AuthInfo{ auth_header: auth_header, auth_tokens: tokens } diff --git a/lib/rig/authorization_check/submission.ex b/lib/rig/authorization_check/submission.ex index dbe5a20f..40fbc4c0 100644 --- a/lib/rig/authorization_check/submission.ex +++ b/lib/rig/authorization_check/submission.ex @@ -28,7 +28,7 @@ defmodule RIG.AuthorizationCheck.Submission do end @spec check_authorization(Request.t(), CloudEvent.t()) :: :ok | {:error, :not_authorized} - def check_authorization(request, %CloudEvent{}) do + def check_authorization(%Request{} = request, %CloudEvent{}) do %{validation_type: validation_type} = config() case validation_type do diff --git a/lib/rig/authorization_check/subscription.ex b/lib/rig/authorization_check/subscription.ex index 330a7854..c50fc016 100644 --- a/lib/rig/authorization_check/subscription.ex +++ b/lib/rig/authorization_check/subscription.ex @@ -34,7 +34,7 @@ defmodule RIG.AuthorizationCheck.Subscription do # If body is nil, there are no subscriptions to authorize. def check_authorization(%{body: nil}), do: :ok - def check_authorization(request) do + def check_authorization(%Request{} = request) do %{validation_type: validation_type} = config() case validation_type do diff --git a/lib/rig/sources/http/handler.ex b/lib/rig/sources/http/handler.ex index c8ea7f4f..a23a74f6 100644 --- a/lib/rig/sources/http/handler.ex +++ b/lib/rig/sources/http/handler.ex @@ -11,6 +11,7 @@ defmodule RIG.Sources.HTTP.Handler do alias Jason alias Plug.Conn + alias RIG.AuthorizationCheck.Request alias RIG.AuthorizationCheck.Submission alias Rig.EventFilter alias RIG.Plug.BodyReader @@ -169,7 +170,9 @@ defmodule RIG.Sources.HTTP.Handler do defp handle_event(conn, %CloudEvent{} = cloud_event) do authorized? = if conn.assigns[:check_authorization?] do - Submission.check_authorization(conn, cloud_event) == :ok + conn + |> Request.from_plug_conn() + |> Submission.check_authorization(cloud_event) == :ok else true end diff --git a/lib/rig_inbound_gateway_web/connection_init.ex b/lib/rig_inbound_gateway_web/connection_init.ex index f9a0c710..0c989371 100644 --- a/lib/rig_inbound_gateway_web/connection_init.ex +++ b/lib/rig_inbound_gateway_web/connection_init.ex @@ -40,13 +40,11 @@ defmodule RigInboundGatewayWeb.ConnectionInit do %{auth_tokens: [{"bearer", jwt}]} -> jwt end - with {:ok, jwt_subs} <- - Subscriptions.from_token(jwt), + with :ok <- SubscriptionAuthZ.check_authorization(request), + {:ok, jwt_subs} <- Subscriptions.from_token(jwt), true = String.starts_with?(request.content_type, "application/json"), - {:ok, query_subs} <- - Subscriptions.from_json(request.body), + {:ok, query_subs} <- Subscriptions.from_json(request.body), subscriptions = Enum.uniq(jwt_subs ++ query_subs), - :ok <- SubscriptionAuthZ.check_authorization(request), {:ok, _n_connections} <- ConnectionLimit.check_rate_limit() do # If the JWT is valid and points to a session, we associate this connection with # it. If that doesn't work out, we log a warning but don't tell the frontend - @@ -86,7 +84,7 @@ defmodule RigInboundGatewayWeb.ConnectionInit do "Cannot accept #{conn_type} connection #{pid}: #{msg}" end) - on_error.("Subscription denied (not authorized).") + on_error.({403, "Subscription denied (not authorized)."}) {:error, %ConnectionLimit.MaxConnectionsError{n_connections: n_connections} = ex} -> Logger.warn(fn -> diff --git a/lib/rig_inbound_gateway_web/v1/event_controller.ex b/lib/rig_inbound_gateway_web/v1/event_controller.ex index 8d03470d..5dc3615b 100644 --- a/lib/rig_inbound_gateway_web/v1/event_controller.ex +++ b/lib/rig_inbound_gateway_web/v1/event_controller.ex @@ -31,6 +31,7 @@ defmodule RigInboundGatewayWeb.V1.EventController do def publish(%{method: "POST"} = conn, _params) do conn |> with_allow_origin() + |> Plug.Conn.fetch_query_params() |> Handler.handle_http_submission(check_authorization?: true) end end diff --git a/lib/rig_inbound_gateway_web/v1/sse.ex b/lib/rig_inbound_gateway_web/v1/sse.ex index ca35ded0..2e470136 100644 --- a/lib/rig_inbound_gateway_web/v1/sse.ex +++ b/lib/rig_inbound_gateway_web/v1/sse.ex @@ -11,6 +11,7 @@ defmodule RigInboundGatewayWeb.V1.SSE do alias Result + alias RIG.AuthorizationCheck.Request alias Rig.EventFilter alias RigCloudEvents.CloudEvent alias RigInboundGateway.Events @@ -39,7 +40,7 @@ defmodule RigInboundGatewayWeb.V1.SSE do case ConnectionInit.subscriptions_query_param_to_body(query_params) do {:ok, encoded_body_or_nil} -> - request = %{ + request = %Request{ auth_info: auth_info, query_params: "", content_type: "application/json; charset=utf-8", diff --git a/lib/rig_inbound_gateway_web/v1/subscription_controller.ex b/lib/rig_inbound_gateway_web/v1/subscription_controller.ex index 217ba5ca..768860e3 100644 --- a/lib/rig_inbound_gateway_web/v1/subscription_controller.ex +++ b/lib/rig_inbound_gateway_web/v1/subscription_controller.ex @@ -142,7 +142,7 @@ defmodule RigInboundGatewayWeb.V1.SubscriptionController do do_set_subscriptions(conn, socket_pid, subscriptions) else {:error, :not_authorized} -> - conn |> put_status(:forbidden) |> text("Subscription denied.") + conn |> put_status(:forbidden) |> text("Subscription denied (not authorized).") {:error, :not_base64} -> Logger.warn(fn -> "Connection token #{connection_id} is not Base64 encoded." end) @@ -189,10 +189,22 @@ defmodule RigInboundGatewayWeb.V1.SubscriptionController do # --- defp do_set_subscriptions(conn, socket_pid, subscriptions_param) do - with {:ok, jwt_subscriptions} <- - conn.req_headers - |> get_all_claims() - |> Result.and_then(fn claims -> Subscriptions.from_jwt_claims(claims) end), + jwt_subscriptions_result = + conn.req_headers + |> JWT.parse_http_header() + # We ignore any JWT that RIG can't validate: + |> Result.filter_and_unwrap() + |> Enum.map(fn claims -> Subscriptions.from_jwt_claims(claims) end) + # For each JWT, we now either have a list of subscriptions, or an error; + # turn this into either a list of subscription-lists, or a list of errors: + |> Result.list_to_result() + # Flatten into a list of subscriptions: + |> Result.map(&Enum.concat/1) + |> Result.map_err(fn errors -> + "cannot derive subscriptions from authorization tokens: #{inspect(errors)}" + end) + + with {:ok, jwt_subscriptions} <- jwt_subscriptions_result, {:ok, passed_subscriptions} <- parse_subscriptions(subscriptions_param) do subscriptions = jwt_subscriptions ++ passed_subscriptions send(socket_pid, {:set_subscriptions, subscriptions}) @@ -224,22 +236,6 @@ defmodule RigInboundGatewayWeb.V1.SubscriptionController do # --- - # All claims, from all Authorization tokens. Returns a Result. - defp get_all_claims(headers) do - headers - |> JWT.parse_http_header() - |> Enum.reduce(%{}, fn - {:ok, claims}, acc -> Map.merge(acc, claims) - {:error, error}, _acc -> throw(error) - end) - |> Result.ok() - catch - %JWT.DecodeError{} = error -> - Result.err("invalid authorization header: #{Exception.message(error)}") - end - - # --- - defp parse_subscriptions(subscriptions) when is_list(subscriptions) do subscriptions |> Enum.map(&Subscription.new!/1) diff --git a/lib/rig_inbound_gateway_web/v1/websocket.ex b/lib/rig_inbound_gateway_web/v1/websocket.ex index 3fbab701..59103222 100644 --- a/lib/rig_inbound_gateway_web/v1/websocket.ex +++ b/lib/rig_inbound_gateway_web/v1/websocket.ex @@ -9,6 +9,7 @@ defmodule RigInboundGatewayWeb.V1.Websocket do alias Result + alias RIG.AuthorizationCheck.Request alias Rig.EventFilter alias RigCloudEvents.CloudEvent alias RigInboundGateway.Events @@ -50,7 +51,7 @@ defmodule RigInboundGatewayWeb.V1.Websocket do case ConnectionInit.subscriptions_query_param_to_body(query_params) do {:ok, encoded_body_or_nil} -> - request = %{ + request = %Request{ auth_info: auth_info, query_params: "", content_type: "application/json; charset=utf-8", @@ -73,15 +74,22 @@ defmodule RigInboundGatewayWeb.V1.Websocket do {:reply, frame(Events.welcome_event()), state, :hibernate} end - on_error = fn _reason -> + on_error = fn reason -> + Logger.warn(fn -> "websocket error: #{inspect(reason)}" end) # WebSocket close frames may include a payload to indicate the error, but we found # that error message must be really short; if it isn't, the `{:close, :normal, # payload}` is silently converted to `{:close, :abnormal, nil}`. Since there is no # limit mentioned in the spec (RFC-6455), we opt for consistent responses, # omitting the detailed error. - reason = "Bad request." + reply = + case reason do + {403, _} -> "Not authorized." + {code, _} -> "#{code}: Bad request." + _ -> "Bad request." + end + # This will close the connection: - {:reply, closing_frame(reason), :no_state} + {:reply, closing_frame(reply), :no_state} end ConnectionInit.set_up( diff --git a/test/event_subscription_test.exs b/test/event_subscription_test.exs index 21b5e1d3..a96eec10 100644 --- a/test/event_subscription_test.exs +++ b/test/event_subscription_test.exs @@ -308,7 +308,7 @@ defmodule RigInboundGateway.EventSubscriptionTest do end end - test "An invalid JWT causes the request to fail." do + test "An invalid JWT is ignored." do invalid_jwt = "this is not a valid JWT" for client <- @clients do @@ -317,18 +317,11 @@ defmodule RigInboundGateway.EventSubscriptionTest do {%{"data" => []}, ref} = client.read_subscriptions_set_event(ref) - error = - assert_raise SubscriptionError, fn -> - welcome_event - |> connection_id() - |> update_subscriptions([], invalid_jwt) - end + welcome_event + |> connection_id() + |> update_subscriptions([], invalid_jwt) - assert error.body =~ ~r/invalid authorization header/ - assert error.code == 400 - - # The request has failed, so there should be no subscriptions_set event: - {:ok, ref} = client.refute_receive(ref) + assert {%{"data" => []}, ref} = client.read_subscriptions_set_event(ref) client.disconnect(ref) end diff --git a/test/longpolling_subscription_test.exs b/test/longpolling_subscription_test.exs index 5faebb37..6de2beb5 100644 --- a/test/longpolling_subscription_test.exs +++ b/test/longpolling_subscription_test.exs @@ -296,26 +296,24 @@ defmodule RigInboundGateway.LongpollingSubscriptionTest do assert error.body =~ ~r/could not parse given subscriptions/ end - test "An invalid JWT causes the request to fail." do + test "An invalid JWT is ignored." do invalid_jwt = "this is not a valid JWT" {:ok, cookies} = LongpollingClient.connect(subscriptions: []) - {:ok, events, _cookies} = LongpollingClient.read_events(cookies) + {:ok, events, cookies} = LongpollingClient.read_events(cookies) subscriptions_set_event = Enum.at(events, 1) %{"data" => []} = subscriptions_set_event welcome_event = Enum.at(events, 0) - error = - assert_raise SubscriptionError, fn -> - welcome_event - |> connection_id() - |> update_subscriptions([], invalid_jwt) - end + welcome_event + |> connection_id() + |> update_subscriptions([], invalid_jwt) - assert error.code == 400 - assert error.body =~ ~r/invalid authorization header/ + {:ok, events, cookies} = LongpollingClient.read_events(cookies) + subscriptions_set_event = Enum.at(events, 0) + %{"data" => []} = subscriptions_set_event end end diff --git a/test/rig_inbound_gateway/event_submission/external_check_test.exs b/test/rig_inbound_gateway/event_submission/external_check_test.exs new file mode 100644 index 00000000..82aa14d2 --- /dev/null +++ b/test/rig_inbound_gateway/event_submission/external_check_test.exs @@ -0,0 +1,61 @@ +defmodule RigInboundGateway.EventSubmission.ExternalCheckTest do + @moduledoc """ + An external service may be used to allow or deny publishing events. + """ + # Cannot be async because environment variables are modified: + use ExUnit.Case, async: false + + import FakeServer + alias FakeServer.Response + + alias HTTPoison + + alias RIG.JWT + + @hostname "localhost" + @eventhub_port Confex.fetch_env!(:rig, RigInboundGatewayWeb.Endpoint)[:http][:port] + @api_port Confex.fetch_env!(:rig, RigApi.Endpoint)[:http][:port] + @public_submission_url "http://#{@hostname}:#{@eventhub_port}/_rig/v1/events" + @private_submission_url "http://#{@hostname}:#{@api_port}/v3/messages" + @fake_validation_service_port 59_349 + + @event_json """ + { "id": "2", "source": "nil", "specversion": "0.2", "type": "greeting" } + """ + + @var_name "SUBMISSION_CHECK" + @orig_val System.get_env(@var_name) + setup_all do + System.put_env(@var_name, "http://localhost:#{@fake_validation_service_port}") + + on_exit(fn -> + case @orig_val do + nil -> System.delete_env(@var_name) + _ -> System.put_env(@var_name, @orig_val) + end + end) + end + + test "The private API doesn't use the external service." do + headers = %{"content-type" => "application/json"} + assert %{status_code: 202} = HTTPoison.post!(@private_submission_url, @event_json, headers) + end + + test_with_server "The public API allows publishing if the external service accepts.", + port: @fake_validation_service_port do + # The fake subscription-validation service accepts anything: + route("/", Response.ok!("Ok")) + + headers = %{"content-type" => "application/json"} + assert %{status_code: 202} = HTTPoison.post!(@public_submission_url, @event_json, headers) + end + + test_with_server "The public API denies publishing if the external service rejects.", + port: @fake_validation_service_port do + # The fake subscription-validation service rejects anything: + route("/", Response.forbidden!("Go away!")) + + headers = %{"content-type" => "application/json"} + assert %{status_code: 403} = HTTPoison.post!(@public_submission_url, @event_json, headers) + end +end diff --git a/test/rig_inbound_gateway/event_submission/jwt_check_test.exs b/test/rig_inbound_gateway/event_submission/jwt_check_test.exs new file mode 100644 index 00000000..189c1e23 --- /dev/null +++ b/test/rig_inbound_gateway/event_submission/jwt_check_test.exs @@ -0,0 +1,64 @@ +defmodule RigInboundGateway.EventSubmission.JwtCheckTest do + @moduledoc """ + The jwt_validation setting requires a valid JWT to publish events. + """ + # Cannot be async because environment variables are modified: + use ExUnit.Case, async: false + + alias HTTPoison + + alias RIG.JWT + + @hostname "localhost" + @eventhub_port Confex.fetch_env!(:rig, RigInboundGatewayWeb.Endpoint)[:http][:port] + @api_port Confex.fetch_env!(:rig, RigApi.Endpoint)[:http][:port] + @public_submission_url "http://#{@hostname}:#{@eventhub_port}/_rig/v1/events" + @private_submission_url "http://#{@hostname}:#{@api_port}/v3/messages" + + @event_json """ + { "id": "2", "source": "nil", "specversion": "0.2", "type": "greeting" } + """ + + @var_name "SUBMISSION_CHECK" + @orig_val System.get_env(@var_name) + setup_all do + System.put_env(@var_name, "jwt_validation") + + on_exit(fn -> + case @orig_val do + nil -> System.delete_env(@var_name) + _ -> System.put_env(@var_name, @orig_val) + end + end) + end + + test "Both APIs allow publishing an event when using a valid JWT." do + valid_jwt = JWT.encode(%{}) + + headers = %{ + "content-type" => "application/json", + "authorization" => "Bearer #{valid_jwt}" + } + + assert %{status_code: 202} = HTTPoison.post!(@public_submission_url, @event_json, headers) + assert %{status_code: 202} = HTTPoison.post!(@private_submission_url, @event_json, headers) + end + + test "The inbound API denies publishing an event if there is no JWT present, but the outbound one doesn't care." do + headers = %{"content-type" => "application/json"} + assert %{status_code: 403} = HTTPoison.post!(@public_submission_url, @event_json, headers) + assert %{status_code: 202} = HTTPoison.post!(@private_submission_url, @event_json, headers) + end + + test "The inbound API deny publishing an event if the given JWT cannot be validated, but the outbound one doesn't care." do + invalid_jwt = "definitely not a valid jwt" + + headers = %{ + "content-type" => "application/json", + "authorization" => "Bearer #{invalid_jwt}" + } + + assert %{status_code: 403} = HTTPoison.post!(@public_submission_url, @event_json, headers) + assert %{status_code: 202} = HTTPoison.post!(@private_submission_url, @event_json, headers) + end +end diff --git a/test/rig_inbound_gateway/event_submission/no_check_test.exs b/test/rig_inbound_gateway/event_submission/no_check_test.exs new file mode 100644 index 00000000..c964aa18 --- /dev/null +++ b/test/rig_inbound_gateway/event_submission/no_check_test.exs @@ -0,0 +1,38 @@ +defmodule RigInboundGateway.EventSubmission.NoCheckTest do + @moduledoc """ + The no_check setting lets anyone publish events. + """ + # Cannot be async because environment variables are modified: + use ExUnit.Case, async: false + + alias HTTPoison + + @hostname "localhost" + @eventhub_port Confex.fetch_env!(:rig, RigInboundGatewayWeb.Endpoint)[:http][:port] + @api_port Confex.fetch_env!(:rig, RigApi.Endpoint)[:http][:port] + @public_submission_url "http://#{@hostname}:#{@eventhub_port}/_rig/v1/events" + @private_submission_url "http://#{@hostname}:#{@api_port}/v3/messages" + + @event_json """ + { "id": "2", "source": "nil", "specversion": "0.2", "type": "greeting" } + """ + + @var_name "SUBMISSION_CHECK" + @orig_val System.get_env(@var_name) + setup_all do + System.put_env(@var_name, "no_check") + + on_exit(fn -> + case @orig_val do + nil -> System.delete_env(@var_name) + _ -> System.put_env(@var_name, @orig_val) + end + end) + end + + test "Anyone can publish an event using any of the APIs" do + headers = %{"content-type" => "application/json"} + assert %{status_code: 202} = HTTPoison.post!(@public_submission_url, @event_json, headers) + assert %{status_code: 202} = HTTPoison.post!(@private_submission_url, @event_json, headers) + end +end diff --git a/test/rig_inbound_gateway/event_subscription/external_check_test.exs b/test/rig_inbound_gateway/event_subscription/external_check_test.exs index d33ee32a..40810ffc 100644 --- a/test/rig_inbound_gateway/event_subscription/external_check_test.exs +++ b/test/rig_inbound_gateway/event_subscription/external_check_test.exs @@ -1,8 +1,8 @@ defmodule RigInboundGateway.EventSubscription.ExternalCheckTest do @moduledoc """ - An external service can determine, whether or not to allow subscriptions. + An external service may be used to allow or deny subscriptions. """ - # Cannot be async because the extractor configuration is modified: + # Cannot be async because environment variables are modified: use ExUnit.Case, async: false import FakeServer @@ -111,7 +111,7 @@ defmodule RigInboundGateway.EventSubscription.ExternalCheckTest do end end - test_with_server "The external service receives the Authorization header.", + test_with_server "The external service receives the Authorization header, even if the JWT it contains can't be validated.", port: @fake_validation_service_port do # The fake subscription-validation service accepts anything: route("/", fn @@ -124,7 +124,7 @@ defmodule RigInboundGateway.EventSubscription.ExternalCheckTest do ) end) - jwt = JWT.encode(%{}) + jwt = "use case: valid JWT but RIG doesn't have the validation key" for client <- @clients do {:ok, ref} = client.connect() diff --git a/test/rig_inbound_gateway/event_subscription/jwt_check_test.exs b/test/rig_inbound_gateway/event_subscription/jwt_check_test.exs new file mode 100644 index 00000000..68229f18 --- /dev/null +++ b/test/rig_inbound_gateway/event_subscription/jwt_check_test.exs @@ -0,0 +1,152 @@ +defmodule RigInboundGateway.EventSubscription.JwtCheckTest do + @moduledoc """ + The jwt_validation setting requires a valid JWT to subscribe to events. + """ + # Cannot be async because environment variables are modified: + use ExUnit.Case, async: false + + defmodule SubscriptionError do + defexception [:code, :body] + def exception(code, body), do: %__MODULE__{code: code, body: body} + + def message(%__MODULE__{code: code, body: body}), + do: "updating subscriptions failed with #{inspect(code)}: #{inspect(body)}" + end + + alias HTTPoison + alias Jason + + alias RIG.JWT + + @clients [SseClient, WsClient] + + @hostname "localhost" + @eventhub_port Confex.fetch_env!(:rig, RigInboundGatewayWeb.Endpoint)[:http][:port] + + @var_name "SUBSCRIPTION_CHECK" + @orig_val System.get_env(@var_name) + setup_all do + System.put_env(@var_name, "jwt_validation") + + on_exit(fn -> + case @orig_val do + nil -> System.delete_env(@var_name) + _ -> System.put_env(@var_name, @orig_val) + end + end) + end + + defp connection_id(welcome_event) + defp connection_id(%{"data" => %{"connection_token" => connection_id}}), do: connection_id + + defp update_subscriptions(connection_id, subscriptions, jwt \\ nil) do + url = + "http://#{@hostname}:#{@eventhub_port}/_rig/v1/connection/sse/#{connection_id}/subscriptions" + + body = Jason.encode!(%{"subscriptions" => subscriptions}) + + headers = + [{"content-type", "application/json"}] ++ + if is_nil(jwt), do: [], else: [{"authorization", "Bearer #{jwt}"}] + + case HTTPoison.put!(url, body, headers) do + %HTTPoison.Response{status_code: 204} -> + :ok + + %HTTPoison.Response{status_code: code, body: body} -> + raise SubscriptionError, code: code, body: body + end + end + + test "A valid JWT allows to subscribe to anything." do + valid_jwt = JWT.encode(%{}) + + for client <- @clients do + # Is accepted when passed immediately: + {:ok, ref} = client.connect(subscriptions: [%{"eventType" => "greeting1"}], jwt: valid_jwt) + {welcome_event, ref} = client.read_welcome_event(ref) + + assert {%{"data" => [%{"eventType" => "greeting1", "oneOf" => []}]}, ref} = + client.read_subscriptions_set_event(ref) + + # Is also accepted when using the subscriptions endpoint: + assert :ok = + welcome_event + |> connection_id() + |> update_subscriptions([%{"eventType" => "greeting2"}], valid_jwt) + + assert {%{"data" => [%{"eventType" => "greeting2", "oneOf" => []}]}, ref} = + client.read_subscriptions_set_event(ref) + + client.disconnect(ref) + end + end + + test "Subscriptions are denied if no JWT is supplied." do + for client <- @clients do + # Is denied when passed immediately: + + error = + assert_raise TestClient.ConnectionError, fn -> + client.connect(subscriptions: [%{"eventType" => "greeting1"}]) + end + + assert Exception.message(error) =~ ~r/not authorized/i + # WebSocket doesn't return error codes like this.. + assert error.code == 403 || error.code == :normal + + # Is also denied when using the subscriptions endpoint: + + {:ok, ref} = client.connect() + {welcome_event, ref} = client.read_welcome_event(ref) + + error = + assert_raise SubscriptionError, fn -> + welcome_event + |> connection_id() + |> update_subscriptions([%{"eventType" => "greeting2"}]) + end + + assert Exception.message(error) =~ ~r/not authorized/i + # WebSocket doesn't return error codes like this.. + assert error.code == 403 || error.code == :normal + + client.disconnect(ref) + end + end + + test "Subscriptions are denied if the validity of a JWT cannot be established." do + invalid_jwt = "not a valid jwt" + + for client <- @clients do + # Is denied when passed immediately: + + error = + assert_raise TestClient.ConnectionError, fn -> + client.connect(subscriptions: [%{"eventType" => "greeting1"}], jwt: invalid_jwt) + end + + assert Exception.message(error) =~ ~r/not authorized/i + # WebSocket doesn't return error codes like this.. + assert error.code == 403 || error.code == :normal, inspect(error.code) + + # Is also denied when using the subscriptions endpoint: + + {:ok, ref} = client.connect() + {welcome_event, ref} = client.read_welcome_event(ref) + + error = + assert_raise SubscriptionError, fn -> + welcome_event + |> connection_id() + |> update_subscriptions([%{"eventType" => "greeting2"}], invalid_jwt) + end + + assert Exception.message(error) =~ ~r/not authorized/i + # WebSocket doesn't return error codes like this.. + assert error.code == 403 || error.code == :normal + + client.disconnect(ref) + end + end +end diff --git a/test/rig_inbound_gateway/event_subscription/no_check_test.exs b/test/rig_inbound_gateway/event_subscription/no_check_test.exs new file mode 100644 index 00000000..dc16057d --- /dev/null +++ b/test/rig_inbound_gateway/event_subscription/no_check_test.exs @@ -0,0 +1,80 @@ +defmodule RigInboundGateway.EventSubscription.NoCheckTest do + @moduledoc """ + The no_check setting lets anyone subscribe to anything. + """ + # Cannot be async because environment variables are modified: + use ExUnit.Case, async: false + + defmodule SubscriptionError do + defexception [:code, :body] + def exception(code, body), do: %__MODULE__{code: code, body: body} + + def message(%__MODULE__{code: code, body: body}), + do: "updating subscriptions failed with #{inspect(code)}: #{inspect(body)}" + end + + alias HTTPoison + alias Jason + + @clients [SseClient, WsClient] + + @hostname "localhost" + @eventhub_port Confex.fetch_env!(:rig, RigInboundGatewayWeb.Endpoint)[:http][:port] + + @var_name "SUBSCRIPTION_CHECK" + @orig_val System.get_env(@var_name) + setup_all do + System.put_env(@var_name, "no_check") + + on_exit(fn -> + case @orig_val do + nil -> System.delete_env(@var_name) + _ -> System.put_env(@var_name, @orig_val) + end + end) + end + + defp connection_id(welcome_event) + defp connection_id(%{"data" => %{"connection_token" => connection_id}}), do: connection_id + + defp update_subscriptions(connection_id, subscriptions, jwt \\ nil) do + url = + "http://#{@hostname}:#{@eventhub_port}/_rig/v1/connection/sse/#{connection_id}/subscriptions" + + body = Jason.encode!(%{"subscriptions" => subscriptions}) + + headers = + [{"content-type", "application/json"}] ++ + if is_nil(jwt), do: [], else: [{"authorization", "Bearer #{jwt}"}] + + case HTTPoison.put!(url, body, headers) do + %HTTPoison.Response{status_code: 204} -> + :ok + + %HTTPoison.Response{status_code: code, body: body} -> + raise SubscriptionError, code: code, body: body + end + end + + test "Anyone can subscribe to anything." do + for client <- @clients do + # Is accepted when passed immediately: + {:ok, ref} = client.connect(subscriptions: [%{"eventType" => "greeting1"}]) + {welcome_event, ref} = client.read_welcome_event(ref) + + assert {%{"data" => [%{"eventType" => "greeting1", "oneOf" => []}]}, ref} = + client.read_subscriptions_set_event(ref) + + # Is also accepted when using the subscriptions endpoint: + assert :ok = + welcome_event + |> connection_id() + |> update_subscriptions([%{"eventType" => "greeting2"}]) + + assert {%{"data" => [%{"eventType" => "greeting2", "oneOf" => []}]}, ref} = + client.read_subscriptions_set_event(ref) + + client.disconnect(ref) + end + end +end