Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 315 additions & 0 deletions src-tauri/src/backend/app_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
use serde_json::{json, Value};
use std::collections::HashMap;
use std::env;
use std::io::ErrorKind;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{oneshot, Mutex};
use tokio::time::timeout;

use crate::backend::events::{AppServerEvent, EventSink};
use crate::types::WorkspaceEntry;

pub(crate) struct WorkspaceSession {
pub(crate) entry: WorkspaceEntry,
pub(crate) child: Mutex<Child>,
pub(crate) stdin: Mutex<ChildStdin>,
pub(crate) pending: Mutex<HashMap<u64, oneshot::Sender<Value>>>,
pub(crate) next_id: AtomicU64,
}

impl WorkspaceSession {
async fn write_message(&self, value: Value) -> Result<(), String> {
let mut stdin = self.stdin.lock().await;
let mut line = serde_json::to_string(&value).map_err(|e| e.to_string())?;
line.push('\n');
stdin
.write_all(line.as_bytes())
.await
.map_err(|e| e.to_string())
}

pub(crate) async fn send_request(&self, method: &str, params: Value) -> Result<Value, String> {
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = oneshot::channel();
self.pending.lock().await.insert(id, tx);
self.write_message(json!({ "id": id, "method": method, "params": params }))
.await?;
rx.await.map_err(|_| "request canceled".to_string())
}

pub(crate) async fn send_notification(
&self,
method: &str,
params: Option<Value>,
) -> Result<(), String> {
let value = if let Some(params) = params {
json!({ "method": method, "params": params })
} else {
json!({ "method": method })
};
self.write_message(value).await
}

pub(crate) async fn send_response(&self, id: u64, result: Value) -> Result<(), String> {
self.write_message(json!({ "id": id, "result": result }))
.await
}
}

pub(crate) fn build_codex_path_env(codex_bin: Option<&str>) -> Option<String> {
let mut paths: Vec<String> = env::var("PATH")
.unwrap_or_default()
.split(':')
.filter(|value| !value.is_empty())
.map(|value| value.to_string())
.collect();
let mut extras = vec![
"/opt/homebrew/bin",
"/usr/local/bin",
"/usr/bin",
"/bin",
"/usr/sbin",
"/sbin",
]
.into_iter()
.map(|value| value.to_string())
.collect::<Vec<String>>();
if let Ok(home) = env::var("HOME") {
extras.push(format!("{home}/.local/bin"));
extras.push(format!("{home}/.local/share/mise/shims"));
extras.push(format!("{home}/.cargo/bin"));
extras.push(format!("{home}/.bun/bin"));
let nvm_root = Path::new(&home).join(".nvm/versions/node");
if let Ok(entries) = std::fs::read_dir(nvm_root) {
for entry in entries.flatten() {
let bin_path = entry.path().join("bin");
if bin_path.is_dir() {
extras.push(bin_path.to_string_lossy().to_string());
}
}
}
}
if let Some(bin_path) = codex_bin.filter(|value| !value.trim().is_empty()) {
let parent = Path::new(bin_path).parent();
if let Some(parent) = parent {
extras.push(parent.to_string_lossy().to_string());
}
}
for extra in extras {
if !paths.contains(&extra) {
paths.push(extra);
}
}
if paths.is_empty() {
None
} else {
Some(paths.join(":"))
}
}

pub(crate) fn build_codex_command_with_bin(codex_bin: Option<String>) -> Command {
let bin = codex_bin
.clone()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "codex".into());
let mut command = Command::new(bin);
if let Some(path_env) = build_codex_path_env(codex_bin.as_deref()) {
command.env("PATH", path_env);
}
command
}

pub(crate) async fn check_codex_installation(
codex_bin: Option<String>,
) -> Result<Option<String>, String> {
let mut command = build_codex_command_with_bin(codex_bin);
command.arg("--version");
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());

let output = match timeout(Duration::from_secs(5), command.output()).await {
Ok(result) => result.map_err(|e| {
if e.kind() == ErrorKind::NotFound {
"Codex CLI not found. Install Codex and ensure `codex` is on your PATH."
.to_string()
} else {
e.to_string()
}
})?,
Err(_) => {
return Err(
"Timed out while checking Codex CLI. Make sure `codex --version` runs in Terminal."
.to_string(),
);
}
};

if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
let detail = if stderr.trim().is_empty() {
stdout.trim()
} else {
stderr.trim()
};
if detail.is_empty() {
return Err(
"Codex CLI failed to start. Try running `codex --version` in Terminal."
.to_string(),
);
}
return Err(format!(
"Codex CLI failed to start: {detail}. Try running `codex --version` in Terminal."
));
}

