diff --git a/.gitignore b/.gitignore index 57f5184a9..c4626b245 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,4 @@ tests/e2e/reports/ ASSETS_LICENSES.md external/ +resources/omp/*.exe diff --git a/resources/claude-bridge/bridge.mjs b/resources/claude-bridge/bridge.mjs new file mode 100644 index 000000000..2e313a9aa --- /dev/null +++ b/resources/claude-bridge/bridge.mjs @@ -0,0 +1,226 @@ +#!/usr/bin/env node +/** + * Claude Agent SDK → BitFun JSONL bridge. + * + * Reads JSONL commands from stdin, writes JSONL events to stdout. + * Uses @anthropic-ai/claude-agent-sdk for agent execution. + * + * Command format (stdin, one JSON object per line): + * {"command":"prompt","text":"...","model":"...","workingDir":"..."} + * {"command":"abort"} + * + * Event format (stdout, one JSON object per line): + * {"type":"text_delta","delta":"..."} + * {"type":"thinking_delta","delta":"..."} + * {"type":"tool_call_start","tool_call_id":"...","tool_name":"..."} + * {"type":"tool_call_delta","tool_call_id":"...","delta":"..."} + * {"type":"tool_result","tool_call_id":"...","result":"..."} + * {"type":"turn_end","stopReason":"completed"} + * {"type":"error","message":"..."} + */ + +import { query } from '@anthropic-ai/claude-agent-sdk'; +import { createInterface } from 'node:readline'; + +// ── Message translation ───────────────────────────────────────────────────── + +/** + * Translate a Claude SDK message into one or more BitFun JSONL events. + * Returns an array of event objects (may be empty if the message type is unhandled). + */ +function translateMessage(msg) { + const events = []; + + // Case 1: stream_event — the SDK emits streaming content deltas + if (msg.type === 'stream_event' && msg.event) { + const ev = msg.event; + switch (ev.type) { + case 'content_block_start': { + const block = ev.content_block || ev.index != null ? ev : null; + if (block?.content_block?.type === 'tool_use') { + const tu = block.content_block; + events.push({ + type: 'tool_call_start', + tool_call_id: tu.id ?? '', + tool_name: tu.name ?? '', + }); + } + break; + } + case 'content_block_delta': { + const delta = ev.delta; + if (!delta) break; + if (delta.type === 'text_delta') { + events.push({ type: 'text_delta', delta: delta.text ?? '' }); + } else if (delta.type === 'input_json_delta') { + // Tool call argument streaming — emit as tool_call_delta + events.push({ + type: 'tool_call_delta', + tool_call_id: ev.index != null ? String(ev.index) : '', + delta: delta.partial_json ?? '', + }); + } else if (delta.type === 'thinking_delta') { + events.push({ + type: 'thinking_delta', + delta: delta.thinking ?? '', + }); + } else if (delta.type === 'signature_delta') { + // signature deltas are internal; skip + } + break; + } + case 'content_block_stop': { + // End of a content block; no event to emit + break; + } + default: + // Unknown stream event — log to stderr but don't fail + break; + } + return events; + } + + // Case 2: Full assistant message (non-streaming or final) + if (msg.type === 'assistant' && msg.message?.content) { + for (const block of msg.message.content) { + switch (block.type) { + case 'text': + events.push({ type: 'text_delta', delta: block.text ?? '' }); + break; + case 'tool_use': + events.push({ + type: 'tool_call_start', + tool_call_id: block.id ?? '', + tool_name: block.name ?? '', + }); + if (block.input) { + events.push({ + type: 'tool_call_delta', + tool_call_id: block.id ?? '', + delta: JSON.stringify(block.input), + }); + } + break; + case 'thinking': + events.push({ + type: 'thinking_delta', + delta: block.thinking ?? '', + }); + break; + default: + break; + } + } + return events; + } + + // Case 3: User message (tool results come back as user messages with tool_result blocks) + if (msg.type === 'user' && msg.message?.content) { + for (const block of msg.message.content) { + if (block.type === 'tool_result') { + const resultText = + typeof block.content === 'string' + ? block.content + : Array.isArray(block.content) + ? block.content.map(c => c.text ?? '').join('') + : JSON.stringify(block.content ?? ''); + events.push({ + type: 'tool_result', + tool_call_id: block.tool_use_id ?? '', + result: resultText, + }); + } + } + return events; + } + + // Case 4: Result message (end of query) + if (msg.type === 'result' || msg.subtype === 'success' || msg.result !== undefined) { + // This is emitted by the bridge after the loop, not here + return events; + } + + // Case 5: Error message + if (msg.type === 'error') { + events.push({ + type: 'error', + message: msg.message ?? msg.error ?? 'Unknown SDK error', + }); + return events; + } + + // Unknown message shape — log for debugging, but skip + return events; +} + +// ── Main loop ──────────────────────────────────────────────────────────────── + +async function main() { + const rl = createInterface({ + input: process.stdin, + crlfDelay: Infinity, + }); + + // Process commands line by line + for await (const line of rl) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let cmd; + try { + cmd = JSON.parse(trimmed); + } catch { + process.stderr.write(`bridge: invalid JSON: ${trimmed}\n`); + continue; + } + + if (cmd.command === 'abort') { + process.exit(0); + } + + if (cmd.command !== 'prompt') { + process.stderr.write(`bridge: unknown command: ${cmd.command}\n`); + continue; + } + + // Build options for the Claude SDK + const options = {}; + if (cmd.model) options.model = cmd.model; + if (cmd.workingDir) options.workingDir = cmd.workingDir; + + try { + const messages = query({ + prompt: cmd.text, + options, + }); + + for await (const msg of messages) { + const events = translateMessage(msg); + for (const ev of events) { + process.stdout.write(JSON.stringify(ev) + '\n'); + } + } + + // Turn completed successfully + process.stdout.write( + JSON.stringify({ type: 'turn_end', stopReason: 'completed' }) + '\n', + ); + } catch (err) { + // Report error and end turn with error status + process.stdout.write( + JSON.stringify({ + type: 'error', + message: err.message ?? String(err), + }) + '\n', + ); + process.stdout.write( + JSON.stringify({ type: 'turn_end', stopReason: 'error' }) + '\n', + ); + } + } +} + +main().catch((err) => { + process.stderr.write(`bridge fatal: ${err.message}\n`); + process.exit(1); +}); diff --git a/resources/claude-bridge/package.json b/resources/claude-bridge/package.json new file mode 100644 index 000000000..d1f863fdd --- /dev/null +++ b/resources/claude-bridge/package.json @@ -0,0 +1,9 @@ +{ + "name": "claude-agent-bridge", + "version": "0.1.0", + "private": true, + "type": "module", + "dependencies": { + "@anthropic-ai/claude-agent-sdk": "^0.3.154" + } +} diff --git a/resources/omp/.gitkeep b/resources/omp/.gitkeep new file mode 100644 index 000000000..c59adeb7d --- /dev/null +++ b/resources/omp/.gitkeep @@ -0,0 +1 @@ +# placeholder - omp binary is bundled at build time diff --git a/src/apps/desktop/src/api/agentic_api.rs b/src/apps/desktop/src/api/agentic_api.rs index a4bfb841f..fc1a2832e 100644 --- a/src/apps/desktop/src/api/agentic_api.rs +++ b/src/apps/desktop/src/api/agentic_api.rs @@ -80,6 +80,13 @@ pub struct UpdateSessionModelRequest { pub model_name: String, } +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateSessionRuntimeRequest { + pub session_id: String, + pub runtime_id: String, +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct UpdateSessionTitleRequest { @@ -209,6 +216,8 @@ pub struct SessionResponse { pub state: String, pub turn_count: usize, pub created_at: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub runtime_id: Option, } #[derive(Debug, Serialize)] @@ -590,6 +599,7 @@ pub async fn create_session( remote_connection_id: remote_conn.clone(), remote_ssh_host: remote_ssh_host.clone(), model_id: c.model_name, + runtime_id: None, }) .unwrap_or(SessionConfig { workspace_path: Some(request.workspace_path.clone()), @@ -658,6 +668,17 @@ pub async fn update_session_model( .map_err(|e| format!("Failed to update session model: {}", e)) } +#[tauri::command] +pub async fn update_session_runtime( + coordinator: State<'_, Arc>, + request: UpdateSessionRuntimeRequest, +) -> Result<(), String> { + coordinator + .update_session_runtime(&request.session_id, &request.runtime_id) + .await + .map_err(|e| format!("Failed to update session runtime: {}", e)) +} + #[tauri::command] pub async fn update_session_title( coordinator: State<'_, Arc>, @@ -1469,6 +1490,7 @@ pub async fn list_sessions( state: format!("{:?}", summary.state), turn_count: summary.turn_count, created_at: system_time_to_unix_secs(summary.created_at), + runtime_id: None, }) .collect(); @@ -1628,6 +1650,7 @@ fn session_to_response(session: Session) -> SessionResponse { state: format!("{:?}", session.state), turn_count: session.dialog_turn_ids.len(), created_at: system_time_to_unix_secs(session.created_at), + runtime_id: session.config.runtime_id.clone(), } } diff --git a/src/apps/desktop/src/api/runtime_api.rs b/src/apps/desktop/src/api/runtime_api.rs index 6ec8531fc..b485a6da1 100644 --- a/src/apps/desktop/src/api/runtime_api.rs +++ b/src/apps/desktop/src/api/runtime_api.rs @@ -1,6 +1,7 @@ //! Runtime capability API use crate::api::app_state::AppState; +use bitfun_core::runtime_ports::registry::get_global_runtime_registry; use bitfun_core::service::runtime::{RuntimeCommandCapability, RuntimeManager}; use tauri::State; @@ -11,3 +12,98 @@ pub async fn get_runtime_capabilities( let manager = RuntimeManager::new().map_err(|e| e.to_string())?; Ok(manager.get_capabilities()) } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentRuntimeDto { + pub id: String, + pub display_name: String, + pub description: String, + pub available: bool, + pub error: Option, + pub supports_steer: bool, + pub supports_thinking: bool, + pub autonomous_tools: bool, +} + +#[tauri::command] +pub async fn list_agent_runtimes() -> Result, String> { + let registry = get_global_runtime_registry(); + let registered = registry.list_all(); + log::info!("[runtime_api] registry has {} runtimes", registered.len()); + + if registered.is_empty() { + log::warn!("[runtime_api] registry empty, returning fallback"); + return Ok(fallback_runtimes()); + } + + // Health check each runtime individually — one failure must not break the rest. + let mut result = Vec::new(); + for rt in registered.iter() { + let (available, error) = match rt.health_check().await { + Ok(()) => { + log::info!("[runtime_api] health_check {} => OK", rt.id()); + (true, None) + } + Err(e) => { + log::info!("[runtime_api] health_check {} => ERR: {}", rt.id(), e.message); + (false, Some(e.message.clone())) + } + }; + result.push(AgentRuntimeDto { + id: rt.id().to_string(), + display_name: rt.display_name().to_string(), + description: rt.capabilities().description.clone(), + available, + error, + supports_steer: rt.capabilities().supports_steer, + supports_thinking: rt.capabilities().supports_thinking, + autonomous_tools: rt.capabilities().autonomous_tools, + }); + } + log::info!("[runtime_api] returning {} runtimes", result.len()); + Ok(result) +} + +fn fallback_runtimes() -> Vec { + vec![ + AgentRuntimeDto { + id: "bitfun".into(), + display_name: "BitFun Native".into(), + description: "BitFun built-in agent runtime".into(), + available: true, + error: None, + supports_steer: false, + supports_thinking: true, + autonomous_tools: false, + }, + AgentRuntimeDto { + id: "omp".into(), + display_name: "OMP (Oh My Pi)".into(), + description: "OMP agent runtime via RPC subprocess".into(), + available: false, + error: Some("bundled binary not found".into()), + supports_steer: true, + supports_thinking: true, + autonomous_tools: true, + }, + AgentRuntimeDto { + id: "claude".into(), + display_name: "Claude Agent SDK".into(), + description: "Claude Agent SDK via Node.js bridge".into(), + available: false, + error: Some("SDK not found".into()), + supports_steer: false, + supports_thinking: true, + autonomous_tools: true, + }, + ] +} + +#[tauri::command] +pub async fn get_default_agent_runtime() -> Result { + let runtimes = list_agent_runtimes().await?; + let default = runtimes.iter().find(|r| r.available) + .unwrap_or_else(|| runtimes.first().unwrap()); + Ok(default.clone()) +} diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 3c33cc8fc..d60c83e19 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -692,6 +692,9 @@ pub async fn run() { get_runtime_logging_info, export_diagnostics_bundle, get_runtime_capabilities, + api::runtime_api::list_agent_runtimes, + api::runtime_api::get_default_agent_runtime, + api::agentic_api::update_session_runtime, get_agent_profile_configs, get_agent_profile_config, set_agent_profile_config, @@ -1213,6 +1216,28 @@ async fn init_agentic_system() -> anyhow::Result<( cron_service.start(); log::info!("Cron service initialized and subscriber registered"); + + // Register agent runtime adapters in the global registry + { + use bitfun_core::agentic::runtime_adapters::bitfun_runtime::BitfunRuntime; + use bitfun_core::agentic::runtime_adapters::omp_runtime::OmpRuntime; + use bitfun_core::agentic::runtime_adapters::claude_runtime::ClaudeRuntime; + use bitfun_core::agentic::system::AgenticSystem; + use bitfun_core::runtime_ports::registry::{init_global_runtime_registry, RuntimeRegistry}; + + let agentic_system = AgenticSystem { + coordinator: coordinator.clone(), + event_queue: event_queue.clone(), + token_usage_service: token_usage_service.clone(), + }; + let mut registry = RuntimeRegistry::new(); + registry.register(std::sync::Arc::new(BitfunRuntime::new(std::sync::Arc::new(agentic_system)))); + registry.register(std::sync::Arc::new(OmpRuntime::new())); + registry.register(std::sync::Arc::new(ClaudeRuntime::new())); + init_global_runtime_registry(registry); + log::info!("Agent runtime registry initialized (BitFun, OMP, Claude)"); + } + log::info!("Agentic system initialized"); Ok(( coordinator, diff --git a/src/apps/desktop/tauri.conf.json b/src/apps/desktop/tauri.conf.json index b11c6b4bc..4321f6bbb 100644 --- a/src/apps/desktop/tauri.conf.json +++ b/src/apps/desktop/tauri.conf.json @@ -18,7 +18,9 @@ ], "resources": { "../../mobile-web/dist": "mobile-web/dist", - "resources/worker_host.js": "resources/worker_host.js" + "resources/worker_host.js": "resources/worker_host.js", + "../../../resources/claude-bridge": "resources/claude-bridge", + "../../../resources/omp": "resources/omp" }, "linux": { "deb": { diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index f39b75b1d..3ac57a099 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -1033,7 +1033,6 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet ); Ok(()) } - /// Create a new session pub async fn create_session( &self, @@ -1123,6 +1122,19 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet Ok(()) } + pub async fn update_session_runtime(&self, session_id: &str, runtime_id: &str) -> BitFunResult<()> { + self.session_manager + .update_session_runtime_id(session_id, runtime_id) + .await?; + + info!( + "Coordinator updated session runtime: session_id={}, runtime_id={}", + session_id, runtime_id + ); + + Ok(()) + } + /// Create a new session with explicit creator identity. pub async fn create_session_with_workspace_and_creator( &self, diff --git a/src/crates/core/src/agentic/core/session.rs b/src/crates/core/src/agentic/core/session.rs index b677c0a1a..1ec0131ed 100644 --- a/src/crates/core/src/agentic/core/session.rs +++ b/src/crates/core/src/agentic/core/session.rs @@ -162,6 +162,9 @@ pub struct SessionConfig { /// Model config ID used by this session (for token usage tracking) #[serde(default, skip_serializing_if = "Option::is_none")] pub model_id: Option, + /// Runtime ID used by this session (for runtime-specific behavior) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub runtime_id: Option, } impl Default for SessionConfig { @@ -179,6 +182,7 @@ impl Default for SessionConfig { remote_connection_id: None, remote_ssh_host: None, model_id: None, + runtime_id: None, } } } diff --git a/src/crates/core/src/agentic/mod.rs b/src/crates/core/src/agentic/mod.rs index cc4b52316..c196374dd 100644 --- a/src/crates/core/src/agentic/mod.rs +++ b/src/crates/core/src/agentic/mod.rs @@ -44,6 +44,7 @@ pub mod system; // Agents module pub mod agents; pub mod workspace; +pub mod runtime_adapters; mod util; diff --git a/src/crates/core/src/agentic/runtime_adapters/bitfun_runtime.rs b/src/crates/core/src/agentic/runtime_adapters/bitfun_runtime.rs new file mode 100644 index 000000000..769b549a7 --- /dev/null +++ b/src/crates/core/src/agentic/runtime_adapters/bitfun_runtime.rs @@ -0,0 +1,392 @@ +//! BitFun native runtime adapter. +//! +//! Wraps the in-process AgenticSystem so it can present the same +//! AgentRuntime / AgentSession interface that external runtimes (OMP, Claude) +//! expose through the runtime-ports abstraction. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::{broadcast, mpsc}; +use uuid::Uuid; + +use bitfun_runtime_ports::agent_runtime::{ + AgentEvent, AgentEventStream, AgentRuntime, AgentSession, RuntimeCapabilities, SessionConfig, + StopReason, +}; +use bitfun_runtime_ports::{ + AgentInputAttachment, AgentSubmissionPort, AgentSubmissionRequest, AgentSubmissionSource, + AgentTurnCancellationPort, AgentTurnCancellationRequest, PortError, PortErrorKind, PortResult, +}; + +use crate::agentic::core::SessionConfig as CoreSessionConfig; +use crate::agentic::events::AgenticEvent; +use crate::agentic::system::AgenticSystem; + +// --------------------------------------------------------------------------- +// BitfunRuntime +// --------------------------------------------------------------------------- + +/// Wraps the in-process AgenticSystem behind the AgentRuntime trait. +pub struct BitfunRuntime { + system: Arc, + capabilities: RuntimeCapabilities, +} + +impl BitfunRuntime { + pub fn new(system: Arc) -> Self { + Self { + system, + capabilities: RuntimeCapabilities { + description: "BitFun's built-in agent runtime".to_string(), + supports_steer: false, + supports_thinking: true, + autonomous_tools: false, + extras: HashMap::new(), + }, + } + } +} + +fn to_port_error(e: crate::util::errors::BitFunError) -> PortError { + PortError::new(PortErrorKind::Backend, e.to_string()) +} + +#[async_trait] +impl AgentRuntime for BitfunRuntime { + fn id(&self) -> &str { + "bitfun" + } + + fn display_name(&self) -> &str { + "BitFun Native" + } + + fn capabilities(&self) -> &RuntimeCapabilities { + &self.capabilities + } + + async fn create_session(&self, config: SessionConfig) -> PortResult> { + let session_id = Uuid::new_v4().to_string(); + + let workspace_path = config.working_dir.clone().ok_or_else(|| { + PortError::new( + PortErrorKind::InvalidRequest, + "working_dir is required to create a BitFun session", + ) + })?; + + let core_config = CoreSessionConfig { + workspace_path: Some(workspace_path), + ..CoreSessionConfig::default() + }; + + self.system + .coordinator + .create_session( + format!("BitFun adapter session {}", &session_id[..8]), + "bitfun".to_string(), + core_config, + ) + .await + .map_err(to_port_error)?; + + Ok(Box::new(BitfunSession { + session_id, + system: self.system.clone(), + working_dir: config.working_dir, + })) + } + + async fn health_check(&self) -> PortResult<()> { + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// BitfunSession +// --------------------------------------------------------------------------- + +pub struct BitfunSession { + session_id: String, + system: Arc, + #[allow(dead_code)] + working_dir: Option, +} + +impl BitfunSession { + /// Extract the turn_id (turn_id) from an AgenticEvent, if present. + fn get_turn_id(event: &AgenticEvent) -> Option<&str> { + match event { + AgenticEvent::DialogTurnStarted { turn_id, .. } + | AgenticEvent::DialogTurnCompleted { turn_id, .. } + | AgenticEvent::DialogTurnCancelled { turn_id, .. } + | AgenticEvent::DialogTurnFailed { turn_id, .. } + | AgenticEvent::ModelRoundStarted { turn_id, .. } + | AgenticEvent::ModelRoundCompleted { turn_id, .. } + | AgenticEvent::TextChunk { turn_id, .. } + | AgenticEvent::ThinkingChunk { turn_id, .. } + | AgenticEvent::ToolEvent { turn_id, .. } + | AgenticEvent::UserSteeringInjected { turn_id, .. } => { + Some(turn_id.as_str()) + } + _ => None, + } + } + + /// Map an AgenticEvent to zero or more AgentEvent variants. + fn map_event( + event: &AgenticEvent, + seen_tool_starts: &mut HashSet, + ) -> Vec { + match event { + AgenticEvent::DialogTurnStarted { .. } => { + vec![AgentEvent::TurnStart { + metadata: HashMap::new(), + }] + } + + AgenticEvent::TextChunk { text, .. } => { + vec![AgentEvent::TextDelta { + delta: text.clone(), + metadata: HashMap::new(), + }] + } + + AgenticEvent::ThinkingChunk { content, .. } => { + vec![AgentEvent::ThinkingDelta { + delta: content.clone(), + metadata: HashMap::new(), + }] + } + + AgenticEvent::ToolEvent { tool_event, .. } => { + Self::map_tool_event(tool_event, seen_tool_starts) + } + + AgenticEvent::DialogTurnCompleted { .. } => { + vec![AgentEvent::TurnEnd { + stop_reason: StopReason::Completed, + metadata: HashMap::new(), + }] + } + + AgenticEvent::DialogTurnCancelled { .. } => { + vec![AgentEvent::TurnEnd { + stop_reason: StopReason::Aborted, + metadata: HashMap::new(), + }] + } + + AgenticEvent::DialogTurnFailed { error, .. } => { + vec![AgentEvent::Error { + message: error.clone(), + metadata: HashMap::new(), + }] + } + + AgenticEvent::SystemError { error, .. } => { + vec![AgentEvent::Error { + message: error.clone(), + metadata: HashMap::new(), + }] + } + + // Non-user-visible events — silently drop. + _ => vec![], + } + } + + fn map_tool_event( + tool_event: &bitfun_events::agentic::ToolEventData, + seen_tool_starts: &mut HashSet, + ) -> Vec { + use bitfun_events::agentic::ToolEventData::*; + + match tool_event { + EarlyDetected { tool_id, tool_name, .. } + | Started { tool_id, tool_name, .. } => { + if seen_tool_starts.insert(tool_id.clone()) { + vec![AgentEvent::ToolCallStart { + tool_call_id: tool_id.clone(), + tool_name: tool_name.clone(), + metadata: HashMap::new(), + }] + } else { + vec![] + } + } + + ParamsPartial { tool_id, params, .. } => { + vec![AgentEvent::ToolCallDelta { + tool_call_id: tool_id.clone(), + delta: params.clone(), + metadata: HashMap::new(), + }] + } + + Progress { tool_id, message, .. } => { + vec![AgentEvent::ToolCallDelta { + tool_call_id: tool_id.clone(), + delta: message.clone(), + metadata: HashMap::new(), + }] + } + + StreamChunk { tool_id, data, .. } => { + vec![AgentEvent::ToolCallDelta { + tool_call_id: tool_id.clone(), + delta: serde_json::to_string(data).unwrap_or_default(), + metadata: HashMap::new(), + }] + } + + Completed { tool_id, result, .. } => { + vec![AgentEvent::ToolResult { + tool_call_id: tool_id.clone(), + result: serde_json::to_string(result).unwrap_or_default(), + metadata: HashMap::new(), + }] + } + + Failed { tool_id, error, .. } => { + vec![AgentEvent::ToolResult { + tool_call_id: tool_id.clone(), + result: format!("Error: {}", error), + metadata: HashMap::new(), + }] + } + + Cancelled { tool_id, reason, .. } => { + vec![AgentEvent::ToolResult { + tool_call_id: tool_id.clone(), + result: format!("Cancelled: {}", reason), + metadata: HashMap::new(), + }] + } + + Queued { .. } + | Waiting { .. } + | Streaming { .. } + | ConfirmationNeeded { .. } + | Confirmed { .. } + | Rejected { .. } => vec![], + } + } +} + +#[async_trait] +impl AgentSession for BitfunSession { + fn session_id(&self) -> &str { + &self.session_id + } + + async fn prompt( + &self, + input: &str, + attachments: Vec, + ) -> PortResult { + // 1. Subscribe to the event queue BEFORE submitting. + let mut broadcast_rx = self.system.event_queue.subscribe(); + + // 2. Submit the message through the coordinator. + let submission_result = self + .system + .coordinator + .submit_message(AgentSubmissionRequest { + session_id: self.session_id.clone(), + message: input.to_string(), + turn_id: None, + source: Some(AgentSubmissionSource::AgentSession), + attachments, + metadata: serde_json::Map::new(), + }) + .await + ?; + + let turn_id = submission_result.turn_id; + + // 3. Bridge broadcast events into a mpsc stream. + let (tx, rx) = mpsc::channel::(256); + let session_id = self.session_id.clone(); + + tokio::spawn(async move { + let mut seen_tool_starts: HashSet = HashSet::new(); + let mut turn_ended = false; + + loop { + match broadcast_rx.recv().await { + Ok(envelope) => { + let event = &envelope.event; + + // Filter: only events for our session. + let event_session_id = event.session_id().unwrap_or(""); + if event_session_id != session_id { + continue; + } + + // Filter: only events for our turn (if we have a turn_id). + if !turn_id.is_empty() { + if let Some(event_tid) = Self::get_turn_id(event) { + if event_tid != turn_id { + continue; + } + } + } + + let mapped = Self::map_event(event, &mut seen_tool_starts); + + for agent_event in mapped { + let is_terminal = matches!( + &agent_event, + AgentEvent::TurnEnd { .. } | AgentEvent::Error { .. } + ); + + if tx.send(agent_event).await.is_err() { + return; // Receiver dropped + } + + if is_terminal { + turn_ended = true; + } + } + + if turn_ended { + return; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + log::warn!("BitfunSession broadcast receiver lagged by {} messages", n); + } + Err(broadcast::error::RecvError::Closed) => { + return; + } + } + } + }); + + // 4. Return the stream. + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Box::pin(stream)) + } + + async fn abort(&self) -> PortResult<()> { + self.system + .coordinator + .cancel_turn(AgentTurnCancellationRequest { + session_id: self.session_id.clone(), + turn_id: None, + source: Some(AgentSubmissionSource::AgentSession), + reason: Some("user abort".to_string()), + wait_timeout_ms: None, + }) + .await + ?; + Ok(()) + } + + async fn dispose(self: Box) -> PortResult<()> { + Ok(()) + } +} diff --git a/src/crates/core/src/agentic/runtime_adapters/claude_runtime.rs b/src/crates/core/src/agentic/runtime_adapters/claude_runtime.rs new file mode 100644 index 000000000..40926ea0d --- /dev/null +++ b/src/crates/core/src/agentic/runtime_adapters/claude_runtime.rs @@ -0,0 +1,426 @@ +//! Claude Agent SDK runtime adapter. +//! +//! Spawns a Node.js bridge process that wraps @anthropic-ai/claude-agent-sdk. +//! Communication is JSONL over stdio: commands go to stdin, events come from stdout. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use serde_json::Value; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, Command}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; + +use bitfun_runtime_ports::agent_runtime::{ + AgentEvent, AgentRuntime, AgentSession, AgentEventStream, RuntimeCapabilities, SessionConfig, + StopReason, +}; +use bitfun_runtime_ports::{PortError, PortErrorKind, PortResult}; + +// ── Runtime ────────────────────────────────────────────────────────────────── + +pub struct ClaudeRuntime; + +impl ClaudeRuntime { + pub fn new() -> Self { + Self + } + + /// Resolve the path to `bridge.mjs` at runtime. + /// + /// Tries several locations in order: + /// 1. Relative to the workspace root (development) + /// 2. Relative to the current executable (bundled) + fn bridge_path() -> PathBuf { + // Development: resolve from CARGO_MANIFEST_DIR + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let workspace_root = manifest_dir.join("../../.."); + let dev_path = workspace_root.join("resources/claude-bridge/bridge.mjs"); + if dev_path.exists() { + return dev_path; + } + + // Bundled: relative to the current executable + if let Ok(exe) = std::env::current_exe() { + if let Some(parent) = exe.parent() { + let bundled = parent.join("resources/claude-bridge/bridge.mjs"); + if bundled.exists() { + return bundled; + } + // macOS .app bundle + let macos = parent.join("../Resources/claude-bridge/bridge.mjs"); + if macos.exists() { + return macos; + } + } + } + + // Fallback: return the dev path anyway (will fail later with a clear error) + dev_path + } +} + +#[async_trait] +impl AgentRuntime for ClaudeRuntime { + fn id(&self) -> &str { + "claude" + } + + fn display_name(&self) -> &str { + "Claude Agent SDK" + } + + fn capabilities(&self) -> &RuntimeCapabilities { + // Stored as a static to avoid allocating on every call + static CAPS: std::sync::OnceLock = std::sync::OnceLock::new(); + CAPS.get_or_init(|| RuntimeCapabilities { + description: "Claude Agent SDK via Node.js bridge".into(), + supports_steer: false, + supports_thinking: true, + autonomous_tools: true, + extras: HashMap::new(), + }) + } + + async fn create_session( + &self, + config: SessionConfig, + ) -> PortResult> { + let bridge = Self::bridge_path(); + if !bridge.exists() { + return Err(PortError::new( + PortErrorKind::NotFound, + format!( + "Claude bridge script not found at {}", + bridge.display() + ), + )); + } + // Pre-flight: verify Node.js is available + if which::which("node").is_err() { + return Err(PortError::new( + PortErrorKind::NotAvailable, + "Node.js is not installed. Install Node.js to use the Claude Agent SDK runtime.".to_string(), + )); + } + // Pre-flight: verify ANTHROPIC_API_KEY + if std::env::var("ANTHROPIC_API_KEY").is_err() { + return Err(PortError::new( + PortErrorKind::PermissionDenied, + "ANTHROPIC_API_KEY environment variable not set. Set it to use the Claude Agent SDK runtime.".to_string(), + )); + } + + // Determine the working directory + let working_dir = config + .working_dir + .as_deref() + .unwrap_or(".") + .to_string(); + + let mut child = Command::new("node") + .arg(bridge.to_str().ok_or_else(|| { + PortError::new( + PortErrorKind::Backend, + "bridge path contains invalid UTF-8", + ) + })?) + .current_dir(&working_dir) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .spawn() + .map_err(|e| { + PortError::new( + PortErrorKind::Backend, + format!("Failed to spawn Node.js bridge process: {e}"), + ) + })?; + + let stdin = child.stdin.take().ok_or_else(|| { + PortError::new(PortErrorKind::Backend, "bridge did not provide stdin") + })?; + let stdout = child.stdout.take().ok_or_else(|| { + PortError::new(PortErrorKind::Backend, "bridge did not provide stdout") + })?; + + let session_id = uuid::Uuid::new_v4().to_string(); + let abort_token = CancellationToken::new(); + let abort_clone = abort_token.clone(); + + // Shared channel sender — the background reader task routes events here + let event_tx: Arc>>> = + Arc::new(Mutex::new(None)); + let event_tx_bg = event_tx.clone(); + + // Spawn long-lived stdout reader task + tokio::spawn(async move { + let mut reader = BufReader::new(stdout); + let mut line = String::new(); + loop { + tokio::select! { + _ = abort_clone.cancelled() => break, + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + // EOF — child process exited + break; + } + Ok(_) => { + let trimmed = line.trim().to_string(); + line.clear(); + if trimmed.is_empty() { + continue; + } + if let Ok(event) = serde_json::from_str::(&trimmed) { + if let Some(agent_event) = translate_bridge_event(&event) { + let guard = event_tx_bg.lock().await; + if let Some(ref tx) = *guard { + let _ = tx.send(agent_event).await; + } + } + } + } + Err(_) => { + // Read error — child likely exited + break; + } + } + } + } + } + }); + + Ok(Box::new(ClaudeSession { + session_id, + stdin: Mutex::new(stdin), + child: Mutex::new(child), + abort_token, + event_tx, + model_id: config.model_id, + working_dir, + })) + } + + async fn health_check(&self) -> PortResult<()> { + // Only verify the bridge script exists — it is bundled with the app. + // Node.js availability and ANTHROPIC_API_KEY are checked at session + // creation time so the runtime always shows as "available" in the UI. + let bridge = Self::bridge_path(); + if !bridge.exists() { + return Err(PortError::new( + PortErrorKind::NotFound, + format!( + "Claude bridge script not found at {}", + bridge.display() + ), + )); + } + Ok(()) + } + + async fn shutdown(&self) -> PortResult<()> { + Ok(()) + } +} + +// ── Session ────────────────────────────────────────────────────────────────── + +pub struct ClaudeSession { + session_id: String, + stdin: Mutex, + child: Mutex, + abort_token: CancellationToken, + /// Current turn's event sender. Set by `prompt()`, consumed by the background reader task. + event_tx: Arc>>>, + /// Model override for this session. + model_id: Option, + /// Working directory for this session. + working_dir: String, +} + +#[async_trait] +impl AgentSession for ClaudeSession { + fn session_id(&self) -> &str { + &self.session_id + } + + async fn prompt( + &self, + input: &str, + _attachments: Vec, + ) -> PortResult { + // Create a fresh channel for this turn + let (tx, rx) = tokio::sync::mpsc::channel(64); + + // Register as the active event target + { + let mut guard = self.event_tx.lock().await; + *guard = Some(tx); + } + + // Build the command + let mut cmd = serde_json::json!({ + "command": "prompt", + "text": input, + "workingDir": self.working_dir, + }); + if let Some(ref model) = self.model_id { + cmd["model"] = Value::String(model.clone()); + } + let cmd_line = format!("{}\n", cmd); + + // Write to bridge stdin + { + let mut stdin = self.stdin.lock().await; + stdin + .write_all(cmd_line.as_bytes()) + .await + .map_err(|e| PortError::new(PortErrorKind::Backend, format!("stdin write failed: {e}")))?; + } + + Ok(Box::pin( + tokio_stream::wrappers::ReceiverStream::new(rx), + )) + } + + async fn abort(&self) -> PortResult<()> { + self.abort_token.cancel(); + + let mut child = self.child.lock().await; + child + .kill() + .await + .map_err(|e| PortError::new(PortErrorKind::Backend, format!("failed to kill child: {e}")))?; + + Ok(()) + } + + async fn dispose(self: Box) -> PortResult<()> { + self.abort_token.cancel(); + + let mut child = self.child.lock().await; + child + .kill() + .await + .map_err(|e| PortError::new(PortErrorKind::Backend, format!("failed to kill child: {e}")))?; + + Ok(()) + } +} + +// ── Event translation ──────────────────────────────────────────────────────── + +/// Translate a single bridge JSONL event into an `AgentEvent`. +/// +/// Returns `None` for bridge events that have no corresponding `AgentEvent` +/// (e.g. unknown types or internal events). +fn translate_bridge_event(val: &Value) -> Option { + let event_type = val.get("type")?.as_str()?; + + match event_type { + "text_delta" => { + let delta = val + .get("delta") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Some(AgentEvent::TextDelta { + delta, + metadata: HashMap::new(), + }) + } + "thinking_delta" => { + let delta = val + .get("delta") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Some(AgentEvent::ThinkingDelta { + delta, + metadata: HashMap::new(), + }) + } + "tool_call_start" => { + let tool_call_id = val + .get("tool_call_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let tool_name = val + .get("tool_name") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Some(AgentEvent::ToolCallStart { + tool_call_id, + tool_name, + metadata: HashMap::new(), + }) + } + "tool_call_delta" => { + let tool_call_id = val + .get("tool_call_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let delta = val + .get("delta") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Some(AgentEvent::ToolCallDelta { + tool_call_id, + delta, + metadata: HashMap::new(), + }) + } + "tool_result" => { + let tool_call_id = val + .get("tool_call_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let result = val + .get("result") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Some(AgentEvent::ToolResult { + tool_call_id, + result, + metadata: HashMap::new(), + }) + } + "turn_end" => { + let stop_reason = match val.get("stopReason").and_then(|v| v.as_str()) { + Some("completed") => StopReason::Completed, + Some("aborted") => StopReason::Aborted, + Some("error") => StopReason::Error, + _ => StopReason::Completed, + }; + Some(AgentEvent::TurnEnd { + stop_reason, + metadata: HashMap::new(), + }) + } + "error" => { + let message = val + .get("message") + .and_then(|v| v.as_str()) + .unwrap_or("Unknown bridge error") + .to_string(); + Some(AgentEvent::Error { + message, + metadata: HashMap::new(), + }) + } + _ => { + // Unknown event type — skip + None + } + } +} diff --git a/src/crates/core/src/agentic/runtime_adapters/mod.rs b/src/crates/core/src/agentic/runtime_adapters/mod.rs new file mode 100644 index 000000000..ab445a131 --- /dev/null +++ b/src/crates/core/src/agentic/runtime_adapters/mod.rs @@ -0,0 +1,3 @@ +pub mod bitfun_runtime; +pub mod claude_runtime; +pub mod omp_runtime; diff --git a/src/crates/core/src/agentic/runtime_adapters/omp_runtime.rs b/src/crates/core/src/agentic/runtime_adapters/omp_runtime.rs new file mode 100644 index 000000000..6c4372af9 --- /dev/null +++ b/src/crates/core/src/agentic/runtime_adapters/omp_runtime.rs @@ -0,0 +1,369 @@ +//! OMP (Oh My Pi) runtime adapter. +//! +//! Communicates with the `omp` binary via JSONL over stdin/stdout in RPC mode. +//! Each `OmpSession` spawns a dedicated `omp --mode rpc --no-session` subprocess. +//! +//! Binary resolution order: +//! 1. Bundled: `/../resources/omp/omp<.exe>` (Tauri resource layout) +//! 2. PATH: system-installed `omp` + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use bitfun_runtime_ports::agent_runtime::{ + AgentEvent, AgentEventStream, AgentRuntime, AgentSession, RuntimeCapabilities, + SessionConfig, StopReason, +}; +use bitfun_runtime_ports::{AgentInputAttachment, PortError, PortErrorKind, PortResult}; +use serde_json::Value; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tokio::sync::{mpsc, Mutex}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +// --------------------------------------------------------------------------- +// Binary resolution +// --------------------------------------------------------------------------- + +/// Resolve the omp binary path. +/// Order: bundled resource → PATH lookup. +fn resolve_omp_binary() -> Option { + let exe = std::env::current_exe().ok()?; + + // Tauri resource layout: /bin/bitfun-desktop.exe + // Resources are at: /resources/omp/omp.exe + // In dev: target/debug/bitfun-desktop.exe + // Resources at: /resources/omp/omp.exe + let exe_dir = exe.parent()?; + + // Try 1: /../resources/omp/omp<.exe> + if let Some(parent) = exe_dir.parent() { + let bundled = parent.join("resources").join("omp").join("omp"); + if bundled.exists() { + return Some(bundled); + } + #[cfg(windows)] + { + let bundled_exe = parent.join("resources").join("omp").join("omp.exe"); + if bundled_exe.exists() { + return Some(bundled_exe); + } + } + } + + // Try 2: /../../resources/omp/omp<.exe> (deeper nesting, some installers) + if let Some(grandparent) = exe_dir.parent().and_then(|p| p.parent()) { + let bundled = grandparent.join("resources").join("omp").join("omp"); + if bundled.exists() { + return Some(bundled); + } + #[cfg(windows)] + { + let bundled_exe = grandparent.join("resources").join("omp").join("omp.exe"); + if bundled_exe.exists() { + return Some(bundled_exe); + } + } + } + + // Try 3: PATH + which::which("omp").ok() +} + +// --------------------------------------------------------------------------- +// OmpRuntime +// --------------------------------------------------------------------------- + +/// OMP runtime provider. Resolves bundled or system omp binary. +pub struct OmpRuntime { + capabilities: RuntimeCapabilities, +} + +impl OmpRuntime { + pub fn new() -> Self { + Self { + capabilities: RuntimeCapabilities { + description: "OMP agent runtime via RPC subprocess".to_string(), + supports_steer: true, + supports_thinking: true, + autonomous_tools: true, + extras: HashMap::new(), + }, + } + } +} + +impl Default for OmpRuntime { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl AgentRuntime for OmpRuntime { + fn id(&self) -> &str { + "omp" + } + + fn display_name(&self) -> &str { + "OMP (Oh My Pi)" + } + + fn capabilities(&self) -> &RuntimeCapabilities { + &self.capabilities + } + + async fn create_session(&self, _config: SessionConfig) -> PortResult> { + let omp_path = resolve_omp_binary().ok_or_else(|| { + PortError::new( + PortErrorKind::NotFound, + "omp binary not found: not bundled and not in PATH".to_string(), + ) + })?; + + let mut child = Command::new(&omp_path) + .args(["--mode", "rpc", "--no-session"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .kill_on_drop(true) + .spawn() + .map_err(|e| { + PortError::new( + PortErrorKind::Backend, + format!("failed to spawn omp subprocess ({}): {}", omp_path.display(), e), + ) + })?; + + let stdin = child + .stdin + .take() + .ok_or_else(|| PortError::new(PortErrorKind::Backend, "omp stdin not available"))?; + + let stdout = child + .stdout + .take() + .ok_or_else(|| PortError::new(PortErrorKind::Backend, "omp stdout not available"))?; + + let session_id = Uuid::new_v4().to_string(); + + Ok(Box::new(OmpSession { + session_id, + child: Mutex::new(child), + stdin: Mutex::new(stdin), + stdout: Arc::new(Mutex::new(BufReader::new(stdout))), + abort_token: CancellationToken::new(), + })) + } + + async fn health_check(&self) -> PortResult<()> { + match resolve_omp_binary() { + Some(path) => { + log::info!("[OmpRuntime] found omp at: {}", path.display()); + Ok(()) + } + None => Err(PortError::new( + PortErrorKind::NotFound, + "omp binary not found: not bundled and not in PATH", + )), + } + } + + async fn shutdown(&self) -> PortResult<()> { + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// OmpSession +// --------------------------------------------------------------------------- + +pub struct OmpSession { + session_id: String, + child: Mutex, + stdin: Mutex, + stdout: Arc>>, + abort_token: CancellationToken, +} + +impl OmpSession { + /// Write a JSON command as a single JSONL line to stdin. + async fn write_command(stdin: &Mutex, cmd: &Value) -> Result<(), PortError> { + let mut line = serde_json::to_string(cmd).map_err(|e| { + PortError::new(PortErrorKind::Backend, format!("serialize error: {}", e)) + })?; + line.push('\n'); + + let mut writer = stdin.lock().await; + writer.write_all(line.as_bytes()).await.map_err(|e| { + PortError::new(PortErrorKind::Backend, format!("stdin write error: {}", e)) + }) + } + + /// Spawn a background task reading OMP JSONL stdout, translating to AgentEvents. + fn spawn_reader( + stdout: Arc>>, + tx: mpsc::UnboundedSender, + abort_token: CancellationToken, + ) { + tokio::spawn(async move { + let mut reader = stdout.lock().await; + let mut line = String::new(); + + loop { + tokio::select! { + _ = abort_token.cancelled() => { + let _ = tx.send(AgentEvent::TurnEnd { + stop_reason: StopReason::Aborted, + metadata: HashMap::new(), + }); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => break, + Ok(_) => { + let trimmed = line.trim(); + if !trimmed.is_empty() { + match serde_json::from_str::(trimmed) { + Ok(json) => { + let event = Self::translate_message(&json); + if tx.send(event).is_err() { + break; + } + } + Err(e) => { + let _ = tx.send(AgentEvent::Error { + message: format!( + "JSON parse error from omp: {} (line: {})", + e, trimmed, + ), + metadata: HashMap::new(), + }); + } + } + } + line.clear(); + } + Err(e) => { + if !abort_token.is_cancelled() { + let _ = tx.send(AgentEvent::Error { + message: format!("stdout read error: {}", e), + metadata: HashMap::new(), + }); + } + break; + } + } + } + } + } + }); + } + + /// Translate a single OMP JSONL message to an AgentEvent. + fn translate_message(msg: &Value) -> AgentEvent { + let msg_type = msg["type"].as_str().unwrap_or(""); + + match msg_type { + "message_update" => { + let data = &msg["data"]; + let data_type = data["type"].as_str().unwrap_or(""); + match data_type { + "text_delta" => AgentEvent::TextDelta { + delta: data["delta"].as_str().unwrap_or("").to_string(), + metadata: HashMap::new(), + }, + "thinking_delta" => AgentEvent::ThinkingDelta { + delta: data["delta"].as_str().unwrap_or("").to_string(), + metadata: HashMap::new(), + }, + "tool_call_start" => AgentEvent::ToolCallStart { + tool_call_id: data["tool_call_id"].as_str().unwrap_or("").to_string(), + tool_name: data["tool_name"].as_str().unwrap_or("").to_string(), + metadata: HashMap::new(), + }, + "tool_call_delta" => AgentEvent::ToolCallDelta { + tool_call_id: data["tool_call_id"].as_str().unwrap_or("").to_string(), + delta: data["delta"].as_str().unwrap_or("").to_string(), + metadata: HashMap::new(), + }, + "tool_result" => AgentEvent::ToolResult { + tool_call_id: data["tool_call_id"].as_str().unwrap_or("").to_string(), + result: data["result"].as_str().unwrap_or("").to_string(), + metadata: HashMap::new(), + }, + _ => AgentEvent::Error { + message: format!("unknown message_update data type from omp: {}", data_type), + metadata: HashMap::new(), + }, + } + } + "agent_start" => AgentEvent::TurnStart { + metadata: HashMap::new(), + }, + "agent_end" => AgentEvent::TurnEnd { + stop_reason: StopReason::Completed, + metadata: HashMap::new(), + }, + _ => AgentEvent::Error { + message: format!("unknown message type from omp: {}", msg_type), + metadata: HashMap::new(), + }, + } + } +} + +#[async_trait] +impl AgentSession for OmpSession { + fn session_id(&self) -> &str { + &self.session_id + } + + async fn prompt( + &self, + input: &str, + _attachments: Vec, + ) -> PortResult { + let cmd = serde_json::json!({ + "command": "prompt", + "text": input, + }); + Self::write_command(&self.stdin, &cmd).await?; + + let (tx, rx) = mpsc::unbounded_channel::(); + + Self::spawn_reader( + Arc::clone(&self.stdout), + tx, + self.abort_token.clone(), + ); + + let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx); + Ok(Box::pin(stream)) + } + + async fn steer(&self, message: &str) -> PortResult<()> { + let cmd = serde_json::json!({ + "command": "steer", + "text": message, + }); + Self::write_command(&self.stdin, &cmd).await + } + + async fn abort(&self) -> PortResult<()> { + self.abort_token.cancel(); + let cmd = serde_json::json!({"command": "abort"}); + Self::write_command(&self.stdin, &cmd).await + } + + async fn dispose(self: Box) -> PortResult<()> { + self.abort_token.cancel(); + let mut child = self.child.lock().await; + let _ = child.kill().await; + let _ = child.wait().await; + Ok(()) + } +} diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index 865b39ca3..6eea63699 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -1649,6 +1649,56 @@ impl SessionManager { Ok(()) } + /// Update session runtime id (in-memory + persistence) + pub async fn update_session_runtime_id( + &self, + session_id: &str, + runtime_id: &str, + ) -> BitFunResult<()> { + // If the session was evicted from memory, try to restore it + if !self.sessions.contains_key(session_id) && self.config.enable_persistence { + let workspace_path = self + .session_workspace_index + .get(session_id) + .map(|entry| entry.clone()); + if let Some(workspace_path) = workspace_path { + debug!( + "Session evicted from memory, restoring for runtime update: session_id={}", + session_id + ); + let _ = self.restore_session(&workspace_path, session_id).await; + } + } + + if let Some(mut session) = self.sessions.get_mut(session_id) { + session.config.runtime_id = Some(runtime_id.to_string()); + session.updated_at = SystemTime::now(); + session.last_activity_at = SystemTime::now(); + } else { + return Err(BitFunError::NotFound(format!( + "Session not found: {}", + session_id + ))); + } + + if self.should_persist_session_id(session_id) { + let effective_path = self.effective_session_workspace_path(session_id).await; + let session_snapshot = self.sessions.get(session_id).map(|s| s.clone()); + if let (Some(workspace_path), Some(session)) = (effective_path, session_snapshot) { + self.persistence_manager + .save_session(&workspace_path, &session) + .await?; + } + } + + debug!( + "Session runtime id updated: session_id={}, runtime_id={}", + session_id, runtime_id + ); + + Ok(()) + } + /// Update session activity time pub fn touch_session(&self, session_id: &str) { if let Some(mut session) = self.sessions.get_mut(session_id) { diff --git a/src/crates/core/src/agentic/system.rs b/src/crates/core/src/agentic/system.rs index 1d5ab1677..09b89daa4 100644 --- a/src/crates/core/src/agentic/system.rs +++ b/src/crates/core/src/agentic/system.rs @@ -5,10 +5,15 @@ use std::sync::Arc; use anyhow::Result; use log::info; +use bitfun_runtime_ports::registry::{init_global_runtime_registry, RuntimeRegistry}; + use crate::agentic::coordination; use crate::agentic::events; use crate::agentic::execution; use crate::agentic::persistence; +use crate::agentic::runtime_adapters::bitfun_runtime::BitfunRuntime; +use crate::agentic::runtime_adapters::claude_runtime::ClaudeRuntime; +use crate::agentic::runtime_adapters::omp_runtime::OmpRuntime; use crate::agentic::session; use crate::agentic::tools; use crate::infrastructure::ai::AIClientFactory; @@ -98,11 +103,19 @@ pub async fn init_agentic_system() -> Result { } }); - info!("Agentic system initialization complete"); - - Ok(AgenticSystem { - coordinator, + let agentic_system = AgenticSystem { + coordinator: coordinator.clone(), event_queue, token_usage_service, - }) + }; + + // Register runtime adapters in the global registry + let mut registry = RuntimeRegistry::new(); + registry.register(Arc::new(BitfunRuntime::new(Arc::new(agentic_system.clone())))); + registry.register(Arc::new(OmpRuntime::new())); + registry.register(Arc::new(ClaudeRuntime::new())); + init_global_runtime_registry(registry); + info!("Agent runtime registry initialized with {} runtimes", 3); + + Ok(agentic_system) } diff --git a/src/crates/runtime-ports/Cargo.toml b/src/crates/runtime-ports/Cargo.toml index 4f773ccc0..cbd84e683 100644 --- a/src/crates/runtime-ports/Cargo.toml +++ b/src/crates/runtime-ports/Cargo.toml @@ -13,3 +13,7 @@ crate-type = ["rlib"] async-trait = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +futures = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/src/crates/runtime-ports/src/agent_runtime.rs b/src/crates/runtime-ports/src/agent_runtime.rs new file mode 100644 index 000000000..e10bcc6c2 --- /dev/null +++ b/src/crates/runtime-ports/src/agent_runtime.rs @@ -0,0 +1,189 @@ +//! Agent Runtime Abstraction +//! +//! Defines the trait hierarchy for switching between different agent runtimes. +//! Each runtime (BitFun native, OMP, Claude Agent SDK) implements these traits +//! to provide a unified interface for session management and event streaming. +//! +//! Design principle: autonomous subprocess model (Model C). +//! External runtimes are self-contained black boxes with their own toolchains. +//! BitFun only passes prompts in and receives event streams out. + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::pin::Pin; + +use crate::AgentInputAttachment; +use crate::{PortError, PortResult}; + +// --------------------------------------------------------------------------- +// AgentRuntime — lifecycle management for a runtime provider +// --------------------------------------------------------------------------- + +/// Capabilities a runtime advertises. Used by UI to show feature differences. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RuntimeCapabilities { + /// Human-readable description + pub description: String, + /// Runtime supports mid-turn steering + pub supports_steer: bool, + /// Runtime emits thinking/reasoning deltas + pub supports_thinking: bool, + /// Runtime manages its own tools autonomously + pub autonomous_tools: bool, + /// Arbitrary capability flags for runtime-specific features + #[serde(default)] + pub extras: HashMap, +} + +/// Configuration for creating a new agent session. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SessionConfig { + /// Optional model override + #[serde(skip_serializing_if = "Option::is_none")] + pub model_id: Option, + /// Working directory + #[serde(skip_serializing_if = "Option::is_none")] + pub working_dir: Option, + /// Runtime-specific configuration + #[serde(default)] + pub runtime_options: HashMap, +} + +/// A provider of agent sessions. Each runtime (BitFun, OMP, Claude) implements this. +#[async_trait] +pub trait AgentRuntime: Send + Sync { + /// Stable identifier: "bitfun" | "omp" | "claude" + fn id(&self) -> &str; + + /// Human-readable name for UI display + fn display_name(&self) -> &str; + + /// What this runtime can do + fn capabilities(&self) -> &RuntimeCapabilities; + + /// Create a new agent session + async fn create_session( + &self, + config: SessionConfig, + ) -> PortResult>; + + /// Check if this runtime is usable (binary found, API key configured, etc.) + async fn health_check(&self) -> PortResult<()>; + + /// Release runtime-level resources + async fn shutdown(&self) -> PortResult<()> { + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// AgentSession — interaction with a single agent session +// --------------------------------------------------------------------------- + +/// A boxed, pinned, Send stream of agent events. +pub type AgentEventStream = Pin + Send>>; + +/// An active agent session. Created by an AgentRuntime, consumed by the coordinator. +#[async_trait] +pub trait AgentSession: Send + Sync { + /// Opaque session identifier + fn session_id(&self) -> &str; + + /// Send a user message and receive streaming events. + /// The stream ends when the agent turn completes (TurnEnd) or errors. + async fn prompt( + &self, + input: &str, + attachments: Vec, + ) -> PortResult; + + /// Inject a steering message into the running turn (if supported) + async fn steer(&self, _message: &str) -> PortResult<()> { + Err(PortError::new( + crate::PortErrorKind::NotAvailable, + "steer not supported by this runtime", + )) + } + + /// Abort the current turn + async fn abort(&self) -> PortResult<()>; + + /// Release session resources + async fn dispose(self: Box) -> PortResult<()>; +} + +// --------------------------------------------------------------------------- +// AgentEvent — unified event model across all runtimes +// --------------------------------------------------------------------------- + +/// Why a turn ended. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum StopReason { + Completed, + Aborted, + Error, + ToolLimit, +} + +/// Unified event type. Each runtime translates its native events into these. +/// Unmapped runtime-specific data goes into `metadata`. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum AgentEvent { + TextDelta { + delta: String, + #[serde(default)] + metadata: HashMap, + }, + ThinkingDelta { + delta: String, + #[serde(default)] + metadata: HashMap, + }, + ToolCallStart { + tool_call_id: String, + tool_name: String, + #[serde(default)] + metadata: HashMap, + }, + ToolCallDelta { + tool_call_id: String, + delta: String, + #[serde(default)] + metadata: HashMap, + }, + ToolResult { + tool_call_id: String, + result: String, + #[serde(default)] + metadata: HashMap, + }, + TurnStart { + #[serde(default)] + metadata: HashMap, + }, + TurnEnd { + stop_reason: StopReason, + #[serde(default)] + metadata: HashMap, + }, + Error { + message: String, + #[serde(default)] + metadata: HashMap, + }, +} + +/// Health status of a single runtime at a point in time. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RuntimeHealthStatus { + pub runtime_id: String, + pub available: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} diff --git a/src/crates/runtime-ports/src/lib.rs b/src/crates/runtime-ports/src/lib.rs index 6edd6ff80..98b85a39c 100644 --- a/src/crates/runtime-ports/src/lib.rs +++ b/src/crates/runtime-ports/src/lib.rs @@ -4,6 +4,8 @@ //! This crate intentionally contains only DTOs and traits. It must not depend //! on concrete managers, platform adapters, `bitfun-core`, or app crates. +pub mod agent_runtime; +pub mod registry; use serde::{Deserialize, Serialize}; pub type PortResult = Result; diff --git a/src/crates/runtime-ports/src/registry.rs b/src/crates/runtime-ports/src/registry.rs new file mode 100644 index 000000000..5074d2de2 --- /dev/null +++ b/src/crates/runtime-ports/src/registry.rs @@ -0,0 +1,187 @@ +//! Runtime Registry +//! +//! Global singleton that holds all registered AgentRuntime implementations. +//! Runtimes are registered at startup; the registry is read-only during operation. + +use std::sync::{Arc, OnceLock}; + +use crate::agent_runtime::{AgentRuntime, RuntimeHealthStatus}; + +/// Registry of all available agent runtimes. +pub struct RuntimeRegistry { + runtimes: Vec>, + default_id: String, +} + +impl std::fmt::Debug for RuntimeRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let ids: Vec<&str> = self.runtimes.iter().map(|r| r.id()).collect(); + f.debug_struct("RuntimeRegistry") + .field("runtime_ids", &ids) + .field("default_id", &self.default_id) + .finish() + } +} + +impl RuntimeRegistry { + /// Create an empty registry. + pub fn new() -> Self { + Self { + runtimes: Vec::new(), + default_id: "bitfun".to_string(), + } + } + + /// Register a runtime. Order determines priority for default selection. + pub fn register(&mut self, runtime: Arc) { + self.runtimes.push(runtime); + } + + /// List all registered runtimes. + pub fn list_all(&self) -> &[Arc] { + &self.runtimes + } + + /// Get a runtime by id. + pub fn get(&self, id: &str) -> Option<&Arc> { + self.runtimes.iter().find(|r| r.id() == id) + } + + /// Check health of all runtimes. + pub async fn health_check_all(&self) -> Vec { + let mut results = Vec::with_capacity(self.runtimes.len()); + for runtime in &self.runtimes { + let status = match runtime.health_check().await { + Ok(()) => RuntimeHealthStatus { + runtime_id: runtime.id().to_string(), + available: true, + error: None, + }, + Err(e) => RuntimeHealthStatus { + runtime_id: runtime.id().to_string(), + available: false, + error: Some(e.message.clone()), + }, + }; + results.push(status); + } + results + } + + /// Select the default runtime based on priority order and health. + /// Priority: OMP -> Claude -> BitFun (first healthy wins). + pub fn select_default(&self, health_statuses: &[RuntimeHealthStatus]) -> &Arc { + let healthy_ids: Vec<&str> = health_statuses + .iter() + .filter(|s| s.available) + .map(|s| s.runtime_id.as_str()) + .collect(); + + // Priority order + for preferred in &["omp", "claude", "bitfun"] { + if healthy_ids.contains(preferred) { + if let Some(rt) = self.get(preferred) { + return rt; + } + } + } + + // Fallback: first registered runtime + self.runtimes + .first() + .expect("RuntimeRegistry must have at least one runtime registered") + } +} + +// --------------------------------------------------------------------------- +// Global singleton +// --------------------------------------------------------------------------- + +static GLOBAL_RUNTIME_REGISTRY: OnceLock = OnceLock::new(); + +/// Get the global runtime registry. +pub fn get_global_runtime_registry() -> &'static RuntimeRegistry { + GLOBAL_RUNTIME_REGISTRY.get_or_init(RuntimeRegistry::new) +} + +/// Set the global runtime registry. Call once during startup. +/// Panics if called more than once. +pub fn init_global_runtime_registry(registry: RuntimeRegistry) { + GLOBAL_RUNTIME_REGISTRY + .set(registry) + .expect("RuntimeRegistry already initialized"); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use crate::agent_runtime::{AgentSession, RuntimeCapabilities, SessionConfig}; + use crate::{PortError, PortErrorKind, PortResult}; + use std::collections::HashMap; + use std::sync::LazyLock; + + struct MockRuntime { + id: &'static str, + healthy: bool, + } + + #[async_trait] + impl AgentRuntime for MockRuntime { + fn id(&self) -> &str { self.id } + fn display_name(&self) -> &str { self.id } + fn capabilities(&self) -> &RuntimeCapabilities { + static CAPS: LazyLock = LazyLock::new(|| RuntimeCapabilities { + description: String::new(), + supports_steer: false, + supports_thinking: false, + autonomous_tools: false, + extras: HashMap::new(), + }); + &CAPS + } + async fn create_session(&self, _: SessionConfig) -> PortResult> { + Err(PortError::new(PortErrorKind::NotAvailable, "mock")) + } + async fn health_check(&self) -> PortResult<()> { + if self.healthy { Ok(()) } else { Err(PortError::new(PortErrorKind::NotAvailable, "unhealthy")) } + } + } + + #[tokio::test] + async fn test_registry_get() { + let mut reg = RuntimeRegistry::new(); + reg.register(Arc::new(MockRuntime { id: "omp", healthy: true })); + reg.register(Arc::new(MockRuntime { id: "claude", healthy: true })); + + assert_eq!(reg.get("omp").unwrap().id(), "omp"); + assert_eq!(reg.get("claude").unwrap().id(), "claude"); + assert!(reg.get("bitfun").is_none()); + } + + #[tokio::test] + async fn test_select_default_prefers_omp() { + let mut reg = RuntimeRegistry::new(); + reg.register(Arc::new(MockRuntime { id: "bitfun", healthy: true })); + reg.register(Arc::new(MockRuntime { id: "omp", healthy: true })); + + let health = reg.health_check_all().await; + let default = reg.select_default(&health); + assert_eq!(default.id(), "omp"); + } + + #[tokio::test] + async fn test_select_default_falls_back_when_omp_unhealthy() { + let mut reg = RuntimeRegistry::new(); + reg.register(Arc::new(MockRuntime { id: "bitfun", healthy: true })); + reg.register(Arc::new(MockRuntime { id: "omp", healthy: false })); + + let health = reg.health_check_all().await; + let default = reg.select_default(&health); + assert_eq!(default.id(), "bitfun"); + } +} diff --git a/src/web-ui/src/flow_chat/components/ChatInput.tsx b/src/web-ui/src/flow_chat/components/ChatInput.tsx index 3361db881..21802a891 100644 --- a/src/web-ui/src/flow_chat/components/ChatInput.tsx +++ b/src/web-ui/src/flow_chat/components/ChatInput.tsx @@ -19,6 +19,7 @@ import { } from '../hooks/useSessionStateMachine'; import { SessionExecutionEvent } from '../state-machine/types'; import { ModelSelector } from './ModelSelector'; +import { RuntimeSelector } from './RuntimeSelector'; import { FlowChatStore } from '../store/FlowChatStore'; import type { FlowChatState, Session } from '../types/flow-chat'; import type { FileContext, DirectoryContext, ImageContext } from '@/types/context.ts'; @@ -300,6 +301,38 @@ export const ChatInput: React.FC = ({ const currentReviewActivity = useSessionReviewActivity(currentSessionId); useSessionStateMachine(effectiveTargetSessionId); const { confirmDeepReviewLaunch, deepReviewConsentDialog } = useDeepReviewConsent(); + + // Runtime selector state - locked once session has dialog turns + const [selectedRuntimeId, setSelectedRuntimeId] = useState(null); + const hasDialogTurns = Boolean(effectiveTargetSession?.dialogTurns?.length); + + // Sync runtime from session on switch + const sessionRuntimeId = effectiveTargetSession?.runtimeId; + React.useEffect(() => { + if (sessionRuntimeId) { + setSelectedRuntimeId(sessionRuntimeId); + } else { + setSelectedRuntimeId(null); + } + }, [sessionRuntimeId]); + + // Persist runtime selection to session store + const handleRuntimeChange = useCallback((runtimeId: string) => { + setSelectedRuntimeId(runtimeId); + if (effectiveTargetSessionId) { + const store = FlowChatStore.getInstance(); + store.updateSessionRuntimeId(effectiveTargetSessionId, runtimeId); + const session = store.getState().sessions.get(effectiveTargetSessionId); + if (!session?.isTransient) { + agentAPI.updateSessionRuntime({ + sessionId: effectiveTargetSessionId, + runtimeId, + }).catch((err) => { + console.warn('[ChatInput] failed to persist runtime:', err); + }); + } + } + }, [effectiveTargetSessionId]); // isMultiLine: true when content overflows a single line (scrollHeight > threshold or has newlines) const [isMultiLine, setIsMultiLine] = useState(false); // showPlaceholder is true when the editor DOM is truly empty (value empty AND no residual
) @@ -3178,6 +3211,11 @@ export const ChatInput: React.FC = ({ currentTokens={tokenUsage.current} maxTokens={tokenUsage.max} /> + {renderActionButton()} diff --git a/src/web-ui/src/flow_chat/components/RuntimeSelector.scss b/src/web-ui/src/flow_chat/components/RuntimeSelector.scss new file mode 100644 index 000000000..c5e614da6 --- /dev/null +++ b/src/web-ui/src/flow_chat/components/RuntimeSelector.scss @@ -0,0 +1,183 @@ +/** + * RuntimeSelector styles. + * Visually mirrors ModelSelector: compact pill trigger, floating dropdown. + */ + +@use '../../component-library/styles/tokens' as *; + +.bitfun-runtime-selector { + position: relative; + display: inline-flex; + align-items: center; + + // ==================== Trigger ==================== + + &__trigger { + display: inline-flex; + align-items: center; + gap: 4px; + height: 20px; + padding: 0 8px; + border-radius: 10px; + border: none; + background: transparent; + color: var(--color-text-secondary); + font-size: var(--flowchat-font-size-2xs); + font-weight: 400; + cursor: pointer; + transition: all 0.2s ease; + outline: none; + opacity: 0.3; + white-space: nowrap; + letter-spacing: 0.3px; + + .bitfun-chat-input__box:focus-within & { + opacity: 1; + } + + &:hover { + background: var(--element-bg-medium); + color: var(--color-text-primary); + opacity: 1; + } + + &--open { + background: rgba(100, 140, 255, 0.15); + color: var(--color-text-primary); + opacity: 1; + } + + &:disabled { + opacity: 0.35; + cursor: default; + + &:hover { + background: transparent; + } + } + } + + &__icon { + flex-shrink: 0; + } + + &__name { + max-width: 80px; + overflow: hidden; + text-overflow: ellipsis; + } + + &__chevron { + flex-shrink: 0; + opacity: 0.5; + } + + // ==================== Dropdown ==================== + + &__dropdown { + position: absolute; + bottom: calc(100% + 6px); + left: 50%; + transform: translateX(-50%); + min-width: 200px; + max-width: 300px; + background: var(--element-bg-elevated); + border: 1px solid var(--border-color-light); + border-radius: 8px; + box-shadow: + 0 4px 16px rgba(0, 0, 0, 0.12), + 0 1px 4px rgba(0, 0, 0, 0.08); + z-index: 1000; + overflow: hidden; + } + + &__dropdown-header { + padding: 8px 12px 4px; + font-size: var(--flowchat-font-size-2xs); + font-weight: 600; + color: var(--color-text-tertiary); + text-transform: uppercase; + letter-spacing: 0.5px; + } + + &__list { + padding: 4px 0; + } + + // ==================== Option ==================== + + &__option { + display: flex; + align-items: center; + justify-content: space-between; + width: 100%; + padding: 6px 12px; + border: none; + background: transparent; + cursor: pointer; + transition: background 0.15s ease; + text-align: left; + font-size: var(--flowchat-font-size-xs); + + &:hover:not(:disabled) { + background: var(--element-bg-medium); + } + + &--selected { + background: rgba(100, 140, 255, 0.08); + } + + &--unavailable { + opacity: 0.45; + cursor: not-allowed; + } + } + + &__option-main { + display: flex; + align-items: center; + gap: 6px; + min-width: 0; + } + + &__option-name { + color: var(--color-text-primary); + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + } + + &__option-check { + flex-shrink: 0; + color: var(--color-primary); + } + + // ==================== Status dot ==================== + + &__status-dot { + width: 6px; + height: 6px; + border-radius: 50%; + flex-shrink: 0; + + &--ok { + background: var(--color-success, #4caf50); + } + + &--err { + background: var(--color-error, #f44336); + } + } + + // ==================== Badge ==================== + + &__badge { + font-size: 9px; + padding: 0 4px; + border-radius: 3px; + background: rgba(100, 140, 255, 0.15); + color: var(--color-primary); + line-height: 14px; + flex-shrink: 0; + } +} diff --git a/src/web-ui/src/flow_chat/components/RuntimeSelector.tsx b/src/web-ui/src/flow_chat/components/RuntimeSelector.tsx new file mode 100644 index 000000000..ca1d70c60 --- /dev/null +++ b/src/web-ui/src/flow_chat/components/RuntimeSelector.tsx @@ -0,0 +1,177 @@ +/** + * RuntimeSelector — compact runtime switcher for the chat input bar. + * + * Sits next to the ModelSelector. Visually identical trigger style. + * Locked (disabled) once the session has any dialog turns. + * Falls back to a hardcoded BitFun Native entry if backend invoke fails. + */ + +import React, { useEffect, useState, useRef, useCallback, useMemo } from 'react'; +import { Check, ChevronDown, Cpu } from 'lucide-react'; +import { invoke } from '@tauri-apps/api/core'; +import './RuntimeSelector.scss'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +interface AgentRuntimeDto { + id: string; + displayName: string; + description: string; + available: boolean; + error: string | null; + supportsSteer: boolean; + supportsThinking: boolean; + autonomousTools: boolean; +} + +export interface RuntimeSelectorProps { + /** Currently selected runtime id */ + selectedRuntimeId: string | null; + /** Callback when user picks a different runtime */ + onRuntimeChange: (runtimeId: string) => void; + /** If true, the selector is locked */ + disabled?: boolean; +} + +// Hardcoded fallback — always available +const BITFUN_RUNTIME: AgentRuntimeDto = { + id: 'bitfun', + displayName: 'BitFun Native', + description: "BitFun's built-in agent runtime", + available: true, + error: null, + supportsSteer: false, + supportsThinking: true, + autonomousTools: false, +}; + +// --------------------------------------------------------------------------- +// Component +// --------------------------------------------------------------------------- + +export const RuntimeSelector: React.FC = ({ + selectedRuntimeId, + onRuntimeChange, + disabled = false, +}) => { + const [runtimes, setRuntimes] = useState([BITFUN_RUNTIME]); + const [loading, setLoading] = useState(true); + const [dropdownOpen, setDropdownOpen] = useState(false); + const containerRef = useRef(null); + + // Fetch runtimes from backend + useEffect(() => { + let cancelled = false; + (async () => { + try { + const data = await invoke('list_agent_runtimes'); + console.log('[RuntimeSelector] list_agent_runtimes returned:', JSON.stringify(data)); + if (cancelled) return; + if (data.length > 0) { + setRuntimes(data); + } else { + // Backend returned empty — use fallback + setRuntimes([BITFUN_RUNTIME]); + } + } catch (err) { + console.warn('[RuntimeSelector] invoke failed, using fallback:', err); + if (cancelled) return; + // keep the initial [BITFUN_RUNTIME] state + } finally { + if (!cancelled) setLoading(false); + } + })(); + return () => { cancelled = true; }; + }, []); + + // Auto-select default runtime if none selected + useEffect(() => { + if (selectedRuntimeId === null && !loading) { + const defaultRt = runtimes.find((r) => r.id === 'omp' && r.available) ?? runtimes.find((r) => r.available) ?? runtimes[0]; + if (defaultRt) { + onRuntimeChange(defaultRt.id); + } + } + }, [loading, runtimes, selectedRuntimeId, onRuntimeChange]); + + // Click outside + useEffect(() => { + if (!dropdownOpen) return; + const handler = (e: MouseEvent) => { + if (containerRef.current && !containerRef.current.contains(e.target as Node)) { + setDropdownOpen(false); + } + }; + document.addEventListener('mousedown', handler); + return () => document.removeEventListener('mousedown', handler); + }, [dropdownOpen]); + + const selectedRuntime = useMemo( + () => runtimes.find((r) => r.id === selectedRuntimeId) ?? null, + [runtimes, selectedRuntimeId], + ); + + const handleSelect = useCallback( + (runtimeId: string) => { + const rt = runtimes.find((r) => r.id === runtimeId); + if (rt) { + onRuntimeChange(runtimeId); + setDropdownOpen(false); + } + }, + [runtimes, onRuntimeChange], + ); + + const triggerLabel = loading ? '…' : selectedRuntime?.displayName ?? 'BitFun Native'; + + return ( +
+ + + {dropdownOpen && !disabled && ( +
+
+ Runtime +
+
+ {runtimes.map((rt) => { + const isSelected = rt.id === selectedRuntimeId; + return ( + + ); + })} +
+
+ )} +
+ ); +}; + +RuntimeSelector.displayName = 'RuntimeSelector'; +export default RuntimeSelector; diff --git a/src/web-ui/src/flow_chat/components/index.ts b/src/web-ui/src/flow_chat/components/index.ts index b2aeef784..c66a10179 100644 --- a/src/web-ui/src/flow_chat/components/index.ts +++ b/src/web-ui/src/flow_chat/components/index.ts @@ -8,3 +8,5 @@ export { RichTextInput, type MentionState } from './RichTextInput'; export { ModelSelector } from './ModelSelector'; export * from './toolbar-mode'; +export { RuntimeSelector } from './RuntimeSelector'; +export type { RuntimeSelectorProps } from './RuntimeSelector'; diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/SessionModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/SessionModule.ts index 953f6c2c8..901b73515 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/SessionModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/SessionModule.ts @@ -759,6 +759,7 @@ export async function ensureBackendSession( resolveSessionTitle(latestSession, (key, options) => i18nService.t(key, options)) || `Session ${sessionId.slice(0, 8)}`, agentType: latestSession.mode || 'agentic', + runtimeId: latestSession.runtimeId, workspacePath, workspaceId: latestSession.workspaceId, remoteConnectionId: effectiveConnectionId, @@ -800,6 +801,7 @@ export async function retryCreateBackendSession( resolveSessionTitle(session, (key, options) => i18nService.t(key, options)) || `Session ${sessionId.slice(0, 8)}`, agentType: session.mode || 'agentic', + runtimeId: session.runtimeId, workspacePath, workspaceId: session.workspaceId, remoteConnectionId: session.remoteConnectionId, diff --git a/src/web-ui/src/flow_chat/store/FlowChatStore.ts b/src/web-ui/src/flow_chat/store/FlowChatStore.ts index d453c13a8..2db13befa 100644 --- a/src/web-ui/src/flow_chat/store/FlowChatStore.ts +++ b/src/web-ui/src/flow_chat/store/FlowChatStore.ts @@ -528,6 +528,33 @@ export class FlowChatStore { }); } + /** + * Update the agent runtime selection for a session. + * This is a session-level preference that persists across switches. + */ + public updateSessionRuntimeId(sessionId: string, runtimeId: string): void { + this.setState(prev => { + const session = prev.sessions.get(sessionId); + if (!session) return prev; + + if (session.runtimeId === runtimeId) return prev; + + const updatedSession = { + ...session, + runtimeId, + lastActiveAt: Date.now() + }; + + const newSessions = new Map(prev.sessions); + newSessions.set(sessionId, updatedSession); + + return { + ...prev, + sessions: newSessions + }; + }); + } + /** * Record the mode used by the most recent user submission accepted by the runtime. * Unlike `lastUserDialogMode`, this does not rewind when history is rolled back. @@ -2829,6 +2856,7 @@ export class FlowChatStore { contextRestoreState, error: null, mode: restoredSessionInfo?.agentType || session.mode, + runtimeId: restoredSessionInfo?.runtimeId ?? session.runtimeId, lastUserDialogMode: restoredLastUserDialogMode, lastSubmittedMode: restoredSessionInfo?.lastSubmittedAgentType ?? session.lastSubmittedMode, diff --git a/src/web-ui/src/flow_chat/types/flow-chat.ts b/src/web-ui/src/flow_chat/types/flow-chat.ts index 5b030541e..f95b5a6c1 100644 --- a/src/web-ui/src/flow_chat/types/flow-chat.ts +++ b/src/web-ui/src/flow_chat/types/flow-chat.ts @@ -320,6 +320,11 @@ export interface Session { * This controls what the next dialog turn should use by default. */ mode?: string; + /** + * Agent runtime selection for this session (e.g. 'bitfun', 'omp', 'claude'). + * Locked once the first message is sent. Persists across session switches. + */ + runtimeId?: string; /** * Mode of the last surviving user dialog turn in the current session * history. Rollback and turn truncation should follow this value. diff --git a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts index d631cccc1..9cf174645 100644 --- a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts +++ b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts @@ -47,6 +47,7 @@ export interface CreateSessionRequest { remoteSshHost?: string; sessionKind?: 'standard' | 'subagent'; relationship?: SessionRelationship; + runtimeId?: string; deepReviewRunManifest?: ReviewTeamRunManifest; config?: SessionConfig; } @@ -96,6 +97,8 @@ export interface SessionInfo { state: string; turnCount: number; createdAt: number; + /** Agent runtime selection (e.g. 'bitfun', 'omp', 'claude'). */ + runtimeId?: string; } export interface RestoreSessionWithTurnsResponse { @@ -143,6 +146,11 @@ export interface UpdateSessionModelRequest { modelName: string; } +export interface UpdateSessionRuntimeRequest { + sessionId: string; + runtimeId: string; +} + export interface UpdateSessionTitleRequest { sessionId: string; title: string; @@ -164,9 +172,6 @@ export interface ModeInfo { * share the same key can reuse the same session-level prompt cache. */ promptCacheScopeKey: string; - configProfileId: string; - configProfileLabel?: string; - configProfileMemberModeIds: string[]; } @@ -542,6 +547,14 @@ export class AgentAPI { } } + async updateSessionRuntime(request: UpdateSessionRuntimeRequest): Promise { + try { + await api.invoke('update_session_runtime', { request }); + } catch (error) { + throw createTauriCommandError('update_session_runtime', error, request); + } + } + async updateSessionTitle(request: UpdateSessionTitleRequest): Promise { try { return await api.invoke('update_session_title', { request }); @@ -807,8 +820,6 @@ export class AgentAPI { isReadonly: false, toolCount: 0, promptCacheScopeKey: agentType, - configProfileId: agentType, - configProfileMemberModeIds: [agentType], agent_type: agentType, when_to_use: `Use ${agentType} for related tasks`, tools: 'all',