Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ You can add your own by making a `POST` request to the server. You must change b
"region": "us-west-1",
"poll_interval_ms": 100,
"poll_max_record_bytes": 1048576,
"ssl_enforced": false
"ssl_enforced": false
}
}
]
Expand Down Expand Up @@ -169,6 +169,7 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| CONNECT_PARTITION_SLOTS | number | Number of dynamic supervisor partitions used by the Connect, ReplicationConnect processes |
| METRICS_CLEANER_SCHEDULE_TIMER_IN_MS | number | Time in ms to run the Metric Cleaner task |
| METRICS_RPC_TIMEOUT_IN_MS | number | Time in ms to wait for RPC call to fetch Metric per node |
| WEBSOCKET_MAX_HEAP_SIZE | number | Max number of bytes to be allocated as heap for the WebSocket transport process. If the limit is reached the process is brutally killed. Defaults to 50MB. |
| REQUEST_ID_BAGGAGE_KEY | string | OTEL Baggage key to be used as request id |
| OTEL_SDK_DISABLED | boolean | Disable OpenTelemetry tracing completely when 'true' |
| OTEL_TRACES_EXPORTER | string | Possible values: `otlp` or `none`. See [https://github.com/open-telemetry/opentelemetry-erlang/tree/v1.4.0/apps#os-environment] for more details on how to configure the traces exporter. |
Expand All @@ -190,6 +191,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| MAX_GEN_RPC_CLIENTS | number | Max amount of `gen_rpc` TCP connections per node-to-node channel |
| REBALANCE_CHECK_INTERVAL_IN_MS | number | Time in ms to check if process is in the right region |
| DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it |
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |


The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).

Expand Down Expand Up @@ -284,6 +287,7 @@ This is the list of operational codes that can help you understand your deployme
| UnknownErrorOnController | An error we are not handling correctly was triggered on a controller |
| UnknownErrorOnChannel | An error we are not handling correctly was triggered on a channel |
| PresenceRateLimitReached | Limit of presence events reached |
| UnableToReplayMessages | An error while replaying messages |

## License

Expand Down
8 changes: 7 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ janitor_run_after_in_ms = Env.get_integer("JANITOR_RUN_AFTER_IN_MS", :timer.minu
janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.seconds(5))
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))

no_channel_timeout_in_ms =
if config_env() == :test,
Expand Down Expand Up @@ -106,6 +109,7 @@ config :realtime, Realtime.Repo,
ssl: ssl_opts

config :realtime,
websocket_max_heap_size: websocket_max_heap_size,
migration_partition_slots: migration_partition_slots,
connect_partition_slots: connect_partition_slots,
rebalance_check_interval_in_ms: rebalance_check_interval_in_ms,
Expand All @@ -120,7 +124,9 @@ config :realtime,
rpc_timeout: rpc_timeout,
max_gen_rpc_clients: max_gen_rpc_clients,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform
platform: platform,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ config :logger,
# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:request_id, :project, :external_id, :application_name, :sub, :iss, :exp]
metadata: [:error_code, :request_id, :project, :external_id, :application_name, :sub, :iss, :exp]

config :opentelemetry,
span_processor: :simple,
Expand Down
7 changes: 4 additions & 3 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ defmodule Realtime.Api do
|> repo.preload(:extensions)
end

defp list_extensions(type \\ "postgres_cdc_rls") do
from(e in Extensions, where: e.type == ^type, select: e)
|> Repo.all()
def list_extensions(type) do
query = from(e in Extensions, where: e.type == ^type, select: e)

Repo.all(query)
end

def rename_settings_field(from, to) do
Expand Down
4 changes: 3 additions & 1 deletion lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Realtime.Api.Message do
@primary_key {:id, Ecto.UUID, autogenerate: true}
@schema_prefix "realtime"

@type t :: %__MODULE__{}

schema "messages" do
field(:topic, :string)
field(:extension, Ecto.Enum, values: [:broadcast, :presence])
Expand Down Expand Up @@ -39,7 +41,7 @@ defmodule Realtime.Api.Message do
end

defp maybe_put_timestamp(changeset, field) do
case Map.get(changeset.data, field) do
case get_field(changeset, field) do
nil -> put_timestamp(changeset, field)
_ -> changeset
end
Expand Down
12 changes: 11 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule Realtime.Application do
region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())