let version = String::from_utf8_lossy(&output.stdout).trim().to_string();
Ok(if version.is_empty() { None } else { Some(version) })
}

pub(crate) async fn spawn_workspace_session<E: EventSink>(
entry: WorkspaceEntry,
default_codex_bin: Option<String>,
client_version: String,
event_sink: E,
) -> Result<Arc<WorkspaceSession>, String> {
let codex_bin = entry
.codex_bin
.clone()
.filter(|value| !value.trim().is_empty())
.or(default_codex_bin);
let _ = check_codex_installation(codex_bin.clone()).await?;

let mut command = build_codex_command_with_bin(codex_bin);
command.current_dir(&entry.path);
command.arg("app-server");
command.stdin(std::process::Stdio::piped());
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());

let mut child = command.spawn().map_err(|e| e.to_string())?;
let stdin = child.stdin.take().ok_or("missing stdin")?;
let stdout = child.stdout.take().ok_or("missing stdout")?;
let stderr = child.stderr.take().ok_or("missing stderr")?;

let session = Arc::new(WorkspaceSession {
entry: entry.clone(),
child: Mutex::new(child),
stdin: Mutex::new(stdin),
pending: Mutex::new(HashMap::new()),
next_id: AtomicU64::new(1),
});

let session_clone = Arc::clone(&session);
let workspace_id = entry.id.clone();
let event_sink_clone = event_sink.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().is_empty() {
continue;
}
let value: Value = match serde_json::from_str(&line) {
Ok(value) => value,
Err(err) => {
let payload = AppServerEvent {
workspace_id: workspace_id.clone(),
message: json!({
"method": "codex/parseError",
"params": { "error": err.to_string(), "raw": line },
}),
};
event_sink_clone.emit_app_server_event(payload);
continue;
}
};

let maybe_id = value.get("id").and_then(|id| id.as_u64());
let has_method = value.get("method").is_some();
let has_result_or_error = value.get("result").is_some() || value.get("error").is_some();
if let Some(id) = maybe_id {
if has_result_or_error {
if let Some(tx) = session_clone.pending.lock().await.remove(&id) {
let _ = tx.send(value);
}
} else if has_method {
let payload = AppServerEvent {
workspace_id: workspace_id.clone(),
message: value,
};
event_sink_clone.emit_app_server_event(payload);
} else if let Some(tx) = session_clone.pending.lock().await.remove(&id) {
let _ = tx.send(value);
}
} else if has_method {
let payload = AppServerEvent {
workspace_id: workspace_id.clone(),
message: value,
};
event_sink_clone.emit_app_server_event(payload);
}
}
});

let workspace_id = entry.id.clone();
let event_sink_clone = event_sink.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().is_empty() {
continue;
}
let payload = AppServerEvent {
workspace_id: workspace_id.clone(),
message: json!({
"method": "codex/stderr",
"params": { "message": line },
}),
};
event_sink_clone.emit_app_server_event(payload);
}
});

let init_params = json!({
"clientInfo": {
"name": "codex_monitor",
"title": "CodexMonitor",
"version": client_version
}
});
let init_result = timeout(
Duration::from_secs(15),
session.send_request("initialize", init_params),
)
.await;
let init_response = match init_result {
Ok(response) => response,
Err(_) => {
let mut child = session.child.lock().await;
let _ = child.kill().await;
return Err(
"Codex app-server did not respond to initialize. Check that `codex app-server` works in Terminal."
.to_string(),
);
}
};
init_response?;
session.send_notification("initialized", None).await?;

let payload = AppServerEvent {
workspace_id: entry.id.clone(),
message: json!({
"method": "codex/connected",
"params": { "workspaceId": entry.id.clone() }
}),
};
event_sink.emit_app_server_event(payload);

Ok(session)
}
22 changes: 22 additions & 0 deletions src-tauri/src/backend/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use serde::Serialize;
use serde_json::Value;

#[derive(Serialize, Clone)]
pub(crate) struct AppServerEvent {
pub(crate) workspace_id: String,
pub(crate) message: Value,
}

#[derive(Debug, Serialize, Clone)]
pub(crate) struct TerminalOutput {
#[serde(rename = "workspaceId")]
pub(crate) workspace_id: String,
#[serde(rename = "terminalId")]
pub(crate) terminal_id: String,
pub(crate) data: String,
}

pub(crate) trait EventSink: Clone + Send + Sync + 'static {
fn emit_app_server_event(&self, event: AppServerEvent);
fn emit_terminal_output(&self, event: TerminalOutput);
}
2 changes: 2 additions & 0 deletions src-tauri/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod app_server;
pub(crate) mod events;
Loading