Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/apps/desktop/src/api/acp_client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,24 @@ pub async fn start_acp_dialog_turn(
bitfun_core::util::errors::BitFunError::service(e.to_string())
})?;
}
AcpClientStreamEvent::ContextUsageUpdated(usage) => {
app_handle
.emit(
"agentic://acp-context-usage-updated",
serde_json::json!({
"sessionId": request.session_id,
"turnId": request.turn_id,
"clientId": request.client_id,
"used": usage.used,
"size": usage.size,
"cost": usage.cost,
"subagentParentInfo": null,
}),
)
.map_err(|e| {
bitfun_core::util::errors::BitFunError::service(e.to_string())
})?;
}
AcpClientStreamEvent::Completed => {
app_handle
.emit(
Expand Down
74 changes: 59 additions & 15 deletions src/crates/acp/src/client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ use super::requirements::{
install_remote_npm_cli_package, predownload_npm_adapter, probe_executable, probe_npm_adapter,
probe_remote_executable, probe_remote_npx_adapter, resolve_configured_command,
};
use super::session_options::{model_config_id, session_options_from_state, AcpSessionOptions};
use super::session_options::{
model_config_id, session_options_from_state, AcpSessionContextUsage, AcpSessionOptions,
};
use super::session_persistence::AcpSessionPersistence;
pub use super::session_persistence::CreateAcpFlowSessionRecordResponse;
use super::stream::{
Expand Down Expand Up @@ -127,6 +129,7 @@ struct AcpRemoteSession {
active: Option<ActiveSession<'static, Agent>>,
models: Option<SessionModelState>,
config_options: Vec<SessionConfigOption>,
context_usage: Option<AcpSessionContextUsage>,
discard_pending_updates_before_next_prompt: bool,
}

Expand Down Expand Up @@ -154,6 +157,7 @@ impl AcpRemoteSession {
active: None,
models: None,
config_options: Vec::new(),
context_usage: None,
discard_pending_updates_before_next_prompt: false,
}
}
Expand Down Expand Up @@ -859,6 +863,7 @@ impl AcpClientService {
Ok(session_options_from_state(
session.models.as_ref(),
&session.config_options,
session.context_usage.as_ref(),
))
}

Expand Down Expand Up @@ -920,6 +925,7 @@ impl AcpClientService {
return Ok(session_options_from_state(
session.models.as_ref(),
&session.config_options,
session.context_usage.as_ref(),
));
}
Err(error) => {
Expand Down Expand Up @@ -947,6 +953,7 @@ impl AcpClientService {
return Ok(session_options_from_state(
session.models.as_ref(),
&session.config_options,
session.context_usage.as_ref(),
));
}

Expand Down Expand Up @@ -1045,23 +1052,34 @@ impl AcpClientService {
.await?;

discard_pending_session_updates_if_needed(&mut session).await;
let active = session
.active
.as_mut()
.ok_or_else(|| BitFunError::service("ACP session was not initialized"))?;
active.send_prompt(prompt).map_err(protocol_error)?;
{
let active = session
.active
.as_mut()
.ok_or_else(|| BitFunError::service("ACP session was not initialized"))?;
active.send_prompt(prompt).map_err(protocol_error)?;
}
let mut round_tracker = AcpStreamRoundTracker::new();
let mut tool_call_tracker = AcpToolCallTracker::new();

loop {
match active.read_update().await.map_err(protocol_error)? {
let message = {
let active = session
.active
.as_mut()
.ok_or_else(|| BitFunError::service("ACP session was not initialized"))?;
active.read_update().await.map_err(protocol_error)?
};

match message {
SessionMessage::SessionMessage(dispatch) => {
for event in acp_dispatch_to_stream_events_with_tracker(
let events = acp_dispatch_to_stream_events_with_tracker(
dispatch,
&mut tool_call_tracker,
)
.await?
{
.await?;
update_session_context_usage(&mut session, &events);
for event in events {
for event in round_tracker.apply(event) {
on_event(event)?;
}
Expand Down Expand Up @@ -1991,14 +2009,29 @@ async fn discard_pending_session_updates_if_needed(session: &mut AcpRemoteSessio
}

session.discard_pending_updates_before_next_prompt = false;
let Some(active) = session.active.as_mut() else {
return;
};

let started_at = Instant::now();
let mut discarded_count = 0usize;
while started_at.elapsed() < LOAD_REPLAY_DRAIN_MAX_DURATION {
match tokio::time::timeout(LOAD_REPLAY_DRAIN_QUIET_WINDOW, active.read_update()).await {
let update = {
let Some(active) = session.active.as_mut() else {
return;
};
tokio::time::timeout(LOAD_REPLAY_DRAIN_QUIET_WINDOW, active.read_update()).await
};

match update {
Ok(Ok(SessionMessage::SessionMessage(dispatch))) => {
let mut tracker = AcpToolCallTracker::new();
if let Ok(events) =
acp_dispatch_to_stream_events_with_tracker(dispatch, &mut tracker).await
{
update_session_context_usage(session, &events);
}
discarded_count += 1;
}
Ok(Ok(SessionMessage::StopReason(_))) => {
discarded_count += 1;
}
Ok(Ok(_)) => {
discarded_count += 1;
}
Expand All @@ -2021,6 +2054,17 @@ async fn discard_pending_session_updates_if_needed(session: &mut AcpRemoteSessio
}
}

fn update_session_context_usage(session: &mut AcpRemoteSession, events: &[AcpClientStreamEvent]) {
let Some(usage) = events.iter().rev().find_map(|event| match event {
AcpClientStreamEvent::ContextUsageUpdated(usage) => Some(usage.clone()),
_ => None,
}) else {
return;
};

session.context_usage = Some(usage);
}

fn protocol_error(error: impl std::fmt::Display) -> BitFunError {
BitFunError::service(format!("ACP protocol error: {}", error))
}
Expand Down
2 changes: 1 addition & 1 deletion src/crates/acp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ pub use manager::{
AcpClientPermissionResponse, AcpClientService, CreateAcpFlowSessionRecordResponse,
SetAcpSessionModelRequest, SubmitAcpPermissionResponseRequest,
};
pub use session_options::{AcpSessionModelOption, AcpSessionOptions};
pub use session_options::{AcpSessionContextUsage, AcpSessionModelOption, AcpSessionOptions};
pub use stream::AcpClientStreamEvent;
77 changes: 64 additions & 13 deletions src/crates/acp/src/client/session_options.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
use agent_client_protocol::schema::{
ModelInfo, SessionConfigKind, SessionConfigOption, SessionConfigOptionCategory,
Cost, ModelInfo, SessionConfigKind, SessionConfigOption, SessionConfigOptionCategory,
SessionConfigSelectOptions, SessionModelState,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionContextUsage {
pub used: u64,
pub size: u64,
#[serde(default)]
pub cost: Option<Cost>,
}

impl From<agent_client_protocol::schema::UsageUpdate> for AcpSessionContextUsage {
fn from(update: agent_client_protocol::schema::UsageUpdate) -> Self {
Self {
used: update.used,
size: update.size,
cost: update.cost,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionOptions {
Expand All @@ -13,6 +32,8 @@ pub struct AcpSessionOptions {
pub available_models: Vec<AcpSessionModelOption>,
#[serde(default)]
pub model_config_id: Option<String>,
#[serde(default)]
pub context_usage: Option<AcpSessionContextUsage>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -27,7 +48,9 @@ pub struct AcpSessionModelOption {
pub(super) fn session_options_from_state(
models: Option<&SessionModelState>,
config_options: &[SessionConfigOption],
context_usage: Option<&AcpSessionContextUsage>,
) -> AcpSessionOptions {
let context_usage = context_usage.cloned();
if let Some(models) = models.filter(|models| !models.available_models.is_empty()) {
return AcpSessionOptions {
current_model_id: Some(models.current_model_id.to_string()),
Expand All @@ -37,19 +60,24 @@ pub(super) fn session_options_from_state(
.map(model_option_from_model_info)
.collect(),
model_config_id: None,
context_usage,
};
}

model_config_option(config_options)
.map(|option| {
let (current_model_id, available_models) = select_model_values(option);
AcpSessionOptions {
current_model_id,
available_models,
model_config_id: Some(option.id.to_string()),
}
})
.unwrap_or_default()
if let Some(option) = model_config_option(config_options) {
let (current_model_id, available_models) = select_model_values(option);
return AcpSessionOptions {
current_model_id,
available_models,
model_config_id: Some(option.id.to_string()),
context_usage,
};
}

AcpSessionOptions {
context_usage,
..Default::default()
}
}

pub(super) fn model_config_id(config_options: &[SessionConfigOption]) -> Option<String> {
Expand Down Expand Up @@ -119,7 +147,7 @@ mod tests {
fn converts_native_model_state() {
let state = SessionModelState::new("gpt-5.4", vec![ModelInfo::new("gpt-5.4", "GPT 5.4")]);

let options = session_options_from_state(Some(&state), &[]);
let options = session_options_from_state(Some(&state), &[], None);

assert_eq!(options.current_model_id.as_deref(), Some("gpt-5.4"));
assert_eq!(options.available_models.len(), 1);
Expand All @@ -140,11 +168,34 @@ mod tests {
)
.category(SessionConfigOptionCategory::Model);

let options = session_options_from_state(None, &[config]);
let options = session_options_from_state(None, &[config], None);

assert_eq!(options.current_model_id.as_deref(), Some("fast"));
assert_eq!(options.model_config_id.as_deref(), Some("model"));
assert_eq!(options.available_models.len(), 2);
assert_eq!(options.available_models[1].id, "smart");
}

#[test]
fn includes_context_usage() {
let state = SessionModelState::new("gpt-5.4", vec![ModelInfo::new("gpt-5.4", "GPT 5.4")]);
let usage = AcpSessionContextUsage {
used: 42_000,
size: 128_000,
cost: Some(agent_client_protocol::schema::Cost::new(0.12, "USD")),
};

let options = session_options_from_state(Some(&state), &[], Some(&usage));

let context_usage = options.context_usage.expect("context usage");
assert_eq!(context_usage.used, 42_000);
assert_eq!(context_usage.size, 128_000);
assert_eq!(
context_usage
.cost
.as_ref()
.map(|cost| cost.currency.as_str()),
Some("USD")
);
}
}
38 changes: 38 additions & 0 deletions src/crates/acp/src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use agent_client_protocol::util::MatchDispatch;
use bitfun_core::util::errors::{BitFunError, BitFunResult};
use bitfun_events::ToolEventData;

use super::session_options::AcpSessionContextUsage;
use super::tool_card_bridge::{acp_tool_name, normalize_tool_params};

#[derive(Debug, Clone)]
Expand All @@ -20,6 +21,7 @@ pub enum AcpClientStreamEvent {
AgentText(String),
AgentThought(String),
ToolEvent(ToolEventData),
ContextUsageUpdated(AcpSessionContextUsage),
Completed,
Cancelled,
}
Expand Down Expand Up @@ -77,6 +79,7 @@ impl AcpStreamRoundTracker {
events
}
AcpClientStreamEvent::ModelRoundStarted { .. }
| AcpClientStreamEvent::ContextUsageUpdated(_)
| AcpClientStreamEvent::Completed
| AcpClientStreamEvent::Cancelled => vec![event],
}
Expand Down Expand Up @@ -121,6 +124,11 @@ pub(super) async fn acp_dispatch_to_stream_events_with_tracker(
SessionUpdate::ToolCallUpdate(tool_call_update) => {
events.extend(acp_tool_call_update_events(tool_call_update, tracker));
}
SessionUpdate::UsageUpdate(usage_update) => {
events.push(AcpClientStreamEvent::ContextUsageUpdated(
AcpSessionContextUsage::from(usage_update),
));
}
_ => {}
}
Ok(())
Expand Down Expand Up @@ -423,12 +431,42 @@ mod tests {
AcpClientStreamEvent::AgentText(_) => "text",
AcpClientStreamEvent::AgentThought(_) => "thought",
AcpClientStreamEvent::ToolEvent(_) => "tool",
AcpClientStreamEvent::ContextUsageUpdated(_) => "usage",
AcpClientStreamEvent::Completed => "completed",
AcpClientStreamEvent::Cancelled => "cancelled",
})
.collect()
}

#[test]
fn exposes_context_usage_updates() {
use agent_client_protocol::JsonRpcMessage;

let mut tracker = AcpToolCallTracker::new();
let notification = SessionNotification::new(
"session-1",
SessionUpdate::UsageUpdate(agent_client_protocol::schema::UsageUpdate::new(
1_000, 4_000,
)),
)
.to_untyped_message()
.expect("notification");
let dispatch = agent_client_protocol::Dispatch::Notification(notification);

let events = tokio::runtime::Runtime::new()
.expect("runtime")
.block_on(acp_dispatch_to_stream_events_with_tracker(
dispatch,
&mut tracker,
))
.expect("dispatch");

assert!(matches!(
events.as_slice(),
[AcpClientStreamEvent::ContextUsageUpdated(usage)] if usage.used == 1_000 && usage.size == 4_000
));
}

#[test]
fn starts_new_round_for_text_after_tool() {
let mut tracker = AcpStreamRoundTracker::new();
Expand Down
Loading
Loading