Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 69 additions & 3 deletions internal/mountsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ type Syncer struct {
type mountState struct {
Files map[string]trackedFile `json:"files"`
EventsCursor string `json:"eventsCursor,omitempty"`
IncrementalCheckpoint *incrementalCheckpoint `json:"incrementalCheckpoint,omitempty"`
LastReconcileAt string `json:"lastReconcileAt,omitempty"`
LastSuccessfulReconcileAt string `json:"lastSuccessfulReconcileAt,omitempty"`
LastEventAt string `json:"lastEventAt,omitempty"`
Expand All @@ -712,6 +713,13 @@ type mountState struct {
BootstrapStartedAt string `json:"bootstrapStartedAt,omitempty"`
}

type incrementalCheckpoint struct {
Cursor string `json:"cursor,omitempty"`
PageCursor string `json:"pageCursor,omitempty"`
Phase string `json:"phase,omitempty"`
Path string `json:"path,omitempty"`
}

// telemetryCounters tracks defensive-guard activity so operators can see at
// a glance whether the breaker has fired, oversized writebacks have been
// dropped, root-target denials have hit, etc.
Expand Down Expand Up @@ -2017,6 +2025,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{})
// (advanced past). Replaying from the prior cursor is safe —
// applyRemoteFile is idempotent and will no-op when on-disk
// content already matches.
s.state.IncrementalCheckpoint = nil
return nil
}
feed, err := s.client.ListEvents(ctx, s.workspace, s.eventProvider, s.state.EventsCursor, 1)
Expand Down Expand Up @@ -2048,6 +2057,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{})
}
s.logf("events feed unavailable; falling back to full pull")
s.state.EventsCursor = ""
s.state.IncrementalCheckpoint = nil
}

