Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/slot-creation-self-heal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Automatically recover sources whose replication slot creation is stuck waiting on a pending transaction, by periodically calling `pg_log_standby_snapshot()`. Previously such sources required a manual restart. When the function is unavailable (PostgreSQL < 16 or missing EXECUTE privilege), Electric falls back to the previous behavior and emits a one-time notice.
66 changes: 66 additions & 0 deletions integration-tests/tests/self-heal-stuck-slot-creation.lux
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
[doc Verify that Electric automatically unblocks a stuck replication slot creation by forcing a standby snapshot, recovering without a manual restart once the blocking transaction commits, even on an otherwise idle database]

[include _macros.luxinc]

[global pg_container_name=self-heal-stuck-slot__pg]

###

## Start a new Postgres cluster
[invoke setup_pg]

## Open a transaction that holds an xid, blocking logical slot creation
[invoke start_psql]
[shell psql]
!BEGIN;
??BEGIN

!SELECT pg_current_xact_id();
??pg_current_xact_id
??(1 row)

## Start Electric; it should block on slot creation
[invoke setup_electric_shell_with_tenant "electric_1" "3000"]

[shell electric_1]
# Prove replication does NOT start while the transaction is open.
-Starting replication from postgres|$fail_pattern

??[debug] ReplicationClient step: create_slot

# This warning is only logged on the first configuration-check tick (~10s), so
# give the match enough headroom to avoid a risky-timer warning.
[timeout 30]
??[warning] Waiting for the replication connection setup to complete...

## Confirm the service reports "starting" (genuinely blocked, not yet active).
## Poll rather than sleep+curl so a slow HTTP startup doesn't make this flaky.
[shell client]
[invoke wait-for "curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant" "\{\"status\":\"starting\"\}" 10 $PS1]

## Commit the blocking transaction and then keep the database completely idle.
## On a quiet database Postgres would not emit an XLOG_RUNNING_XACTS record for a
## long time, so without intervention slot creation would stay stuck. Electric
## must force a standby snapshot to recover promptly.
[shell psql]
!COMMIT;
??COMMIT

## Verify Electric forced a standby snapshot and then started replicating.
## The "Forced a standby snapshot" line only appears if the self-heal path ran,
## so it is the proof of this feature (independent of organic recovery timing).
[shell electric_1]
# Reset the failure pattern before the next match.
-$fail_pattern

# Allow time for the periodic unblock attempt (production check interval is 10s).
[timeout 30]
??[debug] Forced a standby snapshot to unblock replication slot creation
??[notice] Starting replication from postgres

## The source becomes healthy and active.
[shell client]
[invoke wait-for "curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant" "\{\"status\":\"active\"\}" 10 $PS1]

[cleanup]
[invoke teardown]
155 changes: 135 additions & 20 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ defmodule Electric.Connection.Manager do
# PIDs of the database connection pools
pool_pids: %{admin: nil, snapshot: nil},
validated_connection_opts: %{replication: nil, pool: nil},
drop_slot_requested: false
drop_slot_requested: false,
connection_status_check_interval: nil,
slot_unblock_notice_sent: false
]
end

Expand All @@ -131,6 +133,9 @@ defmodule Electric.Connection.Manager do

require Logger

# True when `state.pool_pids.admin` is set and is a tuple whose first element is
# a pid.
#
# The explicit nil check is evaluated first so that, when no admin pool has been
# recorded, the guard evaluates to `false` instead of failing inside `elem/2`.
# That matters for negated uses (`when not admin_pool_available(state)`): a guard
# that errors makes the whole clause not match, whereas a `false` result can be
# negated.
defguardp admin_pool_available(state)
when not is_nil(state.pool_pids.admin) and is_pid(elem(state.pool_pids.admin, 0))

@type status :: :waiting | :starting | :active

@type option ::
Expand All @@ -145,6 +150,9 @@ defmodule Electric.Connection.Manager do

@connection_status_check_interval 10_000

# pg_log_standby_snapshot() was introduced in PostgreSQL 16. On older servers the
# function does not exist, so gating on this version avoids issuing a query that
# can only fail with an undefined_function error.
@force_standby_snapshot_min_pg_version 160_000

# Time after establishing replication connection before we consider it successful
# from a retrying perspective, to allow for setup errors sent over the stream
# to be received. Any failure within this period will trigger a retry within
Expand Down Expand Up @@ -295,7 +303,9 @@ defmodule Electric.Connection.Manager do
persistent_kv: Keyword.fetch!(opts, :persistent_kv),
manual_table_publishing?: Keyword.get(opts, :manual_table_publishing?, false),
max_shapes: Keyword.fetch!(opts, :max_shapes),
lock_breaker_guard: Keyword.get(opts, :lock_breaker_guard)
lock_breaker_guard: Keyword.get(opts, :lock_breaker_guard),
connection_status_check_interval:
Keyword.get(opts, :connection_status_check_interval, @connection_status_check_interval)
}
|> initialize_connection_opts(opts)

