diff --git a/AGENTS.md b/AGENTS.md index 751939528..236f18bfa 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -152,9 +152,11 @@ and milestone verification gates. keeps filesystem IO, worker runtime, `PathManager`, and port adapters until a reviewed runtime migration exists. - Remote-connect port baselines live in `bitfun-runtime-ports` and - `bitfun-services-integrations`; `AgentSubmissionPort` still rejects generic - attachments until image/multimodal equivalence tests and a runtime migration - plan are reviewed. + `bitfun-services-integrations`; tracker state and tracker event reduction + belong in `bitfun-services-integrations`, while core still owns dispatcher + assembly, session restore, terminal pre-warm, and product execution routing. + `AgentSubmissionPort` still rejects generic attachments until + image/multimodal equivalence tests and a runtime migration plan are reviewed. ### DeepReview guardrails diff --git a/docs/architecture/core-decomposition.md b/docs/architecture/core-decomposition.md index 018f2da60..d73a54688 100644 --- a/docs/architecture/core-decomposition.md +++ b/docs/architecture/core-decomposition.md @@ -69,7 +69,7 @@ Rust 编译和链接面。 | `bitfun-agent-tools` | 轻量 tool DTO / contract、runtime restriction、generic registry / provider container | partial:product manifest、`ToolUseContext`、`GetToolSpec` 和 concrete tools 仍在 core | | `bitfun-tool-packs` | 由 feature group 隔离的具体工具实现 | target/scaffold:不得声明 concrete tools 已迁移 | | `bitfun-services-core` | Config、session、workspace、storage、filesystem、system services | partial:部分 pure helper 已迁出;config/workspace/filesystem runtime 多数仍在 core | -| `bitfun-services-integrations` | Git、MCP、remote SSH、remote connect、file watch integrations | partial:MCP runtime 已迁入;remote SSH / remote-connect 仍只迁移低风险 contracts/helpers | +| `bitfun-services-integrations` | Git、MCP、remote SSH、remote connect、file watch integrations | partial:MCP runtime 已迁入;remote SSH 仍只迁移低风险 contracts/helpers;remote-connect 已拥有 wire DTO、request builder、tracker state 与 tracker event reduction,dispatcher/product execution 仍在 core | | `bitfun-product-domains` | Miniapp 和 function-agent 产品子域 | partial:pure decision、port、storage layout 可迁入;IO、worker、Git/AI service runtime 仍在 core | | `terminal-core` | 已有 terminal package,移动到 workspace 顶层 `src/crates/terminal` 路径 | done:已在 workspace 顶层 | | `tool-runtime` | 已有 tool runtime,移动到 workspace 顶层路径 | done:已在 workspace 顶层 | @@ -97,6 +97,9 @@ owner 边界,否则不要把一个 feature group 继续拆成更小的 crate - remote runtime port baseline 当前只提供契约和 core-owned adapter:`AgentSubmissionPort` 仍拒绝 generic attachments;remote image DTO、turn cancellation、remote state 和 event facts 不等于 remote-connect runtime 或多模态执行路径已经迁移。 +- remote-connect tracker state 可由 `bitfun-services-integrations` 拥有;core 只保留 + `EventSubscriber` adapter、global dispatcher、session restore、terminal pre-warm 和实际 + dialog submission routing。不要把 tracker state 回写到 core。 - `bitfun-core-types` 不得依赖 runtime manager、service crate、agent runtime、 app crate、Tauri、network client、process execution,或 `git2`、`rmcp`、`image`、 `tokio-tungstenite` 等重集成依赖。 diff --git a/docs/plans/core-decomposition-plan.md b/docs/plans/core-decomposition-plan.md index f31b86fea..e0dafbf73 100644 --- a/docs/plans/core-decomposition-plan.md +++ b/docs/plans/core-decomposition-plan.md @@ -975,13 +975,14 @@ product-full = ["git", "mcp", "remote-ssh", "remote-connect", "announcement", "f - [x] 先迁移 `announcement` 的纯 types contract,scheduler / state store / content loader / remote fetch 仍保留在 core。 - [x] 先完成 `remote-connect` contract slice:remote chat/image/tool/session wire DTO 与 relay/bot session/submission request builder 由 `bitfun-services-integrations` 拥有,relay/bot session 创建通过 `AgentSubmissionPort`。 - [x] 已补齐 remote runtime 迁移前的第一层 port baseline:`SessionTranscriptReader`、`AgentTurnCancellationPort`、`RemoteControlStatePort`、`RuntimeEventSink` 与 remote image attachment/request DTO;完整 `remote-connect` runtime 仍需后续单独迁移并补 queue/event/image 行为等价测试。 +- [x] `RemoteSessionStateTracker`、`TrackerEvent` 与 remote tool preview slimming helper 已迁入 `bitfun-services-integrations`;core 只保留 `EventSubscriber` adapter、dispatcher、session restore、terminal pre-warm 与实际 dialog submission routing。 - [x] 已迁移的集成能力保持 core 旧路径 re-export。 - [x] 产品完整 runtime 通过 `services-integrations/product-full` 启用已迁移集成能力。 -**当前安全迁移状态(2026-05-13):** +**当前安全迁移状态(2026-05-15):** - 已迁移到 `bitfun-services-integrations`:`service::file_watch`,通过 `file-watch` / `product-full` feature 启用,并保持 `core::service::file_watch` 旧路径。 -- `git` 已完成 DTO/params/graph/raw command output/text parser/arg builder、`GitError`、`GitService` runtime implementation 与 git utils 迁移;`bitfun-core::service::git::*` 仅保留 legacy facade re-export。`remote-ssh` 已迁移纯 contract/type、workspace path/identity helper 与 unresolved-session-key helper;SSH runtime manager / fs / terminal、password vault 与 PathManager-backed session mirror assembly 仍保留在 core。`mcp` 已迁移 tool-name / tool-info / protocol types / config location / server type-status、server config、cursor-format、JSON-RPC request builder、JSON config format/validation helper、config merge / remote authorization helper、OAuth credential vault / authorization bootstrap contract、remote auth error classifier、legacy remote header fallback helper、transport Authorization 归一化 helper、remote client capability helper、rmcp 到 BitFun protocol 的纯映射 helper、resource/prompt adapter、catalog cache、list-changed/reconnect policy、config service save-load orchestration、server process / local-remote transport lifecycle、dynamic tool descriptor / provider / result rendering helper,并用 owner crate contract test 锁定 wire shape、transport default、validation message、Cursor 兼容格式、config precedence / dedup 语义、OAuth vault 存储路径注入、NeedsAuth 分类、旧 env Authorization fallback、remote client capabilities、remote result metadata / structured content 映射、config load/save/delete contract、unsupported remote transport contract、context resource selection 和 dynamic manifest;`bitfun-core` 继续负责 core `ConfigService` store adapter、OAuth data-dir 注入、`BitFunError` 映射、legacy facade 和全局 tool registry / manifest 组装。`announcement` 仅迁移了纯 types contract,scheduler / state store / content loader / remote fetch 仍保留在 core;`remote-connect` 已完成 contract/request-builder slice,并补齐 cancellation/state/event/image 第一层 port baseline,但远程消息执行、image context 真正接入、tracker event、terminal pre-warm 与 workspace/session restore 仍保留在 core。它们涉及 SSH runtime、remote agent submission runtime、product tool manifest/exposure owner 化与 announcement config/path 边界,继续前需要单独确认端口方案与等价性测试。 +- `git` 已完成 DTO/params/graph/raw command output/text parser/arg builder、`GitError`、`GitService` runtime implementation 与 git utils 迁移;`bitfun-core::service::git::*` 仅保留 legacy facade re-export。`remote-ssh` 已迁移纯 contract/type、workspace path/identity helper 与 unresolved-session-key helper;SSH runtime manager / fs / terminal、password vault 与 PathManager-backed session mirror assembly 仍保留在 core。`mcp` 已迁移 tool-name / tool-info / protocol types / config location / server type-status、server config、cursor-format、JSON-RPC request builder、JSON config format/validation helper、config merge / remote authorization helper、OAuth credential vault / authorization bootstrap contract、remote auth error classifier、legacy remote header fallback helper、transport Authorization 归一化 helper、remote client capability helper、rmcp 到 BitFun protocol 的纯映射 helper、resource/prompt adapter、catalog cache、list-changed/reconnect policy、config service save-load orchestration、server process / local-remote transport lifecycle、dynamic tool descriptor / provider / result rendering helper,并用 owner crate contract test 锁定 wire shape、transport default、validation message、Cursor 兼容格式、config precedence / dedup 语义、OAuth vault 存储路径注入、NeedsAuth 分类、旧 env Authorization fallback、remote client capabilities、remote result metadata / structured content 映射、config load/save/delete contract、unsupported remote transport contract、context resource selection 和 dynamic manifest;`bitfun-core` 继续负责 core `ConfigService` store adapter、OAuth data-dir 注入、`BitFunError` 映射、legacy facade 和全局 tool registry / manifest 组装。`announcement` 仅迁移了纯 types contract,scheduler / state store / content loader / remote fetch 仍保留在 core;`remote-connect` 已完成 contract/request-builder slice,补齐 cancellation/state/event/image 第一层 port baseline,并迁出 tracker state / tracker event reduction / remote tool preview slimming helper,但远程消息执行、image context 真正接入、terminal pre-warm 与 workspace/session restore 仍保留在 core。它们涉及 SSH runtime、remote agent submission runtime、product tool manifest/exposure owner 化与 announcement config/path 边界,继续前需要单独确认端口方案与等价性测试。 - 最新主干的 Deep Review capacity / cost / queue、context profile、evidence ledger、session manifest、stream dedupe、search remote/fallback 与 session rollback persistence 仍属于 core runtime 或对应产品 runtime,不在本轮 `services-integrations` 迁移范围内;如果后续迁移 remote-connect / MCP / search / session,需要先定义运行状态 port 合约和等价测试。 **验证:** @@ -1614,7 +1615,7 @@ P2 后产品表面契约轨道(contract-only): - 保留边界:`bitfun-core` 只保留 core `ConfigService` store adapter、OAuth data-dir 注入、`BitFunError` 映射、legacy facade 和与全局 tool registry / manifest 的组装调用;配置写入、OAuth、SSE/session 与 registry / manifest 行为不得在本 PR 中改变。 - 后续切片:MCP concrete tool integration / product registry / manifest assembly 继续保留 dynamic provider metadata、工具清单顺序、expanded/collapsed exposure 和 snapshot wrapper 等价测试。 - 文档校正:P2 后补充文档中的 MCP runtime step 已由本 PR2 闭环;后续 MCP 相关工作只保留 concrete tool implementation 迁移或 product registry / manifest assembly,不再重复迁移 config/process/transport lifecycle。 -3. 已完成:remote-connect port baseline:产品表面 DTO、remote chat/image/tool/session wire DTO、relay/bot session/submission request builder、remote image attachment/request DTO、`AgentTurnCancellationPort`、`RemoteControlStatePort` 和 `RuntimeEventSink` 已具备 owner/port 契约;完整 remote runtime 仍保留在 core。 +3. 已完成:remote-connect tracker owner slice:产品表面 DTO、remote chat/image/tool/session wire DTO、relay/bot session/submission request builder、remote image attachment/request DTO、`AgentTurnCancellationPort`、`RemoteControlStatePort`、`RuntimeEventSink`、`RemoteSessionStateTracker` 和 `TrackerEvent` 已具备 owner/port 契约;core 仍保留 dispatcher/product execution。 - 后续切片:完整 remote-connect runtime 迁移必须单独执行,且先补 queue/event/image/cancel 等价测试,不得混入 tool/provider owner 化。 4. 已完成本轮可提交闭环:agent tools + `tool-packs` owner 化低风险部分。纯 tool contract/provider metadata、runtime restriction DTO、path resolution DTO、generic tool registry / dynamic provider container 已迁入 `bitfun-agent-tools`,并为 dynamic provider contract 提供 `agent-tools` 兼容 re-export;core tool runtime 保留产品完整工具列表、snapshot decorator、`dyn Tool` 适配、tool exposure / manifest resolution 和 `GetToolSpec` 按需工具说明发现。`ToolUseContext`、tool manifest/exposure 与 concrete tool implementation 按 feature group 外移需要新的 port/provider 设计,必须保持 builtin/readonly/dynamic manifest、expanded/collapsed exposure、prompt stub、unlock state、snapshot wrapping、runtime restrictions、cancellation 与 Deep Review tool flow 等价,作为后续高风险迁移单独审视。 5. `product-domains` runtime + core facade finalization:迁移 miniapp runtime/compiler/builtin 与 function-agent 运行逻辑,最后把 `bitfun-core` 收敛为 facade + product runtime assembly;不在本 PR 中修改 `bitfun-core default = []` 或 per-product feature matrix。 @@ -1722,7 +1723,7 @@ git diff -- package.json scripts/dev.cjs scripts/desktop-tauri-build.mjs scripts 11. 已提交:PR 2 `Services/Product Runtime Owner Closure`,收口 remote-SSH session identity / mirror path / unresolved-session layout 与 MiniApp storage file layout owner;core 保留 `PathManager` 注入、SSH manager、remote FS / terminal、MiniApp filesystem IO 和 worker runtime。 12. 历史已完成:MCP runtime 与 dynamic tools;已迁移 config service orchestration、server process / transport lifecycle、adapter、dynamic tool/resource/prompt provider,core 保留 ConfigService store adapter、OAuth data-dir 注入、BitFunError 映射、legacy facade 和 product registry / manifest assembly。 13. P2 后前置轨道:产品表面 contract-only 补强,可在后续 PR 第一组提交中处理;只允许 DTO/port、round-trip/no-op tests 和 boundary check,不实现 CLI/Desktop/Remote/ACP UI 或命令变更。 -14. 已完成:remote-connect port baseline:产品表面 DTO 已以 contract-only 方式进入 `bitfun-core-types`;`bitfun-services-integrations` 的 `remote-connect` feature 拥有 remote chat/image/tool/session wire DTO、relay/bot session/submission request builder 与 remote image attachment/request DTO;relay/bot 创建 session 通过 `AgentSubmissionPort`,取消、远程状态读取和事件事实已有 `runtime-ports` 契约。远程消息执行、image context 真正接入、tracker event、terminal pre-warm 与 workspace/session restore 仍保留在 `bitfun-core` product runtime assembly。 +14. 已完成:remote-connect tracker owner slice:产品表面 DTO 已以 contract-only 方式进入 `bitfun-core-types`;`bitfun-services-integrations` 的 `remote-connect` feature 拥有 remote chat/image/tool/session wire DTO、relay/bot session/submission request builder、remote image attachment/request DTO、tracker state 与 tracker event reduction;relay/bot 创建 session 通过 `AgentSubmissionPort`,取消、远程状态读取和事件事实已有 `runtime-ports` 契约。远程消息执行、image context 真正接入、terminal pre-warm 与 workspace/session restore 仍保留在 `bitfun-core` product runtime assembly。 15. 已完成:agent tools + `tool-packs` owner 化低风险闭环;tool contract / DTO、runtime restriction、path resolution、generic registry / dynamic provider container 已归属 `bitfun-agent-tools`,core 保留产品工具列表、snapshot decorator、`ToolUseContext` 和 concrete tool implementation,后续外移需单独 port/provider 设计。 16. 已完成:关键语义回归 baseline,不移动 runtime owner。覆盖 MCP config failure / catalog invalidation / 既有 list-changed helper / dynamic manifest、tool manifest / `GetToolSpec`、product-domains adapter equivalence、remote workspace search fallback 的 focused tests 或 snapshots。 17. 下一步:remote-connect runtime 迁移前快照。先基于当前 port baseline 记录 remote session restore、active turn、cancel、image context、tracker event、queue/event fanout 的输入输出和验证命令;证明等价后再移动 runtime owner,不能把 generic attachment guard 当作已接入多模态行为。 diff --git a/scripts/check-core-boundaries.mjs b/scripts/check-core-boundaries.mjs index 43b04ce27..532beaa12 100644 --- a/scripts/check-core-boundaries.mjs +++ b/scripts/check-core-boundaries.mjs @@ -822,6 +822,26 @@ const forbiddenContentRules = [ regex: /\bpub struct SessionInfo\b/, message: 'core remote-connect server must not redefine session info DTOs; use the integrations contract', }, + { + regex: /\bstruct TrackerState\b/, + message: 'core remote-connect server must not own tracker state; use the integrations tracker', + }, + { + regex: /\bpub enum TrackerEvent\b/, + message: 'core remote-connect server must not redefine tracker events; use the integrations tracker', + }, + { + regex: /\bpub struct RemoteSessionStateTracker\b/, + message: 'core remote-connect server must not own tracker state; use the integrations tracker', + }, + { + regex: /\bfn make_slim_params\b/, + message: 'core remote-connect server must not own remote tool preview slimming; use the integrations helper', + }, + { + regex: /\bmatch mobile_type\b/, + message: 'core remote-connect server must not own remote agent type alias mapping; use the integrations helper', + }, ], }, { @@ -1807,6 +1827,16 @@ function runManifestParserSelfTest() { 'generic attachments', ], }, + { + path: 'src/crates/services-integrations/src/remote_connect.rs', + contracts: [ + 'RemoteSessionStateTracker', + 'TrackerEvent', + 'make_slim_tool_params', + 'handle_agentic_event', + 'resolve_remote_agent_type', + ], + }, { path: 'src/crates/core/src/agentic/tools/registry.rs', contracts: ['register_all_tools', 'GetToolSpecTool', 'get_collapsed_tool_names'], @@ -2227,6 +2257,11 @@ function runManifestParserSelfTest() { 'RemoteToolStatus', 'ActiveTurnSnapshot', 'SessionInfo', + 'TrackerState', + 'TrackerEvent', + 'RemoteSessionStateTracker', + 'make_slim_params', + 'match mobile_type', ]; const remoteConnectRuleText = remoteConnectRule.patterns .map((pattern) => pattern.regex.source) diff --git a/src/crates/core/src/service/remote_connect/remote_server.rs b/src/crates/core/src/service/remote_connect/remote_server.rs index 697eacb28..726883c65 100644 --- a/src/crates/core/src/service/remote_connect/remote_server.rs +++ b/src/crates/core/src/service/remote_connect/remote_server.rs @@ -8,18 +8,18 @@ //! for state changes using the `PollSession` command, receiving only //! incremental updates (new messages + current active turn snapshot). -use anyhow::{anyhow, Result}; +use anyhow::{Result, anyhow}; use dashmap::DashMap; use log::{debug, error, info}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, OnceLock, RwLock}; +use std::sync::{Arc, OnceLock}; use super::encryption; pub use bitfun_services_integrations::remote_connect::{ ActiveTurnSnapshot, AssistantEntry, ChatImageAttachment, ChatMessage, ChatMessageItem, - ImageAttachment, RecentWorkspaceEntry, RemoteToolStatus, SessionInfo, + ImageAttachment, RecentWorkspaceEntry, RemoteSessionStateTracker, RemoteToolStatus, + SessionInfo, TrackerEvent, }; fn current_workspace_path() -> Option { @@ -486,35 +486,12 @@ pub enum RemoteResponse { pub type EncryptedPayload = (String, String); -/// Build a slim version of tool params for mobile preview. -/// Strips large string values (file content, diffs, etc.) to keep payload small, -/// while preserving all short fields so the frontend can parse and display them. -fn make_slim_params(params: &serde_json::Value) -> Option { - match params { - serde_json::Value::Object(obj) => { - let slim: serde_json::Map = obj - .iter() - .filter_map(|(k, v)| match v { - serde_json::Value::String(s) if s.len() > 200 => None, - _ => Some((k.clone(), v.clone())), - }) - .collect(); - if slim.is_empty() { - return None; - } - serde_json::to_string(&serde_json::Value::Object(slim)).ok() - } - serde_json::Value::String(s) => Some(s.chars().take(200).collect()), - _ => None, - } -} - /// Compress a base64 data-URL image to a small thumbnail for mobile display. /// Falls back to the original if decoding/compression fails or the image is /// already within `max_bytes`. fn compress_data_url_for_mobile(data_url: &str, max_bytes: usize) -> String { - use base64::engine::general_purpose::STANDARD as BASE64; use base64::Engine; + use base64::engine::general_purpose::STANDARD as BASE64; use image::imageops::FilterType; const MAX_THUMBNAIL_DIM: u32 = 400; @@ -699,7 +676,10 @@ fn turns_to_chat_messages(turns: &[crate::service::session::DialogTurnData]) -> status: status_str.to_string(), duration_ms: t.duration_ms, start_ms: Some(t.start_time), - input_preview: make_slim_params(&t.tool_call.input), + input_preview: + bitfun_services_integrations::remote_connect::make_slim_tool_params( + &t.tool_call.input, + ), tool_input: if t.tool_name == "AskUserQuestion" || t.tool_name == "Task" || t.tool_name == "TodoWrite" @@ -806,13 +786,7 @@ fn strip_user_input_tags(content: &str) -> String { } fn resolve_agent_type(mobile_type: Option<&str>) -> &'static str { - match mobile_type { - Some("code") | Some("agentic") | Some("Agentic") => "agentic", - Some("cowork") | Some("Cowork") => "Cowork", - Some("plan") | Some("Plan") => "Plan", - Some("debug") | Some("Debug") => "debug", - _ => "agentic", - } + bitfun_services_integrations::remote_connect::resolve_remote_agent_type(mobile_type) } /// Convert legacy `ImageAttachment` to unified `ImageContextData`. @@ -849,637 +823,7 @@ pub fn images_to_contexts( .collect() } -// ── RemoteSessionStateTracker ────────────────────────────────────── - -/// Mutable state snapshot updated by the event subscriber. -#[derive(Debug)] -struct TrackerState { - session_state: String, - title: String, - turn_id: Option, - turn_status: String, - accumulated_text: String, - accumulated_thinking: String, - active_tools: Vec, - round_index: usize, - /// Ordered items preserving the interleaved arrival order for real-time display. - active_items: Vec, - /// Set on structural events (turn start/complete) that change persisted - /// messages. Cleared after the poll handler loads persistence. Allows - /// skipping the expensive disk read during streaming. - persistence_dirty: bool, -} - -/// Lightweight event broadcast by the tracker for real-time consumers (e.g. bots). -#[derive(Debug, Clone)] -pub enum TrackerEvent { - TextChunk(String), - ThinkingChunk(String), - /// All thinking content for the current round has been emitted. - /// Carries the full accumulated thinking text so consumers can send - /// a single summary instead of per-chunk messages. - ThinkingEnd, - ToolStarted { - tool_id: String, - tool_name: String, - params: Option, - }, - ToolCompleted { - tool_id: String, - tool_name: String, - duration_ms: Option, - success: bool, - }, - TurnCompleted { - turn_id: String, - }, - TurnFailed { - turn_id: String, - error: String, - }, - TurnCancelled { - turn_id: String, - }, -} - -/// Tracks the real-time state of a session for polling by the mobile client. -/// Subscribes to `AgenticEvent` and updates an in-memory snapshot. -/// Also broadcasts lightweight `TrackerEvent`s for real-time consumers. -pub struct RemoteSessionStateTracker { - target_session_id: String, - version: AtomicU64, - state: RwLock, - event_tx: tokio::sync::broadcast::Sender, -} - -impl RemoteSessionStateTracker { - pub fn new(session_id: String) -> Self { - let (event_tx, _) = tokio::sync::broadcast::channel(1024); - Self { - target_session_id: session_id, - version: AtomicU64::new(0), - state: RwLock::new(TrackerState { - session_state: "idle".to_string(), - title: String::new(), - turn_id: None, - turn_status: String::new(), - accumulated_text: String::new(), - accumulated_thinking: String::new(), - active_tools: Vec::new(), - round_index: 0, - active_items: Vec::new(), - persistence_dirty: true, - }), - event_tx, - } - } - - /// Subscribe to real-time tracker events (for bot streaming). - pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver { - self.event_tx.subscribe() - } - - pub fn version(&self) -> u64 { - self.version.load(Ordering::Relaxed) - } - - fn bump_version(&self) { - self.version.fetch_add(1, Ordering::Relaxed); - } - - pub fn snapshot_active_turn(&self) -> Option { - let s = self.state.read().unwrap(); - let has_items = !s.active_items.is_empty(); - s.turn_id.as_ref().map(|tid| ActiveTurnSnapshot { - turn_id: tid.clone(), - status: s.turn_status.clone(), - // When items exist they already contain the text/thinking content. - // Skip the duplicate top-level fields to halve the payload. - text: if has_items { - String::new() - } else { - s.accumulated_text.clone() - }, - thinking: if has_items { - String::new() - } else { - s.accumulated_thinking.clone() - }, - tools: s.active_tools.clone(), - round_index: s.round_index, - items: if has_items { - Some(s.active_items.clone()) - } else { - None - }, - }) - } - - pub fn session_state(&self) -> String { - self.state.read().unwrap().session_state.clone() - } - - pub fn title(&self) -> String { - self.state.read().unwrap().title.clone() - } - - pub fn turn_status(&self) -> String { - self.state.read().unwrap().turn_status.clone() - } - - /// Return the full accumulated response text for the current turn. - /// - /// Unlike the broadcast channel (which can lag and drop chunks), this - /// is maintained directly from the source `AgenticEvent` stream and is - /// therefore authoritative. - pub fn accumulated_text(&self) -> String { - self.state.read().unwrap().accumulated_text.clone() - } - - /// Return the full accumulated thinking text for the current turn. - pub fn accumulated_thinking(&self) -> String { - self.state.read().unwrap().accumulated_thinking.clone() - } - - /// Returns true if the turn has ended (completed/failed/cancelled) but - /// the tracker state hasn't been cleaned up yet (waiting for persistence). - pub fn is_turn_finished(&self) -> bool { - let s = self.state.read().unwrap(); - s.turn_id.is_some() - && matches!(s.turn_status.as_str(), "completed" | "failed" | "cancelled") - } - - /// Seed initial turn state when the tracker is created after the - /// `DialogTurnStarted` event already fired (e.g. desktop-triggered turns). - /// Subsequent streaming events will be captured normally by the subscriber. - pub fn initialize_active_turn(&self, turn_id: String) { - let mut s = self.state.write().unwrap(); - if s.turn_id.is_none() { - s.turn_id = Some(turn_id); - s.turn_status = "active".to_string(); - s.session_state = "running".to_string(); - } - drop(s); - self.bump_version(); - } - - /// Clear tracker state after the persisted historical message is confirmed - /// available. Called by the poll handler to complete the atomic transition. - pub fn finalize_completed_turn(&self) { - let mut s = self.state.write().unwrap(); - if matches!(s.turn_status.as_str(), "completed" | "failed" | "cancelled") { - s.turn_id = None; - s.accumulated_text.clear(); - s.accumulated_thinking.clear(); - s.active_tools.clear(); - s.active_items.clear(); - } - } - - /// Whether the persisted message list may have changed since the last - /// poll. Structural events (turn start / complete) set this flag; - /// streaming events (text / thinking chunks) do not. - pub fn is_persistence_dirty(&self) -> bool { - self.state.read().unwrap().persistence_dirty - } - - pub fn mark_persistence_clean(&self) { - self.state.write().unwrap().persistence_dirty = false; - } - - /// Find the last item of `target_type` with matching `subagent_marker` that - /// can be extended, skipping over the complementary text/thinking type. - /// Tool items act as boundaries — we never merge across tool items. - /// This mirrors the desktop's EventBatcher behaviour where text and thinking - /// accumulate independently within a single ModelRound. - fn find_mergeable_item( - items: &[ChatMessageItem], - target_type: &str, - subagent_marker: &Option, - ) -> Option { - for i in (0..items.len()).rev() { - let item = &items[i]; - if item.item_type == "tool" { - return None; - } - if item.item_type == target_type && &item.is_subagent == subagent_marker { - return Some(i); - } - } - None - } - - fn upsert_active_tool( - state: &mut TrackerState, - tool_id: &str, - tool_name: &str, - status: &str, - input_preview: Option, - tool_input: Option, - is_subagent: bool, - ) { - let resolved_id = if tool_id.is_empty() { - format!("{}-{}", tool_name, state.active_tools.len()) - } else { - tool_id.to_string() - }; - let allow_name_fallback = tool_id.is_empty() && !tool_name.is_empty(); - let subagent_marker = if is_subagent { Some(true) } else { None }; - - if let Some(tool) = state - .active_tools - .iter_mut() - .rev() - .find(|t| t.id == resolved_id || (allow_name_fallback && t.name == tool_name)) - { - tool.status = status.to_string(); - if input_preview.is_some() { - tool.input_preview = input_preview.clone(); - } - if tool_input.is_some() { - tool.tool_input = tool_input.clone(); - } - } else { - let tool_status = RemoteToolStatus { - id: resolved_id.clone(), - name: tool_name.to_string(), - status: status.to_string(), - duration_ms: None, - start_ms: Some( - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64, - ), - input_preview, - tool_input, - }; - state.active_tools.push(tool_status.clone()); - state.active_items.push(ChatMessageItem { - item_type: "tool".to_string(), - content: None, - tool: Some(tool_status), - is_subagent: subagent_marker, - }); - return; - } - - if let Some(item) = state.active_items.iter_mut().rev().find(|i| { - i.item_type == "tool" - && i.tool.as_ref().is_some_and(|t| { - t.id == resolved_id || (allow_name_fallback && t.name == tool_name) - }) - }) { - if let Some(tool) = item.tool.as_mut() { - tool.status = status.to_string(); - if input_preview.is_some() { - tool.input_preview = input_preview; - } - if tool_input.is_some() { - tool.tool_input = tool_input; - } - } - } - } - - fn handle_event(&self, event: &crate::agentic::events::AgenticEvent) { - use bitfun_events::AgenticEvent as AE; - - let is_direct = event.session_id() == Some(self.target_session_id.as_str()); - let is_subagent = if !is_direct { - match event { - AE::TextChunk { - subagent_parent_info, - .. - } - | AE::ThinkingChunk { - subagent_parent_info, - .. - } - | AE::ToolEvent { - subagent_parent_info, - .. - } => subagent_parent_info - .as_ref() - .is_some_and(|p| p.session_id == self.target_session_id), - _ => false, - } - } else { - false - }; - - if !is_direct && !is_subagent { - return; - } - - match event { - AE::TextChunk { text, .. } => { - let subagent_marker = if is_subagent { Some(true) } else { None }; - let mut s = self.state.write().unwrap(); - if !is_subagent { - s.accumulated_text.push_str(text); - } - let extend_idx = - Self::find_mergeable_item(&s.active_items, "text", &subagent_marker); - if let Some(idx) = extend_idx { - let item = &mut s.active_items[idx]; - let c = item.content.get_or_insert_with(String::new); - c.push_str(text); - } else { - s.active_items.push(ChatMessageItem { - item_type: "text".to_string(), - content: Some(text.clone()), - tool: None, - is_subagent: subagent_marker, - }); - } - drop(s); - self.bump_version(); - let _ = self.event_tx.send(TrackerEvent::TextChunk(text.clone())); - } - AE::ThinkingChunk { - content, is_end, .. - } => { - let clean = content.replace("", "").replace("", ""); - let subagent_marker = if is_subagent { Some(true) } else { None }; - let mut s = self.state.write().unwrap(); - if !is_subagent { - s.accumulated_thinking.push_str(&clean); - } - let extend_idx = - Self::find_mergeable_item(&s.active_items, "thinking", &subagent_marker); - if let Some(idx) = extend_idx { - let item = &mut s.active_items[idx]; - let c = item.content.get_or_insert_with(String::new); - c.push_str(&clean); - } else { - s.active_items.push(ChatMessageItem { - item_type: "thinking".to_string(), - content: Some(clean), - tool: None, - is_subagent: subagent_marker, - }); - } - drop(s); - self.bump_version(); - if *is_end { - let _ = self.event_tx.send(TrackerEvent::ThinkingEnd); - } else if !content.is_empty() { - let _ = self - .event_tx - .send(TrackerEvent::ThinkingChunk(content.clone())); - } - } - AE::ToolEvent { tool_event, .. } => { - if let Ok(val) = serde_json::to_value(tool_event) { - let event_type = val.get("event_type").and_then(|v| v.as_str()).unwrap_or(""); - let tool_id = val - .get("tool_id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let tool_name = val - .get("tool_name") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - - let mut s = self.state.write().unwrap(); - let allow_name_fallback = tool_id.is_empty() && !tool_name.is_empty(); - let mut pending_tool_event: Option = None; - match event_type { - "EarlyDetected" => { - Self::upsert_active_tool( - &mut s, - &tool_id, - &tool_name, - "preparing", - None, - None, - is_subagent, - ); - } - "ConfirmationNeeded" => { - let params = val.get("params").cloned(); - let input_preview = params.as_ref().and_then(make_slim_params); - Self::upsert_active_tool( - &mut s, - &tool_id, - &tool_name, - "pending_confirmation", - input_preview, - params, - is_subagent, - ); - } - "Started" => { - let params = val.get("params").cloned(); - let input_preview = params.as_ref().and_then(make_slim_params); - let tool_input = if tool_name == "AskUserQuestion" - || tool_name == "Task" - || tool_name == "TodoWrite" - { - params.clone() - } else { - None - }; - Self::upsert_active_tool( - &mut s, - &tool_id, - &tool_name, - "running", - input_preview, - tool_input, - is_subagent, - ); - let _ = self.event_tx.send(TrackerEvent::ToolStarted { - tool_id: tool_id.clone(), - tool_name: tool_name.clone(), - params, - }); - } - "Confirmed" => { - Self::upsert_active_tool( - &mut s, - &tool_id, - &tool_name, - "confirmed", - None, - None, - is_subagent, - ); - } - "Rejected" => { - Self::upsert_active_tool( - &mut s, - &tool_id, - &tool_name, - "rejected", - None, - None, - is_subagent, - ); - } - "Completed" | "Succeeded" => { - let duration = val.get("duration_ms").and_then(|v| v.as_u64()); - if let Some(t) = s.active_tools.iter_mut().rev().find(|t| { - (t.id == tool_id || (allow_name_fallback && t.name == tool_name)) - && t.status == "running" - }) { - t.status = "completed".to_string(); - t.duration_ms = duration; - } - if let Some(item) = s.active_items.iter_mut().rev().find(|i| { - i.item_type == "tool" - && i.tool.as_ref().is_some_and(|t| { - (t.id == tool_id - || (allow_name_fallback && t.name == tool_name)) - && t.status == "running" - }) - }) { - if let Some(t) = item.tool.as_mut() { - t.status = "completed".to_string(); - t.duration_ms = duration; - } - } - pending_tool_event = Some(TrackerEvent::ToolCompleted { - tool_id: tool_id.clone(), - tool_name: tool_name.clone(), - duration_ms: duration, - success: true, - }); - } - "Failed" => { - if let Some(t) = s.active_tools.iter_mut().rev().find(|t| { - (t.id == tool_id || (allow_name_fallback && t.name == tool_name)) - && t.status == "running" - }) { - t.status = "failed".to_string(); - } - if let Some(item) = s.active_items.iter_mut().rev().find(|i| { - i.item_type == "tool" - && i.tool.as_ref().is_some_and(|t| { - (t.id == tool_id - || (allow_name_fallback && t.name == tool_name)) - && t.status == "running" - }) - }) { - if let Some(t) = item.tool.as_mut() { - t.status = "failed".to_string(); - } - } - pending_tool_event = Some(TrackerEvent::ToolCompleted { - tool_id: tool_id.clone(), - tool_name: tool_name.clone(), - duration_ms: None, - success: false, - }); - } - "Cancelled" => { - if let Some(t) = s.active_tools.iter_mut().rev().find(|t| { - (t.id == tool_id || (allow_name_fallback && t.name == tool_name)) - && matches!( - t.status.as_str(), - "running" | "pending_confirmation" | "confirmed" - ) - }) { - t.status = "cancelled".to_string(); - } - if let Some(item) = s.active_items.iter_mut().rev().find(|i| { - i.item_type == "tool" - && i.tool.as_ref().is_some_and(|t| { - (t.id == tool_id - || (allow_name_fallback && t.name == tool_name)) - && matches!( - t.status.as_str(), - "running" | "pending_confirmation" | "confirmed" - ) - }) - }) { - if let Some(t) = item.tool.as_mut() { - t.status = "cancelled".to_string(); - } - } - } - _ => {} - } - drop(s); - self.bump_version(); - if let Some(evt) = pending_tool_event { - let _ = self.event_tx.send(evt); - } - } - } - AE::DialogTurnStarted { turn_id, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.turn_id = Some(turn_id.clone()); - s.turn_status = "active".to_string(); - s.accumulated_text.clear(); - s.accumulated_thinking.clear(); - s.active_tools.clear(); - s.active_items.clear(); - s.round_index = 0; - s.session_state = "running".to_string(); - s.persistence_dirty = true; - drop(s); - self.bump_version(); - } - AE::DialogTurnCompleted { turn_id, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.turn_status = "completed".to_string(); - s.session_state = "idle".to_string(); - s.persistence_dirty = true; - drop(s); - self.bump_version(); - let _ = self.event_tx.send(TrackerEvent::TurnCompleted { - turn_id: turn_id.clone(), - }); - } - AE::DialogTurnFailed { turn_id, error, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.turn_status = "failed".to_string(); - s.session_state = "idle".to_string(); - s.persistence_dirty = true; - drop(s); - self.bump_version(); - let _ = self.event_tx.send(TrackerEvent::TurnFailed { - turn_id: turn_id.clone(), - error: error.clone(), - }); - } - AE::DialogTurnCancelled { turn_id, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.turn_status = "cancelled".to_string(); - s.session_state = "idle".to_string(); - s.persistence_dirty = true; - drop(s); - self.bump_version(); - let _ = self.event_tx.send(TrackerEvent::TurnCancelled { - turn_id: turn_id.clone(), - }); - } - AE::ModelRoundStarted { round_index, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.round_index = *round_index; - drop(s); - self.bump_version(); - } - AE::SessionStateChanged { new_state, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.session_state = new_state.clone(); - drop(s); - self.bump_version(); - } - AE::SessionTitleGenerated { title, .. } if is_direct => { - let mut s = self.state.write().unwrap(); - s.title = title.clone(); - drop(s); - self.bump_version(); - } - _ => {} - } - } -} +// ── RemoteSessionStateTracker subscriber adapter ───────────────── #[async_trait::async_trait] impl crate::agentic::events::EventSubscriber for Arc { @@ -1487,7 +831,7 @@ impl crate::agentic::events::EventSubscriber for Arc &self, event: &crate::agentic::events::AgenticEvent, ) -> crate::util::errors::BitFunResult<()> { - self.handle_event(event); + self.handle_agentic_event(event); Ok(()) } } @@ -1841,7 +1185,7 @@ impl RemoteServer { ) -> RemoteResponse { use crate::agentic::persistence::PersistenceManager; use crate::infrastructure::PathManager; - use crate::service::workspace::{get_global_workspace_service, WorkspaceKind}; + use crate::service::workspace::{WorkspaceKind, get_global_workspace_service}; let ( ws_path, @@ -2064,7 +1408,7 @@ impl RemoteServer { /// Relative paths are resolved against the session workspace when possible, /// otherwise the current workspace root. Rejects files larger than 30 MB. async fn handle_read_file(&self, raw_path: &str, session_id: Option<&str>) -> RemoteResponse { - use crate::service::remote_connect::bot::{read_workspace_file, WorkspaceFileContent}; + use crate::service::remote_connect::bot::{WorkspaceFileContent, read_workspace_file}; const MAX_SIZE: u64 = 30 * 1024 * 1024; // Unified 30 MB cap (Feishu API hard limit) let workspace_root = resolve_file_workspace_root(session_id).await; @@ -2105,7 +1449,7 @@ impl RemoteServer { None => { return RemoteResponse::Error { message: format!("Remote file path could not be resolved: {raw_path}"), - } + }; } }; if !abs.exists() || !abs.is_file() { @@ -2119,7 +1463,7 @@ impl RemoteServer { Err(e) => { return RemoteResponse::Error { message: format!("Cannot read file metadata: {e}"), - } + }; } }; @@ -2134,7 +1478,7 @@ impl RemoteServer { Err(e) => { return RemoteResponse::Error { message: format!("Cannot read file: {e}"), - } + }; } }; @@ -2174,7 +1518,7 @@ impl RemoteServer { None => { return RemoteResponse::Error { message: format!("Remote file path could not be resolved: {raw_path}"), - } + }; } }; @@ -2194,7 +1538,7 @@ impl RemoteServer { Err(e) => { return RemoteResponse::Error { message: format!("Cannot read file metadata: {e}"), - } + }; } }; @@ -2218,7 +1562,7 @@ impl RemoteServer { match cmd { RemoteCommand::GetWorkspaceInfo => { - use crate::service::workspace::{get_global_workspace_service, WorkspaceKind}; + use crate::service::workspace::{WorkspaceKind, get_global_workspace_service}; if let Some(ws_service) = get_global_workspace_service() { if let Some(ws) = ws_service.get_current_workspace().await { @@ -2390,7 +1734,7 @@ impl RemoteServer { use crate::agentic::coordination::get_global_coordinator; use bitfun_runtime_ports::AgentSubmissionPort; use bitfun_services_integrations::remote_connect::{ - build_remote_session_create_request, RemoteConnectSubmissionSource, + RemoteConnectSubmissionSource, build_remote_session_create_request, }; let coordinator = match get_global_coordinator() { @@ -2598,7 +1942,7 @@ impl RemoteServer { Err(e) => { return RemoteResponse::Error { message: format!("Failed to load AI config: {e}"), - } + }; } }; match ai_config.resolve_model_reference(requested_model_id) { @@ -2608,7 +1952,7 @@ impl RemoteServer { message: format!( "Unknown model selection: {requested_model_id}" ), - } + }; } } }; @@ -2708,7 +2052,7 @@ impl RemoteServer { async fn handle_execution_command(&self, cmd: &RemoteCommand) -> RemoteResponse { use crate::agentic::coordination::{ - get_global_coordinator, DialogSubmissionPolicy, DialogTriggerSource, + DialogSubmissionPolicy, DialogTriggerSource, get_global_coordinator, }; let dispatcher = get_or_init_global_dispatcher(); diff --git a/src/crates/services-integrations/src/remote_connect.rs b/src/crates/services-integrations/src/remote_connect.rs index 6695cd424..4c2d34925 100644 --- a/src/crates/services-integrations/src/remote_connect.rs +++ b/src/crates/services-integrations/src/remote_connect.rs @@ -1,13 +1,16 @@ //! Remote-connect integration contracts. //! -//! This module owns stable remote-facing DTOs and runtime-port request -//! construction. Network lifecycle, tracker state, and product assembly stay in -//! `bitfun-core` until their ports are explicit. +//! This module owns stable remote-facing DTOs, runtime-port request +//! construction, and remote session tracker state. Network lifecycle and +//! product assembly stay in `bitfun-core` until their ports are explicit. +use bitfun_events::AgenticEvent; use bitfun_runtime_ports::{ AgentInputAttachment, AgentSessionCreateRequest, AgentSubmissionRequest, AgentSubmissionSource, }; use serde::{Deserialize, Serialize}; +use std::sync::RwLock; +use std::sync::atomic::{AtomicU64, Ordering}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -100,6 +103,16 @@ pub fn build_remote_image_submission_request( } } +pub fn resolve_remote_agent_type(mobile_type: Option<&str>) -> &'static str { + match mobile_type { + Some("code") | Some("agentic") | Some("Agentic") => "agentic", + Some("cowork") | Some("Cowork") => "Cowork", + Some("plan") | Some("Plan") => "Plan", + Some("debug") | Some("Debug") => "debug", + _ => "agentic", + } +} + /// Image sent from a remote client as a base64 data URL. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ImageAttachment { @@ -198,3 +211,636 @@ pub struct RemoteToolStatus { #[serde(skip_serializing_if = "Option::is_none")] pub tool_input: Option, } + +/// Build a slim version of tool params for remote preview payloads. +/// +/// Large string values such as file contents and diffs are omitted, while +/// short structured fields stay available for remote clients that need to +/// render tool details. +pub fn make_slim_tool_params(params: &serde_json::Value) -> Option { + match params { + serde_json::Value::Object(obj) => { + let slim: serde_json::Map = obj + .iter() + .filter_map(|(key, value)| match value { + serde_json::Value::String(text) if text.len() > 200 => None, + _ => Some((key.clone(), value.clone())), + }) + .collect(); + if slim.is_empty() { + return None; + } + serde_json::to_string(&serde_json::Value::Object(slim)).ok() + } + serde_json::Value::String(text) => Some(text.chars().take(200).collect()), + _ => None, + } +} + +#[derive(Debug)] +struct TrackerState { + session_state: String, + title: String, + turn_id: Option, + turn_status: String, + accumulated_text: String, + accumulated_thinking: String, + active_tools: Vec, + round_index: usize, + active_items: Vec, + persistence_dirty: bool, +} + +/// Lightweight event broadcast by the tracker for real-time consumers. +#[derive(Debug, Clone, PartialEq)] +pub enum TrackerEvent { + TextChunk(String), + ThinkingChunk(String), + ThinkingEnd, + ToolStarted { + tool_id: String, + tool_name: String, + params: Option, + }, + ToolCompleted { + tool_id: String, + tool_name: String, + duration_ms: Option, + success: bool, + }, + TurnCompleted { + turn_id: String, + }, + TurnFailed { + turn_id: String, + error: String, + }, + TurnCancelled { + turn_id: String, + }, +} + +/// Tracks the real-time state of a session for remote polling and bot streams. +pub struct RemoteSessionStateTracker { + target_session_id: String, + version: AtomicU64, + state: RwLock, + event_tx: tokio::sync::broadcast::Sender, +} + +impl RemoteSessionStateTracker { + pub fn new(session_id: String) -> Self { + let (event_tx, _) = tokio::sync::broadcast::channel(1024); + Self { + target_session_id: session_id, + version: AtomicU64::new(0), + state: RwLock::new(TrackerState { + session_state: "idle".to_string(), + title: String::new(), + turn_id: None, + turn_status: String::new(), + accumulated_text: String::new(), + accumulated_thinking: String::new(), + active_tools: Vec::new(), + round_index: 0, + active_items: Vec::new(), + persistence_dirty: true, + }), + event_tx, + } + } + + pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.event_tx.subscribe() + } + + pub fn version(&self) -> u64 { + self.version.load(Ordering::Relaxed) + } + + fn bump_version(&self) { + self.version.fetch_add(1, Ordering::Relaxed); + } + + pub fn snapshot_active_turn(&self) -> Option { + let state = self.state.read().unwrap(); + let has_items = !state.active_items.is_empty(); + state.turn_id.as_ref().map(|turn_id| ActiveTurnSnapshot { + turn_id: turn_id.clone(), + status: state.turn_status.clone(), + text: if has_items { + String::new() + } else { + state.accumulated_text.clone() + }, + thinking: if has_items { + String::new() + } else { + state.accumulated_thinking.clone() + }, + tools: state.active_tools.clone(), + round_index: state.round_index, + items: if has_items { + Some(state.active_items.clone()) + } else { + None + }, + }) + } + + pub fn session_state(&self) -> String { + self.state.read().unwrap().session_state.clone() + } + + pub fn title(&self) -> String { + self.state.read().unwrap().title.clone() + } + + pub fn turn_status(&self) -> String { + self.state.read().unwrap().turn_status.clone() + } + + pub fn accumulated_text(&self) -> String { + self.state.read().unwrap().accumulated_text.clone() + } + + pub fn accumulated_thinking(&self) -> String { + self.state.read().unwrap().accumulated_thinking.clone() + } + + pub fn is_turn_finished(&self) -> bool { + let state = self.state.read().unwrap(); + state.turn_id.is_some() + && matches!( + state.turn_status.as_str(), + "completed" | "failed" | "cancelled" + ) + } + + pub fn initialize_active_turn(&self, turn_id: String) { + let mut state = self.state.write().unwrap(); + if state.turn_id.is_none() { + state.turn_id = Some(turn_id); + state.turn_status = "active".to_string(); + state.session_state = "running".to_string(); + } + drop(state); + self.bump_version(); + } + + pub fn finalize_completed_turn(&self) { + let mut state = self.state.write().unwrap(); + if matches!( + state.turn_status.as_str(), + "completed" | "failed" | "cancelled" + ) { + state.turn_id = None; + state.accumulated_text.clear(); + state.accumulated_thinking.clear(); + state.active_tools.clear(); + state.active_items.clear(); + } + } + + pub fn is_persistence_dirty(&self) -> bool { + self.state.read().unwrap().persistence_dirty + } + + pub fn mark_persistence_clean(&self) { + self.state.write().unwrap().persistence_dirty = false; + } + + fn find_mergeable_item( + items: &[ChatMessageItem], + target_type: &str, + subagent_marker: &Option, + ) -> Option { + for index in (0..items.len()).rev() { + let item = &items[index]; + if item.item_type == "tool" { + return None; + } + if item.item_type == target_type && &item.is_subagent == subagent_marker { + return Some(index); + } + } + None + } + + fn upsert_active_tool( + state: &mut TrackerState, + tool_id: &str, + tool_name: &str, + status: &str, + input_preview: Option, + tool_input: Option, + is_subagent: bool, + ) { + let resolved_id = if tool_id.is_empty() { + format!("{}-{}", tool_name, state.active_tools.len()) + } else { + tool_id.to_string() + }; + let allow_name_fallback = tool_id.is_empty() && !tool_name.is_empty(); + let subagent_marker = if is_subagent { Some(true) } else { None }; + + if let Some(tool) = + state.active_tools.iter_mut().rev().find(|tool| { + tool.id == resolved_id || (allow_name_fallback && tool.name == tool_name) + }) + { + tool.status = status.to_string(); + if input_preview.is_some() { + tool.input_preview = input_preview.clone(); + } + if tool_input.is_some() { + tool.tool_input = tool_input.clone(); + } + } else { + let tool_status = RemoteToolStatus { + id: resolved_id.clone(), + name: tool_name.to_string(), + status: status.to_string(), + duration_ms: None, + start_ms: Some( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + ), + input_preview, + tool_input, + }; + state.active_tools.push(tool_status.clone()); + state.active_items.push(ChatMessageItem { + item_type: "tool".to_string(), + content: None, + tool: Some(tool_status), + is_subagent: subagent_marker, + }); + return; + } + + if let Some(item) = state.active_items.iter_mut().rev().find(|item| { + item.item_type == "tool" + && item.tool.as_ref().is_some_and(|tool| { + tool.id == resolved_id || (allow_name_fallback && tool.name == tool_name) + }) + }) { + if let Some(tool) = item.tool.as_mut() { + tool.status = status.to_string(); + if input_preview.is_some() { + tool.input_preview = input_preview; + } + if tool_input.is_some() { + tool.tool_input = tool_input; + } + } + } + } + + pub fn handle_agentic_event(&self, event: &AgenticEvent) { + use bitfun_events::AgenticEvent as AE; + + let is_direct = event.session_id() == Some(self.target_session_id.as_str()); + let is_subagent = if !is_direct { + match event { + AE::TextChunk { + subagent_parent_info, + .. + } + | AE::ThinkingChunk { + subagent_parent_info, + .. + } + | AE::ToolEvent { + subagent_parent_info, + .. + } => subagent_parent_info + .as_ref() + .is_some_and(|parent| parent.session_id == self.target_session_id), + _ => false, + } + } else { + false + }; + + if !is_direct && !is_subagent { + return; + } + + match event { + AE::TextChunk { text, .. } => { + let subagent_marker = if is_subagent { Some(true) } else { None }; + let mut state = self.state.write().unwrap(); + if !is_subagent { + state.accumulated_text.push_str(text); + } + if let Some(index) = + Self::find_mergeable_item(&state.active_items, "text", &subagent_marker) + { + let item = &mut state.active_items[index]; + item.content.get_or_insert_with(String::new).push_str(text); + } else { + state.active_items.push(ChatMessageItem { + item_type: "text".to_string(), + content: Some(text.clone()), + tool: None, + is_subagent: subagent_marker, + }); + } + drop(state); + self.bump_version(); + let _ = self.event_tx.send(TrackerEvent::TextChunk(text.clone())); + } + AE::ThinkingChunk { + content, is_end, .. + } => { + let clean = content.replace("", "").replace("", ""); + let subagent_marker = if is_subagent { Some(true) } else { None }; + let mut state = self.state.write().unwrap(); + if !is_subagent { + state.accumulated_thinking.push_str(&clean); + } + if let Some(index) = + Self::find_mergeable_item(&state.active_items, "thinking", &subagent_marker) + { + let item = &mut state.active_items[index]; + item.content + .get_or_insert_with(String::new) + .push_str(&clean); + } else { + state.active_items.push(ChatMessageItem { + item_type: "thinking".to_string(), + content: Some(clean), + tool: None, + is_subagent: subagent_marker, + }); + } + drop(state); + self.bump_version(); + if *is_end { + let _ = self.event_tx.send(TrackerEvent::ThinkingEnd); + } else if !content.is_empty() { + let _ = self + .event_tx + .send(TrackerEvent::ThinkingChunk(content.clone())); + } + } + AE::ToolEvent { tool_event, .. } => { + if let Ok(value) = serde_json::to_value(tool_event) { + let event_type = value + .get("event_type") + .and_then(|value| value.as_str()) + .unwrap_or(""); + let tool_id = value + .get("tool_id") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(); + let tool_name = value + .get("tool_name") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(); + + let mut state = self.state.write().unwrap(); + let allow_name_fallback = tool_id.is_empty() && !tool_name.is_empty(); + let mut pending_tool_event: Option = None; + match event_type { + "EarlyDetected" => { + Self::upsert_active_tool( + &mut state, + &tool_id, + &tool_name, + "preparing", + None, + None, + is_subagent, + ); + } + "ConfirmationNeeded" => { + let params = value.get("params").cloned(); + let input_preview = params.as_ref().and_then(make_slim_tool_params); + Self::upsert_active_tool( + &mut state, + &tool_id, + &tool_name, + "pending_confirmation", + input_preview, + params, + is_subagent, + ); + } + "Started" => { + let params = value.get("params").cloned(); + let input_preview = params.as_ref().and_then(make_slim_tool_params); + let tool_input = if tool_name == "AskUserQuestion" + || tool_name == "Task" + || tool_name == "TodoWrite" + { + params.clone() + } else { + None + }; + Self::upsert_active_tool( + &mut state, + &tool_id, + &tool_name, + "running", + input_preview, + tool_input, + is_subagent, + ); + let _ = self.event_tx.send(TrackerEvent::ToolStarted { + tool_id: tool_id.clone(), + tool_name: tool_name.clone(), + params, + }); + } + "Confirmed" => { + Self::upsert_active_tool( + &mut state, + &tool_id, + &tool_name, + "confirmed", + None, + None, + is_subagent, + ); + } + "Rejected" => { + Self::upsert_active_tool( + &mut state, + &tool_id, + &tool_name, + "rejected", + None, + None, + is_subagent, + ); + } + "Completed" | "Succeeded" => { + let duration = + value.get("duration_ms").and_then(|value| value.as_u64()); + if let Some(tool) = state.active_tools.iter_mut().rev().find(|tool| { + (tool.id == tool_id + || (allow_name_fallback && tool.name == tool_name)) + && tool.status == "running" + }) { + tool.status = "completed".to_string(); + tool.duration_ms = duration; + } + if let Some(item) = state.active_items.iter_mut().rev().find(|item| { + item.item_type == "tool" + && item.tool.as_ref().is_some_and(|tool| { + (tool.id == tool_id + || (allow_name_fallback && tool.name == tool_name)) + && tool.status == "running" + }) + }) { + if let Some(tool) = item.tool.as_mut() { + tool.status = "completed".to_string(); + tool.duration_ms = duration; + } + } + pending_tool_event = Some(TrackerEvent::ToolCompleted { + tool_id: tool_id.clone(), + tool_name: tool_name.clone(), + duration_ms: duration, + success: true, + }); + } + "Failed" => { + if let Some(tool) = state.active_tools.iter_mut().rev().find(|tool| { + (tool.id == tool_id + || (allow_name_fallback && tool.name == tool_name)) + && tool.status == "running" + }) { + tool.status = "failed".to_string(); + } + if let Some(item) = state.active_items.iter_mut().rev().find(|item| { + item.item_type == "tool" + && item.tool.as_ref().is_some_and(|tool| { + (tool.id == tool_id + || (allow_name_fallback && tool.name == tool_name)) + && tool.status == "running" + }) + }) { + if let Some(tool) = item.tool.as_mut() { + tool.status = "failed".to_string(); + } + } + pending_tool_event = Some(TrackerEvent::ToolCompleted { + tool_id: tool_id.clone(), + tool_name: tool_name.clone(), + duration_ms: None, + success: false, + }); + } + "Cancelled" => { + if let Some(tool) = state.active_tools.iter_mut().rev().find(|tool| { + (tool.id == tool_id + || (allow_name_fallback && tool.name == tool_name)) + && matches!( + tool.status.as_str(), + "running" | "pending_confirmation" | "confirmed" + ) + }) { + tool.status = "cancelled".to_string(); + } + if let Some(item) = state.active_items.iter_mut().rev().find(|item| { + item.item_type == "tool" + && item.tool.as_ref().is_some_and(|tool| { + (tool.id == tool_id + || (allow_name_fallback && tool.name == tool_name)) + && matches!( + tool.status.as_str(), + "running" | "pending_confirmation" | "confirmed" + ) + }) + }) { + if let Some(tool) = item.tool.as_mut() { + tool.status = "cancelled".to_string(); + } + } + } + _ => {} + } + drop(state); + self.bump_version(); + if let Some(event) = pending_tool_event { + let _ = self.event_tx.send(event); + } + } + } + AE::DialogTurnStarted { turn_id, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.turn_id = Some(turn_id.clone()); + state.turn_status = "active".to_string(); + state.accumulated_text.clear(); + state.accumulated_thinking.clear(); + state.active_tools.clear(); + state.active_items.clear(); + state.round_index = 0; + state.session_state = "running".to_string(); + state.persistence_dirty = true; + drop(state); + self.bump_version(); + } + AE::DialogTurnCompleted { turn_id, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.turn_status = "completed".to_string(); + state.session_state = "idle".to_string(); + state.persistence_dirty = true; + drop(state); + self.bump_version(); + let _ = self.event_tx.send(TrackerEvent::TurnCompleted { + turn_id: turn_id.clone(), + }); + } + AE::DialogTurnFailed { turn_id, error, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.turn_status = "failed".to_string(); + state.session_state = "idle".to_string(); + state.persistence_dirty = true; + drop(state); + self.bump_version(); + let _ = self.event_tx.send(TrackerEvent::TurnFailed { + turn_id: turn_id.clone(), + error: error.clone(), + }); + } + AE::DialogTurnCancelled { turn_id, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.turn_status = "cancelled".to_string(); + state.session_state = "idle".to_string(); + state.persistence_dirty = true; + drop(state); + self.bump_version(); + let _ = self.event_tx.send(TrackerEvent::TurnCancelled { + turn_id: turn_id.clone(), + }); + } + AE::ModelRoundStarted { round_index, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.round_index = *round_index; + drop(state); + self.bump_version(); + } + AE::SessionStateChanged { new_state, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.session_state = new_state.clone(); + drop(state); + self.bump_version(); + } + AE::SessionTitleGenerated { title, .. } if is_direct => { + let mut state = self.state.write().unwrap(); + state.title = title.clone(); + drop(state); + self.bump_version(); + } + _ => {} + } + } +} diff --git a/src/crates/services-integrations/tests/remote_connect_contracts.rs b/src/crates/services-integrations/tests/remote_connect_contracts.rs index 973a9a009..8679434da 100644 --- a/src/crates/services-integrations/tests/remote_connect_contracts.rs +++ b/src/crates/services-integrations/tests/remote_connect_contracts.rs @@ -1,10 +1,13 @@ #![cfg(feature = "remote-connect")] +use bitfun_events::{AgenticEvent, ToolEventData}; use bitfun_runtime_ports::AgentSubmissionSource; use bitfun_services_integrations::remote_connect::{ + ChatImageAttachment, ChatMessage, ChatMessageItem, ImageAttachment, + RemoteConnectSubmissionSource, RemoteSessionStateTracker, RemoteToolStatus, TrackerEvent, build_remote_image_attachment, build_remote_image_submission_request, - build_remote_session_create_request, build_remote_submission_request, ChatImageAttachment, - ChatMessage, ChatMessageItem, ImageAttachment, RemoteConnectSubmissionSource, RemoteToolStatus, + build_remote_session_create_request, build_remote_submission_request, + resolve_remote_agent_type, }; #[test] @@ -98,6 +101,21 @@ fn remote_connect_session_create_contract_preserves_workspace_binding() { assert_eq!(request.metadata["source"], "remote_relay"); } +#[test] +fn remote_connect_agent_type_mapping_preserves_current_mobile_aliases() { + assert_eq!(resolve_remote_agent_type(Some("code")), "agentic"); + assert_eq!(resolve_remote_agent_type(Some("agentic")), "agentic"); + assert_eq!(resolve_remote_agent_type(Some("Agentic")), "agentic"); + assert_eq!(resolve_remote_agent_type(Some("cowork")), "Cowork"); + assert_eq!(resolve_remote_agent_type(Some("Cowork")), "Cowork"); + assert_eq!(resolve_remote_agent_type(Some("plan")), "Plan"); + assert_eq!(resolve_remote_agent_type(Some("Plan")), "Plan"); + assert_eq!(resolve_remote_agent_type(Some("debug")), "debug"); + assert_eq!(resolve_remote_agent_type(Some("Debug")), "debug"); + assert_eq!(resolve_remote_agent_type(Some("unknown")), "agentic"); + assert_eq!(resolve_remote_agent_type(None), "agentic"); +} + #[test] fn remote_connect_message_dtos_keep_current_wire_shape() { let image = ImageAttachment { @@ -139,3 +157,134 @@ fn remote_connect_message_dtos_keep_current_wire_shape() { assert_eq!(json["items"][0]["type"], "tool"); assert_eq!(json["images"][0]["data_url"], "data:image/png;base64,abc"); } + +#[test] +fn remote_connect_tracker_preserves_streaming_snapshot_contract() { + let tracker = RemoteSessionStateTracker::new("session-1".to_string()); + + tracker.handle_agentic_event(&AgenticEvent::DialogTurnStarted { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + turn_index: 0, + user_input: "hello".to_string(), + original_user_input: None, + user_message_metadata: None, + subagent_parent_info: None, + }); + tracker.handle_agentic_event(&AgenticEvent::ModelRoundStarted { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + round_id: "round-1".to_string(), + round_index: 3, + subagent_parent_info: None, + model_id: None, + }); + tracker.handle_agentic_event(&AgenticEvent::ThinkingChunk { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + round_id: "round-1".to_string(), + content: "plan".to_string(), + is_end: false, + subagent_parent_info: None, + }); + tracker.handle_agentic_event(&AgenticEvent::TextChunk { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + round_id: "round-1".to_string(), + text: "answer".to_string(), + subagent_parent_info: None, + }); + + let snapshot = tracker + .snapshot_active_turn() + .expect("active turn snapshot"); + + assert_eq!(tracker.session_state(), "running"); + assert_eq!(snapshot.turn_id, "turn-1"); + assert_eq!(snapshot.status, "active"); + assert_eq!(snapshot.round_index, 3); + assert_eq!(snapshot.text, ""); + assert_eq!(snapshot.thinking, ""); + let items = snapshot.items.expect("ordered streaming items"); + assert_eq!(items.len(), 2); + assert_eq!(items[0].item_type, "thinking"); + assert_eq!(items[0].content.as_deref(), Some("plan")); + assert_eq!(items[1].item_type, "text"); + assert_eq!(items[1].content.as_deref(), Some("answer")); +} + +#[test] +fn remote_connect_tracker_keeps_subagent_items_out_of_parent_accumulators() { + let tracker = RemoteSessionStateTracker::new("parent-session".to_string()); + let subagent_parent_info = Some(bitfun_events::SubagentParentInfo { + tool_call_id: "task-1".to_string(), + session_id: "parent-session".to_string(), + dialog_turn_id: "parent-turn".to_string(), + }); + + tracker.initialize_active_turn("parent-turn".to_string()); + tracker.handle_agentic_event(&AgenticEvent::TextChunk { + session_id: "child-session".to_string(), + turn_id: "child-turn".to_string(), + round_id: "round-1".to_string(), + text: "child text".to_string(), + subagent_parent_info, + }); + + assert_eq!(tracker.accumulated_text(), ""); + let snapshot = tracker + .snapshot_active_turn() + .expect("active turn snapshot"); + let items = snapshot.items.expect("subagent item"); + assert_eq!(items[0].content.as_deref(), Some("child text")); + assert_eq!(items[0].is_subagent, Some(true)); +} + +#[tokio::test] +async fn remote_connect_tracker_broadcasts_tool_and_turn_events() { + let tracker = RemoteSessionStateTracker::new("session-1".to_string()); + let mut events = tracker.subscribe(); + + tracker.handle_agentic_event(&AgenticEvent::DialogTurnStarted { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + turn_index: 0, + user_input: "hello".to_string(), + original_user_input: None, + user_message_metadata: None, + subagent_parent_info: None, + }); + tracker.handle_agentic_event(&AgenticEvent::ToolEvent { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + tool_event: ToolEventData::Started { + tool_id: "tool-1".to_string(), + tool_name: "AskUserQuestion".to_string(), + params: serde_json::json!({ "questions": [] }), + timeout_seconds: None, + }, + subagent_parent_info: None, + }); + tracker.handle_agentic_event(&AgenticEvent::DialogTurnCancelled { + session_id: "session-1".to_string(), + turn_id: "turn-1".to_string(), + subagent_parent_info: None, + }); + + match events.recv().await.expect("tool started event") { + TrackerEvent::ToolStarted { + tool_id, + tool_name, + params, + } => { + assert_eq!(tool_id, "tool-1"); + assert_eq!(tool_name, "AskUserQuestion"); + assert!(params.is_some()); + } + other => panic!("unexpected event: {other:?}"), + } + match events.recv().await.expect("turn cancelled event") { + TrackerEvent::TurnCancelled { turn_id } => assert_eq!(turn_id, "turn-1"), + other => panic!("unexpected event: {other:?}"), + } +}