From b702326b1c2c4554863a45eb6b26fd2754f0972b Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 19 Nov 2025 12:17:39 +0000 Subject: [PATCH 1/7] fix electric colleciton progressive mode --- packages/db-collection-e2e/src/index.ts | 1 + .../src/suites/progressive.suite.ts | 387 ++++++++++++++++++ packages/db-collection-e2e/src/types.ts | 5 + .../e2e/electric.e2e.test.ts | 67 +++ packages/electric-db-collection/package.json | 2 +- .../electric-db-collection/src/electric.ts | 147 +++++-- .../tests/electric-live-query.test.ts | 68 +-- .../tests/electric.test.ts | 201 +++++---- pnpm-lock.yaml | 14 +- 9 files changed, 760 insertions(+), 132 deletions(-) create mode 100644 packages/db-collection-e2e/src/suites/progressive.suite.ts diff --git a/packages/db-collection-e2e/src/index.ts b/packages/db-collection-e2e/src/index.ts index 04509bb9e..ae6031196 100644 --- a/packages/db-collection-e2e/src/index.ts +++ b/packages/db-collection-e2e/src/index.ts @@ -25,3 +25,4 @@ export { createDeduplicationTestSuite } from "./suites/deduplication.suite" export { createCollationTestSuite } from "./suites/collation.suite" export { createMutationsTestSuite } from "./suites/mutations.suite" export { createLiveUpdatesTestSuite } from "./suites/live-updates.suite" +export { createProgressiveTestSuite } from "./suites/progressive.suite" diff --git a/packages/db-collection-e2e/src/suites/progressive.suite.ts b/packages/db-collection-e2e/src/suites/progressive.suite.ts new file mode 100644 index 000000000..5c2ba96e6 --- /dev/null +++ b/packages/db-collection-e2e/src/suites/progressive.suite.ts @@ -0,0 +1,387 @@ +/** + * Progressive Mode Test Suite (Electric only) + * + * Tests progressive sync mode behavior including: + * - Snapshot loading during initial sync + * - Atomic swap on first up-to-date + * - Incremental updates after swap + * - Txid tracking behavior + */ + +import { describe, expect, it } from "vitest" +import { createLiveQueryCollection, eq, gt } from "@tanstack/db" +import { waitFor, waitForQueryData } from "../utils/helpers" +import type { E2ETestConfig } from "../types" + +export function createProgressiveTestSuite( + getConfig: () => Promise +) { + describe(`Progressive Mode Suite (Electric only)`, () => { + describe(`Basic Progressive Mode`, () => { + it(`should validate snapshot phase behavior and atomic swap with status transition`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Create a query - this will trigger a snapshot fetch if still in snapshot phase + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + const querySize = query.size + const queryItems = Array.from(query.values()) + + // Validate query data + expect(querySize).toBeGreaterThan(0) + queryItems.forEach((user) => { + expect(user.age).toBe(25) + expect(user.id).toBeDefined() + }) + + // If we're still loading, we should be in snapshot phase + // Base collection should have data from snapshot (query subset) + const statusDuringQuery = progressiveUsers.status + if (statusDuringQuery === `loading`) { + // We're in snapshot phase! Validate snapshot behavior + // Collection should have the snapshot data + expect(progressiveUsers.size).toBeGreaterThan(0) + + // But collection size should be <= query size (only snapshot loaded) + // Actually it might have multiple snapshots if other tests ran, so just verify we have data + expect(progressiveUsers.size).toBeGreaterThan(0) + } + + // Wait for full sync to complete + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // After atomic swap to full synced state + // Collection should have ALL users (not just age=25) + const finalCollectionSize = progressiveUsers.size + expect(finalCollectionSize).toBeGreaterThan(querySize) // More than just our query subset + + // Query should still work with consistent data + const finalQueryItems = Array.from(query.values()) + finalQueryItems.forEach((user) => { + expect(user.age).toBe(25) // Still matches predicate + expect(user.id).toBeDefined() + }) + + // Verify some of the original snapshot items are still present + queryItems.forEach((originalUser) => { + const foundInCollection = progressiveUsers.get(originalUser.id) + expect(foundInCollection).toBeDefined() + expect(foundInCollection?.age).toBe(25) + }) + + await query.cleanup() + }) + + it(`should load snapshots during initial sync and perform atomic swap`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Progressive collections should only be marked ready AFTER first up-to-date + // If already ready, the full sync completed very fast - we can still test the end state + const wasStillLoading = progressiveUsers.status === `loading` + + // Query a subset + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + await query.preload() + + // Wait for query to have data (either from snapshot during loading, or from final state if already ready) + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + const beforeSwapSize = query.size + const beforeSwapItems = Array.from(query.values()) + + // Verify all items match the predicate + beforeSwapItems.forEach((user) => { + expect(user.age).toBe(25) + expect(user.id).toBeDefined() + expect(user.name).toBeDefined() + }) + + if (wasStillLoading) { + // If we caught it during snapshot phase, wait for atomic swap + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // After atomic swap, verify data is consistent + // The query should have the same data (from full sync) + const afterSwapItems = Array.from(query.values()) + expect(afterSwapItems.length).toBeGreaterThanOrEqual(beforeSwapSize) + + // All original items should still be present + beforeSwapItems.forEach((originalUser) => { + const stillPresent = afterSwapItems.some( + (u) => u.id === originalUser.id + ) + expect(stillPresent).toBe(true) + }) + } else { + // Already ready - verify final state is correct + expect(progressiveUsers.status).toBe(`ready`) + } + + // Final validation: all items still match predicate + Array.from(query.values()).forEach((user) => { + expect(user.age).toBe(25) + }) + + await query.cleanup() + }) + + it(`should handle multiple snapshots with different predicates`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Create multiple queries with different predicates + const query1 = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + const query2 = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => gt(user.age, 30)) + ) + + await Promise.all([query1.preload(), query2.preload()]) + + // Wait for both to load snapshots + await Promise.all([ + waitForQueryData(query1, { minSize: 1, timeout: 10000 }), + waitForQueryData(query2, { minSize: 1, timeout: 10000 }), + ]) + + expect(query1.size).toBeGreaterThan(0) + expect(query2.size).toBeGreaterThan(0) + + // Verify data correctness + const query1Snapshot = Array.from(query1.values()) + const query2Snapshot = Array.from(query2.values()) + + query1Snapshot.forEach((user) => { + expect(user.age).toBe(25) + }) + query2Snapshot.forEach((user) => { + expect(user.age).toBeGreaterThan(30) + }) + + // Wait for full sync + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // Both queries should still have data after swap with same predicates + expect(query1.size).toBeGreaterThan(0) + expect(query2.size).toBeGreaterThan(0) + + // Verify predicates still match after swap + Array.from(query1.values()).forEach((user) => { + expect(user.age).toBe(25) + }) + Array.from(query2.values()).forEach((user) => { + expect(user.age).toBeGreaterThan(30) + }) + + await Promise.all([query1.cleanup(), query2.cleanup()]) + }) + }) + + describe(`Incremental Updates After Swap`, () => { + it(`should receive incremental updates after atomic swap`, async () => { + const config = await getConfig() + if (!config.collections.progressive || !config.mutations?.insertUser) { + return // Skip if progressive collections or mutations not available + } + const progressiveUsers = config.collections.progressive.users + + // Wait for full sync first + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + const initialSize = progressiveUsers.size + + // Insert new data + const newUser = { + id: crypto.randomUUID(), + name: `Progressive Test User`, + email: `progressive@test.com`, + age: 35, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + } + + await config.mutations.insertUser(newUser) + + // Wait for incremental update + if (config.hasReplicationLag) { + await waitFor(() => progressiveUsers.size > initialSize, { + timeout: 10000, + message: `New user not synced via incremental update`, + }) + } + + expect(progressiveUsers.size).toBeGreaterThan(initialSize) + + // Verify the new user is in the collection with correct data + const foundUser = progressiveUsers.get(newUser.id) + expect(foundUser).toBeDefined() + expect(foundUser?.id).toBe(newUser.id) + expect(foundUser?.name).toBe(newUser.name) + expect(foundUser?.email).toBe(newUser.email) + expect(foundUser?.age).toBe(newUser.age) + }) + }) + + describe(`Predicate Handling`, () => { + it(`should correctly handle predicates during and after snapshot phase`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Create query with predicate during snapshot phase + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => gt(user.age, 25)) + .orderBy(({ user }) => [user.age, `asc`]) + .limit(5) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + const snapshotPhaseSize = query.size + + // Wait for atomic swap + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // Verify predicate still works after swap + const afterSwapSize = query.size + const afterSwapItems = Array.from(query.values()) + + // Size should be reasonable (at least what we had in snapshot phase) + expect(afterSwapSize).toBeGreaterThanOrEqual(snapshotPhaseSize) + + // All items should match the predicate + afterSwapItems.forEach((user) => { + expect(user.age).toBeGreaterThan(25) + }) + + // Should respect limit + expect(afterSwapSize).toBeLessThanOrEqual(5) + + await query.cleanup() + }) + + it(`should deduplicate snapshot requests during snapshot phase`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Create multiple identical queries (should be deduplicated) + const queries = Array.from({ length: 3 }, () => + createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 30)) + ) + ) + + // Execute concurrently + await Promise.all(queries.map((q) => q.preload())) + + // Wait for data + await Promise.all( + queries.map((q) => + waitForQueryData(q, { minSize: 1, timeout: 10000 }) + ) + ) + + // All should have the same size and same data + const sizes = queries.map((q) => q.size) + expect(new Set(sizes).size).toBe(1) // All sizes are identical + + // Verify all queries have identical data (deduplication working) + const firstQueryData = Array.from(queries[0]!.values()) + const firstQueryIds = new Set(firstQueryData.map((u) => u.id)) + + queries.forEach((query) => { + const queryData = Array.from(query.values()) + queryData.forEach((user) => { + expect(user.age).toBe(30) // All match predicate + expect(firstQueryIds.has(user.id)).toBe(true) // Same items + }) + }) + + await Promise.all(queries.map((q) => q.cleanup())) + }) + }) + + describe(`Progressive Mode Resilience`, () => { + it(`should handle cleanup and restart during snapshot phase`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // This test verifies the collection can be cleaned up even during snapshot phase + // and that the atomic swap doesn't cause issues + + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + await query.preload() + + // Don't wait for data, just cleanup immediately + await query.cleanup() + + // Should not throw + expect(true).toBe(true) + }) + }) + }) +} diff --git a/packages/db-collection-e2e/src/types.ts b/packages/db-collection-e2e/src/types.ts index 86ad7d6d5..be65399c8 100644 --- a/packages/db-collection-e2e/src/types.ts +++ b/packages/db-collection-e2e/src/types.ts @@ -60,6 +60,11 @@ export interface E2ETestConfig { posts: Collection comments: Collection } + progressive?: { + users: Collection + posts: Collection + comments: Collection + } } // Mutation helpers using collection APIs (works for both Electric and Query) diff --git a/packages/electric-db-collection/e2e/electric.e2e.test.ts b/packages/electric-db-collection/e2e/electric.e2e.test.ts index d3b8135af..452614afd 100644 --- a/packages/electric-db-collection/e2e/electric.e2e.test.ts +++ b/packages/electric-db-collection/e2e/electric.e2e.test.ts @@ -16,12 +16,20 @@ import { createMutationsTestSuite, createPaginationTestSuite, createPredicatesTestSuite, + createProgressiveTestSuite, generateSeedData, } from "../../db-collection-e2e/src/index" import { waitFor } from "../../db-collection-e2e/src/utils/helpers" import type { E2ETestConfig } from "../../db-collection-e2e/src/types" import type { Client } from "pg" +declare module "vitest" { + export interface ProvidedContext { + baseUrl: string + testSchema: string + } +} + describe(`Electric Collection E2E Tests`, () => { let config: E2ETestConfig let dbClient: Client @@ -304,6 +312,51 @@ describe(`Electric Collection E2E Tests`, () => { }) ) + const progressiveUsers = createCollection( + electricCollectionOptions({ + id: `electric-e2e-users-progressive-${testId}`, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${usersTable}`, + }, + }, + syncMode: `progressive`, + getKey: (item: any) => item.id, + startSync: true, + }) + ) + + const progressivePosts = createCollection( + electricCollectionOptions({ + id: `electric-e2e-posts-progressive-${testId}`, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${postsTable}`, + }, + }, + syncMode: `progressive`, + getKey: (item: any) => item.id, + startSync: true, + }) + ) + + const progressiveComments = createCollection( + electricCollectionOptions({ + id: `electric-e2e-comments-progressive-${testId}`, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${commentsTable}`, + }, + }, + syncMode: `progressive`, + getKey: (item: any) => item.id, + startSync: true, + }) + ) + // Wait for eager collections to sync all data await eagerUsers.preload() await eagerPosts.preload() @@ -314,6 +367,11 @@ describe(`Electric Collection E2E Tests`, () => { await onDemandPosts.preload() await onDemandComments.preload() + // Progressive collections start syncing in background, just preload to get started + await progressiveUsers.preload() + await progressivePosts.preload() + await progressiveComments.preload() + config = { collections: { eager: { @@ -326,6 +384,11 @@ describe(`Electric Collection E2E Tests`, () => { posts: onDemandPosts as any, comments: onDemandComments as any, }, + progressive: { + users: progressiveUsers as any, + posts: progressivePosts as any, + comments: progressiveComments as any, + }, }, hasReplicationLag: true, // Electric has async replication lag mutations: { @@ -420,6 +483,9 @@ describe(`Electric Collection E2E Tests`, () => { onDemandUsers.cleanup(), onDemandPosts.cleanup(), onDemandComments.cleanup(), + progressiveUsers.cleanup(), + progressivePosts.cleanup(), + progressiveComments.cleanup(), ]) }, } @@ -458,4 +524,5 @@ describe(`Electric Collection E2E Tests`, () => { createCollationTestSuite(getConfig) createMutationsTestSuite(getConfig) createLiveUpdatesTestSuite(getConfig) + createProgressiveTestSuite(getConfig) }) diff --git a/packages/electric-db-collection/package.json b/packages/electric-db-collection/package.json index 8d1cf8108..ee5d38371 100644 --- a/packages/electric-db-collection/package.json +++ b/packages/electric-db-collection/package.json @@ -3,7 +3,7 @@ "description": "ElectricSQL collection for TanStack DB", "version": "0.2.2", "dependencies": { - "@electric-sql/client": "^1.1.4", + "@electric-sql/client": "https://pkg.pr.new/@electric-sql/client@3463", "@standard-schema/spec": "^1.0.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 0b377f672..62ac8cb15 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -734,6 +734,10 @@ function createElectricSync>( const newSnapshots: Array = [] let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode + // Progressive mode state + let isBufferingInitialSync = syncMode === `progressive` // True until first up-to-date in progressive mode + const bufferedMessages: Array> = [] // Buffer change messages during initial sync + // Create deduplicated loadSubset wrapper for non-eager modes // This prevents redundant snapshot requests when multiple concurrent // live queries request overlapping or subset predicates @@ -742,12 +746,48 @@ function createElectricSync>( ? null : new DeduplicatedLoadSubset({ loadSubset: async (opts: LoadSubsetOptions) => { - // In progressive mode, stop requesting snapshots once full sync is complete - if (syncMode === `progressive` && hasReceivedUpToDate) { - return + // In progressive mode, use fetchSnapshot during snapshot phase + if (syncMode === `progressive`) { + if (hasReceivedUpToDate) { + // Full sync complete, no need to load more + return + } + // Snapshot phase: fetch and apply immediately + const snapshotParams = compileSQL(opts) + try { + const snapshot = await stream.fetchSnapshot(snapshotParams) + const rows = snapshot.data + + // Apply snapshot data in a sync transaction (only if we have data) + if (rows.length > 0) { + begin() + for (const row of rows) { + write({ + type: `insert`, + value: row.value, + metadata: { + ...row.headers, + }, + }) + } + commit() + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows` + ) + } + } catch (error) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, + error + ) + throw error + } + } else { + // On-demand mode: use requestSnapshot + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) } - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) }, }) @@ -769,7 +809,8 @@ function createElectricSync>( } // Check for txids in the message and add them to our store - if (hasTxids(message)) { + // Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap) + if (hasTxids(message) && !isBufferingInitialSync) { message.headers.txids?.forEach((txid) => newTxids.add(txid)) } @@ -803,21 +844,30 @@ function createElectricSync>( relationSchema.setState(() => schema) } - if (!transactionStarted) { - begin() - transactionStarted = true - } + // In buffered initial sync of progressive mode, buffer messages instead of writing + if (isBufferingInitialSync) { + bufferedMessages.push(message) + } else { + // Normal processing: write changes immediately + if (!transactionStarted) { + begin() + transactionStarted = true + } - write({ - type: message.headers.operation, - value: message.value, - // Include the primary key and relation info in the metadata - metadata: { - ...message.headers, - }, - }) + write({ + type: message.headers.operation, + value: message.value, + // Include the primary key and relation info in the metadata + metadata: { + ...message.headers, + }, + }) + } } else if (isSnapshotEndMessage(message)) { - newSnapshots.push(parseSnapshotMessage(message)) + // Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap) + if (!isBufferingInitialSync) { + newSnapshots.push(parseSnapshotMessage(message)) + } hasSnapshotEnd = true } else if (isUpToDateMessage(message)) { hasUpToDate = true @@ -842,19 +892,68 @@ function createElectricSync>( hasUpToDate = false hasSnapshotEnd = false hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync + isBufferingInitialSync = syncMode === `progressive` // Reset buffering state + bufferedMessages.length = 0 // Clear buffered messages } } if (hasUpToDate || hasSnapshotEnd) { - // Clear the current batch buffer since we're now up-to-date - currentBatchMessages.setState(() => []) + // PROGRESSIVE MODE: Atomic swap on first up-to-date + if (isBufferingInitialSync && hasUpToDate) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages` + ) + + // Start atomic swap transaction + begin() - // Commit transaction if one was started - if (transactionStarted) { + // Truncate to clear all snapshot data + truncate() + + // Apply all buffered change messages and extract txids/snapshots + for (const bufferedMsg of bufferedMessages) { + if (isChangeMessage(bufferedMsg)) { + write({ + type: bufferedMsg.headers.operation, + value: bufferedMsg.value, + metadata: { + ...bufferedMsg.headers, + }, + }) + + // Extract txids from buffered messages (will be committed to store after transaction) + if (hasTxids(bufferedMsg)) { + bufferedMsg.headers.txids?.forEach((txid) => + newTxids.add(txid) + ) + } + } else if (isSnapshotEndMessage(bufferedMsg)) { + // Extract snapshots from buffered messages (will be committed to store after transaction) + newSnapshots.push(parseSnapshotMessage(bufferedMsg)) + } + } + + // Commit the atomic swap commit() - transactionStarted = false + + // Exit buffering phase - now in normal sync mode + isBufferingInitialSync = false + bufferedMessages.length = 0 + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Atomic swap complete, now in normal sync mode` + ) + } else { + // Normal mode or on-demand: commit transaction if one was started + if (transactionStarted) { + commit() + transactionStarted = false + } } + // Clear the current batch buffer since we're now up-to-date + currentBatchMessages.setState(() => []) + if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { // Mark the collection as ready now that sync is up to date markReady() diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index 5f269099a..3a6489454 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -56,8 +56,10 @@ const sampleUsers: Array = [ // Mock the ShapeStream module const mockSubscribe = vi.fn() const mockRequestSnapshot = vi.fn() +const mockFetchSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, + fetchSnapshot: mockFetchSnapshot, requestSnapshot: async (...args: any) => { const result = await mockRequestSnapshot(...args) const subscribers = mockSubscribe.mock.calls.map((call) => call[0]) @@ -88,6 +90,14 @@ mockRequestSnapshot.mockResolvedValue({ data: [], }) +// Mock the fetchSnapshot method +// to return empty data with metadata +// since most tests don't use it +mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [], +}) + vi.mock(`@electric-sql/client`, async () => { const actual = await vi.importActual(`@electric-sql/client`) return { @@ -135,7 +145,13 @@ describe.each([ return createCollection({ ...options, startSync: true, - }) + }) as unknown as Collection< + User, + string | number, + ElectricCollectionUtils, + StandardSchemaV1, + User + > } function simulateInitialSync(users: Array = sampleUsers) { @@ -726,12 +742,12 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { expect(liveQuery.size).toBeGreaterThan(2) }) - it(`should trigger requestSnapshot in progressive mode when live query needs more data`, async () => { + it(`should trigger fetchSnapshot in progressive mode when live query needs more data`, async () => { const electricCollection = createElectricCollectionWithSyncMode(`progressive`) - // Send initial snapshot with limited data (using snapshot-end, not up-to-date) - // This keeps the collection in "loading" state, simulating progressive mode still syncing + // In progressive mode, stream messages are buffered until up-to-date + // So collection starts empty even though we send data subscriber([ { key: sampleUsers[0]!.id.toString(), @@ -743,25 +759,19 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { value: sampleUsers[1]!, headers: { operation: `insert` }, }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, ]) expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode - expect(electricCollection.size).toBe(2) + // Messages are buffered, so size is 0 until up-to-date + expect(electricCollection.size).toBe(0) - // Mock requestSnapshot to return additional data - mockRequestSnapshot.mockResolvedValueOnce({ + // Mock fetchSnapshot to return data + mockFetchSnapshot.mockResolvedValueOnce({ + metadata: {}, data: [ { headers: { operation: `insert` }, - key: 3, + key: sampleUsers[2]!.id.toString(), value: sampleUsers[2]!, // Charlie }, ], @@ -781,15 +791,16 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { // Wait for the live query to process await new Promise((resolve) => setTimeout(resolve, 0)) - // Should have requested more data from Electric with correct parameters - // First request asks for the full limit - expect(mockRequestSnapshot).toHaveBeenCalledWith( + // Should have fetched more data from Electric with correct parameters + // Progressive mode uses fetchSnapshot, not requestSnapshot + expect(mockFetchSnapshot).toHaveBeenCalledWith( expect.objectContaining({ limit: 3, // Requests full limit from Electric orderBy: `"id" NULLS FIRST`, params: {}, }) ) + expect(mockRequestSnapshot).not.toHaveBeenCalled() }) it(`should NOT trigger requestSnapshot in eager mode even when live query needs more data`, async () => { @@ -904,22 +915,10 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { ) }) - it(`should handle complex filters in requestSnapshot`, async () => { + it(`should handle complex filters in fetchSnapshot`, async () => { const electricCollection = createElectricCollectionWithSyncMode(`progressive`) - // Send snapshot-end (not up-to-date) to keep collection in loading state - subscriber([ - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, - ]) - expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode // Create live query with complex WHERE clause @@ -936,8 +935,8 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) - // Should have requested snapshot with complex WHERE clause - expect(mockRequestSnapshot).toHaveBeenCalledWith( + // Should have called fetchSnapshot with complex WHERE clause (not requestSnapshot) + expect(mockFetchSnapshot).toHaveBeenCalledWith( expect.objectContaining({ where: `"age" > $1`, params: { "1": `20` }, @@ -945,6 +944,7 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { limit: 5, }) ) + expect(mockRequestSnapshot).not.toHaveBeenCalled() }) }) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 8637972f4..f536bc2da 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -20,9 +20,11 @@ import type { StandardSchemaV1 } from "@standard-schema/spec" // Mock the ShapeStream module const mockSubscribe = vi.fn() const mockRequestSnapshot = vi.fn() +const mockFetchSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, requestSnapshot: mockRequestSnapshot, + fetchSnapshot: mockFetchSnapshot, } vi.mock(`@electric-sql/client`, async () => { @@ -72,7 +74,13 @@ describe(`Electric Integration`, () => { const options = electricCollectionOptions(config) // Create collection with Electric configuration using the new utility exposure pattern - collection = createCollection(options) + collection = createCollection(options) as unknown as Collection< + Row, + string | number, + ElectricCollectionUtils, + StandardSchemaV1, + Row + > }) it(`should commit an empty transaction when there's an up-to-date`, () => { @@ -673,9 +681,9 @@ describe(`Electric Integration`, () => { }) it(`should support custom timeout in matching strategy`, async () => { - const onInsert = vi.fn(async () => { + const onInsert = vi.fn(() => { // Return a txid that will never arrive with a very short timeout - return { txid: 999999, timeout: 100 } + return Promise.resolve({ txid: 999999, timeout: 100 }) }) const config = { @@ -1791,9 +1799,24 @@ describe(`Electric Integration`, () => { ) }) - it(`should request incremental snapshots in progressive mode when loadSubset is called before sync completes`, async () => { + it(`should fetch snapshots in progressive mode when loadSubset is called before sync completes`, async () => { vi.clearAllMocks() + mockSubscribe.mockImplementation((_callback) => { + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + const config = { id: `progressive-snapshot-test`, shapeOptions: { @@ -1809,35 +1832,23 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send initial data with snapshot-end (but not up-to-date yet - still syncing) - subscriber([ - { - key: `1`, - value: { id: 1, name: `Test User` }, - headers: { operation: `insert` }, - }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, - ]) - expect(testCollection.status).toBe(`loading`) // Not ready yet - // In progressive mode, calling loadSubset should request a snapshot BEFORE full sync completes + // In progressive mode, calling loadSubset should fetch a snapshot BEFORE full sync completes await testCollection._sync.loadSubset({ limit: 20 }) - // Verify requestSnapshot was called - expect(mockRequestSnapshot).toHaveBeenCalledWith( + // Verify fetchSnapshot was called (not requestSnapshot) + expect(mockFetchSnapshot).toHaveBeenCalledWith( expect.objectContaining({ limit: 20, params: {}, }) ) + expect(mockRequestSnapshot).not.toHaveBeenCalled() + + // Verify snapshot data was applied + expect(testCollection.has(2)).toBe(true) + expect(testCollection.get(2)).toEqual({ id: 2, name: `Snapshot User` }) }) it(`should not request snapshots when loadSubset is called in eager mode`, async () => { @@ -1875,6 +1886,23 @@ describe(`Electric Integration`, () => { it(`should handle progressive mode syncing in background`, async () => { vi.clearAllMocks() + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + const config = { id: `progressive-background-sync-test`, shapeOptions: { @@ -1890,44 +1918,61 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send initial data with snapshot-end (but not up-to-date - still syncing) - subscriber([ + // Send stream data during snapshot phase (should be buffered) + testSubscriber([ { key: `1`, value: { id: 1, name: `Initial User` }, headers: { operation: `insert` }, }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, ]) - // Collection should have data but not be ready yet + // Collection should NOT have the buffered data yet expect(testCollection.status).toBe(`loading`) - expect(testCollection.has(1)).toBe(true) + expect(testCollection.has(1)).toBe(false) - // Should be able to request more data incrementally before full sync completes + // Should be able to fetch snapshot data incrementally before full sync completes await testCollection._sync.loadSubset({ limit: 10 }) - expect(mockRequestSnapshot).toHaveBeenCalled() + expect(mockFetchSnapshot).toHaveBeenCalled() + expect(mockRequestSnapshot).not.toHaveBeenCalled() - // Now send up-to-date to complete the sync - subscriber([ + // Snapshot data should be visible + expect(testCollection.has(2)).toBe(true) + + // Now send up-to-date to complete the sync (triggers atomic swap) + testSubscriber([ { headers: { control: `up-to-date` }, }, ]) expect(testCollection.status).toBe(`ready`) + + // After atomic swap, buffered data should be visible, snapshot data cleared + expect(testCollection.has(1)).toBe(true) + expect(testCollection.has(2)).toBe(false) // Snapshot data truncated }) - it(`should stop requesting snapshots in progressive mode after first up-to-date`, async () => { + it(`should stop fetching snapshots in progressive mode after first up-to-date and perform atomic swap`, async () => { vi.clearAllMocks() + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + const config = { id: `progressive-stop-after-sync-test`, shapeOptions: { @@ -1943,33 +1988,28 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send initial data with snapshot-end (not up-to-date yet) - subscriber([ + expect(testCollection.status).toBe(`loading`) // Not ready yet in progressive + + // Should be able to fetch data before up-to-date (snapshot phase) + vi.clearAllMocks() + await testCollection._sync.loadSubset({ limit: 10 }) + expect(mockFetchSnapshot).toHaveBeenCalledTimes(1) + expect(testCollection.has(2)).toBe(true) // Snapshot data applied + + // Send change messages during snapshot phase (should be buffered) + testSubscriber([ { key: `1`, value: { id: 1, name: `User 1` }, headers: { operation: `insert` }, }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, ]) - expect(testCollection.status).toBe(`loading`) // Not ready yet in progressive - expect(testCollection.has(1)).toBe(true) - - // Should be able to request more data before up-to-date - vi.clearAllMocks() - await testCollection._sync.loadSubset({ limit: 10 }) - expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + // Data should NOT be visible yet (buffered) + expect(testCollection.has(1)).toBe(false) - // Now send up-to-date to complete the full sync - subscriber([ + // Now send up-to-date to complete the full sync and trigger atomic swap + testSubscriber([ { headers: { control: `up-to-date` }, }, @@ -1977,10 +2017,15 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`ready`) - // Try to request more data - should NOT make a request since full sync is complete + // After atomic swap, buffered data should be visible + expect(testCollection.has(1)).toBe(true) + // Snapshot data should be cleared by truncate, so id:2 should be gone + expect(testCollection.has(2)).toBe(false) + + // Try to fetch more data - should NOT make a request since full sync is complete vi.clearAllMocks() await testCollection._sync.loadSubset({ limit: 10 }) - expect(mockRequestSnapshot).not.toHaveBeenCalled() + expect(mockFetchSnapshot).not.toHaveBeenCalled() }) it(`should allow snapshots in on-demand mode even after up-to-date`, async () => { @@ -2229,7 +2274,17 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`ready`) }) - it(`should commit on snapshot-end in progressive mode but not mark ready`, () => { + it(`should buffer changes during snapshot phase in progressive mode until up-to-date`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + const config = { id: `progressive-snapshot-end-test`, shapeOptions: { @@ -2244,7 +2299,8 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) // Send data followed by snapshot-end (but no up-to-date) - subscriber([ + // In progressive mode, these messages should be BUFFERED, not committed + testSubscriber([ { key: `1`, value: { id: 1, name: `Test User` }, @@ -2260,21 +2316,24 @@ describe(`Electric Integration`, () => { }, ]) - // Data should be committed (available in state) - expect(testCollection.has(1)).toBe(true) - expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + // Data should NOT be visible yet (it's buffered during snapshot phase) + expect(testCollection.has(1)).toBe(false) - // But collection should NOT be marked as ready yet in progressive mode + // Collection should NOT be marked as ready yet in progressive mode expect(testCollection.status).toBe(`loading`) - // Now send up-to-date - subscriber([ + // Now send up-to-date (triggers atomic swap) + testSubscriber([ { headers: { control: `up-to-date` }, }, ]) - // Now it should be ready + // Now data should be visible after atomic swap + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + + // And it should be ready expect(testCollection.status).toBe(`ready`) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3b1c50ea4..7175eec82 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -790,8 +790,8 @@ importers: packages/electric-db-collection: dependencies: '@electric-sql/client': - specifier: ^1.1.4 - version: 1.1.4 + specifier: https://pkg.pr.new/@electric-sql/client@3463 + version: https://pkg.pr.new/@electric-sql/client@3463 '@standard-schema/spec': specifier: ^1.0.0 version: 1.0.0 @@ -1595,6 +1595,10 @@ packages: '@electric-sql/client@1.1.4': resolution: {integrity: sha512-88M2iEg5LYSE388wNWjrQMpsIgLzwge60cCN5znQSDEnUEprJZ13vB3r+S8IRNYbmOtUSNVEODuXn+nlqEhEGw==} + '@electric-sql/client@https://pkg.pr.new/@electric-sql/client@3463': + resolution: {tarball: https://pkg.pr.new/@electric-sql/client@3463} + version: 1.1.4 + '@emnapi/core@1.5.0': resolution: {integrity: sha512-sbP8GzB1WDzacS8fgNPpHlp6C9VZe+SJP3F90W9rLemaQj2PzIuTEl1qDOYQf58YIpyjViI24y9aPWCjEzY2cg==} @@ -9727,6 +9731,12 @@ snapshots: optionalDependencies: '@rollup/rollup-darwin-arm64': 4.52.5 + '@electric-sql/client@https://pkg.pr.new/@electric-sql/client@3463': + dependencies: + '@microsoft/fetch-event-source': 2.0.1 + optionalDependencies: + '@rollup/rollup-darwin-arm64': 4.52.5 + '@emnapi/core@1.5.0': dependencies: '@emnapi/wasi-threads': 1.1.0 From aebab1625f22077190ced336f17375c091ea9bac Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 19 Nov 2025 13:48:50 +0000 Subject: [PATCH 2/7] improve progressive e2e tests --- .../src/suites/progressive.suite.ts | 124 +++++++++++++----- packages/db-collection-e2e/src/types.ts | 6 + .../e2e/electric.e2e.test.ts | 56 ++++++-- .../electric-db-collection/src/electric.ts | 47 ++++++- 4 files changed, 188 insertions(+), 45 deletions(-) diff --git a/packages/db-collection-e2e/src/suites/progressive.suite.ts b/packages/db-collection-e2e/src/suites/progressive.suite.ts index 5c2ba96e6..acd281fab 100644 --- a/packages/db-collection-e2e/src/suites/progressive.suite.ts +++ b/packages/db-collection-e2e/src/suites/progressive.suite.ts @@ -18,14 +18,35 @@ export function createProgressiveTestSuite( ) { describe(`Progressive Mode Suite (Electric only)`, () => { describe(`Basic Progressive Mode`, () => { - it(`should validate snapshot phase behavior and atomic swap with status transition`, async () => { + it(`should explicitly validate snapshot phase and atomic swap transition`, async () => { const config = await getConfig() - if (!config.collections.progressive) { - return // Skip if progressive collections not available + if (!config.collections.progressive || !config.progressiveTestControl) { + return // Skip if progressive collections or test control not available } const progressiveUsers = config.collections.progressive.users - // Create a query - this will trigger a snapshot fetch if still in snapshot phase + // Start sync for this test + progressiveUsers.startSyncImmediate() + + // === PHASE 1: SNAPSHOT PHASE === + // Collection might already be ready from a previous test + // (progressive collections are shared and not cleaned up between tests) + await new Promise((resolve) => setTimeout(resolve, 50)) + + const initialStatus = progressiveUsers.status + + // If already ready, we can't test the explicit transition + if (initialStatus === `ready`) { + console.log( + `Progressive collection already ready, skipping explicit transition test` + ) + return + } + + // Should be loading or idle (hook prevents marking ready) + expect([`idle`, `loading`]).toContain(initialStatus) + + // Create a query - this triggers fetchSnapshot const query = createLiveQueryCollection((q) => q .from({ user: progressiveUsers }) @@ -35,49 +56,47 @@ export function createProgressiveTestSuite( await query.preload() await waitForQueryData(query, { minSize: 1, timeout: 10000 }) - const querySize = query.size - const queryItems = Array.from(query.values()) + // Validate snapshot data + const snapshotQuerySize = query.size + const snapshotItems = Array.from(query.values()) - // Validate query data - expect(querySize).toBeGreaterThan(0) - queryItems.forEach((user) => { + expect(snapshotQuerySize).toBeGreaterThan(0) + snapshotItems.forEach((user) => { expect(user.age).toBe(25) expect(user.id).toBeDefined() + expect(user.name).toBeDefined() }) - // If we're still loading, we should be in snapshot phase - // Base collection should have data from snapshot (query subset) - const statusDuringQuery = progressiveUsers.status - if (statusDuringQuery === `loading`) { - // We're in snapshot phase! Validate snapshot behavior - // Collection should have the snapshot data - expect(progressiveUsers.size).toBeGreaterThan(0) - - // But collection size should be <= query size (only snapshot loaded) - // Actually it might have multiple snapshots if other tests ran, so just verify we have data - expect(progressiveUsers.size).toBeGreaterThan(0) - } + // Collection should STILL be loading (paused before atomic swap) + expect(progressiveUsers.status).toBe(`loading`) + + // Collection has snapshot data + const snapshotCollectionSize = progressiveUsers.size + expect(snapshotCollectionSize).toBeGreaterThan(0) - // Wait for full sync to complete + // === PHASE 2: TRIGGER ATOMIC SWAP === + config.progressiveTestControl.releaseInitialSync() + + // === PHASE 3: POST-SWAP (FULLY SYNCED) === + // Wait for ready (atomic swap complete) await waitFor(() => progressiveUsers.status === `ready`, { timeout: 30000, message: `Progressive collection did not complete sync`, }) - // After atomic swap to full synced state - // Collection should have ALL users (not just age=25) + // Collection now has full dataset (more than just snapshot) const finalCollectionSize = progressiveUsers.size - expect(finalCollectionSize).toBeGreaterThan(querySize) // More than just our query subset + expect(finalCollectionSize).toBeGreaterThan(snapshotQuerySize) - // Query should still work with consistent data + // Query still works with consistent data const finalQueryItems = Array.from(query.values()) finalQueryItems.forEach((user) => { expect(user.age).toBe(25) // Still matches predicate expect(user.id).toBeDefined() }) - // Verify some of the original snapshot items are still present - queryItems.forEach((originalUser) => { + // Verify original snapshot items are still present after swap + snapshotItems.forEach((originalUser) => { const foundInCollection = progressiveUsers.get(originalUser.id) expect(foundInCollection).toBeDefined() expect(foundInCollection?.age).toBe(25) @@ -93,11 +112,10 @@ export function createProgressiveTestSuite( } const progressiveUsers = config.collections.progressive.users - // Progressive collections should only be marked ready AFTER first up-to-date - // If already ready, the full sync completed very fast - we can still test the end state - const wasStillLoading = progressiveUsers.status === `loading` + // Start sync for this test + progressiveUsers.startSyncImmediate() - // Query a subset + // Query a subset (triggers snapshot fetch) const query = createLiveQueryCollection((q) => q .from({ user: progressiveUsers }) @@ -106,9 +124,17 @@ export function createProgressiveTestSuite( await query.preload() - // Wait for query to have data (either from snapshot during loading, or from final state if already ready) + // Wait for query to have snapshot data await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + // Collection should still be loading if test control is active + const wasStillLoading = progressiveUsers.status === `loading` + + // Release the initial sync to allow atomic swap + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + const beforeSwapSize = query.size const beforeSwapItems = Array.from(query.values()) @@ -158,6 +184,9 @@ export function createProgressiveTestSuite( } const progressiveUsers = config.collections.progressive.users + // Start sync for this test + progressiveUsers.startSyncImmediate() + // Create multiple queries with different predicates const query1 = createLiveQueryCollection((q) => q @@ -179,6 +208,11 @@ export function createProgressiveTestSuite( waitForQueryData(query2, { minSize: 1, timeout: 10000 }), ]) + // Release initial sync to allow atomic swap + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + expect(query1.size).toBeGreaterThan(0) expect(query2.size).toBeGreaterThan(0) @@ -223,7 +257,12 @@ export function createProgressiveTestSuite( } const progressiveUsers = config.collections.progressive.users - // Wait for full sync first + // Release initial sync immediately (we don't care about snapshot phase for this test) + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // Wait for full sync await waitFor(() => progressiveUsers.status === `ready`, { timeout: 30000, message: `Progressive collection did not complete sync`, @@ -287,6 +326,11 @@ export function createProgressiveTestSuite( const snapshotPhaseSize = query.size + // Release initial sync to allow atomic swap + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + // Wait for atomic swap await waitFor(() => progressiveUsers.status === `ready`, { timeout: 30000, @@ -337,6 +381,11 @@ export function createProgressiveTestSuite( ) ) + // Release initial sync to allow completion + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + // All should have the same size and same data const sizes = queries.map((q) => q.size) expect(new Set(sizes).size).toBe(1) // All sizes are identical @@ -366,8 +415,6 @@ export function createProgressiveTestSuite( const progressiveUsers = config.collections.progressive.users // This test verifies the collection can be cleaned up even during snapshot phase - // and that the atomic swap doesn't cause issues - const query = createLiveQueryCollection((q) => q .from({ user: progressiveUsers }) @@ -376,6 +423,11 @@ export function createProgressiveTestSuite( await query.preload() + // Release initial sync + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + // Don't wait for data, just cleanup immediately await query.cleanup() diff --git a/packages/db-collection-e2e/src/types.ts b/packages/db-collection-e2e/src/types.ts index be65399c8..16112997e 100644 --- a/packages/db-collection-e2e/src/types.ts +++ b/packages/db-collection-e2e/src/types.ts @@ -80,6 +80,12 @@ export interface E2ETestConfig { // When true, tests will wait for mutations to propagate before proceeding hasReplicationLag?: boolean + // Test control for progressive mode (Electric only) + // Allows explicit control over when initial sync completes for deterministic testing + progressiveTestControl?: { + releaseInitialSync: () => void + } + // Lifecycle hooks setup: () => Promise teardown: () => Promise diff --git a/packages/electric-db-collection/e2e/electric.e2e.test.ts b/packages/electric-db-collection/e2e/electric.e2e.test.ts index 452614afd..cb5604bbb 100644 --- a/packages/electric-db-collection/e2e/electric.e2e.test.ts +++ b/packages/electric-db-collection/e2e/electric.e2e.test.ts @@ -6,7 +6,7 @@ import { afterAll, afterEach, beforeAll, describe, inject } from "vitest" import { createCollection } from "@tanstack/db" -import { electricCollectionOptions } from "../src/electric" +import { ELECTRIC_TEST_HOOKS, electricCollectionOptions } from "../src/electric" import { makePgClient } from "../../db-collection-e2e/support/global-setup" import { createCollationTestSuite, @@ -312,6 +312,31 @@ describe(`Electric Collection E2E Tests`, () => { }) ) + // Create control mechanisms for progressive collections + // These allow tests to explicitly control when the atomic swap happens + // We use a ref object so each test can get a fresh promise + const usersUpToDateControl = { + current: null as (() => void) | null, + createPromise: () => + new Promise((resolve) => { + usersUpToDateControl.current = resolve + }), + } + const postsUpToDateControl = { + current: null as (() => void) | null, + createPromise: () => + new Promise((resolve) => { + postsUpToDateControl.current = resolve + }), + } + const commentsUpToDateControl = { + current: null as (() => void) | null, + createPromise: () => + new Promise((resolve) => { + commentsUpToDateControl.current = resolve + }), + } + const progressiveUsers = createCollection( electricCollectionOptions({ id: `electric-e2e-users-progressive-${testId}`, @@ -323,7 +348,10 @@ describe(`Electric Collection E2E Tests`, () => { }, syncMode: `progressive`, getKey: (item: any) => item.id, - startSync: true, + startSync: false, // Don't start immediately - tests will start when ready + [ELECTRIC_TEST_HOOKS]: { + beforeMarkingReady: () => usersUpToDateControl.createPromise(), + }, }) ) @@ -338,7 +366,10 @@ describe(`Electric Collection E2E Tests`, () => { }, syncMode: `progressive`, getKey: (item: any) => item.id, - startSync: true, + startSync: false, // Don't start immediately - tests will start when ready + [ELECTRIC_TEST_HOOKS]: { + beforeMarkingReady: () => postsUpToDateControl.createPromise(), + }, }) ) @@ -353,7 +384,10 @@ describe(`Electric Collection E2E Tests`, () => { }, syncMode: `progressive`, getKey: (item: any) => item.id, - startSync: true, + startSync: false, // Don't start immediately - tests will start when ready + [ELECTRIC_TEST_HOOKS]: { + beforeMarkingReady: () => commentsUpToDateControl.createPromise(), + }, }) ) @@ -367,10 +401,9 @@ describe(`Electric Collection E2E Tests`, () => { await onDemandPosts.preload() await onDemandComments.preload() - // Progressive collections start syncing in background, just preload to get started - await progressiveUsers.preload() - await progressivePosts.preload() - await progressiveComments.preload() + // Progressive collections start syncing in background + // Note: We DON'T call preload() here because the test hooks will block + // Individual progressive tests will handle preload and release as needed config = { collections: { @@ -391,6 +424,13 @@ describe(`Electric Collection E2E Tests`, () => { }, }, hasReplicationLag: true, // Electric has async replication lag + progressiveTestControl: { + releaseInitialSync: () => { + usersUpToDateControl.current?.() + postsUpToDateControl.current?.() + commentsUpToDateControl.current?.() + }, + }, mutations: { // Use direct SQL for Electric tests (simulates external changes) // This tests that Electric sync picks up database changes diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 62ac8cb15..8e11c955d 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -40,6 +40,22 @@ export { isChangeMessage, isControlMessage } from "@electric-sql/client" const debug = DebugModule.debug(`ts/db:electric`) +/** + * Symbol for internal test hooks (hidden from public API) + */ +export const ELECTRIC_TEST_HOOKS = Symbol(`electricTestHooks`) + +/** + * Internal test hooks interface (for testing only) + */ +export interface ElectricTestHooks { + /** + * Called before marking collection ready after first up-to-date in progressive mode + * Allows tests to pause and validate snapshot phase before atomic swap completes + */ + beforeMarkingReady?: () => Promise +} + /** * Type representing a transaction ID in ElectricSQL */ @@ -118,6 +134,12 @@ export interface ElectricCollectionConfig< shapeOptions: ShapeStreamOptions> syncMode?: ElectricSyncMode + /** + * Internal test hooks (for testing only) + * Hidden via Symbol to prevent accidental usage in production + */ + [ELECTRIC_TEST_HOOKS]?: ElectricTestHooks + /** * Optional asynchronous handler function called before an insert operation * @param params Object containing transaction and collection information @@ -379,6 +401,7 @@ export function electricCollectionOptions( removePendingMatches, resolveMatchedPendingMatches, collectionId: config.id, + testHooks: config[ELECTRIC_TEST_HOOKS], }) /** @@ -631,6 +654,7 @@ function createElectricSync>( removePendingMatches: (matchIds: Array) => void resolveMatchedPendingMatches: () => void collectionId?: string + testHooks?: ElectricTestHooks } ): SyncConfig { const { @@ -642,6 +666,7 @@ function createElectricSync>( removePendingMatches, resolveMatchedPendingMatches, collectionId, + testHooks, } = options const MAX_BATCH_MESSAGES = 1000 // Safety limit for message buffer @@ -669,6 +694,26 @@ function createElectricSync>( sync: (params: Parameters[`sync`]>[0]) => { const { begin, write, commit, markReady, truncate, collection } = params + // Wrap markReady to wait for test hook in progressive mode + let progressiveReadyGate: Promise | null = null + const wrappedMarkReady = () => { + // Only create gate if we're in buffering phase (first up-to-date) + if ( + isBufferingInitialSync && + syncMode === `progressive` && + testHooks?.beforeMarkingReady + ) { + // Create a new gate promise for this sync cycle + progressiveReadyGate = testHooks.beforeMarkingReady() + progressiveReadyGate.then(() => { + markReady() + }) + } else { + // No hook, not buffering, or already past first up-to-date + markReady() + } + } + // Abort controller for the stream - wraps the signal if provided const abortController = new AbortController() @@ -956,7 +1001,7 @@ function createElectricSync>( if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { // Mark the collection as ready now that sync is up to date - markReady() + wrappedMarkReady() } // Track that we've received the first up-to-date for progressive mode From 93ebf98c8794e9a378574197848810fd0bc17003 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 19 Nov 2025 14:14:13 +0000 Subject: [PATCH 3/7] add txid tracking tests --- .../src/suites/progressive.suite.ts | 147 +++++++++++++++++- packages/db-collection-e2e/src/types.ts | 3 + .../e2e/electric.e2e.test.ts | 10 ++ 3 files changed, 157 insertions(+), 3 deletions(-) diff --git a/packages/db-collection-e2e/src/suites/progressive.suite.ts b/packages/db-collection-e2e/src/suites/progressive.suite.ts index acd281fab..75b081778 100644 --- a/packages/db-collection-e2e/src/suites/progressive.suite.ts +++ b/packages/db-collection-e2e/src/suites/progressive.suite.ts @@ -1,5 +1,5 @@ /** - * Progressive Mode Test Suite (Electric only) + * Progressive Mode Test Suite * * Tests progressive sync mode behavior including: * - Snapshot loading during initial sync @@ -11,12 +11,14 @@ import { describe, expect, it } from "vitest" import { createLiveQueryCollection, eq, gt } from "@tanstack/db" import { waitFor, waitForQueryData } from "../utils/helpers" -import type { E2ETestConfig } from "../types" +import type { E2ETestConfig, User } from "../types" +import type { Collection } from "@tanstack/db" +import type { ElectricCollectionUtils } from "@tanstack/electric-db-collection" export function createProgressiveTestSuite( getConfig: () => Promise ) { - describe(`Progressive Mode Suite (Electric only)`, () => { + describe(`Progressive Mode Suite`, () => { describe(`Basic Progressive Mode`, () => { it(`should explicitly validate snapshot phase and atomic swap transition`, async () => { const config = await getConfig() @@ -406,6 +408,145 @@ export function createProgressiveTestSuite( }) }) + describe(`Txid Tracking Behavior (Electric only)`, () => { + it(`should not track txids during snapshot phase but track them after atomic swap`, async () => { + const config = await getConfig() + if ( + !config.collections.progressive || + !config.mutations?.insertUser || + !config.getTxid + ) { + return // Skip if progressive collections, mutations, or getTxid not available + } + const progressiveUsers = config.collections.progressive + .users as Collection + + // awaitTxId is guaranteed to exist on ElectricCollectionUtils + // This test is Electric-only via the describe block name + + // Start sync but don't release yet (stay in snapshot phase) + progressiveUsers.startSyncImmediate() + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Should be in loading state (snapshot phase) + if (progressiveUsers.status !== `loading`) { + console.log( + `Collection already ready, cannot test snapshot phase txid behavior` + ) + return + } + + // === PHASE 1: INSERT DURING SNAPSHOT PHASE === + const snapshotPhaseUser = { + id: crypto.randomUUID(), + name: `Snapshot Phase User`, + email: `snapshot@test.com`, + age: 28, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + } + + // Insert user and track when awaitTxId completes + let txidResolved = false + + // Start the insert + await config.mutations.insertUser(snapshotPhaseUser) + + // Get the txid from postgres + const txid = await config.getTxid() + + if (!txid) { + console.log(`Could not get txid, skipping txid tracking validation`) + config.progressiveTestControl?.releaseInitialSync() + return + } + + // Start awaiting the txid (should NOT resolve during snapshot phase) + progressiveUsers.utils.awaitTxId(txid, 60000).then(() => { + txidResolved = true + }) + + // Wait a moment for sync to process + await new Promise((resolve) => setTimeout(resolve, 500)) + + // Txid should NOT have resolved yet (snapshot phase, txids not tracked) + expect(txidResolved).toBe(false) + + // Query for the user (triggers fetchSnapshot with this user) + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.id, snapshotPhaseUser.id)) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + // User should be in snapshot data + expect(query.size).toBe(1) + expect(query.get(snapshotPhaseUser.id)).toBeDefined() + + // But collection is still in snapshot phase + expect(progressiveUsers.status).toBe(`loading`) + + // Txid should STILL not have resolved (snapshot doesn't track txids) + expect(txidResolved).toBe(false) + + // === PHASE 2: TRIGGER ATOMIC SWAP === + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // Wait for atomic swap to complete + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // NOW txid should resolve (buffered messages include txids) + await waitFor(() => txidResolved, { + timeout: 5000, + message: `Txid did not resolve after atomic swap`, + }) + + expect(txidResolved).toBe(true) + + // === PHASE 3: VERIFY TXID TRACKING POST-SWAP === + // User should still be present after atomic swap + expect(progressiveUsers.get(snapshotPhaseUser.id)).toBeDefined() + + // Now insert another user and verify txid tracking works + const postSwapUser = { + id: crypto.randomUUID(), + name: `Post Swap User`, + email: `postswap@test.com`, + age: 29, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + } + + await config.mutations.insertUser(postSwapUser) + + // Wait for incremental update (txid tracking should work now) + if (config.hasReplicationLag) { + await waitFor(() => progressiveUsers.has(postSwapUser.id), { + timeout: 10000, + message: `Post-swap user not synced via incremental update`, + }) + } + + // Both users should be present + expect(progressiveUsers.get(snapshotPhaseUser.id)).toBeDefined() + expect(progressiveUsers.get(postSwapUser.id)).toBeDefined() + + await query.cleanup() + }) + }) + describe(`Progressive Mode Resilience`, () => { it(`should handle cleanup and restart during snapshot phase`, async () => { const config = await getConfig() diff --git a/packages/db-collection-e2e/src/types.ts b/packages/db-collection-e2e/src/types.ts index 16112997e..561ce3b03 100644 --- a/packages/db-collection-e2e/src/types.ts +++ b/packages/db-collection-e2e/src/types.ts @@ -76,6 +76,9 @@ export interface E2ETestConfig { insertPost: (post: Post) => Promise } + // Helper to get txid for Electric txid tracking tests (Electric only) + getTxid?: () => Promise + // Indicates if the backend has replication lag (e.g., Electric sync) // When true, tests will wait for mutations to propagate before proceeding hasReplicationLag?: boolean diff --git a/packages/electric-db-collection/e2e/electric.e2e.test.ts b/packages/electric-db-collection/e2e/electric.e2e.test.ts index cb5604bbb..f928ef50b 100644 --- a/packages/electric-db-collection/e2e/electric.e2e.test.ts +++ b/packages/electric-db-collection/e2e/electric.e2e.test.ts @@ -431,6 +431,16 @@ describe(`Electric Collection E2E Tests`, () => { commentsUpToDateControl.current?.() }, }, + getTxid: async () => { + // Get the current transaction ID from the last operation + // This uses pg_current_xact_id_if_assigned() which returns the txid + // Note: This gets the CURRENT transaction's ID, so must be called + // immediately after an insert in the same transaction context + const result = await dbClient.query( + `SELECT pg_current_xact_id_if_assigned()::text::bigint as txid` + ) + return result.rows[0]?.txid || null + }, mutations: { // Use direct SQL for Electric tests (simulates external changes) // This tests that Electric sync picks up database changes From 132b57f6166606dd10c411740ae7e89d225d3e26 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 19 Nov 2025 14:16:25 +0000 Subject: [PATCH 4/7] changeset --- .../fix-progressive-mode-fetchsnapshot.md | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 .changeset/fix-progressive-mode-fetchsnapshot.md diff --git a/.changeset/fix-progressive-mode-fetchsnapshot.md b/.changeset/fix-progressive-mode-fetchsnapshot.md new file mode 100644 index 000000000..d82043ef4 --- /dev/null +++ b/.changeset/fix-progressive-mode-fetchsnapshot.md @@ -0,0 +1,30 @@ +--- +"@tanstack/electric-db-collection": patch +"@tanstack/db-collection-e2e": patch +"@tanstack/db": patch +--- + +Fix progressive mode to use fetchSnapshot and atomic swap + +Progressive mode was broken because `requestSnapshot()` injected snapshots into the stream in causally correct position, which didn't work properly with the `full` mode stream. This release fixes progressive mode by: + +**Core Changes:** +- Use `fetchSnapshot()` during initial sync to fetch and apply snapshots immediately in sync transactions +- Buffer all stream messages during initial sync (renamed flag to `isBufferingInitialSync`) +- Perform atomic swap on first `up-to-date`: truncate snapshot data → apply buffered messages → mark ready +- Track txids/snapshots only after atomic swap (enables correct optimistic transaction confirmation) + +**Test Infrastructure:** +- Added `ELECTRIC_TEST_HOOKS` symbol for test control (hidden from public API) +- Added `progressiveTestControl.releaseInitialSync()` to E2E test config for explicit transition control +- Created comprehensive progressive mode E2E test suite (8 tests): + - Explicit snapshot phase and atomic swap validation + - Txid tracking behavior (Electric-only) + - Multiple concurrent snapshots with deduplication + - Incremental updates after swap + - Predicate handling and resilience tests + +**Bug Fixes:** +- Fixed type errors in test files +- All 166 unit tests + 95 E2E tests passing + From 3d37b855bfde2374845d9861d301289928dd81f0 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 19 Nov 2025 14:19:02 +0000 Subject: [PATCH 5/7] format --- .changeset/fix-progressive-mode-fetchsnapshot.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.changeset/fix-progressive-mode-fetchsnapshot.md b/.changeset/fix-progressive-mode-fetchsnapshot.md index d82043ef4..862da4a30 100644 --- a/.changeset/fix-progressive-mode-fetchsnapshot.md +++ b/.changeset/fix-progressive-mode-fetchsnapshot.md @@ -9,12 +9,14 @@ Fix progressive mode to use fetchSnapshot and atomic swap Progressive mode was broken because `requestSnapshot()` injected snapshots into the stream in causally correct position, which didn't work properly with the `full` mode stream. This release fixes progressive mode by: **Core Changes:** + - Use `fetchSnapshot()` during initial sync to fetch and apply snapshots immediately in sync transactions - Buffer all stream messages during initial sync (renamed flag to `isBufferingInitialSync`) - Perform atomic swap on first `up-to-date`: truncate snapshot data → apply buffered messages → mark ready - Track txids/snapshots only after atomic swap (enables correct optimistic transaction confirmation) **Test Infrastructure:** + - Added `ELECTRIC_TEST_HOOKS` symbol for test control (hidden from public API) - Added `progressiveTestControl.releaseInitialSync()` to E2E test config for explicit transition control - Created comprehensive progressive mode E2E test suite (8 tests): @@ -25,6 +27,6 @@ Progressive mode was broken because `requestSnapshot()` injected snapshots into - Predicate handling and resilience tests **Bug Fixes:** + - Fixed type errors in test files - All 166 unit tests + 95 E2E tests passing - From 15270281f488a7187bf461d534635bef0296a50b Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 19 Nov 2025 14:54:45 +0000 Subject: [PATCH 6/7] bump electric client version --- packages/electric-db-collection/package.json | 2 +- pnpm-lock.yaml | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/electric-db-collection/package.json b/packages/electric-db-collection/package.json index ee5d38371..ccae104a6 100644 --- a/packages/electric-db-collection/package.json +++ b/packages/electric-db-collection/package.json @@ -3,7 +3,7 @@ "description": "ElectricSQL collection for TanStack DB", "version": "0.2.2", "dependencies": { - "@electric-sql/client": "https://pkg.pr.new/@electric-sql/client@3463", + "@electric-sql/client": "^1.1.5", "@standard-schema/spec": "^1.0.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7175eec82..08559d125 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -790,8 +790,8 @@ importers: packages/electric-db-collection: dependencies: '@electric-sql/client': - specifier: https://pkg.pr.new/@electric-sql/client@3463 - version: https://pkg.pr.new/@electric-sql/client@3463 + specifier: ^1.1.5 + version: 1.1.5 '@standard-schema/spec': specifier: ^1.0.0 version: 1.0.0 @@ -1595,9 +1595,8 @@ packages: '@electric-sql/client@1.1.4': resolution: {integrity: sha512-88M2iEg5LYSE388wNWjrQMpsIgLzwge60cCN5znQSDEnUEprJZ13vB3r+S8IRNYbmOtUSNVEODuXn+nlqEhEGw==} - '@electric-sql/client@https://pkg.pr.new/@electric-sql/client@3463': - resolution: {tarball: https://pkg.pr.new/@electric-sql/client@3463} - version: 1.1.4 + '@electric-sql/client@1.1.5': + resolution: {integrity: sha512-5QXmdXkl6AtDX89TRNwV2AVX0+ZJXRcA8/F55iIaifOKWq4sZaXiI9+Rb6iMhAl/S3MSw9abZucaLHBL+hScQg==} '@emnapi/core@1.5.0': resolution: {integrity: sha512-sbP8GzB1WDzacS8fgNPpHlp6C9VZe+SJP3F90W9rLemaQj2PzIuTEl1qDOYQf58YIpyjViI24y9aPWCjEzY2cg==} @@ -9731,7 +9730,7 @@ snapshots: optionalDependencies: '@rollup/rollup-darwin-arm64': 4.52.5 - '@electric-sql/client@https://pkg.pr.new/@electric-sql/client@3463': + '@electric-sql/client@1.1.5': dependencies: '@microsoft/fetch-event-source': 2.0.1 optionalDependencies: From 2846a6ef5149930c4f784ebc267ea437163f5b74 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 21 Nov 2025 11:22:39 +0000 Subject: [PATCH 7/7] address review --- .../electric-db-collection/src/electric.ts | 170 +++++++++++------- .../tests/electric.test.ts | 78 ++++++++ 2 files changed, 187 insertions(+), 61 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 8e11c955d..50ffe8f3c 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -279,6 +279,93 @@ function hasTxids>( return `txids` in message.headers && Array.isArray(message.headers.txids) } +/** + * Creates a deduplicated loadSubset handler for progressive/on-demand modes + * Returns null for eager mode, or a DeduplicatedLoadSubset instance for other modes. + * Handles fetching snapshots in progressive mode during buffering phase, + * and requesting snapshots in on-demand mode + */ +function createLoadSubsetDedupe>({ + stream, + syncMode, + isBufferingInitialSync, + begin, + write, + commit, + collectionId, +}: { + stream: ShapeStream + syncMode: ElectricSyncMode + isBufferingInitialSync: () => boolean + begin: () => void + write: (mutation: { + type: `insert` | `update` | `delete` + value: T + metadata: Record + }) => void + commit: () => void + collectionId?: string +}): DeduplicatedLoadSubset | null { + // Eager mode doesn't need subset loading + if (syncMode === `eager`) { + return null + } + + const loadSubset = async (opts: LoadSubsetOptions) => { + // In progressive mode, use fetchSnapshot during snapshot phase + if (isBufferingInitialSync()) { + // Progressive mode snapshot phase: fetch and apply immediately + const snapshotParams = compileSQL(opts) + try { + const { data: rows } = await stream.fetchSnapshot(snapshotParams) + + // Check again if we're still buffering - we might have received up-to-date + // and completed the atomic swap while waiting for the snapshot + if (!isBufferingInitialSync()) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching` + ) + return + } + + // Apply snapshot data in a sync transaction (only if we have data) + if (rows.length > 0) { + begin() + for (const row of rows) { + write({ + type: `insert`, + value: row.value, + metadata: { + ...row.headers, + }, + }) + } + commit() + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows` + ) + } + } catch (error) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, + error + ) + throw error + } + } else if (syncMode === `progressive`) { + // Progressive mode after full sync complete: no need to load more + return + } else { + // On-demand mode: use requestSnapshot + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + } + } + + return new DeduplicatedLoadSubset({ loadSubset }) +} + /** * Type for the awaitTxId utility function */ @@ -696,10 +783,10 @@ function createElectricSync>( // Wrap markReady to wait for test hook in progressive mode let progressiveReadyGate: Promise | null = null - const wrappedMarkReady = () => { + const wrappedMarkReady = (isBuffering: boolean) => { // Only create gate if we're in buffering phase (first up-to-date) if ( - isBufferingInitialSync && + isBuffering && syncMode === `progressive` && testHooks?.beforeMarkingReady ) { @@ -780,61 +867,23 @@ function createElectricSync>( let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode // Progressive mode state - let isBufferingInitialSync = syncMode === `progressive` // True until first up-to-date in progressive mode + // Helper to determine if we're buffering the initial sync + const isBufferingInitialSync = () => + syncMode === `progressive` && !hasReceivedUpToDate const bufferedMessages: Array> = [] // Buffer change messages during initial sync // Create deduplicated loadSubset wrapper for non-eager modes // This prevents redundant snapshot requests when multiple concurrent // live queries request overlapping or subset predicates - const loadSubsetDedupe = - syncMode === `eager` - ? null - : new DeduplicatedLoadSubset({ - loadSubset: async (opts: LoadSubsetOptions) => { - // In progressive mode, use fetchSnapshot during snapshot phase - if (syncMode === `progressive`) { - if (hasReceivedUpToDate) { - // Full sync complete, no need to load more - return - } - // Snapshot phase: fetch and apply immediately - const snapshotParams = compileSQL(opts) - try { - const snapshot = await stream.fetchSnapshot(snapshotParams) - const rows = snapshot.data - - // Apply snapshot data in a sync transaction (only if we have data) - if (rows.length > 0) { - begin() - for (const row of rows) { - write({ - type: `insert`, - value: row.value, - metadata: { - ...row.headers, - }, - }) - } - commit() - - debug( - `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows` - ) - } - } catch (error) { - debug( - `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, - error - ) - throw error - } - } else { - // On-demand mode: use requestSnapshot - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) - } - }, - }) + const loadSubsetDedupe = createLoadSubsetDedupe({ + stream, + syncMode, + isBufferingInitialSync, + begin, + write, + commit, + collectionId, + }) unsubscribeStream = stream.subscribe((messages: Array>) => { let hasUpToDate = false @@ -855,7 +904,7 @@ function createElectricSync>( // Check for txids in the message and add them to our store // Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap) - if (hasTxids(message) && !isBufferingInitialSync) { + if (hasTxids(message) && !isBufferingInitialSync()) { message.headers.txids?.forEach((txid) => newTxids.add(txid)) } @@ -890,7 +939,7 @@ function createElectricSync>( } // In buffered initial sync of progressive mode, buffer messages instead of writing - if (isBufferingInitialSync) { + if (isBufferingInitialSync()) { bufferedMessages.push(message) } else { // Normal processing: write changes immediately @@ -910,7 +959,7 @@ function createElectricSync>( } } else if (isSnapshotEndMessage(message)) { // Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap) - if (!isBufferingInitialSync) { + if (!isBufferingInitialSync()) { newSnapshots.push(parseSnapshotMessage(message)) } hasSnapshotEnd = true @@ -936,15 +985,14 @@ function createElectricSync>( // Reset flags so we continue accumulating changes until next up-to-date hasUpToDate = false hasSnapshotEnd = false - hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync - isBufferingInitialSync = syncMode === `progressive` // Reset buffering state + hasReceivedUpToDate = false // Reset for progressive mode (isBufferingInitialSync will reflect this) bufferedMessages.length = 0 // Clear buffered messages } } if (hasUpToDate || hasSnapshotEnd) { // PROGRESSIVE MODE: Atomic swap on first up-to-date - if (isBufferingInitialSync && hasUpToDate) { + if (isBufferingInitialSync() && hasUpToDate) { debug( `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages` ) @@ -981,8 +1029,8 @@ function createElectricSync>( // Commit the atomic swap commit() - // Exit buffering phase - now in normal sync mode - isBufferingInitialSync = false + // Exit buffering phase by marking that we've received up-to-date + // isBufferingInitialSync() will now return false bufferedMessages.length = 0 debug( @@ -1001,7 +1049,7 @@ function createElectricSync>( if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { // Mark the collection as ready now that sync is up to date - wrappedMarkReady() + wrappedMarkReady(isBufferingInitialSync()) } // Track that we've received the first up-to-date for progressive mode diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index f536bc2da..04ecd4fb1 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2066,6 +2066,84 @@ describe(`Electric Integration`, () => { expect(mockRequestSnapshot).toHaveBeenCalled() }) + it(`should ignore snapshot data when fetchSnapshot completes after up-to-date in progressive mode`, async () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + let resolveFetchSnapshot!: (value: any) => void + const fetchSnapshotPromise = new Promise((resolve) => { + resolveFetchSnapshot = resolve as any + }) + + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + // Mock fetchSnapshot to return a promise we control + mockFetchSnapshot.mockReturnValue(fetchSnapshotPromise) + + const config = { + id: `progressive-snapshot-race-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + expect(testCollection.status).toBe(`loading`) + + // Start a loadSubset request (should call fetchSnapshot) + const loadSubsetPromise = testCollection._sync.loadSubset({ limit: 10 }) + + // Verify fetchSnapshot was called + expect(mockFetchSnapshot).toHaveBeenCalled() + + // Before the snapshot completes, send up-to-date to complete the sync + testSubscriber([ + { + key: `1`, + value: { id: 1, name: `Buffered User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Sync should be complete now + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(true) + expect(testCollection.size).toBe(1) + + // Now resolve the fetchSnapshot with data + resolveFetchSnapshot({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Late Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + + // Wait for loadSubset to complete + await loadSubsetPromise + + // The snapshot data should be IGNORED because sync already completed + expect(testCollection.has(2)).toBe(false) + expect(testCollection.size).toBe(1) // Still only the buffered user + expect(testCollection.get(1)).toEqual({ id: 1, name: `Buffered User` }) + }) + it(`should default offset to 'now' in on-demand mode when no offset provided`, async () => { vi.clearAllMocks()