Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rust_binary(
"//crates/loopal-meta-hub",
"//crates/loopal-tui",
"//crates/loopal-acp",
"//crates/tools/process/background:loopal-tool-background",
"//crates/loopal-agent-server",
"//crates/loopal-agent-client",
"//crates/loopal-ipc",
Expand Down
15 changes: 8 additions & 7 deletions crates/loopal-acp/src/adapter/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ impl AcpAdapter {
let requested_sid = params["sessionId"].as_str().unwrap_or("");
let current_sid = self.session_id.lock().await.clone();

if let Some(ref sid) = current_sid {
if !requested_sid.is_empty() && requested_sid != sid.as_str() {
self.acp_out
.respond_error(id, jsonrpc::INVALID_REQUEST, "session mismatch")
.await;
return;
}
if let Some(ref sid) = current_sid
&& !requested_sid.is_empty()
&& requested_sid != sid.as_str()
{
self.acp_out
.respond_error(id, jsonrpc::INVALID_REQUEST, "session mismatch")
.await;
return;
}

self.client.shutdown_agent().await;
Expand Down
22 changes: 10 additions & 12 deletions crates/loopal-agent-client/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@ pub fn start_bridge(
let conn_ctrl = connection.clone();
tokio::spawn(async move {
while let Some(cmd) = control_rx.recv().await {
if let Ok(params) = serde_json::to_value(&cmd) {
if let Err(e) = conn_ctrl
if let Ok(params) = serde_json::to_value(&cmd)
&& let Err(e) = conn_ctrl
.send_request(methods::AGENT_CONTROL.name, params)
.await
{
warn!("bridge: control send failed: {e}");
break;
}
{
warn!("bridge: control send failed: {e}");
break;
}
}
});
Expand All @@ -72,14 +71,13 @@ pub fn start_bridge(
tokio::spawn(async move {
while let Some(envelope) = mailbox_rx.recv().await {
debug!(target_agent = %envelope.target, "bridge: forwarding message");
if let Ok(params) = serde_json::to_value(&envelope) {
if let Err(e) = conn_msg
if let Ok(params) = serde_json::to_value(&envelope)
&& let Err(e) = conn_msg
.send_request(methods::AGENT_MESSAGE.name, params)
.await
{
warn!("bridge: message send failed: {e}");
break;
}
{
warn!("bridge: message send failed: {e}");
break;
}
}
});
Expand Down
8 changes: 4 additions & 4 deletions crates/loopal-agent-client/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ impl AgentProcess {
return Ok(explicit);
}
// Otherwise, use the current executable (same binary, worker mode).
if let Ok(current) = std::env::current_exe() {
if current.exists() {
return Ok(current);
}
if let Ok(current) = std::env::current_exe()
&& current.exists()
{
return Ok(current);
}
Ok(explicit)
}
Expand Down
18 changes: 9 additions & 9 deletions crates/loopal-agent-hub/src/agent_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ pub async fn agent_io_loop(
.map(String::from);
info!(agent = %agent_name, has_result = agent_result.is_some(), "received agent/completed");
break;
} else if method == methods::AGENT_EVENT.name {
if let Ok(mut event) = serde_json::from_value::<AgentEvent>(params) {
if event.agent_name.is_none() {
event.agent_name = Some(agent_name.clone());
}
let h = hub.lock().await;
if h.registry.event_sender().try_send(event).is_err() {
tracing::debug!(agent = %agent_name, "event dropped (channel full)");
}
} else if method == methods::AGENT_EVENT.name
&& let Ok(mut event) = serde_json::from_value::<AgentEvent>(params)
{
if event.agent_name.is_none() {
event.agent_name = Some(agent_name.clone());
}
let h = hub.lock().await;
if h.registry.event_sender().try_send(event).is_err() {
tracing::debug!(agent = %agent_name, "event dropped (channel full)");
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions crates/loopal-agent-hub/src/agent_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ impl AgentRegistry {
if self.agents.contains_key(name) {
return Err(format!("agent '{name}' already registered"));
}
if let Some(p) = parent {
if let Some(pa) = self.agents.get_mut(p) {
pa.info.children.push(name.to_string());
}
if let Some(p) = parent
&& let Some(pa) = self.agents.get_mut(p)
{
pa.info.children.push(name.to_string());
}
self.agents.insert(
name.to_string(),
Expand All @@ -84,10 +84,10 @@ impl AgentRegistry {

pub fn unregister_connection(&mut self, name: &str) {
let parent = self.agents.get(name).and_then(|a| a.info.parent.clone());
if let Some(ref p) = parent {
if let Some(pa) = self.agents.get_mut(p.as_str()) {
pa.info.children.retain(|c| c != name);
}
if let Some(ref p) = parent
&& let Some(pa) = self.agents.get_mut(p.as_str())
{
pa.info.children.retain(|c| c != name);
}
self.agents.remove(name);
self.completions.remove(name);
Expand Down
10 changes: 5 additions & 5 deletions crates/loopal-agent-hub/src/dispatch/dispatch_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ pub async fn handle_spawn_agent(
};
// On success, register a shadow entry so wait_agent can work locally.
// The completion will arrive via MetaHub → uplink → agent/message.
if let Ok(ref resp) = result {
if let Some(name) = resp["name"].as_str() {
let mut h = hub.lock().await;
h.registry.register_shadow(name, from_agent);
}
if let Ok(ref resp) = result
&& let Some(name) = resp["name"].as_str()
{
let mut h = hub.lock().await;
h.registry.register_shadow(name, from_agent);
}
return result;
}
Expand Down
25 changes: 12 additions & 13 deletions crates/loopal-agent-hub/src/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,18 @@ pub(crate) async fn finish_and_deliver(hub: &Arc<Mutex<Hub>>, name: &str, output
}
} else if let Some(parent) = parent_name {
let addr = QualifiedAddress::parse(&parent);
if addr.is_remote() {
if let Some(ul) = uplink {
let content =
format!("<agent-result name=\"{name}\">\n{output_text}\n</agent-result>");
let envelope = Envelope::new(
loopal_protocol::MessageSource::System("agent-completed".into()),
&parent,
content,
);
if let Err(e) = ul.route(&envelope).await {
tracing::warn!(agent = %name, parent = %parent, error = %e,
"failed to deliver completion to remote parent");
}
if addr.is_remote()
&& let Some(ul) = uplink
{
let content = format!("<agent-result name=\"{name}\">\n{output_text}\n</agent-result>");
let envelope = Envelope::new(
loopal_protocol::MessageSource::System("agent-completed".into()),
&parent,
content,
);
if let Err(e) = ul.route(&envelope).await {
tracing::warn!(agent = %name, parent = %parent, error = %e,
"failed to deliver completion to remote parent");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/loopal-agent-hub/src/spawn_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ pub async fn register_agent_connection(

{
let mut h = hub.lock().await;
if let Some(p) = parent {
if !h.registry.agents.contains_key(p) {
warn!(agent = %name, parent = %p, "parent not found");
}
if let Some(p) = parent
&& !h.registry.agents.contains_key(p)
{
warn!(agent = %name, parent = %p, "parent not found");
}
if let Err(e) = h.registry.register_connection_with_parent(
name,
Expand Down
25 changes: 12 additions & 13 deletions crates/loopal-agent-hub/src/uplink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,14 @@ pub async fn handle_reverse_requests(
serde_json::from_value::<loopal_protocol::Envelope>(params)
{
// If this is a remote agent completion, trigger shadow entry
if let loopal_protocol::MessageSource::System(ref tag) = env.source {
if tag == "agent-completed" {
if let Some(child) = extract_agent_result_name(&env) {
let output = env.content.text.clone();
let mut h = hub.lock().await;
h.registry.emit_agent_finished(&child, Some(output));
h.registry.unregister_connection(&child);
}
}
if let loopal_protocol::MessageSource::System(ref tag) = env.source
&& tag == "agent-completed"
&& let Some(child) = extract_agent_result_name(&env)
{
let output = env.content.text.clone();
let mut h = hub.lock().await;
h.registry.emit_agent_finished(&child, Some(output));
h.registry.unregister_connection(&child);
}
hub.lock().await.registry.route_message(&env).await.is_ok()
} else {
Expand Down Expand Up @@ -168,10 +167,10 @@ pub async fn handle_reverse_requests(
}
}
Incoming::Notification { method, params } => {
if method == methods::AGENT_MESSAGE.name {
if let Ok(env) = serde_json::from_value::<loopal_protocol::Envelope>(params) {
let _ = hub.lock().await.registry.route_message(&env).await;
}
if method == methods::AGENT_MESSAGE.name
&& let Ok(env) = serde_json::from_value::<loopal_protocol::Envelope>(params)
{
let _ = hub.lock().await.registry.route_message(&env).await;
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/loopal-agent-hub/tests/suite/collaboration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ async fn cascade_shutdown_interrupts_children() {
tokio::spawn(async move {
let mut rx = client_rx;
while let Some(msg) = rx.recv().await {
if let Incoming::Notification { method, .. } = &msg {
if method == methods::AGENT_INTERRUPT.name {
let _ = interrupt_tx.send(true).await;
}
if let Incoming::Notification { method, .. } = &msg
&& method == methods::AGENT_INTERRUPT.name
{
let _ = interrupt_tx.send(true).await;
}
if let Incoming::Request { id, .. } = msg {
let _ = cc.respond(id, json!({"ok": true})).await;
Expand Down
8 changes: 4 additions & 4 deletions crates/loopal-agent-hub/tests/suite/e2e_bootstrap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ async fn full_bootstrap_hub_to_agent_roundtrip() {
/// Find the loopal binary. Checks LOOPAL_BINARY env var first (set by Bazel),
/// then falls back to Cargo target directory layout.
fn resolve_loopal_binary() -> String {
if let Ok(path) = std::env::var("LOOPAL_BINARY") {
if std::path::Path::new(&path).exists() {
return path;
}
if let Ok(path) = std::env::var("LOOPAL_BINARY")
&& std::path::Path::new(&path).exists()
{
return path;
}
let test_exe = std::env::current_exe().expect("current_exe");
let target_dir = test_exe
Expand Down
12 changes: 6 additions & 6 deletions crates/loopal-agent-server/src/interrupt_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ pub fn spawn(
let (tx, rx) = mpsc::channel(256);
tokio::spawn(async move {
while let Some(msg) = incoming_rx.recv().await {
if let Incoming::Notification { ref method, .. } = msg {
if method == methods::AGENT_INTERRUPT.name {
interrupt.signal();
interrupt_tx.send_modify(|v| *v = v.wrapping_add(1));
continue;
}
if let Incoming::Notification { ref method, .. } = msg
&& method == methods::AGENT_INTERRUPT.name
{
interrupt.signal();
interrupt_tx.send_modify(|v| *v = v.wrapping_add(1));
continue;
}
if tx.send(msg).await.is_err() {
break;
Expand Down
16 changes: 6 additions & 10 deletions crates/loopal-agent-server/src/session_hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,9 @@ impl SharedSession {
.find(|c| c.id == client_id)
.is_some_and(|c| c.is_primary);
clients.retain(|c| c.id != client_id);
if was_primary {
if let Some(first) = clients.first_mut() {
first.is_primary = true;
tracing::info!(client = %first.id, "promoted to primary");
}
if was_primary && let Some(first) = clients.first_mut() {
first.is_primary = true;
tracing::info!(client = %first.id, "promoted to primary");
}
}

Expand Down Expand Up @@ -195,11 +193,9 @@ impl SharedSession {
}
// Promote new primary if needed
let has_primary = clients.iter().any(|c| c.is_primary);
if !has_primary {
if let Some(first) = clients.first_mut() {
first.is_primary = true;
tracing::info!(client = %first.id, "promoted to primary (dead cleanup)");
}
if !has_primary && let Some(first) = clients.first_mut() {
first.is_primary = true;
tracing::info!(client = %first.id, "promoted to primary (dead cleanup)");
}
}
}
40 changes: 20 additions & 20 deletions crates/loopal-agent-server/tests/suite/bridge_edge_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ async fn child_permission_request_denied() {
}
}
Ok(Some(Incoming::Notification { method, params })) => {
if method == methods::AGENT_EVENT.name {
if let Ok(ev) = serde_json::from_value::<AgentEvent>(params) {
let terminal = matches!(
ev.payload,
AgentEventPayload::Finished | AgentEventPayload::AwaitingInput
);
events.push(ev.payload);
if terminal {
break;
}
if method == methods::AGENT_EVENT.name
&& let Ok(ev) = serde_json::from_value::<AgentEvent>(params)
{
let terminal = matches!(
ev.payload,
AgentEventPayload::Finished | AgentEventPayload::AwaitingInput
);
events.push(ev.payload);
if terminal {
break;
}
}
}
Expand Down Expand Up @@ -108,16 +108,16 @@ async fn child_provider_error_handled() {
while tokio::time::Instant::now() < deadline {
match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
Ok(Some(Incoming::Notification { method, params })) => {
if method == methods::AGENT_EVENT.name {
if let Ok(ev) = serde_json::from_value::<AgentEvent>(params) {
let terminal = matches!(
ev.payload,
AgentEventPayload::Finished | AgentEventPayload::AwaitingInput
);
events.push(ev.payload);
if terminal {
break;
}
if method == methods::AGENT_EVENT.name
&& let Ok(ev) = serde_json::from_value::<AgentEvent>(params)
{
let terminal = matches!(
ev.payload,
AgentEventPayload::Finished | AgentEventPayload::AwaitingInput
);
events.push(ev.payload);
if terminal {
break;
}
}
}
Expand Down
Loading
Loading