Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 160 additions & 5 deletions hyperdrive/src/kernel/process.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
use crate::KERNEL_PROCESS_ID;
use bytes::{BufMut, Bytes, BytesMut};
use lib::{types::core as t, v1::ProcessV1};
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
sync::Arc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex as StdMutex,
},
};
use tokio::{fs, sync::Mutex, task::JoinHandle};
use wasmtime::{
component::{Component, Linker, ResourceTable as Table},
Engine, Store,
};
use wasmtime_wasi::{
p2::{pipe::MemoryOutputPipe, IoView, WasiCtx, WasiCtxBuilder, WasiView},
p2::{
pipe::MemoryOutputPipe, IoView, StdoutStream, StreamResult, WasiCtx, WasiCtxBuilder,
WasiView,
},
DirPerms, FilePerms,
};
use wasmtime_wasi_io::{async_trait, poll::Pollable, streams::OutputStream};

use super::RestartBackoff;

Expand Down Expand Up @@ -78,9 +86,9 @@ impl WasiView for ProcessWasiV1 {
async fn make_table_and_wasi(
home_directory_path: PathBuf,
process_state: &ProcessState,
) -> (Table, WasiCtx, MemoryOutputPipe) {
) -> (Table, WasiCtx, RotatingOutputPipe) {
let table = Table::new();
let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE);
let wasi_stderr = RotatingOutputPipe::new(STACK_TRACE_SIZE);

#[cfg(unix)]
let tmp_path = home_directory_path
Expand Down Expand Up @@ -126,7 +134,7 @@ async fn make_component_v1(
wasm_bytes: &[u8],
home_directory_path: PathBuf,
process_state: ProcessState,
) -> anyhow::Result<(ProcessV1, Store<ProcessWasiV1>, MemoryOutputPipe)> {
) -> anyhow::Result<(ProcessV1, Store<ProcessWasiV1>, RotatingOutputPipe)> {
let our_process_id = process_state.metadata.our.process.clone();
let send_to_terminal = process_state.send_to_terminal.clone();

Expand Down Expand Up @@ -483,3 +491,150 @@ pub async fn make_process_loop(
}
Ok(())
}

#[derive(Clone)]
pub struct RotatingOutputPipe {
pipes: Arc<StdMutex<Vec<MemoryOutputPipe>>>,
current_pipe: Arc<StdMutex<usize>>,
capacity_per_pipe: usize,
max_pipes: usize,
bytes_written: Arc<AtomicUsize>,
}

impl RotatingOutputPipe {
pub fn new(total_capacity: usize) -> Self {
// Use multiple smaller pipes to avoid blocking
let max_pipes = 2;
let capacity_per_pipe = total_capacity / max_pipes;

let mut pipes = Vec::new();
for _ in 0..max_pipes {
pipes.push(MemoryOutputPipe::new(capacity_per_pipe));
}

Self {
pipes: Arc::new(StdMutex::new(pipes)),
current_pipe: Arc::new(StdMutex::new(0)),
capacity_per_pipe,
max_pipes,
bytes_written: Arc::new(AtomicUsize::new(0)),
}
}

pub fn contents(&self) -> Bytes {
let pipes = self.pipes.lock().unwrap();
let current = *self.current_pipe.lock().unwrap();

// Collect contents from all pipes
let mut all_data = BytesMut::new();

// Start from the next pipe after current (oldest data)
for i in 0..self.max_pipes {
let idx = (current + 1 + i) % self.max_pipes;
let pipe_contents = pipes[idx].contents();
all_data.put_slice(&pipe_contents);
}

all_data.freeze()
}

pub fn try_into_bytes(self) -> Option<Bytes> {
Some(self.contents())
}
}

impl StdoutStream for RotatingOutputPipe {
fn stream(&self) -> Box<dyn OutputStream> {
let pipes = self.pipes.lock().unwrap();
let current = *self.current_pipe.lock().unwrap();

// Create a wrapper that monitors writes and switches pipes when needed
Box::new(RotatingWrapperStream {
pipes: self.pipes.clone(),
current_pipe: self.current_pipe.clone(),
current_stream: pipes[current].stream(),
capacity_per_pipe: self.capacity_per_pipe,
bytes_written_to_current: 0,
bytes_written: self.bytes_written.clone(),
})
}

fn isatty(&self) -> bool {
false
}
}

struct RotatingWrapperStream {
pipes: Arc<StdMutex<Vec<MemoryOutputPipe>>>,
current_pipe: Arc<StdMutex<usize>>,
current_stream: Box<dyn OutputStream>,
capacity_per_pipe: usize,
bytes_written_to_current: usize,
bytes_written: Arc<AtomicUsize>,
}

impl OutputStream for RotatingWrapperStream {
fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
let bytes_len = bytes.len();

// Check if we need to switch to a new pipe
if self.bytes_written_to_current + bytes_len > self.capacity_per_pipe * 90 / 100 {
// Switch to next pipe (this will overwrite old data in a circular fashion)
let mut pipes = self.pipes.lock().unwrap();
let mut current = self.current_pipe.lock().unwrap();

*current = (*current + 1) % pipes.len();

// Create a fresh pipe at this position (clearing old data)
pipes[*current] = MemoryOutputPipe::new(self.capacity_per_pipe);
self.current_stream = pipes[*current].stream();
self.bytes_written_to_current = 0;
}

// Write to current stream
let result = self.current_stream.write(bytes);
if result.is_ok() {
self.bytes_written_to_current += bytes_len;
self.bytes_written.fetch_add(bytes_len, Ordering::Relaxed);
}

result
}

fn flush(&mut self) -> StreamResult<()> {
self.current_stream.flush()
}

fn check_write(&mut self) -> StreamResult<usize> {
// Always report available space
Ok(self.capacity_per_pipe - self.bytes_written_to_current)
}
}

#[async_trait]
impl Pollable for RotatingWrapperStream {
async fn ready(&mut self) {
self.current_stream.ready().await
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_wrapper_rotation() {
let pipe = RotatingOutputPipeWrapper::new(100);
let mut stream = pipe.stream();

// Write data that will cause rotation
for i in 0..20 {
let data = format!("data{:02}", i);
stream.write(Bytes::from(data)).unwrap();
}

// Should have rotated and kept recent data
let contents = pipe.contents();
assert!(contents.len() <= 100);
}
}