diff --git a/.beads/electric-error-propegate.db b/.beads/electric-error-propegate.db new file mode 100644 index 000000000..cadef807f Binary files /dev/null and b/.beads/electric-error-propegate.db differ diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index 6a2fd12ee..83a3a3c51 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -215,7 +215,7 @@ export class CollectionImpl< public utils: Record = {} // Managers - private _events: CollectionEventsManager + public events: CollectionEventsManager private _changes: CollectionChangesManager private _lifecycle: CollectionLifecycleManager private _sync: CollectionSyncManager @@ -261,7 +261,7 @@ export class CollectionImpl< } this._changes = new CollectionChangesManager() - this._events = new CollectionEventsManager() + this.events = new CollectionEventsManager() this._indexes = new CollectionIndexesManager() this._lifecycle = new CollectionLifecycleManager(config, this.id) this._mutations = new CollectionMutationsManager(config, this.id) @@ -272,9 +272,9 @@ export class CollectionImpl< collection: this, // Required for passing to CollectionSubscription lifecycle: this._lifecycle, sync: this._sync, - events: this._events, + events: this.events, }) - this._events.setDeps({ + this.events.setDeps({ collection: this, // Required for adding to emitted events }) this._indexes.setDeps({ @@ -283,7 +283,7 @@ export class CollectionImpl< }) this._lifecycle.setDeps({ changes: this._changes, - events: this._events, + events: this.events, indexes: this._indexes, state: this._state, sync: this._sync, @@ -810,7 +810,7 @@ export class CollectionImpl< event: T, callback: CollectionEventHandler ) { - return this._events.on(event, callback) + return this.events.on(event, callback) } /** @@ -820,7 +820,7 @@ export class CollectionImpl< event: T, callback: CollectionEventHandler ) { - return this._events.once(event, callback) + return this.events.once(event, callback) } /** @@ -830,7 +830,7 @@ export class CollectionImpl< event: T, callback: CollectionEventHandler ) { - this._events.off(event, callback) + this.events.off(event, callback) } /** @@ -840,7 +840,7 @@ export class CollectionImpl< event: T, timeout?: number ) { - return this._events.waitFor(event, timeout) + return this.events.waitFor(event, timeout) } /** diff --git a/packages/db/src/collection/lifecycle.ts b/packages/db/src/collection/lifecycle.ts index 71c09d022..c8eedb84d 100644 --- a/packages/db/src/collection/lifecycle.ts +++ b/packages/db/src/collection/lifecycle.ts @@ -78,7 +78,7 @@ export class CollectionLifecycleManager< loading: [`initialCommit`, `ready`, `error`, `cleaned-up`], initialCommit: [`ready`, `error`, `cleaned-up`], ready: [`cleaned-up`, `error`], - error: [`cleaned-up`, `idle`], + error: [`cleaned-up`, `idle`, `loading`], "cleaned-up": [`loading`, `error`], } @@ -144,6 +144,11 @@ export class CollectionLifecycleManager< * @private - Should only be called by sync implementations */ public markReady(): void { + // If recovering from error, transition to loading first + if (this.status === `error`) { + this.setStatus(`loading`) + } + this.validateStatusTransition(this.status, `ready`) // Can transition to ready from loading or initialCommit states if (this.status === `loading` || this.status === `initialCommit`) { @@ -170,6 +175,15 @@ export class CollectionLifecycleManager< } } + /** + * Mark the collection as being in an error state + * This is called by sync implementations when persistent errors occur + * @private - Should only be called by sync implementations + */ + public markError(): void { + this.setStatus(`error`) + } + /** * Start the garbage collection timer * Called when the collection becomes inactive (no subscribers) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 1a26038dc..000eba803 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -159,6 +159,9 @@ export class CollectionSyncManager< markReady: () => { this.lifecycle.markReady() }, + markError: () => { + this.lifecycle.markError() + }, truncate: () => { const pendingTransaction = this.state.pendingSyncedTransactions[ @@ -221,6 +224,19 @@ export class CollectionSyncManager< resolve() }) + // Also listen for error status transitions and reject the promise + const unsubscribeError = this.collection.events.once( + `status:error`, + () => { + reject(new CollectionIsInErrorStateError()) + } + ) + + // Clean up error listener when promise resolves + this.lifecycle.onFirstReady(() => { + unsubscribeError() + }) + // Start sync if collection hasn't started yet or was cleaned up if ( this.lifecycle.status === `idle` || @@ -229,6 +245,7 @@ export class CollectionSyncManager< try { this.startSync() } catch (error) { + unsubscribeError() reject(error) return } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 4111cf134..f61d3fc11 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -172,6 +172,7 @@ export interface SyncConfig< write: (message: Omit, `key`>) => void commit: () => void markReady: () => void + markError: () => void truncate: () => void }) => void | CleanupFn | SyncConfigRes diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index bdd6f34a7..2a5c08bac 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -587,6 +587,10 @@ function createElectricSync>( collectionId?: string } ): SyncConfig { + // Track first error for grace period before setting collection to error status + let firstErrorTimestamp: number | null = null + let errorGracePeriodTimeout: ReturnType | null = null + const ERROR_GRACE_PERIOD_MS = 10000 // 10 seconds const { seenTxids, seenSnapshots, @@ -620,7 +624,7 @@ function createElectricSync>( return { sync: (params: Parameters[`sync`]>[0]) => { - const { begin, write, commit, markReady, truncate, collection } = params + const { begin, write, commit, markReady, markError, truncate } = params // Abort controller for the stream - wraps the signal if provided const abortController = new AbortController() @@ -655,25 +659,29 @@ function createElectricSync>( ...shapeOptions, signal: abortController.signal, onError: (errorParams) => { - // Just immediately mark ready if there's an error to avoid blocking - // apps waiting for `.preload()` to finish. // Note that Electric sends a 409 error on a `must-refetch` message, but the - // ShapeStream handled this and it will not reach this handler, therefor - // this markReady will not be triggers by a `must-refetch`. - markReady() + // ShapeStream handles this and it will not reach this handler. + // If the error is transitory, ShapeStream will retry and eventually call + // markReady() naturally when it receives 'up-to-date'. + + // Track first error for grace period + if (firstErrorTimestamp === null) { + firstErrorTimestamp = Date.now() + + // After 10 seconds of continuous errors, set collection status to error + errorGracePeriodTimeout = setTimeout(() => { + markError() + }, ERROR_GRACE_PERIOD_MS) + } if (shapeOptions.onError) { return shapeOptions.onError(errorParams) } else { - console.error( - `An error occurred while syncing collection: ${collection.id}, \n` + - `it has been marked as ready to avoid blocking apps waiting for '.preload()' to finish. \n` + - `You can provide an 'onError' handler on the shapeOptions to handle this error, and this message will not be logged.`, - errorParams - ) + // If no custom error handler is provided, throw the error + // This ensures errors propagate to the app and aligns with + // Electric SQL's documented behavior + throw errorParams } - - return }, }) let transactionStarted = false @@ -767,6 +775,15 @@ function createElectricSync>( } if (hasUpToDate) { + // Clear error tracking on successful sync (recovery from transitory errors) + if (firstErrorTimestamp !== null) { + firstErrorTimestamp = null + if (errorGracePeriodTimeout !== null) { + clearTimeout(errorGracePeriodTimeout) + errorGracePeriodTimeout = null + } + } + // Clear the current batch buffer since we're now up-to-date currentBatchMessages.setState(() => []) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index bf059a021..c2d6ee437 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -1702,4 +1702,291 @@ describe(`Electric Integration`, () => { vi.useRealTimers() }) }) + + // Tests for error propagation + describe(`Error propagation`, () => { + let capturedOnError: ((error: Error) => void) | undefined + + beforeEach(async () => { + vi.clearAllMocks() + capturedOnError = undefined + + // Import and mock ShapeStream to capture onError callback + const electricClient = await import(`@electric-sql/client`) + const ShapeStreamMock = electricClient.ShapeStream as any + + // Override the mock to capture onError + ShapeStreamMock.mockImplementation((options: any) => { + capturedOnError = options.onError + return mockStream + }) + + // Reset mock subscriber + mockSubscribe.mockImplementation((callback) => { + subscriber = callback + return () => {} + }) + }) + + it(`should throw error when no custom onError handler is provided`, () => { + const config = { + id: `error-propagation-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + createCollection(electricCollectionOptions(config)) + + // Verify we captured the onError callback + expect(capturedOnError).toBeDefined() + + // Simulate an error from Electric + const testError = new Error(`Network connection failed`) + + // The onError callback should throw the error + expect(() => { + capturedOnError!(testError) + }).toThrow(`Network connection failed`) + }) + + it(`should call custom onError handler when provided`, () => { + const customErrorHandler = vi.fn() + const config = { + id: `custom-error-handler-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + onError: customErrorHandler, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + createCollection(electricCollectionOptions(config)) + + // Verify we captured the onError callback + expect(capturedOnError).toBeDefined() + + // Simulate an error from Electric + const testError = new Error(`Network connection failed`) + + // The onError callback should call the custom handler + capturedOnError!(testError) + + expect(customErrorHandler).toHaveBeenCalledWith(testError) + }) + + it(`should not mark collection as ready on error`, () => { + const config = { + id: `no-premature-ready-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + onError: () => { + // Custom handler that swallows the error + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Collection should start in loading state + expect(testCollection.status).toBe(`loading`) + + // Simulate an error from Electric + const testError = new Error(`Network connection failed`) + capturedOnError!(testError) + + // Collection should still be loading (not marked ready) + expect(testCollection.status).toBe(`loading`) + + // Verify collection has no data + expect(testCollection.size).toBe(0) + }) + + it(`should allow ShapeStream to call markReady after recovery from transitory error`, () => { + const config = { + id: `recovery-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + onError: () => { + // Custom handler that allows retry/recovery + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Simulate a transitory error + const testError = new Error(`Temporary network issue`) + capturedOnError!(testError) + + // Collection should still be loading + expect(testCollection.status).toBe(`loading`) + + // Simulate ShapeStream recovering and sending data + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Now collection should be ready with data + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(true) + }) + + it(`should set collection to error status after 10 seconds of continuous errors`, () => { + vi.useFakeTimers() + + const config = { + id: `persistent-error-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + onError: () => { + // Custom handler that swallows the error + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Collection should start in loading state + expect(testCollection.status).toBe(`loading`) + + // Simulate an error + const testError = new Error(`Persistent network error`) + capturedOnError!(testError) + + // Immediately after error, should still be loading + expect(testCollection.status).toBe(`loading`) + + // Fast forward 9 seconds - should still be loading + vi.advanceTimersByTime(9000) + expect(testCollection.status).toBe(`loading`) + + // Fast forward past 10 seconds - should now be error + vi.advanceTimersByTime(1100) + expect(testCollection.status).toBe(`error`) + + vi.useRealTimers() + }) + + it(`should reject preload() after 10 seconds of continuous errors`, async () => { + vi.useFakeTimers() + + const config = { + id: `preload-reject-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + onError: () => { + // Custom handler that swallows the error + }, + }, + getKey: (item: Row) => item.id as number, + startSync: false, // Don't auto-start, we'll call preload + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start preload + const preloadPromise = testCollection.preload() + + // Catch the rejection to prevent test failure + preloadPromise.catch(() => {}) + + // Simulate an error + const testError = new Error(`Persistent network error`) + capturedOnError!(testError) + + // Fast forward past 10 seconds to trigger error status + await vi.advanceTimersByTimeAsync(11000) + + // preload should reject + await expect(preloadPromise).rejects.toThrow() + + vi.useRealTimers() + }, 15000) + + it(`should clear error timer and not set error status if recovery happens within 10 seconds`, () => { + vi.useFakeTimers() + + const config = { + id: `recovery-within-grace-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + onError: () => { + // Custom handler that swallows the error + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Simulate an error + const testError = new Error(`Temporary network error`) + capturedOnError!(testError) + + // Fast forward 5 seconds + vi.advanceTimersByTime(5000) + expect(testCollection.status).toBe(`loading`) + + // Simulate successful recovery + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Should be ready, not error + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(true) + + // Fast forward past 10 seconds to ensure timer was cleared + vi.advanceTimersByTime(6000) + expect(testCollection.status).toBe(`ready`) // Still ready, not error + + vi.useRealTimers() + }) + }) }) diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 2d53105cc..bbedaec60 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -422,7 +422,7 @@ export function queryCollectionOptions( let lastErrorUpdatedAt = 0 const internalSync: SyncConfig[`sync`] = (params) => { - const { begin, write, commit, markReady, collection } = params + const { begin, write, commit, markReady, markError, collection } = params const observerOptions: QueryObserverOptions< Array, @@ -537,15 +537,16 @@ export function queryCollectionOptions( lastError = result.error errorCount++ lastErrorUpdatedAt = result.errorUpdatedAt + + // Set collection status to error since TanStack Query has already + // exhausted all retry attempts before reaching this point + markError() } console.error( `[QueryCollection] Error observing query ${String(queryKey)}:`, result.error ) - - // Mark collection as ready even on error to avoid blocking apps - markReady() } } diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index b87caf67c..4b416ea19 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -2165,9 +2165,9 @@ describe(`QueryCollection`, () => { const options = queryCollectionOptions(config) const collection = createCollection(options) - // Wait for collection to be ready (even with error) + // Wait for collection to error after query fails await vi.waitFor(() => { - expect(collection.status).toBe(`ready`) + expect(collection.status).toBe(`error`) expect(collection.utils.isError()).toBe(true) }) @@ -2191,9 +2191,9 @@ describe(`QueryCollection`, () => { queryFn ) - // Wait for collection to be ready (even with error) + // Wait for collection to error after query fails await vi.waitFor(() => { - expect(collection.status).toBe(`ready`) + expect(collection.status).toBe(`error`) expect(collection.utils.isError()).toBe(true) }) @@ -2237,7 +2237,7 @@ describe(`QueryCollection`, () => { // Wait for all retry attempts to complete and final failure await vi.waitFor( () => { - expect(collection.status).toBe(`ready`) // Should be ready even with error + expect(collection.status).toBe(`error`) // Should be in error state after all retries exhausted expect(queryFn).toHaveBeenCalledTimes(totalAttempts) expect(collection.utils.isError()).toBe(true) },