Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/salty-states-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/react-db": patch
"@tanstack/db": patch
---

Fixed an issue with injecting the optimistic state removal into the reactive live query.
5 changes: 5 additions & 0 deletions .changeset/strong-groups-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db-collections": patch
---

Added QueryCollection
21 changes: 15 additions & 6 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,15 @@ export class Collection<T extends object = Record<string, unknown>> {
prevDepVals,
}) => {
const prevDerivedState = prevDepVals?.[0] ?? new Map<string, T>()
const prevOptimisticOperations = prevDepVals?.[1] ?? []
const changedKeys = new Set(this.syncedKeys)
optimisticOperations
.flat()
.filter((op) => op.isActive)
.forEach((op) => changedKeys.add(op.key))
prevOptimisticOperations.flat().forEach((op) => {
changedKeys.add(op.key)
})

if (changedKeys.size === 0) {
return []
Expand All @@ -309,12 +313,17 @@ export class Collection<T extends object = Record<string, unknown>> {
} else if (!prevDerivedState.has(key) && derivedState.has(key)) {
changes.push({ type: `insert`, key, value: derivedState.get(key)! })
} else if (prevDerivedState.has(key) && derivedState.has(key)) {
changes.push({
type: `update`,
key,
value: derivedState.get(key)!,
previousValue: prevDerivedState.get(key),
})
const value = derivedState.get(key)!
const previousValue = prevDerivedState.get(key)
if (value !== previousValue) {
// Comparing objects by reference as records are not mutated
changes.push({
type: `update`,
key,
value,
previousValue,
})
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions packages/db/src/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { batch } from "@tanstack/store"
import { createDeferred } from "./deferred"
import type { Deferred } from "./deferred"
import type {
Expand Down Expand Up @@ -178,8 +179,10 @@ export class Transaction {
try {
await this.mutationFn({ transaction: this })

this.setState(`completed`)
this.touchCollection()
batch(() => {
this.setState(`completed`)
this.touchCollection()
})

this.isPersisted.resolve(this)
} catch (error) {
Expand Down
6 changes: 3 additions & 3 deletions packages/db/tests/collection-subscribe-changes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ describe(`Collection.subscribeChanges`, () => {
await tx.isPersisted.promise

// Verify synced update was emitted
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledTimes(0)
// This is called 1 time when the mutationFn call returns
// and the optimistic state is dropped and the synced state applied.
callback.mockReset()
Expand Down Expand Up @@ -438,7 +438,7 @@ describe(`Collection.subscribeChanges`, () => {
await updateTx?.isPersisted.promise

// Verify synced update was emitted
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledTimes(2) // FIXME: check is we can reduce this
// This is called 1 time when the mutationFn call returns
// and the optimistic state is dropped and the synced state applied.
callback.mockReset()
Expand Down Expand Up @@ -556,7 +556,7 @@ describe(`Collection.subscribeChanges`, () => {
await waitForChanges()

// Verify synced update was emitted
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledTimes(0)
// This is called when the mutationFn returns and
// the optimistic state is dropped and synced state is
// applied.
Expand Down
134 changes: 134 additions & 0 deletions packages/db/tests/query/query-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import mitt from "mitt"
import { Collection } from "../../src/collection.js"
import { queryBuilder } from "../../src/query/query-builder.js"
import { compileQuery } from "../../src/query/compiled-query.js"
import { createTransaction } from "../../src/transactions.js"
import type { PendingMutation } from "../../src/types.js"

type Person = {
Expand Down Expand Up @@ -562,6 +563,139 @@ describe(`Query Collections`, () => {
{ id: `1`, name: `John Doe`, age: 40, _orderByIndex: 2 },
])
})

it(`optimistic state is dropped after commit`, async () => {
const emitter = mitt()

// Create person collection
const personCollection = new Collection<Person>({
id: `person-collection-test-bug`,
sync: {
sync: ({ begin, write, commit }) => {
// @ts-expect-error Mitt typing doesn't match our usage
emitter.on(`sync-person`, (changes: Array<PendingMutation>) => {
begin()
changes.forEach((change) => {
write({
key: change.key,
type: change.type,
value: change.changes as Person,
})
})
commit()
})
},
},
})

// Create issue collection
const issueCollection = new Collection<Issue>({
id: `issue-collection-test-bug`,
sync: {
sync: ({ begin, write, commit }) => {
// @ts-expect-error Mitt typing doesn't match our usage
emitter.on(`sync-issue`, (changes: Array<PendingMutation>) => {
begin()
changes.forEach((change) => {
write({
key: change.key,
type: change.type,
value: change.changes as Issue,
})
})
commit()
})
},
},
})

// Sync initial person data
emitter.emit(
`sync-person`,
initialPersons.map((person) => ({
key: person.id,
type: `insert`,
changes: person,
}))
)

// Sync initial issue data
emitter.emit(
`sync-issue`,
initialIssues.map((issue) => ({
key: issue.id,
type: `insert`,
changes: issue,
}))
)

// Create a query with a join between persons and issues
const query = queryBuilder()
.from({ issues: issueCollection })
.join({
type: `inner`,
from: { persons: personCollection },
on: [`@persons.id`, `=`, `@issues.userId`],
})
.select(`@issues.id`, `@issues.title`, `@persons.name`)
.keyBy(`@id`)

const compiledQuery = compileQuery(query)
compiledQuery.start()

const result = compiledQuery.results

await waitForChanges()

// Verify initial state
expect(result.state.size).toBe(3)

// Create a transaction to perform an optimistic mutation
const tx = createTransaction({
mutationFn: async () => {
emitter.emit(`sync-issue`, [
{
key: `4`,
type: `insert`,
changes: {
id: `4`,
title: `New Issue`,
description: `New Issue Description`,
userId: `1`,
},
},
])
return Promise.resolve()
},
})

// Perform optimistic insert of a new issue
tx.mutate(() =>
issueCollection.insert(
{
id: `temp-key`,
title: `New Issue`,
description: `New Issue Description`,
userId: `1`,
},
{ key: `temp-key` }
)
)

// Verify optimistic state is immediately reflected
expect(result.state.size).toBe(4)
expect(result.state.get(`temp-key`)).toEqual({
id: `temp-key`,
name: `John Doe`,
title: `New Issue`,
})

// Wait for the transaction to be committed
await tx.isPersisted.promise

expect(result.state.size).toBe(4)
expect(result.state.get(`4`)).toBeDefined()
})
})

async function waitForChanges(ms = 0) {
Expand Down
Loading