diff --git a/README.md b/README.md
index 4e13e44df..7dd223bf3 100644
--- a/README.md
+++ b/README.md
@@ -243,6 +243,7 @@ This is the list of operational codes that can help you understand your deployme
 | ChannelRateLimitReached | The number of channels you can create has reached its limit |
 | ConnectionRateLimitReached | The number of connected clients as reached its limit |
 | ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
+| DatabaseConnectionRateLimitReached | The rate of attempts to connect to the tenant's database has reached the limit |
 | MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
 | RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
 | UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex
index 63965abea..db2a02cc4 100644
--- a/lib/realtime/tenants.ex
+++ b/lib/realtime/tenants.ex
@@ -328,6 +328,32 @@ defmodule Realtime.Tenants do
     %RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
   end
 
+  @connect_per_second_default 10
+  @doc "RateCounter arguments for counting tenant database connection attempts per second."
+  @spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
+  def connect_per_second_rate(%Tenant{external_id: external_id}) do
+    connect_per_second_rate(external_id)
+  end
+
+  def connect_per_second_rate(tenant_id) do
+    opts = [
+      max_bucket_len: 10,
+      limit: [
+        value: @connect_per_second_default,
+        measurement: :sum,
+        log_fn: fn ->
+          Logger.critical(
+            "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database",
+            external_id: tenant_id,
+            project: tenant_id
+          )
+        end
+      ]
+    ]
+
+    %RateCounter.Args{id: {:database, :connect, tenant_id}, opts: opts}
+  end
+
   defp pool_size(%{extensions: [%{settings: settings} | _]}) do
     Database.pool_size_by_application_name("realtime_connect", settings)
   end
diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex
index 3d8f39833..0ee43f161 100644
--- a/lib/realtime/tenants/connect.ex
+++ b/lib/realtime/tenants/connect.ex
@@ -11,8 +11,9 @@ defmodule Realtime.Tenants.Connect do
 
   use Realtime.Logs
 
-  alias Realtime.Tenants.Rebalancer
   alias Realtime.Api.Tenant
+  alias Realtime.GenCounter
+  alias Realtime.RateCounter
   alias Realtime.Rpc
   alias Realtime.Tenants
   alias Realtime.Tenants.Connect.CheckConnection
@@ -20,6 +21,7 @@ defmodule Realtime.Tenants.Connect do
   alias Realtime.Tenants.Connect.Piper
   alias Realtime.Tenants.Connect.RegisterProcess
   alias Realtime.Tenants.Migrations
+  alias Realtime.Tenants.Rebalancer
   alias Realtime.Tenants.ReplicationConnection
   alias Realtime.UsersCounter
 
@@ -39,11 +41,8 @@ defmodule Realtime.Tenants.Connect do
   @doc "Check if Connect has finished setting up connections"
   def ready?(tenant_id) do
     case whereis(tenant_id) do
-      pid when is_pid(pid) ->
-        GenServer.call(pid, :ready?)
-
-      _ ->
-        false
+      pid when is_pid(pid) -> GenServer.call(pid, :ready?)
+      _ -> false
     end
   end
 
@@ -55,24 +54,32 @@ defmodule Realtime.Tenants.Connect do
           | {:error, :tenant_database_unavailable}
           | {:error, :initializing}
           | {:error, :tenant_database_connection_initializing}
           | {:error, :tenant_db_too_many_connections}
+          | {:error, :connect_rate_limit_reached}
           | {:error, :rpc_error, term()}
   def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
-    case get_status(tenant_id) do
-      {:ok, conn} ->
-        {:ok, conn}
+    # Successful lookups are never counted; only the error branches below that
+    # call GenCounter.add/1 feed the per-tenant connect rate limit.
+    rate_args = Tenants.connect_per_second_rate(tenant_id)
+    RateCounter.new(rate_args)
 
-      {:error, :tenant_database_unavailable} ->
-        {:error, :tenant_database_unavailable}
+    with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
+         {:ok, conn} <- get_status(tenant_id) do
+      {:ok, conn}
+    else
+      {:ok, %{limit: %{triggered: true}}} ->
+        {:error, :connect_rate_limit_reached}
 
       {:error, :tenant_database_connection_initializing} ->
+        GenCounter.add(rate_args.id)
         call_external_node(tenant_id, opts)
 
       {:error, :initializing} ->
         {:error, :tenant_database_unavailable}
 
-      {:error, :tenant_db_too_many_connections} ->
-        {:error, :tenant_db_too_many_connections}
+      {:error, reason} ->
+        GenCounter.add(rate_args.id)
+        {:error, reason}
     end
   end
 
diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex
index 1d58d9da7..91a417c21 100644
--- a/lib/realtime_web/channels/realtime_channel.ex
+++ b/lib/realtime_web/channels/realtime_channel.ex
@@ -167,6 +167,10 @@ defmodule RealtimeWeb.RealtimeChannel do
         msg = "Database can't accept more connections, Realtime won't connect"
         log_error(socket, "DatabaseLackOfConnections", msg)
 
+      {:error, :connect_rate_limit_reached} ->
+        msg = "Too many database connections attempts per second"
+        log_error(socket, "DatabaseConnectionRateLimitReached", msg)
+
       {:error, :unable_to_set_policies, error} ->
         log_error(socket, "UnableToSetPolicies", error)
         {:error, %{reason: "Realtime was unable to connect to the project database"}}
@@ -213,6 +217,9 @@ defmodule RealtimeWeb.RealtimeChannel do
       {:error, :invalid_replay_params} ->
         log_error(socket, "UnableToReplayMessages", "Replay params are not valid")
 
