Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
841625e
fix: runtime setup error (#1520)
filipecabaco Sep 2, 2025
1b63b4f
fix: use primary instead of replica on rename_settings_field (#1521)
edgurgel Sep 3, 2025
da3404a
feat: upgrade cowboy & ranch (#1523)
edgurgel Sep 4, 2025
bd2c141
fix: Fix GenRpc to not try to connect to nodes that are not alive (#1…
edgurgel Sep 8, 2025
6cfe6e1
fix: enable presence on track message (#1527)
filipecabaco Sep 8, 2025
b13bb21
fix: set cowboy active_n=100 as cowboy 2.12.0 (#1530)
edgurgel Sep 10, 2025
a17ce3e
fix: provide error_code metadata on RealtimeChannel.Logging (#1531)
edgurgel Sep 12, 2025
eeba306
feat: disable UTF8 validation on websocket frames (#1532)
edgurgel Sep 14, 2025
70339c7
fix: move DB setup to happen after Connect.init (#1533)
edgurgel Sep 15, 2025
50891cd
fix: handle wal bloat (#1528)
filipecabaco Sep 15, 2025
5ccea17
feat: replay realtime.messages (#1526)
edgurgel Sep 16, 2025
c4ba2aa
feat: gen_rpc pub sub adapter (#1529)
edgurgel Sep 16, 2025
e8a343a
fix: ensure message id doesn't raise on non-map payloads (#1534)
edgurgel Sep 17, 2025
380b882
fix: match error on Connect (#1536)
filipecabaco Sep 18, 2025
4ba956f
feat: websocket max heap size configuration (#1538)
edgurgel Sep 22, 2025
1df809e
fix: update gen_rpc to fix gen_rpc_dispatcher issues (#1537)
edgurgel Sep 22, 2025
9a21897
fix: improve ErlSysMon logging for processes (#1540)
edgurgel Sep 22, 2025
54cd3f7
fix: make pubsub adapter configurable (#1539)
edgurgel Sep 22, 2025
e4ee7c8
fix: specify that only private channels are allowed when replaying (#…
edgurgel Sep 25, 2025
d4565df
fix: rate limit connect module (#1541)
filipecabaco Sep 26, 2025
d309c55
build: automatically cancel old tests/build on new push (#1545)
kevcodez Sep 27, 2025
a72a835
fix: move message queue data to off-heap for gen_rpc pub sub workers …
edgurgel Sep 30, 2025
353c142
fix: rate limit Connect.lookup_or_start_connection on error only (#1549)
edgurgel Oct 1, 2025
748398c
fix: increase connect error rate window to 30 seconds (#1550)
edgurgel Oct 1, 2025
92e7b59
fix: set a lower fullsweep_after flag for GenRpcPubSub workers (#1551)
edgurgel Oct 1, 2025
6248e2b
fix: hardcode presence limit (#1552)
filipecabaco Oct 2, 2025
e84ac08
fix: further decrease limit on presence events (#1553)
filipecabaco Oct 2, 2025
13052aa
fix: bump up realtime (#1554)
filipecabaco Oct 2, 2025
6e650f0
fix: lower rate limit to 100 events per second (#1556)
filipecabaco Oct 2, 2025
05ac93e
fix: move connect rate limit to socket (#1555)
filipecabaco Oct 2, 2025
e9eaf9f
fix: collect global metrics without tenant tagging (#1557)
edgurgel Oct 2, 2025
16bd44d
feat: presence payload size (#1559)
edgurgel Oct 5, 2025
07de665
fix: use GenRpc for Realtime.Latency pings (#1560)
edgurgel Oct 6, 2025
ecac071
Fastlane for phoenix presence_diff (#1558)
edgurgel Oct 6, 2025
d4defc7
Merge branch 'main' into upstream-main
Fudster Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ on:
branches:
- main

# Runs are grouped per workflow and pull request number (falling back to the
# git ref); a new run in a group cancels the run that is still in progress.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
tests:
name: Tests
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ PORT ?= 4000
# Common commands

dev: ## Start a dev server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=us-east-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server

dev.orange: ## Start another dev server (orange) on port 4001
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=eu-west-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server

seed: ## Seed the database
DB_ENC_KEY="1234567890123456" FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 mix run priv/repo/dev_seeds.exs
Expand Down
4 changes: 3 additions & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime
# Disable caching to ensure the rendered spec is refreshed
config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache

config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
# Trace exporting is disabled; to print traces to stdout instead, use:
# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
config :opentelemetry, traces_exporter: :none

config :mix_test_watch, clear: true
2 changes: 1 addition & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))

no_channel_timeout_in_ms =
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher)
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
end

{:ok, rows_count}
Expand Down
6 changes: 5 additions & 1 deletion lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ defmodule Realtime.GenRpcPubSub.Worker do
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)

@impl true
def init(pubsub), do: {:ok, pubsub}
# GenServer init callback: sets two process flags on this worker and keeps
# the `pubsub` argument as the server state.
def init(pubsub) do
# Store this process's message queue outside of its heap.
Process.flag(:message_queue_data, :off_heap)
# Allow at most 100 generational collections before a full-sweep GC is forced.
Process.flag(:fullsweep_after, 100)
{:ok, pubsub}
end

@impl true
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
Expand Down
8 changes: 4 additions & 4 deletions lib/realtime/monitoring/latency.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Realtime.Latency do
use Realtime.Logs

alias Realtime.Nodes
alias Realtime.Rpc
alias Realtime.GenRpc

defmodule Payload do
@moduledoc false
Expand All @@ -33,7 +33,7 @@ defmodule Realtime.Latency do
}
end

@every 5_000
@every 15_000
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
Expand Down Expand Up @@ -76,7 +76,7 @@ defmodule Realtime.Latency do
Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
{latency, response} =
:timer.tc(fn ->
Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
GenRpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
end)

latency_ms = latency / 1_000
Expand All @@ -85,7 +85,7 @@ defmodule Realtime.Latency do
from_node = Nodes.short_node_id_from_name(Node.self())

case response do
{:badrpc, reason} ->
{:error, :rpc_error, reason} ->
log_error(
"RealtimeNodeDisconnected",
"Unable to connect to #{short_name} from #{region}: #{reason}"
Expand Down
25 changes: 22 additions & 3 deletions lib/realtime/monitoring/prom_ex/plugins/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ defmodule Realtime.PromEx.Plugins.Tenant do
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Tenant payload size",
tags: [:tenant],
tags: [:tenant, :message_type],
unit: :byte,
reporter_options: [
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
),
distribution(
[:realtime, :payload, :size],
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Payload size",
tags: [:message_type],
unit: :byte,
reporter_options: [
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
)
]
Expand Down Expand Up @@ -157,20 +158,38 @@ defmodule Realtime.PromEx.Plugins.Tenant do
description: "Sum of messages sent on a Realtime Channel.",
tags: [:tenant]
),
sum(
[:realtime, :channel, :global, :events],
event_name: [:realtime, :rate_counter, :channel, :events],
measurement: :sum,
description: "Global sum of messages sent on a Realtime Channel."
),
sum(
[:realtime, :channel, :presence_events],
event_name: [:realtime, :rate_counter, :channel, :presence_events],
measurement: :sum,
description: "Sum of presence messages sent on a Realtime Channel.",
tags: [:tenant]
),
sum(
[:realtime, :channel, :global, :presence_events],
event_name: [:realtime, :rate_counter, :channel, :presence_events],
measurement: :sum,
description: "Global sum of presence messages sent on a Realtime Channel."
),
sum(
[:realtime, :channel, :db_events],
event_name: [:realtime, :rate_counter, :channel, :db_events],
measurement: :sum,
description: "Sum of db messages sent on a Realtime Channel.",
tags: [:tenant]
),
sum(
[:realtime, :channel, :global, :db_events],
event_name: [:realtime, :rate_counter, :channel, :db_events],
measurement: :sum,
description: "Global sum of db messages sent on a Realtime Channel."
),
sum(
[:realtime, :channel, :joins],
event_name: [:realtime, :rate_counter, :channel, :joins],
Expand Down
9 changes: 9 additions & 0 deletions lib/realtime/monitoring/prom_ex/plugins/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ defmodule Realtime.PromEx.Plugins.Tenants do
unit: {:microsecond, :millisecond},
tags: [:success, :tenant, :mechanism],
reporter_options: [buckets: [10, 250, 5000, 15_000]]
),
distribution(
[:realtime, :global, :rpc],
event_name: [:realtime, :rpc],
description: "Global Latency of rpc calls",
measurement: :latency,
unit: {:microsecond, :millisecond},
tags: [:success, :mechanism],
reporter_options: [buckets: [10, 250, 5000, 15_000]]
)
])
end
Expand Down
14 changes: 7 additions & 7 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,18 +328,18 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
end

@connect_per_second_default 10
@connect_errors_per_second_default 10
@doc "RateCounter arguments for counting connect errors per second."
@spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
def connect_per_second_rate(%Tenant{external_id: external_id}) do
connect_per_second_rate(external_id)
@spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
def connect_errors_per_second_rate(%Tenant{external_id: external_id}) do
connect_errors_per_second_rate(external_id)
end

def connect_per_second_rate(tenant_id) do
def connect_errors_per_second_rate(tenant_id) do
opts = [
max_bucket_len: 10,
max_bucket_len: 30,
limit: [
value: @connect_per_second_default,
value: @connect_errors_per_second_default,
measurement: :sum,
log_fn: fn ->
Logger.critical(
Expand Down
9 changes: 8 additions & 1 deletion lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}

GenCounter.add(events_per_second_rate.id)
TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)

TenantBroadcaster.pubsub_broadcast(
tenant.external_id,
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher,
:broadcast
)
end

defp permissions_for_message(_, nil, _), do: nil
Expand Down
12 changes: 9 additions & 3 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule Realtime.Tenants.Connect do
| {:error, :connect_rate_limit_reached}
| {:error, :rpc_error, term()}
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
rate_args = Tenants.connect_per_second_rate(tenant_id)
rate_args = Tenants.connect_errors_per_second_rate(tenant_id)
RateCounter.new(rate_args)

with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
Expand All @@ -68,8 +68,14 @@ defmodule Realtime.Tenants.Connect do
{:error, :connect_rate_limit_reached}

{:error, :tenant_database_connection_initializing} ->
GenCounter.add(rate_args.id)
call_external_node(tenant_id, opts)
case call_external_node(tenant_id, opts) do
{:ok, pid} ->
{:ok, pid}

error ->
GenCounter.add(rate_args.id)
error
end

{:error, :initializing} ->
{:error, :tenant_database_unavailable}
Expand Down
1 change: 1 addition & 0 deletions lib/realtime_web/channels/presence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ defmodule RealtimeWeb.Presence do
use Phoenix.Presence,
otp_app: :realtime,
pubsub_server: Realtime.PubSub,
dispatcher: RealtimeWeb.RealtimeChannel.MessageDispatcher,
pool_size: 10
end
17 changes: 0 additions & 17 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel do
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
alias Realtime.Tenants.Connect

alias RealtimeWeb.Channels.Payloads.Join
Expand Down Expand Up @@ -259,27 +258,11 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

def handle_info(
%{event: "presence_diff"},
%{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket
) do
Logger.warning("Presence message ignored")
{:noreply, socket}
end

def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
Logger.warning("Broadcast message ignored")
{:noreply, socket}
end

def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do
%{presence_rate_counter: presence_rate_counter} = socket.assigns
GenCounter.add(presence_rate_counter.id)
maybe_log_info(socket, msg)
push(socket, "presence_diff", payload)
{:noreply, socket}
end

def handle_info(%{event: type, payload: payload} = msg, socket) do
count(socket)
maybe_log_info(socket, msg)
Expand Down
11 changes: 9 additions & 2 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,21 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}

if self_broadcast do
TenantBroadcaster.pubsub_broadcast(tenant_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
TenantBroadcaster.pubsub_broadcast(
tenant_id,
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher,
:broadcast
)
else
TenantBroadcaster.pubsub_broadcast_from(
tenant_id,
self(),
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher
RealtimeChannel.MessageDispatcher,
:broadcast
)
end
end
Expand Down
Loading
Loading