Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1929c7c
chore(sync-service): add cheap admission control implementation plan
alco May 19, 2026
c8fd3f1
feat(sync-service): add AdmissionControl.try_swap/4
alco May 19, 2026
89ddcd6
fix(sync-service): address review feedback on try_swap/4
alco May 19, 2026
ca85efa
feat(sync-service): cheap Tier 0 admission + post-load_shape reclassi…
alco May 19, 2026
739ae54
fix(sync-service): default max_concurrent_requests in Shapes.Api
alco May 19, 2026
af29589
chore(sync-service): polish admission_kind comment and reclassify fal…
alco May 19, 2026
44d35d2
chore: changeset for cheap admission control
alco May 19, 2026
5be3eb4
Remove the implementation plan file (committed by accident)
alco May 19, 2026
a284dae
revert(sync-service): drop Api.max_concurrent_requests struct default
alco May 19, 2026
3f2f976
Simplify explainer comments in ServeShapePlug
alco May 19, 2026
fe3deac
fix(sync-service): preserve Content-Type on admission 503 responses
alco May 19, 2026
555eb24
fix(sync-service): rework AdmissionControl.try_swap/4 atomicity
alco May 19, 2026
89f5c39
Add missing @spec for ShapeStatus.has_shape_handle?()
alco May 19, 2026
e51b2fb
Clean up admission control code in ServeShapePlug
alco May 19, 2026
989fc00
Remove excessive prose from admission control tests
alco May 20, 2026
fcca98c
perf(sync-service): add read_concurrency to per-request hot-path ETS …
alco May 20, 2026
ed04f5a
feat(sync-service): hold :initial permit across snapshot-pool checkout
alco May 20, 2026
957c5c1
Remove excessively verbose comments
alco May 20, 2026
1b073ca
Dedupe atomic counter ops by reusing local helper functions
alco May 20, 2026
273893f
fixup! Dedupe atomic counter ops by reusing local helper functions
alco May 20, 2026
43c7384
Tighten changeset wording for cheap admission control
alco May 20, 2026
5e98e90
Add @spec for Shapes.get_merged_log_stream/3
alco May 20, 2026
3943d20
Assert admission residency after :initial->:existing swap in router test
alco May 20, 2026
a39f565
Add regression test for SnapshotError-vs-must_refetch 503/409 race
alco May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cheap-admission-control.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Cheap admission control for shape requests. `:check_admission` now runs at the front of the request pipeline and classifies requests using a single `:ets.member/2` lookup on `?handle=`, removing the SQLite-backed `:resolve_existing_shape` step that previously ran before admission and saturated the read pool under thundering herd (bottleneck 1 of #4266).
48 changes: 48 additions & 0 deletions packages/sync-service/lib/electric/admission_control.ex
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,54 @@ defmodule Electric.AdmissionControl do
:ok
end

@doc """
Exchange a held `from_kind` permit for a `to_kind` permit on the given stack.

The `to_kind` permit is claimed first; only if that claim fits under the
`to_kind` cap is the `from_kind` permit released. On rejection the claim is
rolled back and the caller keeps its `from_kind` permit.

This function does not verify that the caller actually holds a `from_kind`
permit (normally obtained via `try_acquire/3`). Calling it without one still
hands out a `to_kind` permit that nothing accounts for.

## Options

* `:max_concurrent` — required. Cap for `to_kind`.
* `:table_name` — ETS table (default: `:electric_admission_control`).

Returns `:ok` when the swap succeeded, `{:error, :overloaded}` when the
`to_kind` bucket was already at capacity.
"""
@spec try_swap(String.t(), atom(), atom(), keyword()) :: :ok | {:error, :overloaded}
def try_swap(stack_id, from_kind, to_kind, opts)
    when from_kind in @allowed_kinds and to_kind in @allowed_kinds do
  table = Keyword.get(opts, :table_name, @table_name)
  limit = Keyword.fetch!(opts, :max_concurrent)

  # Claim the destination permit optimistically; `in_flight` is the counter
  # value after the increment, so it may exceed the limit by the time we check.
  in_flight = incr(table, stack_id, to_kind)

  measurements = %{count: 1, current: in_flight, limit: limit}
  metadata = %{stack_id: stack_id, from: from_kind, to: to_kind}

  if in_flight <= limit do
    # The destination permit is secured, so the source permit can be released.
    # NOTE(review): the original comment states this decrement clamps at 0 to
    # tolerate callers that never held a from_kind permit — confirm in decr/3.
    decr(table, stack_id, from_kind)

    :telemetry.execute([:electric, :admission_control, :swap], measurements, metadata)

    :ok
  else
    # Destination bucket was already full: undo the optimistic claim.
    decr(table, stack_id, to_kind)

    :telemetry.execute([:electric, :admission_control, :swap_rejected], measurements, metadata)

    {:error, :overloaded}
  end
end

@doc """
Get the current number of in-flight requests for a stack.

Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/lsn_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Electric.LsnTracker do

case :ets.info(table, :id) do
:undefined ->
:ets.new(table, [:public, :named_table])
:ets.new(table, [:public, :named_table, read_concurrency: true])
:ok

ref when is_reference(ref) ->
Expand Down
98 changes: 66 additions & 32 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ defmodule Electric.Plug.ServeShapePlug do
clause in `call/2` releases it — firing for success, halt, and exception
paths alike.

Admission classification uses only `conn.query_params["handle"]` and a
single ETS membership check: requests with no handle or an unknown handle
are classified as `:initial`; requests with a known handle are `:existing`.
This avoids any SQLite access on the admission-control hot path.

Using `after` (rather than `register_before_send`) is also what makes the
streaming path correct: `before_send` fires when `send_chunked` starts
streaming, not when it finishes, which would end the telemetry span before
Expand All @@ -27,9 +32,11 @@ defmodule Electric.Plug.ServeShapePlug do

use Plug.Builder

alias Electric.Utils
alias Electric.ShapeCache
alias Electric.ShapeCache.ShapeStatus
alias Electric.Shapes.Api
alias Electric.Telemetry.OpenTelemetry
alias Electric.Utils
alias Plug.Conn

require Logger
Expand All @@ -39,13 +46,19 @@ defmodule Electric.Plug.ServeShapePlug do

# These plugs are invoked inside the `call/2` function below, after `conn` has been preloaded with
# query params and an OTEL span.

# put_resp_content_type runs first so admission rejections (503) still
# carry `Content-Type: application/json`.
plug :put_resp_content_type, "application/json"
plug :check_admission
plug :parse_body
plug :validate_request
plug :reject_subquery_shape_compaction_request
plug :resolve_existing_shape
plug :check_admission
plug :load_shape
plug :hold_initial_until_snapshot_started
# Reclassify off :initial so the :initial admission slot becomes available
# for new requests while the current handler streams the response.
plug :reclassify_admission_kind
plug :serve_shape_response

@impl Plug
Expand Down Expand Up @@ -273,32 +286,9 @@ defmodule Electric.Plug.ServeShapePlug do
end
end

# Check if the shape already exists so admission control can classify
# accurately (:initial for new shapes, :existing for known shapes).
#
# Classification is stored in conn.private without touching request.params.handle.
# Mutating the request handle would flip `load_shape` from the no-handle
# `get_or_create_shape_handle` path to the strict-match `resolve_shape_handle`
# path; if the shape were cleaned between the two steps the client would see a
# 409 refetch flow for a handle they never sent.
defp resolve_existing_shape(%Conn{assigns: %{config: config, request: request}} = conn, _opts) do
  stack_id = get_in(config, [:stack_id])

  # Record only whether a handle exists for this shape definition; the handle
  # itself is deliberately discarded so request.params.handle stays untouched.
  shape_exists? =
    case Electric.Shapes.fetch_handle_by_shape(stack_id, request.params.shape_definition) do
      {:ok, _handle} -> true
      :error -> false
    end

  put_private(conn, :shape_exists?, shape_exists?)
rescue
  # Deliberately narrow: covers the startup race in which the shape cache's ETS
  # tables do not exist yet. ETS operations raise ArgumentError on a missing
  # table, which can happen when a request arrives before the shape subsystem
  # has finished initializing. Such requests are treated as "shape unknown".
  ArgumentError -> put_private(conn, :shape_exists?, false)
end

defp check_admission(%Conn{assigns: %{config: config}} = conn, _) do
stack_id = get_in(config, [:stack_id])
kind = admission_kind(conn)
kind = admission_kind(conn, stack_id)
max_concurrent = Map.fetch!(config[:api].max_concurrent_requests, kind)

case Electric.AdmissionControl.try_acquire(stack_id, kind, max_concurrent: max_concurrent) do
Expand Down Expand Up @@ -333,12 +323,19 @@ defmodule Electric.Plug.ServeShapePlug do
end
end

defp admission_kind(conn) do
if conn.private[:shape_exists?] do
:existing
else
:initial
# Picks the admission bucket for a request from its `handle` query param alone:
# `:existing` only when the handle is known to ShapeStatus, `:initial` otherwise.
defp admission_kind(conn, stack_id) do
  case conn.query_params["handle"] do
    nil ->
      :initial

    handle ->
      # A handle that is present but unknown locally (stale from another
      # instance, or for a shape this node has not yet recovered/created)
      # is classified the same as a request without a handle.
      if ShapeStatus.has_shape_handle?(stack_id, handle), do: :existing, else: :initial
  end
rescue
  # The per-stack shape_meta_table may not exist yet during startup, in which
  # case the ETS lookup raises.
  ArgumentError -> :initial
end

defp calculate_retry_after(_stack_id, _max_concurrent) do
Expand All @@ -357,6 +354,43 @@ defmodule Electric.Plug.ServeShapePlug do
end
end

# This keeps the :initial permit held across Snapshotter PG-pool checkout so saturation of
# the snapshot pool turns into admission rejections at the gate rather than an unbounded
# Postgrex queue.
#
# The await outcome is stashed in `request.snapshot_status` and consumed by
# `Shapes.get_merged_log_stream` instead of being awaited again on the streaming path.
defp hold_initial_until_snapshot_started(
       %Conn{assigns: %{request: %{handle: shape_handle} = req}} = conn,
       _opts
     )
     when is_binary(shape_handle) do
  # Only wait when this process currently holds an :initial permit; any other
  # permit state (including none) passes the conn through unchanged.
  with {stack_id, :initial} <- Process.get(@admission_permit_key) do
    snapshot_status = ShapeCache.await_snapshot_start(shape_handle, stack_id)
    assign(conn, :request, %{req | snapshot_status: snapshot_status})
  else
    _ -> conn
  end
end

# No binary handle on the request: nothing to wait for.
defp hold_initial_until_snapshot_started(conn, _opts), do: conn

# Moves the handler out of :initial so the :initial bucket can admit the next
# validate-and-load wave while this request is still streaming the response body
# to the client.
defp reclassify_admission_kind(%Conn{assigns: %{config: config}} = conn, _) do
with {stack_id, :initial} <- Process.get(@admission_permit_key),
max = Map.fetch!(config[:api].max_concurrent_requests, :existing),
:ok <-
Electric.AdmissionControl.try_swap(stack_id, :initial, :existing, max_concurrent: max) do
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean if the try_swap fails?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to try_swap fails only if the :existing bucket is already at capacity. An unlikely case in practice since we configure it with a higher value than :initial.

In any case, if the swap fails, the current request keeps holding the :initial permit until it's released in the after clause of the call function.

Process.put(@admission_permit_key, {stack_id, :existing})
end

conn
end

defp load_shape(%Conn{assigns: %{request: request}} = conn, _) do
case Api.load_shape_info(request) do
{:ok, request} ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
# Name needs to be an atom but we don't want to dynamically create atoms.
# Instead, we will use the reference to the table that is returned by `:ets.new`
pg_inspector_table =
:ets.new(opts.pg_inspector_table, [:named_table, :protected, :ordered_set])
:ets.new(opts.pg_inspector_table, [
:named_table,
:protected,
:ordered_set,
read_concurrency: true
])

persistence_key = "#{opts.stack_id}:ets_inspector_state"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end)
end

@doc """
Returns `true` when `shape_handle` is a key in the ETS table returned by
`shape_meta_table/1` for `stack_id`, `false` otherwise.
"""
@spec has_shape_handle?(stack_id(), shape_handle()) :: boolean()
def has_shape_handle?(stack_id, shape_handle) do
  stack_id
  |> shape_meta_table()
  |> :ets.member(shape_handle)
end
Expand Down
56 changes: 42 additions & 14 deletions packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Electric.Shapes do
alias Electric.ShapeCache.Storage
alias Electric.ShapeCache
alias Electric.ShapeCache.ShapeStatus
alias Electric.Shapes.Api
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry

Expand All @@ -13,29 +14,56 @@ defmodule Electric.Shapes do

@doc """
Get the snapshot followed by the log.

Accepts a `:snapshot_status` opt carrying the outcome of a prior
`await_snapshot_start` call (typically performed by the admission
plug, which holds an `:initial` permit across the wait so the snapshot
pool is protected by admission-gate backpressure). When set, the
redundant wait is skipped; this is required for correctness on the
snapshot-failure path, where re-issuing the GenServer call would race
against the consumer's `stop_and_clean`.
"""
# `:live_sse` is listed because the caller in `Electric.Shapes.Api` passes it
# together with the other options; leaving it out of the keyword type makes
# that call fall outside the declared contract.
# NOTE(review): typed as boolean() based on the `in_sse?` argument name — confirm.
@spec get_merged_log_stream(stack_id(), shape_handle(),
        since: LogOffset.t(),
        up_to: LogOffset.t(),
        live_sse: boolean(),
        read_only?: boolean(),
        snapshot_status: Api.Request.snapshot_status()
      ) :: {:ok, Storage.log()} | {:error, term()}
def get_merged_log_stream(stack_id, shape_handle, opts)
when is_shape_handle(shape_handle) and is_stack_id(stack_id) do
offset = Access.get(opts, :since, LogOffset.before_all())
max_offset = Access.get(opts, :up_to, LogOffset.last())

if ShapeCache.has_shape?(shape_handle, stack_id) do
with :started <- ShapeCache.await_snapshot_start(shape_handle, stack_id) do
storage =
Storage.for_shape(
shape_handle,
Storage.for_stack(stack_id, read_only?: opts[:read_only?])
)

{:ok, Storage.get_log_stream(offset, max_offset, storage)}
end
else
# If we have a shape handle, but no shape, it means the shape was deleted. Send a 409
# and expect the client to retry - if the state of the world allows, it'll get a new handle.
{:error, Electric.Shapes.Api.Error.must_refetch()}
case Access.get(opts, :snapshot_status) do
{:error, _} = error ->
error

:started ->
{:ok, build_log_stream(stack_id, shape_handle, offset, max_offset, opts)}

nil ->
if ShapeCache.has_shape?(shape_handle, stack_id) do
with :started <- ShapeCache.await_snapshot_start(shape_handle, stack_id) do
{:ok, build_log_stream(stack_id, shape_handle, offset, max_offset, opts)}
end
else
# If we have a shape handle, but no shape, it means the shape was deleted. Send a 409
# and expect the client to retry - if the state of the world allows, it'll get a new handle.
{:error, Electric.Shapes.Api.Error.must_refetch()}
end
end
end

# Resolves the storage for `shape_handle` within the stack (honouring the
# `:read_only?` option) and returns the log stream between the two offsets.
defp build_log_stream(stack_id, shape_handle, offset, max_offset, opts) do
  stack_storage = Storage.for_stack(stack_id, read_only?: opts[:read_only?])
  shape_storage = Storage.for_shape(shape_handle, stack_storage)

  Storage.get_log_stream(offset, max_offset, shape_storage)
end

@doc """
Get the shape handle that corresponds to this shape definition and return it
"""
Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/shapes/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ defmodule Electric.Shapes.Api do
since: offset,
up_to: chunk_end_offset,
live_sse: in_sse?,
read_only?: request.read_only?
read_only?: request.read_only?,
snapshot_status: request.snapshot_status
) do
{:ok, log} ->
if live? && Enum.take(log, 1) == [] do
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/shapes/api/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,23 @@ defmodule Electric.Shapes.Api.Request do
:global_last_seen_lsn,
:new_changes_ref,
:new_changes_pid,
:snapshot_status,
read_only?: false,
api: %Api{},
params: %Api.Params{},
response: %Api.Response{}
]

@type shape_handle :: Electric.shape_handle()
@type snapshot_status :: nil | :started | {:error, term()}
@type t() :: %__MODULE__{
chunk_end_offset: nil | LogOffset.t(),
handle: nil | shape_handle(),
last_offset: nil | LogOffset.t(),
global_last_seen_lsn: nil | pos_integer(),
new_changes_ref: nil | reference(),
new_changes_pid: nil | pid(),
snapshot_status: snapshot_status(),
api: Api.t(),
params: Api.Params.t(),
response: Api.Response.t()
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/status_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Electric.StatusMonitor do
Process.set_label({:status_monitor, stack_id})
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)

:ets.new(ets_table(stack_id), [:named_table, :protected])
:ets.new(ets_table(stack_id), [:named_table, :protected, read_concurrency: true])

{:ok, %{stack_id: stack_id, waiters: MapSet.new()}}
end
Expand Down
Loading
Loading