From 92176a45289953a89dac93c0ffea65ad9c75a80f Mon Sep 17 00:00:00 2001 From: wsp1911 Date: Wed, 27 May 2026 15:52:30 +0800 Subject: [PATCH] refactor(core): switch context compression to full-prefix fork-like summary requests - move model-based compression request assembly into execution engine - reuse session system prompt, tools, reminders, and user context for compression - replace tail-retention policy with auto/manual compression modes - summarize the full visible context in one request instead of chunking by turns - keep compressor focused on turn collection, fallback summary, and context rewrite - route manual compaction through the same full-prefix compression path --- .../src/agentic/coordination/coordinator.rs | 25 +- .../src/agentic/execution/execution_engine.rs | 492 ++++++++++++-- .../agentic/session/compression/compressor.rs | 642 +++++------------- 3 files changed, 649 insertions(+), 510 deletions(-) diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index 03d438270..c9fb1e7f0 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -2116,6 +2116,24 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet .await; let current_tokens = Self::estimate_context_tokens(&context_messages); + let manual_workspace = Self::build_workspace_binding(&session.config).await; + let manual_workspace_services = Self::build_workspace_services(&manual_workspace).await; + let manual_execution_context = ExecutionContext { + session_id: session_id.clone(), + dialog_turn_id: turn_id.clone(), + turn_index, + agent_type: session.agent_type.clone(), + workspace: manual_workspace, + context: HashMap::new(), + subagent_parent_info: None, + delegation_policy: DelegationPolicy::top_level(), + skip_tool_confirmation: true, + runtime_tool_restrictions: ToolRuntimeRestrictions::default(), + workspace_services: manual_workspace_services, + round_preempt: None, + round_injection: None, + recover_partial_on_cancel: false, + }; let session_max_tokens = session.config.max_context_tokens; // Unify context_window: min(model capability, session config) @@ -2139,13 +2157,12 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet match self .execution_engine .compact_session_context( - &session_id, - &turn_id, + session_id.clone(), + turn_id.clone(), + manual_execution_context, context_messages, current_tokens, - context_window, "manual", - crate::agentic::session::CompressionTailPolicy::CollapseAll, ) .await { diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 3935ca4bf..5c27fb724 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -20,7 +20,7 @@ use crate::agentic::image_analysis::{ ImageLimits, }; use crate::agentic::round_preempt::RoundInjectionKind; -use crate::agentic::session::{CompressionTailPolicy, ContextCompressor, SessionManager}; +use crate::agentic::session::{CompressionMode, ContextCompressor, SessionManager}; use crate::agentic::tools::implementations::{GetToolSpecTool, SkillTool, TaskTool}; use crate::agentic::tools::{ resolve_tool_manifest, tool_context_runtime, ResolvedToolManifest, SubagentParentInfo, @@ -75,6 +75,15 @@ pub struct ContextCompactionOutcome { pub applied: bool, } +struct CompressionRuntimeScaffold { + ai_client: Arc, + tool_definitions: Option>, + system_prompt_message: Message, + prepended_prompt_reminders: PrependedPromptReminders, + primary_supports_image_understanding: bool, + compression_contract_limit: usize, +} + #[derive(Debug, Clone)] struct ContextHealthSnapshot { token_usage_ratio: f32, @@ -1154,6 +1163,346 @@ impl ExecutionEngine { content } + async fn build_compression_request_messages( + &self, + runtime_messages: &[Message], + dialog_turn_id: &str, + workspace_path: Option<&Path>, + provider: &str, + attach_images: bool, + prepended_prompt_reminders: &PrependedPromptReminders, + contract: Option<&crate::agentic::core::CompressionContract>, + ) -> BitFunResult> { + let prepended_reminders = prepended_prompt_reminders.ordered_reminders(); + let mut compression_messages = Self::build_ai_messages_for_send( + runtime_messages, + provider, + workspace_path, + dialog_turn_id, + attach_images, + &prepended_reminders, + ) + .await?; + compression_messages.push(AIMessage::user( + self.context_compressor.build_compact_prompt(contract), + )); + Ok(compression_messages) + } + + async fn request_compression_summary_with_retry( + &self, + ai_client: Arc, + request_messages: Vec, + tool_definitions: Option>, + max_tries: usize, + ) -> BitFunResult { + let mut last_error = None; + let base_wait_time_ms = 500; + + for attempt in 0..max_tries { + let result = ai_client + .send_message(request_messages.clone(), tool_definitions.clone()) + .await; + + match result { + Ok(response) => { + if response.tool_calls.is_some() { + return Err(BitFunError::AIClient( + "Compression request returned tool calls instead of a summary" + .to_string(), + )); + } + if attempt > 0 { + debug!( + "Compression summary generation succeeded (attempt {}/{})", + attempt + 1, + max_tries + ); + } + return Ok(response.text); + } + Err(err) => { + warn!( + "Compression summary generation failed (attempt {}/{}): {}", + attempt + 1, + max_tries, + err + ); + last_error = Some(err); + + if attempt < max_tries - 1 { + let delay_ms = base_wait_time_ms * (1 << attempt.min(3)); + debug!( + "Waiting {}ms before compression summary retry {}...", + delay_ms, + attempt + 2 + ); + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + } + } + } + } + + Err(BitFunError::AIClient(format!( + "Compression summary generation failed after {} attempts: {}", + max_tries, + last_error + .map(|err| err.to_string()) + .unwrap_or_else(|| "Unknown error".to_string()) + ))) + } + + async fn generate_compression_model_summary( + &self, + ai_client: Arc, + runtime_messages: &[Message], + dialog_turn_id: &str, + workspace_path: Option<&Path>, + tool_definitions: &Option>, + prepended_prompt_reminders: &PrependedPromptReminders, + primary_supports_image_understanding: bool, + context_window: usize, + contract: Option<&crate::agentic::core::CompressionContract>, + ) -> BitFunResult> { + let request_messages = self + .build_compression_request_messages( + runtime_messages, + dialog_turn_id, + workspace_path, + &ai_client.config.format, + primary_supports_image_understanding, + prepended_prompt_reminders, + contract, + ) + .await?; + let request_tokens = + TokenCounter::estimate_request_tokens(&request_messages, tool_definitions.as_deref()); + let max_request_tokens = self + .context_compressor + .max_model_request_tokens(context_window); + if request_tokens > max_request_tokens { + debug!( + "Skipping model-based compression because full-prefix request exceeds budget: dialog_turn_id={}, request_tokens={}, max_request_tokens={}", + dialog_turn_id, + request_tokens, + max_request_tokens + ); + return Ok(None); + } + + let raw_summary = self + .request_compression_summary_with_retry( + ai_client, + request_messages, + tool_definitions.clone(), + 2, + ) + .await?; + let summary = + ContextCompressor::normalize_model_summary_output(&raw_summary).ok_or_else(|| { + BitFunError::AIClient( + "Model-based compression returned without a usable " + .to_string(), + ) + })?; + Ok(Some(summary)) + } + + async fn resolve_compression_runtime_scaffold( + &self, + session: &Session, + context: &ExecutionContext, + ) -> BitFunResult { + let agent_registry = get_agent_registry(); + if let Some(workspace) = context.workspace.as_ref() { + agent_registry + .load_custom_subagents(workspace.root_path()) + .await; + } + + let current_agent = agent_registry + .get_agent( + &context.agent_type, + context + .workspace + .as_ref() + .map(|workspace| workspace.root_path()), + ) + .ok_or_else(|| { + BitFunError::NotFound(format!("Agent not found: {}", context.agent_type)) + })?; + + let original_user_input = context + .context + .get("original_user_input") + .cloned() + .unwrap_or_default(); + let model_id = self + .resolve_model_id_for_turn( + session, + &context.agent_type, + context.workspace.as_ref(), + &original_user_input, + context.turn_index, + ) + .await?; + + let ai_client_factory = get_global_ai_client_factory().await.map_err(|e| { + BitFunError::AIClient(format!("Failed to get AI client factory: {}", e)) + })?; + let ai_client = ai_client_factory + .get_client_resolved(&model_id) + .await + .map_err(|e| { + BitFunError::AIClient(format!( + "Failed to get AI client (model_id={}): {}", + model_id, e + )) + })?; + + let (resolved_primary_model_id, primary_supports_image_understanding, write_tool_mode) = { + let config_service = get_global_config_service().await.ok(); + if let Some(service) = config_service { + let ai_config: crate::service::config::types::AIConfig = + service.get_config(Some("ai")).await.unwrap_or_default(); + + let resolved_id = Self::resolve_configured_model_id(&ai_config, &model_id); + let model_cfg = ai_config + .models + .iter() + .find(|m| m.id == resolved_id) + .or_else(|| ai_config.models.iter().find(|m| m.name == resolved_id)) + .or_else(|| { + ai_config + .models + .iter() + .find(|m| m.model_name == resolved_id) + }) + .or_else(|| { + ai_config.models.iter().find(|m| { + m.model_name == ai_client.config.model + && m.provider == ai_client.config.format + }) + }); + + let supports = model_cfg.is_some_and(|m| { + m.capabilities + .iter() + .any(|cap| matches!(cap, ModelCapability::ImageUnderstanding)) + || matches!(m.category, ModelCategory::Multimodal) + }); + + (resolved_id, supports, ai_config.write_tool_mode) + } else { + warn!( + "Config service unavailable, assuming compression model is text-only for image input gating" + ); + (model_id.clone(), false, WriteToolMode::default()) + } + }; + + let model_capability_profile = ModelCapabilityProfile::from_resolved_model( + &resolved_primary_model_id, + &ai_client.config.model, + ); + let is_review_subagent = agent_registry + .get_subagent_is_review(&context.agent_type) + .unwrap_or(false); + let context_profile_policy = ContextProfilePolicy::for_agent_context( + &context.agent_type, + is_review_subagent, + model_capability_profile, + ); + + let tool_policy = agent_registry + .get_agent_tool_policy( + &context.agent_type, + context + .workspace + .as_ref() + .map(|workspace| workspace.root_path()), + ) + .await; + let allowed_tools = tool_policy.allowed_tools.clone(); + let enable_tools = context + .context + .get("enable_tools") + .and_then(|value| value.parse::().ok()) + .unwrap_or(true); + let write_tool_mode = if context + .context + .get("acp_transport") + .is_some_and(|value| value == "true") + { + WriteToolMode::InlineContent + } else { + write_tool_mode + }; + + let mut tool_manifest_context_vars = context.context.clone(); + tool_manifest_context_vars.insert( + "write_tool_mode".to_string(), + write_tool_mode.as_str().to_string(), + ); + + let tool_description_context = tool_context_runtime::build_tool_description_context( + &context.agent_type, + context.workspace.as_ref(), + context.workspace_services.as_ref(), + primary_supports_image_understanding, + &tool_manifest_context_vars, + ); + let tool_manifest = if enable_tools { + Some( + resolve_tool_manifest( + &allowed_tools, + &tool_policy.exposure_overrides, + &tool_description_context, + ) + .await, + ) + } else { + None + }; + let tool_listing_sections = if let Some(manifest) = tool_manifest.as_ref() { + Self::build_tool_listing_sections(manifest, &tool_description_context).await + } else { + ToolListingSections::default() + }; + let tool_definitions = tool_manifest.map(|manifest| manifest.tool_definitions); + + let prompt_context = Self::build_prompt_context( + context, + &ai_client.config.model, + primary_supports_image_understanding, + tool_listing_sections, + ) + .await; + let prepended_prompt_reminders = self + .build_cached_prepended_prompt_reminders( + &context.session_id, + current_agent.as_ref(), + prompt_context.as_ref(), + ) + .await; + let system_prompt = self + .resolve_cached_system_prompt( + &context.session_id, + current_agent.as_ref(), + prompt_context.as_ref(), + ) + .await?; + + Ok(CompressionRuntimeScaffold { + ai_client, + tool_definitions, + system_prompt_message: Message::system(system_prompt), + prepended_prompt_reminders, + primary_supports_image_understanding, + compression_contract_limit: context_profile_policy.compression_contract_limit, + }) + } + /// Compress context, will emit compression events (Started, Completed, and Failed) #[allow(clippy::too_many_arguments)] pub async fn compress_messages( @@ -1161,13 +1510,16 @@ impl ExecutionEngine { session_id: &str, dialog_turn_id: &str, _subagent_parent_info: Option, - messages: Vec, + runtime_messages: Vec, current_tokens: usize, context_window: usize, + ai_client: Arc, tool_definitions: &Option>, system_prompt_message: Message, + prepended_prompt_reminders: &PrependedPromptReminders, + primary_supports_image_understanding: bool, compression_contract_limit: usize, - tail_policy: CompressionTailPolicy, + workspace_path: Option<&Path>, ) -> BitFunResult)>> { let mut session = self .session_manager @@ -1177,13 +1529,11 @@ impl ExecutionEngine { // Record start time let start_time = std::time::Instant::now(); - let old_messages_len = messages.len(); - // Preprocess turns - let (turn_index_to_keep, turns) = self + let old_messages_len = runtime_messages.len(); + let turns = self .context_compressor - .preprocess_turns(session_id, context_window, messages) - .await?; - if turn_index_to_keep == 0 { + .collect_turns_for_auto_compression(session_id, runtime_messages.clone())?; + if turns.is_empty() { return Ok(None); } @@ -1209,18 +1559,37 @@ impl ExecutionEngine { let compression_contract = self .session_manager .compression_contract_for_session(session_id, compression_contract_limit); - match self - .context_compressor - .compress_turns_with_contract( - session_id, + let model_summary = match self + .generate_compression_model_summary( + ai_client, + &runtime_messages, + dialog_turn_id, + workspace_path, + tool_definitions, + prepended_prompt_reminders, + primary_supports_image_understanding, context_window, - turn_index_to_keep, - turns, - tail_policy, - compression_contract, + compression_contract.as_ref(), ) .await { + Ok(summary) => summary, + Err(err) => { + warn!( + "Model-based compression failed, falling back to structured local compression: {}", + err + ); + None + } + }; + match self.context_compressor.compress_turns_with_contract( + session_id, + context_window, + turns, + CompressionMode::Auto, + compression_contract, + model_summary, + ) { Ok(compression_result) => { self.session_manager .replace_context_messages(session_id, compression_result.messages.clone()) @@ -1313,20 +1682,24 @@ impl ExecutionEngine { #[allow(clippy::too_many_arguments)] pub async fn compact_session_context( &self, - session_id: &str, - dialog_turn_id: &str, + session_id: String, + dialog_turn_id: String, + context: ExecutionContext, messages: Vec, current_tokens: usize, - context_window: usize, trigger: &str, - tail_policy: CompressionTailPolicy, ) -> BitFunResult { let mut session = self .session_manager - .get_session(session_id) + .get_session(&session_id) .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; let start_time = std::time::Instant::now(); let compression_id = format!("compression_{}", uuid::Uuid::new_v4()); + let scaffold = self + .resolve_compression_runtime_scaffold(&session, &context) + .await?; + let context_window = (scaffold.ai_client.config.context_window as usize) + .min(session.config.max_context_tokens); self.emit_event( AgenticEvent::ContextCompressionStarted { @@ -1344,7 +1717,7 @@ impl ExecutionEngine { let turns = self .context_compressor - .collect_all_turns_for_manual_compaction(session_id, messages)?; + .collect_all_turns_for_manual_compaction(&session_id, messages.clone())?; if turns.is_empty() { let duration_ms = elapsed_ms_u64(start_time); @@ -1385,40 +1758,53 @@ impl ExecutionEngine { }); } - let is_review_subagent = get_agent_registry() - .get_subagent_is_review(&session.agent_type) - .unwrap_or(false); - let model_id = session.config.model_id.as_deref().unwrap_or_default(); - let context_profile_policy = ContextProfilePolicy::for_agent_context_and_model( - &session.agent_type, - is_review_subagent, - model_id, - model_id, - ); - let compression_contract = self.session_manager.compression_contract_for_session( - session_id, - context_profile_policy.compression_contract_limit, - ); - match self - .context_compressor - .compress_turns_with_contract( - session_id, + let mut runtime_messages = vec![scaffold.system_prompt_message.clone()]; + runtime_messages.extend(messages); + let compression_contract = self + .session_manager + .compression_contract_for_session(&session_id, scaffold.compression_contract_limit); + let model_summary = match self + .generate_compression_model_summary( + scaffold.ai_client.clone(), + &runtime_messages, + &dialog_turn_id, + context + .workspace + .as_ref() + .map(|workspace| workspace.root_path()), + &scaffold.tool_definitions, + &scaffold.prepended_prompt_reminders, + scaffold.primary_supports_image_understanding, context_window, - turns.len(), - turns, - tail_policy, - compression_contract, + compression_contract.as_ref(), ) .await { + Ok(summary) => summary, + Err(err) => { + warn!( + "Model-based manual compaction failed, falling back to structured local compression: {}", + err + ); + None + } + }; + match self.context_compressor.compress_turns_with_contract( + &session_id, + context_window, + turns, + CompressionMode::Manual, + compression_contract, + model_summary, + ) { Ok(compression_result) => { let mut compressed_messages = compression_result.messages; self.session_manager - .replace_context_messages(session_id, compressed_messages.clone()) + .replace_context_messages(&session_id, compressed_messages.clone()) .await; self.session_manager .invalidate_prompt_cache( - session_id, + &session_id, crate::agentic::session::PromptCacheScope::All, "manual_context_compaction_applied", ) @@ -1428,7 +1814,7 @@ impl ExecutionEngine { let compression_count = session.compression_state.compression_count; let _ = self .session_manager - .update_compression_state(session_id, session.compression_state.clone()) + .update_compression_state(&session_id, session.compression_state.clone()) .await; let duration_ms = elapsed_ms_u64(start_time); @@ -1993,10 +2379,16 @@ impl ExecutionEngine { messages.clone(), current_tokens, context_window, + ai_client.clone(), &tool_definitions, system_prompt_message.clone(), + &prepended_prompt_reminders, + primary_supports_image_understanding, context_profile_policy.compression_contract_limit, - CompressionTailPolicy::PreserveLiveFrontier, + context + .workspace + .as_ref() + .map(|workspace| workspace.root_path()), ) .await { @@ -2015,7 +2407,7 @@ impl ExecutionEngine { consecutive_compression_failures = 0; } Ok(None) => { - debug!("All turns need to be kept, no compression performed"); + debug!("No eligible multi-turn context available for compression"); consecutive_compression_failures = 0; } Err(e) => { diff --git a/src/crates/core/src/agentic/session/compression/compressor.rs b/src/crates/core/src/agentic/session/compression/compressor.rs index 64e83f846..c2a2e97bd 100644 --- a/src/crates/core/src/agentic/session/compression/compressor.rs +++ b/src/crates/core/src/agentic/session/compression/compressor.rs @@ -7,22 +7,17 @@ use super::fallback::{ CompressionSummaryArtifact, }; use crate::agentic::core::{ - render_system_reminder, CompressedTodoSnapshot, CompressionContract, CompressionEntry, - CompressionPayload, Message, MessageHelper, MessageRole, MessageSemanticKind, + render_system_reminder, CompressedMessage, CompressedMessageRole, CompressedTodoSnapshot, + CompressionContract, CompressionEntry, CompressionPayload, Message, MessageContent, + MessageHelper, MessageRole, MessageSemanticKind, }; -use crate::infrastructure::ai::{get_global_ai_client_factory, AIClient}; -use crate::util::errors::{BitFunError, BitFunResult}; -use crate::util::types::Message as AIMessage; -use anyhow; -use log::{debug, trace, warn}; -use std::sync::Arc; +use crate::util::errors::BitFunResult; +use log::{debug, trace}; /// Context compressor configuration #[derive(Debug, Clone)] pub struct CompressionConfig { - pub keep_turns_ratio: f32, - pub keep_last_turn_ratio: f32, - pub single_request_max_tokens_ratio: f32, + pub model_request_max_tokens_ratio: f32, pub fallback_max_tokens_ratio: f32, pub fallback_user_chars: usize, pub fallback_assistant_chars: usize, @@ -33,9 +28,7 @@ pub struct CompressionConfig { impl Default for CompressionConfig { fn default() -> Self { Self { - keep_turns_ratio: 0.3, - keep_last_turn_ratio: 0.4, - single_request_max_tokens_ratio: 0.7, + model_request_max_tokens_ratio: 0.95, fallback_max_tokens_ratio: 0.25, fallback_user_chars: 1000, fallback_assistant_chars: 1000, @@ -48,12 +41,11 @@ impl Default for CompressionConfig { #[derive(Debug, Clone)] pub struct TurnWithTokens { messages: Vec, - tokens: usize, } impl TurnWithTokens { - fn new(messages: Vec, tokens: usize) -> Self { - Self { messages, tokens } + fn new(messages: Vec) -> Self { + Self { messages } } } @@ -64,9 +56,9 @@ pub struct CompressionResult { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum CompressionTailPolicy { - CollapseAll, - PreserveLiveFrontier, +pub enum CompressionMode { + Auto, + Manual, } /// Stateless context compression service. @@ -79,20 +71,6 @@ impl ContextCompressor { Self { config } } - fn get_turn_index_to_keep(&self, turns_tokens: &[usize], token_limit: usize) -> usize { - let mut sum = 0; - let mut result = turns_tokens.len(); - for (idx, turn_token) in turns_tokens.iter().enumerate().rev() { - sum += turn_token; - if sum <= token_limit { - result = idx; - } else { - break; - } - } - result - } - fn collect_conversation_turns( &self, session_id: &str, @@ -137,19 +115,19 @@ impl ContextCompressor { Ok(turns_messages .into_iter() - .zip(turns_tokens) - .map(|(msgs, tokens)| TurnWithTokens::new(msgs, tokens)) + .map(TurnWithTokens::new) .collect()) } - /// Returns `(turn_index_to_keep, turns)`. - /// If `turn_index_to_keep` is 0, no compression is needed. - pub async fn preprocess_turns( + /// Collect all non-system conversation turns for an automatic compression pass. + /// + /// Auto-compression should not collapse the only active dialog turn mid-flight. + /// Within-turn pressure is handled by tool-result budgeting and emergency truncation. + pub fn collect_turns_for_auto_compression( &self, session_id: &str, - context_window: usize, messages: Vec, - ) -> BitFunResult<(usize, Vec)> { + ) -> BitFunResult> { debug!( "Starting session context compression analysis: session_id={}", session_id @@ -157,40 +135,19 @@ impl ContextCompressor { let turns = self.collect_conversation_turns(session_id, messages)?; if turns.is_empty() { - return Ok((0, Vec::new())); + return Ok(Vec::new()); } let turns_count = turns.len(); - let turns_tokens: Vec = turns.iter().map(|turn| turn.tokens).collect(); - // Auto-compression should not collapse the only active dialog turn mid-flight. - // Within-turn pressure is handled by tool-result budgeting and emergency truncation. if turns_count == 1 { debug!( "Single-turn session skipped for auto compression: session_id={}", session_id ); - return Ok((0, turns)); - } - - let token_limit_keep_turns = - (context_window as f32 * self.config.keep_turns_ratio) as usize; - let mut turn_index_to_keep = - self.get_turn_index_to_keep(&turns_tokens, token_limit_keep_turns); - if turn_index_to_keep == turns_count { - let token_limit_last_turn = - (context_window as f32 * self.config.keep_last_turn_ratio) as usize; - if let Some(last_turn_tokens) = turns_tokens.last() { - if *last_turn_tokens <= token_limit_last_turn { - turn_index_to_keep = turns_count - 1; - } - } + return Ok(Vec::new()); } - debug!( - "Turn index to keep after compression analysis: session_id={}, keep_from_turn={}", - session_id, turn_index_to_keep - ); - Ok((turn_index_to_keep, turns)) + Ok(turns) } /// Collect all non-system conversation turns for a full manual compaction pass. @@ -202,33 +159,36 @@ impl ContextCompressor { self.collect_conversation_turns(session_id, messages) } - pub async fn compress_turns( + pub fn max_model_request_tokens(&self, context_window: usize) -> usize { + ((context_window as f32 * self.config.model_request_max_tokens_ratio) as usize).max(256) + } + + pub fn compress_turns( &self, session_id: &str, context_window: usize, - turn_index_to_keep: usize, turns: Vec, - tail_policy: CompressionTailPolicy, + mode: CompressionMode, + model_summary: Option, ) -> BitFunResult { self.compress_turns_with_contract( session_id, context_window, - turn_index_to_keep, turns, - tail_policy, + mode, None, + model_summary, ) - .await } - pub async fn compress_turns_with_contract( + pub fn compress_turns_with_contract( &self, session_id: &str, context_window: usize, - turn_index_to_keep: usize, - mut turns: Vec, - tail_policy: CompressionTailPolicy, + turns: Vec, + mode: CompressionMode, contract: Option, + model_summary: Option, ) -> BitFunResult { if turns.is_empty() { debug!("No turns need compression: session_id={}", session_id); @@ -240,7 +200,7 @@ impl ContextCompressor { let Some(last_turn_messages) = turns.last().map(|turn| &turn.messages) else { debug!( - "No turns available after split, skipping compression: session_id={}", + "No turns available after collection, skipping compression: session_id={}", session_id ); return Ok(CompressionResult { @@ -255,33 +215,21 @@ impl ContextCompressor { let last_todo = MessageHelper::get_last_todo_snapshot(last_turn_messages); trace!("Last user message: {:?}", last_user_message); trace!("Last todo: {:?}", last_todo); - let turns_to_keep = turns.split_off(turn_index_to_keep); - - let mut compressed_messages = Vec::new(); - let mut has_model_summary = false; - if !turns.is_empty() { - let mut summary_artifact = self - .execute_compression_with_fallback(turns, context_window, contract) - .await?; - if turns_to_keep.is_empty() { - self.append_todo_snapshot(&mut summary_artifact, last_todo.clone()); - } - trace!("Compression summary artifact generated"); - has_model_summary = summary_artifact.used_model_summary; - let (boundary_message, summary_message) = self.create_summary_turn(summary_artifact); - compressed_messages.push(boundary_message); - compressed_messages.push(summary_message); - } - - if !turns_to_keep.is_empty() { - for turn in turns_to_keep { - compressed_messages.extend(turn.messages); - } - } else if matches!(tail_policy, CompressionTailPolicy::PreserveLiveFrontier) { - if let Some(last_user_message) = last_user_message { - compressed_messages.push(last_user_message); - } + let mut summary_artifact = match model_summary { + Some(summary) => self.build_model_summary_artifact(summary, contract), + None => self.build_fallback_summary_artifact(turns, context_window, contract), + }; + if matches!(mode, CompressionMode::Auto) { + self.append_live_boundary_context( + &mut summary_artifact, + last_user_message.as_ref(), + last_todo.as_ref(), + ); } + trace!("Compression summary artifact generated"); + let has_model_summary = summary_artifact.used_model_summary; + let (boundary_message, summary_message) = self.create_summary_turn(summary_artifact); + let compressed_messages = vec![boundary_message, summary_message]; debug!( "Compression completed: session_id={}, compressed_messages={}", @@ -311,34 +259,68 @@ impl ContextCompressor { (boundary, summary) } - fn append_todo_snapshot( + fn append_live_boundary_context( &self, summary_artifact: &mut CompressionSummaryArtifact, - todo_snapshot: Option, + last_user_message: Option<&Message>, + todo_snapshot: Option<&CompressedTodoSnapshot>, ) { - let Some(todo_snapshot) = todo_snapshot else { - return; - }; + let mut additions = Vec::new(); + let mut payload_messages = Vec::new(); + + if let Some(last_user_text) = + last_user_message.and_then(Self::render_boundary_user_message_text) + { + additions.push(format!( + "Most recent user message before this summary:\n{}", + last_user_text + )); + payload_messages.push(CompressedMessage { + role: CompressedMessageRole::User, + text: Some(last_user_text), + tool_calls: Vec::new(), + }); + } - let todo_text = Self::render_todo_snapshot(&todo_snapshot); + let todo_text = todo_snapshot + .map(Self::render_todo_snapshot) + .unwrap_or_default(); if !todo_text.is_empty() { - summary_artifact.summary_text = format!( - "{}\n\nLatest task list snapshot at the compression boundary:\n{}", - summary_artifact.summary_text.trim_end(), + additions.push(format!( + "Most recent task list snapshot before this summary:\n{}", todo_text - ); + )); } + if additions.is_empty() { + return; + } + + summary_artifact.summary_text = format!( + "{}\n\n{}", + summary_artifact.summary_text.trim_end(), + additions.join("\n\n") + ); summary_artifact .payload .entries .push(CompressionEntry::Turn { turn_id: None, - messages: Vec::new(), - todo: Some(todo_snapshot), + messages: payload_messages, + todo: todo_snapshot.cloned(), }); } + fn render_boundary_user_message_text(message: &Message) -> Option { + let text = match &message.content { + MessageContent::Text(text) => text.trim(), + MessageContent::Multimodal { text, .. } => text.trim(), + _ => return None, + }; + + (!text.is_empty()).then(|| text.to_string()) + } + fn render_todo_snapshot(todo_snapshot: &CompressedTodoSnapshot) -> String { if todo_snapshot.todos.is_empty() { return todo_snapshot.summary.clone().unwrap_or_default(); @@ -360,88 +342,60 @@ impl ContextCompressor { } fn render_boundary_marker_text(used_model_summary: bool) -> String { - let mut msg = "Earlier conversation was compressed for context management. Use the summary in the next assistant message as historical context.".to_string(); + let mut msg = "The earlier conversation is summarized in the next assistant message. Use it as prior context.".to_string(); if !used_model_summary { - msg.push_str(" This compressed context is a partial reconstructed record. Message text, tool arguments, task lists, and tool results may be truncated or omitted."); + msg.push_str(" This is a partial reconstructed record. Message text, tool arguments, task lists, and tool results may be truncated or omitted."); } msg } - async fn execute_compression_with_fallback( + fn build_model_summary_artifact( &self, - turns_to_compress: Vec, - context_window: usize, + summary: String, contract: Option, - ) -> BitFunResult { - let summary_result = match get_global_ai_client_factory().await { - Ok(ai_client_factory) => match ai_client_factory - .get_client_by_func_agent("compression") - .await - { - Ok(ai_client) => { - self.execute_compression( - ai_client, - turns_to_compress.clone(), - context_window, - contract.as_ref(), - ) - .await - } - Err(err) => Err(BitFunError::AIClient(format!( - "Failed to get AI client: {}", - err - ))), - }, - Err(err) => Err(BitFunError::AIClient(format!( - "Failed to get AI client factory: {}", - err - ))), + ) -> CompressionSummaryArtifact { + trace!("Compression summary: {}", summary); + let mut payload = CompressionPayload::from_summary(summary.clone()); + let summary_text = if let Some(contract) = contract.filter(|contract| !contract.is_empty()) + { + payload.entries.insert( + 0, + CompressionEntry::Contract { + contract: contract.clone(), + }, + ); + format!( + "{}\n\nSummary of the earlier conversation:\n{}", + contract.render_for_model(), + summary + ) + } else { + format!("Summary of the earlier conversation:\n{}", summary) }; - match summary_result { - Ok(summary) => { - trace!("Compression summary: {}", summary); - let mut payload = CompressionPayload::from_summary(summary.clone()); - let summary_text = - if let Some(contract) = contract.filter(|contract| !contract.is_empty()) { - payload.entries.insert( - 0, - CompressionEntry::Contract { - contract: contract.clone(), - }, - ); - format!( - "{}\n\nPrevious conversation is summarized below:\n{}", - contract.render_for_model(), - summary - ) - } else { - format!("Previous conversation is summarized below:\n{}", summary) - }; - Ok(CompressionSummaryArtifact { - summary_text, - payload, - used_model_summary: true, - }) - } - Err(err) => { - warn!( - "Model-based compression failed, falling back to structured local compression: {}", - err - ); - let summary_artifact = build_structured_compression_summary_with_contract( - turns_to_compress - .into_iter() - .map(|turn| turn.messages) - .collect(), - &self.build_fallback_options(context_window), - contract, - ); - Ok(summary_artifact) - } + CompressionSummaryArtifact { + summary_text, + payload, + used_model_summary: true, } } + fn build_fallback_summary_artifact( + &self, + turns_to_compress: Vec, + context_window: usize, + contract: Option, + ) -> CompressionSummaryArtifact { + build_structured_compression_summary_with_contract( + turns_to_compress + .into_iter() + .map(|turn| turn.messages) + .collect(), + &self.build_fallback_options(context_window), + contract, + ) + } + fn build_fallback_options(&self, context_window: usize) -> CompressionFallbackOptions { CompressionFallbackOptions { max_tokens: ((context_window as f32 * self.config.fallback_max_tokens_ratio) as usize) @@ -453,7 +407,7 @@ impl ContextCompressor { } } - fn normalize_model_summary_output(raw: &str) -> Option { + pub(crate) fn normalize_model_summary_output(raw: &str) -> Option { let trimmed = raw.trim(); if trimmed.is_empty() { return None; @@ -473,223 +427,7 @@ impl ContextCompressor { Some(trimmed.to_string()) } - async fn execute_compression( - &self, - ai_client: Arc, - turns_to_compress: Vec, - context_window: usize, - contract: Option<&CompressionContract>, - ) -> BitFunResult { - debug!("Compressing {} turn(s)", turns_to_compress.len()); - - fn gen_system_message_for_summary(prev_summary: &str) -> Message { - if prev_summary.is_empty() { - Message::system( - "You are a helpful AI assistant tasked with summarizing conversations." - .to_string(), - ) - } else { - Message::system(format!( - r#"You are a conversation summarization assistant performing an INCREMENTAL summary update. - -## Previous Summary -The conversation has already been partially summarized. Here is the existing summary: - - -{} - - -## Your Task -You will be given the CONTINUATION of this conversation. Your job is to: -1. Read and understand the new conversation segment -2. MERGE the new information into the existing summary -3. Output a single, unified summary that combines both the previous summary and the new conversation - -## Important Guidelines -- Preserve all important information from the previous summary -- Add new details from the current conversation segment -- If new information contradicts or updates previous information, use the newer information -- Maintain the same summary structure/format as specified in the user instructions -- The final output should be ONE cohesive summary, not two separate summaries -- Do not mention "previous summary" or "new conversation" in your output - write as if summarizing the entire conversation from the start - -Be thorough and precise. Do not lose important technical details from either the previous summary or the new conversation."#, - prev_summary - )) - } - } - - let max_tokens_in_one_request = - (context_window as f32 * self.config.single_request_max_tokens_ratio) as usize; - let mut current_tokens = 0; - let mut cur_messages = Vec::new(); - let mut summary = String::new(); - let mut request_cnt = 0; - for (idx, turn) in turns_to_compress.into_iter().enumerate() { - if current_tokens + turn.tokens <= max_tokens_in_one_request { - cur_messages.extend(turn.messages); - current_tokens += turn.tokens; - } else { - if !cur_messages.is_empty() { - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - cur_messages, - contract, - ) - .await?; - cur_messages = Vec::new(); - current_tokens = 0; - request_cnt += 1; - trace!( - "Compression request {} completed: turn_idx={}", - request_cnt, - idx - ); - } - - if turn.tokens <= max_tokens_in_one_request { - cur_messages.extend(turn.messages); - current_tokens = turn.tokens; - } else if let Some((messages_part1, messages_part2)) = - MessageHelper::split_messages_in_middle(turn.messages) - { - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - messages_part1, - contract, - ) - .await?; - request_cnt += 1; - debug!( - "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", - request_cnt, idx, summary - ); - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - messages_part2, - contract, - ) - .await?; - request_cnt += 1; - debug!( - "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", - request_cnt, idx, summary - ); - } else { - return Err(BitFunError::Service(format!( - "Compression Failed, turn {} cannot be split in middle", - idx - ))); - } - } - } - - if !cur_messages.is_empty() { - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - cur_messages, - contract, - ) - .await?; - request_cnt += 1; - trace!("Compression request {} completed", request_cnt); - } - Ok(summary) - } - - async fn generate_summary( - &self, - ai_client: Arc, - system_message_for_summary: Message, - messages: Vec, - contract: Option<&CompressionContract>, - ) -> BitFunResult { - let raw_summary = self - .generate_summary_with_retry( - ai_client, - system_message_for_summary, - messages, - contract, - 2, - ) - .await?; - Self::normalize_model_summary_output(&raw_summary).ok_or_else(|| { - BitFunError::AIClient( - "Model-based compression returned without a usable " - .to_string(), - ) - }) - } - - async fn generate_summary_with_retry( - &self, - ai_client: Arc, - system_message_for_summary: Message, - messages: Vec, - contract: Option<&CompressionContract>, - max_tries: usize, - ) -> BitFunResult { - let mut summary_messages = vec![AIMessage::from(system_message_for_summary)]; - summary_messages.extend(messages.iter().map(|m| { - let mut ai_msg = AIMessage::from(m); - ai_msg.reasoning_content = None; - ai_msg - })); - summary_messages.push(AIMessage::user(self.get_compact_prompt(contract))); - - let mut last_error = None; - let base_wait_time_ms = 500; - - for attempt in 0..max_tries { - let result = ai_client.send_message(summary_messages.clone(), None).await; - - match result { - Ok(response) => { - if attempt > 0 { - debug!( - "Summary generation succeeded (attempt {}/{})", - attempt + 1, - max_tries - ); - } - return Ok(response.text); - } - Err(e) => { - warn!( - "Summary generation failed (attempt {}/{}): {}", - attempt + 1, - max_tries, - e - ); - last_error = Some(e); - - if attempt < max_tries - 1 { - let delay_ms = base_wait_time_ms * (1 << attempt.min(3)); - debug!("Waiting {}ms before retry {}...", delay_ms, attempt + 2); - tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; - } - } - } - } - - let error_msg = format!( - "Summary generation failed after {} attempts: {}", - max_tries, - last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error")) - ); - warn!("{}", error_msg); - Err(BitFunError::AIClient(error_msg)) - } - - fn get_compact_prompt(&self, contract: Option<&CompressionContract>) -> String { + pub(crate) fn build_compact_prompt(&self, contract: Option<&CompressionContract>) -> String { let contract_instruction = contract .filter(|contract| !contract.is_empty()) .map(|contract| { @@ -701,10 +439,17 @@ Be thorough and precise. Do not lose important technical details from either the .unwrap_or_default(); format!( - r#"Your task is to create a detailed summary of the conversation so far, paying close attention to the user's explicit requests and your previous actions. + r#"Your current task is to create a detailed summary of the conversation so far, paying close attention to the user's explicit requests and your previous actions. This summary should be thorough in capturing technical details, code patterns, and architectural decisions that would be essential for continuing development work without losing context. {contract_instruction} +CRITICAL: Respond with TEXT ONLY. Do NOT call any tools. + +- Do NOT use Read, Bash, Grep, Glob, Edit, Write, or ANY other tool. +- You already have all the context you need in the conversation above. +- Tool calls will be REJECTED and will waste your only turn — you will fail the task. +- Your entire response must be plain text: an block followed by a block. + Before providing your final summary, wrap your analysis in tags to organize your thoughts and ensure you've covered all necessary points. Then output the final retained summary in tags. Important: only the content inside will be kept as compressed history. The section is transient and will be discarded, so do not put any required final information only in . In your analysis process: @@ -718,8 +463,8 @@ In your analysis process: - full code snippets - function signatures - file edits - - Errors that you ran into and how you fixed them - - Pay special attention to specific user feedback that you received, especially if the user told you to do something differently. + - Errors that you ran into and how you fixed them + - Pay special attention to specific user feedback that you received, especially if the user told you to do something differently. 2. Double-check for technical accuracy and completeness, addressing each required element thoroughly. Your summary should include the following sections: @@ -732,8 +477,7 @@ Your summary should include the following sections: 6. All user messages: List ALL user messages that are not tool results. These are critical for understanding the users' feedback and changing intent. 7. Pending Tasks: Outline any pending tasks that you have explicitly been asked to work on. 8. Current Work: Describe in detail precisely what was being worked on immediately before this summary request, paying special attention to the most recent messages from both user and assistant. Include file names and code snippets where applicable. -9. Optional Next Step: List the next step that you will take that is related to the most recent work you were doing. IMPORTANT: ensure that this step is DIRECTLY in line with the user's most recent explicit requests, and the task you were working on immediately before this summary request. If your last task was concluded, then only list next steps if they are explicitly in line with the users request. Do not start on tangential requests or really old requests that were already completed without confirming with the user first. -If there is a next step, include direct quotes from the most recent conversation showing exactly what task you were working on and where you left off. This should be verbatim to ensure there's no drift in task interpretation. +9. Optional Next Step: List the next step that you will take that is related to the most recent work you were doing. IMPORTANT: ensure that this step is DIRECTLY in line with the user's most recent explicit requests, and the task you were working on immediately before this summary request. If your last task was concluded, then only list next steps if they are explicitly in line with the users request. Do not start on tangential requests or really old requests that were already completed without confirming with the user first. If there is a next step, include direct quotes from the most recent conversation showing exactly what task you were working on and where you left off. This should be verbatim to ensure there's no drift in task interpretation. Here's an example of how your output should be structured: @@ -769,7 +513,7 @@ Here's an example of how your output should be structured: 5. Problem Solving: [Description of solved problems and ongoing troubleshooting] -6. All user messages: +6. All user messages: - [Detailed non tool use user message] - [...] @@ -787,7 +531,8 @@ Here's an example of how your output should be structured: -Please provide your summary based on the conversation so far, following this structure and ensuring precision and thoroughness in your response. +Please provide your summary based on the conversation so far, following this structure and ensuring precision and thoroughness in your response. +REMINDER: Do NOT call any tools. Respond with plain text only — an block followed by a block. Tool calls will be rejected and you will fail the task. "# ) } @@ -804,19 +549,14 @@ fn extract_tag_content<'a>(text: &'a str, tag: &str) -> Option<&'a str> { #[cfg(test)] mod tests { - use super::{CompressionTailPolicy, ContextCompressor, TurnWithTokens}; + use super::{CompressionMode, ContextCompressor, TurnWithTokens}; use crate::agentic::core::{ render_system_reminder, CompressionContract, CompressionContractItem, CompressionEntry, CompressionPayload, Message, MessageSemanticKind, }; fn make_turn(messages: Vec) -> TurnWithTokens { - let mut messages_with_tokens = messages; - let tokens = messages_with_tokens - .iter_mut() - .map(|message| message.get_tokens()) - .sum(); - TurnWithTokens::new(messages_with_tokens, tokens) + TurnWithTokens::new(messages) } fn todo_turn() -> TurnWithTokens { @@ -841,18 +581,17 @@ mod tests { ]) } - #[tokio::test] - async fn collapse_all_creates_closed_compression_turn() { + #[test] + fn manual_compression_creates_closed_compression_turn() { let compressor = ContextCompressor::new(Default::default()); let result = compressor .compress_turns( "session", 8000, - 1, vec![todo_turn()], - CompressionTailPolicy::CollapseAll, + CompressionMode::Manual, + None, ) - .await .expect("compression succeeds"); assert_eq!(result.messages.len(), 2); @@ -875,30 +614,31 @@ mod tests { crate::agentic::core::MessageContent::Text(text) => text, _ => panic!("expected assistant text summary"), }; - assert!(summary_text.contains("Latest task list snapshot at the compression boundary")); - assert!(summary_text.contains("Update compressor")); + assert!(summary_text.contains("Continue the refactor")); } - #[tokio::test] - async fn preserve_live_frontier_keeps_last_user_after_summary_turn() { + #[test] + fn auto_compression_appends_latest_user_and_todo_into_summary_turn() { let compressor = ContextCompressor::new(Default::default()); let result = compressor .compress_turns( "session", 8000, - 1, vec![todo_turn()], - CompressionTailPolicy::PreserveLiveFrontier, + CompressionMode::Auto, + Some("Model summary".to_string()), ) - .await .expect("compression succeeds"); - assert_eq!(result.messages.len(), 3); - assert_eq!( - result.messages[2].role, - crate::agentic::core::MessageRole::User - ); - assert!(result.messages[2].is_actual_user_message()); + assert_eq!(result.messages.len(), 2); + let summary_text = match &result.messages[1].content { + crate::agentic::core::MessageContent::Text(text) => text, + _ => panic!("expected assistant text summary"), + }; + assert!(summary_text.contains("Model summary")); + assert!(summary_text.contains("Most recent user message before this summary")); + assert!(summary_text.contains("Continue the refactor")); + assert!(summary_text.contains("Most recent task list snapshot before this summary")); } #[test] @@ -929,13 +669,6 @@ mod tests { )); } - #[test] - fn model_summary_boundary_marker_omits_partial_record_notice() { - let marker = ContextCompressor::render_boundary_marker_text(true); - assert!(!marker.contains("partial reconstructed record")); - assert!(marker.contains("historical context")); - } - #[test] fn model_summary_prompt_includes_compaction_contract() { let compressor = ContextCompressor::new(Default::default()); @@ -951,7 +684,7 @@ mod tests { subagent_statuses: Vec::new(), }; - let prompt = compressor.get_compact_prompt(Some(&contract)); + let prompt = compressor.build_compact_prompt(Some(&contract)); assert!(prompt.contains("authoritative factual context")); assert!(prompt.contains("src/lib.rs")); @@ -984,8 +717,8 @@ mod tests { assert_eq!(normalized, None); } - #[tokio::test] - async fn preprocess_turns_skips_single_active_turn() { + #[test] + fn auto_turn_collection_skips_single_active_turn() { let compressor = ContextCompressor::new(Default::default()); let messages = vec![ Message::system("system".to_string()), @@ -993,17 +726,15 @@ mod tests { Message::assistant("First reply".to_string()), ]; - let (turn_index, turns) = compressor - .preprocess_turns("session", 8_000, messages) - .await - .expect("preprocessing succeeds"); + let turns = compressor + .collect_turns_for_auto_compression("session", messages) + .expect("collection succeeds"); - assert_eq!(turn_index, 0); - assert_eq!(turns.len(), 1); + assert!(turns.is_empty()); } - #[tokio::test] - async fn manual_compaction_turn_collection_includes_all_non_system_turns() { + #[test] + fn manual_compaction_turn_collection_includes_all_non_system_turns() { let compressor = ContextCompressor::new(Default::default()); let messages = vec![ Message::system("system".to_string()), @@ -1016,10 +747,9 @@ mod tests { let manual_turns = compressor .collect_all_turns_for_manual_compaction("session", messages.clone()) .expect("manual collection succeeds"); - let (_, passive_turns) = compressor - .preprocess_turns("session", 8_000, messages) - .await - .expect("passive preprocessing succeeds"); + let passive_turns = compressor + .collect_turns_for_auto_compression("session", messages) + .expect("passive collection succeeds"); assert_eq!(manual_turns.len(), 2); assert_eq!(manual_turns.len(), passive_turns.len());