+      {:error, :invalid_replay_channel} ->
+        log_error(socket, "UnableToReplayMessages", "Replay is not allowed for public channels")
+
       {:error, error} ->
         log_error(socket, "UnknownErrorOnChannel", error)
         {:error, %{reason: "Unknown Error on Channel"}}
@@ -790,7 +797,7 @@ defmodule RealtimeWeb.RealtimeChannel do
   end
 
   defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do
-    {:error, :invalid_replay_params}
+    {:error, :invalid_replay_channel}
   end
 
   defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)
diff --git a/mix.exs b/mix.exs
index 9c66b3dde..4b0b1f40c 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
   def project do
     [
       app: :realtime,
-      version: "2.51.3",
+      version: "2.51.5",
       elixir: "~> 1.17.3",
       elixirc_paths: elixirc_paths(Mix.env()),
       start_permanent: Mix.env() == :prod,
diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs
index 8ba462b27..a52973d53 100644
--- a/test/realtime/tenants/connect_test.exs
+++ b/test/realtime/tenants/connect_test.exs
@@ -515,6 +515,53 @@ defmodule Realtime.Tenants.ConnectTest do
       assert capture_log(fn -> assert {:error, :rpc_error, _} = Connect.lookup_or_start_connection("tenant") end) =~
                "project=tenant external_id=tenant [error] ErrorOnRpcCall"
     end
+
+    test "rate limit connect when too many connections against bad database", %{tenant: tenant} do
+      extension = %{
+        "type" => "postgres_cdc_rls",
+        "settings" => %{
+          "db_host" => "127.0.0.1",
+          "db_name" => "postgres",
+          "db_user" => "supabase_admin",
+          "db_password" => "postgres",
+          "poll_interval" => 100,
+          "poll_max_changes" => 100,
+          "poll_max_record_bytes" => 1_048_576,
+          "region" => "us-east-1",
+          "ssl_enforced" => true
+        }
+      }
+
+      {:ok, tenant} = update_extension(tenant, extension)
+
+      log =
+        capture_log(fn ->
+          res =
+            for _ <- 1..50 do
+              Process.sleep(200)
+              Connect.lookup_or_start_connection(tenant.external_id)
+            end
+
+          assert Enum.any?(res, fn {_, res} -> res == :connect_rate_limit_reached end)
+        end)
+
+      assert log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
+    end
+
+    test "rate limit connect will not trigger if connection is successful", %{tenant: tenant} do
+      log =
+        capture_log(fn ->
+          res =
+            for _ <- 1..20 do
+              Process.sleep(500)
+              Connect.lookup_or_start_connection(tenant.external_id)
+            end
+
+          refute Enum.any?(res, fn {_, res} -> res == :connect_rate_limit_reached end)
+        end)
+
+      refute log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
+    end
   end
 
   describe "shutdown/1" do
diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs
index 0a0d8aca9..ae6c1734a 100644
--- a/test/realtime_web/channels/realtime_channel_test.exs
+++ b/test/realtime_web/channels/realtime_channel_test.exs
@@ -153,7 +153,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
 
       assert {
                :error,
-               %{reason: "UnableToReplayMessages: Replay params are not valid"}
+               %{reason: "UnableToReplayMessages: Replay is not allowed for public channels"}
              } = subscribe_and_join(socket, "realtime:test", %{"config" => config})
 
       refute_receive _any
diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs
index 9c38d58bd..7bd426353 100644
--- a/test/realtime_web/controllers/broadcast_controller_test.exs
+++ b/test/realtime_web/controllers/broadcast_controller_test.exs
@@ -272,6 +272,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
     } do
       request_events_key = Tenants.requests_per_second_key(tenant)
       broadcast_events_key = Tenants.events_per_second_key(tenant)
+      connect_events_key = Tenants.connect_per_second_rate(tenant).id
       expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
 
       messages_to_send =
@@ -290,7 +291,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do
 
       GenCounter
       |> expect(:add, fn ^request_events_key -> :ok end)
-      |> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
+      |> expect(:add, length(messages), fn
+        ^broadcast_events_key -> :ok
+        ^connect_events_key -> :ok
+      end)
 
       conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
 
@@ -326,6 +330,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
     } do
       request_events_key = Tenants.requests_per_second_key(tenant)
       broadcast_events_key = Tenants.events_per_second_key(tenant)
+      connect_events_key = Tenants.connect_per_second_rate(tenant).id
       expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)
 
       channels =
@@ -354,7 +359,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do
 
       GenCounter
       |> expect(:add, fn ^request_events_key -> :ok end)
-      |> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
+      |> expect(:add, length(messages), fn
+        ^broadcast_events_key -> :ok
+        ^connect_events_key -> :ok
+      end)
 
       conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
 
@@ -408,6 +416,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
     } do
       request_events_key = Tenants.requests_per_second_key(tenant)
       broadcast_events_key = Tenants.events_per_second_key(tenant)
+      connect_events_key = Tenants.connect_per_second_rate(tenant).id
       expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
 
       messages_to_send =
@@ -428,7 +437,9 @@ defmodule RealtimeWeb.BroadcastControllerTest do
 
       GenCounter
       |> expect(:add, fn ^request_events_key -> :ok end)
-      |> expect(:add, length(messages_to_send), fn ^broadcast_events_key -> :ok end)
+      # remove the one message that won't be broadcasted for this user
+      |> expect(:add, 1, fn ^connect_events_key -> :ok end)
+      |> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)
 
       conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
 
@@ -482,7 +493,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
 
       GenCounter
       |> expect(:add, fn ^request_events_key -> 1 end)
-      |> reject(:add, 1)
 
       conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
 