Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 180 additions & 31 deletions src/llm-coding-tools-core/src/tools/bash/tokio_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,111 @@

use super::{BashOutput, PIPE_BUFFER_CAPACITY};
use crate::error::{ToolError, ToolResult};
use core::fmt::Write;
use parking_lot::Mutex;
use process_wrap::tokio::*;
use std::path::Path;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::task::JoinHandle;

/// Maximum time to wait for pipe drains after timeout kill.
const PIPE_DRAIN_GRACE_PERIOD: Duration = Duration::from_millis(100);
/// Read chunk size for async pipe draining.
const PIPE_DRAIN_READ_CHUNK: usize = 8 * 1024;

type SharedPipeBuffer = Arc<Mutex<Vec<u8>>>;

struct PipeDrainTask {
handle: JoinHandle<()>,
buffer: SharedPipeBuffer,
}

#[inline]
fn spawn_pipe_drain_task<R>(mut pipe: R) -> PipeDrainTask
where
R: AsyncRead + Unpin + Send + 'static,
{
let buffer: SharedPipeBuffer = Arc::new(Mutex::new(Vec::with_capacity(PIPE_BUFFER_CAPACITY)));
let task_buffer = Arc::clone(&buffer);

let handle = tokio::spawn(async move {
let mut chunk = [0_u8; PIPE_DRAIN_READ_CHUNK];
loop {
match pipe.read(&mut chunk).await {
Ok(0) => break,
Ok(read) => task_buffer.lock().extend_from_slice(&chunk[..read]),
Err(_) => break,
}
}
});

PipeDrainTask { handle, buffer }
}

#[inline]
fn take_pipe_buffer(buffer: SharedPipeBuffer) -> Vec<u8> {
match Arc::try_unwrap(buffer) {
Ok(mutex) => mutex.into_inner(),
Err(shared) => shared.lock().clone(),
}
}

#[inline]
async fn await_pipe_drain_task(task: PipeDrainTask) -> Vec<u8> {
let PipeDrainTask { handle, buffer } = task;
let _ = handle.await;
take_pipe_buffer(buffer)
}

#[inline]
async fn await_pipe_drain_task_with_grace(task: PipeDrainTask, grace: Duration) -> Vec<u8> {
let PipeDrainTask { mut handle, buffer } = task;

tokio::select! {
_ = &mut handle => {},
_ = tokio::time::sleep(grace) => {
// Preserve strict timeout semantics while retaining buffered bytes.
// Buffer state is shared outside the task so abort cannot discard it.
handle.abort();
let _ = handle.await;
}
}

take_pipe_buffer(buffer)
}

#[inline]
fn timeout_with_buffered_output(
timeout: Duration,
stdout_data: &[u8],
stderr_data: &[u8],
) -> ToolError {
let stdout = String::from_utf8_lossy(stdout_data);
let stderr = String::from_utf8_lossy(stderr_data);

// Base message + outputs + stderr label.
let mut message = String::with_capacity(stdout.len() + stderr.len() + 64);
let _ = write!(message, "command timed out after {}ms", timeout.as_millis());

if !stdout.is_empty() {
message.push('\n');
message.push_str(&stdout);
}

if !stderr.is_empty() {
if stdout.is_empty() || !stdout.ends_with('\n') {
message.push('\n');
}
message.push_str("[stderr]\n");
message.push_str(&stderr);
}

ToolError::Timeout(message)
}

