Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .beads/electric-error-propegate.db
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats this?

Binary file not shown.
18 changes: 9 additions & 9 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ export class CollectionImpl<
public utils: Record<string, Fn> = {}

// Managers
private _events: CollectionEventsManager
public events: CollectionEventsManager
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the manager classes are intended to be a private api, but have to be public so that they can see each other, and we can introspect them from tests. This should change it to public, but maintain the underscore on the name.

The "public" face of the events api is a set of methods on the Collection class itself that forward to this manager.

Suggested change
public events: CollectionEventsManager
public _events: CollectionEventsManager

private _changes: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
private _lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
private _sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
Expand Down Expand Up @@ -261,7 +261,7 @@ export class CollectionImpl<
}

this._changes = new CollectionChangesManager()
this._events = new CollectionEventsManager()
this.events = new CollectionEventsManager()
this._indexes = new CollectionIndexesManager()
this._lifecycle = new CollectionLifecycleManager(config, this.id)
this._mutations = new CollectionMutationsManager(config, this.id)
Expand All @@ -272,9 +272,9 @@ export class CollectionImpl<
collection: this, // Required for passing to CollectionSubscription
lifecycle: this._lifecycle,
sync: this._sync,
events: this._events,
events: this.events,
})
this._events.setDeps({
this.events.setDeps({
collection: this, // Required for adding to emitted events
})
this._indexes.setDeps({
Expand All @@ -283,7 +283,7 @@ export class CollectionImpl<
})
this._lifecycle.setDeps({
changes: this._changes,
events: this._events,
events: this.events,
indexes: this._indexes,
state: this._state,
sync: this._sync,
Expand Down Expand Up @@ -810,7 +810,7 @@ export class CollectionImpl<
event: T,
callback: CollectionEventHandler<T>
) {
return this._events.on(event, callback)
return this.events.on(event, callback)
}

/**
Expand All @@ -820,7 +820,7 @@ export class CollectionImpl<
event: T,
callback: CollectionEventHandler<T>
) {
return this._events.once(event, callback)
return this.events.once(event, callback)
}

/**
Expand All @@ -830,7 +830,7 @@ export class CollectionImpl<
event: T,
callback: CollectionEventHandler<T>
) {
this._events.off(event, callback)
this.events.off(event, callback)
}

/**
Expand All @@ -840,7 +840,7 @@ export class CollectionImpl<
event: T,
timeout?: number
) {
return this._events.waitFor(event, timeout)
return this.events.waitFor(event, timeout)
}

/**
Expand Down
16 changes: 15 additions & 1 deletion packages/db/src/collection/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class CollectionLifecycleManager<
loading: [`initialCommit`, `ready`, `error`, `cleaned-up`],
initialCommit: [`ready`, `error`, `cleaned-up`],
ready: [`cleaned-up`, `error`],
error: [`cleaned-up`, `idle`],
error: [`cleaned-up`, `idle`, `loading`],
"cleaned-up": [`loading`, `error`],
}

Expand Down Expand Up @@ -144,6 +144,11 @@ export class CollectionLifecycleManager<
* @private - Should only be called by sync implementations
*/
public markReady(): void {
// If recovering from error, transition to loading first
if (this.status === `error`) {
this.setStatus(`loading`)
}
Comment on lines +147 to +150
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this, markReady should only move you to a ready state not loading.

We do not have a way in live queries to handle a ready => loading

How much handling of errors and moving between states to we expect the sync implementation to do? It seems to me there are two options for a collection:

  1. handle error recovery with no braking of the synced state, as they do with a truncate, and so a markReady to go from error -> ready makes sense.
  2. catastrophic "unrecoverable" error and the sync needs to be restarted. In that case a restartSync() method that the sync handler can call that:
  • calls .cleanup(), doing a gc and moves to cleaned-up
  • then calls .startSync(), which moves to loading and finally ready once the sync marks it as such.

Option 2 has some nuances arround subscriptions we would need to handle. Existing subscription would also need to be restarted, but we had not implemented that yet, it would be the truncate message from issue #634 proposing a refactor of the reconciliation process.


this.validateStatusTransition(this.status, `ready`)
// Can transition to ready from loading or initialCommit states
if (this.status === `loading` || this.status === `initialCommit`) {
Expand All @@ -170,6 +175,15 @@ export class CollectionLifecycleManager<
}
}

/**
* Mark the collection as being in an error state
* This is called by sync implementations when persistent errors occur
* @private - Should only be called by sync implementations
*/
public markError(): void {
this.setStatus(`error`)
}

/**
* Start the garbage collection timer
* Called when the collection becomes inactive (no subscribers)
Expand Down
17 changes: 17 additions & 0 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ export class CollectionSyncManager<
markReady: () => {
this.lifecycle.markReady()
},
markError: () => {
this.lifecycle.markError()
},
truncate: () => {
const pendingTransaction =
this.state.pendingSyncedTransactions[
Expand Down Expand Up @@ -221,6 +224,19 @@ export class CollectionSyncManager<
resolve()
})

// Also listen for error status transitions and reject the promise
const unsubscribeError = this.collection.events.once(
`status:error`,
() => {
reject(new CollectionIsInErrorStateError())
}
)

// Clean up error listener when promise resolves
this.lifecycle.onFirstReady(() => {
unsubscribeError()
})

// Start sync if collection hasn't started yet or was cleaned up
if (
this.lifecycle.status === `idle` ||
Expand All @@ -229,6 +245,7 @@ export class CollectionSyncManager<
try {
this.startSync()
} catch (error) {
unsubscribeError()
reject(error)
return
}
Expand Down
1 change: 1 addition & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export interface SyncConfig<
write: (message: Omit<ChangeMessage<T>, `key`>) => void
commit: () => void
markReady: () => void
markError: () => void
truncate: () => void
}) => void | CleanupFn | SyncConfigRes

Expand Down
45 changes: 31 additions & 14 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@ function createElectricSync<T extends Row<unknown>>(
collectionId?: string
}
): SyncConfig<T> {
// Track first error for grace period before setting collection to error status
let firstErrorTimestamp: number | null = null
let errorGracePeriodTimeout: ReturnType<typeof setTimeout> | null = null
const ERROR_GRACE_PERIOD_MS = 10000 // 10 seconds
const {
seenTxids,
seenSnapshots,
Expand Down Expand Up @@ -620,7 +624,7 @@ function createElectricSync<T extends Row<unknown>>(

return {
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
const { begin, write, commit, markReady, truncate, collection } = params
const { begin, write, commit, markReady, markError, truncate } = params

// Abort controller for the stream - wraps the signal if provided
const abortController = new AbortController()
Expand Down Expand Up @@ -655,25 +659,29 @@ function createElectricSync<T extends Row<unknown>>(
...shapeOptions,
signal: abortController.signal,
onError: (errorParams) => {
// Just immediately mark ready if there's an error to avoid blocking
// apps waiting for `.preload()` to finish.
// Note that Electric sends a 409 error on a `must-refetch` message, but the
// ShapeStream handled this and it will not reach this handler, therefor
// this markReady will not be triggers by a `must-refetch`.
markReady()
// ShapeStream handles this and it will not reach this handler.
// If the error is transitory, ShapeStream will retry and eventually call
// markReady() naturally when it receives 'up-to-date'.

// Track first error for grace period
if (firstErrorTimestamp === null) {
firstErrorTimestamp = Date.now()

// After 10 seconds of continuous errors, set collection status to error
errorGracePeriodTimeout = setTimeout(() => {
markError()
}, ERROR_GRACE_PERIOD_MS)
}

if (shapeOptions.onError) {
return shapeOptions.onError(errorParams)
} else {
console.error(
`An error occurred while syncing collection: ${collection.id}, \n` +
`it has been marked as ready to avoid blocking apps waiting for '.preload()' to finish. \n` +
`You can provide an 'onError' handler on the shapeOptions to handle this error, and this message will not be logged.`,
errorParams
)
// If no custom error handler is provided, throw the error
// This ensures errors propagate to the app and aligns with
// Electric SQL's documented behavior
throw errorParams
}

return
},
})
let transactionStarted = false
Expand Down Expand Up @@ -767,6 +775,15 @@ function createElectricSync<T extends Row<unknown>>(
}

if (hasUpToDate) {
// Clear error tracking on successful sync (recovery from transitory errors)
if (firstErrorTimestamp !== null) {
firstErrorTimestamp = null
if (errorGracePeriodTimeout !== null) {
clearTimeout(errorGracePeriodTimeout)
errorGracePeriodTimeout = null
}
}

// Clear the current batch buffer since we're now up-to-date
currentBatchMessages.setState(() => [])

Expand Down
Loading
Loading