feat(wizard): stream session updates via SSE#59109
Merged
Merged
Conversation
Contributor
Author
This stack of pull requests is managed by Graphite. Learn more about stacking. |
This was referenced May 19, 2026
Contributor
Prompt To Fix All With AIFix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
products/wizard/backend/presentation/views.py:155-161
**Subscribe-after-read race condition**
`get_latest` is called before `subscribe` establishes the channel. Any upsert published in the window between these two calls is permanently lost: the initial snapshot is already stale and the subscription won't deliver the missed event. A client could connect, receive an outdated snapshot, and then only see events from after the subscription was set up — silently skipping the intermediate state.
The standard fix is to subscribe *first*, then snapshot, then emit the snapshot. Because the subscription buffer captures any publish that arrives after `subscribe()` but before `get_latest()` returns, there is no gap.
```suggestion
with subscribe(team_id, workflow_id, skill_id) as pubsub:
latest = wizard_facade.get_latest(team_id, workflow_id, skill_id)
if latest is not None:
yield _format_event(latest)
last_heartbeat = time.monotonic()
while True:
```
### Issue 2 of 2
products/wizard/backend/presentation/views.py:180-187
**Duplicated `_json_default` logic (OnceAndOnlyOnce)**
The `_json_default` function is defined verbatim in both `pubsub.py` (lines 67-74) and here. If the serialisation rules ever change (e.g. a new type is added to `WizardSessionDTO`), both copies need updating in sync. Move the definition to `pubsub.py` and import it in `views.py`.
Reviews (1): Last reviewed commit: "feat: add sse" | Re-trigger Greptile |
Contributor
|
Size Change: 0 B Total Size: 80.2 MB ℹ️ View Unchanged
|
c01635b to
f572a33
Compare
e2ed16e to
e539444
Compare
5bbf102 to
b74977d
Compare
2 tasks
b74977d to
65d1428
Compare
65d1428 to
305fb81
Compare
c5c97e8 to
454a249
Compare
305fb81 to
2097f27
Compare
2097f27 to
498eae7
Compare
rafaeelaudibert
approved these changes
May 21, 2026
MattBro
reviewed
May 21, 2026
gewenyu99
approved these changes
May 22, 2026
Contributor
gewenyu99
left a comment
There was a problem hiding this comment.
Do you think we're missing timeout logic here? If a connection times out, do we/should we do something to update the session to mark it stale, put it in error state, etc?
| """Yield SSE-formatted bytes for a wizard session subscription. | ||
|
|
||
| If `skill_id` is provided, scope to that exact pair. Otherwise pattern- | ||
| subscribe to all skills under (team, workflow_id). |
Contributor
There was a problem hiding this comment.
👀 Is it necessary to do this? I think we'll always have a workflow-skill pair. What would be the need for subscribing to multiple skills?
I think if we handle, say, monorepos, it would be with nesting skills under a parent workflow + skill pair.
4123d15 to
0253a6b
Compare
62f639e to
72a6e19
Compare
dcc38d2 to
e75ddc2
Compare
184276f to
d9d8928
Compare
54adf92 to
c1792f7
Compare
The stream action's ASGI gateway check ran before the workflow_id presence check, so a missing workflow_id returned 500 (RuntimeError) under the DRF test client (which runs the WSGI handler) instead of the expected 400. Run input validation first so the response code reflects the request error regardless of gateway, and keep the ASGI guard as a safety net for the async generator. Generated-By: PostHog Code Task-Id: dbb0c85c-f333-4333-a692-71dbc0cfdcc3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Problem
The REST endpoint from #59062 lets the CLI push state, but the web app would have to poll to see updates. The RFC recommends SSE. This PR adds the SSE fanout so a connected client gets pushes in real time, regardless of which workflow / framework the wizard is running.
Changes
GET /api/projects/{id}/wizard_sessions/stream/?workflow_id=…returns atext/event-streamconnection.skill_idis optional — when omitted, the server pattern-subscribes to every skill under that workflow, so the web app doesn't need to know the framework up front.async defand dispatches blocking Redis calls throughsync_to_async. Chunks reach the browser as they're produced.transaction.on_commitso subscribers never see uncommitted state and rolled-back upserts don't broadcast phantom updates. Workflow-only subscribers use RedisPSUBSCRIBEwith a channel pattern; bothmessageandpmessagetypes are forwarded.team_scopere-entry inside the generator. Streaming runs after the view returns on a sync worker thread without the request's thread-local team scope, so any DB access inside the generator is wrapped explicitly.EventStreamRendererregistered on the stream action. DRF's content negotiation 406sAccept: text/event-streamotherwise; the renderer is never invoked to render (the response is aStreamingHttpResponse), it just satisfies negotiation.: pingheartbeats every 15s so proxies don't time them out.How did you test this code?
@pytest.mark.django_db(transaction=True)wheretransaction.on_commitsemantics need to fire.Agent-authored.
Publish to changelog?
no
Docs update
n/a — internal endpoint, no public docs yet.
🤖 Agent context
Claude Code (Opus 4.7, 1M context). Notable decisions:
workflow_id="posthog-integration"with skill keyed to the detected framework (laravel,nextjs,django, …). The frontend doesn't know the framework up front, so subscribing by workflow alone removes a chicken-and-egg problem.