Refactor ShapeStream state to be an explicit state machine#3816
✅ Deploy Preview for electric-next ready!
Codecov Report

Additional details and impacted files:

@@ Coverage Diff @@
## main #3816 +/- ##
==========================================
- Coverage 87.68% 87.28% -0.40%
==========================================
Files 23 24 +1
Lines 2078 2305 +227
Branches 548 575 +27
==========================================
+ Hits 1822 2012 +190
- Misses 254 291 +37
Partials 2 2
## Test Review: Applying DSL & Property-Based Testing Ideas

The state machine extraction is a great structural move — pulling scattered mutable fields out of the 1800-line `ShapeStream` class into a pure, explicit state machine.

### Key ideas from the guide that apply here

The guide argues that when you have a well-defined state machine (which is exactly what this PR creates), example-based tests ("given this state and this input, expect that output") are the weakest form of verification. The stronger techniques are scenario DSLs, algebraic property tests, and random-sequence fuzzing:
The core insight: you've built a pure, immutable, isolated state machine. This is the perfect candidate for these techniques — no mocking fetch or SSE, no async, just inputs → outputs. Don't leave that leverage on the table.

### 1. Scenario DSL (High Priority)

Bugs in state machines almost always come from unexpected sequences, not individual steps. The current tests check one transition at a time. A fluent scenario builder would let you test full journeys:

function scenario(initial?: Partial<SharedStateFields>) {
let state: ShapeStreamState = createInitialState({
offset: initial?.offset ?? `-1`,
handle: initial?.handle
})
const trace: Array<{ event: string; before: string; after: string }> = []
const self = {
response(input: Partial<ResponseMetadataInput>) {
const before = state.kind
const transition = state.handleResponseMetadata(makeResponseInput(input))
state = transition.state
trace.push({ event: `response`, before, after: state.kind })
assertStateInvariants(state) // automatic invariant checking at every step
return self
},
messages(input: Partial<MessageBatchInput>) { /* similar */ return self },
pause() { /* ... */ return self },
resume() { /* ... */ return self },
error(msg: string) { /* ... */ return self },
retry() { /* ... */ return self },
reset(handle?: string) { /* ... */ return self },
sseClose(input: Partial<SseCloseInput>) { /* ... */ return self },
expectKind(kind: ShapeStreamStateKind) {
expect(state.kind).toBe(kind)
return self
},
expectUpToDate(expected: boolean) {
expect(state.isUpToDate).toBe(expected)
return self
},
expectHandle(h: string | undefined) {
expect(state.handle).toBe(h)
return self
},
done() { return { state, trace } }
}
return self
}

This enables readable multi-step tests:

it(`full lifecycle: initial → sync → live → pause → resume → error → retry`, () => {
scenario()
.response({ responseHandle: `h1`, responseOffset: `0_5` })
.expectKind(`syncing`)
.messages({ hasUpToDateMessage: true })
.expectKind(`live`)
.expectUpToDate(true)
.pause()
.expectKind(`paused`)
.expectUpToDate(true) // paused-from-live preserves isUpToDate
.resume()
.expectKind(`live`)
.error(`connection lost`)
.expectKind(`error`)
.retry()
.expectKind(`live`)
})
it(`stale CDN → retry → fresh response → sync → live`, () => {
scenario()
.response({ responseHandle: `stale-h`, expiredHandle: `stale-h` })
.expectKind(`stale-retry`)
.response({ responseHandle: `fresh-h`, responseOffset: `0_0` })
.expectKind(`syncing`)
.messages({ hasUpToDateMessage: true })
.expectKind(`live`)
.expectHandle(`fresh-h`)
})

The builder is the "well-formed scenario" tier. For adversarial testing, keep the raw constructors available as well.

### 2. Algebraic Property Tests (High Priority)

Pause/resume, error/retry, and `markMustRefetch` should obey algebraic laws that hold in every state:

const allStates = (): ShapeStreamState[] => {
const shared = makeShared()
return [
createInitialState({ offset: `-1` }),
new SyncingState(shared),
new LiveState(shared),
new ReplayingState({ ...shared, replayCursor: `c1` }),
new StaleRetryState({ ...shared, staleCacheBuster: `cb`, staleCacheRetryCount: 1 }),
new LiveState(shared).pause(),
new SyncingState(shared).toErrorState(new Error(`test`)),
]
}
describe(`algebraic properties`, () => {
it.each(allStates().map(s => [s.kind, s]))(
`%s: pause().resume() round-trips`,
(_kind, state) => {
const roundTripped = state.pause().resume()
expect(roundTripped.kind).toBe(state.kind)
expect(roundTripped.handle).toBe(state.handle)
expect(roundTripped.offset).toBe(state.offset)
expect(roundTripped.isUpToDate).toBe(state.isUpToDate)
}
)
it.each(allStates().map(s => [s.kind, s]))(
`%s: toErrorState(e).retry() round-trips`,
(_kind, state) => {
const roundTripped = state.toErrorState(new Error(`x`)).retry()
expect(roundTripped.kind).toBe(state.kind)
expect(roundTripped.handle).toBe(state.handle)
expect(roundTripped.offset).toBe(state.offset)
}
)
it.each(allStates().map(s => [s.kind, s]))(
`%s: markMustRefetch always → InitialState with offset=-1`,
(_kind, state) => {
const reset = state.markMustRefetch(`new-h`)
expect(reset).toBeInstanceOf(InitialState)
expect(reset.offset).toBe(`-1`)
expect(reset.handle).toBe(`new-h`)
expect(reset.schema).toBeUndefined()
expect(reset.isUpToDate).toBe(false)
}
)
it.each(allStates().map(s => [s.kind, s]))(
`%s: withHandle changes only handle`,
(_kind, state) => {
const updated = state.withHandle(`changed`)
expect(updated.handle).toBe(`changed`)
expect(updated.offset).toBe(state.offset)
expect(updated.kind).toBe(state.kind)
expect(updated.isUpToDate).toBe(state.isUpToDate)
}
)
})

### 3. Random Sequence Fuzzing (Medium Priority)

Generate random event sequences and verify invariants hold at every step. A single fuzz run like this explores more of the state space than all 42 hand-written tests combined:

function applyEvent(state: ShapeStreamState, event: Event): ShapeStreamState {
switch (event.type) {
case `response`: return state.handleResponseMetadata(event.input).state
case `messages`: return state.handleMessageBatch(event.input).state
case `pause`: return state.pause()
case `resume`: return state instanceof PausedState ? state.resume() : state
case `error`: return state.toErrorState(new Error(`fuzz`))
case `retry`: return state instanceof ErrorState ? state.retry() : state
case `markMustRefetch`: return state.markMustRefetch()
case `sseClose`: return state.handleSseConnectionClosed(event.input).state
}
}
function checkInvariants(state: ShapeStreamState) {
expect(state).toBeDefined()
expect([`initial`,`syncing`,`live`,`replaying`,`stale-retry`,`paused`,`error`]).toContain(state.kind)
expect(typeof state.offset).toBe(`string`)
// Only LiveState (or delegates to it) should be up-to-date
if ([`initial`, `syncing`, `stale-retry`, `replaying`].includes(state.kind)) {
expect(state.isUpToDate).toBe(false)
}
// staleCacheBuster only present in StaleRetryState (or delegates)
if (![`stale-retry`, `paused`, `error`].includes(state.kind)) {
expect(state.staleCacheBuster).toBeUndefined()
}
}
it(`survives 1000 random 50-step sequences without invariant violations`, () => {
for (let seed = 0; seed < 1000; seed++) {
let state: ShapeStreamState = createInitialState({ offset: `-1` })
const rng = mulberry32(seed) // seeded PRNG for reproducibility
for (let step = 0; step < 50; step++) {
const event = randomEvent(rng)
state = applyEvent(state, event)
checkInvariants(state)
}
}
})

When a seed fails, you have a fully reproducible failing sequence you can minimize.

### 4. Specific Missing Edge Cases (High Priority)

Even without the DSL/fuzzing infrastructure, these gaps should be filled now:

**Double-pause nesting** — potential bug if `resume()` only unwraps one layer:

it(`double pause creates nested PausedState — resume only unwraps one layer`, () => {
const live = new LiveState(makeShared())
const paused1 = live.pause()
const paused2 = paused1.pause()
expect(paused2).toBeInstanceOf(PausedState)
const resumed1 = paused2.resume()
expect(resumed1).toBeInstanceOf(PausedState) // still paused once
})

(Consider: should pausing an already-paused stream be a no-op instead of nesting?)

**204 response handling:**

it(`204 response sets lastSyncedAt`, () => {
const syncing = new SyncingState(makeShared({ lastSyncedAt: undefined }))
const transition = syncing.handleResponseMetadata(
makeResponseInput({ status: 204, now: 1700000000 })
)
expect(transition.state.lastSyncedAt).toBe(1700000000)
})

**SSE vs. non-SSE offset handling:**

it(`SSE up-to-date message updates offset`, () => {
const syncing = new SyncingState(makeShared({ offset: `0_0` }))
const transition = syncing.handleMessageBatch(
makeMessageBatchInput({ isSse: true, upToDateOffset: `5_3` as Offset })
)
expect(transition.state.offset).toBe(`5_3`)
})
it(`non-SSE up-to-date message does NOT update offset`, () => {
const syncing = new SyncingState(makeShared({ offset: `0_0` }))
const transition = syncing.handleMessageBatch(
makeMessageBatchInput({ isSse: false, upToDateOffset: `5_3` as Offset })
)
expect(transition.state.offset).toBe(`0_0`)
})

**Schema set-once semantics:**

it(`schema is only set once (first response wins)`, () => {
const initial = createInitialState({ offset: `-1` })
const t1 = initial.handleResponseMetadata(
makeResponseInput({ responseSchema: { id: { type: `int4` } } })
)
const t2 = t1.state.handleResponseMetadata(
makeResponseInput({ responseSchema: { name: { type: `text` } } })
)
expect(t2.state.schema).toEqual({ id: { type: `int4` } })
})

**Events on Paused/Error states (defensive no-ops):**

it(`PausedState.handleResponseMetadata returns ignored`, () => {
const paused = new SyncingState(makeShared()).pause()
const transition = paused.handleResponseMetadata(makeResponseInput())
expect(transition.action).toBe(`ignored`)
})
it(`ErrorState.handleMessageBatch returns no-op`, () => {
const errored = new SyncingState(makeShared()).toErrorState(new Error(`x`))
const transition = errored.handleMessageBatch(makeMessageBatchInput())
expect(transition.suppressBatch).toBe(false)
expect(transition.state).toBe(errored)
})

### Summary
The current test suite is a solid "did I implement this correctly?" check. What these techniques add is a "can anything break this?" check. The state machine is pure and immutable — it's the perfect candidate for property-based and trace-based testing. That's the whole payoff of extracting it from `ShapeStream`.
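The fuzz loop in section 3 assumes a seeded PRNG (`mulberry32`) and a `randomEvent` helper that weren't shown. A minimal standalone sketch — the event names mirror `applyEvent`; the helper names are illustrative, not part of the PR:

```typescript
// Event names matching the `applyEvent` switch in the fuzz sketch.
type EventType =
  | `response` | `messages` | `pause` | `resume`
  | `error` | `retry` | `markMustRefetch` | `sseClose`

const EVENT_TYPES: EventType[] = [
  `response`, `messages`, `pause`, `resume`,
  `error`, `retry`, `markMustRefetch`, `sseClose`,
]

// mulberry32: a tiny, fast, deterministic PRNG — same seed, same sequence,
// which is what makes failing fuzz runs reproducible.
function mulberry32(seed: number): () => number {
  return () => {
    seed = (seed + 0x6d2b79f5) | 0
    let t = Math.imul(seed ^ (seed >>> 15), 1 | seed)
    t = (t + Math.imul(t ^ (t >>> 7), 61 | t)) ^ t
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296
  }
}

// Pick a uniformly random event for each fuzz step.
function randomEvent(rng: () => number): EventType {
  return EVENT_TYPES[Math.floor(rng() * EVENT_TYPES.length)]
}
```

Because the PRNG is pure, logging just the seed is enough to replay and then minimize a failing sequence.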
## Test Review Part 2: The Glue Layer Between State Machine and Real World

The first review focused on testing the state machine in isolation — algebraic properties, scenario DSLs, fuzz testing. This review addresses the other major risk surface: the adapter code in `client.ts`. The state machine refactoring creates a clean seam, but that seam is also where the new risk concentrates. The state machine is now a pure function: inputs → (new state, transition metadata). But the glue code has two jobs that are both undertested: extracting state machine inputs from raw HTTP responses, and turning transition results into the right side effects.
The state machine unit tests verify the engine is correct; they say nothing about whether the steering wheel is connected to the wheels.

### Mapping the Risk Surface

I traced every glue-layer risk zone (starting with input extraction at `#onInitialResponse`) against the existing test files:
| Test file | Covers | Glue layer gaps |
|---|---|---|
| `shape-stream-state.test.ts` | Pure state machine transitions | Doesn't touch `client.ts` at all |
| `client.test.ts` | Error recovery w/ `onError`, visibility pause/resume, shape rotation, `isConnected`, `isLoading` | No header extraction, no transition branching, no rapid pause/resume, no snapshot 409 distinction |
| `integration.test.ts` | End-to-end with real server | Can't isolate glue bugs from state machine bugs or server bugs |
| `stream.test.ts` | URL construction, column mapping | No state transitions |
| `fetch.test.ts` | Fetch wrapper retries, backoff, prefetch | No state machine interaction |
The gap: there are no tests sitting between the state machine unit tests and the full integration tests. Nothing tests the adapter layer in isolation.
### Proposed Tests

#### A. Input Extraction Contract Tests (High Priority)

Verify that `#onInitialResponse` correctly maps HTTP headers to state machine inputs:
describe(`glue: #onInitialResponse header extraction`, () => {
it(`maps HTTP headers to state machine input fields`, async () => {
const { stream, nextFetch } = createMockShapeStream()
nextFetch.respond({
status: 200,
headers: {
[SHAPE_HANDLE_HEADER]: `test-handle`,
[CHUNK_LAST_OFFSET_HEADER]: `5_3`,
[LIVE_CACHE_BUSTER_HEADER]: `cursor-42`,
// + schema headers
},
body: `[]`,
})
await stream.waitForNextTick()
expect(stream.shapeHandle).toBe(`test-handle`)
expect(stream.lastOffset).toBe(`5_3`)
})
it(`looks up expired handle from cache and triggers stale-retry`, async () => {
// Pre-populate expiredShapesCache with a handle
// Respond with that same handle
// Verify: StaleCacheError thrown, console.warn emitted
})
it(`204 response sets lastSyncedAt via state machine`, async () => {
// Respond with 204
// Verify: lastSyncedAt() returns a recent timestamp
})
})

#### B. Transition Branch Tests (High Priority)

For each `transition.action` value, verify the correct side effect:
describe(`glue: transition result branching`, () => {
it(`stale-retry cancels body and throws StaleCacheError`, async () => {
// Setup: respond with handle matching expired handle, no local handle
// Verify: response.body.cancel() called, StaleCacheError thrown
})
it(`stale-retry exceeding max retries throws FetchError 502`, async () => {
// Setup: trigger stale-retry more than maxStaleCacheRetries times
// Verify: FetchError with status 502
})
it(`ignored stale response logs warning and skips body processing`, async () => {
// Setup: local handle exists, respond with different expired handle
// Verify: console.warn includes "Ignoring", no subscriber notification from this response
})
it(`suppressBatch skips subscriber notification but resolves midStream promise`, async () => {
// Setup: enter replay mode, respond with up-to-date + unchanged cursor
// Verify: subscriber NOT called, but midStream promise resolves
})
})

#### C. Pause/Resume Protocol Tests (High Priority)
describe(`glue: pause/resume protocol`, () => {
it(`pause during idle → next requestShape creates PausedState`, async () => {
const { stream } = createLiveShapeStream()
stream.triggerPause()
await stream.waitForNextTick()
expect(stream.isPaused()).toBe(true)
})
it(`rapid pause→resume before request loop: no PausedState created`, async () => {
const { stream } = createLiveShapeStream()
stream.triggerPause()
stream.triggerResume() // immediately, before #requestShape runs
await stream.waitForNextTick()
expect(stream.isPaused()).toBe(false)
// Verify: state was never PausedState
})
it(`pause during active fetch: abort caught, transitions to PausedState`, async () => {
const { stream, hangingFetch } = createMockShapeStream()
hangingFetch() // fetch that never resolves until aborted
stream.triggerPause()
await stream.waitForNextTick()
expect(stream.isPaused()).toBe(true)
})
it(`resume detects resumingFromPause, avoids live long-poll param`, async () => {
const { stream, nextFetch, getLastFetchUrl } = createPausedLiveShapeStream()
stream.triggerResume()
nextFetch.respond({ /* up-to-date response */ })
const url = getLastFetchUrl()
expect(url.searchParams.has(`live`)).toBe(false) // no long-poll!
})
it(`resume with aborted user signal: stays paused`, async () => {
const controller = new AbortController()
const { stream } = createPausedShapeStream({ signal: controller.signal })
controller.abort()
stream.triggerResume()
expect(stream.isPaused()).toBe(true)
})
})

#### D. Snapshot 409 Distinction Test (Medium Priority)
describe(`glue: snapshot 409 uses withHandle not markMustRefetch`, () => {
it(`updates handle but preserves offset and schema`, async () => {
const { stream, triggerSnapshot409 } = createLiveShapeStream({
handle: `h1`, offset: `5_3`
})
triggerSnapshot409({ newHandle: `h2` })
await stream.waitForNextTick()
expect(stream.shapeHandle).toBe(`h2`) // updated
expect(stream.lastOffset).toBe(`5_3`) // NOT reset to -1
expect(stream.isUpToDate).toBe(true) // NOT reset to false
})
})

#### E. Dual-State Consistency Invariants (Medium Priority)
Add an invariant checker that can be wired into the mock harness:
function assertGlueConsistency(stream: ShapeStream) {
// isLoading is the inverse of isUpToDate
expect(stream.isLoading()).toBe(!stream.isUpToDate)
// isPaused should only be true when syncState is PausedState
// (not when #pauseRequested is true but not yet consumed)
if (stream.isPaused()) {
// state machine should be in PausedState
// #pauseRequested should be false (consumed)
}
}

Run this after every mock response and every pause/resume operation.
#### F. SSE Backoff Glue Test (Low Priority)
describe(`glue: SSE close → backoff computation`, () => {
it(`short connection triggers sleep with exponential delay`, async () => {
// Mock setTimeout to capture delay
// Trigger SSE connection that closes after 50ms (< minSseConnectionDuration)
// Verify: setTimeout called with delay based on 2^consecutiveShortSseConnections
})
it(`fallback to long polling emits warning`, async () => {
// Trigger maxShortSseConnections consecutive short connections
// Verify: console.warn about proxy buffering
// Verify: next request does NOT include SSE params
})
})

### The Testing Harness
All of the above require a mock fetch harness that sits between the state machine unit tests and the full integration tests. The pattern already exists in `stream.test.ts` (with `fetchWrapper`), but needs to be extended to:
- Queue responses — `nextFetch.respond({status, headers, body})` for sequencing multi-step scenarios
- Hang fetches — `hangingFetch()` returns a promise that never resolves (for testing pause during active fetch)
- Capture requests — `getLastFetchUrl()` to verify URL params the glue code constructed
- Expose internals — access to `isPaused()`, `isUpToDate`, `lastOffset`, `shapeHandle` for assertions
This harness would make it trivial to write targeted tests for each glue-layer risk zone without the overhead of a real server.
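To make the harness shape concrete, here is a minimal sketch of its queued-response core. `QueuedFetch`, `respond`, and `getLastFetchUrl` are illustrative names matching the bullets above, not an existing API; it assumes the global `Response` and `URL` classes (Node 18+ or a browser):

```typescript
type QueuedResponse = { status: number; headers: Record<string, string>; body: string }

// Minimal mock-fetch harness: queue responses, capture request URLs,
// and hang when the queue is empty (useful for pause-during-fetch tests).
class QueuedFetch {
  #queue: QueuedResponse[] = []
  #requests: string[] = []

  // Queue the next response to hand out, in order.
  respond(r: QueuedResponse): void {
    this.#queue.push(r)
  }

  // Drop-in replacement for fetch: records the URL, consumes the queue.
  fetch = async (url: string): Promise<Response> => {
    this.#requests.push(url)
    const next = this.#queue.shift()
    if (!next) return new Promise<Response>(() => {}) // nothing queued: hang
    return new Response(next.body, { status: next.status, headers: next.headers })
  }

  // Inspect the last request the code under test actually made.
  getLastFetchUrl(): URL | undefined {
    const last = this.#requests[this.#requests.length - 1]
    return last ? new URL(last) : undefined
  }
}
```

A stream under test would be constructed with `fetchClient: harness.fetch`, letting each test script a multi-step exchange without a server.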
### Summary
| Priority | Risk Zone | What to Test | Current Coverage |
|---|---|---|---|
| High | Input extraction (`#onInitialResponse`) | HTTP headers → state machine input mapping | None — unit tests bypass this entirely |
| High | Transition branching | stale-retry / ignored / suppressBatch → correct side effects | Indirect only via integration tests |
| High | Pause/resume protocol | `#pauseRequested` ↔ PausedState synchronization, race conditions | Partial — only visibility-based pause/resume |
| Medium | Snapshot 409 distinction | `withHandle()` preserves offset/schema vs `markMustRefetch()` resets | None |
| Medium | Dual-state consistency | `#isMidStream`, `#connected`, `isLoading` stay in sync with `#syncState` | None |
| Low | SSE backoff glue | Duration → backoff delay computation, long-polling fallback | None |
The state machine extraction was the right move — it makes the core logic testable in isolation. The next step is testing the wiring harness that connects it to the real world. A mock fetch harness plus these targeted tests would close the gap between "state machine is correct" and "the system behaves correctly."
## Test Review Part 3: Concurrency, Parallel State, and Delegation Depth

Parts 1 and 2 covered the state machine in isolation and the glue layer between the state machine and real-world events. This final review covers three remaining risk areas: concurrent pause/resume interactions, the parallel state that didn't move into the state machine, and delegation chain depth in PausedState/ErrorState.

### Risk 7: Snapshot ↔ Visibility Pause/Resume Interaction

The snapshot request flow pauses the main stream, fetches data, then resumes:

// requestSnapshot (line ~1597)
this.#activeSnapshotRequests++
if (this.#activeSnapshotRequests === 1) {
this.#pause() // pause main stream
}
const { metadata, data } = await this.fetchSnapshot(opts)
// ... inject data ...
finally {
this.#activeSnapshotRequests--
if (this.#activeSnapshotRequests === 0) {
this.#resume() // resume main stream
}
}

The visibility handler also calls `#pause()` and `#resume()`:

const visibilityHandler = () => {
if (document.hidden) this.#pause()
else this.#resume()
}

These can interleave in a problematic way: the tab goes hidden, so the visibility handler pauses the stream; a snapshot that was already in flight then completes, and its `finally` block calls `#resume()` — the stream resumes even though the tab is still hidden.

The reverse is also problematic: a snapshot starts and pauses the stream; the tab then becomes visible, and the visibility handler calls `#resume()` — the main stream resumes while the snapshot is still in flight, so both write concurrently.
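The interleavings above can be reduced to a toy model. With a single shared pause flag — which is effectively what unguarded `#pause()`/`#resume()` callers give you — either resume path blindly clears the pause. A sketch with illustrative names, not the PR's actual code:

```typescript
type PauseEvent = `hide` | `show` | `snapshotStart` | `snapshotEnd`

// Naive model: one boolean pause flag shared by the visibility handler and
// snapshot requests. Any resume event clears it, losing the other caller's
// reason for pausing.
function naivePaused(events: PauseEvent[]): boolean {
  let paused = false
  for (const e of events) {
    if (e === `hide` || e === `snapshotStart`) paused = true
    else paused = false // `show` and `snapshotEnd` both blindly resume
  }
  return paused
}
```

Both problem sequences end unpaused even though one pause reason still logically holds — which is the information a reason-tracking lock would preserve.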
This isn't a new bug introduced by this PR — the pause/resume control flow is unchanged — but it IS an untested interaction that could produce state inconsistencies. The state machine refactoring makes it easier to expose via targeted tests.

Proposed test:

describe(`snapshot ↔ visibility interaction`, () => {
it(`snapshot resume while tab hidden: stream should stay paused`, async () => {
const { stream, mockVisibility, triggerSnapshot } = createMockShapeStream()
mockVisibility.hide() // tab goes hidden → stream paused
await triggerSnapshot() // snapshot pauses (no-op), fetches, resumes
// After snapshot completes, stream should STILL be paused
// because tab is still hidden
expect(stream.isPaused()).toBe(true) // THIS LIKELY FAILS — revealing the bug
})
it(`visibility resume during snapshot: stream should stay paused until snapshot completes`, async () => {
const { stream, mockVisibility, hangingSnapshot } = createMockShapeStream()
hangingSnapshot() // snapshot starts, pauses stream
mockVisibility.show() // tab visible → resume called
// Stream should NOT resume while snapshot is in flight
expect(stream.isPaused()).toBe(true)
})
})

### Risk 8: The Parallel State That Didn't Move
| Field | Lifecycle | Used for |
|---|---|---|
| `#connected` | Set true in `#fetchShape`, false in error paths + `#reset()` + normal completion | `isConnected()` public API |
| `#isMidStream` | true on messages, false on up-to-date, true on reset | `#waitForStreamEnd()` for snapshot coordination |
| `#isRefreshing` | true before abort, false after next tick / via microtask | `shouldUseSse()`, `canLongPoll` in URL params |
| `#pauseRequested` | true by `#pause()`, consumed by `#requestShape()` | Coordinates async pause with state machine |
| `#activeSnapshotRequests` | Incremented/decremented around snapshot requests, reset in `#reset()` | Coordinates pause/resume for concurrent snapshots |
| `#started` | true in `#start()`, false before retry | Guards against multiple starts |
These form their own implicit state machine with invariants that should hold:
- `#connected` should be `true` only when a fetch/SSE connection is active
- `#isMidStream` should be `true` only between receiving data messages and the up-to-date control message
- `#isRefreshing` should be `true` only during a brief abort→reconnect window
- `#pauseRequested` should be `true` only between `#pause()` and the next `#requestShape()` iteration
- `#activeSnapshotRequests` should never go negative
None of these invariants are tested. And the interactions between them are subtle — for example, `#reset()` clears `#activeSnapshotRequests` to 0, but the comment at line 1696 explicitly says snapshot 409s DON'T call `#reset()` to avoid breaking the counter. This constraint is enforced by convention, not by tests.
Proposed: parallel state invariant checker
function assertParallelStateInvariants(stream: TestableShapeStream) {
// activeSnapshotRequests is never negative
expect(stream.activeSnapshotRequests).toBeGreaterThanOrEqual(0)
// If stream is paused and not mid-snapshot, pauseRequested should be false
// (it should have been consumed by #requestShape)
if (stream.isPaused() && stream.activeSnapshotRequests === 0) {
expect(stream.pauseRequested).toBe(false)
}
// If not started, connected must be false
if (!stream.hasStarted()) {
expect(stream.isConnected()).toBe(false)
}
// If isRefreshing, we should be in an abort→reconnect cycle
// (hard to check directly, but can verify it doesn't persist)
}

### Summary of All Three Reviews
| Review | Focus | Key Gaps |
|---|---|---|
| Part 1 | State machine in isolation | No algebraic property tests, no scenario DSL, no fuzzing, missing edge cases |
| Part 2 | Glue layer (state machine ↔ real world) | No input extraction tests, no transition branch tests, no pause protocol tests, no snapshot 409 distinction test |
| Part 3 | Concurrency + parallel state | Snapshot↔visibility pause conflict (potential bug), #isRefreshing microtask race, delegation depth, parallel state invariants |
The three layers form a testing pyramid:
- Bottom: Pure state machine (algebraic properties, fuzz, DSL) — cheapest to write, fastest to run
- Middle: Glue layer (mock fetch harness, targeted adapter tests) — moderate cost, high value
- Top: Concurrency interactions (snapshot↔visibility, wake↔refresh, delegation chains) — hardest to test, but where the most surprising bugs live
The state machine refactoring was the right move — it makes the bottom two layers testable for the first time. The concurrency issues in the top layer predate this PR, but are now more visible because the state machine makes it clear when writes to #syncState could conflict.
## Design Review: Parallel State & Pause Coordination

Following up on the testing reviews (Part 1, Part 2, Part 3) — this comment looks at the remaining transport/connection state that lives outside the state machine, and proposes a targeted fix for the coordination bugs identified in Part 3.

### The Two Layers

The PR cleanly extracts the sync protocol into a state machine: offset, handle, cursor, schema, up-to-date status, replay mode, stale cache retry. This is a state progression (Initial → Syncing → Live) where each state carries different data and responds to events differently. State machine is the right abstraction here. The transport/connection layer stays as fields on `ShapeStream` itself: `#connected`, `#isMidStream`, `#started`, `#isRefreshing`, `#pauseRequested`, and `#activeSnapshotRequests`.
The first three are simple lifecycle flags with no contention — they're fine as booleans. The last three are where the coordination complexity (and bugs) live.

### The Coordination Problem
Several independent callers pause and resume the stream — the visibility handler and snapshot requests — and these callers don't know about each other. The current code coordinates them through the `#pauseRequested` flag, the `#activeSnapshotRequests` counter, and ad-hoc guards.

**Bug 1: Snapshot resume overrides visibility pause.** A snapshot that completes while the tab is hidden calls `#resume()`, restarting the stream even though the visibility pause should still hold — wasting bandwidth on a long-poll the user can't see.

**Bug 2: Visibility resume overrides snapshot pause.** A tab that becomes visible during an active snapshot calls `#resume()`, so the main stream and the snapshot write concurrently.

**Bug 3: Snapshot blocks on live long-poll, consuming browser connections.** This one was reported separately: `requestSnapshot` calls `#waitForStreamEnd()` BEFORE `#pause()`, so it can block for up to 20 seconds waiting for the live long-poll to complete, holding one of the browser's limited per-host connections the whole time.

All three bugs stem from the same root cause: pause coordination is split across multiple mechanisms (`#pauseRequested`, `#activeSnapshotRequests`, and per-caller guards) that don't compose.

### Proposed Fix: Pause Lock

Replace the `#pauseRequested` boolean and the `#activeSnapshotRequests` counter with a set-based lock that tracks *why* the stream is paused:

class PauseLock {
#holders = new Set<string>()
#onStateChange: (isPaused: boolean) => void
constructor(onStateChange: (isPaused: boolean) => void) {
this.#onStateChange = onStateChange
}
acquire(reason: string): void {
if (this.#holders.has(reason)) {
// Set-based lock is naturally idempotent — double acquire is safe
// but likely indicates a caller bug (e.g., visibilitychange firing
// 'hidden' twice without a 'visible' in between)
console.warn(
`[Electric] PauseLock: "${reason}" already held — ignoring duplicate acquire`
)
return
}
const wasEmpty = this.#holders.size === 0
this.#holders.add(reason)
if (wasEmpty) this.#onStateChange(true)
}
release(reason: string): void {
this.#holders.delete(reason)
if (this.#holders.size === 0) {
this.#onStateChange(false)
}
}
get isPaused(): boolean {
return this.#holders.size > 0
}
/** Check if a specific reason is holding the lock */
isHeldBy(reason: string): boolean {
return this.#holders.has(reason)
}
}

Usage in `ShapeStream`:

// In constructor:
this.#pauseLock = new PauseLock((isPaused) => {
if (isPaused) {
this.#requestAbortController?.abort(PAUSE_STREAM)
} else {
if (this.options.signal?.aborted) return
this.#start()
}
})
// Visibility handler — simple, no guards needed:
if (document.hidden) this.#pauseLock.acquire('visibility')
else this.#pauseLock.release('visibility')
// Snapshot requests — acquire BEFORE waitForStreamEnd:
async requestSnapshot(opts) {
this.#pauseLock.acquire(`snapshot-${snapshotId}`)
// acquire() immediately aborts the live long-poll via onStateChange,
// so waitForStreamEnd() resolves fast instead of blocking 20s
await this.#waitForStreamEnd()
try {
return await this.fetchSnapshot(opts)
} finally {
this.#pauseLock.release(`snapshot-${snapshotId}`)
}
}
// Wake detection — doesn't need pause at all, just refresh:
// (unchanged)
// #requestShape — check the lock instead of #pauseRequested:
if (this.#pauseLock.isPaused) {
this.#syncState = this.#syncState.pause()
return
}

**What this fixes:** all three coordination bugs. A snapshot finishing while the tab is hidden no longer resumes the stream (the `visibility` reason is still held); a tab becoming visible mid-snapshot no longer resumes it (the `snapshot-N` reason is still held); and acquiring the lock before `#waitForStreamEnd()` aborts the live long-poll immediately instead of blocking on it.

**What this eliminates:** the two-phase `#pauseRequested` protocol, the `#activeSnapshotRequests` counter, and the abort-controller race around resume.

**What stays the same:** the sync-protocol state machine and the simple lifecycle flags (`#connected`, `#isMidStream`, `#started`).

Component-by-component:
| Component | Current | Proposed | Why |
|---|---|---|---|
| Sync protocol | State machine ✓ | Keep as-is | Clean state progression, already well-designed |
| Pause coordination | `#pauseRequested` + `#activeSnapshotRequests` + guards | Pause lock | Fixes bugs 1–3, eliminates two-phase protocol and abort controller race |
| Refresh flag | `#isRefreshing` with two clearing mechanisms | Counter or single clearing mechanism | Eliminates microtask race |
| Connection/lifecycle | `#connected`, `#isMidStream`, `#started` | Keep as-is | Simple, independent, no contention |
The pause lock is ~20 lines, trivially testable in isolation, and directly fixes all three coordination bugs. It's a much smaller and more targeted change than a full transport state machine — right tool for the right problem.
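As a sanity check on that claim, the lock logic reduces to a standalone sketch (a simplified version of the `PauseLock` above, with the reason strings from the usage example; the callback just records transitions):

```typescript
// Set-based pause lock: the stream is paused while ANY reason is held, and
// only the transitions empty→non-empty / non-empty→empty fire the callback.
class PauseLock {
  #holders = new Set<string>()
  #onStateChange: (isPaused: boolean) => void

  constructor(onStateChange: (isPaused: boolean) => void) {
    this.#onStateChange = onStateChange
  }

  acquire(reason: string): void {
    if (this.#holders.has(reason)) return // idempotent double-acquire
    const wasEmpty = this.#holders.size === 0
    this.#holders.add(reason)
    if (wasEmpty) this.#onStateChange(true)
  }

  release(reason: string): void {
    this.#holders.delete(reason)
    if (this.#holders.size === 0) this.#onStateChange(false)
  }

  get isPaused(): boolean {
    return this.#holders.size > 0
  }
}
```

Walking the Bug 1 scenario through it — hide, snapshot start, snapshot end — leaves the `visibility` reason held, so the stream stays paused and exactly one pause/resume transition pair fires over the whole lifecycle.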
Fix a concurrency bug where the visibility handler and snapshot requests could override each other's pause state. Without this fix: 1. A snapshot completing while the tab is hidden would resume the stream, wasting bandwidth on a long-poll the user can't see. 2. A tab becoming visible during an active snapshot would resume the stream, causing concurrent writes from both the main stream and the snapshot. Each resume path now checks whether the other pause reason still holds, inspired by the PauseLock concept from PR #3816 review. https://claude.ai/code/session_01UGPdwB6UpFkkQi9p4sjPRj
Replace the #pauseRequested boolean + #activeSnapshotRequests counter with a set-based PauseLock that tracks *why* the stream is paused. This fixes three concurrency bugs identified in PR #3816 review: 1. Snapshot resume while tab hidden: snapshot completes and resumes the stream even though the tab is still hidden, wasting bandwidth. Fix: snapshot releases its lock reason, but 'visibility' reason remains held — stream stays paused. 2. Visibility resume during active snapshot: tab becomes visible and resumes the stream while a snapshot is in flight, causing both the main stream and snapshot to write concurrently. Fix: visibility releases its lock reason, but 'snapshot-N' reason remains held — stream stays paused. 3. Snapshot blocks on live long-poll: requestSnapshot called #waitForStreamEnd BEFORE #pause, blocking up to 20s waiting for the long-poll to complete. Fix: PauseLock.acquire() is called BEFORE waitForStreamEnd, immediately aborting the long-poll via the onAcquired callback. Also fixes the #isRefreshing microtask race by replacing the boolean flag with a counter + getter pattern. forceDisconnectAndRefresh and wake detection both increment/decrement in try/finally blocks, eliminating the window where concurrent operations could clear the flag prematurely. https://claude.ai/code/session_01UGPdwB6UpFkkQi9p4sjPRj
@KyleAMathews tl;dr of those reviews? ;D
@kevin-dp and I chatted and we'll work on my suggestions as follow up PRs. |
KyleAMathews left a comment:
huge improvement! The code is way more readable and reliable feeling now.
This PR reproduces a bug where the schema becomes `undefined` after handling a stale response which may lead to parse errors ([see CI test failure](https://github.com/electric-sql/electric/actions/runs/21870136703/job/63122684063)). This bug was found by Claude during a review: **Issue: schema undefined + ignored stale response → crash on `schema!`** The original code has the exact same flow: 1. Stale response with local handle → `return` from `#onInitialResponse` (line 1129 on main), skipping `this.#schema = this.#schema ?? getSchemaFromHeaders(headers)` (line 1144 on main) 2. Control returns to `#requestShapeLongPoll` which does `const schema = this.#schema!` 3. If schema was undefined (fresh session resuming from persisted handle/offset), this crashes The refactored code does the same thing: ignored transition → return early → `this.#syncState.schema!`. Identical behavior.
This PR [reproduces](https://github.com/electric-sql/electric/actions/runs/21897631349/job/63217468839?pr=3828) and fixes a bug related to stale handles.

### Bug: stale cache detection fails when client's own handle is the expired handle

When a shape handle is marked as expired (e.g. after a 409 response), the client is supposed to retry with a cache buster query parameter to bypass stale CDN/proxy caches. However, this only works when the client has **no handle** (fresh start) or a **different handle** than the expired one.

When the client resumes with a persisted handle that happens to be the same as the expired handle (`localHandle === expiredHandle`), the stale detection logic sees that the client already has a handle and returns `ignored` instead of `stale-retry`. The client logs a warning ("Ignoring the stale response") but never adds a cache buster — so it just keeps receiving the same stale cached response in an infinite loop.

### How the test reproduces it

1. Marks handle `expired-H1` as expired in the `ExpiredShapesCache`
2. Creates a `ShapeStream` that resumes with `handle: expired-H1` (simulating a client that persisted this handle from a previous session)
3. The mock backend always returns responses with that same expired handle (mimicking CDN behaviour)
4. Asserts that the client should use a `cache_buster` query parameter to escape the stale cache — which currently fails because the client takes the `ignored` path instead of `stale-retry`

### Root cause

In `checkStaleResponse` (lines 311-344), the condition at line 322 is:

```typescript
if (this.#shared.handle === undefined) {
  // enter stale retry
}
// else: "We have a valid local handle — ignore this stale response"
```

This assumes that if the client has a local handle, it's a *different* handle from the expired one, so the stale response can be safely ignored. But that assumption is wrong when `localHandle === expiredHandle` — the client resumed with the same handle that was marked expired.
At this point in the code, we already know `responseHandle === expiredHandle` (line 317). The missing check is whether `this.#shared.handle` is *also* the expired handle.

### Fix

Change the condition at line 322 from:

```typescript
if (this.#shared.handle === undefined) {
```

to:

```typescript
if (this.#shared.handle === undefined || this.#shared.handle === expiredHandle) {
```

That's it — one condition added. When the client's own handle matches the expired handle, it enters `stale-retry` (gets a cache buster) instead of falling through to `ignored`. The rest of the stale-retry machinery already handles everything correctly from there.
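The fixed condition can be restated as a small pure decision function, which makes the three cases easy to check at a glance. The function name `classifyStaleResponse` is hypothetical; the real check lives inside `checkStaleResponse` and operates on class fields rather than parameters.

```typescript
// Hypothetical pure-function restatement of the fixed condition; the real
// check lives inside checkStaleResponse and reads class fields.
type StaleDecision = 'stale-retry' | 'ignored'

function classifyStaleResponse(
  localHandle: string | undefined,
  expiredHandle: string
): StaleDecision {
  // Retry with a cache buster when we have no handle yet, OR when our own
  // handle is the one that was marked expired. Otherwise the client would
  // loop on the same stale cached response forever.
  return localHandle === undefined || localHandle === expiredHandle
    ? 'stale-retry'
    : 'ignored'
}
```

Only the middle case changes with this fix: a client resuming with the expired handle now gets `stale-retry` instead of `ignored`.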
amazing, when should we expect a new tag?
This PR has been released! 🚀 The following packages include changes from this PR:
Thanks for contributing to Electric!
Fixes #3785
This PR refactors the `ShapeStream` class into an explicit state machine. This removes many state variables and code paths from `ShapeStream` into dedicated state classes.

### Summary
Extracts the implicit sync state from `ShapeStream` into an explicit state machine (`ShapeStreamState`).

The original `ShapeStream` tracked sync state as ~12 flat private fields (`#lastOffset`, `#shapeHandle`, `#isUpToDate`, `#liveCacheBuster`, `#schema`, `#lastSyncedAt`, `#lastSeenCursor`, `#consecutiveShortSseConnections`, `#sseFallbackToLongPolling`, `#staleCacheBuster`, `#staleCacheRetryCount`, `#state`) with transition logic scattered across `#onInitialResponse`, `#onMessages`, `#reset`, `#constructUrl`, and `#requestShape`. This made it hard to reason about which fields were relevant in which phase of the sync lifecycle.

The new design replaces these with a single `#syncState: ShapeStreamState` field backed by an immutable state machine. Each state carries only the fields relevant to it and defines its own behavior:
- `handleResponseMetadata` (stale detection, field parsing, state-specific transitions): `LiveState` preserves SSE tracking, `ReplayingState` does cursor-based suppression, fetching states transition to `LiveState`
- `applyUrlParams(url)` lets each state add its own query parameters (offset, handle, cache busters) instead of the client branching on fields
- `shouldUseSse()` and `handleSseConnectionClosed()` live on `LiveState`, where the tracking state is

`ShapeStream` is simplified to orchestration: it drives the request loop, handles errors, manages async coordination (pause/resume, snapshots, visibility), and delegates all sync state decisions to the state machine.
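The design above can be sketched roughly as follows. The interface name `ShapeStreamState` and the method names `applyUrlParams` and `handleResponseMetadata` come from the description; the `ResponseMeta` shape, the constructor signatures, and the concrete transitions are assumptions for illustration, not the actual implementation.

```typescript
// Sketch of the state-machine shape the description implies. Method names
// come from the PR text; ResponseMeta, signatures, and transitions are
// illustrative assumptions.
interface ResponseMeta {
  offset: string
  handle: string
  upToDate: boolean
}

interface ShapeStreamState {
  readonly name: string
  // Each state adds only the query params it cares about.
  applyUrlParams(url: URL): void
  // Consume response metadata and return the next state (immutably).
  handleResponseMetadata(meta: ResponseMeta): ShapeStreamState
}

class FetchingState implements ShapeStreamState {
  readonly name = 'fetching'
  constructor(
    readonly offset: string,
    readonly handle?: string
  ) {}

  applyUrlParams(url: URL): void {
    url.searchParams.set('offset', this.offset)
    if (this.handle !== undefined) url.searchParams.set('handle', this.handle)
  }

  handleResponseMetadata(meta: ResponseMeta): ShapeStreamState {
    // Once caught up, transition to live mode; otherwise keep fetching.
    return meta.upToDate
      ? new LiveState(meta.offset, meta.handle)
      : new FetchingState(meta.offset, meta.handle)
  }
}

class LiveState implements ShapeStreamState {
  readonly name = 'live'
  constructor(
    readonly offset: string,
    readonly handle: string
  ) {}

  applyUrlParams(url: URL): void {
    url.searchParams.set('offset', this.offset)
    url.searchParams.set('handle', this.handle)
    url.searchParams.set('live', 'true')
  }

  handleResponseMetadata(meta: ResponseMeta): ShapeStreamState {
    // Live mode stays live; only the offset advances.
    return new LiveState(meta.offset, this.handle)
  }
}
```

The payoff is exactly what the summary claims: the client never branches on a dozen flat fields, it just asks the current state for its URL params and hands it each response's metadata.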