Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ This is the list of operational codes that can help you understand your deployme
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
| ConnectionRateLimitReached | The number of connected clients has reached its limit |
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
| DatabaseConnectionRateLimitReached | The rate of attempts to connect to the tenant's database has reached the limit |
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
Expand Down
26 changes: 26 additions & 0 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,32 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
end

@connect_per_second_default 10
@doc """
Builds the `RateCounter.Args` used to count tenant database connection attempts.

Accepts either a `Tenant` struct (its `external_id` is used) or the tenant id
itself. The resulting counter id is `{:database, :connect, tenant_id}` and the
limit options carry a `log_fn` that logs a critical
`DatabaseConnectionRateLimitReached` message tagged with the tenant id.
"""
@spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
def connect_per_second_rate(%Tenant{external_id: external_id}),
  do: connect_per_second_rate(external_id)

def connect_per_second_rate(tenant_id) do
  # Closure handed to the rate counter's limit options; it captures the
  # tenant id so the log entry carries the tenant metadata.
  on_limit = fn ->
    Logger.critical(
      "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database",
      external_id: tenant_id,
      project: tenant_id
    )
  end

  limit = [value: @connect_per_second_default, measurement: :sum, log_fn: on_limit]

  %RateCounter.Args{
    id: {:database, :connect, tenant_id},
    opts: [max_bucket_len: 10, limit: limit]
  }
end

defp pool_size(%{extensions: [%{settings: settings} | _]}) do
Database.pool_size_by_application_name("realtime_connect", settings)
end
Expand Down
32 changes: 18 additions & 14 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ defmodule Realtime.Tenants.Connect do

use Realtime.Logs

alias Realtime.Tenants.Rebalancer
alias Realtime.Api.Tenant
alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Rebalancer
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter

Expand All @@ -39,11 +41,8 @@ defmodule Realtime.Tenants.Connect do
@doc "Check if Connect has finished setting up connections"
def ready?(tenant_id) do
case whereis(tenant_id) do
pid when is_pid(pid) ->
GenServer.call(pid, :ready?)

_ ->
false
pid when is_pid(pid) -> GenServer.call(pid, :ready?)
_ -> false
end
end

Expand All @@ -55,24 +54,29 @@ defmodule Realtime.Tenants.Connect do
| {:error, :tenant_database_unavailable}
| {:error, :initializing}
| {:error, :tenant_database_connection_initializing}
| {:error, :tenant_db_too_many_connections}
| {:error, :connect_rate_limit_reached}
| {:error, :rpc_error, term()}
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
case get_status(tenant_id) do
{:ok, conn} ->
{:ok, conn}
rate_args = Tenants.connect_per_second_rate(tenant_id)
RateCounter.new(rate_args)

{:error, :tenant_database_unavailable} ->
{:error, :tenant_database_unavailable}
with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
{:ok, conn} <- get_status(tenant_id) do
{:ok, conn}
else
{:ok, %{limit: %{triggered: true}}} ->
{:error, :connect_rate_limit_reached}

{:error, :tenant_database_connection_initializing} ->
GenCounter.add(rate_args.id)
call_external_node(tenant_id, opts)

{:error, :initializing} ->
{:error, :tenant_database_unavailable}

{:error, :tenant_db_too_many_connections} ->
{:error, :tenant_db_too_many_connections}
{:error, reason} ->
GenCounter.add(rate_args.id)
{:error, reason}
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ defmodule RealtimeWeb.RealtimeChannel do
msg = "Database can't accept more connections, Realtime won't connect"
log_error(socket, "DatabaseLackOfConnections", msg)

{:error, :connect_rate_limit_reached} ->
msg = "Too many database connections attempts per second"
log_error(socket, "DatabaseConnectionRateLimitReached", msg)

{:error, :unable_to_set_policies, error} ->
log_error(socket, "UnableToSetPolicies", error)
{:error, %{reason: "Realtime was unable to connect to the project database"}}
Expand Down Expand Up @@ -213,6 +217,9 @@ defmodule RealtimeWeb.RealtimeChannel do
{:error, :invalid_replay_params} ->
log_error(socket, "UnableToReplayMessages", "Replay params are not valid")

{:error, :invalid_replay_channel} ->
log_error(socket, "UnableToReplayMessages", "Replay is not allowed for public channels")

{:error, error} ->
log_error(socket, "UnknownErrorOnChannel", error)
{:error, %{reason: "Unknown Error on Channel"}}
Expand Down Expand Up @@ -790,7 +797,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end

defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do
{:error, :invalid_replay_params}
{:error, :invalid_replay_channel}
end

defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.51.3",
version: "2.51.5",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
47 changes: 47 additions & 0 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,53 @@ defmodule Realtime.Tenants.ConnectTest do
assert capture_log(fn -> assert {:error, :rpc_error, _} = Connect.lookup_or_start_connection("tenant") end) =~
"project=tenant external_id=tenant [error] ErrorOnRpcCall"
end

test "rate limit connect when too many connections against bad database", %{tenant: tenant} do
  # Extension settings for the "bad database" scenario named in the test title.
  # NOTE(review): presumably `ssl_enforced: true` is what makes connecting fail here — confirm.
  extension = %{
    "type" => "postgres_cdc_rls",
    "settings" => %{
      "db_host" => "127.0.0.1",
      "db_name" => "postgres",
      "db_user" => "supabase_admin",
      "db_password" => "postgres",
      "poll_interval" => 100,
      "poll_max_changes" => 100,
      "poll_max_record_bytes" => 1_048_576,
      "region" => "us-east-1",
      "ssl_enforced" => true
    }
  }

  {:ok, tenant} = update_extension(tenant, extension)

  log =
    capture_log(fn ->
      # Repeated lookups against the misconfigured tenant; every call's result is collected.
      results =
        for _ <- 1..50 do
          Process.sleep(200)
          Connect.lookup_or_start_connection(tenant.external_id)
        end

      # Membership check rather than destructuring every result as a 2-tuple:
      # the spec of lookup_or_start_connection/2 also allows
      # `{:error, :rpc_error, term()}`, which a `fn {_, reason} -> ... end`
      # matcher would crash on with FunctionClauseError.
      assert {:error, :connect_rate_limit_reached} in results
    end)

  assert log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
end

test "rate limit connect will not trigger if connection is successful", %{tenant: tenant} do
  log =
    capture_log(fn ->
      # Repeated lookups against the tenant as provided by the test context
      # (no extension override), collecting every call's result.
      results =
        for _ <- 1..20 do
          Process.sleep(500)
          Connect.lookup_or_start_connection(tenant.external_id)
        end

      # The error produced by the connect rate limiter is
      # `:connect_rate_limit_reached`; that is the one that must never show up
      # here. A membership check also tolerates 3-tuple results such as
      # `{:error, :rpc_error, term()}` without raising FunctionClauseError.
      refute {:error, :connect_rate_limit_reached} in results
    end)

  refute log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
end
end

describe "shutdown/1" do
Expand Down
2 changes: 1 addition & 1 deletion test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do

assert {
:error,
%{reason: "UnableToReplayMessages: Replay params are not valid"}
%{reason: "UnableToReplayMessages: Replay is not allowed for public channels"}
} = subscribe_and_join(socket, "realtime:test", %{"config" => config})

refute_receive _any
Expand Down
18 changes: 14 additions & 4 deletions test/realtime_web/controllers/broadcast_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
connect_events_key = Tenants.connect_per_second_rate(tenant).id
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)

messages_to_send =
Expand All @@ -290,7 +291,10 @@

GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
|> expect(:add, length(messages), fn
^broadcast_events_key -> :ok
^connect_events_key -> :ok
end)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down Expand Up @@ -326,6 +330,7 @@
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
connect_events_key = Tenants.connect_per_second_rate(tenant).id
expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)

channels =
Expand Down Expand Up @@ -354,7 +359,10 @@

GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
|> expect(:add, length(messages), fn
^broadcast_events_key -> :ok
^connect_events_key -> :ok
end)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down Expand Up @@ -401,13 +409,14 @@
end

@tag role: "authenticated"
test "user with permission to write a limited set is only able to broadcast to said set", %{

Check failure on line 412 in test/realtime_web/controllers/broadcast_controller_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test authorization for broadcast user with permission to write a limited set is only able to broadcast to said set (RealtimeWeb.BroadcastControllerTest)
conn: conn,
db_conn: db_conn,
tenant: tenant
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
connect_events_key = Tenants.connect_per_second_rate(tenant).id
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)

messages_to_send =
Expand All @@ -428,7 +437,9 @@

GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
|> expect(:add, length(messages_to_send), fn ^broadcast_events_key -> :ok end)
# remove the one message that won't be broadcasted for this user
|> expect(:add, 1, fn ^connect_events_key -> :ok end)
|> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down Expand Up @@ -482,7 +493,6 @@

GenCounter
|> expect(:add, fn ^request_events_key -> 1 end)
|> reject(:add, 1)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down
Loading