diff --git a/.changeset/fix-progressive-mode-fetchsnapshot.md b/.changeset/fix-progressive-mode-fetchsnapshot.md new file mode 100644 index 000000000..862da4a30 --- /dev/null +++ b/.changeset/fix-progressive-mode-fetchsnapshot.md @@ -0,0 +1,32 @@ +--- +"@tanstack/electric-db-collection": patch +"@tanstack/db-collection-e2e": patch +"@tanstack/db": patch +--- + +Fix progressive mode to use fetchSnapshot and atomic swap + +Progressive mode was broken because `requestSnapshot()` injected snapshots into the stream in causally correct position, which didn't work properly with the `full` mode stream. This release fixes progressive mode by: + +**Core Changes:** + +- Use `fetchSnapshot()` during initial sync to fetch and apply snapshots immediately in sync transactions +- Buffer all stream messages during initial sync (renamed flag to `isBufferingInitialSync`) +- Perform atomic swap on first `up-to-date`: truncate snapshot data → apply buffered messages → mark ready +- Track txids/snapshots only after atomic swap (enables correct optimistic transaction confirmation) + +**Test Infrastructure:** + +- Added `ELECTRIC_TEST_HOOKS` symbol for test control (hidden from public API) +- Added `progressiveTestControl.releaseInitialSync()` to E2E test config for explicit transition control +- Created comprehensive progressive mode E2E test suite (8 tests): + - Explicit snapshot phase and atomic swap validation + - Txid tracking behavior (Electric-only) + - Multiple concurrent snapshots with deduplication + - Incremental updates after swap + - Predicate handling and resilience tests + +**Bug Fixes:** + +- Fixed type errors in test files +- All 166 unit tests + 95 E2E tests passing diff --git a/packages/db-collection-e2e/src/index.ts b/packages/db-collection-e2e/src/index.ts index 04509bb9e..ae6031196 100644 --- a/packages/db-collection-e2e/src/index.ts +++ b/packages/db-collection-e2e/src/index.ts @@ -25,3 +25,4 @@ export { createDeduplicationTestSuite } from "./suites/deduplication.suite" export { createCollationTestSuite } from "./suites/collation.suite" export { createMutationsTestSuite } from "./suites/mutations.suite" export { createLiveUpdatesTestSuite } from "./suites/live-updates.suite" +export { createProgressiveTestSuite } from "./suites/progressive.suite" diff --git a/packages/db-collection-e2e/src/suites/progressive.suite.ts b/packages/db-collection-e2e/src/suites/progressive.suite.ts new file mode 100644 index 000000000..75b081778 --- /dev/null +++ b/packages/db-collection-e2e/src/suites/progressive.suite.ts @@ -0,0 +1,580 @@ +/** + * Progressive Mode Test Suite + * + * Tests progressive sync mode behavior including: + * - Snapshot loading during initial sync + * - Atomic swap on first up-to-date + * - Incremental updates after swap + * - Txid tracking behavior + */ + +import { describe, expect, it } from "vitest" +import { createLiveQueryCollection, eq, gt } from "@tanstack/db" +import { waitFor, waitForQueryData } from "../utils/helpers" +import type { E2ETestConfig, User } from "../types" +import type { Collection } from "@tanstack/db" +import type { ElectricCollectionUtils } from "@tanstack/electric-db-collection" + +export function createProgressiveTestSuite( + getConfig: () => Promise +) { + describe(`Progressive Mode Suite`, () => { + describe(`Basic Progressive Mode`, () => { + it(`should explicitly validate snapshot phase and atomic swap transition`, async () => { + const config = await getConfig() + if (!config.collections.progressive || !config.progressiveTestControl) { + return // Skip if progressive collections or test control not available + } + const progressiveUsers = config.collections.progressive.users + + // Start sync for this test + progressiveUsers.startSyncImmediate() + + // === PHASE 1: SNAPSHOT PHASE === + // Collection might already be ready from a previous test + // (progressive collections are shared and not cleaned up between tests) + await new Promise((resolve) => setTimeout(resolve, 50)) + + const initialStatus = progressiveUsers.status + + // If already ready, we can't test the explicit transition + if (initialStatus === `ready`) { + console.log( + `Progressive collection already ready, skipping explicit transition test` + ) + return + } + + // Should be loading or idle (hook prevents marking ready) + expect([`idle`, `loading`]).toContain(initialStatus) + + // Create a query - this triggers fetchSnapshot + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + // Validate snapshot data + const snapshotQuerySize = query.size + const snapshotItems = Array.from(query.values()) + + expect(snapshotQuerySize).toBeGreaterThan(0) + snapshotItems.forEach((user) => { + expect(user.age).toBe(25) + expect(user.id).toBeDefined() + expect(user.name).toBeDefined() + }) + + // Collection should STILL be loading (paused before atomic swap) + expect(progressiveUsers.status).toBe(`loading`) + + // Collection has snapshot data + const snapshotCollectionSize = progressiveUsers.size + expect(snapshotCollectionSize).toBeGreaterThan(0) + + // === PHASE 2: TRIGGER ATOMIC SWAP === + config.progressiveTestControl.releaseInitialSync() + + // === PHASE 3: POST-SWAP (FULLY SYNCED) === + // Wait for ready (atomic swap complete) + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // Collection now has full dataset (more than just snapshot) + const finalCollectionSize = progressiveUsers.size + expect(finalCollectionSize).toBeGreaterThan(snapshotQuerySize) + + // Query still works with consistent data + const finalQueryItems = Array.from(query.values()) + finalQueryItems.forEach((user) => { + expect(user.age).toBe(25) // Still matches predicate + expect(user.id).toBeDefined() + }) + + // Verify original snapshot items are still present after swap + snapshotItems.forEach((originalUser) => { + const foundInCollection = progressiveUsers.get(originalUser.id) + expect(foundInCollection).toBeDefined() + expect(foundInCollection?.age).toBe(25) + }) + + await query.cleanup() + }) + + it(`should load snapshots during initial sync and perform atomic swap`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Start sync for this test + progressiveUsers.startSyncImmediate() + + // Query a subset (triggers snapshot fetch) + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + await query.preload() + + // Wait for query to have snapshot data + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + // Collection should still be loading if test control is active + const wasStillLoading = progressiveUsers.status === `loading` + + // Release the initial sync to allow atomic swap + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + const beforeSwapSize = query.size + const beforeSwapItems = Array.from(query.values()) + + // Verify all items match the predicate + beforeSwapItems.forEach((user) => { + expect(user.age).toBe(25) + expect(user.id).toBeDefined() + expect(user.name).toBeDefined() + }) + + if (wasStillLoading) { + // If we caught it during snapshot phase, wait for atomic swap + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // After atomic swap, verify data is consistent + // The query should have the same data (from full sync) + const afterSwapItems = Array.from(query.values()) + expect(afterSwapItems.length).toBeGreaterThanOrEqual(beforeSwapSize) + + // All original items should still be present + beforeSwapItems.forEach((originalUser) => { + const stillPresent = afterSwapItems.some( + (u) => u.id === originalUser.id + ) + expect(stillPresent).toBe(true) + }) + } else { + // Already ready - verify final state is correct + expect(progressiveUsers.status).toBe(`ready`) + } + + // Final validation: all items still match predicate + Array.from(query.values()).forEach((user) => { + expect(user.age).toBe(25) + }) + + await query.cleanup() + }) + + it(`should handle multiple snapshots with different predicates`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Start sync for this test + progressiveUsers.startSyncImmediate() + + // Create multiple queries with different predicates + const query1 = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + const query2 = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => gt(user.age, 30)) + ) + + await Promise.all([query1.preload(), query2.preload()]) + + // Wait for both to load snapshots + await Promise.all([ + waitForQueryData(query1, { minSize: 1, timeout: 10000 }), + waitForQueryData(query2, { minSize: 1, timeout: 10000 }), + ]) + + // Release initial sync to allow atomic swap + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + expect(query1.size).toBeGreaterThan(0) + expect(query2.size).toBeGreaterThan(0) + + // Verify data correctness + const query1Snapshot = Array.from(query1.values()) + const query2Snapshot = Array.from(query2.values()) + + query1Snapshot.forEach((user) => { + expect(user.age).toBe(25) + }) + query2Snapshot.forEach((user) => { + expect(user.age).toBeGreaterThan(30) + }) + + // Wait for full sync + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // Both queries should still have data after swap with same predicates + expect(query1.size).toBeGreaterThan(0) + expect(query2.size).toBeGreaterThan(0) + + // Verify predicates still match after swap + Array.from(query1.values()).forEach((user) => { + expect(user.age).toBe(25) + }) + Array.from(query2.values()).forEach((user) => { + expect(user.age).toBeGreaterThan(30) + }) + + await Promise.all([query1.cleanup(), query2.cleanup()]) + }) + }) + + describe(`Incremental Updates After Swap`, () => { + it(`should receive incremental updates after atomic swap`, async () => { + const config = await getConfig() + if (!config.collections.progressive || !config.mutations?.insertUser) { + return // Skip if progressive collections or mutations not available + } + const progressiveUsers = config.collections.progressive.users + + // Release initial sync immediately (we don't care about snapshot phase for this test) + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // Wait for full sync + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + const initialSize = progressiveUsers.size + + // Insert new data + const newUser = { + id: crypto.randomUUID(), + name: `Progressive Test User`, + email: `progressive@test.com`, + age: 35, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + } + + await config.mutations.insertUser(newUser) + + // Wait for incremental update + if (config.hasReplicationLag) { + await waitFor(() => progressiveUsers.size > initialSize, { + timeout: 10000, + message: `New user not synced via incremental update`, + }) + } + + expect(progressiveUsers.size).toBeGreaterThan(initialSize) + + // Verify the new user is in the collection with correct data + const foundUser = progressiveUsers.get(newUser.id) + expect(foundUser).toBeDefined() + expect(foundUser?.id).toBe(newUser.id) + expect(foundUser?.name).toBe(newUser.name) + expect(foundUser?.email).toBe(newUser.email) + expect(foundUser?.age).toBe(newUser.age) + }) + }) + + describe(`Predicate Handling`, () => { + it(`should correctly handle predicates during and after snapshot phase`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Create query with predicate during snapshot phase + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => gt(user.age, 25)) + .orderBy(({ user }) => [user.age, `asc`]) + .limit(5) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + const snapshotPhaseSize = query.size + + // Release initial sync to allow atomic swap + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // Wait for atomic swap + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // Verify predicate still works after swap + const afterSwapSize = query.size + const afterSwapItems = Array.from(query.values()) + + // Size should be reasonable (at least what we had in snapshot phase) + expect(afterSwapSize).toBeGreaterThanOrEqual(snapshotPhaseSize) + + // All items should match the predicate + afterSwapItems.forEach((user) => { + expect(user.age).toBeGreaterThan(25) + }) + + // Should respect limit + expect(afterSwapSize).toBeLessThanOrEqual(5) + + await query.cleanup() + }) + + it(`should deduplicate snapshot requests during snapshot phase`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // Create multiple identical queries (should be deduplicated) + const queries = Array.from({ length: 3 }, () => + createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 30)) + ) + ) + + // Execute concurrently + await Promise.all(queries.map((q) => q.preload())) + + // Wait for data + await Promise.all( + queries.map((q) => + waitForQueryData(q, { minSize: 1, timeout: 10000 }) + ) + ) + + // Release initial sync to allow completion + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // All should have the same size and same data + const sizes = queries.map((q) => q.size) + expect(new Set(sizes).size).toBe(1) // All sizes are identical + + // Verify all queries have identical data (deduplication working) + const firstQueryData = Array.from(queries[0]!.values()) + const firstQueryIds = new Set(firstQueryData.map((u) => u.id)) + + queries.forEach((query) => { + const queryData = Array.from(query.values()) + queryData.forEach((user) => { + expect(user.age).toBe(30) // All match predicate + expect(firstQueryIds.has(user.id)).toBe(true) // Same items + }) + }) + + await Promise.all(queries.map((q) => q.cleanup())) + }) + }) + + describe(`Txid Tracking Behavior (Electric only)`, () => { + it(`should not track txids during snapshot phase but track them after atomic swap`, async () => { + const config = await getConfig() + if ( + !config.collections.progressive || + !config.mutations?.insertUser || + !config.getTxid + ) { + return // Skip if progressive collections, mutations, or getTxid not available + } + const progressiveUsers = config.collections.progressive + .users as Collection + + // awaitTxId is guaranteed to exist on ElectricCollectionUtils + // This test is Electric-only via the describe block name + + // Start sync but don't release yet (stay in snapshot phase) + progressiveUsers.startSyncImmediate() + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Should be in loading state (snapshot phase) + if (progressiveUsers.status !== `loading`) { + console.log( + `Collection already ready, cannot test snapshot phase txid behavior` + ) + return + } + + // === PHASE 1: INSERT DURING SNAPSHOT PHASE === + const snapshotPhaseUser = { + id: crypto.randomUUID(), + name: `Snapshot Phase User`, + email: `snapshot@test.com`, + age: 28, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + } + + // Insert user and track when awaitTxId completes + let txidResolved = false + + // Start the insert + await config.mutations.insertUser(snapshotPhaseUser) + + // Get the txid from postgres + const txid = await config.getTxid() + + if (!txid) { + console.log(`Could not get txid, skipping txid tracking validation`) + config.progressiveTestControl?.releaseInitialSync() + return + } + + // Start awaiting the txid (should NOT resolve during snapshot phase) + progressiveUsers.utils.awaitTxId(txid, 60000).then(() => { + txidResolved = true + }) + + // Wait a moment for sync to process + await new Promise((resolve) => setTimeout(resolve, 500)) + + // Txid should NOT have resolved yet (snapshot phase, txids not tracked) + expect(txidResolved).toBe(false) + + // Query for the user (triggers fetchSnapshot with this user) + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.id, snapshotPhaseUser.id)) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1, timeout: 10000 }) + + // User should be in snapshot data + expect(query.size).toBe(1) + expect(query.get(snapshotPhaseUser.id)).toBeDefined() + + // But collection is still in snapshot phase + expect(progressiveUsers.status).toBe(`loading`) + + // Txid should STILL not have resolved (snapshot doesn't track txids) + expect(txidResolved).toBe(false) + + // === PHASE 2: TRIGGER ATOMIC SWAP === + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // Wait for atomic swap to complete + await waitFor(() => progressiveUsers.status === `ready`, { + timeout: 30000, + message: `Progressive collection did not complete sync`, + }) + + // NOW txid should resolve (buffered messages include txids) + await waitFor(() => txidResolved, { + timeout: 5000, + message: `Txid did not resolve after atomic swap`, + }) + + expect(txidResolved).toBe(true) + + // === PHASE 3: VERIFY TXID TRACKING POST-SWAP === + // User should still be present after atomic swap + expect(progressiveUsers.get(snapshotPhaseUser.id)).toBeDefined() + + // Now insert another user and verify txid tracking works + const postSwapUser = { + id: crypto.randomUUID(), + name: `Post Swap User`, + email: `postswap@test.com`, + age: 29, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + } + + await config.mutations.insertUser(postSwapUser) + + // Wait for incremental update (txid tracking should work now) + if (config.hasReplicationLag) { + await waitFor(() => progressiveUsers.has(postSwapUser.id), { + timeout: 10000, + message: `Post-swap user not synced via incremental update`, + }) + } + + // Both users should be present + expect(progressiveUsers.get(snapshotPhaseUser.id)).toBeDefined() + expect(progressiveUsers.get(postSwapUser.id)).toBeDefined() + + await query.cleanup() + }) + }) + + describe(`Progressive Mode Resilience`, () => { + it(`should handle cleanup and restart during snapshot phase`, async () => { + const config = await getConfig() + if (!config.collections.progressive) { + return // Skip if progressive collections not available + } + const progressiveUsers = config.collections.progressive.users + + // This test verifies the collection can be cleaned up even during snapshot phase + const query = createLiveQueryCollection((q) => + q + .from({ user: progressiveUsers }) + .where(({ user }) => eq(user.age, 25)) + ) + + await query.preload() + + // Release initial sync + if (config.progressiveTestControl) { + config.progressiveTestControl.releaseInitialSync() + } + + // Don't wait for data, just cleanup immediately + await query.cleanup() + + // Should not throw + expect(true).toBe(true) + }) + }) + }) +} diff --git a/packages/db-collection-e2e/src/types.ts b/packages/db-collection-e2e/src/types.ts index 86ad7d6d5..561ce3b03 100644 --- a/packages/db-collection-e2e/src/types.ts +++ b/packages/db-collection-e2e/src/types.ts @@ -60,6 +60,11 @@ export interface E2ETestConfig { posts: Collection comments: Collection } + progressive?: { + users: Collection + posts: Collection + comments: Collection + } } // Mutation helpers using collection APIs (works for both Electric and Query) @@ -71,10 +76,19 @@ export interface E2ETestConfig { insertPost: (post: Post) => Promise } + // Helper to get txid for Electric txid tracking tests (Electric only) + getTxid?: () => Promise + // Indicates if the backend has replication lag (e.g., Electric sync) // When true, tests will wait for mutations to propagate before proceeding hasReplicationLag?: boolean + // Test control for progressive mode (Electric only) + // Allows explicit control over when initial sync completes for deterministic testing + progressiveTestControl?: { + releaseInitialSync: () => void + } + // Lifecycle hooks setup: () => Promise teardown: () => Promise diff --git a/packages/electric-db-collection/e2e/electric.e2e.test.ts b/packages/electric-db-collection/e2e/electric.e2e.test.ts index d3b8135af..f928ef50b 100644 --- a/packages/electric-db-collection/e2e/electric.e2e.test.ts +++ b/packages/electric-db-collection/e2e/electric.e2e.test.ts @@ -6,7 +6,7 @@ import { afterAll, afterEach, beforeAll, describe, inject } from "vitest" import { createCollection } from "@tanstack/db" -import { electricCollectionOptions } from "../src/electric" +import { ELECTRIC_TEST_HOOKS, electricCollectionOptions } from "../src/electric" import { makePgClient } from "../../db-collection-e2e/support/global-setup" import { createCollationTestSuite, @@ -16,12 +16,20 @@ import { createMutationsTestSuite, createPaginationTestSuite, createPredicatesTestSuite, + createProgressiveTestSuite, generateSeedData, } from "../../db-collection-e2e/src/index" import { waitFor } from "../../db-collection-e2e/src/utils/helpers" import type { E2ETestConfig } from "../../db-collection-e2e/src/types" import type { Client } from "pg" +declare module "vitest" { + export interface ProvidedContext { + baseUrl: string + testSchema: string + } +} + describe(`Electric Collection E2E Tests`, () => { let config: E2ETestConfig let dbClient: Client @@ -304,6 +312,85 @@ describe(`Electric Collection E2E Tests`, () => { }) ) + // Create control mechanisms for progressive collections + // These allow tests to explicitly control when the atomic swap happens + // We use a ref object so each test can get a fresh promise + const usersUpToDateControl = { + current: null as (() => void) | null, + createPromise: () => + new Promise((resolve) => { + usersUpToDateControl.current = resolve + }), + } + const postsUpToDateControl = { + current: null as (() => void) | null, + createPromise: () => + new Promise((resolve) => { + postsUpToDateControl.current = resolve + }), + } + const commentsUpToDateControl = { + current: null as (() => void) | null, + createPromise: () => + new Promise((resolve) => { + commentsUpToDateControl.current = resolve + }), + } + + const progressiveUsers = createCollection( + electricCollectionOptions({ + id: `electric-e2e-users-progressive-${testId}`, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${usersTable}`, + }, + }, + syncMode: `progressive`, + getKey: (item: any) => item.id, + startSync: false, // Don't start immediately - tests will start when ready + [ELECTRIC_TEST_HOOKS]: { + beforeMarkingReady: () => usersUpToDateControl.createPromise(), + }, + }) + ) + + const progressivePosts = createCollection( + electricCollectionOptions({ + id: `electric-e2e-posts-progressive-${testId}`, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${postsTable}`, + }, + }, + syncMode: `progressive`, + getKey: (item: any) => item.id, + startSync: false, // Don't start immediately - tests will start when ready + [ELECTRIC_TEST_HOOKS]: { + beforeMarkingReady: () => postsUpToDateControl.createPromise(), + }, + }) + ) + + const progressiveComments = createCollection( + electricCollectionOptions({ + id: `electric-e2e-comments-progressive-${testId}`, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${commentsTable}`, + }, + }, + syncMode: `progressive`, + getKey: (item: any) => item.id, + startSync: false, // Don't start immediately - tests will start when ready + [ELECTRIC_TEST_HOOKS]: { + beforeMarkingReady: () => commentsUpToDateControl.createPromise(), + }, + }) + ) + // Wait for eager collections to sync all data await eagerUsers.preload() await eagerPosts.preload() @@ -314,6 +401,10 @@ describe(`Electric Collection E2E Tests`, () => { await onDemandPosts.preload() await onDemandComments.preload() + // Progressive collections start syncing in background + // Note: We DON'T call preload() here because the test hooks will block + // Individual progressive tests will handle preload and release as needed + config = { collections: { eager: { @@ -326,8 +417,30 @@ describe(`Electric Collection E2E Tests`, () => { posts: onDemandPosts as any, comments: onDemandComments as any, }, + progressive: { + users: progressiveUsers as any, + posts: progressivePosts as any, + comments: progressiveComments as any, + }, }, hasReplicationLag: true, // Electric has async replication lag + progressiveTestControl: { + releaseInitialSync: () => { + usersUpToDateControl.current?.() + postsUpToDateControl.current?.() + commentsUpToDateControl.current?.() + }, + }, + getTxid: async () => { + // Get the current transaction ID from the last operation + // This uses pg_current_xact_id_if_assigned() which returns the txid + // Note: This gets the CURRENT transaction's ID, so must be called + // immediately after an insert in the same transaction context + const result = await dbClient.query( + `SELECT pg_current_xact_id_if_assigned()::text::bigint as txid` + ) + return result.rows[0]?.txid || null + }, mutations: { // Use direct SQL for Electric tests (simulates external changes) // This tests that Electric sync picks up database changes @@ -420,6 +533,9 @@ describe(`Electric Collection E2E Tests`, () => { onDemandUsers.cleanup(), onDemandPosts.cleanup(), onDemandComments.cleanup(), + progressiveUsers.cleanup(), + progressivePosts.cleanup(), + progressiveComments.cleanup(), ]) }, } @@ -458,4 +574,5 @@ describe(`Electric Collection E2E Tests`, () => { createCollationTestSuite(getConfig) createMutationsTestSuite(getConfig) createLiveUpdatesTestSuite(getConfig) + createProgressiveTestSuite(getConfig) }) diff --git a/packages/electric-db-collection/package.json b/packages/electric-db-collection/package.json index 8d1cf8108..ccae104a6 100644 --- a/packages/electric-db-collection/package.json +++ b/packages/electric-db-collection/package.json @@ -3,7 +3,7 @@ "description": "ElectricSQL collection for TanStack DB", "version": "0.2.2", "dependencies": { - "@electric-sql/client": "^1.1.4", + "@electric-sql/client": "^1.1.5", "@standard-schema/spec": "^1.0.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 0b377f672..50ffe8f3c 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -40,6 +40,22 @@ export { isChangeMessage, isControlMessage } from "@electric-sql/client" const debug = DebugModule.debug(`ts/db:electric`) +/** + * Symbol for internal test hooks (hidden from public API) + */ +export const ELECTRIC_TEST_HOOKS = Symbol(`electricTestHooks`) + +/** + * Internal test hooks interface (for testing only) + */ +export interface ElectricTestHooks { + /** + * Called before marking collection ready after first up-to-date in progressive mode + * Allows tests to pause and validate snapshot phase before atomic swap completes + */ + beforeMarkingReady?: () => Promise +} + /** * Type representing a transaction ID in ElectricSQL */ @@ -118,6 +134,12 @@ export interface ElectricCollectionConfig< shapeOptions: ShapeStreamOptions> syncMode?: ElectricSyncMode + /** + * Internal test hooks (for testing only) + * Hidden via Symbol to prevent accidental usage in production + */ + [ELECTRIC_TEST_HOOKS]?: ElectricTestHooks + /** * Optional asynchronous handler function called before an insert operation * @param params Object containing transaction and collection information @@ -257,6 +279,93 @@ function hasTxids>( return `txids` in message.headers && Array.isArray(message.headers.txids) } +/** + * Creates a deduplicated loadSubset handler for progressive/on-demand modes + * Returns null for eager mode, or a DeduplicatedLoadSubset instance for other modes. + * Handles fetching snapshots in progressive mode during buffering phase, + * and requesting snapshots in on-demand mode + */ +function createLoadSubsetDedupe>({ + stream, + syncMode, + isBufferingInitialSync, + begin, + write, + commit, + collectionId, +}: { + stream: ShapeStream + syncMode: ElectricSyncMode + isBufferingInitialSync: () => boolean + begin: () => void + write: (mutation: { + type: `insert` | `update` | `delete` + value: T + metadata: Record + }) => void + commit: () => void + collectionId?: string +}): DeduplicatedLoadSubset | null { + // Eager mode doesn't need subset loading + if (syncMode === `eager`) { + return null + } + + const loadSubset = async (opts: LoadSubsetOptions) => { + // In progressive mode, use fetchSnapshot during snapshot phase + if (isBufferingInitialSync()) { + // Progressive mode snapshot phase: fetch and apply immediately + const snapshotParams = compileSQL(opts) + try { + const { data: rows } = await stream.fetchSnapshot(snapshotParams) + + // Check again if we're still buffering - we might have received up-to-date + // and completed the atomic swap while waiting for the snapshot + if (!isBufferingInitialSync()) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching` + ) + return + } + + // Apply snapshot data in a sync transaction (only if we have data) + if (rows.length > 0) { + begin() + for (const row of rows) { + write({ + type: `insert`, + value: row.value, + metadata: { + ...row.headers, + }, + }) + } + commit() + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows` + ) + } + } catch (error) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, + error + ) + throw error + } + } else if (syncMode === `progressive`) { + // Progressive mode after full sync complete: no need to load more + return + } else { + // On-demand mode: use requestSnapshot + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + } + } + + return new DeduplicatedLoadSubset({ loadSubset }) +} + /** * Type for the awaitTxId utility function */ @@ -379,6 +488,7 @@ export function electricCollectionOptions( removePendingMatches, resolveMatchedPendingMatches, collectionId: config.id, + testHooks: config[ELECTRIC_TEST_HOOKS], }) /** @@ -631,6 +741,7 @@ function createElectricSync>( removePendingMatches: (matchIds: Array) => void resolveMatchedPendingMatches: () => void collectionId?: string + testHooks?: ElectricTestHooks } ): SyncConfig { const { @@ -642,6 +753,7 @@ function createElectricSync>( removePendingMatches, resolveMatchedPendingMatches, collectionId, + testHooks, } = options const MAX_BATCH_MESSAGES = 1000 // Safety limit for message buffer @@ -669,6 +781,26 @@ function createElectricSync>( sync: (params: Parameters[`sync`]>[0]) => { const { begin, write, commit, markReady, truncate, collection } = params + // Wrap markReady to wait for test hook in progressive mode + let progressiveReadyGate: Promise | null = null + const wrappedMarkReady = (isBuffering: boolean) => { + // Only create gate if we're in buffering phase (first up-to-date) + if ( + isBuffering && + syncMode === `progressive` && + testHooks?.beforeMarkingReady + ) { + // Create a new gate promise for this sync cycle + progressiveReadyGate = testHooks.beforeMarkingReady() + progressiveReadyGate.then(() => { + markReady() + }) + } else { + // No hook, not buffering, or already past first up-to-date + markReady() + } + } + // Abort controller for the stream - wraps the signal if provided const abortController = new AbortController() @@ -734,22 +866,24 @@ function createElectricSync>( const newSnapshots: Array = [] let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode + // Progressive mode state + // Helper to determine if we're buffering the initial sync + const isBufferingInitialSync = () => + syncMode === `progressive` && !hasReceivedUpToDate + const bufferedMessages: Array> = [] // Buffer change messages during initial sync + // Create deduplicated loadSubset wrapper for non-eager modes // This prevents redundant snapshot requests when multiple concurrent // live queries request overlapping or subset predicates - const loadSubsetDedupe = - syncMode === `eager` - ? null - : new DeduplicatedLoadSubset({ - loadSubset: async (opts: LoadSubsetOptions) => { - // In progressive mode, stop requesting snapshots once full sync is complete - if (syncMode === `progressive` && hasReceivedUpToDate) { - return - } - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) - }, - }) + const loadSubsetDedupe = createLoadSubsetDedupe({ + stream, + syncMode, + isBufferingInitialSync, + begin, + write, + commit, + collectionId, + }) unsubscribeStream = stream.subscribe((messages: Array>) => { let hasUpToDate = false @@ -769,7 +903,8 @@ function createElectricSync>( } // Check for txids in the message and add them to our store - if (hasTxids(message)) { + // Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap) + if (hasTxids(message) && !isBufferingInitialSync()) { message.headers.txids?.forEach((txid) => newTxids.add(txid)) } @@ -803,21 +938,30 @@ function createElectricSync>( relationSchema.setState(() => schema) } - if (!transactionStarted) { - begin() - transactionStarted = true - } + // In buffered initial sync of progressive mode, buffer messages instead of writing + if (isBufferingInitialSync()) { + bufferedMessages.push(message) + } else { + // Normal processing: write changes immediately + if (!transactionStarted) { + begin() + transactionStarted = true + } - write({ - type: message.headers.operation, - value: message.value, - // Include the primary key and relation info in the metadata - metadata: { - ...message.headers, - }, - }) + write({ + type: message.headers.operation, + value: message.value, + // Include the primary key and relation info in the metadata + metadata: { + ...message.headers, + }, + }) + } } else if (isSnapshotEndMessage(message)) { - newSnapshots.push(parseSnapshotMessage(message)) + // Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap) + if (!isBufferingInitialSync()) { + newSnapshots.push(parseSnapshotMessage(message)) + } hasSnapshotEnd = true } else if (isUpToDateMessage(message)) { hasUpToDate = true @@ -841,23 +985,71 @@ function createElectricSync>( // Reset flags so we continue accumulating changes until next up-to-date hasUpToDate = false hasSnapshotEnd = false - hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync + hasReceivedUpToDate = false // Reset for progressive mode (isBufferingInitialSync will reflect this) + bufferedMessages.length = 0 // Clear buffered messages } } if (hasUpToDate || hasSnapshotEnd) { - // Clear the current batch buffer since we're now up-to-date - currentBatchMessages.setState(() => []) + // PROGRESSIVE MODE: Atomic swap on first up-to-date + if (isBufferingInitialSync() && hasUpToDate) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages` + ) + + // Start atomic swap transaction + begin() + + // Truncate to clear all snapshot data + truncate() + + // Apply all buffered change messages and extract txids/snapshots + for (const bufferedMsg of bufferedMessages) { + if (isChangeMessage(bufferedMsg)) { + write({ + type: bufferedMsg.headers.operation, + value: bufferedMsg.value, + metadata: { + ...bufferedMsg.headers, + }, + }) + + // Extract txids from buffered messages (will be committed to store after transaction) + if (hasTxids(bufferedMsg)) { + bufferedMsg.headers.txids?.forEach((txid) => + newTxids.add(txid) + ) + } + } else if (isSnapshotEndMessage(bufferedMsg)) { + // Extract snapshots from buffered messages (will be committed to store after transaction) + newSnapshots.push(parseSnapshotMessage(bufferedMsg)) + } + } - // Commit transaction if one was started - if (transactionStarted) { + // Commit the atomic swap commit() - transactionStarted = false + + // Exit buffering phase by marking that we've received up-to-date + // isBufferingInitialSync() will now return false + bufferedMessages.length = 0 + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Atomic swap complete, now in normal sync mode` + ) + } else { + // Normal mode or on-demand: commit transaction if one was started + if (transactionStarted) { + commit() + transactionStarted = false + } } + // Clear the current batch buffer since we're now up-to-date + currentBatchMessages.setState(() => []) + if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { // Mark the collection as ready now that sync is up to date - markReady() + wrappedMarkReady(isBufferingInitialSync()) } // Track that we've received the first up-to-date for progressive mode diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index 5f269099a..3a6489454 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -56,8 +56,10 @@ const sampleUsers: Array = [ // Mock the ShapeStream module const mockSubscribe = vi.fn() const mockRequestSnapshot = vi.fn() +const mockFetchSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, + fetchSnapshot: mockFetchSnapshot, requestSnapshot: async (...args: any) => { const result = await mockRequestSnapshot(...args) const subscribers = mockSubscribe.mock.calls.map((call) => call[0]) @@ -88,6 +90,14 @@ mockRequestSnapshot.mockResolvedValue({ data: [], }) +// Mock the fetchSnapshot method +// to return empty data with metadata +// since most tests don't use it +mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [], +}) + vi.mock(`@electric-sql/client`, async () => { const actual = await vi.importActual(`@electric-sql/client`) return { @@ -135,7 +145,13 @@ describe.each([ return createCollection({ ...options, startSync: true, - }) + }) as unknown as Collection< + User, + string | number, + ElectricCollectionUtils, + StandardSchemaV1, + User + > } function simulateInitialSync(users: Array = sampleUsers) { @@ -726,12 +742,12 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { expect(liveQuery.size).toBeGreaterThan(2) }) - it(`should trigger requestSnapshot in progressive mode when live query needs more data`, async () => { + it(`should trigger fetchSnapshot in progressive mode when live query needs more data`, async () => { const electricCollection = createElectricCollectionWithSyncMode(`progressive`) - // Send initial snapshot with limited data (using snapshot-end, not up-to-date) - // This keeps the collection in "loading" state, simulating progressive mode still syncing + // In progressive mode, stream messages are buffered until up-to-date + // So collection starts empty even though we send data subscriber([ { key: sampleUsers[0]!.id.toString(), @@ -743,25 +759,19 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { value: sampleUsers[1]!, headers: { operation: `insert` }, }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, ]) expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode - expect(electricCollection.size).toBe(2) + // Messages are buffered, so size is 0 until up-to-date + expect(electricCollection.size).toBe(0) - // Mock requestSnapshot to return additional data - mockRequestSnapshot.mockResolvedValueOnce({ + // Mock fetchSnapshot to return data + mockFetchSnapshot.mockResolvedValueOnce({ + metadata: {}, data: [ { headers: { operation: `insert` }, - key: 3, + key: sampleUsers[2]!.id.toString(), value: sampleUsers[2]!, // Charlie }, ], @@ -781,15 +791,16 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { // Wait for the live query to process await new Promise((resolve) => setTimeout(resolve, 0)) - // Should have requested more data from Electric with correct parameters - // First request asks for the full limit - expect(mockRequestSnapshot).toHaveBeenCalledWith( + // Should have fetched more data from Electric with correct parameters + // Progressive mode uses fetchSnapshot, not requestSnapshot + expect(mockFetchSnapshot).toHaveBeenCalledWith( expect.objectContaining({ limit: 3, // Requests full limit from Electric orderBy: `"id" NULLS FIRST`, params: {}, }) ) + expect(mockRequestSnapshot).not.toHaveBeenCalled() }) it(`should NOT trigger requestSnapshot in eager mode even when live query needs more data`, async () => { @@ -904,22 +915,10 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { ) }) - it(`should handle complex filters in requestSnapshot`, async () => { + it(`should handle complex filters in fetchSnapshot`, async () => { const electricCollection = createElectricCollectionWithSyncMode(`progressive`) - // Send snapshot-end (not up-to-date) to keep collection in loading state - subscriber([ - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, - ]) - expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode // Create live query with complex WHERE clause @@ -936,8 +935,8 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) - // Should have requested snapshot with complex WHERE clause - expect(mockRequestSnapshot).toHaveBeenCalledWith( + // Should have called fetchSnapshot with complex WHERE clause (not requestSnapshot) + expect(mockFetchSnapshot).toHaveBeenCalledWith( expect.objectContaining({ where: `"age" > $1`, params: { "1": `20` }, @@ -945,6 +944,7 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { limit: 5, }) ) + expect(mockRequestSnapshot).not.toHaveBeenCalled() }) }) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 8637972f4..04ecd4fb1 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -20,9 +20,11 @@ import type { StandardSchemaV1 } from "@standard-schema/spec" // Mock the ShapeStream module const mockSubscribe = vi.fn() const mockRequestSnapshot = vi.fn() +const mockFetchSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, requestSnapshot: mockRequestSnapshot, + fetchSnapshot: mockFetchSnapshot, } vi.mock(`@electric-sql/client`, async () => { @@ -72,7 +74,13 @@ describe(`Electric Integration`, () => { const options = electricCollectionOptions(config) // Create collection with Electric configuration using the new utility exposure pattern - collection = createCollection(options) + collection = createCollection(options) as unknown as Collection< + Row, + string | number, + ElectricCollectionUtils, + StandardSchemaV1, + Row + > }) it(`should commit an empty transaction when there's an up-to-date`, () => { @@ -673,9 +681,9 @@ describe(`Electric Integration`, () => { }) it(`should support custom timeout in matching strategy`, async () => { - const onInsert = vi.fn(async () => { + const onInsert = vi.fn(() => { // Return a txid that will never arrive with a very short timeout - return { txid: 999999, timeout: 100 } + return Promise.resolve({ txid: 999999, timeout: 100 }) }) const config = { @@ -1791,9 +1799,24 @@ describe(`Electric Integration`, () => { ) }) - it(`should request incremental snapshots in progressive mode when loadSubset is called before sync completes`, async () => { + it(`should fetch snapshots in progressive mode when loadSubset is called before sync completes`, async () => { vi.clearAllMocks() + mockSubscribe.mockImplementation((_callback) => { + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + const config = { id: `progressive-snapshot-test`, shapeOptions: { @@ -1809,35 +1832,23 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send initial data with snapshot-end (but not up-to-date yet - still syncing) - subscriber([ - { - key: `1`, - value: { id: 1, name: `Test User` }, - headers: { operation: `insert` }, - }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, - ]) - expect(testCollection.status).toBe(`loading`) // Not ready yet - // In progressive mode, calling loadSubset should request a snapshot BEFORE full sync completes + // In progressive mode, calling loadSubset should fetch a snapshot BEFORE full sync completes await testCollection._sync.loadSubset({ limit: 20 }) - // Verify requestSnapshot was called - expect(mockRequestSnapshot).toHaveBeenCalledWith( + // Verify fetchSnapshot was called (not requestSnapshot) + expect(mockFetchSnapshot).toHaveBeenCalledWith( expect.objectContaining({ limit: 20, params: {}, }) ) + expect(mockRequestSnapshot).not.toHaveBeenCalled() + + // Verify snapshot data was applied + expect(testCollection.has(2)).toBe(true) + expect(testCollection.get(2)).toEqual({ id: 2, name: `Snapshot User` }) }) it(`should not request snapshots when loadSubset is called in eager mode`, async () => { @@ -1875,6 +1886,23 @@ describe(`Electric Integration`, () => { it(`should handle progressive mode syncing in background`, async () => { vi.clearAllMocks() + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + const config = { id: `progressive-background-sync-test`, shapeOptions: { @@ -1890,44 +1918,61 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send initial data with snapshot-end (but not up-to-date - still syncing) - subscriber([ + // Send stream data during snapshot phase (should be buffered) + testSubscriber([ { key: `1`, value: { id: 1, name: `Initial User` }, headers: { operation: `insert` }, }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, ]) - // Collection should have data but not be ready yet + // Collection should NOT have the buffered data yet expect(testCollection.status).toBe(`loading`) - expect(testCollection.has(1)).toBe(true) + expect(testCollection.has(1)).toBe(false) - // Should be able to request more data incrementally before full sync completes + // Should be able to fetch snapshot data incrementally before full sync completes await testCollection._sync.loadSubset({ limit: 10 }) - expect(mockRequestSnapshot).toHaveBeenCalled() + expect(mockFetchSnapshot).toHaveBeenCalled() + expect(mockRequestSnapshot).not.toHaveBeenCalled() - // Now send up-to-date to complete the sync - subscriber([ + // Snapshot data should be visible + expect(testCollection.has(2)).toBe(true) + + // Now send up-to-date to complete the sync (triggers atomic swap) + testSubscriber([ { headers: { control: `up-to-date` }, }, ]) expect(testCollection.status).toBe(`ready`) + + // After atomic swap, buffered data should be visible, snapshot data cleared + expect(testCollection.has(1)).toBe(true) + expect(testCollection.has(2)).toBe(false) // Snapshot data truncated }) - it(`should stop requesting snapshots in progressive mode after first up-to-date`, async () => { + it(`should stop fetching snapshots in progressive mode after first up-to-date and perform atomic swap`, async () => { vi.clearAllMocks() + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + const config = { id: `progressive-stop-after-sync-test`, shapeOptions: { @@ -1943,33 +1988,28 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send initial data with snapshot-end (not up-to-date yet) - subscriber([ + expect(testCollection.status).toBe(`loading`) // Not ready yet in progressive + + // Should be able to fetch data before up-to-date (snapshot phase) + vi.clearAllMocks() + await testCollection._sync.loadSubset({ limit: 10 }) + expect(mockFetchSnapshot).toHaveBeenCalledTimes(1) + expect(testCollection.has(2)).toBe(true) // Snapshot data applied + + // Send change messages during snapshot phase (should be buffered) + testSubscriber([ { key: `1`, value: { id: 1, name: `User 1` }, headers: { operation: `insert` }, }, - { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, - }, ]) - expect(testCollection.status).toBe(`loading`) // Not ready yet in progressive - expect(testCollection.has(1)).toBe(true) - - // Should be able to request more data before up-to-date - vi.clearAllMocks() - await testCollection._sync.loadSubset({ limit: 10 }) - expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + // Data should NOT be visible yet (buffered) + expect(testCollection.has(1)).toBe(false) - // Now send up-to-date to complete the full sync - subscriber([ + // Now send up-to-date to complete the full sync and trigger atomic swap + testSubscriber([ { headers: { control: `up-to-date` }, }, @@ -1977,10 +2017,15 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`ready`) - // Try to request more data - should NOT make a request since full sync is complete + // After atomic swap, buffered data should be visible + expect(testCollection.has(1)).toBe(true) + // Snapshot data should be cleared by truncate, so id:2 should be gone + expect(testCollection.has(2)).toBe(false) + + // Try to fetch more data - should NOT make a request since full sync is complete vi.clearAllMocks() await testCollection._sync.loadSubset({ limit: 10 }) - expect(mockRequestSnapshot).not.toHaveBeenCalled() + expect(mockFetchSnapshot).not.toHaveBeenCalled() }) it(`should allow snapshots in on-demand mode even after up-to-date`, async () => { @@ -2021,6 +2066,84 @@ describe(`Electric Integration`, () => { expect(mockRequestSnapshot).toHaveBeenCalled() }) + it(`should ignore snapshot data when fetchSnapshot completes after up-to-date in progressive mode`, async () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + let resolveFetchSnapshot!: (value: any) => void + const fetchSnapshotPromise = new Promise((resolve) => { + resolveFetchSnapshot = resolve as any + }) + + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + // Mock fetchSnapshot to return a promise we control + mockFetchSnapshot.mockReturnValue(fetchSnapshotPromise) + + const config = { + id: `progressive-snapshot-race-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + expect(testCollection.status).toBe(`loading`) + + // Start a loadSubset request (should call fetchSnapshot) + const loadSubsetPromise = testCollection._sync.loadSubset({ limit: 10 }) + + // Verify fetchSnapshot was called + expect(mockFetchSnapshot).toHaveBeenCalled() + + // Before the snapshot completes, send up-to-date to complete the sync + testSubscriber([ + { + key: `1`, + value: { id: 1, name: `Buffered User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Sync should be complete now + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(true) + expect(testCollection.size).toBe(1) + + // Now resolve the fetchSnapshot with data + resolveFetchSnapshot({ + metadata: {}, + data: [ + { + key: `2`, + value: { id: 2, name: `Late Snapshot User` }, + headers: { operation: `insert` }, + }, + ], + }) + + // Wait for loadSubset to complete + await loadSubsetPromise + + // The snapshot data should be IGNORED because sync already completed + expect(testCollection.has(2)).toBe(false) + expect(testCollection.size).toBe(1) // Still only the buffered user + expect(testCollection.get(1)).toEqual({ id: 1, name: `Buffered User` }) + }) + it(`should default offset to 'now' in on-demand mode when no offset provided`, async () => { vi.clearAllMocks() @@ -2229,7 +2352,17 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`ready`) }) - it(`should commit on snapshot-end in progressive mode but not mark ready`, () => { + it(`should buffer changes during snapshot phase in progressive mode until up-to-date`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + const config = { id: `progressive-snapshot-end-test`, shapeOptions: { @@ -2244,7 +2377,8 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) // Send data followed by snapshot-end (but no up-to-date) - subscriber([ + // In progressive mode, these messages should be BUFFERED, not committed + testSubscriber([ { key: `1`, value: { id: 1, name: `Test User` }, @@ -2260,21 +2394,24 @@ describe(`Electric Integration`, () => { }, ]) - // Data should be committed (available in state) - expect(testCollection.has(1)).toBe(true) - expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + // Data should NOT be visible yet (it's buffered during snapshot phase) + expect(testCollection.has(1)).toBe(false) - // But collection should NOT be marked as ready yet in progressive mode + // Collection should NOT be marked as ready yet in progressive mode expect(testCollection.status).toBe(`loading`) - // Now send up-to-date - subscriber([ + // Now send up-to-date (triggers atomic swap) + testSubscriber([ { headers: { control: `up-to-date` }, }, ]) - // Now it should be ready + // Now data should be visible after atomic swap + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + + // And it should be ready expect(testCollection.status).toBe(`ready`) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3b1c50ea4..08559d125 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -790,8 +790,8 @@ importers: packages/electric-db-collection: dependencies: '@electric-sql/client': - specifier: ^1.1.4 - version: 1.1.4 + specifier: ^1.1.5 + version: 1.1.5 '@standard-schema/spec': specifier: ^1.0.0 version: 1.0.0 @@ -1595,6 +1595,9 @@ packages: '@electric-sql/client@1.1.4': resolution: {integrity: sha512-88M2iEg5LYSE388wNWjrQMpsIgLzwge60cCN5znQSDEnUEprJZ13vB3r+S8IRNYbmOtUSNVEODuXn+nlqEhEGw==} + '@electric-sql/client@1.1.5': + resolution: {integrity: sha512-5QXmdXkl6AtDX89TRNwV2AVX0+ZJXRcA8/F55iIaifOKWq4sZaXiI9+Rb6iMhAl/S3MSw9abZucaLHBL+hScQg==} + '@emnapi/core@1.5.0': resolution: {integrity: sha512-sbP8GzB1WDzacS8fgNPpHlp6C9VZe+SJP3F90W9rLemaQj2PzIuTEl1qDOYQf58YIpyjViI24y9aPWCjEzY2cg==} @@ -9727,6 +9730,12 @@ snapshots: optionalDependencies: '@rollup/rollup-darwin-arm64': 4.52.5 + '@electric-sql/client@1.1.5': + dependencies: + '@microsoft/fetch-event-source': 2.0.1 + optionalDependencies: + '@rollup/rollup-darwin-arm64': 4.52.5 + '@emnapi/core@1.5.0': dependencies: '@emnapi/wasi-threads': 1.1.0