Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 72 additions & 3 deletions relay-pty/src/inject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,79 @@ fn current_timestamp_ms() -> u64 {
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::{broadcast, mpsc};

fn test_config(idle_timeout_ms: u64) -> Config {
Config {
idle_timeout_ms,
..Config::default()
}
}

fn test_parse_result(is_idle: bool) -> ParseResult {
ParseResult {
commands: Vec::new(),
continuity_commands: Vec::new(),
is_idle,
ready_signal: false,
}
}

#[tokio::test]
async fn test_update_from_parse_sets_idle() {
let (pty_tx, _pty_rx) = mpsc::channel(1);
let (response_tx, _response_rx) = broadcast::channel(1);
let queue = Arc::new(MessageQueue::new(1, response_tx));
let injector = Injector::new(pty_tx, queue, test_config(600000));

injector.update_from_parse(&test_parse_result(true));
assert!(injector.check_idle());
}

#[tokio::test]
async fn test_record_output_clears_idle_on_non_relay() {
let (pty_tx, _pty_rx) = mpsc::channel(1);
let (response_tx, _response_rx) = broadcast::channel(1);
let queue = Arc::new(MessageQueue::new(1, response_tx));
let injector = Injector::new(pty_tx, queue, test_config(600000));

injector.update_from_parse(&test_parse_result(true));
assert!(injector.check_idle());

injector.record_output("Hello world").await;
assert!(!injector.check_idle());
}

#[tokio::test]
async fn test_record_output_keeps_idle_on_relay_echo() {
let (pty_tx, _pty_rx) = mpsc::channel(1);
let (response_tx, _response_rx) = broadcast::channel(1);
let queue = Arc::new(MessageQueue::new(1, response_tx));
let injector = Injector::new(pty_tx, queue, test_config(600000));

injector.update_from_parse(&test_parse_result(true));
assert!(injector.check_idle());

injector
.record_output("Relay message from Alice [abc]: Hi\n")
.await;
assert!(injector.check_idle());
}

#[test]
fn test_idle_timeout_zero_is_immediately_idle() {
let (pty_tx, _pty_rx) = mpsc::channel(1);
let (response_tx, _response_rx) = broadcast::channel(1);
let queue = Arc::new(MessageQueue::new(1, response_tx));
let injector = Injector::new(pty_tx, queue, test_config(0));

assert!(injector.check_idle());
}

#[test]
fn test_silence_detection() {
// This is a basic structure test
// Full integration tests require PTY setup
fn test_is_relay_echo() {
assert!(is_relay_echo("Relay message from Alice [abc]: Hi\n"));
assert!(is_relay_echo("\nRelay message from Bob [def]: Yo\n\n"));
assert!(!is_relay_echo("Some other output\n"));
}
}
31 changes: 25 additions & 6 deletions relay-pty/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct Args {
#[arg(short, long)]
name: String,

/// Unix socket path (default: /tmp/relay-pty-{name}.sock)
/// Unix socket path (default: /tmp/relay-pty-{name}.sock or /tmp/relay/{WORKSPACE_ID}/sockets/{name}.sock)
#[arg(short, long)]
socket: Option<String>,

Expand Down Expand Up @@ -89,7 +89,7 @@ struct Args {
#[arg(long)]
log_file: Option<String>,

/// Outbox directory for file-based relay messages
/// Outbox directory for file-based relay messages (default: /tmp/relay/{WORKSPACE_ID}/outbox/{name} when set)
#[arg(long)]
outbox: Option<String>,

Expand All @@ -116,9 +116,24 @@ async fn main() -> Result<()> {
info!("Command: {:?}", args.command);

// Build configuration
let socket_path = args
.socket
.unwrap_or_else(|| format!("/tmp/relay-pty-{}.sock", args.name));
let workspace_id = std::env::var("WORKSPACE_ID")
.ok()
.map(|id| id.trim().to_string())
.filter(|id| !id.is_empty());

let socket_path = args.socket.unwrap_or_else(|| {
if let Some(ref workspace_id) = workspace_id {
format!("/tmp/relay/{}/sockets/{}.sock", workspace_id, args.name)
} else {
format!("/tmp/relay-pty-{}.sock", args.name)
}
});

let outbox_path = args.outbox.or_else(|| {
workspace_id
.as_ref()
.map(|id| format!("/tmp/relay/{}/outbox/{}", id, args.name))
});

let config = Config {
name: args.name.clone(),
Expand Down Expand Up @@ -183,7 +198,7 @@ async fn main() -> Result<()> {
let injector = Arc::new(Injector::new(inject_tx, Arc::clone(&queue), config.clone()));

// Create output parser
let mut parser = if let Some(ref outbox) = args.outbox {
let mut parser = if let Some(ref outbox) = outbox_path {
let outbox_path = std::path::PathBuf::from(outbox);
// Create outbox directory if needed
if !outbox_path.exists() {
Expand Down Expand Up @@ -330,6 +345,10 @@ async fn main() -> Result<()> {
let json = serde_json::to_string(&cmd)?;
eprintln!("{}", json);
}
for cmd in parse_result.continuity_commands {
let json = serde_json::to_string(&cmd)?;
eprintln!("{}", json);
}
}
} else {
// PTY closed
Expand Down
Loading
Loading