diff --git a/.gitignore b/.gitignore index a96970546..fec4b85ab 100644 --- a/.gitignore +++ b/.gitignore @@ -27,13 +27,9 @@ realtime-*.tar # Ignore Dialyzer .plt /priv/plts/* - node_modules - .supabase - config/prod.secret.exs - demo/.env - .lexical +.vscode \ No newline at end of file diff --git a/config/runtime.exs b/config/runtime.exs index 99d524e54..ac0a2569b 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -1,9 +1,25 @@ import Config -config :logflare_logger_backend, - url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app") +defmodule Env do + def get_integer(env, default) do + value = System.get_env(env) + if value, do: String.to_integer(value), else: default + end + + def get_charlist(env, default) do + value = System.get_env(env) + if value, do: String.to_charlist(value), else: default + end + + def get_boolean(env, default) do + value = System.get_env(env) + if value, do: String.to_existing_atom(value), else: default + end +end app_name = System.get_env("APP_NAME", "") + +# Setup Database default_db_host = System.get_env("DB_HOST", "127.0.0.1") username = System.get_env("DB_USER", "postgres") password = System.get_env("DB_PASSWORD", "postgres") @@ -11,62 +27,51 @@ database = System.get_env("DB_NAME", "postgres") port = System.get_env("DB_PORT", "5432") db_version = System.get_env("DB_IP_VERSION") slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX") +db_ssl_enabled? = Env.get_boolean("DB_SSL", false) +db_ssl_ca_cert = System.get_env("DB_SSL_CA_CERT") +queue_target = Env.get_integer("DB_QUEUE_TARGET", 5000) +queue_interval = Env.get_integer("DB_QUEUE_INTERVAL", 5000) +pool_size = Env.get_integer("DB_POOL_SIZE", 5) + +after_connect_query_args = + case System.get_env("DB_AFTER_CONNECT_QUERY") do + nil -> nil + query -> {Postgrex, :query!, [query, []]} + end ssl_opts = - if System.get_env("DB_SSL", "false") == "true" do - if cert = System.get_env("DB_SSL_CA_CERT") do - [cacertfile: cert] - else - [verify: :verify_none] - end - else - false + cond do + db_ssl_enabled? and is_binary(db_ssl_ca_cert) -> [cacertfile: db_ssl_ca_cert] + db_ssl_enabled? -> [verify: :verify_none] + true -> false end -tenant_cache_expiration = - System.get_env("TENANT_CACHE_EXPIRATION_IN_MS", "30000") |> String.to_integer() - -migration_partition_slots = - System.get_env("MIGRATION_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer() - -connect_partition_slots = - System.get_env("CONNECT_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer() - -# defaults to 30 minutes -metrics_cleaner_schedule_timer_in_ms = - System.get_env("METRICS_CLEANER_SCHEDULE_TIMER_IN_MS", "1800000") |> String.to_integer() - -metrics_rpc_timeout_in_ms = - System.get_env("METRICS_RPC_TIMEOUT_IN_MS", "15000") |> String.to_integer() - -rebalance_check_interval_in_ms = - System.get_env("REBALANCE_CHECK_INTERVAL_IN_MS", to_string(:timer.minutes(10))) |> String.to_integer() - -disconnect_socket_on_no_channels_interval_in_ms = - System.get_env("DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS", "30000") |> String.to_integer() - -tenant_max_bytes_per_second = System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer() -tenant_max_channels_per_client = System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer() -tenant_max_concurrent_users = System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer() -tenant_max_events_per_second = System.get_env("TENANT_MAX_EVENTS_PER_SECOND", "100") |> String.to_integer() -tenant_max_joins_per_second = System.get_env("TENANT_MAX_JOINS_PER_SECOND", "100") |> String.to_integer() -rpc_timeout = System.get_env("RPC_TIMEOUT", "30000") |> String.to_integer() -max_gen_rpc_clients = System.get_env("MAX_GEN_RPC_CLIENTS", "5") |> String.to_integer() -run_janitor? = System.get_env("RUN_JANITOR", "false") == "true" -janitor_schedule_randomize = System.get_env("JANITOR_SCHEDULE_RANDOMIZE", "true") == "true" -janitor_max_children = System.get_env("JANITOR_MAX_CHILDREN", "5") |> String.to_integer() -janitor_chunk_size = System.get_env("JANITOR_CHUNK_SIZE", "10") |> String.to_integer() -# defaults to 10 minutes -janitor_run_after_in_ms = System.get_env("JANITOR_RUN_AFTER_IN_MS", "600000") |> String.to_integer() -# defaults to 5 seconds -janitor_children_timeout = System.get_env("JANITOR_CHILDREN_TIMEOUT", "5000") |> String.to_integer() -# 4 hours by default -janitor_schedule_timer = System.get_env("JANITOR_SCHEDULE_TIMER_IN_MS", "14400000") |> String.to_integer() -# defaults to 10 minutes +tenant_cache_expiration = Env.get_integer("TENANT_CACHE_EXPIRATION_IN_MS", :timer.seconds(30)) +migration_partition_slots = Env.get_integer("MIGRATION_PARTITION_SLOTS", System.schedulers_online() * 2) +connect_partition_slots = Env.get_integer("CONNECT_PARTITION_SLOTS", System.schedulers_online() * 2) +metrics_cleaner_schedule_timer_in_ms = Env.get_integer("METRICS_CLEANER_SCHEDULE_TIMER_IN_MS", :timer.minutes(30)) +metrics_rpc_timeout_in_ms = Env.get_integer("METRICS_RPC_TIMEOUT_IN_MS", :timer.seconds(15)) +rebalance_check_interval_in_ms = Env.get_integer("REBALANCE_CHECK_INTERVAL_IN_MS", :timer.minutes(10)) +tenant_max_bytes_per_second = Env.get_integer("TENANT_MAX_BYTES_PER_SECOND", 100_000) +tenant_max_channels_per_client = Env.get_integer("TENANT_MAX_CHANNELS_PER_CLIENT", 100) +tenant_max_concurrent_users = Env.get_integer("TENANT_MAX_CONCURRENT_USERS", 200) +tenant_max_events_per_second = Env.get_integer("TENANT_MAX_EVENTS_PER_SECOND", 100) +tenant_max_joins_per_second = Env.get_integer("TENANT_MAX_JOINS_PER_SECOND", 100) +rpc_timeout = Env.get_integer("RPC_TIMEOUT", :timer.seconds(30)) +max_gen_rpc_clients = Env.get_integer("MAX_GEN_RPC_CLIENTS", 5) +run_janitor? = Env.get_boolean("RUN_JANITOR", false) +janitor_schedule_randomize = Env.get_boolean("JANITOR_SCHEDULE_RANDOMIZE", true) +janitor_max_children = Env.get_integer("JANITOR_MAX_CHILDREN", 5) +janitor_chunk_size = Env.get_integer("JANITOR_CHUNK_SIZE", 10) +janitor_run_after_in_ms = Env.get_integer("JANITOR_RUN_AFTER_IN_MS", :timer.minutes(10)) +janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.seconds(5)) +janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4)) +platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly + no_channel_timeout_in_ms = if config_env() == :test, - do: 3000, - else: System.get_env("NO_CHANNEL_TIMEOUT_IN_MS", "600000") |> String.to_integer() + do: :timer.seconds(3), + else: Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10)) if !(db_version in [nil, "ipv6", "ipv4"]), do: raise("Invalid IP version, please set either ipv6 or ipv4") @@ -86,6 +91,20 @@ socket_options = end end +config :realtime, Realtime.Repo, + hostname: default_db_host, + username: username, + password: password, + database: database, + port: port, + pool_size: pool_size, + queue_target: queue_target, + queue_interval: queue_interval, + parameters: [application_name: "supabase_mt_realtime"], + after_connect: after_connect_query_args, + socket_options: socket_options, + ssl: ssl_opts + config :realtime, migration_partition_slots: migration_partition_slots, connect_partition_slots: connect_partition_slots, @@ -98,63 +117,96 @@ config :realtime, metrics_cleaner_schedule_timer_in_ms: metrics_cleaner_schedule_timer_in_ms, metrics_rpc_timeout: metrics_rpc_timeout_in_ms, tenant_cache_expiration: tenant_cache_expiration, - disconnect_socket_on_no_channels_interval_in_ms: disconnect_socket_on_no_channels_interval_in_ms, rpc_timeout: rpc_timeout, max_gen_rpc_clients: max_gen_rpc_clients, - no_channel_timeout_in_ms: no_channel_timeout_in_ms + no_channel_timeout_in_ms: no_channel_timeout_in_ms, + platform: platform -if config_env() == :test || !run_janitor? do - config :realtime, run_janitor: false -else +if config_env() != :test && run_janitor? do config :realtime, - # disabled for now by default - run_janitor: run_janitor?, + run_janitor: true, janitor_schedule_randomize: janitor_schedule_randomize, janitor_max_children: janitor_max_children, janitor_chunk_size: janitor_chunk_size, - # defaults the runner to only start after 10 minutes janitor_run_after_in_ms: janitor_run_after_in_ms, janitor_children_timeout: janitor_children_timeout, - # defaults to 4 hours janitor_schedule_timer: janitor_schedule_timer end -if config_env() == :prod do - secret_key_base = - System.get_env("SECRET_KEY_BASE") || - raise """ - environment variable SECRET_KEY_BASE is missing. - You can generate one by calling: mix phx.gen.secret - """ +default_cluster_strategy = + case config_env() do + :prod -> "POSTGRES" + _ -> "EPMD" + end - if app_name == "" do - raise "APP_NAME not available" +cluster_topologies = + System.get_env("CLUSTER_STRATEGIES", default_cluster_strategy) + |> String.upcase() + |> String.split(",") + |> Enum.reduce([], fn strategy, acc -> + strategy + |> String.trim() + |> then(fn + "DNS" -> + [ + dns: [ + strategy: Cluster.Strategy.DNSPoll, + config: [polling_interval: 5_000, query: System.get_env("DNS_NODES"), node_basename: app_name] + ] + ] ++ acc + + "POSTGRES" -> + [ + postgres: [ + strategy: LibclusterPostgres.Strategy, + config: [ + hostname: default_db_host, + username: username, + password: password, + database: database, + port: port, + parameters: [application_name: "cluster_node_#{node()}"], + socket_options: socket_options, + ssl: ssl_opts, + heartbeat_interval: 5_000 + ] + ] + ] ++ acc + + "EPMD" -> + [ + dev: [ + strategy: Cluster.Strategy.Epmd, + config: [hosts: [:"orange@127.0.0.1", :"pink@127.0.0.1"]], + connect: {:net_kernel, :connect_node, []}, + disconnect: {:net_kernel, :disconnect_node, []} + ] + ] ++ acc + + _ -> + acc + end) + end) + +# Setup Logging + +if System.get_env("LOGS_ENGINE") == "logflare" do + config :logflare_logger_backend, url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app") + + if !System.get_env("LOGFLARE_API_KEY") or !System.get_env("LOGFLARE_SOURCE_ID") do + raise """ + Environment variable LOGFLARE_API_KEY or LOGFLARE_SOURCE_ID is missing. + Check those variables or choose another LOGS_ENGINE. + """ end - config :realtime, RealtimeWeb.Endpoint, - server: true, - url: [host: "#{app_name}.fly.dev", port: 80], - http: [ - compress: true, - port: String.to_integer(System.get_env("PORT") || "4000"), - protocol_options: [ - max_header_value_length: String.to_integer(System.get_env("MAX_HEADER_LENGTH") || "4096") - ], - transport_options: [ - # max_connection is per connection supervisor - # num_conns_sups defaults to num_acceptors - # total conns accepted here is max_connections * num_acceptors - # ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/ - max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"), - num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), - # IMPORTANT: support IPv6 addresses - socket_opts: [:inet6] - ] - ], - check_origin: false, - secret_key_base: secret_key_base + config :logger, + sync_threshold: 6_000, + discard_threshold: 6_000, + backends: [LogflareLogger.HttpBackend] end +# Setup production and development environments if config_env() != :test do gen_rpc_socket_ip = System.get_env("GEN_RPC_SOCKET_IP", "0.0.0.0") |> to_charlist() @@ -210,8 +262,6 @@ if config_env() != :test do config :logger, level: System.get_env("LOG_LEVEL", "info") |> String.to_existing_atom() - platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly - config :realtime, request_id_baggage_key: System.get_env("REQUEST_ID_BAGGAGE_KEY", "request-id"), jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"), @@ -221,34 +271,36 @@ if config_env() != :test do metrics_jwt_secret: System.get_env("METRICS_JWT_SECRET"), db_enc_key: System.get_env("DB_ENC_KEY"), region: System.get_env("REGION"), - prom_poll_rate: System.get_env("PROM_POLL_RATE", "5000") |> String.to_integer(), - platform: platform, + prom_poll_rate: Env.get_integer("PROM_POLL_RATE", 5000), slot_name_suffix: slot_name_suffix +end - queue_target = System.get_env("DB_QUEUE_TARGET", "5000") |> String.to_integer() - queue_interval = System.get_env("DB_QUEUE_INTERVAL", "5000") |> String.to_integer() +# Setup Production - after_connect_query_args = - case System.get_env("DB_AFTER_CONNECT_QUERY") do - nil -> nil - query -> {Postgrex, :query!, [query, []]} - end +if config_env() == :prod do + config :libcluster, debug: false, topologies: cluster_topologies + secret_key_base = System.fetch_env!("SECRET_KEY_BASE") + if app_name == "", do: raise("APP_NAME not available") - config :realtime, Realtime.Repo, - hostname: default_db_host, - username: username, - password: password, - database: database, - port: port, - pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(), - queue_target: queue_target, - queue_interval: queue_interval, - parameters: [ - application_name: "supabase_mt_realtime" + config :realtime, RealtimeWeb.Endpoint, + server: true, + url: [host: "#{app_name}.supabase.co", port: 443], + http: [ + compress: true, + port: Env.get_integer("PORT", 4000), + protocol_options: [ + max_header_value_length: Env.get_integer("MAX_HEADER_LENGTH", 4096) + ], + transport_options: [ + max_connections: Env.get_integer("MAX_CONNECTIONS", 1000), + num_acceptors: Env.get_integer("NUM_ACCEPTORS", 100), + socket_opts: [:inet6] + ] ], - after_connect: after_connect_query_args, - socket_options: socket_options, - ssl: ssl_opts + check_origin: false, + secret_key_base: secret_key_base + + alias Realtime.Repo.Replica replica_repos = %{ Realtime.Repo.Replica.FRA => System.get_env("DB_HOST_REPLICA_FRA", default_db_host), @@ -281,87 +333,3 @@ if config_env() != :test do ssl: ssl_opts end end - -default_cluster_strategy = - config_env() - |> case do - :prod -> "DNS" - _ -> "EPMD" - end - -cluster_topologies = - System.get_env("CLUSTER_STRATEGIES", default_cluster_strategy) - |> String.upcase() - |> String.split(",") - |> Enum.reduce([], fn strategy, acc -> - strategy - |> String.trim() - |> case do - "DNS" -> - [ - fly6pn: [ - strategy: Cluster.Strategy.DNSPoll, - config: [ - polling_interval: 5_000, - query: System.get_env("DNS_NODES"), - node_basename: app_name - ] - ] - ] ++ acc - - "POSTGRES" -> - [ - postgres: [ - strategy: LibclusterPostgres.Strategy, - config: [ - hostname: default_db_host, - username: username, - password: password, - database: database, - port: port, - parameters: [ - application_name: "cluster_node_#{node()}" - ], - socket_options: socket_options, - ssl: ssl_opts, - heartbeat_interval: 5_000 - ] - ] - ] ++ acc - - "EPMD" -> - [ - dev: [ - strategy: Cluster.Strategy.Epmd, - config: [ - hosts: [:"orange@127.0.0.1", :"pink@127.0.0.1"] - ], - connect: {:net_kernel, :connect_node, []}, - disconnect: {:net_kernel, :disconnect_node, []} - ] - ] ++ acc - - _ -> - acc - end - end) - -if config_env() == :prod do - config :libcluster, - debug: false, - topologies: cluster_topologies -end - -if System.get_env("LOGS_ENGINE") == "logflare" do - if !System.get_env("LOGFLARE_API_KEY") or !System.get_env("LOGFLARE_SOURCE_ID") do - raise """ - Environment variable LOGFLARE_API_KEY or LOGFLARE_SOURCE_ID is missing. - Check those variables or choose another LOGS_ENGINE. - """ - end - - config :logger, - sync_threshold: 6_000, - discard_threshold: 6_000, - backends: [LogflareLogger.HttpBackend] -end diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index ff7dea923..0f4c9ae50 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -127,19 +127,18 @@ defmodule Realtime.Application do end defp janitor_tasks do - if Application.fetch_env!(:realtime, :run_janitor) do - janitor_max_children = - Application.get_env(:realtime, :janitor_max_children) - - janitor_children_timeout = - Application.get_env(:realtime, :janitor_children_timeout) + if Application.get_env(:realtime, :run_janitor) do + janitor_max_children = Application.get_env(:realtime, :janitor_max_children) + janitor_children_timeout = Application.get_env(:realtime, :janitor_children_timeout) [ - {Task.Supervisor, - name: Realtime.Tenants.Janitor.TaskSupervisor, - max_children: janitor_max_children, - max_seconds: janitor_children_timeout, - max_restarts: 1}, + { + Task.Supervisor, + name: Realtime.Tenants.Janitor.TaskSupervisor, + max_children: janitor_max_children, + max_seconds: janitor_children_timeout, + max_restarts: 1 + }, Realtime.Tenants.Janitor, Realtime.MetricsCleaner ] diff --git a/lib/realtime/nodes.ex b/lib/realtime/nodes.ex index a2a11370c..ae237eb5f 100644 --- a/lib/realtime/nodes.ex +++ b/lib/realtime/nodes.ex @@ -27,11 +27,6 @@ defmodule Realtime.Nodes do def platform_region_translator(nil), do: nil def platform_region_translator(tenant_region) when is_binary(tenant_region) do - platform = Application.get_env(:realtime, :platform) - region_mapping(platform, tenant_region) - end - - defp region_mapping(:aws, tenant_region) do case tenant_region do "ap-east-1" -> "ap-southeast-1" "ap-northeast-1" -> "ap-southeast-1" @@ -55,28 +50,6 @@ defmodule Realtime.Nodes do end end - defp region_mapping(:fly, tenant_region) do - case tenant_region do - "us-east-1" -> "iad" - "us-west-1" -> "sea" - "sa-east-1" -> "iad" - "ca-central-1" -> "iad" - "ap-southeast-1" -> "syd" - "ap-northeast-1" -> "syd" - "ap-northeast-2" -> "syd" - "ap-southeast-2" -> "syd" - "ap-east-1" -> "syd" - "ap-south-1" -> "syd" - "eu-west-1" -> "lhr" - "eu-west-2" -> "lhr" - "eu-west-3" -> "lhr" - "eu-central-1" -> "lhr" - _ -> nil - end - end - - defp region_mapping(_, tenant_region), do: tenant_region - @doc """ Lists the nodes in a region. Sorts by node name in case the list order is unstable. diff --git a/mix.exs b/mix.exs index a9f47990d..13ffe985a 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.46.1", + version: "2.46.3", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/run.sh b/run.sh index 2dddbc1b8..66585dc2b 100755 --- a/run.sh +++ b/run.sh @@ -90,7 +90,7 @@ if [ "${ENABLE_ERL_CRASH_DUMP:-false}" = true ]; then trap upload_crash_dump_to_s3 INT TERM KILL EXIT fi -if [[ -n "${GENERATE_CLUSTER_CERTS}" ]] ; then +if [[ -n "${GENERATE_CLUSTER_CERTS:-}" ]] ; then generate_certs fi diff --git a/test/realtime/monitoring/distributed_metrics_test.exs b/test/realtime/monitoring/distributed_metrics_test.exs index a1cf89777..491083973 100644 --- a/test/realtime/monitoring/distributed_metrics_test.exs +++ b/test/realtime/monitoring/distributed_metrics_test.exs @@ -32,28 +32,5 @@ defmodule Realtime.DistributedMetricsTest do } } = DistributedMetrics.info() end - - test "metric matches on both sides", %{node: node} do - # We need to generate some data first - Realtime.Rpc.call(node, String, :to_integer, ["25"], key: 1) - Realtime.Rpc.call(node, String, :to_integer, ["25"], key: 2) - - local_metrics = DistributedMetrics.info()[node][:inet_stats] - # Use gen_rpc to not use erl dist and change the result - remote_metrics = :gen_rpc.call(node, DistributedMetrics, :info, [])[node()][:inet_stats] - - # It's not going to 100% the same because erl dist sends pings and other things out of our control - - assert local_metrics[:connections] == remote_metrics[:connections] - - assert_in_delta(local_metrics[:send_avg], remote_metrics[:recv_avg], 5) - assert_in_delta(local_metrics[:recv_avg], remote_metrics[:send_avg], 5) - - assert_in_delta(local_metrics[:send_oct], remote_metrics[:recv_oct], 5) - assert_in_delta(local_metrics[:recv_oct], remote_metrics[:send_oct], 5) - - assert_in_delta(local_metrics[:send_max], remote_metrics[:recv_max], 5) - assert_in_delta(local_metrics[:recv_max], remote_metrics[:send_max], 5) - end end end diff --git a/test/realtime/rate_counter/rate_counter_test.exs b/test/realtime/rate_counter/rate_counter_test.exs index c4a01bd74..6d3f57401 100644 --- a/test/realtime/rate_counter/rate_counter_test.exs +++ b/test/realtime/rate_counter/rate_counter_test.exs @@ -227,11 +227,12 @@ defmodule Realtime.RateCounterTest do log = capture_log(fn -> - GenCounter.add(args.id, 50) + GenCounter.add(args.id, 100) Process.sleep(100) end) - assert {:ok, %RateCounter{sum: 50, limit: %{triggered: true}}} = RateCounter.get(args) + assert {:ok, %RateCounter{sum: sum, limit: %{triggered: true}}} = RateCounter.get(args) + assert sum > 49 assert log =~ "project=tenant123 external_id=tenant123 [error] ErrorMessage: Reason" # Only one log message should be emitted