From 401f4fc807ddeea8d6111ccdb27a7168058f5226 Mon Sep 17 00:00:00 2001 From: Proactive Runtime Bot Date: Thu, 21 May 2026 12:10:03 +0200 Subject: [PATCH] fix: resume incremental pulls within event pages --- internal/mountsync/syncer.go | 72 ++++++++++- internal/mountsync/syncer_test.go | 199 ++++++++++++++++++++++++++++++ 2 files changed, 268 insertions(+), 3 deletions(-) diff --git a/internal/mountsync/syncer.go b/internal/mountsync/syncer.go index a0996d9..071a4ca 100644 --- a/internal/mountsync/syncer.go +++ b/internal/mountsync/syncer.go @@ -688,6 +688,7 @@ type Syncer struct { type mountState struct { Files map[string]trackedFile `json:"files"` EventsCursor string `json:"eventsCursor,omitempty"` + IncrementalCheckpoint *incrementalCheckpoint `json:"incrementalCheckpoint,omitempty"` LastReconcileAt string `json:"lastReconcileAt,omitempty"` LastSuccessfulReconcileAt string `json:"lastSuccessfulReconcileAt,omitempty"` LastEventAt string `json:"lastEventAt,omitempty"` @@ -712,6 +713,13 @@ type mountState struct { BootstrapStartedAt string `json:"bootstrapStartedAt,omitempty"` } +type incrementalCheckpoint struct { + Cursor string `json:"cursor,omitempty"` + PageCursor string `json:"pageCursor,omitempty"` + Phase string `json:"phase,omitempty"` + Path string `json:"path,omitempty"` +} + // telemetryCounters tracks defensive-guard activity so operators can see at // a glance whether the breaker has fired, oversized writebacks have been // dropped, root-target denials have hit, etc. @@ -2017,6 +2025,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{}) // (advanced past). Replaying from the prior cursor is safe — // applyRemoteFile is idempotent and will no-op when on-disk // content already matches. + s.state.IncrementalCheckpoint = nil return nil } feed, err := s.client.ListEvents(ctx, s.workspace, s.eventProvider, s.state.EventsCursor, 1) @@ -2048,6 +2057,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{}) } s.logf("events feed unavailable; falling back to full pull") s.state.EventsCursor = "" + s.state.IncrementalCheckpoint = nil } // Restart fast-path. When EventsCursor is empty but the state file @@ -2100,6 +2110,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{}) // rw_517d60b6 partial-mirror hazard (still gated on // BootstrapComplete). s.state.EventsCursor = cursor + s.state.IncrementalCheckpoint = nil s.logf("restart fast-path: seeded events cursor %q from %d tracked files; skipping bootstrap full pull", cursor, len(s.state.Files)) return nil } @@ -2125,6 +2136,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{}) if err := s.pullRemoteFull(bctx, conflicted, bprog); err != nil { return err } + s.state.IncrementalCheckpoint = nil if s.wsConn != nil { return nil } @@ -2855,6 +2867,7 @@ func (s *Syncer) pullRemoteIncremental(ctx context.Context, conflicted map[strin safeCursor := currentCursor for { + pageStartCursor := currentCursor feed, err := s.client.ListEvents(ctx, s.workspace, s.eventProvider, currentCursor, 500) if err != nil { return safeCursor, err @@ -2909,9 +2922,11 @@ func (s *Syncer) pullRemoteIncremental(ctx context.Context, conflicted map[strin delete(changed, remotePath) } } - if err := s.applyIncrementalChanges(ctx, changed, deleted, conflicted); err != nil { + checkpoint := s.incrementalCheckpointForPage(pageStartCursor, pageCursor) + if err := s.applyIncrementalChanges(ctx, changed, deleted, conflicted, pageStartCursor, pageCursor, checkpoint); err != nil { return safeCursor, err } + s.state.IncrementalCheckpoint = nil if pageLastEventAt != "" { s.state.LastEventAt = pageLastEventAt } @@ -2934,18 +2949,54 @@ func (s *Syncer) pullRemoteIncremental(ctx context.Context, conflicted map[strin return safeCursor, nil } -func (s *Syncer) applyIncrementalChanges(ctx context.Context, changed, deleted map[string]struct{}, conflicted map[string]struct{}) error { +func (s *Syncer) incrementalCheckpointForPage(cursor, pageCursor string) incrementalCheckpoint { + if s.state.IncrementalCheckpoint == nil { + return incrementalCheckpoint{} + } + checkpoint := *s.state.IncrementalCheckpoint + checkpoint.Cursor = strings.TrimSpace(checkpoint.Cursor) + checkpoint.PageCursor = strings.TrimSpace(checkpoint.PageCursor) + checkpoint.Phase = strings.TrimSpace(checkpoint.Phase) + checkpoint.Path = normalizeRemotePath(checkpoint.Path) + if checkpoint.Cursor != strings.TrimSpace(cursor) || checkpoint.PageCursor != strings.TrimSpace(pageCursor) { + return incrementalCheckpoint{} + } + if checkpoint.Phase != "changed" && checkpoint.Phase != "deleted" { + return incrementalCheckpoint{} + } + if checkpoint.Path == "/" || strings.TrimSpace(checkpoint.Path) == "" { + return incrementalCheckpoint{} + } + return checkpoint +} + +func (s *Syncer) applyIncrementalChanges( + ctx context.Context, + changed, deleted map[string]struct{}, + conflicted map[string]struct{}, + pageStartCursor, pageCursor string, + checkpoint incrementalCheckpoint, +) error { changedPaths := make([]string, 0, len(changed)) for remotePath := range changed { changedPaths = append(changedPaths, remotePath) } sort.Strings(changedPaths) for _, remotePath := range changedPaths { + if checkpoint.Phase == "changed" && remotePath <= checkpoint.Path { + continue + } + if checkpoint.Phase == "deleted" { + continue + } file, err := s.client.ReadFile(ctx, s.workspace, remotePath) if err != nil { var httpErr *HTTPError if errors.As(err, &httpErr) && httpErr.StatusCode == http.StatusNotFound { - deleted[remotePath] = struct{}{} + if err := s.applyRemoteDelete(remotePath, conflicted); err != nil { + return err + } + s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "changed", remotePath) continue } if errors.As(err, &httpErr) && httpErr.StatusCode == http.StatusForbidden { @@ -2953,6 +3004,7 @@ func (s *Syncer) applyIncrementalChanges(ctx context.Context, changed, deleted m if markErr := s.markReadDenied(remotePath); markErr != nil { return markErr } + s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "changed", remotePath) continue } return err @@ -2960,6 +3012,7 @@ func (s *Syncer) applyIncrementalChanges(ctx context.Context, changed, deleted m if err := s.applyRemoteFile(remotePath, file, conflicted); err != nil { return err } + s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "changed", remotePath) } deletedPaths := make([]string, 0, len(deleted)) @@ -2968,13 +3021,26 @@ func (s *Syncer) applyIncrementalChanges(ctx context.Context, changed, deleted m } sort.Strings(deletedPaths) for _, remotePath := range deletedPaths { + if checkpoint.Phase == "deleted" && remotePath <= checkpoint.Path { + continue + } if err := s.applyRemoteDelete(remotePath, conflicted); err != nil { return err } + s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "deleted", remotePath) } return nil } +func (s *Syncer) markIncrementalCheckpoint(pageStartCursor, pageCursor, phase, remotePath string) { + s.state.IncrementalCheckpoint = &incrementalCheckpoint{ + Cursor: strings.TrimSpace(pageStartCursor), + PageCursor: strings.TrimSpace(pageCursor), + Phase: strings.TrimSpace(phase), + Path: normalizeRemotePath(remotePath), + } +} + func (s *Syncer) resolveLatestEventCursor(ctx context.Context) (string, error) { // Derive an OWN short deadline from rootCtx so a slow/hanging events // feed can never wedge an otherwise healthy cycle (and is independent diff --git a/internal/mountsync/syncer_test.go b/internal/mountsync/syncer_test.go index 6fcb2db..0c79aab 100644 --- a/internal/mountsync/syncer_test.go +++ b/internal/mountsync/syncer_test.go @@ -3456,6 +3456,8 @@ type fakeClient struct { eventsUnsupported bool listEventsErrAfter int listEventsErr error + readFileErrAfter int + readFileErr error } // requestedReadCalls returns the cumulative number of ReadFile calls made @@ -3574,6 +3576,9 @@ func (c *fakeClient) ReadFile(ctx context.Context, workspaceID, path string) (Re c.readFileCallsByPath = make(map[string]int) } c.readFileCallsByPath[path]++ + if c.readFileErr != nil && c.readFileCalls > c.readFileErrAfter { + return RemoteFile{}, c.readFileErr + } file, ok := c.files[path] if !ok { return RemoteFile{}, &HTTPError{StatusCode: 404, Code: "not_found", Message: "not found"} @@ -4626,6 +4631,200 @@ func TestPullRemoteIncrementalPersistsAppliedPageCursorOnListEventsError(t *test } } +func TestPullRemoteIncrementalResumesWithinAppliedPage(t *testing.T) { + files := map[string]RemoteFile{} + events := make([]FilesystemEvent, 0, 10) + for i := 1; i <= 10; i++ { + remotePath := fmt.Sprintf("/notion/Docs/%03d.md", i) + revision := fmt.Sprintf("rev_%03d", i) + files[remotePath] = RemoteFile{ + Path: remotePath, + Revision: revision, + ContentType: "text/markdown", + Content: fmt.Sprintf("# %03d", i), + } + events = append(events, FilesystemEvent{ + EventID: fmt.Sprintf("evt_%03d", i), + Type: "file.created", + Path: remotePath, + Revision: revision, + }) + } + client := &fakeClient{ + files: files, + events: events, + revisionCounter: 10, + eventCounter: 10, + readFileErrAfter: 3, + readFileErr: context.DeadlineExceeded, + } + localDir := t.TempDir() + syncer, err := NewSyncer(client, SyncerOptions{ + WorkspaceID: "ws_page_resume", + RemoteRoot: "/notion", + LocalRoot: localDir, + FullPullEvery: -1, + WebSocket: boolPtr(false), + CursorTimeout: time.Second, + BootstrapTimeout: time.Second, + }) + if err != nil { + t.Fatalf("new syncer failed: %v", err) + } + syncer.loaded = true + syncer.state = mountState{ + Files: map[string]trackedFile{}, + EventsCursor: "evt_000", + BootstrapComplete: true, + } + + err = syncer.Reconcile(context.Background()) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected read-file deadline inside first page, got %v", err) + } + if got := syncer.state.EventsCursor; got != "evt_000" { + t.Fatalf("EventsCursor should stay on the unapplied page cursor; got %q", got) + } + if syncer.state.IncrementalCheckpoint == nil { + t.Fatalf("expected checkpoint after partial page") + } + if checkpoint := *syncer.state.IncrementalCheckpoint; checkpoint.Cursor != "evt_000" || + checkpoint.PageCursor != "evt_010" || + checkpoint.Phase != "changed" || + checkpoint.Path != "/notion/Docs/003.md" { + t.Fatalf("unexpected checkpoint after partial page: %#v", checkpoint) + } + for i := 1; i <= 3; i++ { + path := filepath.Join(localDir, "Docs", fmt.Sprintf("%03d.md", i)) + if _, err := os.ReadFile(path); err != nil { + t.Fatalf("expected file %03d to be applied before timeout: %v", i, err) + } + } + + client.readFileErr = nil + before := make(map[string]int, len(client.readFileCallsByPath)) + for path, calls := range client.readFileCallsByPath { + before[path] = calls + } + if err := syncer.Reconcile(context.Background()); err != nil { + t.Fatalf("reconcile after partial page failed: %v", err) + } + for i := 1; i <= 3; i++ { + remotePath := fmt.Sprintf("/notion/Docs/%03d.md", i) + if got := client.readFileCallsByPath[remotePath]; got != before[remotePath] { + t.Fatalf("expected %s to be skipped on resume; read calls went %d -> %d", remotePath, before[remotePath], got) + } + } + if got := client.readFileCallsByPath["/notion/Docs/004.md"]; got == 0 { + t.Fatalf("expected resume to continue after checkpoint path") + } + if got := syncer.state.EventsCursor; got != "evt_010" { + t.Fatalf("EventsCursor = %q, want completed page cursor evt_010", got) + } + if checkpoint := syncer.state.IncrementalCheckpoint; checkpoint != nil { + t.Fatalf("expected checkpoint to clear after completed page, got %#v", checkpoint) + } + data, err := os.ReadFile(filepath.Join(localDir, "Docs", "010.md")) + if err != nil { + t.Fatalf("expected final file to apply on resume: %v", err) + } + if string(data) != "# 010" { + t.Fatalf("unexpected final file content: %q", data) + } +} + +func TestPullRemoteIncrementalCheckpointPreservesChangedPath404Delete(t *testing.T) { + files := map[string]RemoteFile{ + "/notion/Docs/002.md": { + Path: "/notion/Docs/002.md", + Revision: "rev_002", + ContentType: "text/markdown", + Content: "# 002", + }, + "/notion/Docs/003.md": { + Path: "/notion/Docs/003.md", + Revision: "rev_003", + ContentType: "text/markdown", + Content: "# 003", + }, + } + client := &fakeClient{ + files: files, + events: []FilesystemEvent{ + {EventID: "evt_001", Type: "file.updated", Path: "/notion/Docs/001.md", Revision: "rev_001"}, + {EventID: "evt_002", Type: "file.updated", Path: "/notion/Docs/002.md", Revision: "rev_002"}, + {EventID: "evt_003", Type: "file.updated", Path: "/notion/Docs/003.md", Revision: "rev_003"}, + }, + revisionCounter: 3, + eventCounter: 3, + readFileErrAfter: 2, + readFileErr: context.DeadlineExceeded, + } + localDir := t.TempDir() + localPath := filepath.Join(localDir, "Docs", "001.md") + if err := os.MkdirAll(filepath.Dir(localPath), 0o755); err != nil { + t.Fatalf("mkdir local doc dir: %v", err) + } + if err := os.WriteFile(localPath, []byte("# stale"), 0o644); err != nil { + t.Fatalf("write stale local file: %v", err) + } + syncer, err := NewSyncer(client, SyncerOptions{ + WorkspaceID: "ws_page_404_resume", + RemoteRoot: "/notion", + LocalRoot: localDir, + FullPullEvery: -1, + WebSocket: boolPtr(false), + CursorTimeout: time.Second, + BootstrapTimeout: time.Second, + }) + if err != nil { + t.Fatalf("new syncer failed: %v", err) + } + syncer.loaded = true + syncer.state = mountState{ + Files: map[string]trackedFile{ + "/notion/Docs/001.md": { + Revision: "rev_old", + ContentType: "text/markdown", + Hash: hashBytes([]byte("# stale")), + }, + }, + EventsCursor: "evt_000", + BootstrapComplete: true, + } + + err = syncer.Reconcile(context.Background()) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected read-file deadline after checkpointed 404 delete, got %v", err) + } + if _, err := os.Stat(localPath); !errors.Is(err, os.ErrNotExist) { + t.Fatalf("expected 404 changed path to delete stale local file before timeout; stat err=%v", err) + } + if _, ok := syncer.state.Files["/notion/Docs/001.md"]; ok { + t.Fatalf("expected 404 changed path to be removed from tracked state") + } + if checkpoint := syncer.state.IncrementalCheckpoint; checkpoint == nil || + checkpoint.Phase != "changed" || + checkpoint.Path != "/notion/Docs/002.md" { + t.Fatalf("unexpected checkpoint after partial changed page with 404: %#v", checkpoint) + } + + before404Reads := client.readFileCallsByPath["/notion/Docs/001.md"] + client.readFileErr = nil + if err := syncer.Reconcile(context.Background()); err != nil { + t.Fatalf("reconcile after checkpointed 404 delete failed: %v", err) + } + if got := client.readFileCallsByPath["/notion/Docs/001.md"]; got != before404Reads { + t.Fatalf("expected deleted 404 path to be checkpointed and skipped on resume; read calls went %d -> %d", before404Reads, got) + } + if got := syncer.state.EventsCursor; got != "evt_003" { + t.Fatalf("EventsCursor = %q, want completed page cursor evt_003", got) + } + if checkpoint := syncer.state.IncrementalCheckpoint; checkpoint != nil { + t.Fatalf("expected checkpoint to clear after completed page, got %#v", checkpoint) + } +} + func TestScanLocalFilesLogsOversizedFileOncePerSize(t *testing.T) { t.Setenv("RELAYFILE_MAX_WRITEBACK_BYTES", "4")