Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tender-carpets-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/electric-db-collection": patch
---

Handle predicates that are pushed down.
70 changes: 61 additions & 9 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import {
TimeoutWaitingForMatchError,
TimeoutWaitingForTxIdError,
} from "./errors"
import { compileSQL } from "./sql-compiler"
import type {
BaseCollectionConfig,
CollectionConfig,
DeleteMutationFnParams,
InsertMutationFnParams,
OnLoadMoreOptions,
SyncConfig,
UpdateMutationFnParams,
UtilsRecord,
Expand Down Expand Up @@ -72,6 +74,24 @@ type InferSchemaOutput<T> = T extends StandardSchemaV1
: Record<string, unknown>
: Record<string, unknown>

/**
 * The mode of sync to use for the collection.
 * @default `eager`
 * @description
 * - `eager`:
 *   - syncs all data immediately on preload
 *   - collection will be marked as ready once the sync is complete
 *   - there is no incremental sync
 * - `on-demand`:
 *   - syncs data in incremental snapshots as the collection is queried
 *   - collection will be marked as ready immediately after the first snapshot is synced
 * - `progressive`:
 *   - syncs all data in the shape in the background
 *   - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries
 *   - collection will be marked as ready once the initial sync is complete
 */
export type SyncMode = `eager` | `on-demand` | `progressive`

/**
* Configuration interface for Electric collection options
* @template T - The type of items in the collection
Expand All @@ -88,6 +108,7 @@ export interface ElectricCollectionConfig<
* Configuration options for the ElectricSQL ShapeStream
*/
shapeOptions: ShapeStreamOptions<GetExtensions<T>>
syncMode?: SyncMode

/**
* Optional asynchronous handler function called before an insert operation
Expand Down Expand Up @@ -281,6 +302,7 @@ export function electricCollectionOptions(
} {
const seenTxids = new Store<Set<Txid>>(new Set([]))
const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
const syncMode = config.syncMode ?? `eager`
const pendingMatches = new Store<
Map<
string,
Expand Down Expand Up @@ -331,6 +353,7 @@ export function electricCollectionOptions(
const sync = createElectricSync<any>(config.shapeOptions, {
seenTxids,
seenSnapshots,
syncMode,
pendingMatches,
currentBatchMessages,
removePendingMatches,
Expand Down Expand Up @@ -567,6 +590,7 @@ export function electricCollectionOptions(
function createElectricSync<T extends Row<unknown>>(
shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
options: {
syncMode: SyncMode
seenTxids: Store<Set<Txid>>
seenSnapshots: Store<Array<PostgresSnapshot>>
pendingMatches: Store<
Expand All @@ -590,6 +614,7 @@ function createElectricSync<T extends Row<unknown>>(
const {
seenTxids,
seenSnapshots,
syncMode,
pendingMatches,
currentBatchMessages,
removePendingMatches,
Expand Down Expand Up @@ -653,6 +678,15 @@ function createElectricSync<T extends Row<unknown>>(

const stream = new ShapeStream({
...shapeOptions,
// In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
log: syncMode === `on-demand` ? `changes_only` : undefined,
// In on-demand mode, we only need the changes from the point of time the collection was created
// so we default to `now` when there is no saved offset.
offset: shapeOptions.offset
? shapeOptions.offset
: syncMode === `on-demand`
? `now`
: undefined,
signal: abortController.signal,
onError: (errorParams) => {
// Just immediately mark ready if there's an error to avoid blocking
Expand Down Expand Up @@ -682,6 +716,7 @@ function createElectricSync<T extends Row<unknown>>(

unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
let hasUpToDate = false
let hasSnapshotEnd = false

for (const message of messages) {
// Add message to current batch buffer (for race condition handling)
Expand Down Expand Up @@ -746,6 +781,7 @@ function createElectricSync<T extends Row<unknown>>(
})
} else if (isSnapshotEndMessage(message)) {
newSnapshots.push(parseSnapshotMessage(message))
hasSnapshotEnd = true
} else if (isUpToDateMessage(message)) {
hasUpToDate = true
} else if (isMustRefetchMessage(message)) {
Expand All @@ -763,10 +799,11 @@ function createElectricSync<T extends Row<unknown>>(

// Reset hasUpToDate so we continue accumulating changes until next up-to-date
hasUpToDate = false
hasSnapshotEnd = false
}
}

if (hasUpToDate) {
if (hasUpToDate || hasSnapshotEnd) {
// Clear the current batch buffer since we're now up-to-date
currentBatchMessages.setState(() => [])

Expand All @@ -776,8 +813,10 @@ function createElectricSync<T extends Row<unknown>>(
transactionStarted = false
}

// Mark the collection as ready now that sync is up to date
markReady()
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
// Mark the collection as ready now that sync is up to date
markReady()
}

// Always commit txids when we receive up-to-date, regardless of transaction state
seenTxids.setState((currentTxids) => {
Expand Down Expand Up @@ -811,12 +850,25 @@ function createElectricSync<T extends Row<unknown>>(
}
})

// Return the unsubscribe function
return () => {
// Unsubscribe from the stream
unsubscribeStream()
// Abort the abort controller to stop the stream
abortController.abort()
// Only set onLoadMore if the sync mode is not eager. This indicates to the
// sync layer that it can load more data on demand via the requestSnapshot
// method when syncMode is `on-demand` or `progressive`.
const onLoadMore =
syncMode === `eager`
? undefined
: async (opts: OnLoadMoreOptions) => {
const snapshotParams = compileSQL<T>(opts)
await stream.requestSnapshot(snapshotParams)
}

return {
onLoadMore,
cleanup: () => {
// Unsubscribe from the stream
unsubscribeStream()
// Abort the abort controller to stop the stream
abortController.abort()
},
}
},
// Expose the getSyncMetadata function
Expand Down
27 changes: 27 additions & 0 deletions packages/electric-db-collection/src/pg-serializer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
 * Serializes a JS value into a Postgres-compatible SQL literal string.
 *
 * @param value - The value to serialize (string, number, bigint, boolean,
 *   null/undefined, Date, or an array of any of these)
 * @returns The SQL literal representation of the value
 * @throws Error if the value's type has no known SQL representation
 */
export function serialize(value: unknown): string {
  if (typeof value === `string`) {
    // Escape embedded single quotes by doubling them (standard SQL escaping).
    // Without this, a value containing `'` would produce a malformed — and
    // potentially injectable — literal.
    return `'${value.replace(/'/g, `''`)}'`
  }

  if (typeof value === `number`) {
    return value.toString()
  }

  if (typeof value === `bigint`) {
    // bigints serialize the same way as numbers (no quotes needed)
    return value.toString()
  }

  if (value === null || value === undefined) {
    return `NULL`
  }

  if (typeof value === `boolean`) {
    return value ? `true` : `false`
  }

  if (value instanceof Date) {
    // ISO-8601 is accepted by Postgres for timestamp literals
    return `'${value.toISOString()}'`
  }

  if (Array.isArray(value)) {
    // Serialize each element recursively into an ARRAY[...] constructor
    return `ARRAY[${value.map(serialize).join(`,`)}]`
  }

  throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`)
}
163 changes: 163 additions & 0 deletions packages/electric-db-collection/src/sql-compiler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import { serialize } from "./pg-serializer"
import type { SubsetParams } from "@electric-sql/client"
import type { IR, OnLoadMoreOptions } from "@tanstack/db"

/**
 * Intermediate shape used while compiling: identical to `SubsetParams`
 * except that the positional parameter values are still held in a mutable
 * array. They are serialized into the `Record<string, string>` form that
 * `SubsetParams` expects just before `compileSQL` returns.
 */
export type CompiledSqlRecord = Omit<SubsetParams, `params`> & {
  params?: Array<unknown>
}

/**
 * Compiles an `OnLoadMoreOptions` predicate (where / orderBy / limit) into
 * the `SubsetParams` shape expected by the Electric shape stream's
 * `requestSnapshot`, with values extracted into positional SQL parameters.
 *
 * @param options - The load-more options containing the pushed-down predicates
 * @returns Compiled SQL fragments plus a `$n`-keyed record of serialized params
 */
export function compileSQL<T>(options: OnLoadMoreOptions): SubsetParams {
  const { where, orderBy, limit } = options

  // Positional parameter values collected while compiling expressions.
  // These are arbitrary column values, not rows of `T`, so the element type
  // is `unknown` (the `T` type parameter is retained for caller compatibility).
  const params: Array<unknown> = []
  const compiledSQL: CompiledSqlRecord = { params }

  if (where) {
    // TODO: this only works when the where expression's PropRefs directly reference a column of the collection
    // doesn't work if it goes through aliases because then we need to know the entire query to be able to follow the reference until the base collection (cf. followRef function)
    compiledSQL.where = compileBasicExpression(where, params)
  }

  if (orderBy) {
    compiledSQL.orderBy = compileOrderBy(orderBy, params)
  }

  if (limit) {
    compiledSQL.limit = limit
  }

  // Serialize the values in the params array into PG formatted strings
  // and transform the array into a Record<string, string> keyed by the
  // 1-based positional index used in the compiled SQL (`$1`, `$2`, ...)
  const paramsRecord = params.reduce(
    (acc, param, index) => {
      acc[`${index + 1}`] = serialize(param)
      return acc
    },
    {} as Record<string, string>
  )

  return {
    ...compiledSQL,
    params: paramsRecord,
  }
}

/**
* Compiles the expression to a SQL string and mutates the params array with the values.
* @param exp - The expression to compile
* @param params - The params array
* @returns The compiled SQL string
*/
/**
 * Compiles the expression to a SQL string and mutates the params array with the values.
 * @param exp - The expression to compile
 * @param params - The params array
 * @returns The compiled SQL string
 */
function compileBasicExpression(
  exp: IR.BasicExpression<unknown>,
  params: Array<unknown>
): string {
  if (exp.type === `val`) {
    // Literal value: stash it in the params array and emit a positional
    // placeholder whose index is the value's 1-based position.
    params.push(exp.value)
    return `$${params.length}`
  }

  if (exp.type === `ref`) {
    // TODO: doesn't yet support JSON(B) values which could be accessed with nested props
    if (exp.path.length !== 1) {
      throw new Error(
        `Compiler can't handle nested properties: ${exp.path.join(`.`)}`
      )
    }
    // Single-segment path: emit the column name directly
    return exp.path[0]!
  }

  if (exp.type === `func`) {
    return compileFunction(exp, params)
  }

  throw new Error(`Unknown expression type`)
}

