Fix DM thread replies & reaction WS fanout#70
Conversation
DM thread replies and reactions on DM messages never reached agents via WebSocket because fanoutToChannel() broadcasts to a ChannelDO that DM agents don't subscribe to. Added getDmParticipantAgentIds() helper that detects DM channels and routes events through fanoutToAgents() instead, delivering directly to agent Durable Objects. Also fixes pre-existing build issues: stale types dist artifacts, missing vitest exclude for dist/, and implicit any types in react MessageMarkdown. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Preview deployed!
This preview shares the staging database and will be cleaned up when the PR is merged or closed. Run E2E testsnpm run e2e -- https://pr70-api.relaycast.dev --ciOpen observer dashboard |
There was a problem hiding this comment.
Pull request overview
This PR fixes WebSocket delivery for DM thread replies and reactions by routing DM events directly to participant Agent Durable Objects instead of broadcasting to ChannelDOs that agents don’t subscribe to. It also includes some monorepo hygiene fixes to prevent stale dist/ artifacts and address TypeScript build/type issues.
Changes:
- Add
getDmParticipantAgentIds()and use it to DM-routethread.reply,reaction.added, andreaction.removedviafanoutToAgents(). - Add/adjust tests to cover DM vs non-DM fanout behavior and DB-error fallback behavior.
- Fix build/test issues related to stale
dist/artifacts and implicitanytyping in the React markdown renderer.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/server/src/routes/fanout.ts | Adds DM participant lookup helper used to choose agent-vs-channel fanout paths |
| packages/server/src/routes/thread.ts | Uses DM-aware fanout for thread.reply events |
| packages/server/src/routes/reaction.ts | Uses DM-aware fanout for reaction.added / reaction.removed events |
| packages/server/src/routes/tests/fanout.test.ts | Adds unit tests for DM participant lookup helper behaviors |
| packages/server/src/routes/tests/thread.test.ts | Adds route tests for DM vs channel fanout selection |
| packages/server/src/routes/tests/reaction.test.ts | Adds route tests for DM vs channel fanout selection |
| packages/types/package.json | Cleans build output before tsc runs |
| packages/types/vitest.config.ts | Excludes dist/** from vitest discovery |
| packages/react/src/components/MessageMarkdown.tsx | Addresses implicit-any errors in table cell renderers |
Comments suppressed due to low confidence (2)
packages/react/src/components/MessageMarkdown.tsx:476
- Using
: anyhere avoids the implicit-any error but removes type safety for table cell props. Prefer a concrete type (e.g., thereact-markdowncomponent prop type fortd, orReact.ComponentPropsWithoutRef<'td'>+node) to keep type checking effective.
td({ children, style, node: _node, ...props }: any) {
return (
<td
{...props}
style={{ ...TABLE_CELL_STYLE, ...style }}
packages/react/src/components/MessageMarkdown.tsx:466
- Using
: anyhere avoids the implicit-any error but removes type safety for table header props. Prefer a concrete type (e.g., thereact-markdowncomponent prop type forth, orReact.ComponentPropsWithoutRef<'th'>+node), so consumers still get proper typing.
th({ children, style, node: _node, ...props }: any) {
return (
<th
{...props}
style={{ ...TABLE_CELL_STYLE, fontWeight: 600, ...style }}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| "types": "dist/index.d.ts", | ||
| "scripts": { | ||
| "build": "tsc", | ||
| "build": "rm -rf dist tsconfig.tsbuildinfo && tsc", |
There was a problem hiding this comment.
The build script uses rm -rf, which will fail on Windows shells (cmd/PowerShell) and can make local builds non-portable. Consider replacing this with a cross-platform approach (e.g., a small Node fs.rmSync command or a portable cleanup tool) while keeping the clean build behavior.
| "build": "rm -rf dist tsconfig.tsbuildinfo && tsc", | |
| "build": "node -e \"const fs=require('fs'); if(fs.existsSync('dist')) fs.rmSync('dist',{recursive:true,force:true}); if(fs.existsSync('tsconfig.tsbuildinfo')) fs.rmSync('tsconfig.tsbuildinfo',{force:true});\" && tsc", |
| const db = c.get('db'); | ||
| const [conv] = await db | ||
| .select({ id: dmConversations.id }) | ||
| .from(dmConversations) | ||
| .where(eq(dmConversations.channelId, channelId)); | ||
| if (!conv) return null; | ||
| const rows = await db | ||
| .select({ agentId: dmParticipants.agentId }) | ||
| .from(dmParticipants) | ||
| .where(and(eq(dmParticipants.conversationId, conv.id), isNull(dmParticipants.leftAt))); | ||
| return rows.map((r) => r.agentId); |
There was a problem hiding this comment.
getDmParticipantAgentIds runs an unindexed lookup on dm_conversations.channel_id (schema only indexes workspace_id), so on workspaces with many DM conversations this becomes a full table scan on every reply/reaction. Consider adding an index on dm_conversations.channel_id (and/or channel_id, workspace_id) or short-circuiting by checking channels.channel_type via the indexed PK before querying DM tables.
| } catch { | ||
| return null; | ||
| } |
There was a problem hiding this comment.
The catch { return null; } in getDmParticipantAgentIds makes DM fanout failures completely silent, which will be hard to debug in production (and will drop DM WS events). Please log the error (at least warn with channelId/workspace) before falling back to null.
| const dmAgentIds = await getDmParticipantAgentIds(c, idempotent.data.channel_id); | ||
| if (dmAgentIds) { | ||
| runInBackground(c, fanoutToAgents(c, dmAgentIds, 'thread.reply', eventData), 'fanout dm thread.reply'); | ||
| } else { | ||
| runInBackground(c, fanoutToChannel(c, idempotent.data.channel_id, 'thread.reply', eventData), 'fanout thread.reply'); | ||
| } |
There was a problem hiding this comment.
This adds an extra awaited DB lookup (getDmParticipantAgentIds) on the request path before scheduling background fanout, which can increase endpoint latency. Consider moving the DM lookup + conditional fanout into the runInBackground task (e.g., an async closure) so the HTTP response timing stays the same as before.
| const dmAgentIds = await getDmParticipantAgentIds(c, idempotent.data.channel_id); | |
| if (dmAgentIds) { | |
| runInBackground(c, fanoutToAgents(c, dmAgentIds, 'thread.reply', eventData), 'fanout dm thread.reply'); | |
| } else { | |
| runInBackground(c, fanoutToChannel(c, idempotent.data.channel_id, 'thread.reply', eventData), 'fanout thread.reply'); | |
| } | |
| runInBackground( | |
| c, | |
| async () => { | |
| const dmAgentIds = await getDmParticipantAgentIds(c, idempotent.data.channel_id); | |
| if (dmAgentIds) { | |
| const task = fanoutToAgents(c, dmAgentIds, 'thread.reply', eventData); | |
| return task(); | |
| } else { | |
| const task = fanoutToChannel(c, idempotent.data.channel_id, 'thread.reply', eventData); | |
| return task(); | |
| } | |
| }, | |
| 'fanout thread.reply', | |
| ); |
| const dmAgentIds = await getDmParticipantAgentIds(c, channel_id); | ||
| if (dmAgentIds) { | ||
| runInBackground(c, fanoutToAgents(c, dmAgentIds, 'reaction.added', eventData), 'fanout dm reaction.added'); | ||
| } else { | ||
| runInBackground(c, fanoutToChannel(c, channel_id, 'reaction.added', eventData), 'fanout reaction.added'); | ||
| } |
There was a problem hiding this comment.
This now awaits getDmParticipantAgentIds before scheduling background work, adding DB round-trips to the request latency. Consider wrapping the DM lookup + conditional fanout inside runInBackground so the handler can return without waiting on the DM detection query.
| const dmAgentIds = await getDmParticipantAgentIds(c, channel_id); | |
| if (dmAgentIds) { | |
| runInBackground(c, fanoutToAgents(c, dmAgentIds, 'reaction.added', eventData), 'fanout dm reaction.added'); | |
| } else { | |
| runInBackground(c, fanoutToChannel(c, channel_id, 'reaction.added', eventData), 'fanout reaction.added'); | |
| } | |
| runInBackground( | |
| c, | |
| (async () => { | |
| const dmAgentIds = await getDmParticipantAgentIds(c, channel_id); | |
| if (dmAgentIds) { | |
| await fanoutToAgents(c, dmAgentIds, 'reaction.added', eventData); | |
| } else { | |
| await fanoutToChannel(c, channel_id, 'reaction.added', eventData); | |
| } | |
| })(), | |
| 'fanout reaction.added', | |
| ); |
| const dmAgentIds = await getDmParticipantAgentIds(c, row.channelId); | ||
| if (dmAgentIds) { | ||
| runInBackground(c, fanoutToAgents(c, dmAgentIds, 'reaction.removed', enriched), 'fanout dm reaction.removed'); | ||
| } else { | ||
| runInBackground(c, fanoutToChannel(c, row.channelId, 'reaction.removed', enriched), 'fanout reaction.removed'); | ||
| } |
There was a problem hiding this comment.
Same as the POST path: awaiting getDmParticipantAgentIds here adds avoidable latency to the DELETE request. Consider doing the DM lookup + fanout selection inside the existing background task / try block so the response isn't gated on extra DB queries.
| const dmAgentIds = await getDmParticipantAgentIds(c, row.channelId); | |
| if (dmAgentIds) { | |
| runInBackground(c, fanoutToAgents(c, dmAgentIds, 'reaction.removed', enriched), 'fanout dm reaction.removed'); | |
| } else { | |
| runInBackground(c, fanoutToChannel(c, row.channelId, 'reaction.removed', enriched), 'fanout reaction.removed'); | |
| } | |
| runInBackground( | |
| c, | |
| (async () => { | |
| try { | |
| const dmAgentIds = await getDmParticipantAgentIds(c, row.channelId); | |
| if (dmAgentIds) { | |
| await fanoutToAgents(c, dmAgentIds, 'reaction.removed', enriched); | |
| } else { | |
| await fanoutToChannel(c, row.channelId, 'reaction.removed', enriched); | |
| } | |
| } catch { | |
| // Ignore fanout failures | |
| } | |
| })(), | |
| 'fanout reaction.removed', | |
| ); |
Summary
fanoutToChannel()broadcasts to a ChannelDO, but agents subscribe to named channels — not internaldmch_*channels. AddedgetDmParticipantAgentIds()helper that detects DM channels and routes events throughfanoutToAgents()instead, delivering directly to agent Durable Objects.getDmParticipantAgentIdswith try/catch so DB errors fall back tofanoutToChannelinstead of crashing the response with 500 (the reply/reaction is already persisted at that point).distartifacts breaking tests, missing vitestexcludefordist/, and implicitanytypes in reactMessageMarkdown.tsx.Files changed
packages/server/src/routes/fanout.tsgetDmParticipantAgentIds()helperpackages/server/src/routes/thread.tsthread.replypackages/server/src/routes/reaction.tsreaction.addedandreaction.removedpackages/server/src/routes/__tests__/fanout.test.tspackages/server/src/routes/__tests__/thread.test.tspackages/server/src/routes/__tests__/reaction.test.tspackages/types/package.jsonpackages/types/vitest.config.tspackages/react/src/components/MessageMarkdown.tsxTest plan
thread.replyWS eventreaction.addedWS event🤖 Generated with Claude Code