Skip to content

feat(tasks): add task run event stream sender#2110

Open
tatoalo wants to merge 1 commit intomainfrom
feat/tasks-event-stream-sender
Open

feat(tasks): add task run event stream sender#2110
tatoalo wants to merge 1 commit intomainfrom
feat/tasks-event-stream-sender

Conversation

@tatoalo
Copy link
Copy Markdown
Contributor

@tatoalo tatoalo commented May 8, 2026

Problem

agent server needs a resilient sender for task run live events before it can deliver events directly to the backend. It should preserve event order, tolerate retries, and avoid making agent execution wait on each individual live event write.

Changes

  • added TaskRunEventStreamSender for NDJSON event ingest requests to PostHog BE
  • added sequence sync and rebase handling so retries can recover from already-accepted events
  • added batching, request timeouts, stop-time draining, completion handshakes, oversized-event drops, and bounded backpressure
  • added unit coverage tests for various scenarios

Copy link
Copy Markdown
Contributor Author

tatoalo commented May 8, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.

@tatoalo tatoalo marked this pull request as ready for review May 8, 2026 15:38
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 8, 2026

Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
packages/agent/src/server/event-stream-sender.test.ts:169-203
**Assertion inside mock is silently swallowed on failure**

`expect(init?.body).toBe("")` on line 172 lives inside the async mock. If the assertion throws, it rejects the promise returned by `fetchMock`, which is caught by `flush()`'s catch block and logged as a warning — the test still passes, masking the regression. The same pattern appears at lines 332–336 in "drains multiple capped batches on stop" where `expect(…X-PostHog-Event-Stream-Complete…).toBeUndefined()` is also inside the mock body. Move these assertions outside the mock (collect call args with `fetchMock.mock.calls` after `await sender.stop()` and assert there) to ensure vitest actually fails the test on violation.

### Issue 2 of 3
packages/agent/src/server/event-stream-sender.test.ts:205-304
**Prefer parameterised tests for drop scenarios**

"drops new events before assigning sequence when the buffer is full" (line 205) and "drops oversized events before assigning sequence" (line 254) test the identical observable behaviour — enqueue two events, the second is silently dropped, only the first reaches the server with `seq: 1`. The only difference is the drop trigger (`maxBufferedEvents: 1` vs `maxEventBytes: 120`). Per the team's rule, these should be a single `it.each` parameterised test sharing the mock and assertion logic rather than two separate near-identical blocks.

### Issue 3 of 3
packages/agent/src/server/event-stream-sender.ts:403-406
**Oversized-event size estimate is larger than the actual serialised size**

The size check serialises the envelope with `Number.MAX_SAFE_INTEGER` (16 digits) as the placeholder `seq`. The actual assigned sequence number will always be far smaller (e.g., 1 digit for `seq: 1`), so `eventBytes` can overestimate the real size by up to ~15 bytes. For events right at the `maxEventBytes` boundary this causes valid events to be incorrectly dropped. A safer upper bound would be the current `this.sequence + 1` (the next seq to be assigned), or a small fixed constant like `999_999` rather than `Number.MAX_SAFE_INTEGER`.

Reviews (1): Last reviewed commit: "feat(tasks): add task run event stream s..." | Re-trigger Greptile

Comment thread packages/agent/src/server/event-stream-sender.test.ts
Comment thread packages/agent/src/server/event-stream-sender.test.ts Outdated
Comment thread packages/agent/src/server/event-stream-sender.ts
@tatoalo tatoalo force-pushed the feat/tasks-event-stream-sender branch from ba262d3 to 70c5092 Compare May 8, 2026 15:50
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 8, 2026

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
packages/agent/src/server/event-stream-sender.ts:204-219
**AbortController + timeout fetch pattern repeated three times**

The same `AbortController` / `setTimeout` / `try { await fetch } finally { clearTimeout }` block appears identically in `syncSequenceWithServer` (lines 310–325), `sendBufferedEvents` (lines 204–219), and `completeTransport` (lines 246–265). This violates the team's OnceAndOnlyOnce rule. A private helper such as `fetchWithTimeout(init: RequestInit): Promise<Response>` would centralize the pattern and make future changes (e.g. adjusting abort semantics) a single-site edit.

Reviews (2): Last reviewed commit: "feat(tasks): add task run event stream s..." | Re-trigger Greptile

@tatoalo tatoalo self-assigned this May 8, 2026
@tatoalo tatoalo requested a review from a team May 8, 2026 16:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant