From 50d89fbe15d07d8e5019c5462e72709f259c8b7a Mon Sep 17 00:00:00 2001 From: Vladyslav Nikonov Date: Tue, 11 Nov 2025 02:02:53 +0200 Subject: [PATCH] feat(agent): Implement exec detached mode --- Cargo.lock | 5 +- devolutions-session/Cargo.toml | 2 +- devolutions-session/src/dvc/process.rs | 160 ++++++++++++++----------- devolutions-session/src/dvc/task.rs | 38 ++++++ 4 files changed, 134 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e8b1c283..3bdaa4cad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3926,13 +3926,14 @@ checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" [[package]] name = "now-proto-pdu" -version = "0.3.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5023d146f7c4e7327e3343ae7ba3c716322ac2ef16e9405d1c6b84e3b66d40b3" +checksum = "00ac734dd7c77bf952b96f9755ffbc72b888e5d860d35575729f1ee4822887d0" dependencies = [ "bitflags 2.9.4", "ironrdp-core", "ironrdp-error 0.1.3", + "uuid", ] [[package]] diff --git a/devolutions-session/Cargo.toml b/devolutions-session/Cargo.toml index 0ab84168f..d0e21d080 100644 --- a/devolutions-session/Cargo.toml +++ b/devolutions-session/Cargo.toml @@ -44,7 +44,7 @@ win-api-wrappers = { path = "../crates/win-api-wrappers", optional = true } [dependencies.now-proto-pdu] optional = true -version = "0.3.2" +version = "0.4.1" features = ["std"] [target.'cfg(windows)'.build-dependencies] diff --git a/devolutions-session/src/dvc/process.rs b/devolutions-session/src/dvc/process.rs index 98cc096ee..6b7eaf55f 100644 --- a/devolutions-session/src/dvc/process.rs +++ b/devolutions-session/src/dvc/process.rs @@ -98,8 +98,6 @@ pub enum ServerChannelEvent { pub struct WinApiProcessCtx { session_id: u32, - io_notification_tx: Sender, - stdout_read_pipe: Option, stderr_read_pipe: Option, stdin_write_pipe: Option, @@ -123,7 +121,7 @@ impl WinApiProcessCtx { Ok(()) } - pub fn process_cancel(&mut self) -> Result<(), ExecError> { + pub fn process_cancel(&mut self, io_notification_tx: Sender) -> Result<(), ExecError> { info!( session_id = self.session_id, "Cancelling process execution by user request" @@ -135,15 +133,18 @@ impl WinApiProcessCtx { // Acknowledge client that cancel request has been processed // successfully. - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionCancelSuccess { - session_id: self.session_id, - })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionCancelSuccess { + session_id: self.session_id, + })?; Ok(()) } - pub fn wait(mut self, mut input_event_rx: WinapiSignaledReceiver) -> Result { + pub fn wait( + mut self, + mut input_event_rx: WinapiSignaledReceiver, + io_notification_tx: Sender, + ) -> Result { let session_id = self.session_id; info!(session_id, "Waiting for process to exit"); @@ -153,8 +154,7 @@ impl WinApiProcessCtx { const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_OBJECT_0; const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1); - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionStarted { session_id })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?; loop { // SAFETY: No preconditions. @@ -179,7 +179,7 @@ impl WinApiProcessCtx { return Err(ExecError::Aborted); } ProcessIoInputEvent::CancelExecution => { - self.process_cancel()?; + self.process_cancel(io_notification_tx.clone())?; // wait for process to exit continue; @@ -209,6 +209,7 @@ impl WinApiProcessCtx { pub fn wait_with_io_redirection( mut self, mut input_event_rx: WinapiSignaledReceiver, + io_notification_tx: Sender, ) -> Result { let session_id = self.session_id; @@ -277,8 +278,7 @@ impl WinApiProcessCtx { // Signal client side about started execution - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionStarted { session_id })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?; info!(session_id, "Process IO is ready for async loop execution"); loop { @@ -304,7 +304,7 @@ impl WinApiProcessCtx { return Err(ExecError::Aborted); } ProcessIoInputEvent::CancelExecution => { - self.process_cancel()?; + self.process_cancel(io_notification_tx.clone())?; // wait for process to exit continue; @@ -369,26 +369,24 @@ impl WinApiProcessCtx { // EOF on stdout pipe, close it and send EOF message to message_tx self.stdout_read_pipe = None; - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionDataOut { - session_id, - stream: NowExecDataStreamKind::Stdout, - last: true, - data: Vec::new(), - })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut { + session_id, + stream: NowExecDataStreamKind::Stdout, + last: true, + data: Vec::new(), + })?; } _code => return Err(err.into()), } continue; } - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionDataOut { - session_id, - stream: NowExecDataStreamKind::Stdout, - last: false, - data: stdout_buffer[..bytes_read as usize].to_vec(), - })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut { + session_id, + stream: NowExecDataStreamKind::Stdout, + last: false, + data: stdout_buffer[..bytes_read as usize].to_vec(), + })?; // Schedule next overlapped read // SAFETY: pipe is valid to read from, as long as it is not closed. @@ -432,26 +430,24 @@ impl WinApiProcessCtx { ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => { // EOF on stderr pipe, close it and send EOF message to message_tx self.stderr_read_pipe = None; - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionDataOut { - session_id, - stream: NowExecDataStreamKind::Stderr, - last: true, - data: Vec::new(), - })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut { + session_id, + stream: NowExecDataStreamKind::Stderr, + last: true, + data: Vec::new(), + })?; } _code => return Err(err.into()), } continue; } - self.io_notification_tx - .blocking_send(ServerChannelEvent::SessionDataOut { - session_id, - stream: NowExecDataStreamKind::Stderr, - last: false, - data: stderr_buffer[..bytes_read as usize].to_vec(), - })?; + io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut { + session_id, + stream: NowExecDataStreamKind::Stderr, + last: false, + data: stderr_buffer[..bytes_read as usize].to_vec(), + })?; // Schedule next overlapped read // SAFETY: pipe is valid to read from, as long as it is not closed. @@ -527,12 +523,12 @@ impl WinApiProcessBuilder { self } - /// Starts process execution and spawns IO thread to redirect stdio to/from dvc. - pub fn run( + fn run_impl( mut self, session_id: u32, - io_notification_tx: Sender, - ) -> Result { + io_notification_tx: Option>, + detached: bool, + ) -> Result, ExecError> { let command_line = format!("\"{}\" {}", self.executable, self.command_line) .trim_end() .to_owned(); @@ -557,31 +553,42 @@ impl WinApiProcessBuilder { let io_redirection = self.enable_io_redirection; let process_ctx = if io_redirection { - prepare_process_with_io_redirection( - session_id, - command_line, - current_directory, - self.env, - io_notification_tx.clone(), - )? + prepare_process_with_io_redirection(session_id, command_line, current_directory, self.env)? } else { - prepare_process( - session_id, - command_line, - current_directory, - self.env, - io_notification_tx.clone(), - )? + prepare_process(session_id, command_line, current_directory, self.env)? }; + // For detached mode, spawn a thread that waits for process exit and keeps temp files alive + if detached && !temp_files.is_empty() { + std::thread::spawn(move || { + let _temp_files = temp_files; + + // Wait for process to exit (indefinitely) + if let Err(error) = process_ctx.process.wait(None) { + error!(%error, session_id, "Failed to wait for detached process"); + return; + } + + info!(session_id, "Detached process exited"); + + // Temp files will be cleaned up when this thread exits + }); + + info!(session_id, "Detached process started successfully"); + return Ok(None); + } + // Create channel for `task` -> `Process IO thread` communication let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?; + let io_notification_tx = + io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode"); + let join_handle = std::thread::spawn(move || { let run_result = if io_redirection { - process_ctx.wait_with_io_redirection(input_event_rx) + process_ctx.wait_with_io_redirection(input_event_rx, io_notification_tx.clone()) } else { - process_ctx.wait(input_event_rx) + process_ctx.wait(input_event_rx, io_notification_tx.clone()) }; let notification = match run_result { @@ -594,11 +601,31 @@ impl WinApiProcessBuilder { } }); - Ok(WinApiProcess { + Ok(Some(WinApiProcess { input_event_tx, join_handle, _temp_files: temp_files, - }) + })) + } + + /// Starts process execution and spawns IO thread to redirect stdio to/from dvc. + pub fn run( + self, + session_id: u32, + io_notification_tx: Sender, + ) -> Result { + Ok(self + .run_impl(session_id, Some(io_notification_tx), false)? + .expect("result should be non-optional when running in non-detached mode")) + } + + /// Starts process in detached mode (fire-and-forget). + /// No IO redirection. Process exit is monitored in a background thread to manage temp file cleanup. + /// Returns immediately after spawning. + pub fn run_detached(self, session_id: u32) -> Result<(), ExecError> { + // Result always empty and therefore ignored in detached mode. + self.run_impl(session_id, None, true)?; + Ok(()) } } @@ -607,7 +634,6 @@ fn prepare_process( mut command_line: WideString, current_directory: WideString, env: HashMap, - io_notification_tx: Sender, ) -> Result { let mut process_information = PROCESS_INFORMATION::default(); @@ -620,6 +646,7 @@ fn prepare_process( let environment_block = (!env.is_empty()).then(|| make_environment_block(env)).transpose()?; let mut creation_flags = NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE; + if environment_block.is_some() { creation_flags |= CREATE_UNICODE_ENVIRONMENT; } @@ -657,7 +684,6 @@ fn prepare_process( Ok(WinApiProcessCtx { session_id, - io_notification_tx, stdout_read_pipe: None, stderr_read_pipe: None, stdin_write_pipe: None, @@ -671,7 +697,6 @@ fn prepare_process_with_io_redirection( mut command_line: WideString, current_directory: WideString, env: HashMap, - io_notification_tx: Sender, ) -> Result { let mut process_information = PROCESS_INFORMATION::default(); @@ -741,7 +766,6 @@ fn prepare_process_with_io_redirection( let process_ctx = WinApiProcessCtx { session_id, - io_notification_tx, stdout_read_pipe: Some(stdout_read_pipe), stderr_read_pipe: Some(stderr_read_pipe), stdin_write_pipe: Some(stdin_write_pipe), diff --git a/devolutions-session/src/dvc/task.rs b/devolutions-session/src/dvc/task.rs index b20121dfa..ff11be440 100644 --- a/devolutions-session/src/dvc/task.rs +++ b/devolutions-session/src/dvc/task.rs @@ -316,6 +316,16 @@ impl MessageProcessor { Ok(()) } + async fn send_detached_process_success(&self, session_id: u32) -> Result<(), ExecError> { + self.io_notification_tx + .send(ServerChannelEvent::SessionExited { + session_id, + exit_code: 0, + }) + .await?; + Ok(()) + } + pub(crate) async fn process_message( &mut self, message: NowMessage<'static>, @@ -567,6 +577,13 @@ impl MessageProcessor { run_process = run_process.with_current_directory(directory); } + if exec_msg.is_detached() { + // Detached mode: fire-and-forget, no IO redirection + run_process.run_detached(exec_msg.session_id())?; + self.send_detached_process_success(exec_msg.session_id()).await?; + return Ok(()); + } + let process = run_process .with_io_redirection(exec_msg.is_with_io_redirection()) .run(exec_msg.session_id(), self.io_notification_tx.clone())?; @@ -594,6 +611,13 @@ impl MessageProcessor { run_batch = run_batch.with_current_directory(directory); } + if batch_msg.is_detached() { + // Detached mode: fire-and-forget, no IO redirection + run_batch.run_detached(batch_msg.session_id())?; + self.send_detached_process_success(batch_msg.session_id()).await?; + return Ok(()); + } + let process = run_batch .with_io_redirection(batch_msg.is_with_io_redirection()) .run(batch_msg.session_id(), self.io_notification_tx.clone())?; @@ -638,6 +662,13 @@ impl MessageProcessor { run_process = run_process.with_current_directory(directory); } + if winps_msg.is_detached() { + // Detached mode: fire-and-forget, no IO redirection + run_process.run_detached(winps_msg.session_id())?; + self.send_detached_process_success(winps_msg.session_id()).await?; + return Ok(()); + } + let process = run_process .with_io_redirection(winps_msg.is_with_io_redirection()) .run(winps_msg.session_id(), self.io_notification_tx.clone())?; @@ -684,6 +715,13 @@ impl MessageProcessor { run_process = run_process.with_current_directory(directory); } + if winps_msg.is_detached() { + // Detached mode: fire-and-forget, no IO redirection + run_process.run_detached(winps_msg.session_id())?; + self.send_detached_process_success(winps_msg.session_id()).await?; + return Ok(()); + } + let process = run_process .with_io_redirection(winps_msg.is_with_io_redirection()) .run(winps_msg.session_id(), self.io_notification_tx.clone())?;