function compileOrderBy(orderBy: IR.OrderBy, params: Array<unknown>): string {
const compiledOrderByClauses = orderBy.map((clause: IR.OrderByClause) =>
compileOrderByClause(clause, params)
)
return compiledOrderByClauses.join(`,`)
}

function compileOrderByClause(
clause: IR.OrderByClause,
params: Array<unknown>
): string {
// TODO: what to do with stringSort and locale?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This raises the question on if we have the wrong default in DB, should we default to lexical sort?

// Correctly supporting them is tricky as it depends on Postgres' collation
const { expression, compareOptions } = clause
let sql = compileBasicExpression(expression, params)

if (compareOptions.direction === `desc`) {
sql = `${sql} DESC`
}

if (compareOptions.nulls === `first`) {
sql = `${sql} NULLS FIRST`
}

if (compareOptions.nulls === `last`) {
sql = `${sql} NULLS LAST`
}

return sql
}

/**
 * Compiles an IR function/operator expression into SQL, rendering binary
 * operators in infix form, `IS NULL` checks in postfix form, and everything
 * else as a function call.
 *
 * @param exp - The function expression to compile
 * @param params - The shared positional params array (mutated)
 * @returns The compiled SQL fragment
 * @throws Error if the arity doesn't match the operator's form
 */
function compileFunction(
  exp: IR.Func<unknown>,
  params: Array<unknown> = []
): string {
  const { name, args } = exp

  const opName = getOpName(name)

  const compiledArgs = args.map((arg: IR.BasicExpression) =>
    compileBasicExpression(arg, params)
  )

  if (isBinaryOp(name)) {
    if (compiledArgs.length !== 2) {
      throw new Error(`Binary operator ${name} expects 2 arguments`)
    }
    const [lhs, rhs] = compiledArgs
    return `${lhs} ${opName} ${rhs}`
  }

  // `IS NULL` is a postfix operator in SQL (`<arg> IS NULL`); rendering it
  // as a call (`IS NULL(<arg>)`) would produce invalid SQL.
  if (name === `isNull` || name === `isUndefined`) {
    if (compiledArgs.length !== 1) {
      throw new Error(`Unary operator ${name} expects 1 argument`)
    }
    return `${compiledArgs[0]} ${opName}`
  }

  // Everything else is rendered as a plain function call, e.g. UPPER(x)
  return `${opName}(${compiledArgs.join(`,`)})`
}

/**
 * Returns true for operators that must be rendered in infix form
 * (`lhs <op> rhs`) rather than as a function call. `like`/`ilike` are infix
 * operators in SQL (`a LIKE b`) — rendering them as `LIKE(a,b)` is invalid —
 * so they belong in this set too.
 */
function isBinaryOp(name: string): boolean {
  const binaryOps = [
    `eq`,
    `gt`,
    `gte`,
    `lt`,
    `lte`,
    `and`,
    `or`,
    `like`,
    `ilike`,
  ]
  return binaryOps.includes(name)
}

/**
 * Translates an IR operator/function name into its SQL spelling.
 *
 * @param name - The IR operator/function name (e.g. `eq`, `upper`)
 * @returns The SQL operator/function spelling (e.g. `=`, `UPPER`)
 * @throws Error when the name has no known SQL equivalent
 */
function getOpName(name: string): string {
  const sqlNames = new Map<string, string>([
    [`eq`, `=`],
    [`gt`, `>`],
    [`gte`, `>=`],
    [`lt`, `<`],
    [`lte`, `<=`],
    [`add`, `+`],
    [`and`, `AND`],
    [`or`, `OR`],
    [`not`, `NOT`],
    // Both undefined and null map to SQL NULL semantics
    [`isUndefined`, `IS NULL`],
    [`isNull`, `IS NULL`],
    [`in`, `IN`],
    [`like`, `LIKE`],
    [`ilike`, `ILIKE`],
    [`upper`, `UPPER`],
    [`lower`, `LOWER`],
    [`length`, `LENGTH`],
    [`concat`, `CONCAT`],
    [`coalesce`, `COALESCE`],
  ])

  const sqlName = sqlNames.get(name)

  if (!sqlName) {
    throw new Error(`Unknown operator/function: ${name}`)
  }

  return sqlName
}
Loading
Loading