broadcast_pool_size = Application.get_env(:realtime, :broadcast_pool_size, 10)
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)
connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms)
Expand All @@ -65,7 +66,8 @@ defmodule Realtime.Application do
Realtime.Repo,
RealtimeWeb.Telemetry,
{Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]},
{Phoenix.PubSub, name: Realtime.PubSub, pool_size: 10},
{Phoenix.PubSub,
name: Realtime.PubSub, pool_size: 10, adapter: pubsub_adapter(), broadcast_pool_size: broadcast_pool_size},
{Cachex, name: Realtime.RateCounter},
Realtime.Tenants.Cache,
Realtime.RateCounter.DynamicSupervisor,
Expand Down Expand Up @@ -152,4 +154,12 @@ defmodule Realtime.Application do
OpentelemetryPhoenix.setup(adapter: :cowboy2)
OpentelemetryEcto.setup([:realtime, :repo], db_statement: :enabled)
end

defp pubsub_adapter do
if Application.fetch_env!(:realtime, :pubsub_adapter) == :gen_rpc do
Realtime.GenRpcPubSub
else
Phoenix.PubSub.PG2
end
end
end
33 changes: 33 additions & 0 deletions lib/realtime/gen_rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ defmodule Realtime.GenRpc do

@type result :: any | {:error, :rpc_error, reason :: any}

@doc """
Broadcasts the message `msg` asynchronously to the registered process `name` on the specified `nodes`.

Options:

- `:key` - Optional key to consistently select the same gen_rpc clients to guarantee message order between nodes
"""
@spec abcast([node], atom, any, keyword()) :: :ok
def abcast(nodes, name, msg, opts) when is_list(nodes) and is_atom(name) and is_list(opts) do
key = Keyword.get(opts, :key, nil)
nodes = rpc_nodes(nodes, key)

:gen_rpc.abcast(nodes, name, msg)
:ok
end

@doc """
Fire and forget apply(mod, func, args) on all nodes

Expand Down Expand Up @@ -41,6 +57,23 @@ defmodule Realtime.GenRpc do
@spec call(node, module, atom, list(any), keyword()) :: result
def call(node, mod, func, args, opts)
when is_atom(node) and is_atom(mod) and is_atom(func) and is_list(args) and is_list(opts) do
if node == node() or node in Node.list() do
do_call(node, mod, func, args, opts)
else
tenant_id = Keyword.get(opts, :tenant_id)

log_error(
"ErrorOnRpcCall",
%{target: node, mod: mod, func: func, error: :badnode},
project: tenant_id,
external_id: tenant_id
)

{:error, :rpc_error, :badnode}
end
end

defp do_call(node, mod, func, args, opts) do
timeout = Keyword.get(opts, :timeout, default_rpc_timeout())
tenant_id = Keyword.get(opts, :tenant_id)
key = Keyword.get(opts, :key, nil)
Expand Down
78 changes: 78 additions & 0 deletions lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule Realtime.GenRpcPubSub do
@moduledoc """
gen_rpc Phoenix.PubSub adapter
"""

@behaviour Phoenix.PubSub.Adapter
alias Realtime.GenRpc
use Supervisor

@impl true
def node_name(_), do: node()

# Supervisor callbacks

def start_link(opts) do
adapter_name = Keyword.fetch!(opts, :adapter_name)
name = Keyword.fetch!(opts, :name)
pool_size = Keyword.get(opts, :pool_size, 1)
broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size)

Supervisor.start_link(__MODULE__, {adapter_name, name, broadcast_pool_size},
name: :"#{name}#{adapter_name}_supervisor"
)
end

@impl true
def init({adapter_name, pubsub, pool_size}) do
workers = for number <- 1..pool_size, do: :"#{pubsub}#{adapter_name}_#{number}"

:persistent_term.put(adapter_name, List.to_tuple(workers))

children =
for worker <- workers do
Supervisor.child_spec({Realtime.GenRpcPubSub.Worker, {pubsub, worker}}, id: worker)
end

Supervisor.init(children, strategy: :one_for_one)
end

defp worker_name(adapter_name, key) do
workers = :persistent_term.get(adapter_name)
elem(workers, :erlang.phash2(key, tuple_size(workers)))
end

@impl true
def broadcast(adapter_name, topic, message, dispatcher) do
worker = worker_name(adapter_name, self())
GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker)
end

@impl true
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
worker = worker_name(adapter_name, self())
GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker)
end

defp forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher}
end

defmodule Realtime.GenRpcPubSub.Worker do
@moduledoc false
use GenServer

@doc false
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)

@impl true
def init(pubsub), do: {:ok, pubsub}

@impl true
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher)
{:noreply, pubsub}
end

@impl true
def handle_info(_, pubsub), do: {:noreply, pubsub}
end
55 changes: 55 additions & 0 deletions lib/realtime/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,61 @@ defmodule Realtime.Messages do
Handles `realtime.messages` table operations
"""

