Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eager-wings-jog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/electric-db-collection": patch
---

The awaitTxId utility now resolves transaction IDs based on snapshot-end message metadata (xmin, xmax, xip_list) in addition to explicit txid arrays, enabling matching on the initial snapshot at the start of a new shape.
2 changes: 1 addition & 1 deletion packages/electric-db-collection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
"description": "ElectricSQL collection for TanStack DB",
"version": "0.1.28",
"dependencies": {
"@standard-schema/spec": "^1.0.0",
"@electric-sql/client": "^1.0.14",
"@standard-schema/spec": "^1.0.0",
"@tanstack/db": "workspace:*",
"@tanstack/store": "^0.7.7",
"debug": "^4.4.3"
Expand Down
92 changes: 85 additions & 7 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
ShapeStream,
isChangeMessage,
isControlMessage,
isVisibleInSnapshot,
} from "@electric-sql/client"
import { Store } from "@tanstack/store"
import DebugModule from "debug"
Expand All @@ -27,6 +28,7 @@ import type {
ControlMessage,
GetExtensions,
Message,
PostgresSnapshot,
Row,
ShapeStreamOptions,
} from "@electric-sql/client"
Expand All @@ -38,6 +40,23 @@ const debug = DebugModule.debug(`ts/db:electric`)
*/
export type Txid = number

/**
* Type representing the result of an insert, update, or delete handler
*/
type MaybeTxId =
| {
txid?: Txid | Array<Txid>
}
| undefined
| null

/**
* Type representing a snapshot end message
*/
type SnapshotEndMessage = ControlMessage & {
headers: { control: `snapshot-end` }
}

// The `InferSchemaOutput` and `ResolveType` are copied from the `@tanstack/db` package
// but we modified `InferSchemaOutput` slightly to restrict the schema output to `Row<unknown>`
// This is needed in order for `GetExtensions` to be able to infer the parser extensions type from the schema
Expand Down Expand Up @@ -80,6 +99,20 @@ function isMustRefetchMessage<T extends Row<unknown>>(
return isControlMessage(message) && message.headers.control === `must-refetch`
}

function isSnapshotEndMessage<T extends Row<unknown>>(
message: Message<T>
): message is SnapshotEndMessage {
return isControlMessage(message) && message.headers.control === `snapshot-end`
}

function parseSnapshotMessage(message: SnapshotEndMessage): PostgresSnapshot {
return {
xmin: message.headers.xmin,
xmax: message.headers.xmax,
xip_list: message.headers.xip_list,
}
}