Expand Down Expand Up @@ -363,6 +373,7 @@ defmodule Electric.Connection.Manager do
state
| replication_client_pid: pid,
replication_configuration_blocked_by_pending_transaction: false,
slot_unblock_notice_sent: false,
current_step: {:start_replication_client, :connecting}
}

Expand Down Expand Up @@ -549,10 +560,9 @@ defmodule Electric.Connection.Manager do
if lock_breaker_allowed and
state.current_step == {:start_replication_client, :acquiring_lock} and
not is_nil(pg_backend_pid) do
pool = pool_name(state.stack_id, :admin)
lock_name = Keyword.fetch!(state.replication_opts, :slot_name)
database = replication_connection_opts(state)[:database]
execute_lock_breaker_query(pool, lock_name, pg_backend_pid, database)
execute_lock_breaker_query(admin_pool(state.stack_id), lock_name, pg_backend_pid, database)
else
if not lock_breaker_allowed do
Logger.notice(
Expand All @@ -564,6 +574,21 @@ defmodule Electric.Connection.Manager do
{:noreply, state}
end

# Runs while the replication client is in the :configuring_connection step:
# tries to force a standby snapshot so that a stuck slot creation can proceed.
# If forcing the snapshot is not possible, the operator is informed through
# notify_slot_unblock_unavailable/2 and the returned state is carried forward.
def handle_continue(
      :unblock_slot_creation,
      %State{current_step: {:start_replication_client, :configuring_connection}} = state
    ) do
  next_state =
    case execute_force_standby_snapshot_query(state) do
      {:unavailable, reason} -> notify_slot_unblock_unavailable(state, reason)
      :ok -> state
    end

  {:noreply, next_state}
end

# Fallback for every other step: do nothing. The continue is meant to be
# dispatched only from the :configuring_connection step, but a stray dispatch
# from elsewhere must not crash the process.
def handle_continue(:unblock_slot_creation, state) do
  {:noreply, state}
end

@impl true
def handle_info(
{:timeout, tref, {:check_status, :replication_lock}},
Expand All @@ -574,7 +599,13 @@ defmodule Electric.Connection.Manager do
} = state
) do
Logger.warning(fn -> "Waiting for postgres lock to be acquired..." end)
tref = schedule_periodic_connection_status_check(:replication_lock)

tref =
schedule_periodic_connection_status_check(
:replication_lock,
state.connection_status_check_interval
)

state = %{state | replication_lock_timer: tref}
{:noreply, state, {:continue, :check_lock_not_abandoned}}
end
Expand All @@ -598,15 +629,19 @@ defmodule Electric.Connection.Manager do
dispatch_stack_event(:replication_slot_creation_blocked_by_pending_transactions, state)
end

tref = schedule_periodic_connection_status_check(:replication_configuration)
tref =
schedule_periodic_connection_status_check(
:replication_configuration,
state.connection_status_check_interval
)

state = %{
state
| replication_configuration_timer: tref,
replication_configuration_blocked_by_pending_transaction: true
}

{:noreply, state}
{:noreply, state, {:continue, :unblock_slot_creation}}
end

def handle_info({:timeout, tref, {:check_status, _}}, state) do
Expand Down Expand Up @@ -690,7 +725,12 @@ defmodule Electric.Connection.Manager do
) do
dispatch_stack_event(:waiting_for_connection_lock, state)
state = mark_connection_succeeded(state)
tref = schedule_periodic_connection_status_check(:replication_lock)

tref =
schedule_periodic_connection_status_check(
:replication_lock,
state.connection_status_check_interval
)

state = %{
state
Expand Down Expand Up @@ -728,7 +768,12 @@ defmodule Electric.Connection.Manager do
) do
Electric.StatusMonitor.mark_pg_lock_acquired(state.stack_id, state.replication_client_pid)
dispatch_stack_event(:connection_lock_acquired, state)
tref = schedule_periodic_connection_status_check(:replication_configuration)

tref =
schedule_periodic_connection_status_check(
:replication_configuration,
state.connection_status_check_interval
)

# Refresh shape metadata now that the lock is acquired and storage is
# entirely within this service's control
Expand Down Expand Up @@ -1130,6 +1175,71 @@ defmodule Electric.Connection.Manager do

defp pooled_connection_opts(state), do: state.connection_opts

# Asks Postgres to write an XLOG_RUNNING_XACTS record right away, so that a
# logical slot creation waiting for a consistent snapshot can continue as soon
# as the transaction(s) blocking it have finished.
#
# Returns `{:unavailable, reason}` when the function cannot be used: the server
# version is unknown or below @force_standby_snapshot_min_pg_version, the
# EXECUTE privilege is missing, or the function is undefined. Returns `:ok` in
# every other case, including transient failures, which are only logged, and the
# case where the admin pool is not available yet.
defp execute_force_standby_snapshot_query(state)
     when not is_integer(state.pg_version) or
            state.pg_version < @force_standby_snapshot_min_pg_version do
  {:unavailable, :pg_version_too_old}
end

