Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions crates/loopal-agent-hub/src/agent_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::{info, warn};

use loopal_ipc::connection::{Connection, Incoming};
use loopal_ipc::protocol::methods;
use loopal_protocol::AgentEvent;
use loopal_protocol::{AgentEvent, Envelope};

use crate::dispatch::dispatch_hub_request;
use crate::hub::Hub;
Expand Down Expand Up @@ -125,25 +125,37 @@ fn spawn_wait_agent(
}

/// Register agent Connection in Hub and spawn background IO loop.
///
/// Sets up a completion channel + bridge so background sub-agent results
/// are forwarded to this agent via IPC `agent/message` notifications.
pub fn start_agent_io(
hub: Arc<Mutex<Hub>>,
name: &str,
conn: Arc<Connection>,
rx: tokio::sync::mpsc::Receiver<Incoming>,
) {
// Registration + IO loop in one background task (used by hub_server for incoming clients)
let hub2 = hub.clone();
let n = name.to_string();
let n2 = name.to_string();
let conn2 = conn.clone();
let conn3 = conn.clone();
tokio::spawn(async move {
// Completion channel: delivers background sub-agent results to this agent.
let (completion_tx, completion_rx) = tokio::sync::mpsc::channel::<Envelope>(32);
{
let mut h = hub.lock().await;
if let Err(e) = h.registry.register_connection(&n, conn2) {
if let Err(e) = h.registry.register_connection_with_parent(
&n,
conn2,
None,
None,
Some(completion_tx),
) {
tracing::warn!(agent = %n, error = %e, "registration failed");
return;
}
}
crate::spawn_manager::spawn_completion_bridge(&n, conn3, completion_rx);
info!(agent = %n, "agent registered in Hub");
let output = agent_io_loop(hub2, conn, rx, n.clone()).await;
let pending = {
Expand Down
8 changes: 7 additions & 1 deletion crates/loopal-agent-hub/src/dispatch/dispatch_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,21 @@ pub async fn handle_control(hub: &Arc<Mutex<Hub>>, params: Value) -> Result<Valu

pub async fn handle_interrupt(hub: &Arc<Mutex<Hub>>, params: Value) -> Result<Value, String> {
let target = params["target"].as_str().ok_or("missing 'target' field")?;
tracing::info!(target, "handle_interrupt: looking up agent connection");
let conn = {
let h = hub.lock().await;
h.registry
.get_agent_connection(target)
.ok_or_else(|| format!("no agent: '{target}'"))?
};
let _ = conn
let result = conn
.send_notification(methods::AGENT_INTERRUPT.name, json!({}))
.await;
match &result {
Ok(()) => tracing::info!(target, "handle_interrupt: notification sent"),
Err(e) => tracing::warn!(target, error = %e, "handle_interrupt: send failed"),
}
let _ = result;
Ok(json!({"ok": true}))
}

Expand Down
6 changes: 5 additions & 1 deletion crates/loopal-agent-hub/src/spawn_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ pub async fn register_agent_connection(
}

/// Bridge: reads from Hub-internal channel, forwards to agent via IPC notification.
fn spawn_completion_bridge(name: &str, conn: Arc<Connection>, mut rx: mpsc::Receiver<Envelope>) {
pub fn spawn_completion_bridge(
name: &str,
conn: Arc<Connection>,
mut rx: mpsc::Receiver<Envelope>,
) {
let n = name.to_string();
tokio::spawn(async move {
while let Some(envelope) = rx.recv().await {
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-agent-server/src/session_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub(crate) async fn forward_loop(
}
Incoming::Notification { method, params } => {
if method == methods::AGENT_INTERRUPT.name {
tracing::info!("forward_loop: received agent/interrupt, signaling");
session.interrupt.signal();
session.interrupt_tx.send_modify(|v| *v = v.wrapping_add(1));
} else if method == methods::AGENT_MESSAGE.name {
Expand Down
8 changes: 8 additions & 0 deletions crates/loopal-runtime/src/agent_loop/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,15 @@ impl AgentLoopRunner {
///
/// **This is the ONLY way to change agent status.** Every status change
/// goes through this method, ensuring SSOT and deterministic event emission.
///
/// Skips emission when already in the target status (idempotent) to prevent
/// duplicate idle events — e.g. `emit_interrupted()` already transitions to
/// WaitingForInput, so the subsequent `transition(WaitingForInput)` at the
/// top of the loop must NOT emit a second AwaitingInput.
pub(super) async fn transition(&mut self, new_status: AgentStatus) -> Result<()> {
if self.status == new_status {
return Ok(());
}
self.status = new_status;
match new_status {
AgentStatus::Starting => Ok(()),
Expand Down
5 changes: 3 additions & 2 deletions crates/loopal-session/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ impl SessionController {

/// Interrupt the currently viewed agent.
pub fn interrupt(&self) {
tracing::debug!("session: interrupt signaled");
self.backend.interrupt_target(&self.active_target());
let target = self.active_target();
tracing::info!(target = %target, "session: interrupt signaled");
self.backend.interrupt_target(&target);
}

/// Interrupt a specific named agent (e.g., to terminate from agent panel).
Expand Down
3 changes: 3 additions & 0 deletions crates/loopal-session/src/controller_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ impl ControlBackend {
pub(crate) fn interrupt_target(&self, target: &str) {
match self {
Self::Local(ch) => {
tracing::info!(target, "interrupt_target: local signal");
ch.interrupt.signal();
ch.interrupt_tx.send_modify(|v| *v = v.wrapping_add(1));
}
Self::Hub(client) => {
tracing::info!(target, "interrupt_target: hub IPC");
let client = client.clone();
let target = target.to_string();
tokio::spawn(async move {
client.interrupt_target(&target).await;
tracing::info!(target = %target, "interrupt_target: hub IPC sent");
});
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/loopal-tui/src/input/navigation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ pub(super) fn handle_down(app: &mut App) -> InputAction {

pub(super) fn handle_esc(app: &mut App) -> InputAction {
// Priority 1: exit agent view
if app.session.lock().active_view != loopal_session::ROOT_AGENT {
let active_view = app.session.lock().active_view.clone();
if active_view != loopal_session::ROOT_AGENT {
tracing::info!(view = %active_view, "ESC: exit agent view (not root)");
return InputAction::ExitAgentView;
}
let is_idle = app.session.lock().active_conversation().agent_idle;
if !is_idle {
tracing::info!("ESC: agent busy, sending interrupt");
return InputAction::Interrupt;
}
tracing::debug!("ESC: agent idle, no interrupt");
let now = Instant::now();
let is_empty = app.input.is_empty();
if is_empty {
Expand Down
Loading