// Restart fast-path. When EventsCursor is empty but the state file
Expand Down Expand Up @@ -2100,6 +2110,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{})
// rw_517d60b6 partial-mirror hazard (still gated on
// BootstrapComplete).
s.state.EventsCursor = cursor
s.state.IncrementalCheckpoint = nil
s.logf("restart fast-path: seeded events cursor %q from %d tracked files; skipping bootstrap full pull", cursor, len(s.state.Files))
return nil
}
Expand All @@ -2125,6 +2136,7 @@ func (s *Syncer) pullRemote(ctx context.Context, conflicted map[string]struct{})
if err := s.pullRemoteFull(bctx, conflicted, bprog); err != nil {
return err
}
s.state.IncrementalCheckpoint = nil
if s.wsConn != nil {
return nil
}
Expand Down Expand Up @@ -2855,6 +2867,7 @@ func (s *Syncer) pullRemoteIncremental(ctx context.Context, conflicted map[strin
safeCursor := currentCursor

for {
pageStartCursor := currentCursor
feed, err := s.client.ListEvents(ctx, s.workspace, s.eventProvider, currentCursor, 500)
if err != nil {
return safeCursor, err
Expand Down Expand Up @@ -2909,9 +2922,11 @@ func (s *Syncer) pullRemoteIncremental(ctx context.Context, conflicted map[strin
delete(changed, remotePath)
}
}
if err := s.applyIncrementalChanges(ctx, changed, deleted, conflicted); err != nil {
checkpoint := s.incrementalCheckpointForPage(pageStartCursor, pageCursor)
if err := s.applyIncrementalChanges(ctx, changed, deleted, conflicted, pageStartCursor, pageCursor, checkpoint); err != nil {
return safeCursor, err
}
s.state.IncrementalCheckpoint = nil
if pageLastEventAt != "" {
s.state.LastEventAt = pageLastEventAt
}
Expand All @@ -2934,32 +2949,70 @@ func (s *Syncer) pullRemoteIncremental(ctx context.Context, conflicted map[strin
return safeCursor, nil
}

func (s *Syncer) applyIncrementalChanges(ctx context.Context, changed, deleted map[string]struct{}, conflicted map[string]struct{}) error {
func (s *Syncer) incrementalCheckpointForPage(cursor, pageCursor string) incrementalCheckpoint {
if s.state.IncrementalCheckpoint == nil {
return incrementalCheckpoint{}
}
checkpoint := *s.state.IncrementalCheckpoint
checkpoint.Cursor = strings.TrimSpace(checkpoint.Cursor)
checkpoint.PageCursor = strings.TrimSpace(checkpoint.PageCursor)
checkpoint.Phase = strings.TrimSpace(checkpoint.Phase)
checkpoint.Path = normalizeRemotePath(checkpoint.Path)
if checkpoint.Cursor != strings.TrimSpace(cursor) || checkpoint.PageCursor != strings.TrimSpace(pageCursor) {
return incrementalCheckpoint{}
}
if checkpoint.Phase != "changed" && checkpoint.Phase != "deleted" {
return incrementalCheckpoint{}
}
if checkpoint.Path == "/" || strings.TrimSpace(checkpoint.Path) == "" {
return incrementalCheckpoint{}
}
return checkpoint
}

func (s *Syncer) applyIncrementalChanges(
ctx context.Context,
changed, deleted map[string]struct{},
conflicted map[string]struct{},
pageStartCursor, pageCursor string,
checkpoint incrementalCheckpoint,
) error {
changedPaths := make([]string, 0, len(changed))
for remotePath := range changed {
changedPaths = append(changedPaths, remotePath)
}
sort.Strings(changedPaths)
for _, remotePath := range changedPaths {
if checkpoint.Phase == "changed" && remotePath <= checkpoint.Path {
continue
}
if checkpoint.Phase == "deleted" {
continue
}
Comment on lines +2986 to +2991
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Checkpoint skip logic drops 404-to-delete transitions for paths sorting before the checkpoint

When resuming from a checkpoint with Phase="changed", paths that sort lexicographically before or equal to checkpoint.Path are skipped entirely (line 2986). However, in the original (failed) run, some of those skipped paths may have received a 404 from ReadFile and been added to the local deleted map (line 2996) — without updating the checkpoint (since the checkpoint only advances on successful applyRemoteFile). Because the delete phase (deletedPaths loop at line 3024) only runs AFTER the changed loop completes, and the changed loop errored out before completing, those 404-redirected deletes were never applied.

On resume, the skipped paths are never re-read via ReadFile, so they're never re-added to the deleted map. The local file persists as stale until the next periodic full pull.

Example scenario

changedPaths (sorted): [A, B, C, D]

  • A: ReadFile → 404 → added to deleted map, no checkpoint update
  • B: ReadFile → success → applyRemoteFile → checkpoint = {Phase: "changed", Path: B}
  • C: ReadFile → timeout → return error

On resume (checkpoint.Path = B):

  • A: skipped (A <= B) — never re-read, never added to deleted
  • B: skipped (B <= B)
  • C, D: processed normally
  • Delete phase: A is missing from the deleted set

A's local file persists indefinitely (until next full pull).

This is a regression: without checkpoints, the entire page was retried and the 404 would be re-encountered, properly routing the file to deletion.

Prompt for agents
In applyIncrementalChanges, when checkpoint.Phase=="changed" and we skip paths <= checkpoint.Path, files that got a 404 (added to the deleted map) in the original run but sort before the checkpoint are silently dropped. The checkpoint only advances on successful applyRemoteFile, so 404'd paths are never checkpointed and are lost on resume.

Possible fix approaches:
1. Record 404'd paths in the checkpoint itself (e.g. a list of paths that need deletion), so they can be replayed on resume.
2. Do NOT skip 404-vulnerable paths — only skip paths that were actually confirmed applied (i.e. paths that appear in s.state.Files with the expected revision from this cycle).
3. Accept the trade-off but document it, relying on the periodic full-pull cadence to self-heal.

The simplest fix might be to not skip paths in the changed loop that don't have a tracked entry matching the current cycle's revision — those are the ones that might have gotten 404 without being checkpointed. But this would require knowing which tracked files were updated in this cycle vs previously, which adds complexity.

Alternatively, for paths being skipped, check if they exist in s.state.Files with a current revision — if not, they may need to be re-read.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

file, err := s.client.ReadFile(ctx, s.workspace, remotePath)
if err != nil {
var httpErr *HTTPError
if errors.As(err, &httpErr) && httpErr.StatusCode == http.StatusNotFound {
deleted[remotePath] = struct{}{}
if err := s.applyRemoteDelete(remotePath, conflicted); err != nil {
return err
}
s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "changed", remotePath)
continue
}
if errors.As(err, &httpErr) && httpErr.StatusCode == http.StatusForbidden {
s.logf("skipping denied file: %s", remotePath)
if markErr := s.markReadDenied(remotePath); markErr != nil {
return markErr
}
s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "changed", remotePath)
continue
}
return err
}
if err := s.applyRemoteFile(remotePath, file, conflicted); err != nil {
return err
}
s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "changed", remotePath)
}

deletedPaths := make([]string, 0, len(deleted))
Expand All @@ -2968,13 +3021,26 @@ func (s *Syncer) applyIncrementalChanges(ctx context.Context, changed, deleted m
}
sort.Strings(deletedPaths)
for _, remotePath := range deletedPaths {
if checkpoint.Phase == "deleted" && remotePath <= checkpoint.Path {
continue
}
if err := s.applyRemoteDelete(remotePath, conflicted); err != nil {
return err
}
s.markIncrementalCheckpoint(pageStartCursor, pageCursor, "deleted", remotePath)
}
return nil
}

func (s *Syncer) markIncrementalCheckpoint(pageStartCursor, pageCursor, phase, remotePath string) {
s.state.IncrementalCheckpoint = &incrementalCheckpoint{
Cursor: strings.TrimSpace(pageStartCursor),
PageCursor: strings.TrimSpace(pageCursor),
Phase: strings.TrimSpace(phase),
Path: normalizeRemotePath(remotePath),
}
}

func (s *Syncer) resolveLatestEventCursor(ctx context.Context) (string, error) {
// Derive an OWN short deadline from rootCtx so a slow/hanging events
// feed can never wedge an otherwise healthy cycle (and is independent
Expand Down
199 changes: 199 additions & 0 deletions internal/mountsync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3456,6 +3456,8 @@ type fakeClient struct {
eventsUnsupported bool
listEventsErrAfter int
listEventsErr error
readFileErrAfter int
readFileErr error
}

// requestedReadCalls returns the cumulative number of ReadFile calls made
Expand Down Expand Up @@ -3574,6 +3576,9 @@ func (c *fakeClient) ReadFile(ctx context.Context, workspaceID, path string) (Re
c.readFileCallsByPath = make(map[string]int)
}
c.readFileCallsByPath[path]++
if c.readFileErr != nil && c.readFileCalls > c.readFileErrAfter {
return RemoteFile{}, c.readFileErr
}
file, ok := c.files[path]
if !ok {
return RemoteFile{}, &HTTPError{StatusCode: 404, Code: "not_found", Message: "not found"}
Expand Down Expand Up @@ -4626,6 +4631,200 @@ func TestPullRemoteIncrementalPersistsAppliedPageCursorOnListEventsError(t *test
}
}

func TestPullRemoteIncrementalResumesWithinAppliedPage(t *testing.T) {
files := map[string]RemoteFile{}
events := make([]FilesystemEvent, 0, 10)
for i := 1; i <= 10; i++ {
remotePath := fmt.Sprintf("/notion/Docs/%03d.md", i)
revision := fmt.Sprintf("rev_%03d", i)
files[remotePath] = RemoteFile{
Path: remotePath,
Revision: revision,
ContentType: "text/markdown",
Content: fmt.Sprintf("# %03d", i),
}
events = append(events, FilesystemEvent{
EventID: fmt.Sprintf("evt_%03d", i),
Type: "file.created",
Path: remotePath,
Revision: revision,
})
}
client := &fakeClient{
files: files,
events: events,
revisionCounter: 10,
eventCounter: 10,
readFileErrAfter: 3,
readFileErr: context.DeadlineExceeded,
}
localDir := t.TempDir()
syncer, err := NewSyncer(client, SyncerOptions{
WorkspaceID: "ws_page_resume",
RemoteRoot: "/notion",
LocalRoot: localDir,
FullPullEvery: -1,
WebSocket: boolPtr(false),
CursorTimeout: time.Second,
BootstrapTimeout: time.Second,
})
if err != nil {
t.Fatalf("new syncer failed: %v", err)
}
syncer.loaded = true
syncer.state = mountState{
Files: map[string]trackedFile{},
EventsCursor: "evt_000",
BootstrapComplete: true,
}

err = syncer.Reconcile(context.Background())
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected read-file deadline inside first page, got %v", err)
}
if got := syncer.state.EventsCursor; got != "evt_000" {
t.Fatalf("EventsCursor should stay on the unapplied page cursor; got %q", got)
}
if syncer.state.IncrementalCheckpoint == nil {
t.Fatalf("expected checkpoint after partial page")
}
if checkpoint := *syncer.state.IncrementalCheckpoint; checkpoint.Cursor != "evt_000" ||
checkpoint.PageCursor != "evt_010" ||
checkpoint.Phase != "changed" ||
checkpoint.Path != "/notion/Docs/003.md" {
t.Fatalf("unexpected checkpoint after partial page: %#v", checkpoint)
}
for i := 1; i <= 3; i++ {
path := filepath.Join(localDir, "Docs", fmt.Sprintf("%03d.md", i))
if _, err := os.ReadFile(path); err != nil {
t.Fatalf("expected file %03d to be applied before timeout: %v", i, err)
}
}

client.readFileErr = nil
before := make(map[string]int, len(client.readFileCallsByPath))
for path, calls := range client.readFileCallsByPath {
before[path] = calls
}
if err := syncer.Reconcile(context.Background()); err != nil {
t.Fatalf("reconcile after partial page failed: %v", err)
}
for i := 1; i <= 3; i++ {
remotePath := fmt.Sprintf("/notion/Docs/%03d.md", i)
if got := client.readFileCallsByPath[remotePath]; got != before[remotePath] {
t.Fatalf("expected %s to be skipped on resume; read calls went %d -> %d", remotePath, before[remotePath], got)
}
}
if got := client.readFileCallsByPath["/notion/Docs/004.md"]; got == 0 {
t.Fatalf("expected resume to continue after checkpoint path")
}
if got := syncer.state.EventsCursor; got != "evt_010" {
t.Fatalf("EventsCursor = %q, want completed page cursor evt_010", got)
}
if checkpoint := syncer.state.IncrementalCheckpoint; checkpoint != nil {
t.Fatalf("expected checkpoint to clear after completed page, got %#v", checkpoint)
}
data, err := os.ReadFile(filepath.Join(localDir, "Docs", "010.md"))
if err != nil {
t.Fatalf("expected final file to apply on resume: %v", err)
}
if string(data) != "# 010" {
t.Fatalf("unexpected final file content: %q", data)
}
}

func TestPullRemoteIncrementalCheckpointPreservesChangedPath404Delete(t *testing.T) {
files := map[string]RemoteFile{
"/notion/Docs/002.md": {
Path: "/notion/Docs/002.md",
Revision: "rev_002",
ContentType: "text/markdown",
Content: "# 002",
},
"/notion/Docs/003.md": {
Path: "/notion/Docs/003.md",
Revision: "rev_003",
ContentType: "text/markdown",
Content: "# 003",
},
}
client := &fakeClient{
files: files,
events: []FilesystemEvent{
{EventID: "evt_001", Type: "file.updated", Path: "/notion/Docs/001.md", Revision: "rev_001"},
{EventID: "evt_002", Type: "file.updated", Path: "/notion/Docs/002.md", Revision: "rev_002"},
{EventID: "evt_003", Type: "file.updated", Path: "/notion/Docs/003.md", Revision: "rev_003"},
},
revisionCounter: 3,
eventCounter: 3,
readFileErrAfter: 2,
readFileErr: context.DeadlineExceeded,
}
localDir := t.TempDir()
localPath := filepath.Join(localDir, "Docs", "001.md")
if err := os.MkdirAll(filepath.Dir(localPath), 0o755); err != nil {
t.Fatalf("mkdir local doc dir: %v", err)
}
if err := os.WriteFile(localPath, []byte("# stale"), 0o644); err != nil {
t.Fatalf("write stale local file: %v", err)
}
syncer, err := NewSyncer(client, SyncerOptions{
WorkspaceID: "ws_page_404_resume",
RemoteRoot: "/notion",
LocalRoot: localDir,
FullPullEvery: -1,
WebSocket: boolPtr(false),
CursorTimeout: time.Second,
BootstrapTimeout: time.Second,
})
if err != nil {
t.Fatalf("new syncer failed: %v", err)
}
syncer.loaded = true
syncer.state = mountState{
Files: map[string]trackedFile{
"/notion/Docs/001.md": {
Revision: "rev_old",
ContentType: "text/markdown",
Hash: hashBytes([]byte("# stale")),
},
},
EventsCursor: "evt_000",
BootstrapComplete: true,
}

err = syncer.Reconcile(context.Background())
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected read-file deadline after checkpointed 404 delete, got %v", err)
}
if _, err := os.Stat(localPath); !errors.Is(err, os.ErrNotExist) {
t.Fatalf("expected 404 changed path to delete stale local file before timeout; stat err=%v", err)
}
if _, ok := syncer.state.Files["/notion/Docs/001.md"]; ok {
t.Fatalf("expected 404 changed path to be removed from tracked state")
}
if checkpoint := syncer.state.IncrementalCheckpoint; checkpoint == nil ||
checkpoint.Phase != "changed" ||
checkpoint.Path != "/notion/Docs/002.md" {
t.Fatalf("unexpected checkpoint after partial changed page with 404: %#v", checkpoint)
}

before404Reads := client.readFileCallsByPath["/notion/Docs/001.md"]
client.readFileErr = nil
if err := syncer.Reconcile(context.Background()); err != nil {
t.Fatalf("reconcile after checkpointed 404 delete failed: %v", err)
}
if got := client.readFileCallsByPath["/notion/Docs/001.md"]; got != before404Reads {
t.Fatalf("expected deleted 404 path to be checkpointed and skipped on resume; read calls went %d -> %d", before404Reads, got)
}
if got := syncer.state.EventsCursor; got != "evt_003" {
t.Fatalf("EventsCursor = %q, want completed page cursor evt_003", got)
}
if checkpoint := syncer.state.IncrementalCheckpoint; checkpoint != nil {
t.Fatalf("expected checkpoint to clear after completed page, got %#v", checkpoint)
}
}

func TestScanLocalFilesLogsOversizedFileOncePerSize(t *testing.T) {
t.Setenv("RELAYFILE_MAX_WRITEBACK_BYTES", "4")

Expand Down
Loading