Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-snapshot-end-before-up-to-date.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/electric-db-collection": patch
---

Fix eager mode incorrectly committing data on `snapshot-end` before receiving the first `up-to-date` message. The `snapshot-end` in the Electric log can be from a significant period before the stream is actually up to date, so commits should only occur on `up-to-date` (or on `snapshot-end` after the first `up-to-date` has been received). This change does not affect `on-demand` mode where `snapshot-end` correctly triggers commits, or `progressive` mode which was already protected by its buffering mechanism.
8 changes: 7 additions & 1 deletion packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,13 @@ function createElectricSync<T extends Row<unknown>>(
)
} else {
// Normal mode or on-demand: commit transaction if one was started
if (transactionStarted) {
// In eager mode, only commit on snapshot-end if we've already received
// the first up-to-date, because the snapshot-end in the log could be from
// a significant period before the stream is actually up to date
const shouldCommit =
hasUpToDate || syncMode === `on-demand` || hasReceivedUpToDate

if (transactionStarted && shouldCommit) {
commit()
transactionStarted = false
}
Expand Down
107 changes: 88 additions & 19 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2264,22 +2264,35 @@ describe(`Electric Integration`, () => {

// Tests for commit and ready behavior with snapshot-end and up-to-date messages
describe(`Commit and ready behavior`, () => {
it(`should commit on snapshot-end in eager mode but not mark ready`, () => {
it(`should ignore snapshot-end before first up-to-date in progressive mode`, () => {
vi.clearAllMocks()

let testSubscriber!: (messages: Array<Message<Row>>) => void
mockSubscribe.mockImplementation((callback) => {
testSubscriber = callback
return () => {}
})
mockRequestSnapshot.mockResolvedValue(undefined)
mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] })

const config = {
id: `eager-snapshot-end-test`,
id: `progressive-ignore-snapshot-end-test`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
},
syncMode: `eager` as const,
syncMode: `progressive` as const,
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

// Send data followed by snapshot-end (but no up-to-date)
subscriber([
// In progressive mode, these messages should be BUFFERED, and snapshot-end
// should NOT trigger a commit because the snapshot-end in the log could be
// from a significant period before the stream is actually up to date
testSubscriber([
{
key: `1`,
value: { id: 1, name: `Test User` },
Expand All @@ -2295,21 +2308,74 @@ describe(`Electric Integration`, () => {
},
])

// Data should be committed (available in state)
// Data should NOT be visible yet (snapshot-end should be ignored before up-to-date)
expect(testCollection.has(1)).toBe(false)
expect(testCollection.status).toBe(`loading`)

// Now send up-to-date (triggers atomic swap)
testSubscriber([
{
headers: { control: `up-to-date` },
},
])

// Now data should be visible after atomic swap
expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` })
expect(testCollection.status).toBe(`ready`)
})

// But collection should NOT be marked as ready yet in eager mode
expect(testCollection.status).toBe(`loading`)
it(`should commit on snapshot-end in eager mode AFTER first up-to-date`, () => {
const config = {
id: `eager-snapshot-end-test`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
},
syncMode: `eager` as const,
getKey: (item: Row) => item.id as number,
startSync: true,
}

// Now send up-to-date
const testCollection = createCollection(electricCollectionOptions(config))

// First send up-to-date (with initial data) to establish the connection
subscriber([
{
key: `1`,
value: { id: 1, name: `Test User` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

// Now it should be ready
// Data should be committed and collection ready
expect(testCollection.has(1)).toBe(true)
expect(testCollection.status).toBe(`ready`)

// Now send more data followed by snapshot-end (simulating incremental snapshot)
// After the first up-to-date, snapshot-end SHOULD commit
subscriber([
{
key: `2`,
value: { id: 2, name: `Second User` },
headers: { operation: `insert` },
},
{
headers: {
control: `snapshot-end`,
xmin: `100`,
xmax: `110`,
xip_list: [],
},
},
])

// Data should be committed since we've already received up-to-date
expect(testCollection.has(2)).toBe(true)
expect(testCollection.get(2)).toEqual({ id: 2, name: `Second User` })
expect(testCollection.status).toBe(`ready`)
})

Expand Down Expand Up @@ -2415,7 +2481,7 @@ describe(`Electric Integration`, () => {
expect(testCollection.status).toBe(`ready`)
})

it(`should commit multiple snapshot-end messages before up-to-date in eager mode`, () => {
it(`should NOT commit multiple snapshot-end messages before up-to-date in eager mode`, () => {
const config = {
id: `eager-multiple-snapshots-test`,
shapeOptions: {
Expand All @@ -2429,7 +2495,7 @@ describe(`Electric Integration`, () => {

const testCollection = createCollection(electricCollectionOptions(config))

// First snapshot with data
// First snapshot with data - snapshot-end should be ignored before up-to-date
subscriber([
{
key: `1`,
Expand All @@ -2446,11 +2512,11 @@ describe(`Electric Integration`, () => {
},
])

// First data should be committed
expect(testCollection.has(1)).toBe(true)
// First data should NOT be committed yet (snapshot-end ignored before up-to-date)
expect(testCollection.has(1)).toBe(false)
expect(testCollection.status).toBe(`loading`)

// Second snapshot with more data
// Second snapshot with more data - still before up-to-date, so should be ignored
subscriber([
{
key: `2`,
Expand All @@ -2467,19 +2533,22 @@ describe(`Electric Integration`, () => {
},
])

// Second data should also be committed
expect(testCollection.has(2)).toBe(true)
expect(testCollection.size).toBe(2)
// Second data should also NOT be committed yet
expect(testCollection.has(2)).toBe(false)
expect(testCollection.size).toBe(0)
expect(testCollection.status).toBe(`loading`)

// Finally send up-to-date
// Finally send up-to-date - this commits all the pending data
subscriber([
{
headers: { control: `up-to-date` },
},
])

// Now should be ready
// Now all data should be committed and collection ready
expect(testCollection.has(1)).toBe(true)
expect(testCollection.has(2)).toBe(true)
expect(testCollection.size).toBe(2)
expect(testCollection.status).toBe(`ready`)
})

Expand Down
Loading