From 10e4db1ad3d85411fdc99690fc94253e28cae43e Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 27 Nov 2025 14:08:56 +0000 Subject: [PATCH 1/3] add failing test --- .../tests/electric.test.ts | 107 ++++++++++++++---- 1 file changed, 88 insertions(+), 19 deletions(-) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 04ecd4fb1..9808c36fe 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2264,14 +2264,24 @@ describe(`Electric Integration`, () => { // Tests for commit and ready behavior with snapshot-end and up-to-date messages describe(`Commit and ready behavior`, () => { - it(`should commit on snapshot-end in eager mode but not mark ready`, () => { + it(`should ignore snapshot-end before first up-to-date in progressive mode`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + const config = { - id: `eager-snapshot-end-test`, + id: `progressive-ignore-snapshot-end-test`, shapeOptions: { url: `http://test-url`, params: { table: `test_table` }, }, - syncMode: `eager` as const, + syncMode: `progressive` as const, getKey: (item: Row) => item.id as number, startSync: true, } @@ -2279,7 +2289,10 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) // Send data followed by snapshot-end (but no up-to-date) - subscriber([ + // In progressive mode, these messages should be BUFFERED, and snapshot-end + // should NOT trigger a commit because the snapshot-end in the log could be + // from a significant period before the stream is actually up to date + testSubscriber([ { key: `1`, value: { id: 1, name: `Test User` }, @@ -2295,21 +2308,74 @@ describe(`Electric Integration`, () => { }, ]) - // Data should be committed (available in state) + // Data should NOT be visible yet (snapshot-end should be ignored before up-to-date) + expect(testCollection.has(1)).toBe(false) + expect(testCollection.status).toBe(`loading`) + + // Now send up-to-date (triggers atomic swap) + testSubscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // Now data should be visible after atomic swap expect(testCollection.has(1)).toBe(true) expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + expect(testCollection.status).toBe(`ready`) + }) - // But collection should NOT be marked as ready yet in eager mode - expect(testCollection.status).toBe(`loading`) + it(`should commit on snapshot-end in eager mode AFTER first up-to-date`, () => { + const config = { + id: `eager-snapshot-end-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } - // Now send up-to-date + const testCollection = createCollection(electricCollectionOptions(config)) + + // First send up-to-date (with initial data) to establish the connection subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, { headers: { control: `up-to-date` }, }, ]) - // Now it should be ready + // Data should be committed and collection ready + expect(testCollection.has(1)).toBe(true) + expect(testCollection.status).toBe(`ready`) + + // Now send more data followed by snapshot-end (simulating incremental snapshot) + // After the first up-to-date, snapshot-end SHOULD commit + subscriber([ + { + key: `2`, + value: { id: 2, name: `Second User` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + // Data should be committed since we've already received up-to-date + expect(testCollection.has(2)).toBe(true) + expect(testCollection.get(2)).toEqual({ id: 2, name: `Second User` }) expect(testCollection.status).toBe(`ready`) }) @@ -2415,7 +2481,7 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`ready`) }) - it(`should commit multiple snapshot-end messages before up-to-date in eager mode`, () => { + it(`should NOT commit multiple snapshot-end messages before up-to-date in eager mode`, () => { const config = { id: `eager-multiple-snapshots-test`, shapeOptions: { @@ -2429,7 +2495,7 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // First snapshot with data + // First snapshot with data - snapshot-end should be ignored before up-to-date subscriber([ { key: `1`, @@ -2446,11 +2512,11 @@ describe(`Electric Integration`, () => { }, ]) - // First data should be committed - expect(testCollection.has(1)).toBe(true) + // First data should NOT be committed yet (snapshot-end ignored before up-to-date) + expect(testCollection.has(1)).toBe(false) expect(testCollection.status).toBe(`loading`) - // Second snapshot with more data + // Second snapshot with more data - still before up-to-date, so should be ignored subscriber([ { key: `2`, @@ -2467,19 +2533,22 @@ describe(`Electric Integration`, () => { }, ]) - // Second data should also be committed - expect(testCollection.has(2)).toBe(true) - expect(testCollection.size).toBe(2) + // Second data should also NOT be committed yet + expect(testCollection.has(2)).toBe(false) + expect(testCollection.size).toBe(0) expect(testCollection.status).toBe(`loading`) - // Finally send up-to-date + // Finally send up-to-date - this commits all the pending data subscriber([ { headers: { control: `up-to-date` }, }, ]) - // Now should be ready + // Now all data should be committed and collection ready + expect(testCollection.has(1)).toBe(true) + expect(testCollection.has(2)).toBe(true) + expect(testCollection.size).toBe(2) expect(testCollection.status).toBe(`ready`) }) From 7f80a6a3fe7516bf2c8a1463a756674e24c69407 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 27 Nov 2025 14:21:33 +0000 Subject: [PATCH 2/3] fix it --- packages/electric-db-collection/src/electric.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 50ffe8f3c..360f07bbb 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1038,7 +1038,13 @@ function createElectricSync>( ) } else { // Normal mode or on-demand: commit transaction if one was started - if (transactionStarted) { + // In eager mode, only commit on snapshot-end if we've already received + // the first up-to-date, because the snapshot-end in the log could be from + // a significant period before the stream is actually up to date + const shouldCommit = + hasUpToDate || syncMode === `on-demand` || hasReceivedUpToDate + + if (transactionStarted && shouldCommit) { commit() transactionStarted = false } From 1e178dbbbdbb1e15a68429130cb73b9dd3220ae0 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 27 Nov 2025 14:21:59 +0000 Subject: [PATCH 3/3] changeset --- .changeset/fix-snapshot-end-before-up-to-date.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-snapshot-end-before-up-to-date.md diff --git a/.changeset/fix-snapshot-end-before-up-to-date.md b/.changeset/fix-snapshot-end-before-up-to-date.md new file mode 100644 index 000000000..3fa8b45df --- /dev/null +++ b/.changeset/fix-snapshot-end-before-up-to-date.md @@ -0,0 +1,5 @@ +--- +"@tanstack/electric-db-collection": patch +--- + +Fix eager mode incorrectly committing data on `snapshot-end` before receiving the first `up-to-date` message. The `snapshot-end` in the Electric log can be from a significant period before the stream is actually up to date, so commits should only occur on `up-to-date` (or on `snapshot-end` after the first `up-to-date` has been received). This change does not affect `on-demand` mode where `snapshot-end` correctly triggers commits, or `progressive` mode which was already protected by its buffering mechanism.