feat(teams): native streaming for DMs via emit (vercel/chat#416)#88
feat(teams): native streaming for DMs via emit (vercel/chat#416)#88patrick-chinchill wants to merge 3 commits intomainfrom
Conversation
Port upstream `feat(adapter-teams): native streaming for DMs via emit` (commit ed46bae). DM threads now dispatch chunks through the Bot Framework streaming protocol (typing activities with `channelData.streamType=streaming` + `streamSequence`, then a final `message` activity with `streamType=final`) instead of routing through `Thread._fallback_stream`. Group chats / channels accumulate the stream and post a single message — matching upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming. Lifecycle: - `_handle_message_activity` registers a `_TeamsStreamSession` for DMs and `await`s the chat handler so the session stays alive through `stream()`. The handler is fire-and-forget for non-DM threads. - `_stream_via_emit` emits cumulative text per chunk; `streamId` is captured from the first send and threaded through subsequent chunks (Hazard #7 — the first chunk omits the key entirely). - `_close_stream_session` posts the final activity. Skipped on cancellation or zero-chunk streams (no orphan finalizers). - Iterator exceptions cancel the session and re-raise; transient send failures (e.g. 429 mid-stream) cancel the session and return the partial RawMessage instead of bubbling up. Tests: `tests/test_teams_native_streaming.py` covers wire-format invariants, dispatch decisions, cancellation paths, error mid-stream, very-short streams, two concurrent DMs, and the end-to-end webhook → process_message → stream → close lifecycle. Existing `TestStream` cases in `test_teams_coverage.py` and `test_teams_extended.py` updated to reflect the new accumulate-and-post semantics for group chats. Non-parity: updated existing "Fallback streaming" rows in `docs/UPSTREAM_SYNC.md` to scope the divergence to non-Teams adapters, and added two new rows documenting Teams-specific divergences (group chats now accumulate-and-post; native streaming soft-cancels on send failure). https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request implements native streaming for Microsoft Teams DMs using the Bot Framework protocol, while transitioning group chats and channels to an accumulate-and-post strategy to eliminate UI flicker. It introduces a session management system for tracking in-flight DM streams and includes comprehensive test coverage for the new streaming lifecycle. Feedback suggests simplifying the stream finalization logic by removing a redundant check for empty content, as the presence of a stream ID already implies that content was sent.
| """ | ||
| if session.canceled: | ||
| return | ||
| if session.stream_id is None or not session._text: # noqa: SLF001 |
There was a problem hiding this comment.
The check not session._text is redundant here. session.stream_id is only set when a chunk with content is sent, which also guarantees session._text will be non-empty. Therefore, checking for session.stream_id is None is sufficient.
| if session.stream_id is None or not session._text: # noqa: SLF001 | |
| if session.stream_id is None: |
Address gemini-code-assist review on PR #88 (line 1218). ``session.stream_id`` is only assigned after a successful chunk send, and empty chunks are skipped before the send call — so ``stream_id is not None`` implies ``_text`` is non-empty. Drop the redundant ``not session._text`` check; the single condition expresses the invariant more clearly. Behavior unchanged. Tests still pass. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
patrick-chinchill
left a comment
There was a problem hiding this comment.
Code review — feat(teams): native streaming for DMs via emit (vercel/chat#416)
Compared HEAD (58b7fc7, including the follow-up _text-check refactor) to upstream ed46bae at f55378a (packages/adapter-teams/src/index.ts). Walked the porting hazards (especially #5/#7) and the SELF_REVIEW adversarial checks (pass-interaction, emit/parse symmetry, sentinels).
✅ Looks good
- Hazard #7 (omit vs
None) is honored: the first chunk'schannelDatais constructed without astreamIdkey (if session.stream_id is not None: channel_data["streamId"] = ...), andtest_first_chunk_omits_stream_idpins the wire shape withassert "streamId" not in first_payload["channelData"]. streamSequencestarts at 1 and increments per emitted chunk (session.sequence += 1before building the payload, after the empty-skip), matching the Bot Framework protocol and pinned bytest_subsequent_chunks_carry_stream_id_and_increment_sequence.- DM detection matches upstream byte-for-byte — both Python
is_dmand TSisDMusenot conversationId.startsWith("19:")(notconversationType == "personal"as the brief implied). Group-chat IDs (19:...@thread.skype) correctly fall through to accumulate-and-post. - Invariant in the follow-up refactor holds:
stream_idis only set after a successful_teams_sendAND only whenresult.get("id")is non-empty (line 1174–1178); empty chunks are skipped beforeaccumulated += text. Sostream_id is not None ⇒ _text non-empty. Thenot session._textcheck was indeed redundant. - Cleanup-path safety: the finally in
_handle_message_activityusesif current is sessionbefore popping (so a concurrent re-registration on the same DM thread isn't clobbered), and_close_stream_sessionis wrapped in try/except so its failure can't leak the registry entry. Cancel/iterator-exception paths both callsession.cancel()before re-raising, so_close_stream_sessioncorrectly skips the final activity. - Soft-cancel-on-send-failure divergence is documented in
docs/UPSTREAM_SYNC.mdnon-parity table with rationale and tested bytest_emit_send_failure_cancels_session(verifies no exception bubbles,result.raw["text"]carries pre-failure content). - Group-chat accumulate-and-post is a parity behavior with upstream post-#416, not a divergence — the doc table row marks it "no divergence at the adapter level" for clarity, which is the right framing.
🟡 Medium
- Concurrent-DM test doesn't exercise
_active_streamsrace —test_two_concurrent_dm_streams_have_independent_sessionsconstructs two sessions for two distinct thread IDs and calls_stream_via_emitdirectly with explicit session args, bypassing_active_streamsentirely. The realistic race (two near-simultaneous webhooks for the same DM thread overwriting_active_streams[tid]) isn't covered. This race exists in upstream too (activeStreams.set(threadId, ctx.stream)overwrites identically), so it's parity behavior — but the test name oversells what's being verified. Suggest renaming or adding a same-thread test that drives two_handle_message_activitycalls underasyncio.gather. - First-chunk send returns
id="": if Teams accepts the typing activity but returns an empty id,session.stream_idstaysNone(line 1177 guard),_textbecomes non-empty, and_close_stream_sessionskips because of the (now-tightened)stream_id is Nonecheck. Net effect: user sees streamed chunks but no finalmessageactivity — the streaming UI stays running until Teams times it out client-side. Upstream has the same shape (messageId = "") but would still attempt the final send. Worth a one-line comment at the close-skip site explaining this, and ideally a test pinning the empty-idbehavior.
🔵 Nit
_chained_wait_untilinvokesupstream_wait_until(task)before_resolve_processing(task). If the caller-suppliedwait_untilraises, the exception propagates back throughprocess_message, hits the adapter's outer try, and falls intofinally(no hang —await processing_doneisn't reached). That's fine, but reversing the order — resolve first, then call upstream — would make the deadlock-immunity argument trivially obvious to a future reader._TeamsStreamSession._textis read from outside the class viasession._textwith# noqa: SLF001— a publictextattribute (or method) would be cleaner since the leading underscore isn't really enforcing privacy.
Posted by an automated reviewer agent. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Generated by Claude Code
- Add a same-thread concurrent _handle_message_activity test that exercises the realistic _active_streams race (two near-simultaneous DM webhooks for the same thread). Pins upstream's plain-Map clobber semantics: the second registration overwrites the first, both in-flight handlers observe the later session, and the registry ends empty after both finish. The original distinct-threads test is renamed to make clear it covers session ISOLATION, not the registry race. - Empty-id final-send fallback: when Teams accepts streaming chunks but returns id="" on the first activity, _close_stream_session now ships the final message anyway (omitting streamId from channelData) instead of skipping and leaving the streaming UI spinning until Teams times the session out client-side. Mirrors upstream's looser check (text non-empty → ship the final). Adds a regression test and a non-parity row in docs/UPSTREAM_SYNC.md. - _chained_wait_until: resolve our internal processing_done gate BEFORE invoking the caller-supplied waitUntil, so the deadlock-immunity argument is trivially obvious (a misbehaving upstream callback can't starve the await on processing_done). - _TeamsStreamSession: add a public read-only `text` property so external callers (now _close_stream_session) read through it instead of the underscore-prefixed _text attribute. _stream_via_emit retains the direct _text write as the canonical mutator.
patrick-chinchill
left a comment
There was a problem hiding this comment.
Re-review of fixes in 2e96fbb
Verified the four fixes against ed46bae (vercel/chat#416) and the broader adapter surface.
Verified — fixes land cleanly
test_same_thread_concurrent_handlers_clobber_active_stream(tests/test_teams_native_streaming.py:722) — drives two_handle_message_activitytasks for the same thread under a barrier, snapshots the registry post-overlap, and pins[second_session, second_session]. Matches upstream'sMapclobber semantics. The renamedtest_distinct_dm_threads_each_have_isolated_session_statemakes the isolation-vs-race split explicit.- Empty-
idfinal-send (adapter.py:1239–1271, test 317–362) —_close_stream_sessionnow ships the finalmessagewhentextis non-empty even ifstream_id is None, omittingstreamIdfromchannelData(rather than serializingNone— Hazard #7). Regression test asserts"streamId" not in final_payload["channelData"]. Non-parity row at UPSTREAM_SYNC.md:492 is accurate. _chained_wait_untilorder (adapter.py:462–471) —_resolve_processing(task)runs beforeupstream_wait_until(task), with a comment explaining the deadlock-immunity argument.- Public
textproperty (adapter.py:95–103, 1243, 1260) — read-onlytextaccessor exposed; reads in_close_stream_sessiongo through it; only the canonical mutator at adapter.py:1216 retains# noqa: SLF001. Tests still poke_textdirectly (lines 176, 300) — fine, they're inside the test module's own laxer scope and lint-clean.
Findings
Nit (parity gap, pre-existing across multiple adapters)
_stream_via_emit and the accumulate-and-post fallback both check isinstance(chunk, dict) and chunk.get("type") == "markdown_text" (adapter.py:1081, 1139). A real MarkdownTextChunk dataclass instance silently drops to "" because isinstance(MarkdownTextChunk(...), dict) is False. Other adapters (google_chat, whatsapp, slack, github) use hasattr(chunk, "type") for this. Same gap exists in discord/linear, so out of scope here, but worth a follow-up issue — the StreamChunk union in types.py:798 is dataclass-typed.
Nit (divergence not in UPSTREAM_SYNC.md)
_chained_wait_until resolves the internal gate before invoking the caller-supplied wait_until. Upstream TS does the opposite (baseOptions?.waitUntil?.(task) first, then task.then(...)). Functionally equivalent (both register callbacks; neither blocks), but it IS a deliberate ordering divergence and the commit message frames it as a hardening defense — consider a one-line entry in the non-parity table so future syncs don't "correct" it back.
Nit (forward-looking)
chained_options = WebhookOptions(wait_until=_chained_wait_until) (adapter.py:473) drops any other fields that may exist on the inbound options. Today WebhookOptions only has wait_until, so no observable bug — but upstream uses {...baseOptions, waitUntil: ...} for a reason. A replace(options, wait_until=...) (or explicit comment "WebhookOptions has only wait_until — re-spread if it grows") would future-proof.
Hunt results — no other issues
- Wire format:
streamSequence(1-indexed, ++ per emit),streamType("streaming"/"final"),streamId(omitted on first chunk + when server returned empty id), parallelstreaminfoentity — all match the Bot Framework streaming protocol that upstream'sIStreameremits under the hood. - Cleanup:
try/finallyin_handle_message_activitymirrors upstream'stry { await processingDone } finally { activeStreams.delete };asyncio.CancelledErrorcancels the session and re-raises (covered bytest_cancelled_error_propagates_and_marks_session_canceled). - DM detection:
is_dmchecks onlynot conversationId.startswith("19:")— matchesthread-id.ts:30exactly. - Test fidelity: upstream PR #416 added zero new
it()blocks inindex.test.ts; only assertion swaps inreplay-streaming.test.ts. Python's 23 native-streaming tests + 159 teams-streaming-related tests all pass; no missing TS coverage. - Hazard #5: no untracked tasks in the new code.
process_messagetracks via_active_tasks,_chained_wait_untilonly callsadd_done_callback, no barecreate_task/ensure_future.
Re-review verdict: PASS
The three nits are real but non-blocking (one is pre-existing, one is hardening with no observable behavior change, one is forward-looking). Fixes for the previous round all landed correctly.
Posted by an automated re-reviewer agent. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Generated by Claude Code
Final upstream-coverage audit before merging the 7 sync PRs (#84-#90) identified one undocumented N/A item: vercel/chat#415 (Teams SDK 2.0.8 + User-Agent) is a JS-only botbuilder dependency bump. The Python Teams adapter uses raw aiohttp (no botbuilder), so there is no equivalent dependency to bump. The optional User-Agent: Vercel.ChatSDK header on the ~9 outbound aiohttp call sites is a defense-in-depth nice-to-have; deferred as a follow-up rather than landed in this sync. Updates: - CHANGELOG.md: tick all completed items and link them to their PRs (#84, #85, #86, #87, #88, #89, #90, plus already-merged PR #74). Document #415 inline as N/A. - docs/UPSTREAM_SYNC.md non-parity table: add row for Teams User-Agent header divergence so future syncers don't try to "port" the JS bump. Item #6 (concurrency.maxConcurrent) is already implementation-covered in the Python port (existing divergence row at L492). The 4 new TS concurrency tests in chat.test.ts have Python-specific equivalents at test_chat_faithful.py L2969-3055 that don't name-match — leaving as deferred fidelity-baseline polish since the behavior is verified. Verdict from the coverage audit: all 18 substantive ports across PRs #84-#90 are upstream-verified. No commits in chat@4.26.0..f55378a were missed. Ready to start merging. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Summary
Ports upstream vercel/chat#416 (commit
ed46bae) —feat(adapter-teams): native streaming for DMs via emit— into the Python Teams adapter.DM threads now dispatch chunks through the Bot Framework streaming protocol (typing activities with
channelData.streamType=streaming+streamSequence, then a finalmessageactivity withstreamType=final) instead of routing throughThread._fallback_stream. Group chats / channels accumulate the stream and post a single message — matching upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming.Lifecycle (hazard #5: async task lifecycle)
_handle_message_activityregisters a_TeamsStreamSessionfor DMs andawaits the chat handler so the session stays alive throughstream(). The handler is fire-and-forget for non-DM threads._stream_via_emitemits cumulative text per chunk;streamIdis captured from the first send and threaded through subsequent chunks (hazard fix: launch must-fix items — security, perf, docs #7 — the first chunk omits the key entirely)._close_stream_sessionposts the final activity. Skipped on cancellation or zero-chunk streams (no orphan finalizers).RawMessageinstead of bubbling up.Non-parity updates
docs/UPSTREAM_SYNC.md: scoped existing "Fallback streaming" rows to non-Teams adapters, and added two new Teams-specific divergences:RawMessagerather than re-raising).Tests
tests/test_teams_native_streaming.py(665 LOC, new) covers:channelData.streamType/streamSequenceshape, omitted vs presentstreamId).streamId/ sequence).process_message→stream()→_close_stream_sessionlifecycle.Existing
TestStreamcases intest_teams_coverage.pyandtest_teams_extended.pyupdated for the new accumulate-and-post semantics on group chats.Test plan
uv run ruff check src/ tests/ scripts/uv run ruff format --check src/ tests/ scripts/uv run python scripts/audit_test_quality.pyuv run pytest tests/ --tb=short -qUpstream ref: vercel/chat#416 (commit
ed46bae)https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Generated by Claude Code