/// Executes a shell command with optional working directory and timeout.
///
Expand Down Expand Up @@ -67,40 +167,29 @@ pub async fn execute_command(

// Take stdout/stderr handles to drain them concurrently with process wait.
// This prevents deadlock when output exceeds pipe buffer (~64KB Linux, ~4KB Windows).
let mut stdout_pipe = child.stdout().take().expect("stdout was piped");
let mut stderr_pipe = child.stderr().take().expect("stderr was piped");
let stdout_pipe = child.stdout().take().expect("stdout was piped");
let stderr_pipe = child.stderr().take().expect("stderr was piped");

// Race between timeout and (process completion + pipe draining).
// Using join! inside select! avoids tokio::spawn overhead while still
// providing concurrent I/O for the pipe reads.
tokio::select! {
// Keep output drains independent from timeout selection so timed-out
// commands can still return buffered stdout/stderr.
let stdout_task = spawn_pipe_drain_task(stdout_pipe);
let stderr_task = spawn_pipe_drain_task(stderr_pipe);

// Race between timeout and process completion. Pipe drain tasks keep running
// regardless of which branch wins this select.
let wait_result = tokio::select! {
biased; // Check timeout first for consistent behavior

_ = tokio::time::sleep(timeout) => {
// Timeout: explicitly kill the process tree (Job Object on Windows, process group on Unix)
let _ = Pin::from(child.kill()).await;
Err(ToolError::Timeout(format!(
"command timed out after {}ms",
timeout.as_millis()
)))
}
_ = tokio::time::sleep(timeout) => None,
status = child.wait() => Some(status),
};

result = async {
tokio::join!(
child.wait(),
async {
let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY);
let _ = stdout_pipe.read_to_end(&mut buf).await;
buf
},
async {
let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY);
let _ = stderr_pipe.read_to_end(&mut buf).await;
buf
}
)
} => {
let (status, stdout_data, stderr_data) = result;
match wait_result {
Some(status) => {
let (stdout_data, stderr_data) = tokio::join!(
await_pipe_drain_task(stdout_task),
await_pipe_drain_task(stderr_task)
);
let status = status.map_err(|e| ToolError::Execution(e.to_string()))?;

Ok(BashOutput {
Expand All @@ -109,6 +198,22 @@ pub async fn execute_command(
stderr: String::from_utf8_lossy(&stderr_data).into_owned(),
})
}
None => {
// Timeout: explicitly kill the process tree (Job Object on Windows,
// process group on Unix), then briefly await pipe drains for buffered output.
let _ = Pin::from(child.kill()).await;

let (stdout_data, stderr_data) = tokio::join!(
await_pipe_drain_task_with_grace(stdout_task, PIPE_DRAIN_GRACE_PERIOD),
await_pipe_drain_task_with_grace(stderr_task, PIPE_DRAIN_GRACE_PERIOD)
);

Err(timeout_with_buffered_output(
timeout,
&stdout_data,
&stderr_data,
))
}
}
}

Expand Down Expand Up @@ -157,6 +262,50 @@ mod tests {
assert!(matches!(result, Err(ToolError::Timeout(_))));
}

#[tokio::test]
async fn timeout_preserves_buffered_output() {
let cmd = if cfg!(target_os = "windows") {
"echo stdout-before-timeout & echo stderr-before-timeout 1>&2 & ping -n 10 127.0.0.1 >nul"
} else {
"echo stdout-before-timeout; echo stderr-before-timeout 1>&2; sleep 10"
};

let result = execute_command(cmd, None, Duration::from_millis(500)).await;
match result {
Err(ToolError::Timeout(message)) => {
assert!(message.contains("stdout-before-timeout"));
assert!(message.contains("stderr-before-timeout"));
}
other => panic!("expected timeout error, got: {other:?}"),
}
}

#[tokio::test]
async fn grace_abort_retains_shared_pipe_buffer() {
use tokio::sync::oneshot;

let buffer: SharedPipeBuffer = Arc::new(Mutex::new(Vec::with_capacity(32)));
let task_buffer = Arc::clone(&buffer);
let (written_tx, written_rx) = oneshot::channel();
let (_block_tx, block_rx) = oneshot::channel::<()>();

let handle = tokio::spawn(async move {
task_buffer.lock().extend_from_slice(b"partial-output");
let _ = written_tx.send(());
let _ = block_rx.await; // block infinitely, task will be cancelled by grace period timeout
});

written_rx
.await
.expect("drain task should write buffered output before abort");

let data =
await_pipe_drain_task_with_grace(PipeDrainTask { handle, buffer }, Duration::ZERO)
.await;

assert_eq!(data, b"partial-output");
}

#[tokio::test]
async fn invalid_workdir_returns_error() {
let result = execute_command(
Expand Down