defp execute_force_standby_snapshot_query(state) when admin_pool_available(state) do
  pool = admin_pool(state.stack_id)
  sql = "SELECT pg_log_standby_snapshot()"

  case Postgrex.query(pool, sql, [], timeout: 5_000) do
    {:ok, _rows} ->
      Logger.debug("Forced a standby snapshot to unblock replication slot creation")
      :ok

    # These two error codes mean the function can never succeed for this
    # connection, so report them to the caller instead of retrying silently.
    {:error, %Postgrex.Error{postgres: %{code: pg_code}}}
    when pg_code == :insufficient_privilege or pg_code == :undefined_function ->
      {:unavailable, pg_code}

    {:error, failure} ->
      Logger.warning("Failed to force a standby snapshot: #{inspect(failure)}")
      :ok
  end
catch
  # The pool process may be missing, the checkout may time out, or the pool may
  # crash mid-call; none of that should take the connection manager down.
  kind, payload ->
    Logger.warning(
      "Failed to force a standby snapshot: #{Exception.format(kind, payload, __STACKTRACE__)}"
    )

    :ok
end

# No admin pool to run the query on: nothing to do for now.
defp execute_force_standby_snapshot_query(_state) do
  :ok
end

# Tells the operator that Electric cannot force a standby snapshot, so a slot
# creation blocked by a pending transaction may need a manual restart.
#
# The notice is emitted at most once per `slot_unblock_notice_sent` cycle: the
# first clause short-circuits when the flag is already set, and the second
# clause sets it after logging and dispatching the stack event.
defp notify_slot_unblock_unavailable(%State{slot_unblock_notice_sent: true} = state, _reason) do
  state
end

defp notify_slot_unblock_unavailable(state, reason) do
  explanation =
    case reason do
      :pg_version_too_old ->
        "because pg_log_standby_snapshot() is not available in the current PG version"

      :undefined_function ->
        "because pg_log_standby_snapshot() is undefined"

      :insufficient_privilege ->
        "due to the missing EXECUTE ON FUNCTION pg_log_standby_snapshot() privilege"
    end

  user_message =
    Enum.join(
      [
        "Replication slot creation is blocked by a pending transaction.",
        "Electric might not be able to automatically unblock it as soon as the " <>
          "transaction commits/rolls back " <> explanation <> ".",
        "Manually restarting the source may be needed once the offending transaction " <>
          "is no longer active."
      ],
      " "
    )

  # Stack events reach only external subscribers (e.g. the cloud control plane),
  # so the same text is also written to the log for self-hosted operators.
  Logger.warning(user_message)
  dispatch_stack_event({:replication_slot_unblock_unavailable, %{message: user_message}}, state)

  %{state | slot_unblock_notice_sent: true}
end

defp execute_lock_breaker_query(pool, lock_name, pg_backend_pid, database) do
query = lock_breaker_query(lock_name, pg_backend_pid, database)

Expand Down Expand Up @@ -1214,9 +1324,6 @@ defmodule Electric.Connection.Manager do
end
end

defguardp admin_pool_available(state)
when not is_nil(state.pool_pids.admin) and is_pid(elem(state.pool_pids.admin, 0))

defp drop_publication(state) when not admin_pool_available(state) do
Logger.warning("Skipping publication drop, pool connection not available")
end
Expand All @@ -1226,22 +1333,27 @@ defmodule Electric.Connection.Manager do
end

defp drop_publication(state) do
pool = pool_name(state.stack_id, :admin)
publication_name = Keyword.fetch!(state.replication_opts, :publication_name)
execute_and_log_errors(pool, "DROP PUBLICATION IF EXISTS #{publication_name}")

execute_and_log_errors(
admin_pool(state.stack_id),
"DROP PUBLICATION IF EXISTS #{publication_name}"
)
end

defp drop_slot(state) when not admin_pool_available(state) do
Logger.warning("Skipping slot drop, pool connection not available")
end

defp drop_slot(state) do
pool = pool_name(state.stack_id, :admin)
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)
slot_temporary? = Keyword.fetch!(state.replication_opts, :slot_temporary?)

if not slot_temporary? do
execute_and_log_errors(pool, "SELECT pg_drop_replication_slot('#{slot_name}');")
execute_and_log_errors(
admin_pool(state.stack_id),
"SELECT pg_drop_replication_slot('#{slot_name}');"
)
end
end

Expand All @@ -1256,8 +1368,11 @@ defmodule Electric.Connection.Manager do
end

defp kill_replication_backend(%State{replication_pg_backend_pid: pg_backend_pid} = state) do
pool = pool_name(state.stack_id, :admin)
execute_and_log_errors(pool, "SELECT pg_terminate_backend(#{pg_backend_pid});")
execute_and_log_errors(
admin_pool(state.stack_id),
"SELECT pg_terminate_backend(#{pg_backend_pid});"
)

%{state | replication_pg_backend_pid: nil}
end

Expand All @@ -1279,8 +1394,8 @@ defmodule Electric.Connection.Manager do
)
end

defp schedule_periodic_connection_status_check(type) do
:erlang.start_timer(@connection_status_check_interval, self(), {:check_status, type})
defp schedule_periodic_connection_status_check(type, interval) do
:erlang.start_timer(interval, self(), {:check_status, type})
end

# It's possible that the exit signal received from an exiting process includes a
Expand Down
Loading
Loading