Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
841625e
fix: runtime setup error (#1520)
filipecabaco Sep 2, 2025
1b63b4f
fix: use primary instead of replica on rename_settings_field (#1521)
edgurgel Sep 3, 2025
da3404a
feat: upgrade cowboy & ranch (#1523)
edgurgel Sep 4, 2025
bd2c141
fix: Fix GenRpc to not try to connect to nodes that are not alive (#1…
edgurgel Sep 8, 2025
6cfe6e1
fix: enable presence on track message (#1527)
filipecabaco Sep 8, 2025
b13bb21
fix: set cowboy active_n=100 as cowboy 2.12.0 (#1530)
edgurgel Sep 10, 2025
a17ce3e
fix: provide error_code metadata on RealtimeChannel.Logging (#1531)
edgurgel Sep 12, 2025
eeba306
feat: disable UTF8 validation on websocket frames (#1532)
edgurgel Sep 14, 2025
70339c7
fix: move DB setup to happen after Connect.init (#1533)
edgurgel Sep 15, 2025
50891cd
fix: handle wal bloat (#1528)
filipecabaco Sep 15, 2025
5ccea17
feat: replay realtime.messages (#1526)
edgurgel Sep 16, 2025
c4ba2aa
feat: gen_rpc pub sub adapter (#1529)
edgurgel Sep 16, 2025
e8a343a
fix: ensure message id doesn't raise on non-map payloads (#1534)
edgurgel Sep 17, 2025
380b882
fix: match error on Connect (#1536)
filipecabaco Sep 18, 2025
4ba956f
feat: websocket max heap size configuration (#1538)
edgurgel Sep 22, 2025
1df809e
fix: update gen_rpc to fix gen_rpc_dispatcher issues (#1537)
edgurgel Sep 22, 2025
9a21897
fix: improve ErlSysMon logging for processes (#1540)
edgurgel Sep 22, 2025
54cd3f7
fix: make pubsub adapter configurable (#1539)
edgurgel Sep 22, 2025
e4ee7c8
fix: specify that only private channels are allowed when replaying (#…
edgurgel Sep 25, 2025
d4565df
fix: rate limit connect module (#1541)
filipecabaco Sep 26, 2025
59a5481
Merge branch 'main' into upstream-main
Fudster Oct 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ This is the list of operational codes that can help you understand your deployme
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
| ConnectionRateLimitReached | The number of connected clients has reached its limit |
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
| DatabaseConnectionRateLimitReached | The rate of attempts to connect to the tenant's database has reached the limit |
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
Expand Down
26 changes: 26 additions & 0 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,32 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
end

@connect_per_second_default 10
@doc """
Builds the `RateCounter.Args` used to count connection attempts against a tenant database.

Accepts either a `Tenant` struct (its `external_id` is used) or the tenant id itself.
The counter id is `{:database, :connect, tenant_id}`. The limit is configured with
`@connect_per_second_default` as its value and `:sum` as its measurement, and its
`log_fn` emits a critical `DatabaseConnectionRateLimitReached` log tagged with the tenant id.
"""
@spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
def connect_per_second_rate(%Tenant{external_id: external_id}),
  do: connect_per_second_rate(external_id)

def connect_per_second_rate(tenant_id) do
  # Callback handed to the rate counter as `log_fn`; it closes over the tenant id so the
  # log line carries both `external_id` and `project` metadata.
  on_limit_reached = fn ->
    Logger.critical(
      "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database",
      external_id: tenant_id,
      project: tenant_id
    )
  end

  limit = [value: @connect_per_second_default, measurement: :sum, log_fn: on_limit_reached]

  %RateCounter.Args{
    id: {:database, :connect, tenant_id},
    opts: [max_bucket_len: 10, limit: limit]
  }
end

# Looks up the pool size for the "realtime_connect" application name, using the settings
# of the first extension in the given tenant-like map.
defp pool_size(%{extensions: [%{settings: db_settings} | _rest]}),
  do: Database.pool_size_by_application_name("realtime_connect", db_settings)
Expand Down
32 changes: 18 additions & 14 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ defmodule Realtime.Tenants.Connect do

use Realtime.Logs

alias Realtime.Tenants.Rebalancer
alias Realtime.Api.Tenant
alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Rebalancer
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter

Expand All @@ -39,11 +41,8 @@ defmodule Realtime.Tenants.Connect do
@doc """
Checks whether the Connect process for `tenant_id` has finished setting up its connections.

Returns the reply to the `:ready?` call when `whereis/1` yields a pid, and `false`
when no process is found for the tenant.
"""
def ready?(tenant_id) do
  case whereis(tenant_id) do
    pid when is_pid(pid) -> GenServer.call(pid, :ready?)
    _ -> false
  end
end

Expand All @@ -55,24 +54,29 @@ defmodule Realtime.Tenants.Connect do
| {:error, :tenant_database_unavailable}
| {:error, :initializing}
| {:error, :tenant_database_connection_initializing}
| {:error, :tenant_db_too_many_connections}
| {:error, :connect_rate_limit_reached}
| {:error, :rpc_error, term()}
# Returns the tenant database connection, starting it through `call_external_node/2`
# when it is still initializing, and guarding the whole path with a per-tenant rate counter.
#
# Returns:
#   * `{:ok, conn}` when `get_status/1` reports a connection
#   * `{:error, :connect_rate_limit_reached}` when the rate counter limit is triggered
#   * `{:error, :tenant_database_unavailable}` when the status is `:initializing`
#   * whatever `call_external_node/2` returns when the connection is still initializing
#   * any other `{:error, reason}` unchanged
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
  rate_args = Tenants.connect_per_second_rate(tenant_id)
  RateCounter.new(rate_args)

  with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
       {:ok, conn} <- get_status(tenant_id) do
    # Successful lookups are not counted against the rate limit.
    {:ok, conn}
  else
    {:ok, %{limit: %{triggered: true}}} ->
      {:error, :connect_rate_limit_reached}

    {:error, :tenant_database_connection_initializing} ->
      # A connection attempt is about to be made, so count it.
      GenCounter.add(rate_args.id)
      call_external_node(tenant_id, opts)

    {:error, :initializing} ->
      {:error, :tenant_database_unavailable}

    {:error, reason} ->
      # Every other failure counts as an attempt and is returned to the caller as-is.
      GenCounter.add(rate_args.id)
      {:error, reason}
  end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ defmodule RealtimeWeb.RealtimeChannel do
msg = "Database can't accept more connections, Realtime won't connect"
log_error(socket, "DatabaseLackOfConnections", msg)

{:error, :connect_rate_limit_reached} ->
msg = "Too many database connections attempts per second"
log_error(socket, "DatabaseConnectionRateLimitReached", msg)

{:error, :unable_to_set_policies, error} ->
log_error(socket, "UnableToSetPolicies", error)
{:error, %{reason: "Realtime was unable to connect to the project database"}}
Expand Down Expand Up @@ -213,6 +217,9 @@ defmodule RealtimeWeb.RealtimeChannel do
{:error, :invalid_replay_params} ->
log_error(socket, "UnableToReplayMessages", "Replay params are not valid")

{:error, :invalid_replay_channel} ->
log_error(socket, "UnableToReplayMessages", "Replay is not allowed for public channels")

{:error, error} ->
log_error(socket, "UnknownErrorOnChannel", error)
{:error, %{reason: "Unknown Error on Channel"}}
Expand Down Expand Up @@ -790,7 +797,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end

# Replay was requested on a channel that is not private: reject it with a dedicated
# error so callers can distinguish it from malformed replay params.
defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do
  {:error, :invalid_replay_channel}
end

defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.51.3",
version: "2.51.5",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
47 changes: 47 additions & 0 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,53 @@ defmodule Realtime.Tenants.ConnectTest do
assert capture_log(fn -> assert {:error, :rpc_error, _} = Connect.lookup_or_start_connection("tenant") end) =~
"project=tenant external_id=tenant [error] ErrorOnRpcCall"
end

test "rate limit connect when too many connections against bad database", %{tenant: tenant} do
  # Reconfigure the tenant's extension before hammering the connect path.
  # NOTE(review): presumably `"ssl_enforced" => true` is what makes this database
  # unreachable in the test environment — confirm against the test setup.
  extension = %{
    "type" => "postgres_cdc_rls",
    "settings" => %{
      "db_host" => "127.0.0.1",
      "db_name" => "postgres",
      "db_user" => "supabase_admin",
      "db_password" => "postgres",
      "poll_interval" => 100,
      "poll_max_changes" => 100,
      "poll_max_record_bytes" => 1_048_576,
      "region" => "us-east-1",
      "ssl_enforced" => true
    }
  }

  {:ok, tenant} = update_extension(tenant, extension)

  log =
    capture_log(fn ->
      results =
        for _ <- 1..50 do
          Process.sleep(200)
          Connect.lookup_or_start_connection(tenant.external_id)
        end

      # Membership check rather than destructuring 2-tuples: the function may also
      # return 3-tuples such as `{:error, :rpc_error, reason}`, which a
      # `fn {_, reason} -> ... end` predicate would crash on.
      assert {:error, :connect_rate_limit_reached} in results
    end)

  assert log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
end

test "rate limit connect will not trigger if connection is successful", %{tenant: tenant} do
  log =
    capture_log(fn ->
      results =
        for _ <- 1..20 do
          Process.sleep(500)
          Connect.lookup_or_start_connection(tenant.external_id)
        end

      # The behaviour under test is the connect rate limit, so refute its error
      # explicitly; the original check for too-many-connections is kept as well.
      # Membership checks also tolerate 3-tuple results like `{:error, :rpc_error, _}`.
      refute {:error, :connect_rate_limit_reached} in results
      refute {:error, :tenant_db_too_many_connections} in results
    end)

  refute log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
end
end

describe "shutdown/1" do
Expand Down
2 changes: 1 addition & 1 deletion test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do

assert {
:error,
%{reason: "UnableToReplayMessages: Replay params are not valid"}
%{reason: "UnableToReplayMessages: Replay is not allowed for public channels"}
} = subscribe_and_join(socket, "realtime:test", %{"config" => config})

refute_receive _any
Expand Down
18 changes: 14 additions & 4 deletions test/realtime_web/controllers/broadcast_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
connect_events_key = Tenants.connect_per_second_rate(tenant).id
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)

messages_to_send =
Expand All @@ -290,7 +291,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do

GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
|> expect(:add, length(messages), fn
^broadcast_events_key -> :ok
^connect_events_key -> :ok
end)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down Expand Up @@ -326,6 +330,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
connect_events_key = Tenants.connect_per_second_rate(tenant).id
expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)

channels =
Expand Down Expand Up @@ -354,7 +359,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do

GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
|> expect(:add, length(messages), fn
^broadcast_events_key -> :ok
^connect_events_key -> :ok
end)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down Expand Up @@ -408,6 +416,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
connect_events_key = Tenants.connect_per_second_rate(tenant).id
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)

messages_to_send =
Expand All @@ -428,7 +437,9 @@ defmodule RealtimeWeb.BroadcastControllerTest do

GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
|> expect(:add, length(messages_to_send), fn ^broadcast_events_key -> :ok end)
# remove the one message that won't be broadcasted for this user
|> expect(:add, 1, fn ^connect_events_key -> :ok end)
|> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down Expand Up @@ -482,7 +493,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do

GenCounter
|> expect(:add, fn ^request_events_key -> 1 end)
|> reject(:add, 1)

conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

Expand Down
Loading