// Check if a message contains txids in its headers
function hasTxids<T extends Row<unknown>>(
message: Message<T>
Expand Down Expand Up @@ -139,8 +172,10 @@ export function electricCollectionOptions(
schema?: any
} {
const seenTxids = new Store<Set<Txid>>(new Set([]))
const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
const sync = createElectricSync<any>(config.shapeOptions, {
seenTxids,
seenSnapshots,
})

/**
Expand All @@ -158,20 +193,46 @@ export function electricCollectionOptions(
throw new ExpectedNumberInAwaitTxIdError(typeof txId)
}

// First check if the txid is in the seenTxids store
const hasTxid = seenTxids.state.has(txId)
if (hasTxid) return true

// Then check if the txid is in any of the seen snapshots
const hasSnapshot = seenSnapshots.state.some((snapshot) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How big are we envisioning this seenSnapshots list could become? Worst case we're looping over all snapshots and for each snapshot we check isVisibleInSnapshot which in the worst case loop over the snapshot's xip_list. So i'm wondering here if this could become a performance issue. If it is, we could as well keep a set of all txIds we've seen in snapshots and then we could just do a set lookup (which is in constant time).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially it should only be one, the initial snapshot, but with incremental sync it could grow. I intend to revisit both this and the seenTxids in a future PR

isVisibleInSnapshot(txId, snapshot)
)
if (hasSnapshot) return true

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
unsubscribe()
unsubscribeSeenTxids()
unsubscribeSeenSnapshots()
reject(new TimeoutWaitingForTxIdError(txId))
}, timeout)

const unsubscribe = seenTxids.subscribe(() => {
const unsubscribeSeenTxids = seenTxids.subscribe(() => {
if (seenTxids.state.has(txId)) {
debug(`awaitTxId found match for txid %o`, txId)
clearTimeout(timeoutId)
unsubscribe()
unsubscribeSeenTxids()
unsubscribeSeenSnapshots()
resolve(true)
}
})

const unsubscribeSeenSnapshots = seenSnapshots.subscribe(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iiuc this subscription is racing against the seenTxidssubscription. The first one to find the txId will resolve the promise.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

const visibleSnapshot = seenSnapshots.state.find((snapshot) =>
isVisibleInSnapshot(txId, snapshot)
)
if (visibleSnapshot) {
debug(
`awaitTxId found match for txid %o in snapshot %o`,
txId,
visibleSnapshot
)
clearTimeout(timeoutId)
unsubscribeSeenSnapshots()
unsubscribeSeenTxids()
resolve(true)
}
})
Expand All @@ -183,8 +244,9 @@ export function electricCollectionOptions(
? async (params: InsertMutationFnParams<any>) => {
// Runtime check (that doesn't follow type)

const handlerResult = (await config.onInsert!(params)) ?? {}
const txid = (handlerResult as { txid?: Txid | Array<Txid> }).txid
const handlerResult =
((await config.onInsert!(params)) as MaybeTxId) ?? {}
const txid = handlerResult.txid

if (!txid) {
throw new ElectricInsertHandlerMustReturnTxIdError()
Expand All @@ -205,8 +267,9 @@ export function electricCollectionOptions(
? async (params: UpdateMutationFnParams<any>) => {
// Runtime check (that doesn't follow type)

const handlerResult = (await config.onUpdate!(params)) ?? {}
const txid = (handlerResult as { txid?: Txid | Array<Txid> }).txid
const handlerResult =
((await config.onUpdate!(params)) as MaybeTxId) ?? {}
const txid = handlerResult.txid

if (!txid) {
throw new ElectricUpdateHandlerMustReturnTxIdError()
Expand Down Expand Up @@ -269,9 +332,11 @@ function createElectricSync<T extends Row<unknown>>(
shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
options: {
seenTxids: Store<Set<Txid>>
seenSnapshots: Store<Array<PostgresSnapshot>>
}
): SyncConfig<T> {
const { seenTxids } = options
const { seenSnapshots } = options

// Store for the relation schema information
const relationSchema = new Store<string | undefined>(undefined)
Expand Down Expand Up @@ -342,6 +407,7 @@ function createElectricSync<T extends Row<unknown>>(
})
let transactionStarted = false
const newTxids = new Set<Txid>()
const newSnapshots: Array<PostgresSnapshot> = []

unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
let hasUpToDate = false
Expand Down Expand Up @@ -373,6 +439,8 @@ function createElectricSync<T extends Row<unknown>>(
...message.headers,
},
})
} else if (isSnapshotEndMessage(message)) {
newSnapshots.push(parseSnapshotMessage(message))
} else if (isUpToDateMessage(message)) {
hasUpToDate = true
} else if (isMustRefetchMessage(message)) {
Expand Down Expand Up @@ -413,6 +481,16 @@ function createElectricSync<T extends Row<unknown>>(
newTxids.clear()
return clonedSeen
})

// Always commit snapshots when we receive up-to-date, regardless of transaction state
seenSnapshots.setState((currentSnapshots) => {
const seen = [...currentSnapshots, ...newSnapshots]
newSnapshots.forEach((snapshot) =>
debug(`new snapshot synced from pg %o`, snapshot)
)
newSnapshots.length = 0
return seen
})
}
})

Expand Down
Loading
Loading