diff --git a/README.md b/README.md index 2235bf388..4e13e44df 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ You can add your own by making a `POST` request to the server. You must change b "region": "us-west-1", "poll_interval_ms": 100, "poll_max_record_bytes": 1048576, - "ssl_enforced": false + "ssl_enforced": false } } ] @@ -169,6 +169,7 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000 | CONNECT_PARTITION_SLOTS | number | Number of dynamic supervisor partitions used by the Connect, ReplicationConnect processes | | METRICS_CLEANER_SCHEDULE_TIMER_IN_MS | number | Time in ms to run the Metric Cleaner task | | METRICS_RPC_TIMEOUT_IN_MS | number | Time in ms to wait for RPC call to fetch Metric per node | +| WEBSOCKET_MAX_HEAP_SIZE | number | Max number of bytes to be allocated as heap for the WebSocket transport process. If the limit is reached, the process is brutally killed. Defaults to 50MB. | | REQUEST_ID_BAGGAGE_KEY | string | OTEL Baggage key to be used as request id | | OTEL_SDK_DISABLED | boolean | Disable OpenTelemetry tracing completely when 'true' | | OTEL_TRACES_EXPORTER | string | Possible values: `otlp` or `none`. See [https://github.com/open-telemetry/opentelemetry-erlang/tree/v1.4.0/apps#os-environment] for more details on how to configure the traces exporter. | @@ -190,6 +191,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000 | MAX_GEN_RPC_CLIENTS | number | Max amount of `gen_rpc` TCP connections per node-to-node channel | | REBALANCE_CHECK_INTERVAL_IN_MS | number | Time in ms to check if process is in the right region | | DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it | +| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster | + The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).
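The new `WEBSOCKET_MAX_HEAP_SIZE` variable above is read in bytes, but the BEAM's `:max_heap_size` process flag is counted in machine words, which is why the `config/runtime.exs` hunk below divides by `:erlang.system_info(:wordsize)`. A minimal sketch of that conversion (not part of the patch, mirroring the `runtime.exs` and `RealtimeWeb.UserSocket` changes in this diff):

```elixir
# Sketch only: convert the byte-denominated env var into words before handing
# it to the VM, as config/runtime.exs does in this diff.
bytes = String.to_integer(System.get_env("WEBSOCKET_MAX_HEAP_SIZE", "50000000"))

# :max_heap_size is measured in words; a word is 8 bytes on 64-bit systems.
words = div(bytes, :erlang.system_info(:wordsize))

# Called from the transport process itself (as UserSocket.init/1 does below);
# once the heap grows past this many words, the runtime kills the process.
Process.flag(:max_heap_size, words)
```

The same bytes-to-words conversion appears again in `rel/vm.args.eex` later in this diff, where the global `+hmax` limit is templated as `div(500_000_000, :erlang.system_info(:wordsize))`.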
@@ -284,6 +287,7 @@ This is the list of operational codes that can help you understand your deployme | UnknownErrorOnController | An error we are not handling correctly was triggered on a controller | | UnknownErrorOnChannel | An error we are not handling correctly was triggered on a channel | | PresenceRateLimitReached | Limit of presence events reached | +| UnableToReplayMessages | An error occurred while replaying messages | ## License diff --git a/config/runtime.exs b/config/runtime.exs index ac0a2569b..47961f98a 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -67,6 +67,9 @@ janitor_run_after_in_ms = Env.get_integer("JANITOR_RUN_AFTER_IN_MS", :timer.minu janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.seconds(5)) janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4)) platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly +broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10) +pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom() +websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize)) no_channel_timeout_in_ms = if config_env() == :test, @@ -106,6 +109,7 @@ config :realtime, Realtime.Repo, ssl: ssl_opts config :realtime, + websocket_max_heap_size: websocket_max_heap_size, migration_partition_slots: migration_partition_slots, connect_partition_slots: connect_partition_slots, rebalance_check_interval_in_ms: rebalance_check_interval_in_ms, @@ -120,7 +124,9 @@ config :realtime, rpc_timeout: rpc_timeout, max_gen_rpc_clients: max_gen_rpc_clients, no_channel_timeout_in_ms: no_channel_timeout_in_ms, - platform: platform + platform: platform, + pubsub_adapter: pubsub_adapter, + broadcast_pool_size: broadcast_pool_size if config_env() != :test && run_janitor?
do config :realtime, diff --git a/config/test.exs b/config/test.exs index 4c7c66ae8..a69c51701 100644 --- a/config/test.exs +++ b/config/test.exs @@ -47,7 +47,7 @@ config :logger, # Configures Elixir's Logger config :logger, :console, format: "$time $metadata[$level] $message\n", - metadata: [:request_id, :project, :external_id, :application_name, :sub, :iss, :exp] + metadata: [:error_code, :request_id, :project, :external_id, :application_name, :sub, :iss, :exp] config :opentelemetry, span_processor: :simple, diff --git a/lib/realtime/api.ex b/lib/realtime/api.ex index c504d0187..ac1f7d291 100644 --- a/lib/realtime/api.ex +++ b/lib/realtime/api.ex @@ -186,9 +186,10 @@ defmodule Realtime.Api do |> repo.preload(:extensions) end - defp list_extensions(type \\ "postgres_cdc_rls") do - from(e in Extensions, where: e.type == ^type, select: e) - |> Repo.all() + def list_extensions(type) do + query = from(e in Extensions, where: e.type == ^type, select: e) + + Repo.all(query) end def rename_settings_field(from, to) do diff --git a/lib/realtime/api/message.ex b/lib/realtime/api/message.ex index 90ebc5bc9..18bbc9a87 100644 --- a/lib/realtime/api/message.ex +++ b/lib/realtime/api/message.ex @@ -8,6 +8,8 @@ defmodule Realtime.Api.Message do @primary_key {:id, Ecto.UUID, autogenerate: true} @schema_prefix "realtime" + @type t :: %__MODULE__{} + schema "messages" do field(:topic, :string) field(:extension, Ecto.Enum, values: [:broadcast, :presence]) @@ -39,7 +41,7 @@ defmodule Realtime.Api.Message do end defp maybe_put_timestamp(changeset, field) do - case Map.get(changeset.data, field) do + case get_field(changeset, field) do nil -> put_timestamp(changeset, field) _ -> changeset end diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 0f4c9ae50..99096edfb 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -52,6 +52,7 @@ defmodule Realtime.Application do region = Application.get_env(:realtime, :region) :syn.join(RegionNodes, region, self(), node: node()) + broadcast_pool_size = Application.get_env(:realtime, :broadcast_pool_size, 10) migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots) connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots) no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms) @@ -65,7 +66,8 @@ defmodule Realtime.Application do Realtime.Repo, RealtimeWeb.Telemetry, {Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]}, - {Phoenix.PubSub, name: Realtime.PubSub, pool_size: 10}, + {Phoenix.PubSub, + name: Realtime.PubSub, pool_size: 10, adapter: pubsub_adapter(), broadcast_pool_size: broadcast_pool_size}, {Cachex, name: Realtime.RateCounter}, Realtime.Tenants.Cache, Realtime.RateCounter.DynamicSupervisor, @@ -152,4 +154,12 @@ defmodule Realtime.Application do OpentelemetryPhoenix.setup(adapter: :cowboy2) OpentelemetryEcto.setup([:realtime, :repo], db_statement: :enabled) end + + defp pubsub_adapter do + if Application.fetch_env!(:realtime, :pubsub_adapter) == :gen_rpc do + Realtime.GenRpcPubSub + else + Phoenix.PubSub.PG2 + end + end end diff --git a/lib/realtime/gen_rpc.ex b/lib/realtime/gen_rpc.ex index bb7099242..a7b46a869 100644 --- a/lib/realtime/gen_rpc.ex +++ b/lib/realtime/gen_rpc.ex @@ -10,6 +10,22 @@ defmodule Realtime.GenRpc do @type result :: any | {:error, :rpc_error, reason :: any} + @doc """ + Broadcasts the message `msg` asynchronously to the registered process `name` on the specified `nodes`. 
+ + Options: + + - `:key` - Optional key to consistently select the same gen_rpc clients to guarantee message order between nodes + """ + @spec abcast([node], atom, any, keyword()) :: :ok + def abcast(nodes, name, msg, opts) when is_list(nodes) and is_atom(name) and is_list(opts) do + key = Keyword.get(opts, :key, nil) + nodes = rpc_nodes(nodes, key) + + :gen_rpc.abcast(nodes, name, msg) + :ok + end + @doc """ Fire and forget apply(mod, func, args) on all nodes @@ -41,6 +57,23 @@ defmodule Realtime.GenRpc do @spec call(node, module, atom, list(any), keyword()) :: result def call(node, mod, func, args, opts) when is_atom(node) and is_atom(mod) and is_atom(func) and is_list(args) and is_list(opts) do + if node == node() or node in Node.list() do + do_call(node, mod, func, args, opts) + else + tenant_id = Keyword.get(opts, :tenant_id) + + log_error( + "ErrorOnRpcCall", + %{target: node, mod: mod, func: func, error: :badnode}, + project: tenant_id, + external_id: tenant_id + ) + + {:error, :rpc_error, :badnode} + end + end + + defp do_call(node, mod, func, args, opts) do timeout = Keyword.get(opts, :timeout, default_rpc_timeout()) tenant_id = Keyword.get(opts, :tenant_id) key = Keyword.get(opts, :key, nil) diff --git a/lib/realtime/gen_rpc/pub_sub.ex b/lib/realtime/gen_rpc/pub_sub.ex new file mode 100644 index 000000000..b2a90b165 --- /dev/null +++ b/lib/realtime/gen_rpc/pub_sub.ex @@ -0,0 +1,78 @@ +defmodule Realtime.GenRpcPubSub do + @moduledoc """ + gen_rpc Phoenix.PubSub adapter + """ + + @behaviour Phoenix.PubSub.Adapter + alias Realtime.GenRpc + use Supervisor + + @impl true + def node_name(_), do: node() + + # Supervisor callbacks + + def start_link(opts) do + adapter_name = Keyword.fetch!(opts, :adapter_name) + name = Keyword.fetch!(opts, :name) + pool_size = Keyword.get(opts, :pool_size, 1) + broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size) + + Supervisor.start_link(__MODULE__, {adapter_name, name, broadcast_pool_size}, + name: :"#{name}#{adapter_name}_supervisor" + ) + end + + @impl true + def init({adapter_name, pubsub, pool_size}) do + workers = for number <- 1..pool_size, do: :"#{pubsub}#{adapter_name}_#{number}" + + :persistent_term.put(adapter_name, List.to_tuple(workers)) + + children = + for worker <- workers do + Supervisor.child_spec({Realtime.GenRpcPubSub.Worker, {pubsub, worker}}, id: worker) + end + + Supervisor.init(children, strategy: :one_for_one) + end + + defp worker_name(adapter_name, key) do + workers = :persistent_term.get(adapter_name) + elem(workers, :erlang.phash2(key, tuple_size(workers))) + end + + @impl true + def broadcast(adapter_name, topic, message, dispatcher) do + worker = worker_name(adapter_name, self()) + GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker) + end + + @impl true + def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do + worker = worker_name(adapter_name, self()) + GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker) + end + + defp forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher} +end + +defmodule Realtime.GenRpcPubSub.Worker do + @moduledoc false + use GenServer + + @doc false + def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker) + + @impl true + def init(pubsub), do: {:ok, pubsub} + + @impl true + def handle_info({:ftl, topic, message, dispatcher}, pubsub) do + Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher) + 
{:noreply, pubsub} + end + + @impl true + def handle_info(_, pubsub), do: {:noreply, pubsub} +end diff --git a/lib/realtime/messages.ex b/lib/realtime/messages.ex index c6d571db7..804a48d66 100644 --- a/lib/realtime/messages.ex +++ b/lib/realtime/messages.ex @@ -3,6 +3,61 @@ defmodule Realtime.Messages do Handles `realtime.messages` table operations """ + alias Realtime.Api.Message + + import Ecto.Query, only: [from: 2] + + @hard_limit 25 + @default_timeout 5_000 + + @doc """ + Fetch the last `limit` messages for a given `topic` inserted after `since` + + Automatically uses RPC if the database connection is not on the same node + + Only allowed for private channels + """ + @spec replay(pid, String.t(), non_neg_integer, non_neg_integer) :: + {:ok, [Message.t()], MapSet.t(String.t())} | {:error, term} | {:error, :rpc_error, term} + def replay(conn, topic, since, limit) when node(conn) == node() and is_integer(since) and is_integer(limit) do + limit = max(min(limit, @hard_limit), 1) + + with {:ok, since} <- DateTime.from_unix(since, :millisecond), + {:ok, messages} <- messages(conn, topic, since, limit) do + {:ok, Enum.reverse(messages), MapSet.new(messages, & &1.id)} + else + {:error, :postgrex_exception} -> {:error, :failed_to_replay_messages} + {:error, :invalid_unix_time} -> {:error, :invalid_replay_params} + error -> error + end + end + + def replay(conn, topic, since, limit) when is_integer(since) and is_integer(limit) do + Realtime.GenRpc.call(node(conn), __MODULE__, :replay, [conn, topic, since, limit], key: topic) + end + + def replay(_, _, _, _), do: {:error, :invalid_replay_params} + + defp messages(conn, topic, since, limit) do + since = DateTime.to_naive(since) + # We want to avoid searching partitions in the future as they should be empty + # so we limit to 1 minute in the future to account for any potential drift + now = NaiveDateTime.utc_now() |> NaiveDateTime.add(1, :minute) + + query = + from m in Message, + where: + m.topic == ^topic and + m.private == true and + m.extension == :broadcast and + m.inserted_at >= ^since and + m.inserted_at < ^now, + limit: ^limit, + order_by: [desc: m.inserted_at] + + Realtime.Repo.all(conn, query, Message, timeout: @default_timeout) + end + @doc """ Deletes messages older than 72 hours for a given tenant connection """ diff --git a/lib/realtime/monitoring/erl_sys_mon.ex b/lib/realtime/monitoring/erl_sys_mon.ex index 32a4f857b..3278886d6 100644 --- a/lib/realtime/monitoring/erl_sys_mon.ex +++ b/lib/realtime/monitoring/erl_sys_mon.ex @@ -10,8 +10,8 @@ defmodule Realtime.ErlSysMon do @defaults [ :busy_dist_port, :busy_port, - {:long_gc, 250}, - {:long_schedule, 100}, + {:long_gc, 500}, + {:long_schedule, 500}, {:long_message_queue, {0, 1_000}} ] @@ -24,8 +24,36 @@ defmodule Realtime.ErlSysMon do {:ok, []} end + def handle_info({:monitor, pid, _type, _meta} = msg, state) when is_pid(pid) do + log_process_info(msg, pid) + {:noreply, state} + end + def handle_info(msg, state) do - Logger.error("#{__MODULE__} message: " <> inspect(msg)) + Logger.warning("#{__MODULE__} message: " <> inspect(msg)) {:noreply, state} end + + defp log_process_info(msg, pid) do + pid_info = + pid + |> Process.info(:dictionary) + |> case do + {:dictionary, dict} when is_list(dict) -> + {List.keyfind(dict, :"$initial_call", 0), List.keyfind(dict, :"$ancestors", 0)} + + other -> + other + end + + extra_info = Process.info(pid, [:registered_name, :message_queue_len, :total_heap_size]) + + Logger.warning( + "#{__MODULE__} message: " <> + inspect(msg) <> "|\n process info: 
#{inspect(pid_info)} #{inspect(extra_info)}" + ) + rescue + _ -> + Logger.warning("#{__MODULE__} message: " <> inspect(msg)) + end end diff --git a/lib/realtime/monitoring/prom_ex/plugins/phoenix.ex b/lib/realtime/monitoring/prom_ex/plugins/phoenix.ex index d3f64afbe..6cc3709d2 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/phoenix.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/phoenix.ex @@ -57,15 +57,10 @@ if Code.ensure_loaded?(Phoenix) do def execute_metrics do active_conn = - case :ets.lookup(:ranch_server, {:listener_sup, HTTP}) do - [] -> - -1 - - _ -> - HTTP - |> :ranch_server.get_connections_sup() - |> :supervisor.count_children() - |> Keyword.get(:active) + if :ranch.info()[HTTP] do + :ranch.info(HTTP)[:all_connections] + else + -1 end :telemetry.execute(@event_all_connections, %{active: active_conn}, %{}) diff --git a/lib/realtime/syn_handler.ex b/lib/realtime/syn_handler.ex index 397c8cf8f..d2fa5541c 100644 --- a/lib/realtime/syn_handler.ex +++ b/lib/realtime/syn_handler.ex @@ -10,9 +10,9 @@ defmodule Realtime.SynHandler do @behaviour :syn_event_handler @impl true - def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: conn}, :normal) when is_pid(conn) do + def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do # Update that a database connection is ready - Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{conn: conn}) + Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn}) end def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do @@ -38,7 +38,7 @@ defmodule Realtime.SynHandler do end topic = topic(mod) - Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", nil) + Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason}) :ok end diff --git a/lib/realtime/tenants/batch_broadcast.ex b/lib/realtime/tenants/batch_broadcast.ex index 4fc31aa0f..98427621b 100644 --- a/lib/realtime/tenants/batch_broadcast.ex +++ b/lib/realtime/tenants/batch_broadcast.ex @@ -29,7 +29,9 @@ defmodule Realtime.Tenants.BatchBroadcast do @spec broadcast( auth_params :: map() | nil, tenant :: Tenant.t(), - messages :: %{messages: list(%{topic: String.t(), payload: map(), event: String.t(), private: boolean()})}, + messages :: %{ + messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()}) + }, super_user :: boolean() ) :: :ok | {:error, atom()} def broadcast(auth_params, tenant, messages, super_user \\ false) @@ -59,8 +61,8 @@ defmodule Realtime.Tenants.BatchBroadcast do # Handle events for public channel events |> Map.get(false, []) - |> Enum.each(fn %{topic: sub_topic, payload: payload, event: event} -> - send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, true) + |> Enum.each(fn message -> + send_message_and_count(tenant, events_per_second_rate, message, true) end) # Handle events for private channel @@ -69,14 +71,14 @@ defmodule Realtime.Tenants.BatchBroadcast do |> Enum.group_by(fn event -> Map.get(event, :topic) end) |> Enum.each(fn {topic, events} -> if super_user do - Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} -> - send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false) + Enum.each(events, fn message -> + send_message_and_count(tenant, events_per_second_rate, message, false) end) else case permissions_for_message(tenant, auth_params, topic) do %Policies{broadcast: 
%BroadcastPolicies{write: true}} -> - Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} -> - send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false) + Enum.each(events, fn message -> + send_message_and_count(tenant, events_per_second_rate, message, false) end) _ -> @@ -91,15 +93,15 @@ defmodule Realtime.Tenants.BatchBroadcast do def broadcast(_, nil, _, _), do: {:error, :tenant_not_found} - def changeset(payload, attrs) do + defp changeset(payload, attrs) do payload |> cast(attrs, []) |> cast_embed(:messages, required: true, with: &message_changeset/2) end - def message_changeset(message, attrs) do + defp message_changeset(message, attrs) do message - |> cast(attrs, [:topic, :payload, :event, :private]) + |> cast(attrs, [:id, :topic, :payload, :event, :private]) |> maybe_put_private_change() |> validate_required([:topic, :payload, :event]) end @@ -112,11 +114,19 @@ defmodule Realtime.Tenants.BatchBroadcast do end @event_type "broadcast" - defp send_message_and_count(tenant, events_per_second_rate, topic, event, payload, public?) do - tenant_topic = Tenants.tenant_topic(tenant, topic, public?) - payload = %{"payload" => payload, "event" => event, "type" => "broadcast"} + defp send_message_and_count(tenant, events_per_second_rate, message, public?) do + tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?) - broadcast = %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload} + payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"} + + payload = + if message[:id] do + Map.put(payload, "meta", %{"id" => message.id}) + else + payload + end + + broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload} GenCounter.add(events_per_second_rate.id) TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher) diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index b9bf00eb4..3d8f39833 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -19,7 +19,6 @@ defmodule Realtime.Tenants.Connect do alias Realtime.Tenants.Connect.GetTenant alias Realtime.Tenants.Connect.Piper alias Realtime.Tenants.Connect.RegisterProcess - alias Realtime.Tenants.Connect.StartCounters alias Realtime.Tenants.Migrations alias Realtime.Tenants.ReplicationConnection alias Realtime.UsersCounter @@ -56,6 +55,7 @@ defmodule Realtime.Tenants.Connect do | {:error, :tenant_database_unavailable} | {:error, :initializing} | {:error, :tenant_database_connection_initializing} + | {:error, :tenant_db_too_many_connections} | {:error, :rpc_error, term()} def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do case get_status(tenant_id) do @@ -63,13 +63,16 @@ defmodule Realtime.Tenants.Connect do {:ok, conn} {:error, :tenant_database_unavailable} -> - call_external_node(tenant_id, opts) + {:error, :tenant_database_unavailable} {:error, :tenant_database_connection_initializing} -> call_external_node(tenant_id, opts) {:error, :initializing} -> {:error, :tenant_database_unavailable} + + {:error, :tenant_db_too_many_connections} -> + {:error, :tenant_db_too_many_connections} end end @@ -81,16 +84,16 @@ defmodule Realtime.Tenants.Connect do | {:error, :tenant_database_unavailable} | {:error, :initializing} | {:error, :tenant_database_connection_initializing} + | {:error, :tenant_db_too_many_connections} def get_status(tenant_id) do case 
:syn.lookup(__MODULE__, tenant_id) do - {_pid, %{conn: nil}} -> - wait_for_connection(tenant_id) + {pid, %{conn: nil}} -> + wait_for_connection(pid, tenant_id) {_, %{conn: conn}} -> {:ok, conn} :undefined -> - Logger.warning("Connection process starting up") {:error, :tenant_database_connection_initializing} error -> @@ -101,7 +104,7 @@ defmodule Realtime.Tenants.Connect do def syn_topic(tenant_id), do: "connect:#{tenant_id}" - defp wait_for_connection(tenant_id) do + defp wait_for_connection(pid, tenant_id) do RealtimeWeb.Endpoint.subscribe(syn_topic(tenant_id)) # We do a lookup after subscribing because we could've missed a message while subscribing @@ -112,9 +115,18 @@ defmodule Realtime.Tenants.Connect do _ -> # Wait for up to 5 seconds for the ready event receive do - %{event: "ready", payload: %{conn: conn}} -> {:ok, conn} + %{event: "ready", payload: %{pid: ^pid, conn: conn}} -> + {:ok, conn} + + %{event: "connect_down", payload: %{pid: ^pid, reason: {:shutdown, :tenant_db_too_many_connections}}} -> + {:error, :tenant_db_too_many_connections} + + %{event: "connect_down", payload: %{pid: ^pid, reason: _reason}} -> + metadata = [external_id: tenant_id, project: tenant_id] + log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata) + {:error, :tenant_database_unavailable} after - 5_000 -> {:error, :initializing} + 15_000 -> {:error, :initializing} end end after @@ -139,16 +151,6 @@ defmodule Realtime.Tenants.Connect do {:error, {:already_started, _}} -> get_status(tenant_id) - {:error, {:shutdown, :tenant_db_too_many_connections}} -> - {:error, :tenant_db_too_many_connections} - - {:error, {:shutdown, :tenant_not_found}} -> - {:error, :tenant_not_found} - - {:error, :shutdown} -> - log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata) - {:error, :tenant_database_unavailable} - {:error, error} -> log_error("UnableToConnectToTenantDatabase", error, metadata) {:error, :tenant_database_unavailable} @@ -209,30 +211,33 @@ defmodule Realtime.Tenants.Connect do def init(%{tenant_id: tenant_id} = state) do Logger.metadata(external_id: tenant_id, project: tenant_id) + {:ok, state, {:continue, :db_connect}} + end + + @impl true + def handle_continue(:db_connect, state) do pipes = [ GetTenant, CheckConnection, - StartCounters, RegisterProcess ] case Piper.run(pipes, state) do {:ok, acc} -> - {:ok, acc, {:continue, :run_migrations}} + {:noreply, acc, {:continue, :run_migrations}} {:error, :tenant_not_found} -> - {:stop, {:shutdown, :tenant_not_found}} + {:stop, {:shutdown, :tenant_not_found}, state} {:error, :tenant_db_too_many_connections} -> - {:stop, {:shutdown, :tenant_db_too_many_connections}} + {:stop, {:shutdown, :tenant_db_too_many_connections}, state} {:error, error} -> log_error("UnableToConnectToTenantDatabase", error) - {:stop, :shutdown} + {:stop, :shutdown, state} end end - @impl true def handle_continue(:run_migrations, state) do %{tenant: tenant, db_conn_pid: db_conn_pid} = state Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}") @@ -252,31 +257,10 @@ defmodule Realtime.Tenants.Connect do end def handle_continue(:start_replication, state) do - %{tenant: tenant} = state - - with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()) do - replication_connection_reference = Process.monitor(replication_connection_pid) - - state = %{ - state - | replication_connection_pid: replication_connection_pid, - replication_connection_reference: 
replication_connection_reference - } - - {:noreply, state, {:continue, :setup_connected_user_events}} - else - {:error, :max_wal_senders_reached} -> - log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders") - {:stop, :shutdown, state} - - {:error, error} -> - log_error("StartReplicationFailed", error) - {:stop, :shutdown, state} + case start_replication_connection(state) do + {:ok, state} -> {:noreply, state, {:continue, :setup_connected_user_events}} + {:error, state} -> {:stop, :shutdown, state} end - rescue - error -> - log_error("StartReplicationFailed", error) - {:stop, :shutdown, state} end def handle_continue(:setup_connected_user_events, state) do @@ -348,13 +332,30 @@ defmodule Realtime.Tenants.Connect do {:stop, :shutdown, state} end + @replication_recovery_backoff 1000 + # Handle replication connection termination def handle_info( {:DOWN, replication_connection_reference, _, _, _}, %{replication_connection_reference: replication_connection_reference} = state ) do - Logger.warning("Replication connection has died") - {:stop, :shutdown, state} + log_warning("ReplicationConnectionDown", "Replication connection has been terminated") + Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff) + state = %{state | replication_connection_pid: nil, replication_connection_reference: nil} + {:noreply, state} + end + + @replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'" + def handle_info(:recover_replication_connection, state) do + with %{num_rows: 0} <- Postgrex.query!(state.db_conn_pid, @replication_connection_query, []), + {:ok, state} <- start_replication_connection(state) do + {:noreply, state} + else + _ -> + log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed") + Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff) + {:noreply, state} + end end def handle_info(_, state), do: {:noreply, state} @@ -375,6 +376,7 @@ defmodule Realtime.Tenants.Connect do ## Private functions defp call_external_node(tenant_id, opts) do + Logger.warning("Connection process starting up") rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default) with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id), @@ -413,4 +415,32 @@ defmodule Realtime.Tenants.Connect do defp tenant_suspended?(_), do: :ok defp rebalance_check_interval_in_ms(), do: Application.fetch_env!(:realtime, :rebalance_check_interval_in_ms) + + defp start_replication_connection(state) do + %{tenant: tenant} = state + + with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()) do + replication_connection_reference = Process.monitor(replication_connection_pid) + + state = %{ + state + | replication_connection_pid: replication_connection_pid, + replication_connection_reference: replication_connection_reference + } + + {:ok, state} + else + {:error, :max_wal_senders_reached} -> + log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders") + {:error, state} + + {:error, error} -> + log_error("StartReplicationFailed", error) + {:error, state} + end + rescue + error -> + log_error("StartReplicationFailed", error) + {:error, state} + end end diff --git a/lib/realtime/tenants/connect/check_connection.ex b/lib/realtime/tenants/connect/check_connection.ex index 697c08b6c..53cd8e480 100644 --- 
a/lib/realtime/tenants/connect/check_connection.ex +++ b/lib/realtime/tenants/connect/check_connection.ex @@ -2,16 +2,14 @@ defmodule Realtime.Tenants.Connect.CheckConnection do @moduledoc """ Check tenant database connection. """ - alias Realtime.Database @behaviour Realtime.Tenants.Connect.Piper @impl true def run(acc) do %{tenant: tenant} = acc - case Database.check_tenant_connection(tenant) do + case Realtime.Database.check_tenant_connection(tenant) do {:ok, conn} -> - Process.link(conn) db_conn_reference = Process.monitor(conn) {:ok, %{acc | db_conn_pid: conn, db_conn_reference: db_conn_reference}} diff --git a/lib/realtime/tenants/connect/start_counters.ex b/lib/realtime/tenants/connect/start_counters.ex deleted file mode 100644 index f8ce6c378..000000000 --- a/lib/realtime/tenants/connect/start_counters.ex +++ /dev/null @@ -1,60 +0,0 @@ -defmodule Realtime.Tenants.Connect.StartCounters do - @moduledoc """ - Start tenant counters. - """ - - alias Realtime.RateCounter - alias Realtime.Tenants - - @behaviour Realtime.Tenants.Connect.Piper - - @impl true - def run(acc) do - %{tenant: tenant} = acc - - with :ok <- start_joins_per_second_counter(tenant), - :ok <- start_max_events_counter(tenant), - :ok <- start_db_events_counter(tenant) do - {:ok, acc} - end - end - - def start_joins_per_second_counter(tenant) do - res = - tenant - |> Tenants.joins_per_second_rate() - |> RateCounter.new() - - case res do - {:ok, _} -> :ok - {:error, {:already_started, _}} -> :ok - {:error, reason} -> {:error, reason} - end - end - - def start_max_events_counter(tenant) do - res = - tenant - |> Tenants.events_per_second_rate() - |> RateCounter.new() - - case res do - {:ok, _} -> :ok - {:error, {:already_started, _}} -> :ok - {:error, reason} -> {:error, reason} - end - end - - def start_db_events_counter(tenant) do - res = - tenant - |> Tenants.db_events_per_second_rate() - |> RateCounter.new() - - case res do - {:ok, _} -> :ok - {:error, {:already_started, _}} -> :ok - {:error, reason} -> {:error, reason} - end - end -end diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index 04475c2b7..a5fa1eb8b 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -74,7 +74,8 @@ defmodule Realtime.Tenants.Migrations do RealtimeSendSetsTopicConfig, SubscriptionIndexBridgingDisabled, RunSubscriptionIndexBridgingDisabled, - BroadcastSendErrorLogging + BroadcastSendErrorLogging, + CreateMessagesReplayIndex } @migrations [ @@ -140,7 +141,8 @@ defmodule Realtime.Tenants.Migrations do {20_250_128_220_012, RealtimeSendSetsTopicConfig}, {20_250_506_224_012, SubscriptionIndexBridgingDisabled}, {20_250_523_164_012, RunSubscriptionIndexBridgingDisabled}, - {20_250_714_121_412, BroadcastSendErrorLogging} + {20_250_714_121_412, BroadcastSendErrorLogging}, + {20_250_905_041_441, CreateMessagesReplayIndex} ] defstruct [:tenant_external_id, :settings] diff --git a/lib/realtime/tenants/replication_connection.ex b/lib/realtime/tenants/replication_connection.ex index 45e03c66e..4ebb1f8e8 100644 --- a/lib/realtime/tenants/replication_connection.ex +++ b/lib/realtime/tenants/replication_connection.ex @@ -144,8 +144,8 @@ defmodule Realtime.Tenants.ReplicationConnection do port: connection_opts.port, socket_options: connection_opts.socket_options, ssl: connection_opts.ssl, - backoff_type: :stop, sync_connect: true, + auto_reconnect: false, parameters: [application_name: "realtime_replication_connection"] ] @@ -310,7 +310,13 @@ defmodule 
Realtime.Tenants.ReplicationConnection do {:ok, topic} <- get_or_error(to_broadcast, "topic", :topic_missing), {:ok, private} <- get_or_error(to_broadcast, "private", :private_missing), %Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id), - broadcast_message = %{topic: topic, event: event, private: private, payload: Map.put_new(payload, "id", id)}, + broadcast_message = %{ + id: id, + topic: topic, + event: event, + private: private, + payload: Map.put_new(payload, "id", id) + }, :ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do inserted_at = NaiveDateTime.from_iso8601!(inserted_at) latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at) diff --git a/lib/realtime/tenants/repo/migrations/20250905041441_create_messages_replay_index.ex b/lib/realtime/tenants/repo/migrations/20250905041441_create_messages_replay_index.ex new file mode 100644 index 000000000..77afde6e0 --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20250905041441_create_messages_replay_index.ex @@ -0,0 +1,11 @@ +defmodule Realtime.Tenants.Migrations.CreateMessagesReplayIndex do + @moduledoc false + + use Ecto.Migration + + def change do + create_if_not_exists index(:messages, [{:desc, :inserted_at}, :topic], + where: "extension = 'broadcast' and private IS TRUE" + ) + end +end diff --git a/lib/realtime_web/channels/payloads/broadcast.ex b/lib/realtime_web/channels/payloads/broadcast.ex index 7feddb043..e2881fd54 100644 --- a/lib/realtime_web/channels/payloads/broadcast.ex +++ b/lib/realtime_web/channels/payloads/broadcast.ex @@ -9,9 +9,11 @@ defmodule RealtimeWeb.Channels.Payloads.Broadcast do embedded_schema do field :ack, :boolean, default: false field :self, :boolean, default: false + embeds_one :replay, RealtimeWeb.Channels.Payloads.Broadcast.Replay end def changeset(broadcast, attrs) do cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2) + |> cast_embed(:replay, invalid_message: "unable to parse, expected a map") end end diff --git a/lib/realtime_web/channels/payloads/broadcast/replay.ex b/lib/realtime_web/channels/payloads/broadcast/replay.ex new file mode 100644 index 000000000..b0a5804a2 --- /dev/null +++ b/lib/realtime_web/channels/payloads/broadcast/replay.ex @@ -0,0 +1,17 @@ +defmodule RealtimeWeb.Channels.Payloads.Broadcast.Replay do + @moduledoc """ + Validate broadcast replay field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :limit, :integer, default: 10 + field :since, :integer, default: 0 + end + + def changeset(broadcast, attrs) do + cast(broadcast, attrs, [:limit, :since], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 26c033f5c..1d58d9da7 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -72,12 +72,21 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, claims, confirm_token_ref} <- confirm_token(socket), socket = assign_authorization_context(socket, sub_topic, claims), {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id), - {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do + {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket), + {:ok, replayed_message_ids} <- + maybe_replay_messages(params["config"], sub_topic, db_conn, socket.assigns.private?) 
do tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?) # fastlane subscription metadata = - MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id) + MessageDispatcher.fastlane_metadata( + transport_pid, + serializer, + topic, + log_level, + tenant_id, + replayed_message_ids + ) RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata) @@ -198,6 +207,12 @@ defmodule RealtimeWeb.RealtimeChannel do {:error, :shutdown_in_progress} -> log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby") + {:error, :failed_to_replay_messages} -> + log_error(socket, "UnableToReplayMessages", "Realtime was unable to replay messages") + + {:error, :invalid_replay_params} -> + log_error(socket, "UnableToReplayMessages", "Replay params are not valid") + {:error, error} -> log_error(socket, "UnknownErrorOnChannel", error) {:error, %{reason: "Unknown Error on Channel"}} @@ -205,6 +220,17 @@ defmodule RealtimeWeb.RealtimeChannel do end @impl true + def handle_info({:replay, messages}, socket) do + for message <- messages do + meta = %{"replayed" => true, "id" => message.id} + payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast", "meta" => meta} + + push(socket, "broadcast", payload) + end + + {:noreply, socket} + end + def handle_info(:update_rate_counter, socket) do count(socket) @@ -376,7 +402,7 @@ defmodule RealtimeWeb.RealtimeChannel do end def handle_in("presence", payload, %{assigns: %{private?: false}} = socket) do - with {:ok, socket} <- PresenceHandler.handle(payload, socket) do + with {:ok, socket} <- PresenceHandler.handle(payload, nil, socket) do {:reply, :ok, socket} else {:error, :rate_limit_exceeded} -> @@ -762,4 +788,25 @@ defmodule RealtimeWeb.RealtimeChannel do do: {:error, :private_only}, else: :ok end + + defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do + {:error, :invalid_replay_params} + end + + defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?) 
+ when is_map(replay_params) do + with {:ok, messages, message_ids} <- + Realtime.Messages.replay( + db_conn, + sub_topic, + replay_params["since"], + replay_params["limit"] || 25 + ) do + # Send to self because we can't write to the socket before finishing the join process + send(self(), {:replay, messages}) + {:ok, message_ids} + end + end + + defp maybe_replay_messages(_, _, _, _), do: {:ok, MapSet.new()} end diff --git a/lib/realtime_web/channels/realtime_channel/logging.ex b/lib/realtime_web/channels/realtime_channel/logging.ex index 296dce1bc..2f6c91fdb 100644 --- a/lib/realtime_web/channels/realtime_channel/logging.ex +++ b/lib/realtime_web/channels/realtime_channel/logging.ex @@ -21,7 +21,7 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do def log_error(socket, code, msg) do msg = build_msg(code, msg) emit_system_error(:error, code) - log(socket, :error, msg) + log(socket, :error, code, msg) {:error, %{reason: msg}} end @@ -32,7 +32,7 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do {:error, %{reason: binary}} def log_warning(socket, code, msg) do msg = build_msg(code, msg) - log(socket, :warning, msg) + log(socket, :warning, code, msg) {:error, %{reason: msg}} end @@ -59,16 +59,16 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do if code, do: "#{code}: #{msg}", else: msg end - defp log(%{assigns: %{tenant: tenant, access_token: access_token}}, level, msg) do + defp log(%{assigns: %{tenant: tenant, access_token: access_token}}, level, code, msg) do Logger.metadata(external_id: tenant, project: tenant) if level in [:error, :warning], do: update_metadata_with_token_claims(access_token) - Logger.log(level, msg) + Logger.log(level, msg, error_code: code) end defp maybe_log(%{assigns: %{log_level: log_level}} = socket, level, code, msg) do msg = build_msg(code, msg) emit_system_error(level, code) - if Logger.compare_levels(log_level, level) != :gt, do: log(socket, level, msg) + if Logger.compare_levels(log_level, level) != :gt, do: log(socket, level, code, msg) if level in [:error, :warning], do: {:error, %{reason: msg}}, else: :ok end diff --git a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex index b5db97f95..32e1528f3 100644 --- a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex +++ b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex @@ -5,12 +5,14 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do require Logger - def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id) do - {:realtime_channel_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}} + def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) + + def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do + {:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids} end - def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id) do - {:realtime_channel_fastlane, fastlane_pid, serializer, topic} + def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do + {:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids} end @doc """ @@ -23,22 +25,34 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do # This reduce caches the serialization and bypasses the channel process going straight to the # transport process + message_id = message_id(msg.payload) + # Credo 
doesn't like that we don't use the result aggregation _ = Enum.reduce(subscribers, %{}, fn {pid, _}, cache when pid == from -> cache - {pid, {:realtime_channel_fastlane, fastlane_pid, serializer, join_topic}}, cache -> - send(pid, :update_rate_counter) - do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache -> + if already_replayed?(message_id, replayed_message_ids) do + # skip already replayed message + cache + else + send(pid, :update_rate_counter) + do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + end - {pid, {:realtime_channel_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}}}, cache -> - send(pid, :update_rate_counter) - log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" - Logger.info(log, external_id: tenant_id, project: tenant_id) + {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache -> + if already_replayed?(message_id, replayed_message_ids) do + # skip already replayed message + cache + else + send(pid, :update_rate_counter) + log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" + Logger.info(log, external_id: tenant_id, project: tenant_id) - do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + end {pid, _}, cache -> send(pid, msg) @@ -48,6 +62,12 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do :ok end + defp message_id(%{"meta" => %{"id" => id}}), do: id + defp message_id(_), do: nil + + defp already_replayed?(nil, _replayed_message_ids), do: false + defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id) + defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) do case cache do %{^serializer => encoded_msg} -> diff --git a/lib/realtime_web/channels/realtime_channel/presence_handler.ex b/lib/realtime_web/channels/realtime_channel/presence_handler.ex index 00ce77c02..9dc23d219 100644 --- a/lib/realtime_web/channels/realtime_channel/presence_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/presence_handler.ex @@ -52,28 +52,22 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do end end - @spec handle(map(), Socket.t()) :: - {:ok, Socket.t()} | {:error, :rls_policy_error | :unable_to_set_policies | :rate_limit_exceeded} - def handle(_, %{assigns: %{presence_enabled?: false}} = socket), do: {:ok, socket} - def handle(payload, socket) when not is_private?(socket), do: handle(payload, nil, socket) - @spec handle(map(), pid() | nil, Socket.t()) :: {:ok, Socket.t()} | {:error, :rls_policy_error | :unable_to_set_policies | :rate_limit_exceeded | :unable_to_track_presence} - def handle(_, _, %{assigns: %{presence_enabled?: false}} = socket), do: {:ok, socket} - def handle(%{"event" => event} = payload, db_conn, socket) do event = String.downcase(event, :ascii) handle_presence_event(event, payload, db_conn, socket) end - def handle(_payload, _db_conn, socket), do: {:ok, socket} + def handle(_, _, socket), do: {:ok, socket} - defp handle_presence_event("track", payload, _db_conn, socket) when not is_private?(socket) do + defp handle_presence_event("track", payload, _, socket) when not is_private?(socket) do track(socket, payload) end - defp handle_presence_event("track", payload, db_conn, socket) when is_nil(socket.assigns.policies.presence.write) do + defp 
handle_presence_event("track", payload, db_conn, socket) + when is_private?(socket) and is_nil(socket.assigns.policies.presence.write) do %{assigns: %{authorization_context: authorization_context, policies: policies}} = socket case Authorization.get_write_authorizations(policies, db_conn, authorization_context) do @@ -111,6 +105,8 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do end defp track(socket, payload) do + socket = assign(socket, :presence_enabled?, true) + %{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket payload = Map.get(payload, "payload", %{}) diff --git a/lib/realtime_web/channels/user_socket.ex b/lib/realtime_web/channels/user_socket.ex index 09dd15906..849aa052d 100644 --- a/lib/realtime_web/channels/user_socket.ex +++ b/lib/realtime_web/channels/user_socket.ex @@ -1,4 +1,12 @@ defmodule RealtimeWeb.UserSocket do + # This is defined up here before `use Phoenix.Socket` is called so that we can define `Phoenix.Socket.init/1` + # It has to be overridden because we need to set the `max_heap_size` flag from the transport process context + @impl true + def init(state) when is_tuple(state) do + Process.flag(:max_heap_size, max_heap_size()) + Phoenix.Socket.__init__(state) + end + use Phoenix.Socket use Realtime.Logs @@ -122,4 +130,6 @@ defmodule RealtimeWeb.UserSocket do _ -> @default_log_level end end + + defp max_heap_size(), do: Application.fetch_env!(:realtime, :websocket_max_heap_size) end diff --git a/lib/realtime_web/endpoint.ex b/lib/realtime_web/endpoint.ex index 917ab65b9..190e1a917 100644 --- a/lib/realtime_web/endpoint.ex +++ b/lib/realtime_web/endpoint.ex @@ -16,6 +16,15 @@ defmodule RealtimeWeb.Endpoint do connect_info: [:peer_data, :uri, :x_headers], fullsweep_after: 20, max_frame_size: 8_000_000, + # https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228 + # active_n: The number of packets Cowboy will request from the socket at once. + # This can be used to tweak the performance of the server. Higher values reduce + # the number of times Cowboy need to request more packets from the port driver at + # the expense of potentially higher memory being used. + active_n: 100, + # Skip validating UTF8 for faster frame processing. 
+ # Currently all text frames are handled only with JSON, which already requires UTF-8 + validate_utf8: false, serializer: [ {Phoenix.Socket.V1.JSONSerializer, "~> 1.0.0"}, {Phoenix.Socket.V2.JSONSerializer, "~> 2.0.0"} ] diff --git a/lib/realtime_web/tenant_broadcaster.ex b/lib/realtime_web/tenant_broadcaster.ex index ee8646614..da02df79e 100644 --- a/lib/realtime_web/tenant_broadcaster.ex +++ b/lib/realtime_web/tenant_broadcaster.ex @@ -9,7 +9,11 @@ defmodule RealtimeWeb.TenantBroadcaster do def pubsub_broadcast(tenant_id, topic, message, dispatcher) do collect_payload_size(tenant_id, message) - Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic) + if pubsub_adapter() == :gen_rpc do + PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher) + else + Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic) + end :ok end @@ -25,12 +29,16 @@ def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do collect_payload_size(tenant_id, message) - Realtime.GenRpc.multicast( - PubSub, - :local_broadcast_from, - [Realtime.PubSub, from, topic, message, dispatcher], - key: topic - ) + if pubsub_adapter() == :gen_rpc do + PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher) + else + Realtime.GenRpc.multicast( + PubSub, + :local_broadcast_from, + [Realtime.PubSub, from, topic, message, dispatcher], + key: topic + ) + end :ok end @@ -45,4 +53,8 @@ defp collect_payload_size(tenant_id, payload) do :telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id}) end + + defp pubsub_adapter do + Application.fetch_env!(:realtime, :pubsub_adapter) + end end diff --git a/mix.exs b/mix.exs index c0d4e1516..1db3bf5b8 100644 --- a/mix.exs +++ b/mix.exs @@ -90,7 +90,7 @@ defmodule Realtime.MixProject do {:opentelemetry_phoenix, "~> 2.0"}, {:opentelemetry_cowboy, "~> 1.0"}, {:opentelemetry_ecto, "~> 1.2"}, - {:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "d161cf263c661a534eaabf80aac7a34484dac772"}, + {:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "901aada9adb307ff89a8be197a5d384e69dd57d6"}, {:mimic, "~> 1.0", only: :test}, {:floki, ">= 0.30.0", only: :test}, {:mint_web_socket, "~> 1.0", only: :test}, diff --git a/mix.lock b/mix.lock index 76eb0d980..c5fce6022 100644 --- a/mix.lock +++ b/mix.lock @@ -7,9 +7,9 @@ "castore": {:hex, :castore, "1.0.11", "4bbd584741601eb658007339ea730b082cc61f3554cf2e8f39bf693a11b49073", [:mix], [], "hexpm", "e03990b4db988df56262852f20de0f659871c35154691427a5047f4967a16a62"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, "corsica": {:hex, :corsica, "2.1.3", "dccd094ffce38178acead9ae743180cdaffa388f35f0461ba1e8151d32e190e6", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "616c08f61a345780c2cf662ff226816f04d8868e12054e68963e95285b5be8bc"}, - "cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch,
"1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"}, + "cowboy": {:hex, :cowboy, "2.13.0", "09d770dd5f6a22cc60c071f432cd7cb87776164527f205c5a6b0f24ff6b38990", [:make, :rebar3], [{:cowlib, ">= 2.14.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "e724d3a70995025d654c1992c7b11dbfea95205c047d86ff9bf1cda92ddc5614"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, - "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, + "cowlib": {:hex, :cowlib, "2.15.0", "3c97a318a933962d1c12b96ab7c1d728267d2c523c25a5b57b0f93392b6e9e25", [:make, :rebar3], [], "hexpm", "4f00c879a64b4fe7c8fcb42a4281925e9ffdb928820b03c3ad325a617e857532"}, "credo": {:hex, :credo, "1.7.11", "d3e805f7ddf6c9c854fd36f089649d7cf6ba74c42bc3795d587814e3c9847102", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "56826b4306843253a66e47ae45e98e7d284ee1f95d53d1612bb483f88a8cf219"}, "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, "db_connection": {:hex, :db_connection, "2.8.0", "64fd82cfa6d8e25ec6660cea73e92a4cbc6a18b31343910427b702838c4b33b2", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "008399dae5eee1bf5caa6e86d204dcb44242c82b1ed5e22c881f2c34da201b15"}, @@ -29,7 +29,7 @@ "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, "floki": {:hex, :floki, "0.37.0", "b83e0280bbc6372f2a403b2848013650b16640cd2470aea6701f0632223d719e", [:mix], [], "hexpm", "516a0c15a69f78c47dc8e0b9b3724b29608aa6619379f91b1ffa47109b5d0dd3"}, - "gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "d161cf263c661a534eaabf80aac7a34484dac772", [ref: "d161cf263c661a534eaabf80aac7a34484dac772"]}, + "gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "901aada9adb307ff89a8be197a5d384e69dd57d6", [ref: "901aada9adb307ff89a8be197a5d384e69dd57d6"]}, "gettext": {:hex, :gettext, "0.26.2", 
"5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, @@ -82,7 +82,7 @@ "postgres_replication": {:git, "https://github.com/filipecabaco/postgres_replication.git", "69129221f0263aa13faa5fbb8af97c28aeb4f71c", []}, "postgrex": {:hex, :postgrex, "0.20.0", "363ed03ab4757f6bc47942eff7720640795eb557e1935951c1626f0d303a3aed", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d36ef8b36f323d29505314f704e21a1a038e2dc387c6409ee0cd24144e187c0f"}, "prom_ex": {:hex, :prom_ex, "1.9.0", "63e6dda6c05cdeec1f26c48443dcc38ffd2118b3665ae8d2bd0e5b79f2aea03e", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.0.2", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.15", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.3", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.12.1", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "01f3d4f69ec93068219e686cc65e58a29c42bea5429a8ff4e2121f19db178ee6"}, - "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "ranch": {:hex, :ranch, "2.2.0", "25528f82bc8d7c6152c57666ca99ec716510fe0925cb188172f41ce93117b1b0", [:make, :rebar3], [], "hexpm", "fa0b99a1780c80218a4197a59ea8d3bdae32fbff7e88527d7d8a4787eff4f8e7"}, "recon": {:hex, :recon, "2.5.6", "9052588e83bfedfd9b72e1034532aee2a5369d9d9343b61aeb7fbce761010741", [:mix, :rebar3], [], "hexpm", 
"96c6799792d735cc0f0fd0f86267e9d351e63339cbe03df9d162010cefc26bb0"}, "req": {:hex, :req, "0.5.10", "a3a063eab8b7510785a467f03d30a8d95f66f5c3d9495be3474b61459c54376c", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "8a604815743f8a2d3b5de0659fa3137fa4b1cffd636ecb69b30b2b9b2c2559be"}, "sleeplocks": {:hex, :sleeplocks, "1.1.3", "96a86460cc33b435c7310dbd27ec82ca2c1f24ae38e34f8edde97f756503441a", [:rebar3], [], "hexpm", "d3b3958552e6eb16f463921e70ae7c767519ef8f5be46d7696cc1ed649421321"}, diff --git a/rel/vm.args.eex b/rel/vm.args.eex index 278da5524..9de4e952f 100644 --- a/rel/vm.args.eex +++ b/rel/vm.args.eex @@ -10,8 +10,8 @@ ## Tweak GC to run more often ##-env ERL_FULLSWEEP_AFTER 10 -## Limit process heap for all procs to 1000 MB -+hmax 1000000000 +## Limit process heap for all procs to 500 MB. The number here is the number of words ++hmax <%= div(500_000_000, :erlang.system_info(:wordsize)) %> ## Set distribution buffer busy limit (default is 1024) +zdbbl 100000 @@ -19,4 +19,4 @@ ## Disable Busy Wait +sbwt none +sbwtdio none -+sbwtdcpu none \ No newline at end of file ++sbwtdcpu none diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 806a5ad7e..23b1a3a7f 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -25,6 +25,7 @@ defmodule Realtime.Integration.RtChannelTest do alias Realtime.Tenants alias Realtime.Tenants.Authorization alias Realtime.Tenants.Connect + alias Realtime.Tenants.ReplicationConnection alias RealtimeWeb.RealtimeChannel.Tracker alias RealtimeWeb.SocketDisconnect @@ -653,8 +654,8 @@ defmodule Realtime.Integration.RtChannelTest do :syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: nil} end) payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"} WebsocketClient.send_event(service_role_socket, topic, "broadcast", payload) - # Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready - refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 6000 + # Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready + refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 16000 end) assert log =~ "UnableToHandleBroadcast" @@ -831,7 +832,7 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "presence_diff"}, 500 # Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready - refute_receive %Message{event: "phx_leave", topic: ^topic}, 6000 + refute_receive %Message{event: "phx_leave", topic: ^topic}, 16000 end) assert log =~ "UnableToHandlePresence" @@ -909,6 +910,56 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500 refute_receive %Message{event: "presence_state"}, 500 end + + test "presence automatically enabled when user sends track message for public channel", %{tenant: tenant} do + {socket, _} = 
get_connection(tenant) + config = %{presence: %{key: "", enabled: false}, private: false} + topic = "realtime:any" + + WebsocketClient.join(socket, topic, %{config: config}) + + assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 + refute_receive %Message{event: "presence_state"}, 500 + + payload = %{ + type: "presence", + event: "TRACK", + payload: %{name: "realtime_presence_96", t: 1814.7000000029802} + } + + WebsocketClient.send_event(socket, topic, "presence", payload) + + assert_receive %Message{event: "presence_diff", payload: %{"joins" => joins, "leaves" => %{}}, topic: ^topic} + + join_payload = joins |> Map.values() |> hd() |> get_in(["metas"]) |> hd() + assert get_in(join_payload, ["name"]) == payload.payload.name + assert get_in(join_payload, ["t"]) == payload.payload.t + end + + @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] + test "presence automatically enabled when user sends track message for private channel", + %{tenant: tenant, topic: topic} do + {socket, _} = get_connection(tenant, "authenticated") + config = %{presence: %{key: "", enabled: false}, private: true} + topic = "realtime:#{topic}" + + WebsocketClient.join(socket, topic, %{config: config}) + assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 + refute_receive %Message{event: "presence_state"}, 500 + + payload = %{ + type: "presence", + event: "TRACK", + payload: %{name: "realtime_presence_96", t: 1814.7000000029802} + } + + WebsocketClient.send_event(socket, topic, "presence", payload) + + assert_receive %Message{event: "presence_diff", payload: %{"joins" => joins, "leaves" => %{}}, topic: ^topic}, 500 + join_payload = joins |> Map.values() |> hd() |> get_in(["metas"]) |> hd() + assert get_in(join_payload, ["name"]) == payload.payload.name + assert get_in(join_payload, ["t"]) == payload.payload.t + end end describe "token handling" do @@ -2304,6 +2355,135 @@ defmodule Realtime.Integration.RtChannelTest do assert count == 2 end + describe "WAL bloat handling" do + setup %{tenant: tenant} do + topic = random_string() + {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) + + %{rows: [[max_wal_size]]} = Postgrex.query!(db_conn, "SHOW max_wal_size", []) + %{rows: [[wal_keep_size]]} = Postgrex.query!(db_conn, "SHOW wal_keep_size", []) + %{rows: [[max_slot_wal_keep_size]]} = Postgrex.query!(db_conn, "SHOW max_slot_wal_keep_size", []) + + assert max_wal_size == "32MB" + assert wal_keep_size == "32MB" + assert max_slot_wal_keep_size == "32MB" + + Postgrex.query!(db_conn, "CREATE TABLE IF NOT EXISTS wal_test (id INT, data TEXT)", []) + + Postgrex.query!( + db_conn, + """ + CREATE OR REPLACE FUNCTION wal_test_trigger_func() RETURNS TRIGGER AS $$ + BEGIN + PERFORM realtime.send(json_build_object ('value', 'test' :: text)::jsonb, 'test', '#{topic}', false); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + """, + [] + ) + + Postgrex.query!(db_conn, "DROP TRIGGER IF EXISTS wal_test_trigger ON wal_test", []) + + Postgrex.query!( + db_conn, + """ + CREATE TRIGGER wal_test_trigger + AFTER INSERT OR UPDATE OR DELETE ON wal_test + FOR EACH ROW + EXECUTE FUNCTION wal_test_trigger_func() + """, + [] + ) + + GenServer.stop(db_conn) + + on_exit(fn -> + {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) + + Postgrex.query!(db_conn, "DROP TABLE IF EXISTS wal_test CASCADE", []) + end) + + %{topic: topic} + end + + test "track PID changes during WAL bloat creation", 
%{tenant: tenant, topic: topic} do + {socket, _} = get_connection(tenant, "authenticated") + config = %{broadcast: %{self: true}, private: false} + full_topic = "realtime:#{topic}" + + active_slot_query = + "SELECT active_pid FROM pg_replication_slots where active_pid is not null and slot_name = 'supabase_realtime_messages_replication_slot_'" + + WebsocketClient.join(socket, full_topic, %{config: config}) + + assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500 + assert_receive %Message{event: "presence_state"}, 500 + + assert Connect.ready?(tenant.external_id) + + {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) + + original_connect_pid = Connect.whereis(tenant.external_id) + original_replication_pid = ReplicationConnection.whereis(tenant.external_id) + %{rows: [[original_db_pid]]} = Postgrex.query!(db_conn, active_slot_query, []) + + tasks = + for _ <- 1..5 do + Task.async(fn -> + {:ok, bloat_conn} = Database.connect(tenant, "realtime_bloat", :stop) + + Postgrex.transaction(bloat_conn, fn conn -> + Postgrex.query(conn, "INSERT INTO wal_test SELECT generate_series(1, 100000), repeat('x', 2000)", []) + {:error, "test"} + end) + + Process.exit(bloat_conn, :normal) + end) + end + + Task.await_many(tasks, 20000) + + # Kill all pending transactions still running + Postgrex.query!( + db_conn, + "SELECT pg_terminate_backend(pid) from pg_stat_activity where application_name='realtime_bloat'", + [] + ) + + # Does it recover? + assert Connect.ready?(tenant.external_id) + {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) + Process.sleep(1000) + %{rows: [[new_db_pid]]} = Postgrex.query!(db_conn, active_slot_query, []) + + assert new_db_pid != original_db_pid + assert ^original_connect_pid = Connect.whereis(tenant.external_id) + assert original_replication_pid != ReplicationConnection.whereis(tenant.external_id) + + # Check that the socket is still connected + payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"} + WebsocketClient.send_event(socket, full_topic, "broadcast", payload) + assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^full_topic}, 500 + + # Check that we are receiving the message from the replication connection + Postgrex.query!(db_conn, "INSERT INTO wal_test VALUES (1, 'test')", []) + + assert_receive %Phoenix.Socket.Message{ + event: "broadcast", + payload: %{ + "event" => "test", + "payload" => %{"value" => "test"}, + "type" => "broadcast" + }, + join_ref: nil, + ref: nil, + topic: ^full_topic + }, + 5000 + end + end + defp mode(%{mode: :distributed}) do tenant = Api.get_tenant_by_external_id("dev_tenant") diff --git a/test/realtime/gen_rpc_pub_sub_test.exs b/test/realtime/gen_rpc_pub_sub_test.exs new file mode 100644 index 000000000..0013c2e7b --- /dev/null +++ b/test/realtime/gen_rpc_pub_sub_test.exs @@ -0,0 +1,2 @@ +Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []}) +Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__) diff --git a/test/realtime/gen_rpc_test.exs b/test/realtime/gen_rpc_test.exs index dd837aaf8..0c41d3ea1 100644 --- a/test/realtime/gen_rpc_test.exs +++ b/test/realtime/gen_rpc_test.exs @@ -172,6 +172,51 @@ defmodule Realtime.GenRpcTest do mechanism: :gen_rpc }} end + + test "bad node" do + node = :"unknown@1.1.1.1" + + log = + capture_log(fn -> + assert GenRpc.call(node, Map, :fetch, [%{a: 1}, :a], tenant_id: 123) == {:error, :rpc_error, :badnode} + end) + + assert log =~
external_id=123 \[error\] ErrorOnRpcCall: %{+error: :badnode, mod: Map, func: :fetch, target: :"#{node}"/ + end + end + + describe "abcast/4" do + test "abcast to registered process", %{node: node} do + name = + System.unique_integer() + |> to_string() + |> String.to_atom() + + :erlang.register(name, self()) + + # Use erpc to make the other node abcast to this one + :erpc.call(node, GenRpc, :abcast, [[node()], name, "a message", []]) + + assert_receive "a message" + refute_receive _any + end + + @tag extra_config: [{:gen_rpc, :tcp_server_port, 9999}] + test "tcp error" do + Logger.put_process_level(self(), :debug) + + log = + capture_log(fn -> + assert GenRpc.abcast(Node.list(), :some_process_name, "a message", []) == :ok + # We have to wait for gen_rpc logs to show up + Process.sleep(100) + end) + + assert log =~ "[error] event=connect_to_remote_server" + + refute_receive _any + end end describe "multicast/4" do diff --git a/test/realtime/messages_test.exs b/test/realtime/messages_test.exs index 3bef9a5e0..cca0ce742 100644 --- a/test/realtime/messages_test.exs +++ b/test/realtime/messages_test.exs @@ -16,32 +16,221 @@ defmodule Realtime.MessagesTest do %{conn: conn, tenant: tenant, date_start: date_start, date_end: date_end} end - test "delete_old_messages/1 deletes messages older than 72 hours", %{ - conn: conn, - tenant: tenant, - date_start: date_start, - date_end: date_end - } do - utc_now = NaiveDateTime.utc_now() - limit = NaiveDateTime.add(utc_now, -72, :hour) - - messages = - for date <- Date.range(date_start, date_end) do - inserted_at = date |> NaiveDateTime.new!(Time.new!(0, 0, 0)) - message_fixture(tenant, %{inserted_at: inserted_at}) + describe "replay/5" do + test "invalid replay params" do + assert Messages.replay(self(), "a topic", "not a number", 123) == + {:error, :invalid_replay_params} + + assert Messages.replay(self(), "a topic", 123, "not a number") == + {:error, :invalid_replay_params} + + assert Messages.replay(self(), "a topic", 253_402_300_800_000, 10) == + {:error, :invalid_replay_params} + end + + test "empty replay", %{conn: conn} do + assert Messages.replay(conn, "test", 0, 10) == {:ok, [], MapSet.new()} + end + + test "replay respects limit", %{conn: conn, tenant: tenant} do + m1 = + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute), + "event" => "new", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "new"} + }) + + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute), + "event" => "old", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "old"} + }) + + assert Messages.replay(conn, "test", 0, 1) == {:ok, [m1], MapSet.new([m1.id])} + end + + test "replay private topic only", %{conn: conn, tenant: tenant} do + privatem = + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute), + "event" => "new", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "new"} + }) + + message_fixture(tenant, %{ + "private" => false, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute), + "event" => "old", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "old"} + }) + + assert Messages.replay(conn, "test", 0, 10) == {:ok, [privatem], MapSet.new([privatem.id])} + end + + test "replay extension=broadcast", %{conn: conn, tenant: 
tenant} do + privatem = + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute), + "event" => "new", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "new"} + }) + + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute), + "event" => "old", + "extension" => "presence", + "topic" => "test", + "payload" => %{"value" => "old"} + }) + + assert Messages.replay(conn, "test", 0, 10) == {:ok, [privatem], MapSet.new([privatem.id])} + end + + test "replay respects since", %{conn: conn, tenant: tenant} do + m1 = + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute), + "event" => "first", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "first"} + }) + + m2 = + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute), + "event" => "second", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "second"} + }) + + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-10, :minute), + "event" => "old", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "old"} + }) + + since = DateTime.utc_now() |> DateTime.add(-3, :minute) |> DateTime.to_unix(:millisecond) + + assert Messages.replay(conn, "test", since, 10) == {:ok, [m1, m2], MapSet.new([m1.id, m2.id])} + end + + test "replay respects hard max limit of 25", %{conn: conn, tenant: tenant} do + for _i <- 1..30 do + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now(), + "event" => "event", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "message"} + }) end - assert length(messages) == 11 + assert {:ok, messages, set} = Messages.replay(conn, "test", 0, 30) + assert length(messages) == 25 + assert MapSet.size(set) == 25 + end + + test "replay respects hard min limit of 1", %{conn: conn, tenant: tenant} do + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now(), + "event" => "event", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "message"} + }) + + assert {:ok, messages, set} = Messages.replay(conn, "test", 0, 0) + assert length(messages) == 1 + assert MapSet.size(set) == 1 + end + + test "distributed replay", %{conn: conn, tenant: tenant} do + m = + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now(), + "event" => "event", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "message"} + }) + + {:ok, node} = Clustered.start() + + # Call remote node passing the database connection that is local to this node + assert :erpc.call(node, Messages, :replay, [conn, "test", 0, 30]) == {:ok, [m], MapSet.new([m.id])} + end + + test "distributed replay error", %{tenant: tenant} do + message_fixture(tenant, %{ + "inserted_at" => NaiveDateTime.utc_now(), + "event" => "event", + "extension" => "broadcast", + "topic" => "test", + "private" => true, + "payload" => %{"value" => "message"} + }) + + {:ok, node} = Clustered.start() + + # Call remote node passing a dead pid instead of a valid database connection + pid = spawn(fn -> :ok end) + assert :erpc.call(node, Messages, :replay, [pid, "test", 0, 30]) == {:error,
:failed_to_replay_messages} + end + end + + describe "delete_old_messages/1" do + test "delete_old_messages/1 deletes messages older than 72 hours", %{ + conn: conn, + tenant: tenant, + date_start: date_start, + date_end: date_end + } do + utc_now = NaiveDateTime.utc_now() + limit = NaiveDateTime.add(utc_now, -72, :hour) + + messages = + for date <- Date.range(date_start, date_end) do + inserted_at = date |> NaiveDateTime.new!(Time.new!(0, 0, 0)) + message_fixture(tenant, %{inserted_at: inserted_at}) + end + + assert length(messages) == 11 - to_keep = - Enum.reject( - messages, - &(NaiveDateTime.compare(limit, &1.inserted_at) == :gt) - ) + to_keep = + Enum.reject( + messages, + &(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt) + ) - assert :ok = Messages.delete_old_messages(conn) - {:ok, current} = Repo.all(conn, from(m in Message), Message) + assert :ok = Messages.delete_old_messages(conn) + {:ok, current} = Repo.all(conn, from(m in Message), Message) - assert Enum.sort(current) == Enum.sort(to_keep) + assert Enum.sort(current) == Enum.sort(to_keep) + end end end diff --git a/test/realtime/monitoring/erl_sys_mon_test.exs b/test/realtime/monitoring/erl_sys_mon_test.exs index b1e122d58..e9c7b87b7 100644 --- a/test/realtime/monitoring/erl_sys_mon_test.exs +++ b/test/realtime/monitoring/erl_sys_mon_test.exs @@ -5,16 +5,25 @@ defmodule Realtime.Monitoring.ErlSysMonTest do describe "system monitoring" do test "logs system monitor events" do - start_supervised!({ErlSysMon, config: [{:long_message_queue, {1, 10}}]}) + start_supervised!({ErlSysMon, config: [{:long_message_queue, {1, 100}}]}) - assert capture_log(fn -> - Task.async(fn -> - Enum.map(1..1000, &send(self(), &1)) - # Wait for ErlSysMon to notice - Process.sleep(4000) - end) - |> Task.await() - end) =~ "Realtime.ErlSysMon message:" + log = + capture_log(fn -> + Task.async(fn -> + Process.register(self(), TestProcess) + Enum.map(1..1000, &send(self(), &1)) + # Wait for ErlSysMon to notice + Process.sleep(4000) + end) + |> Task.await() + end) + + assert log =~ "Realtime.ErlSysMon message:" + assert log =~ "$initial_call\", {Realtime.Monitoring.ErlSysMonTest" + assert log =~ "ancestors\", [#{inspect(self())}]" + assert log =~ "registered_name: TestProcess" + assert log =~ "message_queue_len: " + assert log =~ "total_heap_size: " end end end diff --git a/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs b/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs index a73e6e2f5..ad9198c97 100644 --- a/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs +++ b/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs @@ -1,6 +1,7 @@ defmodule Realtime.PromEx.Plugins.PhoenixTest do use Realtime.DataCase, async: false alias Realtime.PromEx.Plugins + alias Realtime.Integration.WebsocketClient defmodule MetricsTest do use PromEx, otp_app: :realtime_test_phoenix @@ -13,16 +14,20 @@ defmodule Realtime.PromEx.Plugins.PhoenixTest do describe "pooling metrics" do setup do start_supervised!(MetricsTest) - :ok + %{tenant: Containers.checkout_tenant(run_migrations: true)} end - test "number of connections" do - # Trigger a connection by making a request to the endpoint - url = RealtimeWeb.Endpoint.url() <> "/healthcheck" - Req.get!(url) + test "number of connections", %{tenant: tenant} do + {:ok, token} = token_valid(tenant, "anon", %{}) + + {:ok, _} = + WebsocketClient.connect(self(), uri(tenant, 4002), Phoenix.Socket.V1.JSONSerializer, [{"x-api-key", token}]) + + {:ok, _} = + 
WebsocketClient.connect(self(), uri(tenant, 4002), Phoenix.Socket.V1.JSONSerializer, [{"x-api-key", token}]) Process.sleep(200) - assert metric_value() > 0 + assert metric_value() >= 2 end end diff --git a/test/realtime/syn_handler_test.exs b/test/realtime/syn_handler_test.exs index 2b27cf322..1cf0d3bad 100644 --- a/test/realtime/syn_handler_test.exs +++ b/test/realtime/syn_handler_test.exs @@ -168,32 +168,40 @@ defmodule Realtime.SynHandlerTest do test "it handles :syn_conflict_resolution reason" do reason = :syn_conflict_resolution + pid = self() log = capture_log(fn -> - assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok + assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok end) topic = "#{@topic}:#{@name}" event = "#{@topic}_down" assert log =~ "#{@mod} terminated due to syn conflict resolution: #{inspect(@name)} #{inspect(self())}" - assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil} + assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: %{reason: ^reason, pid: ^pid}} end test "it handles other reasons" do reason = :other_reason + pid = self() log = capture_log(fn -> - assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok + assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok end) topic = "#{@topic}:#{@name}" event = "#{@topic}_down" refute log =~ "#{@mod} terminated: #{inspect(@name)} #{node()}" - assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}, 500 + + assert_receive %Phoenix.Socket.Broadcast{ + topic: ^topic, + event: ^event, + payload: %{reason: ^reason, pid: ^pid} + }, + 500 end end end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 290fb1c8d..8ba462b27 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -78,12 +78,55 @@ defmodule Realtime.Tenants.ConnectTest do assert_receive {:ok, ^pid} end - test "more than 5 seconds passed error out", %{tenant: tenant} do + test "more than 15 seconds passed error out", %{tenant: tenant} do parent = self() # Let's slow down Connect starting expect(Database, :check_tenant_connection, fn t -> - :timer.sleep(5500) + Process.sleep(15500) + call_original(Database, :check_tenant_connection, [t]) + end) + + connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end + + spawn(connect) + spawn(connect) + + {:error, :initializing} = Connect.lookup_or_start_connection(tenant.external_id) + # The above call waited 15 seconds + assert_receive {:error, :initializing} + assert_receive {:error, :initializing} + + # This one will succeed + {:ok, _pid} = Connect.lookup_or_start_connection(tenant.external_id) + end + + test "too many db connections", %{tenant: tenant} do + extension = %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_host" => "127.0.0.1", + "db_name" => "postgres", + "db_user" => "supabase_admin", + "db_password" => "postgres", + "poll_interval" => 100, + "poll_max_changes" => 100, + "poll_max_record_bytes" => 1_048_576, + "region" => "us-east-1", + "ssl_enforced" => false, + "db_pool" => 100, + "subcriber_pool_size" => 100, + "subs_pool_size" => 100 + } + } + + {:ok, tenant} = update_extension(tenant, extension) + + parent = self() + + # Let's slow down Connect starting + expect(Database, :check_tenant_connection, fn t -> + :timer.sleep(1000) call_original(Database, :check_tenant_connection, [t]) 
end) @@ -97,12 +140,13 @@ defmodule Realtime.Tenants.ConnectTest do spawn(connect) spawn(connect) - {:error, :tenant_database_unavailable} = Connect.lookup_or_start_connection(tenant.external_id) + # This one should block and wait for the first Connect + {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id) - # Only one will succeed the others timed out waiting - assert_receive {:error, :tenant_database_unavailable} - assert_receive {:error, :tenant_database_unavailable} - assert_receive {:ok, _pid}, 7000 + assert_receive {:error, :tenant_db_too_many_connections} + assert_receive {:error, :tenant_db_too_many_connections} + assert_receive {:error, :tenant_db_too_many_connections} + refute_receive _any end end @@ -267,6 +311,34 @@ defmodule Realtime.Tenants.ConnectTest do assert {:error, :tenant_suspended} = Connect.lookup_or_start_connection(tenant.external_id) end + test "tenant not able to connect if database has not enough connections", %{ + tenant: tenant + } do + extension = %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_host" => "127.0.0.1", + "db_name" => "postgres", + "db_user" => "supabase_admin", + "db_password" => "postgres", + "poll_interval" => 100, + "poll_max_changes" => 100, + "poll_max_record_bytes" => 1_048_576, + "region" => "us-east-1", + "ssl_enforced" => false, + "db_pool" => 100, + "subcriber_pool_size" => 100, + "subs_pool_size" => 100 + } + } + + {:ok, tenant} = update_extension(tenant, extension) + + assert capture_log(fn -> + assert {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id) + end) =~ ~r/Only \d+ available connections\. At least \d+ connections are required/ + end + test "handles tenant suspension and unsuspension in a reactive way", %{tenant: tenant} do assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) assert Connect.ready?(tenant.external_id) @@ -352,11 +424,13 @@ defmodule Realtime.Tenants.ConnectTest do assert replication_connection_before == replication_connection_after end - test "on replication connection postgres pid being stopped, also kills the Connect module", %{tenant: tenant} do + test "on replication connection postgres pid being stopped, Connect module recovers it", %{tenant: tenant} do assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) assert Connect.ready?(tenant.external_id) replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) + Process.monitor(replication_connection_pid) + assert Process.alive?(replication_connection_pid) pid = Connect.whereis(tenant.external_id) @@ -366,21 +440,33 @@ defmodule Realtime.Tenants.ConnectTest do [] ) - assert_process_down(replication_connection_pid) - assert_process_down(pid) + assert_receive {:DOWN, _, :process, ^replication_connection_pid, _} + + Process.sleep(1500) + new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) + + assert replication_connection_pid != new_replication_connection_pid + assert Process.alive?(new_replication_connection_pid) + assert Process.alive?(pid) end - test "on replication connection exit, also kills the Connect module", %{tenant: tenant} do + test "on replication connection exit, Connect module recovers it", %{tenant: tenant} do assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id) assert Connect.ready?(tenant.external_id) replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) + 
Process.monitor(replication_connection_pid) assert Process.alive?(replication_connection_pid) pid = Connect.whereis(tenant.external_id) Process.exit(replication_connection_pid, :kill) + assert_receive {:DOWN, _, :process, ^replication_connection_pid, _} - assert_process_down(replication_connection_pid) - assert_process_down(pid) + Process.sleep(1500) + new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) + + assert replication_connection_pid != new_replication_connection_pid + assert Process.alive?(new_replication_connection_pid) + assert Process.alive?(pid) end test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do @@ -449,30 +535,6 @@ defmodule Realtime.Tenants.ConnectTest do test "if tenant does not exist, does nothing" do assert :ok = Connect.shutdown("none") end - - test "tenant not able to connect if database has not enough connections", %{tenant: tenant} do - extension = %{ - "type" => "postgres_cdc_rls", - "settings" => %{ - "db_host" => "127.0.0.1", - "db_name" => "postgres", - "db_user" => "supabase_admin", - "db_password" => "postgres", - "poll_interval" => 100, - "poll_max_changes" => 100, - "poll_max_record_bytes" => 1_048_576, - "region" => "us-east-1", - "ssl_enforced" => false, - "db_pool" => 100, - "subcriber_pool_size" => 100, - "subs_pool_size" => 100 - } - } - - {:ok, tenant} = update_extension(tenant, extension) - - assert {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id) - end end describe "registers into local registry" do diff --git a/test/realtime/tenants/janitor/maintenance_task_test.exs b/test/realtime/tenants/janitor/maintenance_task_test.exs index f4c51436e..4c42b7ab3 100644 --- a/test/realtime/tenants/janitor/maintenance_task_test.exs +++ b/test/realtime/tenants/janitor/maintenance_task_test.exs @@ -15,9 +15,15 @@ defmodule Realtime.Tenants.Janitor.MaintenanceTaskTest do end test "cleans messages older than 72 hours and creates partitions", %{tenant: tenant} do + {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) + utc_now = NaiveDateTime.utc_now() limit = NaiveDateTime.add(utc_now, -72, :hour) + date_start = Date.utc_today() |> Date.add(-10) + date_end = Date.utc_today() + create_messages_partitions(conn, date_start, date_end) + messages = for days <- -5..0 do inserted_at = NaiveDateTime.add(utc_now, days, :day) @@ -27,12 +33,11 @@ defmodule Realtime.Tenants.Janitor.MaintenanceTaskTest do to_keep = messages - |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)) + |> Enum.reject(&(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt)) |> MapSet.new() assert MaintenanceTask.run(tenant.external_id) == :ok - {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) {:ok, res} = Repo.all(conn, from(m in Message), Message) verify_partitions(conn) @@ -80,7 +85,7 @@ defmodule Realtime.Tenants.Janitor.MaintenanceTaskTest do defp verify_partitions(conn) do today = Date.utc_today() - yesterday = Date.add(today, -1) + yesterday = Date.add(today, -3) future = Date.add(today, 3) dates = Date.range(yesterday, future) diff --git a/test/realtime/tenants/janitor_test.exs b/test/realtime/tenants/janitor_test.exs index 4ac1a0eda..fb597a4c4 100644 --- a/test/realtime/tenants/janitor_test.exs +++ b/test/realtime/tenants/janitor_test.exs @@ -31,6 +31,14 @@ defmodule Realtime.Tenants.JanitorTest do end ) + date_start = Date.utc_today() |> Date.add(-10) + date_end = Date.utc_today() + + Enum.map(tenants, 
fn tenant -> + {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) + create_messages_partitions(conn, date_start, date_end) + end) + start_supervised!( {Task.Supervisor, name: Realtime.Tenants.Janitor.TaskSupervisor, max_children: 5, max_seconds: 500, max_restarts: 1} @@ -62,7 +70,7 @@ defmodule Realtime.Tenants.JanitorTest do to_keep = messages - |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)) + |> Enum.reject(&(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt)) |> MapSet.new() start_supervised!(Janitor) @@ -105,7 +113,7 @@ defmodule Realtime.Tenants.JanitorTest do to_keep = messages - |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)) + |> Enum.reject(&(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt)) |> MapSet.new() start_supervised!(Janitor) @@ -162,7 +170,7 @@ defmodule Realtime.Tenants.JanitorTest do defp verify_partitions(conn) do today = Date.utc_today() - yesterday = Date.add(today, -1) + yesterday = Date.add(today, -3) future = Date.add(today, 3) dates = Date.range(yesterday, future) diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs index 783270313..b28a23988 100644 --- a/test/realtime/tenants/replication_connection_test.exs +++ b/test/realtime/tenants/replication_connection_test.exs @@ -98,6 +98,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do payload = %{ "event" => "INSERT", + "meta" => %{"id" => row.id}, "payload" => %{ "id" => row.id, "value" => value @@ -139,8 +140,9 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do "event" => "broadcast", "payload" => %{ "event" => "INSERT", + "meta" => %{"id" => id}, "payload" => %{ - "id" => _, + "id" => id, "value" => ^value } }, @@ -222,21 +224,26 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do "payload" => %{"value" => "something"} }) + fixture_id = fixture.id + assert_receive {:socket_push, :text, data}, 500 message = data |> IO.iodata_to_binary() |> Jason.decode!() assert %{ "event" => "broadcast", - "payload" => %{"event" => "INSERT", "payload" => payload, "type" => "broadcast"}, + "payload" => %{ + "event" => "INSERT", + "meta" => %{"id" => ^fixture_id}, + "payload" => payload, + "type" => "broadcast" + }, "ref" => nil, "topic" => ^topic } = message - id = fixture.id - assert payload == %{ "value" => "something", - "id" => id + "id" => fixture_id } end @@ -252,19 +259,25 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do payload = %{"value" => "something", "id" => "123456"} - message_fixture(tenant, %{ - "topic" => topic, - "private" => true, - "event" => "INSERT", - "payload" => payload - }) + %{id: fixture_id} = + message_fixture(tenant, %{ + "topic" => topic, + "private" => true, + "event" => "INSERT", + "payload" => payload + }) assert_receive {:socket_push, :text, data}, 500 message = data |> IO.iodata_to_binary() |> Jason.decode!() assert %{ "event" => "broadcast", - "payload" => %{"event" => "INSERT", "payload" => ^payload, "type" => "broadcast"}, + "payload" => %{ + "meta" => %{"id" => ^fixture_id}, + "event" => "INSERT", + "payload" => ^payload, + "type" => "broadcast" + }, "ref" => nil, "topic" => ^topic } = message @@ -331,6 +344,26 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do assert {:error, :max_wal_senders_reached} = ReplicationConnection.start(tenant, self()) end + + test "handles WAL pressure gracefully", %{tenant: tenant} do + {:ok, replication_pid} = 
ReplicationConnection.start(tenant, self()) + + {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) + on_exit(fn -> Process.exit(conn, :normal) end) + + large_payload = String.duplicate("x", 10 * 1024 * 1024) + + for i <- 1..5 do + message_fixture_with_conn(tenant, conn, %{ + "topic" => "stress_#{i}", + "private" => true, + "event" => "INSERT", + "payload" => %{"data" => large_payload} + }) + end + + assert Process.alive?(replication_pid) + end end describe "whereis/1" do @@ -409,4 +442,20 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do ref = Process.monitor(pid) assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout end + + defp message_fixture_with_conn(_tenant, conn, override) do + create_attrs = %{ + "topic" => random_string(), + "extension" => "broadcast" + } + + override = override |> Enum.map(fn {k, v} -> {"#{k}", v} end) |> Map.new() + + {:ok, message} = + create_attrs + |> Map.merge(override) + |> TenantConnection.create_message(conn) + + message + end end diff --git a/test/realtime_web/channels/payloads/join_test.exs b/test/realtime_web/channels/payloads/join_test.exs index 32bf1b397..c1ea54a67 100644 --- a/test/realtime_web/channels/payloads/join_test.exs +++ b/test/realtime_web/channels/payloads/join_test.exs @@ -6,6 +6,7 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do alias RealtimeWeb.Channels.Payloads.Join alias RealtimeWeb.Channels.Payloads.Config alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Broadcast.Replay alias RealtimeWeb.Channels.Payloads.Presence alias RealtimeWeb.Channels.Payloads.PostgresChange @@ -17,7 +18,7 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do config = %{ "config" => %{ "private" => false, - "broadcast" => %{"ack" => false, "self" => false}, + "broadcast" => %{"ack" => false, "self" => false, "replay" => %{"since" => 1, "limit" => 10}}, "presence" => %{"enabled" => true, "key" => key}, "postgres_changes" => [ %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, @@ -37,8 +38,9 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do postgres_changes: postgres_changes } = config - assert %Broadcast{ack: false, self: false} = broadcast + assert %Broadcast{ack: false, self: false, replay: replay} = broadcast assert %Presence{enabled: true, key: ^key} = presence + assert %Replay{since: 1, limit: 10} = replay assert [ %PostgresChange{event: "INSERT", schema: "public", table: "users", filter: "id=eq.1"}, @@ -56,6 +58,17 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do assert is_binary(key) end + test "invalid replay" do + config = %{"config" => %{"broadcast" => %{"replay" => 123}}} + + assert { + :error, + :invalid_join_payload, + %{config: %{broadcast: %{replay: ["unable to parse, expected a map"]}}} + } = + Join.validate(config) + end + test "missing enabled presence defaults to true" do config = %{"config" => %{"presence" => %{}}} diff --git a/test/realtime_web/channels/realtime_channel/logging_test.exs b/test/realtime_web/channels/realtime_channel/logging_test.exs index 92634daef..cd131d16e 100644 --- a/test/realtime_web/channels/realtime_channel/logging_test.exs +++ b/test/realtime_web/channels/realtime_channel/logging_test.exs @@ -37,6 +37,7 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do assert log =~ "sub=#{sub}" assert log =~ "exp=#{exp}" assert log =~ "iss=#{iss}" + assert log =~ "error_code=TestError" end end @@ -57,6 +58,7 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do assert log =~ "sub=#{sub}" assert log =~ 
"exp=#{exp}" assert log =~ "iss=#{iss}" + assert log =~ "error_code=TestWarning" end end @@ -67,10 +69,14 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do for log_level <- log_levels do socket = %{assigns: %{log_level: log_level, tenant: random_string(), access_token: "test_token"}} - assert capture_log(fn -> - assert Logging.maybe_log_error(socket, "TestCode", "test message") == - {:error, %{reason: "TestCode: test message"}} - end) =~ "TestCode: test message" + log = + capture_log(fn -> + assert Logging.maybe_log_error(socket, "TestCode", "test message") == + {:error, %{reason: "TestCode: test message"}} + end) + + assert log =~ "TestCode: test message" + assert log =~ "error_code=TestCode" assert capture_log(fn -> assert Logging.maybe_log_error(socket, "TestCode", %{a: "b"}) == @@ -103,11 +109,14 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do for log_level <- log_levels do socket = %{assigns: %{log_level: log_level, tenant: random_string(), access_token: "test_token"}} - assert capture_log(fn -> - assert Logging.maybe_log_warning(socket, "TestCode", "test message") == - {:error, %{reason: "TestCode: test message"}} - end) =~ - "TestCode: test message" + log = + capture_log(fn -> + assert Logging.maybe_log_warning(socket, "TestCode", "test message") == + {:error, %{reason: "TestCode: test message"}} + end) + + assert log =~ "TestCode: test message" + assert log =~ "error_code=TestCode" assert capture_log(fn -> assert Logging.maybe_log_warning(socket, "TestCode", %{a: "b"}) == diff --git a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs index 7a9e2eb25..44ce83b99 100644 --- a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs +++ b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs @@ -16,12 +16,12 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do describe "fastlane_metadata/5" do test "info level" do assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") == - {:realtime_channel_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}} + {:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()} end test "non-info level" do assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") == - {:realtime_channel_fastlane, self(), Serializer, "realtime:topic"} + {:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()} end end @@ -50,12 +50,11 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do from_pid = :erlang.list_to_pid(~c'<0.2.1>') subscribers = [ - {subscriber_pid, {:realtime_channel_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}}}, - {subscriber_pid, {:realtime_channel_fastlane, self(), TestSerializer, "realtime:topic"}} + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}} ] msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}} - require Logger log = capture_log(fn -> @@ -75,6 +74,87 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do refute_receive _any end + test "does not dispatch messages to fastlane subscribers if they already replayed it" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + 
send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + replayed_message_ids = MapSet.new(["123"]) + + subscribers = [ + {subscriber_pid, + {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replayed_message_ids}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replayed_message_ids}} + ] + + msg = %Broadcast{ + topic: "some:other:topic", + event: "event", + payload: %{"data" => "test", "meta" => %{"id" => "123"}} + } + + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + + assert Agent.get(TestSerializer, & &1) == 0 + + refute_receive _any + end + + test "payload is not a map" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + + subscribers = [ + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}} + ] + + msg = %Broadcast{topic: "some:other:topic", event: "event", payload: "not a map"} + + log = + capture_log(fn -> + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + end) + + assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}" + + assert_receive {:encoded, %Broadcast{event: "event", payload: "not a map", topic: "realtime:topic"}} + assert_receive {:encoded, %Broadcast{event: "event", payload: "not a map", topic: "realtime:topic"}} + + assert Agent.get(TestSerializer, & &1) == 1 + + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + + refute_receive _any + end + test "dispatches messages to non fastlane subscribers" do from_pid = :erlang.list_to_pid(~c'<0.2.1>') diff --git a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs index e5ecd32ad..0cdf422e2 100644 --- a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs +++ b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs @@ -99,7 +99,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do end end - describe "handle/2" do + describe "handle/3" do test "with true policy and is private, user can track their presence and changes", %{ tenant: tenant, topic: topic, @@ -142,7 +142,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do policies = %Policies{presence: %PresencePolicies{read: false, write: false}} socket = socket_fixture(tenant, topic, key, policies: policies, private?: false) - assert {:ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, socket) + assert {:ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, nil, socket) topic = socket.assigns.tenant_topic assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}} @@ -229,6 +229,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do assert {:ok, socket} = PresenceHandler.handle( %{"event" => "track", "payload" => %{"metadata" => random_string()}}, + nil, socket ) @@ -248,7 +249,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do assert log =~ "UnknownPresenceEvent" end - test "socket with presence enabled false will ignore presence events in public channel", %{ +
test "socket with presence enabled false will ignore non-track presence events in public channel", %{ tenant: tenant, topic: topic } do @@ -256,12 +257,12 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do policies = %Policies{presence: %PresencePolicies{read: true, write: true}} socket = socket_fixture(tenant, topic, key, policies: policies, private?: false, enabled?: false) - assert {:ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, socket) + assert {:ok, _socket} = PresenceHandler.handle(%{"event" => "untrack"}, nil, socket) topic = socket.assigns.tenant_topic refute_receive %Broadcast{topic: ^topic, event: "presence_diff"} end - test "socket with presence enabled false will ignore presence events in private channel", %{ + test "socket with presence enabled false will ignore non-track presence events in private channel", %{ tenant: tenant, topic: topic, db_conn: db_conn @@ -270,11 +271,80 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do policies = %Policies{presence: %PresencePolicies{read: true, write: true}} socket = socket_fixture(tenant, topic, key, policies: policies, private?: false, enabled?: false) - assert {:ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket) + assert {:ok, _socket} = PresenceHandler.handle(%{"event" => "untrack"}, db_conn, socket) topic = socket.assigns.tenant_topic refute_receive %Broadcast{topic: ^topic, event: "presence_diff"} end + test "socket with presence disabled will enable presence on track message for public channel", %{ + tenant: tenant, + topic: topic + } do + key = random_string() + policies = %Policies{presence: %PresencePolicies{read: true, write: true}} + socket = socket_fixture(tenant, topic, key, policies: policies, private?: false, enabled?: false) + + refute socket.assigns.presence_enabled? + + assert {:ok, updated_socket} = PresenceHandler.handle(%{"event" => "track"}, nil, socket) + + assert updated_socket.assigns.presence_enabled? + topic = socket.assigns.tenant_topic + assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}} + assert Map.has_key?(joins, key) + end + + test "socket with presence disabled will enable presence on track message for private channel", %{ + tenant: tenant, + topic: topic, + db_conn: db_conn + } do + key = random_string() + policies = %Policies{presence: %PresencePolicies{read: true, write: true}} + socket = socket_fixture(tenant, topic, key, policies: policies, private?: true, enabled?: false) + + refute socket.assigns.presence_enabled? + + assert {:ok, updated_socket} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket) + + assert updated_socket.assigns.presence_enabled? + topic = socket.assigns.tenant_topic + assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}} + assert Map.has_key?(joins, key) + end + + test "socket with presence disabled will not enable presence on untrack message", %{ + tenant: tenant, + topic: topic, + db_conn: db_conn + } do + key = random_string() + policies = %Policies{presence: %PresencePolicies{read: true, write: true}} + socket = socket_fixture(tenant, topic, key, policies: policies, enabled?: false) + + refute socket.assigns.presence_enabled? + + assert {:ok, updated_socket} = PresenceHandler.handle(%{"event" => "untrack"}, db_conn, socket) + + refute updated_socket.assigns.presence_enabled? 
+ topic = socket.assigns.tenant_topic + refute_receive %Broadcast{topic: ^topic, event: "presence_diff"} + end + + test "socket with presence disabled will not enable presence on unknown event", %{ + tenant: tenant, + topic: topic, + db_conn: db_conn + } do + key = random_string() + policies = %Policies{presence: %PresencePolicies{read: true, write: true}} + socket = socket_fixture(tenant, topic, key, policies: policies, enabled?: false) + + refute socket.assigns.presence_enabled? + + assert {:error, :unknown_presence_event} = PresenceHandler.handle(%{"event" => "unknown"}, db_conn, socket) + end + @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "rate limit is checked on private channel", %{tenant: tenant, topic: topic, db_conn: db_conn} do key = random_string() diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 2dff83da3..0a0d8aca9 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -28,6 +28,216 @@ defmodule RealtimeWeb.RealtimeChannelTest do setup :rls_context + test "max heap size is set", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt)) + + assert Process.info(socket.transport_pid, :max_heap_size) == + {:max_heap_size, %{error_logger: true, include_shared_binaries: false, kill: true, size: 6_250_000}} + end + + describe "broadcast" do + @describetag policies: [:authenticated_all_topic_read] + + test "broadcast map payload", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt)) + + config = %{ + "presence" => %{"enabled" => false}, + "broadcast" => %{"self" => true} + } + + assert {:ok, _, socket} = subscribe_and_join(socket, "realtime:test", %{"config" => config}) + + push(socket, "broadcast", %{"event" => "my_event", "payload" => %{"hello" => "world"}}) + + assert_receive %Phoenix.Socket.Message{ + topic: "realtime:test", + event: "broadcast", + payload: %{"event" => "my_event", "payload" => %{"hello" => "world"}} + } + end + + test "broadcast non-map payload", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt)) + + config = %{ + "presence" => %{"enabled" => false}, + "broadcast" => %{"self" => true} + } + + assert {:ok, _, socket} = subscribe_and_join(socket, "realtime:test", %{"config" => config}) + + push(socket, "broadcast", "not a map") + + assert_receive %Phoenix.Socket.Message{ + topic: "realtime:test", + event: "broadcast", + payload: "not a map" + } + end + + test "wrong replay params", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + config = %{ + "private" => true, + "broadcast" => %{ + "replay" => %{"limit" => "not a number", "since" => :erlang.system_time(:millisecond) - 5 * 60000} + } + } + + assert {:error, %{reason: "UnableToReplayMessages: Replay params are not valid"}} = + subscribe_and_join(socket, "realtime:test", %{"config" => config}) + + config = %{ + "private" => true, + "broadcast" => %{ + "replay" => %{"limit" => 1, "since" => "not a number"} + } + } + + assert {:error, %{reason: "UnableToReplayMessages: Replay params are not valid"}} = + 
subscribe_and_join(socket, "realtime:test", %{"config" => config}) + + config = %{ + "private" => true, + "broadcast" => %{ + "replay" => %{} + } + } + + assert {:error, %{reason: "UnableToReplayMessages: Replay params are not valid"}} = + subscribe_and_join(socket, "realtime:test", %{"config" => config}) + end + + test "failure to replay", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + config = %{ + "private" => true, + "broadcast" => %{ + "replay" => %{"limit" => 12, "since" => :erlang.system_time(:millisecond) - 5 * 60000} + } + } + + Authorization + |> expect(:get_read_authorizations, fn _, _, _ -> + {:ok, + %Authorization.Policies{ + broadcast: %Authorization.Policies.BroadcastPolicies{read: true, write: nil} + }} + end) + + # Broken database connection + conn = spawn(fn -> :ok end) + Connect.lookup_or_start_connection(tenant.external_id) + {:ok, _} = :syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: conn} end) + + assert {:error, %{reason: "UnableToReplayMessages: Realtime was unable to replay messages"}} = + subscribe_and_join(socket, "realtime:test", %{"config" => config}) + end + + test "replay messages on public topic not allowed", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + config = %{ + "presence" => %{"enabled" => false}, + "broadcast" => %{"replay" => %{"limit" => 2, "since" => :erlang.system_time(:millisecond) - 5 * 60000}} + } + + assert { + :error, + %{reason: "UnableToReplayMessages: Replay params are not valid"} + } = subscribe_and_join(socket, "realtime:test", %{"config" => config}) + + refute_receive _any + end + + @tag policies: [:authenticated_all_topic_read] + test "replay messages on private topic", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + # Old message + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :day), + "event" => "old", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "old"} + }) + + %{id: message1_id} = + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute), + "event" => "first", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "first"} + }) + + %{id: message2_id} = + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute), + "event" => "second", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "second"} + }) + + # This one should not be received because of the limit + message_fixture(tenant, %{ + "private" => true, + "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-3, :minute), + "event" => "third", + "extension" => "broadcast", + "topic" => "test", + "payload" => %{"value" => "third"} + }) + + config = %{ + "private" => true, + "presence" => %{"enabled" => false}, + "broadcast" => %{"replay" => %{"limit" => 2, "since" => :erlang.system_time(:millisecond) - 5 * 60000}} + } + + assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{"config" => config}) + + assert_receive %Socket.Message{ + topic: "realtime:test", + 
event: "broadcast", + payload: %{ + "event" => "first", + "meta" => %{"id" => ^message1_id, "replayed" => true}, + "payload" => %{"value" => "first"}, + "type" => "broadcast" + } + } + + assert_receive %Socket.Message{ + topic: "realtime:test", + event: "broadcast", + payload: %{ + "event" => "second", + "meta" => %{"id" => ^message2_id, "replayed" => true}, + "payload" => %{"value" => "second"}, + "type" => "broadcast" + } + } + + refute_receive %Socket.Message{} + end + end + describe "presence" do test "events are counted", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) diff --git a/test/realtime_web/tenant_broadcaster_test.exs b/test/realtime_web/tenant_broadcaster_test.exs index d9afbf641..ddda381a1 100644 --- a/test/realtime_web/tenant_broadcaster_test.exs +++ b/test/realtime_web/tenant_broadcaster_test.exs @@ -1,5 +1,5 @@ defmodule RealtimeWeb.TenantBroadcasterTest do - # Usage of Clustered + # Usage of Clustered and changing Application env use Realtime.DataCase, async: false alias Phoenix.Socket.Broadcast @@ -47,95 +47,107 @@ defmodule RealtimeWeb.TenantBroadcasterTest do pid: self() ) + original = Application.fetch_env!(:realtime, :pubsub_adapter) + on_exit(fn -> Application.put_env(:realtime, :pubsub_adapter, original) end) + Application.put_env(:realtime, :pubsub_adapter, context.pubsub_adapter) + :ok end - describe "pubsub_broadcast/4" do - test "pubsub_broadcast", %{node: node} do - message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}} - TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) + for pubsub_adapter <- [:gen_rpc, :pg2] do + describe "pubsub_broadcast/4 #{pubsub_adapter}" do + @describetag pubsub_adapter: pubsub_adapter - assert_receive ^message + test "pubsub_broadcast", %{node: node} do + message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}} + TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) - # Remote node received the broadcast - assert_receive {:relay, ^node, ^message} + assert_receive ^message - assert_receive { - :telemetry, - [:realtime, :tenants, :payload, :size], - %{size: 114}, - %{tenant: "realtime-dev"} - } - end + # Remote node received the broadcast + assert_receive {:relay, ^node, ^message} - test "pubsub_broadcast list payload", %{node: node} do - message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]} - TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) + assert_receive { + :telemetry, + [:realtime, :tenants, :payload, :size], + %{size: 114}, + %{tenant: "realtime-dev"} + } + end - assert_receive ^message + test "pubsub_broadcast list payload", %{node: node} do + message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]} + TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) - # Remote node received the broadcast - assert_receive {:relay, ^node, ^message} + assert_receive ^message - assert_receive { - :telemetry, - [:realtime, :tenants, :payload, :size], - %{size: 130}, - %{tenant: "realtime-dev"} - } - end + # Remote node received the broadcast + assert_receive {:relay, ^node, ^message} - test "pubsub_broadcast string payload", %{node: node} do - message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"} - TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) + assert_receive { + :telemetry, + [:realtime, :tenants, :payload, 
:size], + %{size: 130}, + %{tenant: "realtime-dev"} + } + end - assert_receive ^message + test "pubsub_broadcast string payload", %{node: node} do + message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"} + TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) - # Remote node received the broadcast - assert_receive {:relay, ^node, ^message} + assert_receive ^message - assert_receive { - :telemetry, - [:realtime, :tenants, :payload, :size], - %{size: 119}, - %{tenant: "realtime-dev"} - } + # Remote node received the broadcast + assert_receive {:relay, ^node, ^message} + + assert_receive { + :telemetry, + [:realtime, :tenants, :payload, :size], + %{size: 119}, + %{tenant: "realtime-dev"} + } + end end end - describe "pubsub_broadcast_from/5" do - test "pubsub_broadcast_from", %{node: node} do - parent = self() + for pubsub_adapter <- [:gen_rpc, :pg2] do + describe "pubsub_broadcast_from/5 #{pubsub_adapter}" do + @describetag pubsub_adapter: pubsub_adapter + + test "pubsub_broadcast_from", %{node: node} do + parent = self() - spawn_link(fn -> - Endpoint.subscribe(@topic) - send(parent, :ready) + spawn_link(fn -> + Endpoint.subscribe(@topic) + send(parent, :ready) - receive do - msg -> send(parent, {:other_process, msg}) - end - end) + receive do + msg -> send(parent, {:other_process, msg}) + end + end) - assert_receive :ready + assert_receive :ready - message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}} + message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}} - TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub) + TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub) - assert_receive {:other_process, ^message} + assert_receive {:other_process, ^message} - # Remote node received the broadcast - assert_receive {:relay, ^node, ^message} + # Remote node received the broadcast + assert_receive {:relay, ^node, ^message} - assert_receive { - :telemetry, - [:realtime, :tenants, :payload, :size], - %{size: 114}, - %{tenant: "realtime-dev"} - } + assert_receive { + :telemetry, + [:realtime, :tenants, :payload, :size], + %{size: 114}, + %{tenant: "realtime-dev"} + } - # This process does not receive the message - refute_receive _any + # This process does not receive the message + refute_receive _any + end end end diff --git a/test/support/containers.ex b/test/support/containers.ex index cd66f2699..bc49fa275 100644 --- a/test/support/containers.ex +++ b/test/support/containers.ex @@ -267,7 +267,13 @@ defmodule Containers do @image, "postgres", "-c", - "config_file=/etc/postgresql/postgresql.conf" + "config_file=/etc/postgresql/postgresql.conf", + "-c", + "wal_keep_size=32MB", + "-c", + "max_wal_size=32MB", + "-c", + "max_slot_wal_keep_size=32MB" ]) end end
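A note on the heap-size arithmetic that appears twice in this diff: both the `+hmax` emulator flag in `rel/vm.args.eex` and the `:max_heap_size` value asserted in `realtime_channel_test.exs` are expressed in machine words, not bytes, which is why each byte budget is divided by `:erlang.system_info(:wordsize)` (8 on a 64-bit VM). A minimal IEx sketch of the conversion follows; the final `Process.flag/2` call illustrates the underlying OTP mechanism and is not code from this PR:

```elixir
# Word size of the running VM: 8 bytes on 64-bit systems.
word_size = :erlang.system_info(:wordsize)

# 500 MB global cap from rel/vm.args.eex -> 62_500_000 words on 64-bit.
div(500_000_000, word_size)

# 50 MB per-socket cap -> the 6_250_000 words that realtime_channel_test.exs
# reads back via Process.info(pid, :max_heap_size).
div(50_000_000, word_size)

# Per-process limits of this kind are set with the :max_heap_size process
# flag; with kill: true the process is killed when it exceeds the cap.
Process.flag(:max_heap_size, %{size: div(50_000_000, word_size), kill: true, error_logger: true})
```

The replay deduplication exercised in `message_dispatcher_test.exs` follows a similarly small core: the last element of the `:rc_fastlane` tuple is a `MapSet` of already-replayed message ids, and a broadcast whose payload carries a matching `"meta"` id is not delivered again. A hedged sketch of that check, where `already_replayed?/2` is an illustrative name rather than the repo's actual function:

```elixir
defmodule ReplaySkipSketch do
  @moduledoc false

  # A payload is skipped only when it is a map carrying a "meta" id that the
  # subscriber already received during replay; anything else is delivered.
  def already_replayed?(%{"meta" => %{"id" => id}}, %MapSet{} = replayed_ids),
    do: MapSet.member?(replayed_ids, id)

  def already_replayed?(_payload, _replayed_ids), do: false
end

# Mirrors the test data: id "123" is suppressed, a non-map payload is not.
true = ReplaySkipSketch.already_replayed?(%{"data" => "test", "meta" => %{"id" => "123"}}, MapSet.new(["123"]))
false = ReplaySkipSketch.already_replayed?("not a map", MapSet.new(["123"]))
```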