diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 984239d..9474083 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -118,6 +118,9 @@ jobs: - name: Build hm run: cargo build -p harmont-cli + - name: Enable FUSE allow_other + run: sudo sed -i 's/#user_allow_other/user_allow_other/' /etc/fuse.conf + - name: Restore harmont Docker cache uses: actions/cache/restore@v4 with: diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml index d923926..ad8eff1 100644 --- a/.github/workflows/examples.yml +++ b/.github/workflows/examples.yml @@ -109,6 +109,9 @@ jobs: - name: Mark hm executable run: chmod +x bin/hm + - name: Enable FUSE allow_other + run: sudo sed -i 's/#user_allow_other/user_allow_other/' /etc/fuse.conf + - name: Run example via hm run working-directory: examples/${{ matrix.example }} env: diff --git a/Cargo.lock b/Cargo.lock index a372b05..b7f1902 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1211,7 +1211,7 @@ dependencies = [ [[package]] name = "hm-plugin-cloud" -version = "0.1.0" +version = "0.0.0-dev" dependencies = [ "anyhow", "base64", @@ -1248,9 +1248,12 @@ dependencies = [ name = "hm-util" version = "0.0.0-dev" dependencies = [ + "anyhow", "dirs", "tempfile", "tokio", + "tracing", + "which 6.0.3", "windows", ] diff --git a/README.md b/README.md index f42e471..1f80327 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@

- Website · Docs · Slack + Website · Docs · Slack

> [!WARNING] @@ -174,7 +174,7 @@ Go, Python, Java, C++, React, Next.js, and more. ## Documentation For the full pipeline reference, rich examples, and more — see the -[docs](https://harmont.dev/docs). +[docs](https://docs.harmont.dev). ## License diff --git a/crates/hm-util/Cargo.toml b/crates/hm-util/Cargo.toml index 9e10e8a..4eea739 100644 --- a/crates/hm-util/Cargo.toml +++ b/crates/hm-util/Cargo.toml @@ -7,8 +7,12 @@ repository.workspace = true description = "Shared OS and filesystem utilities for Harmont crates." [dependencies] +anyhow = { workspace = true } dirs = "6" +tempfile = "3" tokio = { version = "1", features = ["rt", "rt-multi-thread", "fs", "io-util"] } +tracing = { workspace = true } +which = "6" [target.'cfg(windows)'.dependencies.windows] version = "0.62" diff --git a/crates/hm-util/src/cow.rs b/crates/hm-util/src/cow.rs new file mode 100644 index 0000000..fd0ac7b --- /dev/null +++ b/crates/hm-util/src/cow.rs @@ -0,0 +1,429 @@ +//! Platform-native copy-on-write directory cloning. + +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::sync::OnceLock; + +use anyhow::{Context, Result, bail}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CowStrategy { + ApfsClone, + Reflink, + FuseOverlay, + FullCopy, +} + +/// Detect the best available COW strategy for the current platform. +/// Result is cached after the first call. +#[must_use] +pub fn detect_strategy() -> CowStrategy { + static STRATEGY: OnceLock = OnceLock::new(); + *STRATEGY.get_or_init(detect_strategy_inner) +} + +/// Probe result for a single strategy. +#[derive(Debug, Clone)] +pub struct StrategyProbe { + pub strategy: CowStrategy, + pub available: bool, + pub reason: &'static str, +} + +/// Test all strategies and report which are available. +/// Used for diagnostics / user-facing warnings. +#[must_use] +#[allow(clippy::vec_init_then_push)] +pub fn diagnose_strategies() -> Vec { + let mut probes = Vec::new(); + + #[cfg(target_os = "macos")] + probes.push(StrategyProbe { + strategy: CowStrategy::ApfsClone, + available: true, + reason: "macOS APFS detected", + }); + + #[cfg(target_os = "linux")] + { + probes.push(StrategyProbe { + strategy: CowStrategy::Reflink, + available: probe_reflink(), + reason: if probe_reflink() { + "filesystem supports reflinks" + } else { + "filesystem does not support reflinks (btrfs/XFS required)" + }, + }); + let fuse_ok = probe_fuse_overlayfs(); + probes.push(StrategyProbe { + strategy: CowStrategy::FuseOverlay, + available: fuse_ok, + reason: if fuse_ok { + "fuse-overlayfs mount succeeded" + } else if which::which("fuse-overlayfs").is_err() { + "fuse-overlayfs not installed" + } else { + "fuse-overlayfs mount failed (missing /dev/fuse or user_allow_other?)" + }, + }); + } + + probes.push(StrategyProbe { + strategy: CowStrategy::FullCopy, + available: true, + reason: "always available (slow)", + }); + + probes +} + +#[allow(clippy::missing_const_for_fn)] +fn detect_strategy_inner() -> CowStrategy { + #[cfg(target_os = "macos")] + { + return CowStrategy::ApfsClone; + } + + #[cfg(target_os = "linux")] + { + if probe_reflink() { + return CowStrategy::Reflink; + } + if probe_fuse_overlayfs() { + return CowStrategy::FuseOverlay; + } + return CowStrategy::FullCopy; + } + + #[allow(unreachable_code)] + CowStrategy::FullCopy +} + +#[cfg(target_os = "linux")] +fn probe_reflink() -> bool { + let Ok(tmp) = tempfile::tempdir() else { + return false; + }; + let src = tmp.path().join("src"); + let dst = tmp.path().join("dst"); + if std::fs::write(&src, b"x").is_err() { + return false; + } + Command::new("cp") + .args(["--reflink=always"]) + .arg(&src) + .arg(&dst) + .stderr(std::process::Stdio::null()) + .status() + .is_ok_and(|s| s.success()) +} + +#[cfg(target_os = "linux")] +fn probe_fuse_overlayfs() -> bool { + if which::which("fuse-overlayfs").is_err() { + return false; + } + let Ok(tmp) = tempfile::tempdir() else { + return false; + }; + let lower = tmp.path().join("lower"); + let upper = tmp.path().join("upper"); + let work = tmp.path().join("work"); + let merged = tmp.path().join("merged"); + for d in [&lower, &upper, &work, &merged] { + if std::fs::create_dir(d).is_err() { + return false; + } + } + let opts = format!( + "lowerdir={},upperdir={},workdir={},allow_other,squash_to_uid=0,squash_to_gid=0", + lower.display(), + upper.display(), + work.display(), + ); + let ok = Command::new("fuse-overlayfs") + .args(["-o", &opts]) + .arg(&merged) + .stderr(std::process::Stdio::null()) + .status() + .is_ok_and(|s| s.success()); + if ok { + let bin = if which::which("fusermount3").is_ok() { + "fusermount3" + } else { + "fusermount" + }; + let _ = Command::new(bin) + .args(["-u"]) + .arg(&merged) + .stderr(std::process::Stdio::null()) + .status(); + } + ok +} + +/// Clone `src` to `dst` using the best available COW mechanism. +/// +/// # Errors +/// +/// Returns an error if `dst` already exists, if parent directories cannot +/// be created, or if the underlying copy operation fails. +pub fn cow_clone_dir(src: &Path, dst: &Path) -> Result<()> { + if dst.exists() { + bail!("destination already exists: {}", dst.display()); + } + if let Some(parent) = dst.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create parent dirs for {}", dst.display()))?; + } + + if try_platform_cow(src, dst)? { + return Ok(()); + } + + copy_dir_recursive(src, dst) +} + +fn try_platform_cow(src: &Path, dst: &Path) -> Result { + #[cfg(target_os = "macos")] + { + let status = Command::new("cp") + .args(["-c", "-R", "-p"]) + .arg(src) + .arg(dst) + .stderr(std::process::Stdio::null()) + .status() + .context("spawn cp -c")?; + if status.success() { + return Ok(true); + } + let _ = std::fs::remove_dir_all(dst); + } + + #[cfg(target_os = "linux")] + { + let status = Command::new("cp") + .args(["--reflink=always", "-a"]) + .arg(src) + .arg(dst) + .stderr(std::process::Stdio::null()) + .status() + .context("spawn cp --reflink")?; + if status.success() { + return Ok(true); + } + let _ = std::fs::remove_dir_all(dst); + + let status = Command::new("cp") + .args(["-a"]) + .arg(src) + .arg(dst) + .stderr(std::process::Stdio::null()) + .status() + .context("spawn cp -a")?; + if status.success() { + return Ok(true); + } + let _ = std::fs::remove_dir_all(dst); + } + + Ok(false) +} + +fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> { + std::fs::create_dir_all(dst).with_context(|| format!("create {}", dst.display()))?; + for entry in std::fs::read_dir(src).with_context(|| format!("read dir {}", src.display()))? { + let entry = entry?; + let ty = entry.file_type()?; + let src_path = entry.path(); + let dst_path = dst.join(entry.file_name()); + if ty.is_dir() { + copy_dir_recursive(&src_path, &dst_path)?; + } else if ty.is_symlink() { + let target = std::fs::read_link(&src_path)?; + #[cfg(unix)] + std::os::unix::fs::symlink(&target, &dst_path)?; + #[cfg(windows)] + std::os::windows::fs::symlink_file(&target, &dst_path)?; + } else { + std::fs::copy(&src_path, &dst_path) + .with_context(|| format!("copy {}", src_path.display()))?; + } + } + Ok(()) +} + +pub struct OverlayMount { + merged: PathBuf, + upper: PathBuf, + mounted: std::sync::atomic::AtomicBool, +} + +impl OverlayMount { + /// Mount a fuse-overlayfs filesystem merging the given layers. + /// + /// # Errors + /// + /// Returns an error if directory creation fails or `fuse-overlayfs` + /// exits with a non-zero status. + pub fn mount( + lower_dirs: &[&Path], + upper_dir: &Path, + work_dir: &Path, + merged_path: &Path, + ) -> Result { + std::fs::create_dir_all(upper_dir)?; + std::fs::create_dir_all(work_dir)?; + std::fs::create_dir_all(merged_path)?; + + let lowerdir: String = lower_dirs + .iter() + .map(|p| p.to_string_lossy().into_owned()) + .collect::>() + .join(":"); + + let opts = format!( + "lowerdir={lowerdir},upperdir={},workdir={},allow_other,squash_to_uid=0,squash_to_gid=0", + upper_dir.display(), + work_dir.display(), + ); + + let output = Command::new("fuse-overlayfs") + .args(["-o", &opts]) + .arg(merged_path) + .output() + .context("spawn fuse-overlayfs")?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + bail!( + "fuse-overlayfs mount failed (exit {}): {stderr}\nlowerdir={}, upper={}, merged={}", + output.status.code().unwrap_or(-1), + lowerdir, + upper_dir.display(), + merged_path.display(), + ); + } + + Ok(Self { + merged: merged_path.to_path_buf(), + upper: upper_dir.to_path_buf(), + mounted: std::sync::atomic::AtomicBool::new(true), + }) + } + + #[must_use] + pub fn merged_path(&self) -> &Path { + &self.merged + } + + #[must_use] + pub fn upper_dir(&self) -> &Path { + &self.upper + } + + /// Unmount the fuse-overlayfs filesystem. Safe to call multiple times. + /// + /// # Errors + /// + /// Returns an error if `fusermount` cannot be spawned or exits + /// with a non-zero status. + pub fn unmount(&self) -> Result<()> { + if !self + .mounted + .swap(false, std::sync::atomic::Ordering::AcqRel) + { + return Ok(()); + } + let bin = if which::which("fusermount3").is_ok() { + "fusermount3" + } else { + "fusermount" + }; + let status = Command::new(bin) + .args(["-u"]) + .arg(&self.merged) + .stderr(std::process::Stdio::null()) + .status() + .with_context(|| format!("spawn {bin} -u"))?; + if !status.success() { + bail!("{bin} -u {} failed", self.merged.display()); + } + Ok(()) + } +} + +impl std::fmt::Debug for OverlayMount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OverlayMount") + .field("merged", &self.merged) + .field("upper", &self.upper) + .finish_non_exhaustive() + } +} + +impl Drop for OverlayMount { + fn drop(&mut self) { + if let Err(e) = self.unmount() { + tracing::warn!(%e, path = %self.merged.display(), "fuse-overlayfs unmount failed"); + } + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use std::fs; + + #[test] + fn cow_clone_creates_identical_tree() { + let tmp = tempfile::tempdir().unwrap(); + let src = tmp.path().join("src"); + fs::create_dir_all(src.join("sub")).unwrap(); + fs::write(src.join("a.txt"), b"hello").unwrap(); + fs::write(src.join("sub/b.txt"), b"world").unwrap(); + + let dst = tmp.path().join("dst"); + cow_clone_dir(&src, &dst).unwrap(); + + assert_eq!(fs::read_to_string(dst.join("a.txt")).unwrap(), "hello"); + assert_eq!(fs::read_to_string(dst.join("sub/b.txt")).unwrap(), "world"); + } + + #[test] + fn cow_clone_is_isolated() { + let tmp = tempfile::tempdir().unwrap(); + let src = tmp.path().join("src"); + fs::create_dir(&src).unwrap(); + fs::write(src.join("f.txt"), b"original").unwrap(); + + let dst = tmp.path().join("dst"); + cow_clone_dir(&src, &dst).unwrap(); + + // Mutate dst; src must be unchanged. + fs::write(dst.join("f.txt"), b"modified").unwrap(); + assert_eq!(fs::read_to_string(src.join("f.txt")).unwrap(), "original"); + assert_eq!(fs::read_to_string(dst.join("f.txt")).unwrap(), "modified"); + } + + #[test] + fn cow_clone_fails_if_dst_exists() { + let tmp = tempfile::tempdir().unwrap(); + let src = tmp.path().join("src"); + fs::create_dir(&src).unwrap(); + let dst = tmp.path().join("dst"); + fs::create_dir(&dst).unwrap(); + + assert!(cow_clone_dir(&src, &dst).is_err()); + } + + #[test] + fn detect_strategy_returns_something() { + // Should always detect at least FullCopy. + let s = detect_strategy(); + assert!(!matches!(s, CowStrategy::FuseOverlay)); + // Can't assert specific strategy (platform-dependent) but it must not panic. + } +} diff --git a/crates/hm-util/src/dirs.rs b/crates/hm-util/src/dirs.rs index 4838e3f..e4b1120 100644 --- a/crates/hm-util/src/dirs.rs +++ b/crates/hm-util/src/dirs.rs @@ -31,6 +31,16 @@ pub fn harmont_plugin_state_dir() -> Option { harmont_data_dir().map(|d| d.join("state")) } +/// `~/.harmont/cache/` — local build cache root. +pub fn harmont_cache_dir() -> Option { + harmont_config_dir().map(|h| h.join("cache")) +} + +/// `~/.harmont/cache/workspaces/` — COW workspace cache root. +pub fn harmont_workspace_cache_dir() -> Option { + harmont_cache_dir().map(|c| c.join("workspaces")) +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { @@ -59,4 +69,16 @@ mod tests { let p = harmont_plugin_state_dir().unwrap(); assert!(p.ends_with("harmont/state")); } + + #[test] + fn harmont_cache_dir_resolves() { + let p = harmont_cache_dir().unwrap(); + assert!(p.to_string_lossy().contains("cache")); + } + + #[test] + fn harmont_workspace_cache_dir_resolves() { + let p = harmont_workspace_cache_dir().unwrap(); + assert!(p.to_string_lossy().contains("workspaces")); + } } diff --git a/crates/hm-util/src/lib.rs b/crates/hm-util/src/lib.rs index c5284c5..e35ba64 100644 --- a/crates/hm-util/src/lib.rs +++ b/crates/hm-util/src/lib.rs @@ -1,2 +1,3 @@ +pub mod cow; pub mod dirs; pub mod os; diff --git a/crates/hm/src/orchestrator/cache.rs b/crates/hm/src/orchestrator/cache.rs index e0bdc5a..1e3208e 100644 --- a/crates/hm/src/orchestrator/cache.rs +++ b/crates/hm/src/orchestrator/cache.rs @@ -1,38 +1,16 @@ //! Host-side cache decision. //! -//! Resolves a wire-typed [`CommandStep`] against the local Docker -//! daemon and returns the wire-typed [`CacheDecision`] consumed by -//! step-executor plugins (design spec §5.5). +//! Resolves a wire-typed [`CommandStep`] against the local COW +//! workspace cache directory and returns the wire-typed +//! [`CacheDecision`] consumed by step execution. //! //! Cache keys are computed by `harmont.keygen` at plan time and ride -//! along the JSON in `cache.key`. We turn them into Docker image tags -//! and consult the local image store. +//! along the JSON in `cache.key`. -use anyhow::Result; -use hm_plugin_protocol::{CacheDecision, CommandStep, SnapshotRef}; - -use crate::orchestrator::docker_client::DockerClient; +use std::path::{Path, PathBuf}; -/// `harmont-local/:`. Step key is -/// sanitised to `[a-zA-Z0-9_-]` (Docker tag rules). Returns `None` -/// when the step has no cache or a policy of `"none"`. -/// -/// The cache key is the SHA-256 hex resolved at plan time by -/// `harmont.keygen`. We truncate to the first 16 hex chars (8 bytes) -/// for the image tag — collision odds across a developer's local -/// cache are negligible. The cloud path uses the full key elsewhere; -/// that divergence is acceptable for local-only tags since they're -/// never resolved across machines. -fn cache_image_tag(step: &CommandStep) -> Option { - let cache = step.cache.as_ref()?; - if cache.policy == "none" { - return None; - } - let key = cache.key.as_deref()?; - let safe = sanitize_for_tag(&step.key); - let short = &key[..key.len().min(16)]; - Some(format!("harmont-local/{safe}:{short}")) -} +use anyhow::{Context, Result}; +use hm_plugin_protocol::{CacheDecision, CommandStep, SnapshotRef}; fn sanitize_for_tag(s: &str) -> String { s.chars() @@ -46,58 +24,127 @@ fn sanitize_for_tag(s: &str) -> String { .collect() } -/// The outcome of a cache lookup: the wire-typed decision plus any -/// stale images that should be garbage-collected after the new image -/// is committed. +// --------------------------------------------------------------------------- +// COW workspace cache +// --------------------------------------------------------------------------- + +/// The outcome of a COW workspace cache lookup. #[derive(Debug)] -pub struct CacheOutcome { +pub struct CowCacheOutcome { pub decision: CacheDecision, - /// Stale cache images for this step that should be removed after - /// the new image is committed successfully. - pub stale_tags: Vec, + pub cache_to: Option, + pub stale_dirs: Vec, } -/// Decide cache outcome for a step against the local Docker daemon. +/// Resolve the on-disk cache directory for a step's COW workspace. +/// +/// Returns `None` when the step has no cache, a `"none"` policy, or no +/// cache key — matching the same guard logic as [`cache_image_tag`]. /// -/// Returns hit (snapshot already present), miss-with-tag (run and commit -/// afterwards), or miss-no-commit (`cache.policy == "none"` or no cache -/// key). +/// # Errors +/// Returns an error if the config directory cannot be resolved. +pub fn cow_cache_dir(step: &CommandStep) -> Result> { + let cache = match step.cache.as_ref() { + Some(c) if c.policy != "none" => c, + _ => return Ok(None), + }; + let Some(key) = cache.key.as_deref() else { + return Ok(None); + }; + let ws_cache = hm_util::dirs::harmont_workspace_cache_dir() + .ok_or_else(|| anyhow::anyhow!("cannot resolve ~/.harmont/cache/workspaces"))?; + let safe = sanitize_for_tag(&step.key); + let short = &key[..key.len().min(16)]; + Ok(Some(ws_cache.join(safe).join(short))) +} + +/// Decide cache outcome for a step against the local COW workspace +/// cache directory. /// /// # Errors -/// Returns an error if the Docker daemon `image_exists` call fails. -pub async fn decide(docker: &DockerClient, step: &CommandStep) -> Result { - let Some(tag) = cache_image_tag(step) else { - return Ok(CacheOutcome { +/// Returns an error if the config directory cannot be resolved or the +/// stale directory listing fails. +pub fn decide_cow(step: &CommandStep) -> Result { + let Some(cache_dir) = cow_cache_dir(step)? else { + return Ok(CowCacheOutcome { decision: CacheDecision::MissNoCommit, - stale_tags: vec![], + cache_to: None, + stale_dirs: vec![], }); }; - if docker.image_exists(&tag).await? { - Ok(CacheOutcome { + if cache_dir.exists() { + Ok(CowCacheOutcome { decision: CacheDecision::Hit { - tag: SnapshotRef::from(tag), + tag: SnapshotRef::from(format!("cow:{}", cache_dir.display())), }, - stale_tags: vec![], + cache_to: None, + stale_dirs: vec![], }) } else { - let safe = sanitize_for_tag(&step.key); - let prefix = format!("harmont-local/{safe}"); - let stale = docker - .list_images_by_reference(&prefix) - .await - .unwrap_or_default() - .into_iter() - .filter(|t| *t != tag) - .collect(); - Ok(CacheOutcome { + let Some(step_cache_root) = cache_dir.parent() else { + return Ok(CowCacheOutcome { + decision: CacheDecision::MissBuildAs { + tag: SnapshotRef::from(format!("cow:{}", cache_dir.display())), + }, + cache_to: Some(cache_dir), + stale_dirs: vec![], + }); + }; + let stale = if step_cache_root.exists() { + std::fs::read_dir(step_cache_root)? + .filter_map(std::result::Result::ok) + .map(|e| e.path()) + .filter(|p| *p != cache_dir) + .collect() + } else { + vec![] + }; + Ok(CowCacheOutcome { decision: CacheDecision::MissBuildAs { - tag: SnapshotRef::from(tag), + tag: SnapshotRef::from(format!("cow:{}", cache_dir.display())), }, - stale_tags: stale, + cache_to: Some(cache_dir), + stale_dirs: stale, }) } } +/// Persist a completed workspace directory into the COW cache. +/// +/// Creates intermediate directories and performs a COW clone. If the +/// cache directory already exists (e.g. a concurrent run beat us) the +/// function returns `Ok(())` without overwriting. +/// +/// # Errors +/// Returns an error if the parent directory cannot be created or the +/// COW clone fails. +pub fn persist_cow_cache(workspace_path: &Path, cache_dir: &Path) -> Result<()> { + if let Some(parent) = cache_dir.parent() { + std::fs::create_dir_all(parent)?; + } + if cache_dir.exists() { + return Ok(()); + } + match hm_util::cow::cow_clone_dir(workspace_path, cache_dir) { + Ok(()) => Ok(()), + Err(e) if cache_dir.exists() => { + tracing::debug!(%e, "concurrent run already populated cache"); + Ok(()) + } + Err(e) => Err(e).context("persist workspace to COW cache"), + } +} + +/// Remove stale COW cache directories left over from previous cache +/// keys. Failures are logged but never propagated. +pub fn evict_stale_cow_dirs(dirs: &[PathBuf]) { + for dir in dirs { + if let Err(e) = std::fs::remove_dir_all(dir) { + tracing::warn!(path = %dir.display(), %e, "failed to evict stale COW cache"); + } + } +} + #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { @@ -119,33 +166,67 @@ mod tests { } #[test] - fn no_cache_yields_none() { - assert!(cache_image_tag(&step(None)).is_none()); + fn sanitize_replaces_invalid_chars() { + assert_eq!(sanitize_for_tag("my/step.name:v1"), "my-step-name-v1"); + assert_eq!(sanitize_for_tag("simple"), "simple"); + assert_eq!(sanitize_for_tag("a_b-c"), "a_b-c"); } #[test] - fn policy_none_yields_none() { + fn cow_cache_dir_returns_path_for_cacheable_step() { let s = step(Some(Cache { - policy: "none".into(), - key: Some("abcdef".into()), + policy: "ttl".into(), + key: Some("0123456789abcdef0000".into()), })); - assert!(cache_image_tag(&s).is_none()); + let dir = cow_cache_dir(&s).unwrap(); + assert!(dir.is_some(), "expected Some for cacheable step"); + let dir = dir.unwrap(); + assert!( + dir.ends_with("cache/workspaces/build/0123456789abcdef"), + "unexpected path: {}", + dir.display() + ); + } + + #[test] + fn cow_cache_dir_returns_none_for_no_cache() { + let s = step(None); + let dir = cow_cache_dir(&s).unwrap(); + assert!(dir.is_none()); } #[test] - fn ttl_with_key_yields_tag() { + fn cow_cache_dir_returns_none_for_policy_none() { let s = step(Some(Cache { - policy: "ttl".into(), - key: Some("0123456789abcdefffff".into()), + policy: "none".into(), + key: Some("abcdef1234567890".into()), })); - let tag = cache_image_tag(&s).unwrap(); - assert!(tag.starts_with("harmont-local/build:")); + let dir = cow_cache_dir(&s).unwrap(); + assert!(dir.is_none()); } #[test] - fn sanitize_replaces_invalid_chars() { - assert_eq!(sanitize_for_tag("my/step.name:v1"), "my-step-name-v1"); - assert_eq!(sanitize_for_tag("simple"), "simple"); - assert_eq!(sanitize_for_tag("a_b-c"), "a_b-c"); + fn decide_cow_miss_no_commit_when_no_cache() { + let s = step(None); + let outcome = decide_cow(&s).unwrap(); + assert!(outcome.decision.is_miss_no_commit()); + assert!(outcome.cache_to.is_none()); + assert!(outcome.stale_dirs.is_empty()); + } + + #[test] + fn decide_cow_miss_build_as_for_new_key() { + // Use a unique key that will not exist on disk. + let s = step(Some(Cache { + policy: "ttl".into(), + key: Some("deadbeefcafebabe9999".into()), + })); + let outcome = decide_cow(&s).unwrap(); + assert!( + outcome.decision.is_miss_build_as(), + "expected MissBuildAs, got {:?}", + outcome.decision + ); + assert!(outcome.cache_to.is_some()); } } diff --git a/crates/hm/src/orchestrator/docker_client.rs b/crates/hm/src/orchestrator/docker_client.rs index 866ffb8..208c9d0 100644 --- a/crates/hm/src/orchestrator/docker_client.rs +++ b/crates/hm/src/orchestrator/docker_client.rs @@ -18,11 +18,31 @@ use bollard::image::{ CommitContainerOptions, CreateImageOptions, ImportImageOptions, ListImagesOptions, RemoveImageOptions, }; +use bollard::models::HostConfig; use futures_util::StreamExt; use tokio::io::AsyncWrite; use crate::error::HmError; +/// Build a [`HostConfig`] with optional bind mounts and Linux capabilities. +/// +/// Empty slices become `None` so Docker applies its defaults. +fn build_host_config(binds: &[String], cap_add: &[String]) -> HostConfig { + HostConfig { + binds: if binds.is_empty() { + None + } else { + Some(binds.to_vec()) + }, + cap_add: if cap_add.is_empty() { + None + } else { + Some(cap_add.to_vec()) + }, + ..Default::default() + } +} + #[derive(Debug, Clone)] pub struct DockerClient { inner: Arc, @@ -135,12 +155,35 @@ impl DockerClient { env: &[String], workdir: &str, name: &str, + ) -> Result { + self.start_long_lived_with_mounts(image, env, workdir, name, &[]) + .await + } + + /// Like [`Self::start_long_lived`] but with bind mounts via `HostConfig`. + /// + /// Each entry in `binds` is a Docker bind-mount string of the form + /// `"/host/path:/container/path"` (with an optional `:ro` suffix). + /// + /// # Errors + /// + /// Returns [`HmError::Docker`] if the container cannot be created + /// (image not pulled, name conflict, OCI runtime failure) or if + /// `start_container` rejects the create. + pub async fn start_long_lived_with_mounts( + &self, + image: &str, + env: &[String], + workdir: &str, + name: &str, + binds: &[String], ) -> Result { let cfg = Config { image: Some(image.to_string()), cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]), env: Some(env.to_vec()), working_dir: Some(workdir.to_string()), + host_config: Some(build_host_config(binds, &[])), ..Default::default() }; let create = self @@ -530,4 +573,21 @@ mod smoke { .unwrap(); assert!(tags.is_empty()); } + + #[test] + fn build_host_config_with_binds_and_no_caps() { + let hc = super::build_host_config(&["/host/path:/container/path".to_string()], &[]); + assert_eq!( + hc.binds.as_ref().unwrap(), + &["/host/path:/container/path".to_string()] + ); + assert!(hc.cap_add.is_none()); + } + + #[test] + fn build_host_config_empty_binds_is_none() { + let hc = super::build_host_config(&[], &[]); + assert!(hc.binds.is_none()); + assert!(hc.cap_add.is_none()); + } } diff --git a/crates/hm/src/orchestrator/mod.rs b/crates/hm/src/orchestrator/mod.rs index 7c23600..f6fffcd 100644 --- a/crates/hm/src/orchestrator/mod.rs +++ b/crates/hm/src/orchestrator/mod.rs @@ -14,5 +14,6 @@ pub mod output_subscriber; pub mod scheduler; pub mod signal; pub mod source; +pub mod workspace; pub use scheduler::run; diff --git a/crates/hm/src/orchestrator/scheduler.rs b/crates/hm/src/orchestrator/scheduler.rs index 45e7209..cfbecd4 100644 --- a/crates/hm/src/orchestrator/scheduler.rs +++ b/crates/hm/src/orchestrator/scheduler.rs @@ -33,7 +33,7 @@ use futures::future::{BoxFuture, FutureExt, join_all}; use anyhow::{Context, Result}; use hm_plugin_protocol::{ - ArchiveId, BuildEvent, ExecutorInput, PlanSummary, SnapshotRef, StepResult, + ArchiveId, BuildEvent, CacheDecision, ExecutorInput, PlanSummary, SnapshotRef, StepResult, }; use uuid::Uuid; @@ -89,6 +89,17 @@ pub async fn run( // Build the source archive once. let archive_bytes = build_archive_bytes(&repo_root).context("build source archive")?; + + // Extract the source archive into a temporary directory and create + // a workspace manager for per-step COW clones. This must happen + // before `archives.register()` which consumes the bytes. + let run_dir = std::env::temp_dir().join(format!("harmont-run-{run_id}")); + let workspace = { + let mgr = super::workspace::WorkspaceManager::from_archive(run_dir, &archive_bytes) + .context("init COW workspace")?; + Arc::new(std::sync::Mutex::new(mgr)) + }; + let archive_id = archives.register(archive_bytes); let run_ctx = RunContext { @@ -96,6 +107,7 @@ pub async fn run( event_bus: bus.clone(), archives: archives.clone(), cancel: cancel.clone(), + workspace: workspace.clone(), }; let parallelism = parallelism.max(1); @@ -219,7 +231,7 @@ pub async fn run( let dur = started_total.elapsed().as_millis() as u64; - // Clean up ephemeral images created during this run. + // Clean up ephemeral images created during the run. let ephemeral_tags: Vec<&str> = outcomes .iter() .filter_map(|o| o.snapshot.as_ref()) @@ -232,6 +244,15 @@ pub async fn run( } } + { + let mut mgr = workspace + .lock() + .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; + if let Err(e) = mgr.cleanup() { + tracing::warn!(%e, "failed to clean up COW workspace"); + } + } + bus.emit(BuildEvent::BuildEnd { exit_code: overall, duration_ms: dur, @@ -282,15 +303,36 @@ async fn execute_step( step_id, key: step_key.clone(), chain_idx: chain_pos, - parent_key, + parent_key: parent_key.clone(), display_name: display_name.clone(), }); - // Decide cache outcome host-side. - let outcome = cache::decide(&run_ctx.docker, &step_wire).await?; - let decision = outcome.decision; + let cow_outcome = cache::decide_cow(&step_wire)?; + let decision = cow_outcome.decision; + let cow_cache_to = cow_outcome.cache_to; + let cow_stale_dirs = cow_outcome.stale_dirs; + + { + let mut mgr = run_ctx + .workspace + .lock() + .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; + if let CacheDecision::Hit { ref tag } = decision { + if tag.0.starts_with("cow:") { + let cached_path = PathBuf::from(&tag.0[4..]); + mgr.create_workspace_from_cache(&step_key, &cached_path) + .context("create workspace from cache")?; + } else { + mgr.create_workspace(&step_key, parent_key.as_deref()) + .context("create workspace for cache hit")?; + } + } else { + mgr.create_workspace(&step_key, parent_key.as_deref()) + .context("create workspace for step")?; + } + } - if let hm_plugin_protocol::CacheDecision::Hit { tag } = &decision { + if let CacheDecision::Hit { tag } = &decision { bus.emit(BuildEvent::StepCacheHit { step_id, key: step_wire @@ -300,9 +342,6 @@ async fn execute_step( .unwrap_or_default(), tag: tag.0.clone(), }); - // Short-circuit: the cached image already exists locally, so - // there is nothing for the executor to do. Return the - // snapshot so downstream nodes can use it as their parent. return Ok(StepOutcome { exit_code: 0, snapshot: Some(tag.clone()), @@ -371,11 +410,22 @@ async fn execute_step( }); cancel.cancel(); } else { - for stale in &outcome.stale_tags { - if let Err(e) = run_ctx.docker.remove_image(stale).await { - tracing::warn!(image = %stale, %e, "failed to evict stale cache image"); + if let Some(ref cache_to) = cow_cache_to { + let ws_path = { + let mgr = run_ctx + .workspace + .lock() + .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; + mgr.workspace_path(&step_key) + .map(std::path::Path::to_path_buf) + }; + if let Some(ws) = ws_path + && let Err(e) = cache::persist_cow_cache(&ws, cache_to) + { + tracing::warn!(%e, "failed to persist COW cache"); } } + cache::evict_stale_cow_dirs(&cow_stale_dirs); } Ok(StepOutcome { exit_code: sr.exit_code, diff --git a/crates/hm/src/orchestrator/workspace.rs b/crates/hm/src/orchestrator/workspace.rs new file mode 100644 index 0000000..8a69a52 --- /dev/null +++ b/crates/hm/src/orchestrator/workspace.rs @@ -0,0 +1,374 @@ +//! Per-run workspace orchestration. +//! +//! [`WorkspaceManager`] auto-selects between two strategies: +//! +//! - **Clone strategy** (macOS APFS, Linux reflink, fallback `cp`): +//! each step gets a full directory clone via [`hm_util::cow::cow_clone_dir`]. +//! - **Overlay strategy** (Linux ext4 + `fuse-overlayfs`): +//! each step gets a `fuse-overlayfs` mount with shared lower layers. +//! +//! The rest of the system (scheduler, runner) only sees +//! [`WorkspaceManager::workspace_path`] — strategy is transparent. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use hm_util::cow::{CowStrategy, OverlayMount}; + +/// Manages workspace directories for a single pipeline run. +/// +/// Each step gets an isolated directory that is either a full COW clone +/// or a `fuse-overlayfs` mount, depending on the platform. +pub struct WorkspaceManager { + run_dir: PathBuf, + base_dir: PathBuf, + strategy: CowStrategy, + workspaces: HashMap, + overlays: HashMap, +} + +struct OverlayLayer { + upper_dir: PathBuf, + merged_dir: PathBuf, + ancestor_uppers: Vec, + _mount: Option, +} + +impl WorkspaceManager { + /// Create a new workspace manager that clones from `base_dir` into + /// per-step sub-directories under `run_dir`. + /// + /// # Errors + /// + /// Returns an error if `run_dir` cannot be created. + pub fn from_base(run_dir: PathBuf, base_dir: PathBuf) -> Result { + std::fs::create_dir_all(&run_dir) + .with_context(|| format!("create run dir {}", run_dir.display()))?; + let strategy = hm_util::cow::detect_strategy(); + tracing::info!(?strategy, "COW workspace strategy"); + if strategy == hm_util::cow::CowStrategy::FullCopy { + tracing::warn!("using full-copy fallback — workspace cloning will be slow"); + for probe in hm_util::cow::diagnose_strategies() { + if probe.available { + tracing::info!(strategy = ?probe.strategy, reason = probe.reason, "available"); + } else { + tracing::info!(strategy = ?probe.strategy, reason = probe.reason, "unavailable"); + } + } + } + Ok(Self { + run_dir, + base_dir, + strategy, + workspaces: HashMap::new(), + overlays: HashMap::new(), + }) + } + + /// Create a new workspace manager that first extracts a tar.gz + /// archive into `run_dir/base`, then delegates to [`Self::from_base`]. + /// + /// # Errors + /// + /// Returns an error if the archive cannot be extracted or the run + /// directory cannot be created. + pub fn from_archive(run_dir: PathBuf, archive_bytes: &[u8]) -> Result { + let base_dir = run_dir.join("base"); + std::fs::create_dir_all(&base_dir) + .with_context(|| format!("create base dir {}", base_dir.display()))?; + extract_tar_gz(archive_bytes, &base_dir)?; + Self::from_base(run_dir, base_dir) + } + + /// Create an isolated workspace directory for `step_key`. + /// + /// If `parent_key` is `Some`, the workspace inherits the contents of + /// the parent workspace (including any modifications made after + /// creation). If `None`, the workspace is cloned from the base + /// directory. + /// + /// # Errors + /// + /// Returns an error if the parent workspace is not registered, or if + /// the clone / overlay operation fails. + pub fn create_workspace( + &mut self, + step_key: &str, + parent_key: Option<&str>, + ) -> Result { + if self.workspaces.contains_key(step_key) || self.overlays.contains_key(step_key) { + anyhow::bail!("workspace for step '{step_key}' already exists"); + } + match self.strategy { + CowStrategy::FuseOverlay => self.create_overlay(step_key, parent_key), + _ => self.create_clone(step_key, parent_key, None), + } + } + + /// Create a workspace from a cached directory, bypassing parent + /// relationships. + /// + /// # Errors + /// + /// Returns an error if the clone operation fails. + pub fn create_workspace_from_cache( + &mut self, + step_key: &str, + cached_workspace: &Path, + ) -> Result { + if self.workspaces.contains_key(step_key) || self.overlays.contains_key(step_key) { + anyhow::bail!("workspace for step '{step_key}' already exists"); + } + self.create_clone(step_key, None, Some(cached_workspace)) + } + + /// Look up the filesystem path for a previously created workspace. + #[must_use] + pub fn workspace_path(&self, step_key: &str) -> Option<&Path> { + if let Some(p) = self.workspaces.get(step_key) { + return Some(p.as_path()); + } + self.overlays.get(step_key).map(|l| l.merged_dir.as_path()) + } + + /// The base directory that root workspaces are cloned from. + #[must_use] + pub fn base_dir(&self) -> &Path { + &self.base_dir + } + + /// The COW strategy in use for this run. + #[must_use] + pub const fn strategy(&self) -> CowStrategy { + self.strategy + } + + /// Remove the entire run directory, including all workspaces and + /// overlay mounts. + /// + /// # Errors + /// + /// Returns an error if the run directory cannot be removed. + pub fn cleanup(&mut self) -> Result<()> { + // Drop overlay mounts before removing the filesystem tree. + self.overlays.clear(); + if self.run_dir.exists() { + std::fs::remove_dir_all(&self.run_dir) + .with_context(|| format!("cleanup run dir {}", self.run_dir.display()))?; + } + Ok(()) + } + + fn create_clone( + &mut self, + step_key: &str, + parent_key: Option<&str>, + cached: Option<&Path>, + ) -> Result { + let safe = sanitize_key(step_key); + let ws_dir = self.run_dir.join("workspaces").join(&safe); + + let source = if let Some(c) = cached { + c.to_path_buf() + } else if let Some(pk) = parent_key { + self.workspaces + .get(pk) + .cloned() + .ok_or_else(|| anyhow::anyhow!("parent workspace '{pk}' not registered"))? + } else { + self.base_dir.clone() + }; + + hm_util::cow::cow_clone_dir(&source, &ws_dir) + .with_context(|| format!("cow clone {} -> {}", source.display(), ws_dir.display()))?; + + self.workspaces.insert(step_key.to_string(), ws_dir.clone()); + Ok(ws_dir) + } + + fn create_overlay(&mut self, step_key: &str, parent_key: Option<&str>) -> Result { + let safe = sanitize_key(step_key); + let layer_dir = self.run_dir.join("layers").join(&safe); + let upper_dir = layer_dir.join("upper"); + let work_dir = layer_dir.join("work"); + let merged_dir = layer_dir.join("merged"); + + std::fs::create_dir_all(&upper_dir)?; + std::fs::create_dir_all(&work_dir)?; + std::fs::create_dir_all(&merged_dir)?; + + let ancestor_uppers = if let Some(pk) = parent_key { + let parent = self + .overlays + .get(pk) + .ok_or_else(|| anyhow::anyhow!("parent overlay '{pk}' not registered"))?; + let mut ancestors = vec![parent.upper_dir.clone()]; + ancestors.extend(parent.ancestor_uppers.iter().cloned()); + ancestors + } else { + vec![] + }; + + let mut lower_dirs: Vec<&Path> = ancestor_uppers.iter().map(PathBuf::as_path).collect(); + lower_dirs.push(&self.base_dir); + + let mount = OverlayMount::mount(&lower_dirs, &upper_dir, &work_dir, &merged_dir)?; + + let merged_path = merged_dir.clone(); + self.overlays.insert( + step_key.to_string(), + OverlayLayer { + upper_dir, + merged_dir, + ancestor_uppers, + _mount: Some(mount), + }, + ); + Ok(merged_path) + } +} + +impl std::fmt::Debug for WorkspaceManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WorkspaceManager") + .field("run_dir", &self.run_dir) + .field("workspaces", &self.workspaces.keys().collect::>()) + .finish_non_exhaustive() + } +} + +fn sanitize_key(s: &str) -> String { + s.chars() + .map(|c| { + if c.is_ascii_alphanumeric() || c == '_' || c == '-' { + c + } else { + '-' + } + }) + .collect() +} + +fn extract_tar_gz(bytes: &[u8], dest: &Path) -> Result<()> { + use flate2::read::GzDecoder; + + let decoder = GzDecoder::new(bytes); + let mut archive = tar::Archive::new(decoder); + archive + .unpack(dest) + .with_context(|| format!("extract archive to {}", dest.display())) +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use std::fs; + + fn make_base(tmp: &std::path::Path) -> PathBuf { + let base = tmp.join("base"); + fs::create_dir(&base).unwrap(); + fs::write(base.join("main.rs"), b"fn main() {}").unwrap(); + base + } + + #[test] + fn root_step_clones_base() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let mut mgr = WorkspaceManager::from_base(tmp.path().join("run"), base).unwrap(); + + let ws = mgr.create_workspace("build", None).unwrap(); + assert_eq!( + fs::read_to_string(ws.join("main.rs")).unwrap(), + "fn main() {}" + ); + } + + #[test] + fn child_step_inherits_parent_changes() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let mut mgr = WorkspaceManager::from_base(tmp.path().join("run"), base).unwrap(); + + let ws_a = mgr.create_workspace("a", None).unwrap(); + fs::write(ws_a.join("artifact.bin"), b"built").unwrap(); + + let ws_b = mgr.create_workspace("b", Some("a")).unwrap(); + assert_eq!( + fs::read_to_string(ws_b.join("main.rs")).unwrap(), + "fn main() {}" + ); + assert_eq!( + fs::read_to_string(ws_b.join("artifact.bin")).unwrap(), + "built" + ); + } + + #[test] + fn fork_children_are_isolated() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let mut mgr = WorkspaceManager::from_base(tmp.path().join("run"), base).unwrap(); + + let ws_a = mgr.create_workspace("a", None).unwrap(); + fs::write(ws_a.join("from_a"), b"a").unwrap(); + + let ws_b = mgr.create_workspace("b", Some("a")).unwrap(); + let ws_c = mgr.create_workspace("c", Some("a")).unwrap(); + + fs::write(ws_b.join("from_b"), b"b").unwrap(); + assert!(!ws_c.join("from_b").exists(), "c must not see b's changes"); + } + + #[test] + fn workspace_path_returns_created() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let mut mgr = WorkspaceManager::from_base(tmp.path().join("run"), base).unwrap(); + + mgr.create_workspace("s", None).unwrap(); + assert!(mgr.workspace_path("s").is_some()); + assert!(mgr.workspace_path("nonexistent").is_none()); + } + + #[test] + fn create_workspace_from_cache_clones_cached_dir() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let cached = tmp.path().join("cached"); + fs::create_dir(&cached).unwrap(); + fs::write(cached.join("cached_file.txt"), b"from_cache").unwrap(); + + let mut mgr = WorkspaceManager::from_base(tmp.path().join("run"), base).unwrap(); + let ws = mgr.create_workspace_from_cache("s", &cached).unwrap(); + assert_eq!( + fs::read_to_string(ws.join("cached_file.txt")).unwrap(), + "from_cache" + ); + assert!(!ws.join("main.rs").exists()); + } + + #[test] + fn duplicate_step_key_errors() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let mut mgr = WorkspaceManager::from_base(tmp.path().join("run"), base).unwrap(); + mgr.create_workspace("dup", None).unwrap(); + assert!(mgr.create_workspace("dup", None).is_err()); + } + + #[test] + fn cleanup_removes_run_dir() { + let tmp = tempfile::tempdir().unwrap(); + let base = make_base(tmp.path()); + let run_dir = tmp.path().join("run"); + let mut mgr = WorkspaceManager::from_base(run_dir.clone(), base).unwrap(); + mgr.create_workspace("s", None).unwrap(); + assert!(run_dir.exists()); + + mgr.cleanup().unwrap(); + assert!(!run_dir.exists()); + } +} diff --git a/crates/hm/src/runner/docker.rs b/crates/hm/src/runner/docker.rs index 0b814a1..68183a7 100644 --- a/crates/hm/src/runner/docker.rs +++ b/crates/hm/src/runner/docker.rs @@ -1,9 +1,9 @@ //! Docker-based step runner. //! -//! Replaces the old `hm-plugin-docker` WASM plugin with direct Bollard -//! calls. All Docker orchestration (pull, start, extract, exec, commit, -//! stop+remove) runs through [`RunContext::docker`] with cancellation -//! support via [`RunContext::cancel`]. +//! Each step runs inside a Docker container with the workspace +//! bind-mounted from the host via COW clones. System-level state +//! (installed packages) propagates via Docker image commits; workspace +//! files propagate via host-side COW directory clones. use std::future::Future; use std::pin::Pin; @@ -18,38 +18,6 @@ use uuid::Uuid; use super::{RunContext, StepRunner}; use crate::orchestrator::events::EventBus; -// --------------------------------------------------------------------------- -// EXTRACT_CMD_SH -// --------------------------------------------------------------------------- - -/// Shell script for idempotent workspace extraction. Reads a `.harmont-extracted` -/// manifest to clean up files from a previous extract, then unpacks the new -/// archive and writes a fresh manifest. Files created by the step command -/// (e.g. `node_modules`, build artifacts) are not tracked and survive untouched. -const EXTRACT_CMD_SH: &str = r#"set -e -mkdir -p "$WORKDIR" -cd "$WORKDIR" -manifest="$WORKDIR/.harmont-extracted" -if [ -f "$manifest" ]; then - # Longest paths first: removes nested entries before their parents. - sort -r "$manifest" | while IFS= read -r p; do - [ -n "$p" ] || continue - if [ -d "$p" ] && [ ! -L "$p" ]; then - rmdir "$p" 2>/dev/null || true - else - rm -f "$p" 2>/dev/null || true - fi - done - rm -f "$manifest" -fi -# Stream the archive into a temp file so we can both list and extract. -tmp=$(mktemp) -trap 'rm -f "$tmp"' EXIT -cat > "$tmp" -tar -tzf "$tmp" > "$manifest" -tar -xzf "$tmp" -"#; - // --------------------------------------------------------------------------- // DockerRunner // --------------------------------------------------------------------------- @@ -75,14 +43,21 @@ impl StepRunner for DockerRunner { } // --------------------------------------------------------------------------- -// Core orchestration +// Step execution // --------------------------------------------------------------------------- +fn resolve_image(step: &CommandStep, input: &ExecutorInput) -> String { + if let Some(snap) = &input.parent_snapshot { + return snap.to_string(); + } + step.image + .clone() + .unwrap_or_else(|| "alpine:latest".to_string()) +} + async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result { let plan = decision_plan(&input.cache_lookup); - // Cache hit shortcut: no container, no exec; hand back the hit - // tag so downstream steps can boot from it. if !plan.run_command { return Ok(StepResult { exit_code: 0, @@ -91,15 +66,22 @@ async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result }); } - let image = resolve_image( - &input.step, - plan.hit_tag.as_ref(), - input.parent_snapshot.as_ref(), - ); + let workspace_mgr = &ctx.workspace; + + let workspace_path = { + let mgr = workspace_mgr + .lock() + .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; + mgr.workspace_path(&input.step.key) + .map(std::path::Path::to_path_buf) + .ok_or_else(|| anyhow::anyhow!("workspace for step '{}' not created", input.step.key))? + }; + + let image = resolve_image(&input.step, &input); let container_name = sanitize_container_name(&input.run_id.to_string(), &input.step.key); let env_vec: Vec = input.env.iter().map(|(k, v)| format!("{k}={v}")).collect(); - // Ensure the image is locally available. + // Pull image if needed. if !ctx.docker.image_exists(&image).await.unwrap_or(false) { let docker = ctx.docker.clone(); let cancel = ctx.cancel.clone(); @@ -111,20 +93,19 @@ async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result } } + // Start container with workspace bind mount. + let binds = vec![format!("{}:{}", workspace_path.display(), input.workdir)]; let cid = ctx .docker - .start_long_lived(&image, &env_vec, &input.workdir, &container_name) + .start_long_lived_with_mounts(&image, &env_vec, &input.workdir, &container_name, &binds) .await - .context("docker start failed")?; + .context("docker start with mounts failed")?; - // Always stop+remove the container, even on error. let result = run_in_container(ctx, &cid, &input, &env_vec, &plan).await; ctx.docker.stop_remove(&cid).await; result } -/// Inner body executed with a running container. Separated so the -/// caller can unconditionally clean up the container in all paths. async fn run_in_container( ctx: &RunContext, cid: &str, @@ -132,37 +113,6 @@ async fn run_in_container( env_vec: &[String], plan: &DecisionPlan, ) -> Result { - // --- Extract workspace archive --- - let archive = ctx.archives.read(input.workspace_archive_id, 0, u64::MAX); - if archive.is_empty() { - anyhow::bail!("archive {} is empty or unknown", input.workspace_archive_id); - } - - let docker = ctx.docker.clone(); - let cancel = ctx.cancel.clone(); - let cid_owned = cid.to_owned(); - let workdir = input.workdir.clone(); - let cmd = vec![ - "sh".to_string(), - "-c".to_string(), - EXTRACT_CMD_SH.replace("$WORKDIR", &workdir), - ]; - let extract_fut = async move { - let mut sink = tokio::io::sink(); - let rc = docker - .exec_streaming_stdin(&cid_owned, &cmd, &[], "/", &archive, &mut sink) - .await?; - if rc != 0 { - anyhow::bail!("tar extract exited {rc}"); - } - Ok::<(), anyhow::Error>(()) - }; - tokio::select! { - result = extract_fut => result.context("workspace extract failed")?, - () = cancel.cancelled() => anyhow::bail!("cancelled during workspace extract"), - } - - // --- Exec step command --- let mut writer = StepLogWriter::new(input.step_id, Arc::clone(&ctx.event_bus)); let docker = ctx.docker.clone(); let cancel = ctx.cancel.clone(); @@ -195,7 +145,9 @@ async fn run_in_container( )] let exit_code = rc as i32; - // --- Commit snapshot on success --- + // Commit container so child steps inherit system-level changes + // (installed packages, etc.). Workspace files propagate via COW + // bind mounts, but the container image captures everything else. let committed = if exit_code == 0 { let target_tag = plan.commit_to.clone().unwrap_or_else(|| { let safe: String = input @@ -268,35 +220,6 @@ fn decision_plan(decision: &CacheDecision) -> DecisionPlan { } } -// --------------------------------------------------------------------------- -// resolve_image -// --------------------------------------------------------------------------- - -/// Pick the base image for a step at boot time. -/// -/// Priority (high to low): -/// 1. Cache `hit_tag` — the host already located a satisfying snapshot. -/// 2. `parent_snapshot` — the previous step in this chain committed a -/// snapshot; chain-lineage requires we boot from it. -/// 3. The step's `image` field. -/// 4. Fall back to `"alpine:latest"`. -fn resolve_image( - step: &CommandStep, - hit_tag: Option<&SnapshotRef>, - parent_snapshot: Option<&SnapshotRef>, -) -> String { - if let Some(tag) = hit_tag { - return tag.to_string(); - } - if let Some(snap) = parent_snapshot { - return snap.to_string(); - } - if let Some(image) = &step.image { - return image.clone(); - } - "alpine:latest".to_string() -} - // --------------------------------------------------------------------------- // sanitize_container_name // --------------------------------------------------------------------------- @@ -413,33 +336,44 @@ mod tests { } } - // -- resolve_image ------------------------------------------------------- - - #[test] - fn resolve_image_hit_tag_wins() { - let s = step_with_image(Some("rust:1.82")); - let hit = SnapshotRef("cache:tag".into()); - let parent = SnapshotRef("parent:tag".into()); - assert_eq!(resolve_image(&s, Some(&hit), Some(&parent)), "cache:tag"); + fn make_input(step: CommandStep, parent_snapshot: Option) -> ExecutorInput { + ExecutorInput { + step, + workspace_archive_id: hm_plugin_protocol::ArchiveId::from(uuid::Uuid::nil()), + env: std::collections::BTreeMap::new(), + workdir: "/workspace".into(), + run_id: uuid::Uuid::nil(), + step_id: uuid::Uuid::nil(), + cache_lookup: CacheDecision::MissNoCommit, + parent_snapshot, + } } - #[test] - fn resolve_image_parent_snapshot_beats_step_image() { - let s = step_with_image(Some("rust:1.82")); - let parent = SnapshotRef("parent:tag".into()); - assert_eq!(resolve_image(&s, None, Some(&parent)), "parent:tag"); - } + // -- resolve_image ---------------------------------------------------- #[test] - fn resolve_image_step_image_used() { + fn resolve_image_uses_step_image() { let s = step_with_image(Some("rust:1.82")); - assert_eq!(resolve_image(&s, None, None), "rust:1.82"); + let input = make_input(s.clone(), None); + assert_eq!(resolve_image(&s, &input), "rust:1.82"); } #[test] fn resolve_image_fallback_alpine() { let s = step_with_image(None); - assert_eq!(resolve_image(&s, None, None), "alpine:latest"); + let input = make_input(s.clone(), None); + assert_eq!(resolve_image(&s, &input), "alpine:latest"); + } + + #[test] + fn resolve_image_prefers_parent_snapshot() { + let s = step_with_image(Some("rust:1.82")); + let snap = SnapshotRef::from("harmont-local-ephemeral/base:abc123".to_string()); + let input = make_input(s.clone(), Some(snap)); + assert_eq!( + resolve_image(&s, &input), + "harmont-local-ephemeral/base:abc123" + ); } // -- decision_plan ------------------------------------------------------- diff --git a/crates/hm/src/runner/mod.rs b/crates/hm/src/runner/mod.rs index aefeec8..4944800 100644 --- a/crates/hm/src/runner/mod.rs +++ b/crates/hm/src/runner/mod.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::fmt; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::Result; use hm_plugin_protocol::{BuildEvent, ExecutorInput, StepResult}; @@ -18,6 +18,7 @@ use tokio_util::sync::CancellationToken; use crate::orchestrator::archive::ArchiveStore; use crate::orchestrator::docker_client::DockerClient; use crate::orchestrator::events::EventBus; +use crate::orchestrator::workspace::WorkspaceManager; pub mod docker; @@ -36,6 +37,7 @@ pub struct RunContext { pub event_bus: Arc, pub archives: Arc, pub cancel: CancellationToken, + pub workspace: Arc>, } // --------------------------------------------------------------------------- diff --git a/crates/hm/tests/cow_workspace.rs b/crates/hm/tests/cow_workspace.rs new file mode 100644 index 0000000..bcbdcdf --- /dev/null +++ b/crates/hm/tests/cow_workspace.rs @@ -0,0 +1,75 @@ +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +//! End-to-end test for COW workspace mode. +//! +//! Verifies that `hm run --cow` correctly propagates workspace state +//! across a three-step chain: step `a` writes a file, step `b` reads +//! it and writes another, step `c` reads both — proving COW workspace +//! inheritance works through the entire chain. +//! +//! Skipped unless `HARMONT_LOCAL_E2E=1` is set AND a Docker daemon is +//! reachable. +//! +//! ```sh +//! HARMONT_LOCAL_E2E=1 cargo test --test cow_workspace -- --test-threads=1 +//! ``` + +use std::path::PathBuf; +use std::process::Command; + +/// Returns true when the test should no-op. +fn skip_if_no_docker() -> bool { + if std::env::var_os("HARMONT_LOCAL_E2E").is_none() { + return true; + } + let out = Command::new("docker") + .args(["version", "--format", "{{.Server.Version}}"]) + .output(); + match out { + Ok(o) => !o.status.success(), + Err(_) => true, + } +} + +fn fixture(name: &str) -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/pipelines") + .join(name) +} + +#[test] +fn cow_chain_inherits_workspace() { + if skip_if_no_docker() { + return; + } + + let bin = env!("CARGO_BIN_EXE_hm"); + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let repo_root = manifest_dir + .parent() + .expect("CARGO_MANIFEST_DIR must have a parent (repo root)"); + + let tmp = tempfile::tempdir().expect("mktempdir"); + let harmont_dir = tmp.path().join(".harmont"); + std::fs::create_dir(&harmont_dir).expect("mkdir .harmont"); + std::fs::copy(fixture("cow_chain.py"), harmont_dir.join("cow_chain.py")) + .expect("copy fixture into .harmont/"); + + let out = Command::new(bin) + .args(["run", "--cow", "--logs", "--dir"]) + .arg(tmp.path()) + .arg("cow-chain") + .env("HARMONT_CIDSL_PY", repo_root.join("cidsl/py")) + .output() + .expect("spawning harmont binary should not fail"); + + let stderr = String::from_utf8_lossy(&out.stderr); + let stdout = String::from_utf8_lossy(&out.stdout); + assert!( + out.status.success(), + "hm run --cow failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + assert!( + stderr.contains("c-saw-both"), + "step c must see files from a and b via COW workspace.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); +} diff --git a/crates/hm/tests/fixtures/pipelines/cow_chain.py b/crates/hm/tests/fixtures/pipelines/cow_chain.py new file mode 100644 index 0000000..70f3952 --- /dev/null +++ b/crates/hm/tests/fixtures/pipelines/cow_chain.py @@ -0,0 +1,10 @@ +"""COW workspace E2E fixture — three-step chain proving workspace inheritance.""" +import harmont as hm + + +@hm.pipeline("cow-chain", default_image="alpine:latest") +def cow_chain(): + a = hm.sh("echo from-a > /workspace/a.txt", label="a") + b = a.sh("cat /workspace/a.txt && echo from-b > /workspace/b.txt", label="b") + c = b.sh("cat /workspace/a.txt && cat /workspace/b.txt && echo c-saw-both", label="c") + return [c]