
Distributed Adaptive Concurrency

Ameya Borkar edited this page Jun 10, 2026 · 5 revisions

Distributed adaptive concurrency — one global ceiling across the fleet

Added in 0.10.0 (2026-05-29).

adaptiveConcurrency() infers a concurrency ceiling per process from locally observed RTT. The moment N processes front a shared backend (one inference cluster, one database pool, one upstream API), each of the N independent limiters infers a ceiling for the whole backend and the fleet collectively admits up to Σ Lᵢ — N× the backend's true capacity. The limiter that was supposed to prevent overload now causes it under fan-out.

distributedAdaptiveConcurrency() closes that gap. It is a drop-in ConcurrencyGuard whose admissions are governed by one cooperatively-inferred global ceiling L_global (inferred from the fleet's aggregate RTT signal, not configured by hand). The hard, coordinator-enforced guarantee is GlobalCap:

Σ_n granted_share[n]  ≤  L_global     (the coordinator never over-commits)

so in steady state Σ_n inflight[n] ≤ L_global. The coordinator also reserves each peer's max(share, inflight) (0.10.0, D-DAC-18), so a joiner is never granted capacity an incumbent is still occupying — eliminating the rebalance in-flight overshoot in the synchronous / low-latency case (the joiner ramps only as fast as incumbents drain). By default it is not a hard instantaneous bound across the async fleet: grant-reply and heartbeat-reporting lag leave a bounded (~1.5–2×), self-draining residual — in-flight requests are non-revocable — that converges back under budget, never by new over-authorization (see Membership changes).

Opt-in hard bound (acknowledged handoff, D-DAC-19). Set acknowledgedHandoff: true on the coordinator and Σ_n inflight[n] ≤ L_global becomes a hard instantaneous invariant of the async fleet. The coordinator then reserves each peer's max un-acknowledged grant (the largest share it issued that the peer hasn't confirmed superseding — the guard echoes the grant generation it enforces, atomically with its in-flight) unioned with that reported in-flight, so freed budget is handed to a joiner only after the incumbent confirms it lowered and drained. The overshoot becomes a ramp delay (≈1–2 heartbeats), not a violation — that latency is the price, which is why it is opt-in (the occupancy cap above is the default; enable handoff only once every node is upgraded). Machine-checked hard + tight by spec/GaleHeartbeatHandoff.tla (TLC, 250,624 states) and the async BFS twin.

Eager handoff removes the ramp-latency price (D-DAC-20, 0.11.0). That ~1–2 heartbeat ramp is an artifact of batching the budget transfer onto the periodic heartbeat tick. Set eagerHandoff: true on the guard and it fires off-cycle beats the instant local state shows the allocation is stale — when it is capped below its fair share (budget is coming), when an incumbent drains below its lowered share (budget is freed), or when it applied a lowered grant (to confirm it promptly). The ramp collapses toward the physical floor (drain + one round-trip) with no loosening of the bound — off-cycle beats are just heartbeats at a different time, which the safety proof already covers. Off-cycle beats are debounced (minHeartbeatMs), so steady state adds zero beats. It is guard-side only (no coordinator/wire change) and needs a scheduler with setTimer (the default has one). The pitch-perfect config is { acknowledgedHandoff: true, eagerHandoff: true } — a hard Σ inflight ≤ L_global and a near-floor ramp, the two properties the toggle alone made you choose between.

Self-fencing closes the partition overshoot (D-DAC-21, 0.11.0, default ON under fail-closed). A crashed or network-partitioned node can't heartbeat, but a partition hangs rather than throwing, so in earlier versions the node kept admitting against its last-known share while the coordinator reassigned its budget to peers — Σ inflight over the backend exceeded L_global for the whole partition. Self-fencing enforces the lease on the node's own clock: it stops admitting at lastSuccessfulBeatExpiresAt − fenceSafetyMargin, strictly before the coordinator's reclaim, and an onFenced hook lets you abort in-flight (e.g. an AbortController) so the occupancy drains. A healthy node never fences (stats().fenced shows the state). This rests on the standard lease assumption — bounded node↔coordinator clock skew ≤ fenceSafetyMargin (the same assumption Chubby and Kubernetes leader-election make; keep nodes NTP-synced and the default margin covers it). local-only opts out by design (it serves through an outage). The unbounded-skew + uncooperative-backend case is provably impossible to bound (FLP/CAP) — a theorem, not a setting.

From any client, no client change. Behind a ThrottleKit server this is a Tier-1 feature reachable over the server's existing Admit RPC (distributedConcurrency: policy) — a stock client (including Python) gets the fleet-shared ceiling just by pointing at the policy. See Scaling & the Fleet.

The fan-out overshoot problem

A single backend has true capacity C (time-varying, unknown). N gateway nodes each run their own adaptiveConcurrency. Each node observes RTT that reflects the total load on the shared backend (the backend is slow for everyone at once), so each independently infers Lᵢ ≈ C and admits up to Lᵢ concurrent requests. The backend then sees Σ Lᵢ ≈ N·C — an overshoot factor of N. As the backend buckles, every node's RTT climbs together, every node backs off together, and the fleet oscillates in lockstep (synchronized AIMD collapse).

The single most important conceptual guard-rail: all nodes estimate the same quantity (the shared backend's capacity), so reconstructing a fleet ceiling by summing per-node estimates is exactly the bug above. The coordinator never sums — it folds the per-node views into one robust central estimate.

The two-mechanism model

Distributed adaptive concurrency is a composition of two orthogonal mechanisms running at different tempos.

Mechanism 1 — Capacity estimation ("how big is the backend right now?"). Each node reports its locally-inferred L_local (straight from a private adaptiveConcurrency fed by that node's RTTs). The coordinator folds all live reports into L_global = aggregate({L_local}) using median (default) or min — NEVER sum (summing rebuilds the N·C overshoot). Median is robust to a single mis-calibrated or cold-starting node; min is the conservative extreme where the most-stressed node's view caps the fleet.
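As a rough illustration of why the fold must be a central estimate rather than a sum, here is a minimal sketch of the aggregation step. The function name `aggregateLocalLimits` is hypothetical, and taking the lower middle value for an even-sized fleet is an assumption of this sketch, not something the library documents.

```typescript
// Hypothetical sketch of the coordinator's fold; not the throttlekit implementation.
type AggregatePolicy = "median" | "min";

function aggregateLocalLimits(reports: number[], policy: AggregatePolicy): number {
  if (reports.length === 0) return 0; // no live nodes, nothing to hand out
  const sorted = reports.slice().sort((a, b) => a - b);
  if (policy === "min") return sorted[0];
  const mid = Math.floor(sorted.length / 2);
  // Assumption: for an even count, take the lower middle value (conservative, stays an integer).
  return sorted.length % 2 === 1 ? sorted[mid] : sorted[mid - 1];
}

// Four nodes front a backend whose capacity is about 50; one node is cold-starting.
const localReports = [48, 50, 52, 6];
const naiveSum = localReports.reduce((sum, l) => sum + l, 0);   // 156, roughly N times too high
const byMedian = aggregateLocalLimits(localReports, "median");  // 48
const byMin = aggregateLocalLimits(localReports, "min");        // 6
```

The median ignores the cold-starting outlier, while min lets it cap the whole fleet; the sum is the fan-out overshoot again.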

Mechanism 2 — Capacity allocation ("which node may admit a request?"). L_global is divided into per-node shares by budget-capped equal split: a node's grant is its equal-split target ⌊L/N⌋ capped at the budget not already committed to the other live nodes, that is, share = min(target, L_global − Σ other shares). A node admits while its local in-flight count is below its share. The cap makes Σ shares ≤ L_global a hard invariant under any heartbeat interleaving (GlobalCap): while the incumbents still hold all of L_global, a joining node computes min(target, L_global − L_global) = 0 and stays at 0 until the incumbents re-heartbeat down; no separate join bookkeeping is needed.
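The capped grant can be sketched in a few lines. This is an illustrative sketch only: `cappedGrant` is a hypothetical name, and clamping the remaining budget at zero (for the case where L_global has just dropped below what is committed) is an assumption of the sketch.

```typescript
// Hypothetical sketch of share = min(target, L_global − Σ other shares).
function cappedGrant(target: number, lGlobal: number, otherShares: number[]): number {
  const committed = otherShares.reduce((sum, s) => sum + s, 0);
  // Assumption: never grant a negative share when the budget is already over-committed.
  const remaining = Math.max(0, lGlobal - committed);
  return Math.min(target, remaining);
}

// A joiner arrives while one incumbent still holds all of L_global = 10.
const joinerGrant = cappedGrant(5, 10, [10]);    // 0
// The incumbent re-heartbeats; the joiner currently holds 0.
const incumbentGrant = cappedGrant(5, 10, [0]);  // 5
// The joiner's next heartbeat sees 5 committed and earns the rest.
const joinerLater = cappedGrant(5, 10, [5]);     // 5
```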

```text
 fast loop  (synchronous, in-process, per request)
   acquire()/release()  ──gate on──▶  min(share, L_local)
                                          ▲          │ RTT feeds
   the ConcurrencyGuard contract lives here          ▼
 slow loop  (async, background, every heartbeatMs)
   report L_local + inflight ──▶ coordinator ──▶ aggregate ──▶ equal-split ──▶ new share
```

The fast loop keeps acquire() synchronous — a concurrency gate must never block on the network; shedding is the right move when you are out of slots. The slow loop imposes one coordination round-trip per heartbeat, not per request: the share is delegated authority the node spends locally.

The effective ceiling is min(share, L_local). The min only ever lowers the ceiling below share, so it can never violate inflight ≤ share (safe), and it gives fast local reaction: if this node's RTT spikes, L_local drops within one request and the node sheds immediately — without waiting for the next heartbeat.
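To make the fast loop concrete, the following is a minimal sketch of a synchronous gate on min(share, L_local) with an idempotent release. `LocalGate` and its methods are hypothetical names used for illustration; the real guard also feeds RTT into its local estimator, which this sketch leaves out.

```typescript
// Hypothetical sketch of the fast-loop gate; not the throttlekit implementation.
interface Lease {
  ok: boolean;
  release(): void;
}

class LocalGate {
  private share: number;
  private lLocal: number;
  private inflight = 0;

  constructor(share: number, lLocal: number) {
    this.share = share;
    this.lLocal = lLocal;
  }

  // Synchronous: never waits on the network, sheds when out of slots.
  acquire(): Lease {
    const ceiling = Math.min(this.share, this.lLocal);
    if (this.inflight >= ceiling) {
      return { ok: false, release: () => {} };
    }
    this.inflight += 1;
    let released = false;
    return {
      ok: true,
      release: () => {
        if (released) return; // double-release is a no-op
        released = true;
        this.inflight -= 1;
      },
    };
  }

  setShare(share: number): void { this.share = share; }           // slow loop: new grant
  setLocalLimit(lLocal: number): void { this.lLocal = lLocal; }   // fast loop: local RTT reaction
  inflightCount(): number { return this.inflight; }
}
```

Lowering either input only stops new admissions; requests already in flight keep their slots until they release, which is the shrink-drain behavior described under Membership changes.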

API

```ts
import {
  distributedAdaptiveConcurrency,
  TestConcurrencyCoordinator,
} from "throttlekit";

// One coordinator owns L_global for a shared backend.
const coordinator = new TestConcurrencyCoordinator({ aggregate: "median" });

const guard = distributedAdaptiveConcurrency({
  coordinator,
  nodeId: process.env.HOSTNAME!,   // REQUIRED — unique per process
  key: "inference-cluster",        // nodes fronting the same backend MUST match
  local: { minLimit: 4, maxLimit: 128 },
  heartbeatMs: 1000,               // the heartbeat_T
  leaseTtlMs: 2000,                // default 2 * heartbeatMs
  onCoordinatorOutage: "fail-closed",
});

// acquire() is synchronous — the ConcurrencyGuard contract.
const lease = guard.acquire();
if (lease.ok) {
  try { /* call the shared backend */ }
  finally { lease.release(); }   // event-release; idempotent
}

await guard.close();   // stop the timer + leave() the fleet
```

distributedAdaptiveConcurrency(options) returns a DistributedConcurrencyGuard — a ConcurrencyGuard plus distributed lifecycle:

| Member | Shape | Notes |
| --- | --- | --- |
| acquire() | synchronous | Gates on min(share, L_local); inherited from the base guard |
| release() | synchronous | Event-release; idempotent (double-release is a no-op) |
| heartbeat() | Promise<void> | Force a heartbeat now (report L_local, refresh share). Never throws — outage routes to the outage policy. Useful to gate startup: await guard.heartbeat() once after construction |
| close() | Promise<void> | Stop the timer and leave() the fleet. Idempotent |
| stats() | snapshot | Extends the base stats with share, lGlobal, nodes |

ConcurrencyCoordinator

The coordinator owns the shared L_global and parcels it into per-node shares — the event-release sibling of federation's GlobalCoordinator. Each node's heartbeat(report) does report + (re)lease in one round-trip; the coordinator upserts the node's { lLocal, inflight, expiresAt }, evicts every node whose lease has expired, recomputes L_global = aggregate(...), equal-splits it across the live nodes, and returns this node's { share, lGlobal, nodes }. leave() reclaims a node's share immediately on voluntary departure.

Aggregation policy lives on the coordinator, not the guard. aggregate ∈ { "median", "min" } is a fleet-wide decision — every node must use one consistent rule — so it is a coordinator-construction option. Guards only report raw lLocal. A misconfigured mixed-aggregation fleet is impossible by construction.

Concrete coordinators

| Coordinator | When to use | Status |
| --- | --- | --- |
| TestConcurrencyCoordinator (in-memory) | Tests + examples; deterministic, no timers, no I/O (expiry compared against an injected clock) | Shipped 0.10.0 |
| RedisConcurrencyCoordinator (shared Redis) | Production default; one Lua script does heartbeat-aggregate-split atomically | Shipped 0.10.0 |
| PostgresConcurrencyCoordinator | When you already run Postgres | Shipped 0.11.2 — an advisory-lock transaction running the shared heartbeat-core; dual-path Test ≡ Postgres |

```ts
import { RedisConcurrencyCoordinator } from "throttlekit";
import { fromIoredis } from "throttlekit/redis";
import Redis from "ioredis";

const coordinator = new RedisConcurrencyCoordinator({
  client: fromIoredis(new Redis(process.env.COORDINATOR_URL!)),
  aggregate: "median",
});
```

All three coordinators implement the identical ConcurrencyCoordinator interface and are drop-in interchangeable; an identical report sequence through any of them yields identical { share, lGlobal, nodes } (the dual-path conformance test).

The budget cap and the equal-split limitation (skew)

Each grant is the equal-split target ⌊L/N⌋ (+1 to the lowest-ranked L mod N nodes by lexicographic nodeId) capped at L_global − Σ other live shares. The cap is what makes Σ shares ≤ L_global hold under staggered heartbeats — a stateless ⌊L/N⌋ split would over-commit the instant a node joins (the joiner computes its small share while an incumbent still holds its larger pre-join one). It is deterministic (no RNG, O(N log N)) and needs no separate clawback path — the cap is the clawback. The fleet converges to the exact equal split (Σ shares = L_global) within ≈ N heartbeats.
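The equal-split target with its remainder rule can be sketched as follows. `equalSplitTargets` is a hypothetical name, and reading "lowest-ranked" as "first in plain string sort order of nodeId" is an assumption of this sketch. These are targets only; each actual grant is still capped at the remaining budget.

```typescript
// Hypothetical sketch of the per-node equal-split targets (before the budget cap is applied).
function equalSplitTargets(lGlobal: number, nodeIds: string[]): Record<string, number> {
  const targets: Record<string, number> = {};
  const ranked = nodeIds.slice().sort(); // default string sort, used here as the lexicographic rank
  const n = ranked.length;
  if (n === 0) return targets;
  const base = Math.floor(lGlobal / n);
  const extra = lGlobal % n; // this many lowest-ranked nodes get one extra slot
  ranked.forEach((id, rank) => {
    targets[id] = base + (rank < extra ? 1 : 0);
  });
  return targets;
}

// L_global = 10 over three nodes: ⌊10/3⌋ = 3 each, and 10 mod 3 = 1 node gets +1.
const exampleTargets = equalSplitTargets(10, ["gw-c", "gw-a", "gw-b"]);
// exampleTargets is { "gw-a": 4, "gw-b": 3, "gw-c": 3 }, which sums to exactly 10
```

Because the rank depends only on the sorted nodeIds, every evaluation over the same live set produces the same targets, with no randomness involved.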

The cost: an idle node still holds ≈ L/N (under the default allocation: "equal-split"). Under skew the busy nodes are capped below what the idle nodes are leaving on the table — a utilization limitation, not a safety one (the cap never overshoots).

Shipped in 0.11.2 (opt-in): allocation: "demand-proportional". A satisfied node (inflight < share) drains to a 1-slot probe, and the cap re-grants the released budget to hungry nodes (inflight ≥ share). The result is +25–50 percentage points of utilization under skew, no regression when balanced, and no starvation when L_global ≥ N. It is a target-only change: the occupancy cap is untouched, so both safety bounds (Σ shares ≤ L_global, synchronous Σ inflight ≤ L_global) hold for it exactly as for equal-split. This was re-verified exhaustively in the BFS twin and bit-identically across the JS ↔ Redis-Lua / Postgres dual paths. Set it on every coordinator on the key (all must agree, like aggregate); the default stays "equal-split".

Membership changes

A node joining a live fleet is the staggered hazard the budget cap exists for: because each node fetches its share independently at a staggered time from a different live-set snapshot, a stateless ⌊L/N⌋ split would grant a fresh joiner a positive share before the incumbents re-split down — so for up to one heartbeat Σ granted shares > L_global (worst case a 1.5× overshoot on a 1→2 scale-up) with L_global never having decreased.

The budget cap closes this with no extra bookkeeping: a joiner's grant is min(target, L_global − Σ incumbent shares), and since the incumbents still hold all of L_global, the joiner gets min(target, 0) = 0. It stays at 0 until the incumbents re-heartbeat down (each capped to its own fair share), then earns its ≈ L/N on a later heartbeat. Σ shares ≤ L_global holds at every instant across growth — and, unlike a one-shot "share 0 for one heartbeat" rule, the cap stays correct even if the joiner re-heartbeats repeatedly before an incumbent shrinks (each grant is re-capped at the current remainder).
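A small simulation of the 1→2 scale-up may help. It is a sketch under stated assumptions: `heartbeatGrant` and `sumShares` are hypothetical helpers, shares are kept in a plain object, L_global is held constant at 10, and lease expiry is ignored.

```typescript
// Hypothetical simulation of staggered heartbeats under the budget cap.
type Shares = Record<string, number>;

function sumShares(shares: Shares, exceptId?: string): number {
  return Object.keys(shares)
    .filter((id) => id !== exceptId)
    .reduce((sum, id) => sum + shares[id], 0);
}

// One node heartbeats: it is upserted into the live set, then granted
// min(equal-split target, L_global − Σ other shares).
function heartbeatGrant(shares: Shares, nodeId: string, lGlobal: number): Shares {
  const next: Shares = { ...shares };
  if (!(nodeId in next)) next[nodeId] = 0; // a joiner enters the live set holding nothing
  const ids = Object.keys(next).sort();
  const rank = ids.indexOf(nodeId);
  const target = Math.floor(lGlobal / ids.length) + (rank < lGlobal % ids.length ? 1 : 0);
  const remaining = Math.max(0, lGlobal - sumShares(next, nodeId));
  next[nodeId] = Math.min(target, remaining);
  return next;
}

const lGlobalFixed = 10;
const founding = heartbeatGrant({}, "gw-a", lGlobalFixed);             // gw-a: 10
const joined = heartbeatGrant(founding, "gw-b", lGlobalFixed);         // gw-a: 10, gw-b: 0
const rebeat = heartbeatGrant(joined, "gw-b", lGlobalFixed);           // gw-b still 0
const incumbentDown = heartbeatGrant(rebeat, "gw-a", lGlobalFixed);    // gw-a: 5, gw-b: 0
const converged = heartbeatGrant(incumbentDown, "gw-b", lGlobalFixed); // gw-a: 5, gw-b: 5
```

At every step the sum of shares stays at or below 10, including when the joiner heartbeats twice before the incumbent shrinks.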

The cap reserves not just a peer's share but its in-flight too — L_global − Σ_other max(share, inflight) (D-DAC-18). So even a joiner that could fit by committed shares is held at 0 until the incumbents' in-flight physically drains — it never ramps into still-occupied capacity. In the synchronous / low-latency case this makes Σ inflight ≤ L_global hold throughout the rebalance (model-checked as InflightCap in the spec + BFS twin). Across the async fleet a bounded, self-draining residual remains — a guard admits against its cached grant while a reduction is still in flight, and the cap reserves a peer's last-reported in-flight. A hard instantaneous bound is available via the opt-in acknowledged handoff (acknowledgedHandoff: true, D-DAC-19; see above), and eager handoff (eagerHandoff: true, D-DAC-20) removes its ramp-latency cost — together a hard bound at near-floor ramp. With the default cap the honest contract is unchanged: bounded and converges, never a runaway.
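The occupancy-aware cap differs from the plain budget cap only in what it reserves per peer. A minimal sketch, with hypothetical names `PeerState` and `occupancyCappedGrant`, using each peer's last-reported in-flight:

```typescript
// Hypothetical sketch of L_global − Σ_other max(share, inflight).
interface PeerState {
  share: number;
  inflight: number; // last value the peer reported on a heartbeat
}

function occupancyCappedGrant(target: number, lGlobal: number, others: PeerState[]): number {
  const reserved = others.reduce((sum, p) => sum + Math.max(p.share, p.inflight), 0);
  return Math.min(target, Math.max(0, lGlobal - reserved));
}

// L_global = 10. The incumbent's share is already lowered to 5,
// but its in-flight requests have not drained yet.
const whileFull = occupancyCappedGrant(5, 10, [{ share: 5, inflight: 10 }]);    // 0
const whileDraining = occupancyCappedGrant(5, 10, [{ share: 5, inflight: 7 }]); // 3
const afterDrain = occupancyCappedGrant(5, 10, [{ share: 5, inflight: 5 }]);    // 5
```

The joiner's grant grows only as the incumbent's reported in-flight falls, which is the "ramps only as fast as incumbents drain" behavior.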

A node leaving (or L_global dropping) is the dual — shrink-drain: an over-allocated node admits nothing new and drains as in-flight requests complete (you cannot retroactively un-admit a running request). Σ inflight is non-increasing under this debt and converges back to ≤ L_global. This is identical to how single-node adaptiveConcurrency behaves when its estimate drops, and is the same reason Σ inflight ≤ L_global is a steady-state property, not a hard invariant — a liveness/convergence property, not a safety violation.

Coordinator-outage modes

| Mode | Behavior on heartbeat() throw | When to choose |
| --- | --- | --- |
| "fail-closed" (default) | share → 0; the node sheds everything (503) until the coordinator returns. Never overshoots; trades availability | Safety > availability (matches federation's default) |
| "local-only" | share → L_local; the node falls back to pure in-process adaptive concurrency. The fleet may collectively overshoot the backend (the fan-out regime) but stays up | Availability > strict bound; the honest degraded mode |

leaseTtlMs defaults to 2 · heartbeatMs, so a single slow heartbeat does not drop a node from the fleet — only two consecutive misses do. A node that crashes is reclaimed within leaseTtlMs, and survivors' shares grow on their next heartbeat.
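The lease arithmetic can be sketched as below. `renewLease` and `evictExpired` are hypothetical names, and treating a lease as live strictly while expiresAt > now is an assumption about the boundary.

```typescript
// Hypothetical sketch of lease renewal and eviction on the coordinator's clock.
interface NodeLease {
  nodeId: string;
  expiresAt: number; // ms on the coordinator's clock
}

function renewLease(nodeId: string, nowMs: number, ttlMs: number): NodeLease {
  return { nodeId, expiresAt: nowMs + ttlMs };
}

function evictExpired(leases: NodeLease[], nowMs: number): NodeLease[] {
  return leases.filter((lease) => lease.expiresAt > nowMs);
}

const beatMs = 1000;
const ttlMs = 2 * beatMs; // the documented default
const staleB = renewLease("gw-b", 0, ttlMs); // gw-b's last successful beat was at t=0

// gw-a keeps beating; gw-b has missed the t=1000 beat but its lease is still valid.
const afterOneMiss = evictExpired([renewLease("gw-a", 1000, ttlMs), staleB], 1500);
// gw-b has also missed the t=2000 beat, so its lease has lapsed.
const afterTwoMisses = evictExpired([renewLease("gw-a", 2000, ttlMs), staleB], 2500);
```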

The fail-closed row above covers a heartbeat that throws. A partition, though, usually makes the beat hang — and a hanging node would otherwise keep admitting against its stale share while the coordinator reassigns its budget. So under fail-closed, self-fencing (selfFence, default ON — D-DAC-21) makes the node stop admitting on its own clock at expiresAt − fenceSafetyMargin, before the coordinator's reclaim, and onFenced lets you abort in-flight. It assumes node↔coordinator clock skew ≤ fenceSafetyMargin (keep nodes NTP-synced). A healthy node never fences; local-only opts out (it serves through outages by design).
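The fence deadline itself is simple arithmetic. In this sketch the function names are hypothetical and the 250 ms margin is an illustrative value, not the library's default.

```typescript
// Hypothetical sketch of the self-fence check, evaluated on the node's own clock.
function fenceDeadlineMs(lastSuccessfulBeatExpiresAt: number, fenceSafetyMarginMs: number): number {
  return lastSuccessfulBeatExpiresAt - fenceSafetyMarginMs;
}

function mayAdmit(nowMs: number, lastSuccessfulBeatExpiresAt: number, fenceSafetyMarginMs: number): boolean {
  return nowMs < fenceDeadlineMs(lastSuccessfulBeatExpiresAt, fenceSafetyMarginMs);
}

// If the node's clock runs skewMs behind the coordinator's, this is the
// coordinator-clock time at which the node actually stops admitting.
function coordinatorTimeAtFence(expiresAt: number, marginMs: number, skewMs: number): number {
  return fenceDeadlineMs(expiresAt, marginMs) + skewMs;
}

const leaseExpiresAt = 2000; // from the last successful heartbeat
const marginMs = 250;        // illustrative value only
const stillAdmitting = mayAdmit(1700, leaseExpiresAt, marginMs); // true
const fenced = mayAdmit(1750, leaseExpiresAt, marginMs);         // false
// With 200 ms of skew (below the margin) the node fences at coordinator time 1950,
// ahead of the reclaim at 2000.
const fenceSeenByCoordinator = coordinatorTimeAtFence(leaseExpiresAt, marginMs, 200);
```

A node whose heartbeats keep succeeding keeps pushing expiresAt forward, so the deadline is never reached; that is why a healthy node never fences.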

Cold start: the first heartbeat is scheduled on the next tick (not after a full heartbeatMs) to minimize the startup stall. Before the first grant lands, share = 0 under fail-closed (admits nothing for ~one RTT) or share = L_local under local-only. A founding (solo) node's first grant is its full L_global — it is the only live node, so the budget cap binds at nothing; a node joining an already-saturated fleet instead gets share = 0 from the cap until the incumbents re-heartbeat down. Callers who must gate startup can await guard.heartbeat() once after construction.

Forward-compat with the 0.9.2 adapters

DistributedConcurrencyGuard extends ConcurrencyGuard, and acquire() stays synchronous, so the value returned drops straight into the 0.9.2 middleware adapters — no code change at the call site. This is the forward-compat hook the middleware integration page promised:

```ts
import express from "express";
import Redis from "ioredis";
import {
  expressAdaptiveConcurrency,
  distributedAdaptiveConcurrency,
  RedisConcurrencyCoordinator,
} from "throttlekit";
import { fromIoredis } from "throttlekit/redis";

// Shared Redis client, built the same way as in the coordinator example above.
const client = fromIoredis(new Redis(process.env.COORDINATOR_URL!));

const guard = distributedAdaptiveConcurrency({
  coordinator: new RedisConcurrencyCoordinator({ client, aggregate: "median" }),
  nodeId: process.env.HOSTNAME!,
  key: "inference-cluster",
});

const app = express();
// The same adapter that wrapped a local guard now wraps the distributed one.
app.use(expressAdaptiveConcurrency({ guard }));
```

Every sibling adapter (fastifyAdaptiveConcurrency, koaAdaptiveConcurrency, honoAdaptiveConcurrency, withAdaptiveConcurrency, …) picks it up the same way — the adapter owns the exactly-once release(), the distributed guard owns L_global.

Failure modes

Under fail-closed the bound holds end-to-end through every coordinator outage shape; local-only deliberately trades the strict bound for availability. Detailed table in docs/FAILURE-MODES.md. Summary:

| Condition | fail-closed (default) | local-only | Recovery |
| --- | --- | --- | --- |
| Coordinator unreachable | share → 0, node sheds all (503) | share → L_local, per-node self-limit; fleet may overshoot backend | Next successful heartbeat restores share |
| Node crashes holding share | Lease TTL (2·heartbeatMs) expires → coordinator reclaims; survivors' shares grow next heartbeat | same | Automatic within leaseTtlMs |
| L_global shrinks (backend degraded) | Over-allocated nodes admit nothing new; in-flight drains; Σ inflight → L_global | same | Convergence within max request duration |
| Node JOINS (L_global constant) | Budget cap gives the joiner min(target, L_global − Σ incumbents) = 0 until incumbents re-split down; Σ shares ≤ L_global holds throughout (no over-commitment) | same | Joiner earns its ≈ L/N share over the next ≈ N heartbeats |
| Stale share (between heartbeats) | Node may admit up to a now-too-large share for ≤ heartbeatMs; bounded by the min(share, L_local) fast-shrink | same | Smaller heartbeatMs trades coordinator load for reaction speed |
| nodeId collision (operator error) | Two processes overwrite one record → undercount → under-admission (safe direction) | same | Enforce unique nodeId |
| Clock skew between node and coordinator | Large skew → premature evict (safe) or late evict (bounded by skew) | same | Keep nodes NTP-synced |

Safety + proof — read more

The hard safety theorem (for every staggered interleaving of per-node Reallocate/Join/Leave/Acquire/Release at constant L_global, GlobalCap: Σ_active share ≤ L_global) relabels federation's window-coupled leasing proof — windowMs → heartbeat_T, the heartbeat boundary IS the federation Roll. The TLA⁺ spec models the staggered, budget-capped protocol (a Reallocate action that caps each grant at the remaining budget, a Join action that adds a node at share = 0) so the membership-growth bound is witnessed, not assumed. Σ inflight ≤ L_global is deliberately not an invariant — in-flight is non-revocable and drains after a rebalance.

  • DESIGN.md — full design, the DR-18 reduction, the two-mechanism split, the locked ConcurrencyCoordinator interface, the budget cap (D-DAC-17), and 17 decision records.
  • spec/GaleHeartbeatLeasing.tla — the TLA⁺ spec proving GlobalCap (Σ_active share ≤ L), with the GlobalCap / GlobalCapTight invariants, model-checked by the BFS twin.
  • docs/FAILURE-MODES.md — full outage shape table.

See also

  • Middleware integration — the 0.9.2 adapters that pick up the distributed guard unchanged.
  • Federation — the rate-axis sibling (window-coupled leasing, Δ = 0); this is its event-release dual on the concurrency axis.
  • Distributed & provable — the broader provable-bounds story.