alias Realtime.Api.Message

import Ecto.Query, only: [from: 2]

@hard_limit 25
@default_timeout 5_000

@doc """
Fetch last `limit ` messages for a given `topic` inserted after `since`

Automatically uses RPC if the database connection is not in the same node

Only allowed for private channels
"""
@spec replay(pid, String.t(), non_neg_integer, non_neg_integer) ::
{:ok, Message.t(), [String.t()]} | {:error, term} | {:error, :rpc_error, term}
def replay(conn, topic, since, limit) when node(conn) == node() and is_integer(since) and is_integer(limit) do
limit = max(min(limit, @hard_limit), 1)

with {:ok, since} <- DateTime.from_unix(since, :millisecond),
{:ok, messages} <- messages(conn, topic, since, limit) do
{:ok, Enum.reverse(messages), MapSet.new(messages, & &1.id)}
else
{:error, :postgrex_exception} -> {:error, :failed_to_replay_messages}
{:error, :invalid_unix_time} -> {:error, :invalid_replay_params}
error -> error
end
end

def replay(conn, topic, since, limit) when is_integer(since) and is_integer(limit) do
Realtime.GenRpc.call(node(conn), __MODULE__, :replay, [conn, topic, since, limit], key: topic)
end

def replay(_, _, _, _), do: {:error, :invalid_replay_params}

defp messages(conn, topic, since, limit) do
since = DateTime.to_naive(since)
# We want to avoid searching partitions in the future as they should be empty
# so we limit to 1 minute in the future to account for any potential drift
now = NaiveDateTime.utc_now() |> NaiveDateTime.add(1, :minute)

query =
from m in Message,
where:
m.topic == ^topic and
m.private == true and
m.extension == :broadcast and
m.inserted_at >= ^since and
m.inserted_at < ^now,
limit: ^limit,
order_by: [desc: m.inserted_at]

Realtime.Repo.all(conn, query, Message, timeout: @default_timeout)
end

@doc """
Deletes messages older than 72 hours for a given tenant connection
"""
Expand Down
34 changes: 31 additions & 3 deletions lib/realtime/monitoring/erl_sys_mon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule Realtime.ErlSysMon do
@defaults [
:busy_dist_port,
:busy_port,
{:long_gc, 250},
{:long_schedule, 100},
{:long_gc, 500},
{:long_schedule, 500},
{:long_message_queue, {0, 1_000}}
]

Expand All @@ -24,8 +24,36 @@ defmodule Realtime.ErlSysMon do
{:ok, []}
end

def handle_info({:monitor, pid, _type, _meta} = msg, state) when is_pid(pid) do
log_process_info(msg, pid)
{:noreply, state}
end

def handle_info(msg, state) do
Logger.error("#{__MODULE__} message: " <> inspect(msg))
Logger.warning("#{__MODULE__} message: " <> inspect(msg))
{:noreply, state}
end

defp log_process_info(msg, pid) do
pid_info =
pid
|> Process.info(:dictionary)
|> case do
{:dictionary, dict} when is_list(dict) ->
{List.keyfind(dict, :"$initial_call", 0), List.keyfind(dict, :"$ancestors", 0)}

other ->
other
end

extra_info = Process.info(pid, [:registered_name, :message_queue_len, :total_heap_size])

Logger.warning(
"#{__MODULE__} message: " <>
inspect(msg) <> "|\n process info: #{inspect(pid_info)} #{inspect(extra_info)}"
)
rescue
_ ->
Logger.warning("#{__MODULE__} message: " <> inspect(msg))
end
end
13 changes: 4 additions & 9 deletions lib/realtime/monitoring/prom_ex/plugins/phoenix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,10 @@ if Code.ensure_loaded?(Phoenix) do

def execute_metrics do
active_conn =
case :ets.lookup(:ranch_server, {:listener_sup, HTTP}) do
[] ->
-1

_ ->
HTTP
|> :ranch_server.get_connections_sup()
|> :supervisor.count_children()
|> Keyword.get(:active)
if :ranch.info()[HTTP] do
:ranch.info(HTTP)[:all_connections]
else
-1
end

:telemetry.execute(@event_all_connections, %{active: active_conn}, %{})
Expand Down
Loading
Loading