Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fresh-yaks-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/electric-db-collection": patch
"@tanstack/db": patch
---

prefix logs and errors with collection id, when available
7 changes: 5 additions & 2 deletions packages/db/src/collection/change-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import type { BasicExpression } from "../query/ir.js"
export interface CollectionLike<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
> extends Pick<Collection<T, TKey>, `get` | `has` | `entries` | `indexes`> {}
> extends Pick<
Collection<T, TKey>,
`get` | `has` | `entries` | `indexes` | `id`
> {}

/**
* Returns the current state of the collection as an array of changes
Expand Down Expand Up @@ -109,7 +112,7 @@ export function currentStateAsChanges<
} catch (error) {
// If anything goes wrong with the where clause, fall back to full scan
console.warn(
`Error processing where clause, falling back to full scan:`,
`${collection.id ? `[${collection.id}] ` : ``}Error processing where clause, falling back to full scan:`,
error
)

Expand Down
5 changes: 4 additions & 1 deletion packages/db/src/collection/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ export class CollectionLifecycleManager<
if (newStatus === `ready` && !this.indexes.isIndexesResolved) {
// Resolve indexes asynchronously without blocking
this.indexes.resolveAllIndexes().catch((error) => {
console.warn(`Failed to resolve indexes:`, error)
console.warn(
`${this.config.id ? `[${this.config.id}] ` : ``}Failed to resolve indexes:`,
error
)
})
}

Expand Down
5 changes: 4 additions & 1 deletion packages/db/src/indexes/auto-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ export function ensureIndexForField<
options: compareFn ? { compareFn, compareOptions } : {},
})
} catch (error) {
console.warn(`Failed to create auto-index for field "${fieldName}":`, error)
console.warn(
`${collection.id ? `[${collection.id}] ` : ``}Failed to create auto-index for field "${fieldName}":`,
error
)
}
}

Expand Down
39 changes: 26 additions & 13 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ export function electricCollectionOptions(
const sync = createElectricSync<any>(config.shapeOptions, {
seenTxids,
seenSnapshots,
collectionId: config.id,
})

/**
Expand All @@ -188,9 +189,12 @@ export function electricCollectionOptions(
txId: Txid,
timeout: number = 30000
): Promise<boolean> => {
debug(`awaitTxId called with txid %d`, txId)
debug(
`${config.id ? `[${config.id}] ` : ``}awaitTxId called with txid %d`,
txId
)
if (typeof txId !== `number`) {
throw new ExpectedNumberInAwaitTxIdError(typeof txId)
throw new ExpectedNumberInAwaitTxIdError(typeof txId, config.id)
}

// First check if the txid is in the seenTxids store
Expand All @@ -207,12 +211,15 @@ export function electricCollectionOptions(
const timeoutId = setTimeout(() => {
unsubscribeSeenTxids()
unsubscribeSeenSnapshots()
reject(new TimeoutWaitingForTxIdError(txId))
reject(new TimeoutWaitingForTxIdError(txId, config.id))
}, timeout)

const unsubscribeSeenTxids = seenTxids.subscribe(() => {
if (seenTxids.state.has(txId)) {
debug(`awaitTxId found match for txid %o`, txId)
debug(
`${config.id ? `[${config.id}] ` : ``}awaitTxId found match for txid %o`,
txId
)
clearTimeout(timeoutId)
unsubscribeSeenTxids()
unsubscribeSeenSnapshots()
Expand All @@ -226,7 +233,7 @@ export function electricCollectionOptions(
)
if (visibleSnapshot) {
debug(
`awaitTxId found match for txid %o in snapshot %o`,
`${config.id ? `[${config.id}] ` : ``}awaitTxId found match for txid %o in snapshot %o`,
txId,
visibleSnapshot
)
Expand All @@ -249,7 +256,7 @@ export function electricCollectionOptions(
const txid = handlerResult.txid

if (!txid) {
throw new ElectricInsertHandlerMustReturnTxIdError()
throw new ElectricInsertHandlerMustReturnTxIdError(config.id)
}

// Handle both single txid and array of txids
Expand All @@ -272,7 +279,7 @@ export function electricCollectionOptions(
const txid = handlerResult.txid

if (!txid) {
throw new ElectricUpdateHandlerMustReturnTxIdError()
throw new ElectricUpdateHandlerMustReturnTxIdError(config.id)
}

// Handle both single txid and array of txids
Expand All @@ -290,7 +297,7 @@ export function electricCollectionOptions(
? async (params: DeleteMutationFnParams<any>) => {
const handlerResult = await config.onDelete!(params)
if (!handlerResult.txid) {
throw new ElectricDeleteHandlerMustReturnTxIdError()
throw new ElectricDeleteHandlerMustReturnTxIdError(config.id)
}

// Handle both single txid and array of txids
Expand Down Expand Up @@ -333,10 +340,10 @@ function createElectricSync<T extends Row<unknown>>(
options: {
seenTxids: Store<Set<Txid>>
seenSnapshots: Store<Array<PostgresSnapshot>>
collectionId?: string
}
): SyncConfig<T> {
const { seenTxids } = options
const { seenSnapshots } = options
const { seenTxids, seenSnapshots, collectionId } = options

// Store for the relation schema information
const relationSchema = new Store<string | undefined>(undefined)
Expand Down Expand Up @@ -445,7 +452,7 @@ function createElectricSync<T extends Row<unknown>>(
hasUpToDate = true
} else if (isMustRefetchMessage(message)) {
debug(
`Received must-refetch message, starting transaction with truncate`
`${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`
)

// Start a transaction and truncate the collection
Expand Down Expand Up @@ -475,7 +482,10 @@ function createElectricSync<T extends Row<unknown>>(
seenTxids.setState((currentTxids) => {
const clonedSeen = new Set<Txid>(currentTxids)
if (newTxids.size > 0) {
debug(`new txids synced from pg %O`, Array.from(newTxids))
debug(
`${collectionId ? `[${collectionId}] ` : ``}new txids synced from pg %O`,
Array.from(newTxids)
)
}
newTxids.forEach((txid) => clonedSeen.add(txid))
newTxids.clear()
Expand All @@ -486,7 +496,10 @@ function createElectricSync<T extends Row<unknown>>(
seenSnapshots.setState((currentSnapshots) => {
const seen = [...currentSnapshots, ...newSnapshots]
newSnapshots.forEach((snapshot) =>
debug(`new snapshot synced from pg %o`, snapshot)
debug(
`${collectionId ? `[${collectionId}] ` : ``}new snapshot synced from pg %o`,
snapshot
)
)
newSnapshots.length = 0
return seen
Expand Down
27 changes: 15 additions & 12 deletions packages/electric-db-collection/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,51 @@ import { TanStackDBError } from "@tanstack/db"

// Electric DB Collection Errors
export class ElectricDBCollectionError extends TanStackDBError {
constructor(message: string) {
super(message)
constructor(message: string, collectionId?: string) {
super(`${collectionId ? `[${collectionId}] ` : ``}${message}`)
this.name = `ElectricDBCollectionError`
}
}

export class ExpectedNumberInAwaitTxIdError extends ElectricDBCollectionError {
constructor(txIdType: string) {
super(`Expected number in awaitTxId, received ${txIdType}`)
constructor(txIdType: string, collectionId?: string) {
super(`Expected number in awaitTxId, received ${txIdType}`, collectionId)
this.name = `ExpectedNumberInAwaitTxIdError`
}
}

export class TimeoutWaitingForTxIdError extends ElectricDBCollectionError {
constructor(txId: number) {
super(`Timeout waiting for txId: ${txId}`)
constructor(txId: number, collectionId?: string) {
super(`Timeout waiting for txId: ${txId}`, collectionId)
this.name = `TimeoutWaitingForTxIdError`
}
}

export class ElectricInsertHandlerMustReturnTxIdError extends ElectricDBCollectionError {
constructor() {
constructor(collectionId?: string) {
super(
`Electric collection onInsert handler must return a txid or array of txids`
`Electric collection onInsert handler must return a txid or array of txids`,
collectionId
)
this.name = `ElectricInsertHandlerMustReturnTxIdError`
}
}

export class ElectricUpdateHandlerMustReturnTxIdError extends ElectricDBCollectionError {
constructor() {
constructor(collectionId?: string) {
super(
`Electric collection onUpdate handler must return a txid or array of txids`
`Electric collection onUpdate handler must return a txid or array of txids`,
collectionId
)
this.name = `ElectricUpdateHandlerMustReturnTxIdError`
}
}

export class ElectricDeleteHandlerMustReturnTxIdError extends ElectricDBCollectionError {
constructor() {
constructor(collectionId?: string) {
super(
`Electric collection onDelete handler must return a txid or array of txids`
`Electric collection onDelete handler must return a txid or array of txids`,
collectionId
)
this.name = `ElectricDeleteHandlerMustReturnTxIdError`
}
Expand Down
2 changes: 1 addition & 1 deletion packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ describe(`Electric Integration`, () => {
// @ts-expect-error
collection.utils.awaitTxId(`123`)
).rejects.toThrowErrorMatchingInlineSnapshot(
`[ExpectedNumberInAwaitTxIdError: Expected number in awaitTxId, received string]`
`[ExpectedNumberInAwaitTxIdError: [test] Expected number in awaitTxId, received string]`
)

// The txid should be tracked and awaitTxId should resolve immediately
Expand Down
Loading