
fix(code): break unrecoverable cloud log reconcile loop on stable deficiency #2284

Open
k11kirky wants to merge 4 commits into posthog-code/debounce-write-local-logs from posthog-code/bound-cloud-reconcile-loop

Conversation

@k11kirky
Contributor

@k11kirky k11kirky commented May 21, 2026

Problem

Cloud log reconciliation could get stuck in an infinite loop when log files contained corrupted or unparseable lines. Because processedLineCount was never advanced past the gap, every new snapshot delta would re-trigger a reconcile that could never succeed — either because lines were permanently malformed (proven corruption) or because S3 simply wasn't catching up.

Changes

Introduced two early-exit conditions in the reconcile loop that commit a best-effort state and advance processedLineCount past the gap:

  1. Parse failures detected on first observation — if any lines in the fetched log fail JSON.parse, the corruption is permanent and S3 will never fix it. The reconcile breaks immediately rather than waiting.
  2. Same deficiency observed twice in a row — if a second reconcile produces the same (expectedCount, observedLineCount) pair as the previous one, S3 is not catching up. The reconcile breaks on the second observation.

In all other cases the deficiency is treated as transient lag and the reconcile waits for the next snapshot update as before.
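
The decision described above can be sketched as a small pure function. This is a minimal illustration, not the code in this PR: `decideReconcile`, `Deficiency`, and `ReconcileDecision` are hypothetical names, and only `expectedCount`, `observedLineCount`, and `parseFailureCount` come from the description.

```typescript
// Hypothetical types for illustration; not taken from the PR's source.
interface Deficiency {
  expectedCount: number;
  observedLineCount: number;
}

type ReconcileDecision = "commit-best-effort" | "wait-for-next-snapshot";

// Decide whether a deficient reconcile should give up and advance
// processedLineCount, or wait for the next snapshot update.
function decideReconcile(
  previous: Deficiency | undefined,
  current: Deficiency,
  parseFailureCount: number,
): ReconcileDecision {
  // Condition 1: unparseable lines are treated as permanent corruption.
  if (parseFailureCount > 0) {
    return "commit-best-effort";
  }
  // Condition 2: the same (expectedCount, observedLineCount) pair twice in a row.
  if (
    previous !== undefined &&
    previous.expectedCount === current.expectedCount &&
    previous.observedLineCount === current.observedLineCount
  ) {
    return "commit-best-effort";
  }
  // Otherwise assume transient lag.
  return "wait-for-next-snapshot";
}
```

Keeping the decision free of I/O would make both early exits checkable without the store or tRPC mocks, though the PR itself may structure this differently.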

A parseFailureCount field was added to ParsedSessionLogs so the reconcile handler can distinguish between "fewer lines than expected" and "lines exist but are corrupt." A cloudLogReconcileDeficiency map tracks the last observed deficiency per taskRunId and is cleaned up on session removal, watcher teardown, and full reset.
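
As a rough sketch of the two pieces described above, the following shows one way to count parse failures while reading NDJSON and to track the last deficiency per run. The names `parseLogContentSketch`, `ParsedLogsSketch`, `deficiencyByRun`, and the helper functions are assumptions for illustration; the real `ParsedSessionLogs` type has fields not shown here.

```typescript
// Hypothetical shape: only `parseFailureCount` is named in the PR description.
interface ParsedLogsSketch {
  entries: unknown[];
  parseFailureCount: number;
}

// Parse newline-delimited JSON, counting lines that fail JSON.parse
// instead of silently dropping them.
function parseLogContentSketch(content: string): ParsedLogsSketch {
  const entries: unknown[] = [];
  let parseFailureCount = 0;
  for (const line of content.split("\n")) {
    if (line.trim() === "") continue; // ignore blank lines
    try {
      entries.push(JSON.parse(line));
    } catch {
      parseFailureCount += 1;
    }
  }
  return { entries, parseFailureCount };
}

// Hypothetical per-run tracking, keyed by taskRunId.
interface DeficiencySketch {
  expectedCount: number;
  observedLineCount: number;
}

const deficiencyByRun = new Map<string, DeficiencySketch>();

// Store the latest deficiency and return the one seen before it, if any.
function recordDeficiency(
  taskRunId: string,
  current: DeficiencySketch,
): DeficiencySketch | undefined {
  const previous = deficiencyByRun.get(taskRunId);
  deficiencyByRun.set(taskRunId, current);
  return previous;
}

// Cleanup hooks, mirroring session removal / watcher teardown and full reset.
function clearDeficiency(taskRunId: string): void {
  deficiencyByRun.delete(taskRunId);
}

function resetAllDeficiencies(): void {
  deficiencyByRun.clear();
}
```

Clearing the entry on every teardown path matters because a stale pair left behind for a `taskRunId` could make the first reconcile of a later watch look like a repeat.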

How did you test this?

Two new unit tests were added:

  • "breaks the reconcile loop on first observation when parse failures are present" — feeds a log with mixed valid and malformed JSON lines and asserts that processedLineCount is advanced to expectedCount on the first reconcile attempt.
  • "breaks the reconcile loop after a repeated stable deficiency" — feeds a log with fewer parseable lines than the server-reported count, fires two identical snapshot updates, and asserts that processedLineCount is only advanced after the second (repeated) observation.

Publish to changelog?

No

Contributor Author

k11kirky commented May 21, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@greptile-apps
Contributor

greptile-apps Bot commented May 21, 2026

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
apps/code/src/renderer/features/sessions/service/service.test.ts:2514-2660
**Duplicated test setup across two new cases**

The two new tests repeat an identical block (~25 lines) to create `service`, `existingSession`, set up all three store-setter mocks, start `watchCloudTask`, and extract `subscribeOptions`. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared `beforeEach` / helper factory within this `describe` block, or, since the two scenarios diverge only in the log content and the number of `onData` invocations, as a single parameterised case using `it.each`.

Reviews (1): Last reviewed commit: "chore(code): trim verbose comments on cl..." | Re-trigger Greptile

Comment on lines 2514 to 2660
});

it("breaks the reconcile loop on first observation when parse failures are present", async () => {
  const service = getSessionService();
  const existingSession = createMockSession({
    taskRunId: "run-123",
    taskId: "task-123",
    status: "connected",
    isCloud: true,
    logUrl: "https://logs.example.com/run-123",
    processedLineCount: 5,
    events: [],
  });
  mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
    existingSession,
  );
  mockSessionStoreSetters.getSessions.mockReturnValue({
    "run-123": existingSession,
  });

  const validLine = JSON.stringify({
    type: "notification",
    timestamp: "2024-01-01T00:00:00Z",
    notification: { method: "session/update" },
  });
  const corruptedContent = [
    ...Array.from({ length: 8 }, () => validLine),
    "}}not-json{{",
    "{broken",
  ].join("\n");
  mockTrpcLogs.readLocalLogs.query.mockResolvedValue(corruptedContent);
  mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(corruptedContent);

  service.watchCloudTask(
    "task-123",
    "run-123",
    "https://api.anthropic.com",
    123,
  );
  const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock
    .calls[0][1] as {
    onData: (update: unknown) => void;
  };

  subscribeOptions.onData({
    kind: "logs",
    taskId: "task-123",
    runId: "run-123",
    totalEntryCount: 20,
    newEntries: [
      {
        type: "notification",
        timestamp: "2024-01-01T00:00:01Z",
        notification: { method: "session/update" },
      },
    ],
  });

  await vi.waitFor(() => {
    expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
      "run-123",
      expect.objectContaining({ processedLineCount: 20 }),
    );
  });
});

it("breaks the reconcile loop after a repeated stable deficiency", async () => {
  const service = getSessionService();
  const existingSession = createMockSession({
    taskRunId: "run-123",
    taskId: "task-123",
    status: "connected",
    isCloud: true,
    logUrl: "https://logs.example.com/run-123",
    processedLineCount: 5,
    events: [],
  });
  mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
    existingSession,
  );
  mockSessionStoreSetters.getSessions.mockReturnValue({
    "run-123": existingSession,
  });

  const storedLine = JSON.stringify({
    type: "notification",
    timestamp: "2024-01-01T00:00:00Z",
    notification: { method: "session/update" },
  });
  mockTrpcLogs.readLocalLogs.query.mockResolvedValue(
    Array.from({ length: 8 }, () => storedLine).join("\n"),
  );
  mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(
    Array.from({ length: 8 }, () => storedLine).join("\n"),
  );

  service.watchCloudTask(
    "task-123",
    "run-123",
    "https://api.anthropic.com",
    123,
  );
  const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock
    .calls[0][1] as {
    onData: (update: unknown) => void;
  };

  const newEntry = {
    type: "notification",
    timestamp: "2024-01-01T00:00:01Z",
    notification: { method: "session/update" },
  };

  subscribeOptions.onData({
    kind: "logs",
    taskId: "task-123",
    runId: "run-123",
    totalEntryCount: 14,
    newEntries: [newEntry],
  });
  await vi.waitFor(() => {
    expect(mockTrpcLogs.fetchS3Logs.query).toHaveBeenCalledTimes(1);
  });

  expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith(
    "run-123",
    expect.objectContaining({ processedLineCount: 14 }),
  );

  subscribeOptions.onData({
    kind: "logs",
    taskId: "task-123",
    runId: "run-123",
    totalEntryCount: 14,
    newEntries: [newEntry],
  });

  await vi.waitFor(() => {
    expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
      "run-123",
      expect.objectContaining({ processedLineCount: 14 }),
    );
  });
});

it("flips status to connected on _posthog/run_started", async () => {
  const service = getSessionService();

P2 Duplicated test setup across two new cases

The two new tests repeat an identical block (~25 lines) to create service, existingSession, set up all three store-setter mocks, start watchCloudTask, and extract subscribeOptions. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared beforeEach / helper factory within this describe block, or, since the two scenarios diverge only in the log content and the number of onData invocations, as a single parameterised case using it.each.
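
One possible shape for the extraction suggested above is a scenario table plus a shared update builder. This is only a sketch: `ReconcileScenario`, `scenarios`, `updatesBeforeCommit`, and `buildLogsUpdate` are hypothetical names, while the log contents and counts are copied from the two tests under review.

```typescript
// Shared fixture line, identical in both tests under review.
const validLine = JSON.stringify({
  type: "notification",
  timestamp: "2024-01-01T00:00:00Z",
  notification: { method: "session/update" },
});

// Hypothetical scenario shape: the two cases differ only in log content,
// the server-reported count, and how many updates fire before the commit.
interface ReconcileScenario {
  name: string;
  logContent: string;
  totalEntryCount: number;
  updatesBeforeCommit: number;
}

const scenarios: ReconcileScenario[] = [
  {
    name: "on first observation when parse failures are present",
    logContent: [
      ...Array.from({ length: 8 }, () => validLine),
      "}}not-json{{",
      "{broken",
    ].join("\n"),
    totalEntryCount: 20,
    updatesBeforeCommit: 1,
  },
  {
    name: "after a repeated stable deficiency",
    logContent: Array.from({ length: 8 }, () => validLine).join("\n"),
    totalEntryCount: 14,
    updatesBeforeCommit: 2,
  },
];

// Builds the snapshot update both tests pass to onData.
function buildLogsUpdate(totalEntryCount: number) {
  return {
    kind: "logs",
    taskId: "task-123",
    runId: "run-123",
    totalEntryCount,
    newEntries: [
      {
        type: "notification",
        timestamp: "2024-01-01T00:00:01Z",
        notification: { method: "session/update" },
      },
    ],
  };
}

// With Vitest, the table could drive a single parameterised case, e.g.
//   it.each(scenarios)("breaks the reconcile loop $name", async (scenario) => { ... })
// where the body mocks the logs with scenario.logContent, fires
// buildLogsUpdate(scenario.totalEntryCount) scenario.updatesBeforeCommit times,
// and asserts processedLineCount equals scenario.totalEntryCount.
```

A shared `beforeEach` or helper factory would work equally well. Either way, the second test's intermediate "not yet advanced" assertion would need to run between updates inside the parameterised body.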


@k11kirky k11kirky force-pushed the posthog-code/bound-cloud-reconcile-loop branch from 21d3033 to 419607b Compare May 21, 2026 14:37
k11kirky added 4 commits May 22, 2026 13:57
…iciency

When the cloud-task gap-reconcile loop fetches and finds `totalLineCount <
expectedCount` (corruption — usually concatenated NDJSON records without
newlines), it used to warn and skip, leaving processedLineCount unchanged.
Every subsequent SSE snapshot then re-triggered another fetch + writeLocalLogs
because the snapshot delta stayed positive forever — this is the
"can't click cloud task" failure mode that survives layer 1's write
coalescer.

Track the last observed (expectedCount, observedLineCount) per taskRunId.
If a second reconcile produces the same pair, S3 is not catching up:
commit the parseable entries best-effort and advance processedLineCount
past the gap so the snapshot handler stops re-triggering. New higher
expectedCount values get one more retry before being treated as exhausted.

Also surface `parseFailureCount` from `parseLogContent` for visibility
in the inconsistency-warning log.

Generated-By: PostHog Code
Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7

Previously the loop-break required two reconciles producing identical
(expectedCount, observedLineCount) — brittle when S3 dribbled in a
single record between attempts. parseFailureCount > 0 is a stronger
signal: lines exist on disk that don't parse, so further fetches won't
recover them. Treat it as proof of corruption and commit best-effort on
the first observation.

Also clean up cloudLogReconcileDeficiency in stopCloudTaskWatch to match
the existing watcher teardown pattern.

Generated-By: PostHog Code
Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7

Generated-By: PostHog Code
Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7

Generated-By: PostHog Code
Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7
@k11kirky k11kirky force-pushed the posthog-code/debounce-write-local-logs branch from 8913168 to d7b92a8 Compare May 22, 2026 13:02
@k11kirky k11kirky force-pushed the posthog-code/bound-cloud-reconcile-loop branch from 419607b to 5d2948c Compare May 22, 2026 13:02

2 participants