From 61a51f25a28918ccdce7e83a470e893f0ccf5001 Mon Sep 17 00:00:00 2001 From: anaslimem Date: Fri, 27 Mar 2026 19:04:06 +0100 Subject: [PATCH 1/4] fix avoid hanging forever when send fails --- src/agent-client-protocol/src/rpc.rs | 31 ++++++++++++++++------------ 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/agent-client-protocol/src/rpc.rs b/src/agent-client-protocol/src/rpc.rs index 846ae17..01d91ba 100644 --- a/src/agent-client-protocol/src/rpc.rs +++ b/src/agent-client-protocol/src/rpc.rs @@ -117,17 +117,6 @@ where let (tx, rx) = oneshot::channel(); let id = self.next_id.fetch_add(1, Ordering::SeqCst); let id = RequestId::Number(id); - self.pending_responses.lock().unwrap().insert( - id.clone(), - PendingResponse { - deserialize: |value| { - serde_json::from_str::(value.get()) - .map(|out| Box::new(out) as _) - .map_err(|_| Error::internal_error().data("failed to deserialize response")) - }, - respond: tx, - }, - ); if self .outgoing_tx @@ -138,8 +127,24 @@ where })) .is_err() { - self.pending_responses.lock().unwrap().remove(&id); + return async move { + drop(rx.await); + Err(Error::internal_error().data("connection closed before request could be sent")) + }.boxed(); } + + self.pending_responses.lock().unwrap().insert( + id, + PendingResponse { + deserialize: |value| { + serde_json::from_str::(value.get()) + .map(|out| Box::new(out) as _) + .map_err(|_| Error::internal_error().data("failed to deserialize response")) + }, + respond: tx, + }, + ); + async move { let result = rx .await @@ -148,7 +153,7 @@ where .map_err(|_| Error::internal_error().data("failed to deserialize response"))?; Ok(*result) - } + }.boxed() } async fn handle_io( From d3b83f7dd7598190516d3da9987f35219f8dca41 Mon Sep 17 00:00:00 2001 From: anaslimem Date: Fri, 27 Mar 2026 19:09:50 +0100 Subject: [PATCH 2/4] Fix formatting --- src/agent-client-protocol/src/rpc.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/agent-client-protocol/src/rpc.rs b/src/agent-client-protocol/src/rpc.rs index 01d91ba..8f8ca2f 100644 --- a/src/agent-client-protocol/src/rpc.rs +++ b/src/agent-client-protocol/src/rpc.rs @@ -130,7 +130,8 @@ where return async move { drop(rx.await); Err(Error::internal_error().data("connection closed before request could be sent")) - }.boxed(); + } + .boxed(); } self.pending_responses.lock().unwrap().insert( @@ -153,7 +154,8 @@ where .map_err(|_| Error::internal_error().data("failed to deserialize response"))?; Ok(*result) - }.boxed() + } + .boxed() } async fn handle_io( From 3b3333dae2404c2a640ac7299f5c35d14a4caebc Mon Sep 17 00:00:00 2001 From: anaslimem Date: Sat, 28 Mar 2026 07:00:07 +0100 Subject: [PATCH 3/4] removed the unnecessary wait --- src/agent-client-protocol/src/rpc.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/agent-client-protocol/src/rpc.rs b/src/agent-client-protocol/src/rpc.rs index 8f8ca2f..c035d86 100644 --- a/src/agent-client-protocol/src/rpc.rs +++ b/src/agent-client-protocol/src/rpc.rs @@ -128,7 +128,6 @@ where .is_err() { return async move { - drop(rx.await); Err(Error::internal_error().data("connection closed before request could be sent")) } .boxed(); From a8a6d2d0dd00e5014e24ee16feae0ee3a680c338 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Sat, 28 Mar 2026 07:05:00 +0100 Subject: [PATCH 4/4] Return earlier --- src/agent-client-protocol/src/lib.rs | 46 ++++++++++++++-------------- src/agent-client-protocol/src/rpc.rs | 39 +++++++++++------------ 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/src/agent-client-protocol/src/lib.rs b/src/agent-client-protocol/src/lib.rs index 383473b..010515a 100644 --- a/src/agent-client-protocol/src/lib.rs +++ b/src/agent-client-protocol/src/lib.rs @@ -81,7 +81,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.initialize, Some(ClientRequest::InitializeRequest(args)), - ) + )? .await } @@ -90,7 +90,7 @@ impl Agent for ClientSideConnection { .request::>( AGENT_METHOD_NAMES.authenticate, Some(ClientRequest::AuthenticateRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -101,7 +101,7 @@ impl Agent for ClientSideConnection { .request::>( AGENT_METHOD_NAMES.logout, Some(ClientRequest::LogoutRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -111,7 +111,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_new, Some(ClientRequest::NewSessionRequest(args)), - ) + )? .await } @@ -120,7 +120,7 @@ impl Agent for ClientSideConnection { .request::>( AGENT_METHOD_NAMES.session_load, Some(ClientRequest::LoadSessionRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -133,7 +133,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_set_mode, Some(ClientRequest::SetSessionModeRequest(args)), - ) + )? .await } @@ -142,7 +142,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_prompt, Some(ClientRequest::PromptRequest(args)), - ) + )? .await } @@ -162,7 +162,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_set_model, Some(ClientRequest::SetSessionModelRequest(args)), - ) + )? .await } @@ -171,7 +171,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_list, Some(ClientRequest::ListSessionsRequest(args)), - ) + )? .await } @@ -181,7 +181,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_fork, Some(ClientRequest::ForkSessionRequest(args)), - ) + )? .await } @@ -191,7 +191,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_resume, Some(ClientRequest::ResumeSessionRequest(args)), - ) + )? .await } @@ -201,7 +201,7 @@ impl Agent for ClientSideConnection { .request::>( AGENT_METHOD_NAMES.session_close, Some(ClientRequest::CloseSessionRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -214,7 +214,7 @@ impl Agent for ClientSideConnection { .request( AGENT_METHOD_NAMES.session_set_config_option, Some(ClientRequest::SetSessionConfigOptionRequest(args)), - ) + )? .await } @@ -223,7 +223,7 @@ impl Agent for ClientSideConnection { .request( format!("_{}", args.method), Some(ClientRequest::ExtMethodRequest(args)), - ) + )? .await } @@ -441,7 +441,7 @@ impl Client for AgentSideConnection { .request( CLIENT_METHOD_NAMES.session_request_permission, Some(AgentRequest::RequestPermissionRequest(args)), - ) + )? .await } @@ -450,7 +450,7 @@ impl Client for AgentSideConnection { .request::>( CLIENT_METHOD_NAMES.fs_write_text_file, Some(AgentRequest::WriteTextFileRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -460,7 +460,7 @@ impl Client for AgentSideConnection { .request( CLIENT_METHOD_NAMES.fs_read_text_file, Some(AgentRequest::ReadTextFileRequest(args)), - ) + )? .await } @@ -469,7 +469,7 @@ impl Client for AgentSideConnection { .request( CLIENT_METHOD_NAMES.terminal_create, Some(AgentRequest::CreateTerminalRequest(args)), - ) + )? .await } @@ -478,7 +478,7 @@ impl Client for AgentSideConnection { .request( CLIENT_METHOD_NAMES.terminal_output, Some(AgentRequest::TerminalOutputRequest(args)), - ) + )? .await } @@ -490,7 +490,7 @@ impl Client for AgentSideConnection { .request::>( CLIENT_METHOD_NAMES.terminal_release, Some(AgentRequest::ReleaseTerminalRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -503,7 +503,7 @@ impl Client for AgentSideConnection { .request( CLIENT_METHOD_NAMES.terminal_wait_for_exit, Some(AgentRequest::WaitForTerminalExitRequest(args)), - ) + )? .await } @@ -512,7 +512,7 @@ impl Client for AgentSideConnection { .request::>( CLIENT_METHOD_NAMES.terminal_kill, Some(AgentRequest::KillTerminalRequest(args)), - ) + )? .await .map(Option::unwrap_or_default) } @@ -529,7 +529,7 @@ impl Client for AgentSideConnection { .request( format!("_{}", args.method), Some(AgentRequest::ExtMethodRequest(args)), - ) + )? .await } diff --git a/src/agent-client-protocol/src/rpc.rs b/src/agent-client-protocol/src/rpc.rs index c035d86..d4694b7 100644 --- a/src/agent-client-protocol/src/rpc.rs +++ b/src/agent-client-protocol/src/rpc.rs @@ -113,28 +113,12 @@ where &self, method: impl Into>, params: Option, - ) -> impl Future> { + ) -> Result>> { let (tx, rx) = oneshot::channel(); let id = self.next_id.fetch_add(1, Ordering::SeqCst); let id = RequestId::Number(id); - - if self - .outgoing_tx - .unbounded_send(OutgoingMessage::Request(Request { - id: id.clone(), - method: method.into(), - params, - })) - .is_err() - { - return async move { - Err(Error::internal_error().data("connection closed before request could be sent")) - } - .boxed(); - } - self.pending_responses.lock().unwrap().insert( - id, + id.clone(), PendingResponse { deserialize: |value| { serde_json::from_str::(value.get()) @@ -145,7 +129,21 @@ where }, ); - async move { + if self + .outgoing_tx + .unbounded_send(OutgoingMessage::Request(Request { + id: id.clone(), + method: method.into(), + params, + })) + .is_err() + { + self.pending_responses.lock().unwrap().remove(&id); + return Err( + Error::internal_error().data("connection closed before request could be sent") + ); + } + Ok(async move { let result = rx .await .map_err(|_| Error::internal_error().data("server shut down unexpectedly"))?? @@ -153,8 +151,7 @@ where .map_err(|_| Error::internal_error().data("failed to deserialize response"))?; Ok(*result) - } - .boxed() + }) } async fn handle_io(