Skip to content

Commit

Permalink
chore(electric): Limit client connections when the sync service's Pos…
Browse files Browse the repository at this point in the history
…tgres connection is down (#946)

Fixes VAX-1290.

This change also addresses the problem where a request sent to
`/api/status` could time out if the Postgres Manager process was itself
blocked on an IO call at the time.

---------

Co-authored-by: Garry Hill <garry@electric-sql.com>
  • Loading branch information
alco and magnetised committed Mar 13, 2024
1 parent 1f64c60 commit 452361d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 39 deletions.
5 changes: 5 additions & 0 deletions .changeset/dry-pillows-confess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Limit client connections when the sync service's Postgres connection is down.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule Electric.Plug.ProxyWebsocketPlug do
)

{:error, reason} ->
Logger.debug("Client WebSocket connection failed with reason: #{reason}")
Logger.warning("Client WebSocket connection failed with reason: #{reason}")
send_resp(conn, 400, "Bad request")
end
end
Expand Down
29 changes: 22 additions & 7 deletions components/electric/lib/electric/plug/satellite_websocket_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do

import Plug.Conn

alias Electric.Replication.{InitialSync, PostgresConnector}

require Logger

@protocol_prefix "electric."
Expand All @@ -14,19 +16,18 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do
base_opts
|> Keyword.put(:client_version, client_version)
|> Keyword.put_new_lazy(:auth_provider, fn -> Electric.Satellite.Auth.provider() end)
|> Keyword.put_new_lazy(:connector_config, fn ->
Electric.Replication.PostgresConnector.connector_config()
end)
|> Keyword.put_new_lazy(:connector_config, fn -> PostgresConnector.connector_config() end)
|> Keyword.put_new_lazy(:subscription_data_fun, fn ->
&Electric.Replication.InitialSync.query_subscription_data/2
&InitialSync.query_subscription_data/2
end)

@currently_supported_versions ">= 0.6.0 and <= #{%{Electric.vsn() | pre: []}}"

def call(conn, handler_opts) do
with :ok <- check_if_valid_upgrade(conn),
{:ok, conn} <- check_if_subprotocol_present(conn),
{:ok, conn} <- check_if_vsn_compatible(conn, with: @currently_supported_versions) do
{:ok, conn} <- check_if_vsn_compatible(conn, with: @currently_supported_versions),
:ok <- check_if_postgres_is_ready() do
Logger.metadata(
remote_ip: conn.remote_ip |> :inet.ntoa() |> to_string(),
instance_id: Electric.instance_id()
Expand Down Expand Up @@ -62,7 +63,7 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do

:error ->
reason = "Missing satellite websocket subprotocol"
Logger.debug("Client WebSocket connection failed with reason: #{reason}")
Logger.warning("Client WebSocket connection failed with reason: #{reason}")
{:error, 400, reason}
end
end
Expand All @@ -74,11 +75,25 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do
reason =
"Cannot connect satellite version #{assigns.satellite_vsn}: this server requires #{requirements}"

Logger.debug("Client WebSocket connection failed with reason: #{reason}")
Logger.warning("Client WebSocket connection failed with reason: #{reason}")
{:error, 400, reason}
end
end

if Mix.env() == :test do
defp check_if_postgres_is_ready, do: :ok
else
defp check_if_postgres_is_ready do
PostgresConnector.connector_config()
|> Electric.Replication.Connectors.origin()
|> Electric.Replication.PostgresConnectorMng.status()
|> case do
:ready -> :ok
other -> {:error, 503, "Postgres connection is not ready: #{other}..."}
end
end
end

defp get_satellite_subprotocol(%Plug.Conn{} = conn) do
get_req_header(conn, "sec-websocket-protocol")
|> Enum.filter(&String.starts_with?(&1, @protocol_prefix))
Expand Down
73 changes: 42 additions & 31 deletions components/electric/lib/electric/replication/postgres_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ defmodule Electric.Replication.PostgresConnectorMng do
}
end

@status_key :status
@connector_config_key :connector_config

@spec start_link(Connectors.config()) :: {:ok, pid} | :ignore | {:error, term}
def start_link(connector_config) do
GenServer.start_link(__MODULE__, connector_config, [])
Expand All @@ -63,12 +66,15 @@ defmodule Electric.Replication.PostgresConnectorMng do

@spec status(Connectors.origin()) :: State.status()
def status(origin) do
GenServer.call(name(origin), :status)
case :ets.lookup(ets_table_name(origin), @status_key) do
[{@status_key, status}] -> status
[] -> :initialization
end
end

@spec connector_config(Connectors.origin()) :: Connectors.config()
def connector_config(origin) do
GenServer.call(name(origin), :connector_config)
:ets.lookup_element(ets_table_name(origin), @connector_config_key, 2)
end

@impl GenServer
Expand All @@ -80,34 +86,54 @@ defmodule Electric.Replication.PostgresConnectorMng do
Logger.metadata(origin: origin)
Process.flag(:trap_exit, true)

connector_config = preflight_connector_config(connector_config)
# Use an ETS table to store data that are regularly looked up by other processes.
:ets.new(ets_table_name(origin), [:protected, :named_table, read_concurrency: true])

state =
reset_state(%State{
origin: origin,
connector_config: connector_config,
conn_opts: Connectors.get_connection_opts(connector_config),
repl_opts: Connectors.get_replication_opts(connector_config),
write_to_pg_mode: Connectors.write_to_pg_mode(connector_config)
})
%State{origin: origin, connector_config: connector_config}
|> update_connector_config(&preflight_connector_config/1)
|> reset_state()

{:ok, state, {:continue, :init}}
end

defp ets_table_name(origin) do
String.to_atom(inspect(__MODULE__) <> ":" <> origin)
end

defp reset_state(%State{} = state) do
%State{
state
| backoff: {:backoff.init(1000, 10_000), nil},
status: :initialization,
pg_connector_sup_monitor: nil
}
|> set_status(:initialization)
end

defp set_status(state, status) do
:ets.insert(ets_table_name(state.origin), {@status_key, status})
%{state | status: status}
end

defp update_connector_config(state, fun) do
connector_config = fun.(state.connector_config)

:ets.insert(ets_table_name(state.origin), {@connector_config_key, connector_config})

%{
state
| connector_config: connector_config,
conn_opts: Connectors.get_connection_opts(connector_config),
repl_opts: Connectors.get_replication_opts(connector_config),
write_to_pg_mode: Connectors.write_to_pg_mode(connector_config)
}
end

@impl GenServer
def handle_continue(:init, state) do
case initialize_postgres(state) do
:ok ->
state = %State{state | status: :establishing_repl_conn}
state = set_status(state, :establishing_repl_conn)
{:noreply, state, {:continue, :establish_repl_conn}}

{:error, {:ssl_negotiation_failed, _}} when state.conn_opts.ssl != :required ->
Expand Down Expand Up @@ -137,7 +163,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
Logger.info("Successfully initialized Postgres connector #{inspect(state.origin)}.")

ref = Process.monitor(sup_pid)
state = %State{state | status: :subscribing, pg_connector_sup_monitor: ref}
state = %State{state | pg_connector_sup_monitor: ref} |> set_status(:subscribing)
{:noreply, state, {:continue, :subscribe}}

:error ->
Expand All @@ -147,23 +173,14 @@ defmodule Electric.Replication.PostgresConnectorMng do

def handle_continue(:subscribe, %State{write_to_pg_mode: :logical_replication} = state) do
case start_subscription(state) do
:ok -> {:noreply, %State{state | status: :ready}}
:ok -> {:noreply, set_status(state, :ready)}
{:error, _} -> {:noreply, schedule_retry(:subscribe, state)}
end
end

def handle_continue(:subscribe, %State{write_to_pg_mode: :direct_writes} = state) do
:ok = stop_subscription(state)
{:noreply, %State{state | status: :ready}}
end

@impl GenServer
def handle_call(:status, _from, state) do
{:reply, state.status, state}
end

def handle_call(:connector_config, _from, state) do
{:reply, state.connector_config, state}
{:noreply, set_status(state, :ready)}
end

@impl GenServer
Expand Down Expand Up @@ -352,13 +369,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
"Falling back to trying an unencrypted connection to Postgres, since DATABASE_REQUIRE_SSL=false."
)

connector_config = put_in(state.connector_config, [:connection, :ssl], false)

%State{
state
| connector_config: connector_config,
conn_opts: Connectors.get_connection_opts(connector_config)
}
update_connector_config(state, &put_in(&1, [:connection, :ssl], false))
end

defp extra_error_description(:invalid_authorization_specification) do
Expand Down

0 comments on commit 452361d

Please sign in to comment.