Feat notification system#2472
Merged
Merged
Conversation
Adds a per-user SSE pipe (GET /api/events) plus a per-message chat-stream reconnect endpoint (GET /api/messages/<id>/events). Backend substrate: - application/events/ — durable journal (Redis Streams) + live pub/sub for user-scoped events, with publish_user_event() as the worker-side entrypoint. - application/streaming/ — broadcast_channel for pub/sub fanout and event_replay for the per-message snapshot+tail path. - application/storage/db/repositories/message_events.py + alembic 0007 — Postgres journal for chat-stream events. - application/worker.py — ingest/reingest/remote/connector/ attachment/mcp_oauth tasks publish queued/progress/completed/ failed envelopes alongside their existing status updates. Frontend client: - frontend/src/events/ — connect/reconnect, Last-Event-ID cursor, backoff with jitter. Each tab runs its own connection; no cross-tab dedup (future work). - frontend/src/notifications/ — recentEvents ring, cursor tracking, tool-approval toast. - frontend/src/upload/uploadSlice.ts — extraReducers for source.ingest.* and attachment.* events. Coverage: 132 SSE tests across events substrate, replay, journal, routes, and worker publishes.
frontend/src/components/MessageInput.tsx no longer runs a 2s setInterval against getTaskStatus for every processing attachment. The attachment.* SSE reducers in uploadSlice.ts are now the sole driver of attachment state transitions.
frontend/src/components/ConnectorTree.tsx now mirrors FileTree's
slice-walking pattern: it watches notifications.recentEvents
for source.ingest.{completed,failed} envelopes matching the
sync's source id, and no longer polls /task_status every 2s.
frontend/src/upload/Upload.tsx and frontend/src/components/FileTree.tsx no longer run getTaskStatus polling fallbacks. The source.ingest.* SSE reducers in uploadSlice.ts and FileTree's slice walk are now the sole drivers of upload/reingest state transitions.
application/worker.py::mcp_oauth now publishes authorization_url on the mcp.oauth.awaiting_redirect envelope. frontend/src/modals/MCPServerModal.tsx consumes it from SSE instead of polling /oauth_status/<task_id> every 1s. The URL is generated inside DocsGPTOAuth.redirect_handler when the FastMCP client triggers OAuth. The worker now plumbs a publish callback through tool_config -> MCPTool -> DocsGPTOAuth so the awaiting_redirect publish fires from inside the handler at the exact point the URL becomes known. The legacy Redis mcp_oauth_status setex writes and the GET /api/mcp_server/oauth_status/<task_id> endpoint are kept as belt-and-suspenders; nothing in the frontend reads them now.
application/worker.py::ingest_worker and remote_worker now publish ``limited: bool`` on the source.ingest.completed envelope. uploadSlice routes ``payload.limited === true`` to a failed status with a ``tokenLimitReached`` flag, and UploadToast surfaces the translated tokenLimit i18n string. No worker code path sets limited=true today; this is a forward-looking contract so when token-cap detection lands, the UX is already wired.
MCPOAuthManager.get_oauth_status now walks the per-user SSE Streams
journal (user:{user_id}:stream) for the latest mcp.oauth.* envelope
matching the task id, returning the status string derived from the
event type suffix and the payload fields. The worker is the single
source of truth — its publish_user_event calls write the same
record the SSE client receives live.
Removed:
- /api/mcp_server/oauth_status/<task_id> route in
application/api/user/tools/mcp.py
- mcp_oauth_status worker function and mcp_oauth_status_task Celery
wrapper
- All mcp_oauth_status:{task_id} Redis setex writes (4 in mcp_oauth,
2 in DocsGPTOAuth.redirect_handler / callback_handler)
- The update_status closure in mcp_oauth that wrote the polling
payload
Tests updated:
- get_oauth_status now takes (task_id, user_id); new coverage walks
a fake xrevrange response for the completed envelope, the no-match
case, and a Redis-down case
- Removed TestMCPOAuthStatus route tests and TestMcpOauthStatusTask
celery-wrapper test
- Removed the two oauth_status methods from the integration runner
mcp_oauth:auth_url/state/code/error Redis keys remain — they are
the OAuth flow's own state (not the dropped polling payload).
The /api/mcp_server/oauth_status/<task_id> endpoint was removed in the prior commit; the corresponding userService method and the MCP_OAUTH_STATUS endpoint constant had no remaining callers in the frontend, so they're deleted along with it.
application/events/publisher.py returned an envelope to live pubsub subscribers even when the XADD to the durable journal failed. The envelope had no ``id`` field, which bypassed the SSE route's dedup floor and broke ``Last-Event-ID`` semantics for any reconnecting client. Best-effort delivery means dropping consistently, not delivering inconsistent state. Now: if the journal write fails the publisher returns None and skips the live publish entirely.
Snapshot replay + live tail can both deliver the same id when the live pubsub frame and the replay XRANGE overlap. The route's own dedup floor catches the common case, but consumers walking ``recentEvents`` (FileTree, ConnectorTree, MCPServerModal, ToolApprovalToast) would otherwise act on the same envelope twice when a duplicate slipped through. Belt-and-suspenders: short-circuit when the most recent id in the ring matches the incoming one.
_allow_replay incremented the per-user counter on every /api/events GET, including no-op connects from a fresh client with no cursor against an empty backlog. React StrictMode dev double-mounts plus a few tabs trivially tripped the default 30-per-60s budget on idle reconnects. XLEN pre-check: when last_event_id is None and the user stream is empty, the connect can't do snapshot work — return True without INCR. Cursor-bearing connects still INCR unconditionally (probing the cursor's relationship to stream contents would require a redundant XRANGE).
Two related fixes to application/streaming/message_journal.py.
1. record_event now rejects non-dict payloads at the gate. The
live path (base.py::_emit) wrapped non-dicts as
{"value": payload}; the replay path in event_replay synthesized
{"type": event_type}. A reconnecting client would receive a
different envelope than the one originally streamed. Now both
paths see byte-identical envelopes because non-dicts can't be
journaled at all. The corresponding event_replay fallback is
replaced with a warn-and-skip for any legacy rows.
2. record_event handles IntegrityError on (message_id, sequence_no)
collisions by reading latest_sequence_no and retrying once with
latest+1. The most likely cause is a stale seq seed on a
continuation retry where the route read MAX(seq) from a
separate connection before another writer committed past it.
Previously the error was swallowed and the event silently
dropped from the journal; now it lands at the next available
seq. The live pubsub publish uses the materialised seq so the
journal row and the live frame agree.
complete_stream previously opened a fresh db_session() per yielded event, doing one Postgres INSERT + commit per chunk on the WSGI thread. Streaming answers emit ~100s of answer chunks per response, so the route was paying ~100 PG roundtrips per stream serialized on commit latency. New BatchedJournalWriter in application/streaming/message_journal.py accumulates rows per stream and flushes on three triggers: - size: buffer reaches 16 entries - time: 100ms elapsed since the last flush - lifecycle: close() at end-of-stream Live pubsub publishes still fire synchronously per record(), so subscribers see events in real time — only the durable journal write is amortized. On bulk INSERT IntegrityError the writer falls back to per-row record() with the existing seq+1 retry so a single colliding seq doesn't drop the rest of the batch. complete_stream wires journal_writer.close() into every exit path (happy end, tool-approval-paused end, GeneratorExit, error handler) so the terminal event is committed before the generator returns — otherwise a reconnecting client could snapshot up to the last flush boundary and live-tail waiting for an end that's still in memory. Repository gets bulk_record() — one SQLAlchemy executemany INSERT for the bulk path. All-or-nothing on collision (Postgres aborts the whole batch); the writer's per-row fallback handles recovery.
The lastEventAt field on UploadTask had no remaining consumers — the matching Attachment.lastEventAt was cleaned up earlier. Remove the field declaration and the slice write site.
After the polling-removal sweep no caller in frontend/src/ references userService.getTaskStatus or endpoints.USER.TASK_STATUS. The backend route /api/task_status itself stays — agents, webhooks, e2e specs, and the public docs still depend on it.
notification-channel-design.md, plan.md, and reminder-tool-design.md were leftover Claude planning artifacts from the SSE substrate work that landed accidentally. CLAUDE.md prohibits creating planning docs unless asked — delete them.
MessageEventsRepository.record accepts any JSONB-compatible value; the streaming wrapper record_event tightens this to dicts only because the live and replay paths reconstruct non-dict payloads differently. Spell the split out so the next reader of the repo method doesn't assume the wrapper's contract applies here.
stream_id_compare's lex-fallback branch was a footgun: a malformed id that sorts lex-greater than a real one would pin live-tail dedup forever, dropping every subsequent legitimate event silently. Both current callers in application/api/events/routes.py pre-validate inputs against _STREAM_ID_RE before calling, so changing the function to raise ValueError is a no-op on the happy path and turns the future- caller footgun into a loud failure.
Adds skipped-when-no-POSTGRES_URI and happy-path coverage for the Celery janitor. The skipped path returns the documented short-circuit shape without touching the repo. The happy path seeds a backdated row, runs the task against the pg_conn fixture, and asserts the retention window's row is deleted while in-window rows survive. Mirrors the TestCleanupPendingToolState pattern.
useMatch('/c/:conversationId') treats the literal URL /c/new as a
real conversation id, so the toast suppression check confused
'user is on /c/new' with 'user is on the conversation needing
approval'. Explicit guard: when the matched id is 'new', fall
through to the no-match case so approval toasts still surface.
The function returns Optional[str] today, with None conflating five distinct outcomes (missing args / push disabled / unserialisable / Redis down / XADD failed). Every current call site is fire-and- forget and ignores the return, so the right move is to document the five cases rather than promote to an enum return — keeps the API small while making the diagnostic surface (logs) obvious. If a future caller needs to react differently per reason, promote then.
application/api/user/sources/upload.py imported _derive_source_id from application.worker — pulling the entire Celery worker module into the API process at import time just for a two-line helper. Move DOCSGPT_INGEST_NAMESPACE and the derivation function to a new application/storage/db/source_ids.py module that both layers can import without that dependency edge. worker.py re-exports the old names (_derive_source_id, DOCSGPT_INGEST_NAMESPACE) for backward-compatible imports from tests and any other in-tree callers; new code should import from the new module directly.
Without health_check_interval, a half-open TCP socket (NAT silently dropped state, ELB idle-close) can leave pubsub.get_message hanging past the SSE generator's keepalive cadence — the kernel never surfaces the dead socket because no payload is in flight. Setting health_check_interval=10 makes redis-py ping every 10s when otherwise idle, so the next get_message after the dead window raises and the SSE loop falls into its reconnect path instead of silently freezing on the user.
…ogress The event-type taxonomy was inconsistent: source ingest emits source.ingest.progress (three segments) while attachments emitted attachment.processing.progress (four segments). Drops the .processing. infix for parity. Worker publish sites, the slice reducer's match, and the worker tests all flip together. No external consumers — the event type is purely internal between the publisher and the in-tab slice; safe to rename in one commit.
# Conflicts: # frontend/src/api/services/userService.ts # frontend/src/utils/providerUtils.ts
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Contributor
There was a problem hiding this comment.
CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.
| "SSE serving user_id='local' (AUTH_TYPE not set). " | ||
| "All clients on this deployment will share one event stream." | ||
| ) | ||
| _local_user_warned = True |
Comment on lines
+61
to
+64
| from application.storage.db.source_ids import ( # noqa: E402, F401 | ||
| DOCSGPT_INGEST_NAMESPACE, | ||
| derive_source_id as _derive_source_id, | ||
| ) |
| candidate | ||
| ): | ||
| event_id = candidate | ||
| except Exception: |
| try: | ||
| with pg_conn.begin_nested(): | ||
| repo.record(message_id, 0, "answer", {"chunk": "duplicate"}) | ||
| except IntegrityError: |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2472 +/- ##
==========================================
+ Coverage 91.34% 91.41% +0.07%
==========================================
Files 248 265 +17
Lines 20709 22622 +1913
==========================================
+ Hits 18916 20681 +1765
- Misses 1793 1941 +148 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What kind of change does this PR introduce? (Bug fix, feature, docs update, ...)
Why was this change needed? (You can also link to an open issue here)
Other information: