diff --git a/hyperdrive/src/kernel/process.rs b/hyperdrive/src/kernel/process.rs index b1a1c27aa..f1c2d279d 100644 --- a/hyperdrive/src/kernel/process.rs +++ b/hyperdrive/src/kernel/process.rs @@ -1,9 +1,13 @@ use crate::KERNEL_PROCESS_ID; +use bytes::{BufMut, Bytes, BytesMut}; use lib::{types::core as t, v1::ProcessV1}; use std::{ collections::{HashMap, VecDeque}, path::PathBuf, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex as StdMutex, + }, }; use tokio::{fs, sync::Mutex, task::JoinHandle}; use wasmtime::{ @@ -11,9 +15,13 @@ use wasmtime::{ Engine, Store, }; use wasmtime_wasi::{ - p2::{pipe::MemoryOutputPipe, IoView, WasiCtx, WasiCtxBuilder, WasiView}, + p2::{ + pipe::MemoryOutputPipe, IoView, StdoutStream, StreamResult, WasiCtx, WasiCtxBuilder, + WasiView, + }, DirPerms, FilePerms, }; +use wasmtime_wasi_io::{async_trait, poll::Pollable, streams::OutputStream}; use super::RestartBackoff; @@ -78,9 +86,9 @@ impl WasiView for ProcessWasiV1 { async fn make_table_and_wasi( home_directory_path: PathBuf, process_state: &ProcessState, -) -> (Table, WasiCtx, MemoryOutputPipe) { +) -> (Table, WasiCtx, RotatingOutputPipe) { let table = Table::new(); - let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE); + let wasi_stderr = RotatingOutputPipe::new(STACK_TRACE_SIZE); #[cfg(unix)] let tmp_path = home_directory_path @@ -126,7 +134,7 @@ async fn make_component_v1( wasm_bytes: &[u8], home_directory_path: PathBuf, process_state: ProcessState, -) -> anyhow::Result<(ProcessV1, Store, MemoryOutputPipe)> { +) -> anyhow::Result<(ProcessV1, Store, RotatingOutputPipe)> { let our_process_id = process_state.metadata.our.process.clone(); let send_to_terminal = process_state.send_to_terminal.clone(); @@ -483,3 +491,150 @@ pub async fn make_process_loop( } Ok(()) } + +#[derive(Clone)] +pub struct RotatingOutputPipe { + pipes: Arc>>, + current_pipe: Arc>, + capacity_per_pipe: usize, + max_pipes: usize, + bytes_written: Arc, +} + +impl RotatingOutputPipe { + pub fn new(total_capacity: usize) -> Self { + // Use multiple smaller pipes to avoid blocking + let max_pipes = 2; + let capacity_per_pipe = total_capacity / max_pipes; + + let mut pipes = Vec::new(); + for _ in 0..max_pipes { + pipes.push(MemoryOutputPipe::new(capacity_per_pipe)); + } + + Self { + pipes: Arc::new(StdMutex::new(pipes)), + current_pipe: Arc::new(StdMutex::new(0)), + capacity_per_pipe, + max_pipes, + bytes_written: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn contents(&self) -> Bytes { + let pipes = self.pipes.lock().unwrap(); + let current = *self.current_pipe.lock().unwrap(); + + // Collect contents from all pipes + let mut all_data = BytesMut::new(); + + // Start from the next pipe after current (oldest data) + for i in 0..self.max_pipes { + let idx = (current + 1 + i) % self.max_pipes; + let pipe_contents = pipes[idx].contents(); + all_data.put_slice(&pipe_contents); + } + + all_data.freeze() + } + + pub fn try_into_bytes(self) -> Option { + Some(self.contents()) + } +} + +impl StdoutStream for RotatingOutputPipe { + fn stream(&self) -> Box { + let pipes = self.pipes.lock().unwrap(); + let current = *self.current_pipe.lock().unwrap(); + + // Create a wrapper that monitors writes and switches pipes when needed + Box::new(RotatingWrapperStream { + pipes: self.pipes.clone(), + current_pipe: self.current_pipe.clone(), + current_stream: pipes[current].stream(), + capacity_per_pipe: self.capacity_per_pipe, + bytes_written_to_current: 0, + bytes_written: self.bytes_written.clone(), + }) + } + + fn isatty(&self) -> bool { + false + } +} + +struct RotatingWrapperStream { + pipes: Arc>>, + current_pipe: Arc>, + current_stream: Box, + capacity_per_pipe: usize, + bytes_written_to_current: usize, + bytes_written: Arc, +} + +impl OutputStream for RotatingWrapperStream { + fn write(&mut self, bytes: Bytes) -> StreamResult<()> { + let bytes_len = bytes.len(); + + // Check if we need to switch to a new pipe + if self.bytes_written_to_current + bytes_len > self.capacity_per_pipe * 90 / 100 { + // Switch to next pipe (this will overwrite old data in a circular fashion) + let mut pipes = self.pipes.lock().unwrap(); + let mut current = self.current_pipe.lock().unwrap(); + + *current = (*current + 1) % pipes.len(); + + // Create a fresh pipe at this position (clearing old data) + pipes[*current] = MemoryOutputPipe::new(self.capacity_per_pipe); + self.current_stream = pipes[*current].stream(); + self.bytes_written_to_current = 0; + } + + // Write to current stream + let result = self.current_stream.write(bytes); + if result.is_ok() { + self.bytes_written_to_current += bytes_len; + self.bytes_written.fetch_add(bytes_len, Ordering::Relaxed); + } + + result + } + + fn flush(&mut self) -> StreamResult<()> { + self.current_stream.flush() + } + + fn check_write(&mut self) -> StreamResult { + // Always report available space + Ok(self.capacity_per_pipe - self.bytes_written_to_current) + } +} + +#[async_trait] +impl Pollable for RotatingWrapperStream { + async fn ready(&mut self) { + self.current_stream.ready().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_wrapper_rotation() { + let pipe = RotatingOutputPipeWrapper::new(100); + let mut stream = pipe.stream(); + + // Write data that will cause rotation + for i in 0..20 { + let data = format!("data{:02}", i); + stream.write(Bytes::from(data)).unwrap(); + } + + // Should have rotated and kept recent data + let contents = pipe.contents(); + assert!(contents.len() <= 100); + } +}