From 1e51a6b98fcd798094279a643c67d06c66316b7c Mon Sep 17 00:00:00 2001 From: Wayland Yang Date: Wed, 20 May 2026 03:45:54 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat(controller):=20workspaces=20=E2=80=94?= =?UTF-8?q?=20POST/GET/DELETE=20/v1/workspaces=20+=20suspend/resume?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stateful workspace MVP for #116. New types in api.rs: - WorkspaceStatus (Running / Suspended / Stale) - CreateWorkspaceRequest { name, snapshot_tag, per_child_netns, memory_limit_mib } - WorkspaceInfo { id, name, source_snapshot_tag, current_state_tag, status, live_sandbox_id, created_at_unix, last_active_unix, last_branch_memory_path } - SuspendWorkspaceRequest { diff: bool } Registry gains BTreeMap keyed by name + list/get/insert/remove/update methods. reconcile() now also marks Running workspaces Stale at daemon startup if their live_sandbox_id no longer corresponds to a tracked sandbox. HTTP routes: - GET /v1/workspaces - POST /v1/workspaces — create + spawn first sandbox - GET /v1/workspaces/:name - DELETE /v1/workspaces/:name — kill sandbox + drop state snapshot - POST /v1/workspaces/:name/suspend — branch live sandbox to ws--state, kill sandbox - POST /v1/workspaces/:name/resume — spawn from current_state_tag (or source if first resume) State-tag scheme is overwrite-on-suspend (ws--state), bounded to one snapshot per workspace on disk. Source snapshot is preserved on delete; only the workspace's own state snapshot is removed. Suspend respects v0.3 diff: true and the previous-output chain via last_branch_memory_path persisted on WorkspaceInfo. CLI subcommands + tests land in a follow-up commit on this branch. Refs #116. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/forkd-controller/src/api.rs | 66 +++++ crates/forkd-controller/src/http.rs | 404 ++++++++++++++++++++++++++- crates/forkd-controller/src/state.rs | 93 +++++- 3 files changed, 559 insertions(+), 4 deletions(-) diff --git a/crates/forkd-controller/src/api.rs b/crates/forkd-controller/src/api.rs index 8e48735..39c0348 100644 --- a/crates/forkd-controller/src/api.rs +++ b/crates/forkd-controller/src/api.rs @@ -179,6 +179,72 @@ pub struct SandboxInfo { pub last_branch_memory_path: Option, } +/// State of a stateful workspace (#116). Tracks whether the workspace +/// is currently driving a live sandbox or has been suspended to a +/// state tag (so a future `resume` can pick up where it left off). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum WorkspaceStatus { + /// Has a live sandbox (`live_sandbox_id` is Some). + Running, + /// No live sandbox; `current_state_tag` points at the latest + /// suspended snapshot. `resume` spawns from there. + Suspended, + /// Was Running at daemon shutdown / crash. The live sandbox is + /// gone; the workspace needs a fresh resume from + /// `current_state_tag` (if any) or `source_snapshot_tag` (if + /// never suspended). + Stale, +} + +/// `POST /v1/workspaces` — create a new stateful workspace. +/// +/// Spawns a sandbox from `snapshot_tag` and tracks it as a workspace +/// the user can `suspend` / `resume` across daemon restarts. The +/// workspace is identified by `name` (unique per daemon). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateWorkspaceRequest { + pub name: String, + pub snapshot_tag: String, + #[serde(default)] + pub per_child_netns: bool, + #[serde(default)] + pub memory_limit_mib: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkspaceInfo { + pub id: String, + pub name: String, + pub source_snapshot_tag: String, + /// Set after the first successful `suspend`. None for workspaces + /// that have only been Running since creation. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub current_state_tag: Option, + pub status: WorkspaceStatus, + /// Set when status == Running. None otherwise. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub live_sandbox_id: Option, + pub created_at_unix: u64, + pub last_active_unix: u64, + /// Persisted between resumes — used to chain diff snapshots + /// across the workspace lifetime if the operator opts in via + /// `suspend?diff=true`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_branch_memory_path: Option, +} + +/// `POST /v1/workspaces/:name/suspend` request body. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SuspendWorkspaceRequest { + /// Use v0.3 diff snapshot for the suspend write. ~200 ms source + /// pause vs seconds for a Full snapshot. Honors the same + /// `last_branch_memory_path` chain that `POST /v1/sandboxes/:id/branch` + /// uses. + #[serde(default)] + pub diff: bool, +} + /// `POST /v1/sandboxes/:id/exec` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExecRequest { diff --git a/crates/forkd-controller/src/http.rs b/crates/forkd-controller/src/http.rs index 3b0f202..26f4d96 100644 --- a/crates/forkd-controller/src/http.rs +++ b/crates/forkd-controller/src/http.rs @@ -33,8 +33,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use crate::api::{ - BranchSandboxRequest, CreateSandboxRequest, CreateSnapshotRequest, ErrorBody, EvalRequest, - EvalResponse, ExecRequest, ExecResponse, SandboxInfo, SnapshotInfo, VersionResponse, + BranchSandboxRequest, CreateSandboxRequest, CreateSnapshotRequest, CreateWorkspaceRequest, + ErrorBody, EvalRequest, EvalResponse, ExecRequest, ExecResponse, SandboxInfo, SnapshotInfo, + SuspendWorkspaceRequest, VersionResponse, WorkspaceInfo, WorkspaceStatus, }; use crate::state::Registry; @@ -151,6 +152,13 @@ pub fn router(state: SharedState) -> Router { .route("/v1/sandboxes/:id/eval", post(eval_sandbox)) .route("/v1/sandboxes/:id/ping", post(ping_sandbox)) .route("/v1/sandboxes/:id/branch", post(branch_sandbox)) + .route("/v1/workspaces", get(list_workspaces).post(create_workspace)) + .route( + "/v1/workspaces/:name", + get(get_workspace).delete(delete_workspace), + ) + .route("/v1/workspaces/:name/suspend", post(suspend_workspace)) + .route("/v1/workspaces/:name/resume", post(resume_workspace)) .with_state(state) } @@ -1068,6 +1076,398 @@ fn service_unavailable(msg: &str) -> Response { .into_response() } +// ----------------------------------------------------------------------- +// Stateful workspaces (#116) +// ----------------------------------------------------------------------- + +/// Spawn one sandbox from a snapshot tag and return the resulting +/// `forkd_vmm::Vm` + the daemon-side metadata, without inserting into +/// the live_vms / Registry. Workspace endpoints insert into those +/// themselves after wrapping the Vm in a WorkspaceInfo. Kept small +/// because the workspace path doesn't need per_child_netns or +/// memory_limit auto-negotiation today. +fn spawn_one_for_workspace( + s: &SharedState, + snapshot_tag: &str, + per_child_netns: bool, + memory_limit_mib: Option, +) -> anyhow::Result<(forkd_vmm::Vm, SandboxInfo)> { + let snap_dir: PathBuf = match s.registry.get_snapshot(snapshot_tag) { + Some(s) => PathBuf::from(&s.dir), + None => s.snapshot_root.join(snapshot_tag), + }; + if !snap_dir.join("vmstate").exists() { + anyhow::bail!("snapshot {snapshot_tag} not found"); + } + let snapshot = match std::fs::read(snap_dir.join("snapshot.json")) + .ok() + .and_then(|raw| serde_json::from_slice::(&raw).ok()) + { + Some(s) => s, + None => forkd_vmm::Snapshot { + vmstate: snap_dir.join("vmstate"), + memory: snap_dir.join("memory.bin"), + volumes: Vec::new(), + }, + }; + let netns_offset = if per_child_netns { + pick_netns_offset(&s.live_vms.lock(), 1) + } else { + 0 + }; + let opts = forkd_vmm::ForkOpts { + n: 1, + per_child_netns, + memory_limit_mib, + netns_offset, + prewarm_scratch_dir: None, + memory_backend: forkd_vmm::MemoryBackend::File, + enable_diff_snapshots: true, + }; + let work_dir = std::env::temp_dir().join(format!( + "forkd-workspace-{snapshot_tag}-o{netns_offset}" + )); + let mut fork_result = snapshot.restore_many_with(opts, &work_dir)?; + let vm = fork_result + .children + .pop() + .ok_or_else(|| anyhow::anyhow!("restore_many returned no children"))?; + + let info = SandboxInfo { + id: new_sandbox_id(), + snapshot_tag: snapshot_tag.to_string(), + netns: vm.netns.clone(), + guest_addr: "10.42.0.2:8888".to_string(), + created_at_unix: unix_now(), + pid: Some(vm.pid()), + memory_limit_mib, + has_branched: false, + last_branch_memory_path: None, + }; + Ok((vm, info)) +} + +async fn list_workspaces(State(s): State) -> Response { + let v = s.registry.list_workspaces(); + Json(v).into_response() +} + +async fn get_workspace(State(s): State, Path(name): Path) -> Response { + match s.registry.get_workspace(&name) { + Some(ws) => Json(ws).into_response(), + None => not_found(&format!("workspace {name}")), + } +} + +async fn create_workspace( + State(s): State, + Json(req): Json, +) -> Response { + if !is_safe_tag(&req.name) { + return bad_request( + "workspace name must be 1-64 chars, ASCII alnum or dash/underscore", + ); + } + if !is_safe_tag(&req.snapshot_tag) { + return bad_request("snapshot_tag must be 1-64 chars, ASCII alnum or dash/underscore"); + } + if s.registry.get_workspace(&req.name).is_some() { + return conflict(&format!("workspace {} already exists; DELETE first", req.name)); + } + let snapshot_tag = req.snapshot_tag.clone(); + let per_child_netns = req.per_child_netns; + let memory_limit_mib = req.memory_limit_mib; + let s_clone = s.clone(); + let spawn_result = tokio::task::spawn_blocking(move || { + spawn_one_for_workspace(&s_clone, &snapshot_tag, per_child_netns, memory_limit_mib) + }) + .await; + + let (vm, sb_info) = match spawn_result { + Ok(Ok(pair)) => pair, + Ok(Err(e)) => return server_error(&format!("spawn workspace sandbox: {e:#}")), + Err(e) => return server_error(&format!("blocking task panicked: {e}")), + }; + + let id = sb_info.id.clone(); + if let Err(e) = s.registry.insert_sandbox(sb_info.clone()) { + tracing::error!(error=%e, "persist workspace's live sandbox failed"); + } + s.live_vms.lock().insert(id.clone(), vm); + + let now = unix_now(); + let ws = WorkspaceInfo { + id: format!("ws-{}", &id[..id.len().min(16)]), + name: req.name.clone(), + source_snapshot_tag: req.snapshot_tag.clone(), + current_state_tag: None, + status: WorkspaceStatus::Running, + live_sandbox_id: Some(id), + created_at_unix: now, + last_active_unix: now, + last_branch_memory_path: None, + }; + if let Err(e) = s.registry.insert_workspace(ws.clone()) { + return server_error(&format!("persist workspace: {e:#}")); + } + (StatusCode::CREATED, Json(ws)).into_response() +} + +async fn delete_workspace(State(s): State, Path(name): Path) -> Response { + let ws = match s.registry.get_workspace(&name) { + Some(w) => w, + None => return not_found(&format!("workspace {name}")), + }; + // Kill the live sandbox if any. + if let Some(sb_id) = &ws.live_sandbox_id { + if let Some(vm) = s.live_vms.lock().remove(sb_id) { + drop(vm); // Vm::drop kills firecracker + cleans cgroup + } + let _ = s.registry.remove_sandbox(sb_id); + } + // Best-effort cleanup of the workspace's state snapshot. We DO + // NOT remove the source snapshot — it might be shared with other + // workspaces / sandboxes. + if let Some(state_tag) = ws.current_state_tag.as_deref() { + let dir = s.snapshot_root.join(state_tag); + let _ = std::fs::remove_dir_all(&dir); + let _ = s.registry.remove_snapshot(state_tag); + } + let _ = s.registry.remove_workspace(&name); + StatusCode::NO_CONTENT.into_response() +} + +async fn suspend_workspace( + State(s): State, + Path(name): Path, + Json(req): Json, +) -> Response { + let ws = match s.registry.get_workspace(&name) { + Some(w) => w, + None => return not_found(&format!("workspace {name}")), + }; + if ws.status != WorkspaceStatus::Running { + return bad_request(&format!( + "workspace {name} is {:?}, not Running — suspend requires a live sandbox", + ws.status + )); + } + let sb_id = match ws.live_sandbox_id.clone() { + Some(id) => id, + None => return server_error("inconsistent state: Running but no live_sandbox_id"), + }; + + // Pick a state-tag that we overwrite on each suspend; keeps disk + // usage bounded at one snapshot per workspace. + let state_tag = format!("ws-{name}-state"); + if !is_safe_tag(&state_tag) { + return server_error("derived state tag failed validation (workspace name pathological?)"); + } + let snap_dir = s.snapshot_root.join(&state_tag); + + // Hand-roll a slimmer branch path here. Acquire the slot via the + // existing concurrency gate so we don't overlap with other branches. + let _slot = match s.try_acquire_branch_slot(&state_tag) { + Ok(slot) => slot, + Err(BranchSlotError::AlreadyInFlight) => { + return conflict(&format!( + "suspend for workspace '{name}' already in flight" + )); + } + Err(BranchSlotError::CapacityExceeded) => { + return service_unavailable(&format!( + "daemon at branch concurrency cap ({}); retry shortly", + DEFAULT_BRANCH_CONCURRENCY + )); + } + }; + + // Delete any previous state snapshot so the new one can claim the dir. + if snap_dir.join("vmstate").exists() { + let _ = std::fs::remove_dir_all(&snap_dir); + let _ = s.registry.remove_snapshot(&state_tag); + } + + let vm = match s.live_vms.lock().remove(&sb_id) { + Some(v) => v, + None => return not_found(&format!("workspace's live sandbox {sb_id} is gone")), + }; + let snap_dir_for_task = snap_dir.clone(); + let source_tag = ws.source_snapshot_tag.clone(); + let source_memory_path = s.snapshot_root.join(&source_tag).join("memory.bin"); + let last_chain = ws.last_branch_memory_path.clone(); + let diff_mode = req.diff; + + let task = tokio::task::spawn_blocking(move || -> (forkd_vmm::Vm, anyhow::Result<(forkd_vmm::Snapshot, Option)>) { + let mut pause_ms: Option = None; + let res = (|| -> anyhow::Result { + std::fs::create_dir_all(&snap_dir_for_task)?; + let pause_start = std::time::Instant::now(); + let cp_handle: Option>> = if diff_mode { + let src = last_chain + .as_ref() + .filter(|p| p.exists()) + .cloned() + .unwrap_or_else(|| source_memory_path.clone()); + let dst = snap_dir_for_task.join("memory.bin"); + Some(std::thread::spawn(move || std::fs::copy(&src, &dst))) + } else { + None + }; + vm.pause()?; + let snap = if diff_mode { + let diff_path = std::env::temp_dir().join(format!( + "forkd-ws-diff-{}-{}.bin", + std::process::id(), + unix_now() + )); + let diff_snap = vm.snapshot_diff_to( + snap_dir_for_task.join("vmstate"), + diff_path.clone(), + Vec::new(), + )?; + vm.resume()?; + pause_ms = Some(pause_start.elapsed().as_millis() as u64); + if let Some(h) = cp_handle { + h.join() + .map_err(|e| anyhow::anyhow!("cp thread panicked: {e:?}"))??; + } + forkd_vmm::apply_diff(&diff_path, &snap_dir_for_task.join("memory.bin"))?; + let _ = std::fs::remove_file(&diff_path); + forkd_vmm::Snapshot { + vmstate: diff_snap.vmstate, + memory: snap_dir_for_task.join("memory.bin"), + volumes: diff_snap.volumes, + } + } else { + let snap = vm.snapshot_to( + snap_dir_for_task.join("vmstate"), + snap_dir_for_task.join("memory.bin"), + Vec::new(), + )?; + vm.resume()?; + pause_ms = Some(pause_start.elapsed().as_millis() as u64); + snap + }; + Ok(snap) + })(); + (vm, res.map(|s| (s, pause_ms))) + }) + .await; + + let (vm_back, snap_or_err) = match task { + Ok((vm, r)) => (vm, r), + Err(e) => return server_error(&format!("blocking task panicked: {e}")), + }; + + // We took the VM out of live_vms for suspend; intentionally + // discard it now (suspend == kill source after snapshotting). + drop(vm_back); + let _ = s.registry.remove_sandbox(&sb_id); + + let (snap, pause_ms) = match snap_or_err { + Ok((s, p)) => (s, p), + Err(e) => { + let _ = std::fs::remove_dir_all(&snap_dir); + return server_error(&format!("suspend: {e:#}")); + } + }; + + // Persist snapshot.json so resume can find the volume / mem_file metadata. + let meta = match serde_json::to_vec_pretty(&snap) { + Ok(v) => v, + Err(e) => { + let _ = std::fs::remove_dir_all(&snap_dir); + return server_error(&format!("serialize snapshot.json: {e}")); + } + }; + if let Err(e) = std::fs::write(snap_dir.join("snapshot.json"), &meta) { + let _ = std::fs::remove_dir_all(&snap_dir); + return server_error(&format!("write snapshot.json: {e}")); + } + + let snapshot_info = SnapshotInfo { + tag: state_tag.clone(), + dir: snap_dir.display().to_string(), + created_at_unix: unix_now(), + branched_from: Some(sb_id.clone()), + pause_ms, + diff_ms: None, + diff_physical_bytes: None, + diff_logical_bytes: None, + }; + if let Err(e) = s.registry.insert_snapshot(snapshot_info) { + return server_error(&format!("persist suspend snapshot: {e:#}")); + } + + let now = unix_now(); + if let Err(e) = s.registry.update_workspace(&name, |ws| { + ws.status = WorkspaceStatus::Suspended; + ws.live_sandbox_id = None; + ws.current_state_tag = Some(state_tag.clone()); + ws.last_active_unix = now; + ws.last_branch_memory_path = Some(snap_dir.join("memory.bin")); + }) { + return server_error(&format!("update workspace: {e:#}")); + } + + let ws = match s.registry.get_workspace(&name) { + Some(w) => w, + None => return server_error("workspace vanished during suspend"), + }; + Json(ws).into_response() +} + +async fn resume_workspace(State(s): State, Path(name): Path) -> Response { + let ws = match s.registry.get_workspace(&name) { + Some(w) => w, + None => return not_found(&format!("workspace {name}")), + }; + if ws.status == WorkspaceStatus::Running { + return bad_request(&format!( + "workspace {name} is already Running (sandbox {})", + ws.live_sandbox_id.as_deref().unwrap_or("?") + )); + } + // Pick the snapshot to spawn from: prefer current_state_tag (the + // suspend snapshot), fall back to source if the workspace was + // never suspended (Stale-from-startup case). + let spawn_tag = ws + .current_state_tag + .clone() + .unwrap_or_else(|| ws.source_snapshot_tag.clone()); + let s_clone = s.clone(); + let spawn_result = tokio::task::spawn_blocking(move || { + spawn_one_for_workspace(&s_clone, &spawn_tag, false, None) + }) + .await; + let (vm, sb_info) = match spawn_result { + Ok(Ok(pair)) => pair, + Ok(Err(e)) => return server_error(&format!("spawn workspace sandbox: {e:#}")), + Err(e) => return server_error(&format!("blocking task panicked: {e}")), + }; + let id = sb_info.id.clone(); + if let Err(e) = s.registry.insert_sandbox(sb_info.clone()) { + tracing::error!(error=%e, "persist workspace's live sandbox failed"); + } + s.live_vms.lock().insert(id.clone(), vm); + + let now = unix_now(); + if let Err(e) = s.registry.update_workspace(&name, |w| { + w.status = WorkspaceStatus::Running; + w.live_sandbox_id = Some(id.clone()); + w.last_active_unix = now; + }) { + return server_error(&format!("update workspace: {e:#}")); + } + + let ws = match s.registry.get_workspace(&name) { + Some(w) => w, + None => return server_error("workspace vanished during resume"), + }; + Json(ws).into_response() +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/forkd-controller/src/state.rs b/crates/forkd-controller/src/state.rs index 5680346..a1a0391 100644 --- a/crates/forkd-controller/src/state.rs +++ b/crates/forkd-controller/src/state.rs @@ -15,7 +15,7 @@ use std::fs; use std::path::PathBuf; use std::sync::Arc; -use crate::api::{SandboxInfo, SnapshotInfo}; +use crate::api::{SandboxInfo, SnapshotInfo, WorkspaceInfo, WorkspaceStatus}; #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct PersistentState { @@ -23,6 +23,12 @@ pub struct PersistentState { pub snapshots: BTreeMap, #[serde(default)] pub sandboxes: BTreeMap, + /// Stateful workspaces (#116). Keyed by name (user-facing + /// identifier; unique per daemon). The internal `id` field on + /// `WorkspaceInfo` is for audit / cross-reference with live sandbox + /// pids; lookups + mutations from the HTTP / CLI surface go by name. + #[serde(default)] + pub workspaces: BTreeMap, } #[derive(Clone)] @@ -135,6 +141,58 @@ impl Registry { Ok(removed) } + // -------------------------- workspaces (#116) -------------------------- + + pub fn list_workspaces(&self) -> Vec { + self.inner.lock().workspaces.values().cloned().collect() + } + + pub fn get_workspace(&self, name: &str) -> Option { + self.inner.lock().workspaces.get(name).cloned() + } + + pub fn insert_workspace(&self, ws: WorkspaceInfo) -> Result<()> { + { + let mut g = self.inner.lock(); + g.workspaces.insert(ws.name.clone(), ws); + } + self.flush() + } + + pub fn remove_workspace(&self, name: &str) -> Result> { + let removed = { + let mut g = self.inner.lock(); + g.workspaces.remove(name) + }; + if removed.is_some() { + self.flush()?; + } + Ok(removed) + } + + /// Update a workspace in-place via a mutation closure. Returns + /// Ok(true) if the workspace was found and the change persisted; + /// Ok(false) if no such workspace. + pub fn update_workspace(&self, name: &str, mutate: F) -> Result + where + F: FnOnce(&mut WorkspaceInfo), + { + let updated = { + let mut g = self.inner.lock(); + match g.workspaces.get_mut(name) { + Some(ws) => { + mutate(ws); + true + } + None => false, + } + }; + if updated { + self.flush()?; + } + Ok(updated) + } + /// Persist current state atomically (write to temp + rename). fn flush(&self) -> Result<()> { let state = self.inner.lock().clone(); @@ -165,7 +223,38 @@ impl Registry { self.inner.lock().sandboxes.remove(&id); pruned += 1; } - if pruned > 0 { + + // Workspaces (#116): any workspace marked Running whose + // live_sandbox_id is no longer in the live sandbox table is + // Stale — the daemon crashed/restarted out from under it. + // We don't touch Suspended workspaces; they were intentionally + // parked. + let live_ids: std::collections::HashSet = self + .inner + .lock() + .sandboxes + .keys() + .cloned() + .collect(); + let mut stale_ws_changed = false; + { + let mut g = self.inner.lock(); + for ws in g.workspaces.values_mut() { + if ws.status == WorkspaceStatus::Running { + let live = ws + .live_sandbox_id + .as_ref() + .is_some_and(|id| live_ids.contains(id)); + if !live { + ws.status = WorkspaceStatus::Stale; + ws.live_sandbox_id = None; + stale_ws_changed = true; + } + } + } + } + + if pruned > 0 || stale_ws_changed { self.flush()?; } Ok(pruned) From 9032eb3e0c51f4563c5541b79fe771fd8e660313 Mon Sep 17 00:00:00 2001 From: Wayland Yang Date: Wed, 20 May 2026 03:48:23 +0800 Subject: [PATCH 2/4] feat(cli): forkd workspace {create,suspend,resume,list,delete} Wraps the new daemon workspace API in CLI subcommands. Driver uses ureq (already in the cli crate) to hit the daemon's REST endpoints. Subcommands: - create --snapshot [--per-child-netns] [--memory-limit-mib N] - suspend [--diff] - resume - list - delete All accept --daemon-url (env FORKD_URL) and --daemon-token (env FORKD_TOKEN), matching the existing snapshot subcommand conventions. Refs #116. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/forkd-cli/src/main.rs | 203 +++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) diff --git a/crates/forkd-cli/src/main.rs b/crates/forkd-cli/src/main.rs index b4a59c4..77b3c59 100644 --- a/crates/forkd-cli/src/main.rs +++ b/crates/forkd-cli/src/main.rs @@ -281,6 +281,76 @@ enum Cmd { #[arg(long)] base_image: Option, }, + + /// Stateful workspaces (#116) — long-lived sandboxes that survive + /// suspend / resume across daemon restarts. Drive via the + /// controller daemon (`FORKD_URL` / `FORKD_TOKEN`). + #[command(subcommand)] + Workspace(WorkspaceAction), +} + +#[derive(Subcommand)] +enum WorkspaceAction { + /// Create a new workspace by spawning a sandbox from a snapshot + /// tag. The workspace tracks the sandbox so future `suspend` / + /// `resume` calls operate on it by name. + Create { + /// Workspace name. 1-64 chars, ASCII alnum / dash / underscore. + name: String, + /// Snapshot tag to fork from. + #[arg(long)] + snapshot: String, + /// Place the live sandbox in its own pre-provisioned netns. + #[arg(long)] + per_child_netns: bool, + /// Cgroup memory.max for the live sandbox (MiB). + #[arg(long)] + memory_limit_mib: Option, + /// Controller URL. Defaults to FORKD_URL or http://127.0.0.1:8889. + #[arg(long, env = "FORKD_URL", default_value = "http://127.0.0.1:8889")] + daemon_url: String, + /// Controller bearer token. + #[arg(long, env = "FORKD_TOKEN")] + daemon_token: Option, + }, + /// Snapshot the workspace's live sandbox and kill it. State is + /// preserved under `ws--state`; a subsequent `resume` + /// brings the workspace back from there. + Suspend { + name: String, + /// Use v0.3 diff snapshot mode for the suspend write. + /// ~200 ms source pause vs seconds for Full. + #[arg(long)] + diff: bool, + #[arg(long, env = "FORKD_URL", default_value = "http://127.0.0.1:8889")] + daemon_url: String, + #[arg(long, env = "FORKD_TOKEN")] + daemon_token: Option, + }, + /// Restore the workspace from its suspended state. + Resume { + name: String, + #[arg(long, env = "FORKD_URL", default_value = "http://127.0.0.1:8889")] + daemon_url: String, + #[arg(long, env = "FORKD_TOKEN")] + daemon_token: Option, + }, + /// List all workspaces tracked by the daemon. + List { + #[arg(long, env = "FORKD_URL", default_value = "http://127.0.0.1:8889")] + daemon_url: String, + #[arg(long, env = "FORKD_TOKEN")] + daemon_token: Option, + }, + /// Delete a workspace. Kills the live sandbox (if any) and + /// removes the state snapshot. Does NOT touch the source snapshot. + Delete { + name: String, + #[arg(long, env = "FORKD_URL", default_value = "http://127.0.0.1:8889")] + daemon_url: String, + #[arg(long, env = "FORKD_TOKEN")] + daemon_token: Option, + }, } #[derive(Subcommand)] @@ -453,6 +523,139 @@ fn main() -> Result<()> { description, base_image, } => push_cmd(tag, url, description, base_image), + Cmd::Workspace(action) => workspace_cmd(action), + } +} + +fn workspace_cmd(action: WorkspaceAction) -> Result<()> { + use serde_json::{json, Value}; + fn daemon_request( + method: &str, + url: String, + path: &str, + token: Option, + body: Option, + ) -> Result { + let mut req = ureq::request(method, &format!("{}{path}", url.trim_end_matches('/'))); + if let Some(t) = token { + req = req.set("Authorization", &format!("Bearer {t}")); + } + req = req.set("Content-Type", "application/json"); + let resp = match body { + Some(b) => req.send_json(b), + None => req.call(), + }; + match resp { + Ok(r) => Ok(r.into_json().unwrap_or(Value::Null)), + Err(ureq::Error::Status(code, r)) => { + let body = r.into_string().unwrap_or_default(); + anyhow::bail!("daemon HTTP {code}: {body}") + } + Err(e) => Err(anyhow::anyhow!("daemon request failed: {e}")), + } + } + fn print_ws(v: &Value) { + println!( + "{:<24} {:<10} source={:<24} state={:<24} live={}", + v.get("name").and_then(Value::as_str).unwrap_or("?"), + v.get("status").and_then(Value::as_str).unwrap_or("?"), + v.get("source_snapshot_tag") + .and_then(Value::as_str) + .unwrap_or("?"), + v.get("current_state_tag") + .and_then(Value::as_str) + .unwrap_or("-"), + v.get("live_sandbox_id") + .and_then(Value::as_str) + .unwrap_or("-"), + ); + } + match action { + WorkspaceAction::Create { + name, + snapshot, + per_child_netns, + memory_limit_mib, + daemon_url, + daemon_token, + } => { + let mut body = json!({ + "name": name, + "snapshot_tag": snapshot, + "per_child_netns": per_child_netns, + }); + if let Some(m) = memory_limit_mib { + body["memory_limit_mib"] = json!(m); + } + let resp = daemon_request("POST", daemon_url, "/v1/workspaces", daemon_token, Some(body))?; + print_ws(&resp); + Ok(()) + } + WorkspaceAction::Suspend { + name, + diff, + daemon_url, + daemon_token, + } => { + let body = json!({"diff": diff}); + let resp = daemon_request( + "POST", + daemon_url, + &format!("/v1/workspaces/{name}/suspend"), + daemon_token, + Some(body), + )?; + print_ws(&resp); + Ok(()) + } + WorkspaceAction::Resume { + name, + daemon_url, + daemon_token, + } => { + let resp = daemon_request( + "POST", + daemon_url, + &format!("/v1/workspaces/{name}/resume"), + daemon_token, + Some(json!({})), + )?; + print_ws(&resp); + Ok(()) + } + WorkspaceAction::List { + daemon_url, + daemon_token, + } => { + let resp = daemon_request("GET", daemon_url, "/v1/workspaces", daemon_token, None)?; + if let Some(arr) = resp.as_array() { + if arr.is_empty() { + println!("(no workspaces)"); + } else { + for ws in arr { + print_ws(ws); + } + } + } else { + println!("{}", serde_json::to_string_pretty(&resp).unwrap_or_default()); + } + Ok(()) + } + WorkspaceAction::Delete { + name, + daemon_url, + daemon_token, + } => { + daemon_request( + "DELETE", + daemon_url, + &format!("/v1/workspaces/{name}"), + daemon_token, + None, + )?; + println!("deleted workspace '{name}'"); + Ok(()) + } } } From b0073cc989e18bec7d9910a040bfc377bce7f1f5 Mon Sep 17 00:00:00 2001 From: Wayland Yang Date: Wed, 20 May 2026 03:50:00 +0800 Subject: [PATCH 3/4] fix(cli): use send_bytes instead of send_json (ureq json feature not enabled) --- crates/forkd-cli/src/main.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/crates/forkd-cli/src/main.rs b/crates/forkd-cli/src/main.rs index 77b3c59..5458d63 100644 --- a/crates/forkd-cli/src/main.rs +++ b/crates/forkd-cli/src/main.rs @@ -542,11 +542,22 @@ fn workspace_cmd(action: WorkspaceAction) -> Result<()> { } req = req.set("Content-Type", "application/json"); let resp = match body { - Some(b) => req.send_json(b), + Some(b) => { + let bytes = serde_json::to_vec(&b) + .map_err(|e| anyhow::anyhow!("serialize body: {e}"))?; + req.send_bytes(&bytes) + } None => req.call(), }; match resp { - Ok(r) => Ok(r.into_json().unwrap_or(Value::Null)), + Ok(r) => { + let text = r.into_string().unwrap_or_default(); + if text.is_empty() { + Ok(Value::Null) + } else { + serde_json::from_str(&text).map_err(|e| anyhow::anyhow!("parse response: {e}")) + } + } Err(ureq::Error::Status(code, r)) => { let body = r.into_string().unwrap_or_default(); anyhow::bail!("daemon HTTP {code}: {body}") From 30c5104b3f0b9281bc0311c8fb1818bcf2ea115e Mon Sep 17 00:00:00 2001 From: Wayland Yang Date: Wed, 20 May 2026 03:54:07 +0800 Subject: [PATCH 4/4] =?UTF-8?q?chore:=20cargo=20fmt=20=E2=80=94=20workspac?= =?UTF-8?q?es=20handler=20line=20wraps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/forkd-cli/src/main.rs | 17 +++++++++++++---- crates/forkd-controller/src/http.rs | 23 ++++++++++++----------- crates/forkd-controller/src/state.rs | 9 ++------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/crates/forkd-cli/src/main.rs b/crates/forkd-cli/src/main.rs index 5458d63..06a3ae7 100644 --- a/crates/forkd-cli/src/main.rs +++ b/crates/forkd-cli/src/main.rs @@ -543,8 +543,8 @@ fn workspace_cmd(action: WorkspaceAction) -> Result<()> { req = req.set("Content-Type", "application/json"); let resp = match body { Some(b) => { - let bytes = serde_json::to_vec(&b) - .map_err(|e| anyhow::anyhow!("serialize body: {e}"))?; + let bytes = + serde_json::to_vec(&b).map_err(|e| anyhow::anyhow!("serialize body: {e}"))?; req.send_bytes(&bytes) } None => req.call(), @@ -598,7 +598,13 @@ fn workspace_cmd(action: WorkspaceAction) -> Result<()> { if let Some(m) = memory_limit_mib { body["memory_limit_mib"] = json!(m); } - let resp = daemon_request("POST", daemon_url, "/v1/workspaces", daemon_token, Some(body))?; + let resp = daemon_request( + "POST", + daemon_url, + "/v1/workspaces", + daemon_token, + Some(body), + )?; print_ws(&resp); Ok(()) } @@ -648,7 +654,10 @@ fn workspace_cmd(action: WorkspaceAction) -> Result<()> { } } } else { - println!("{}", serde_json::to_string_pretty(&resp).unwrap_or_default()); + println!( + "{}", + serde_json::to_string_pretty(&resp).unwrap_or_default() + ); } Ok(()) } diff --git a/crates/forkd-controller/src/http.rs b/crates/forkd-controller/src/http.rs index 26f4d96..85828e0 100644 --- a/crates/forkd-controller/src/http.rs +++ b/crates/forkd-controller/src/http.rs @@ -152,7 +152,10 @@ pub fn router(state: SharedState) -> Router { .route("/v1/sandboxes/:id/eval", post(eval_sandbox)) .route("/v1/sandboxes/:id/ping", post(ping_sandbox)) .route("/v1/sandboxes/:id/branch", post(branch_sandbox)) - .route("/v1/workspaces", get(list_workspaces).post(create_workspace)) + .route( + "/v1/workspaces", + get(list_workspaces).post(create_workspace), + ) .route( "/v1/workspaces/:name", get(get_workspace).delete(delete_workspace), @@ -1124,9 +1127,8 @@ fn spawn_one_for_workspace( memory_backend: forkd_vmm::MemoryBackend::File, enable_diff_snapshots: true, }; - let work_dir = std::env::temp_dir().join(format!( - "forkd-workspace-{snapshot_tag}-o{netns_offset}" - )); + let work_dir = + std::env::temp_dir().join(format!("forkd-workspace-{snapshot_tag}-o{netns_offset}")); let mut fork_result = snapshot.restore_many_with(opts, &work_dir)?; let vm = fork_result .children @@ -1164,15 +1166,16 @@ async fn create_workspace( Json(req): Json, ) -> Response { if !is_safe_tag(&req.name) { - return bad_request( - "workspace name must be 1-64 chars, ASCII alnum or dash/underscore", - ); + return bad_request("workspace name must be 1-64 chars, ASCII alnum or dash/underscore"); } if !is_safe_tag(&req.snapshot_tag) { return bad_request("snapshot_tag must be 1-64 chars, ASCII alnum or dash/underscore"); } if s.registry.get_workspace(&req.name).is_some() { - return conflict(&format!("workspace {} already exists; DELETE first", req.name)); + return conflict(&format!( + "workspace {} already exists; DELETE first", + req.name + )); } let snapshot_tag = req.snapshot_tag.clone(); let per_child_netns = req.per_child_netns; @@ -1270,9 +1273,7 @@ async fn suspend_workspace( let _slot = match s.try_acquire_branch_slot(&state_tag) { Ok(slot) => slot, Err(BranchSlotError::AlreadyInFlight) => { - return conflict(&format!( - "suspend for workspace '{name}' already in flight" - )); + return conflict(&format!("suspend for workspace '{name}' already in flight")); } Err(BranchSlotError::CapacityExceeded) => { return service_unavailable(&format!( diff --git a/crates/forkd-controller/src/state.rs b/crates/forkd-controller/src/state.rs index a1a0391..1f240cc 100644 --- a/crates/forkd-controller/src/state.rs +++ b/crates/forkd-controller/src/state.rs @@ -229,13 +229,8 @@ impl Registry { // Stale — the daemon crashed/restarted out from under it. // We don't touch Suspended workspaces; they were intentionally // parked. - let live_ids: std::collections::HashSet = self - .inner - .lock() - .sandboxes - .keys() - .cloned() - .collect(); + let live_ids: std::collections::HashSet = + self.inner.lock().sandboxes.keys().cloned().collect(); let mut stale_ws_changed = false; { let mut g = self.inner.lock();