Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/cli/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,17 @@ pub async fn execute(args: CreateArgs) -> Result<(), Box<dyn std::error::Error>>
};

let record_for_cleanup = record.clone();
let mut state = StateFile::load_default()?;
if let Err(error) = state.add(record) {
// Atomic append under the state lock so concurrent `create`/`run` cannot
// lose records (load_default()+add() is a lost-update race).
if let Err(error) = StateFile::add_record(record) {
let mut state = StateFile::load_default()?;
crate::cleanup::cleanup_partial_box_record(&record_for_cleanup, Some(&mut state));
return Err(error.into());
}

// Attach named volumes to this box
if let Err(error) = super::volume::attach_volumes(&volume_names, &box_id) {
let mut state = StateFile::load_default()?;
crate::cleanup::cleanup_partial_box_record(&record_for_cleanup, Some(&mut state));
return Err(error);
}
Expand Down
58 changes: 35 additions & 23 deletions src/cli/src/commands/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn execute(args: MonitorArgs) -> Result<(), Box<dyn std::error::Error>
/// Single poll iteration: load state, find dead boxes, restart eligible ones.
/// Also checks for unhealthy boxes that have a restart policy.
async fn poll_once(tracker: &mut BackoffTracker) -> Result<(), Box<dyn std::error::Error>> {
let mut state = StateFile::load_default()?;
let state = StateFile::load_default()?;

// Track active boxes for stability detection.
for record in state.records() {
Expand All @@ -162,7 +162,7 @@ async fn poll_once(tracker: &mut BackoffTracker) -> Result<(), Box<dyn std::erro
}
}

run_due_health_checks(&mut state).await?;
run_due_health_checks(&state).await?;

// Find boxes that need restarting: dead boxes + unhealthy running boxes
let mut candidates = state.pending_restarts();
Expand Down Expand Up @@ -198,14 +198,17 @@ async fn poll_once(tracker: &mut BackoffTracker) -> Result<(), Box<dyn std::erro
crate::process::graceful_stop(pid, libc::SIGTERM, 10).await;
}
tracker.mark_dead(&box_id);
// Mark as dead so boot_from_record works
if let Some(rec) = state.find_by_id_mut(&box_id) {
rec.status = "dead".to_string();
rec.pid = None;
rec.health_status = "none".to_string();
rec.health_retries = 0;
}
state.save()?;
// Mark as dead so boot_from_record works; re-load fresh under the
// lock and touch only this box's fields.
StateFile::modify(|s| {
if let Some(rec) = s.find_by_id_mut(&box_id) {
rec.status = "dead".to_string();
rec.pid = None;
rec.health_status = "none".to_string();
rec.health_retries = 0;
}
Ok::<(), std::io::Error>(())
})?;
} else {
tracker.mark_dead(&box_id);
println!("{}", restart_log_line(&record, RestartReason::Dead));
Expand All @@ -214,15 +217,20 @@ async fn poll_once(tracker: &mut BackoffTracker) -> Result<(), Box<dyn std::erro
// Attempt restart
match boot::boot_from_record(&record).await {
Ok(result) => {
// Update record to running
if let Some(rec) = state.find_by_id_mut(&box_id) {
boot::apply_boot_result(rec, result, boot::RestartCountUpdate::Increment);
}
state.save()?;
// Re-load fresh under the lock and apply only this box's
// restart fields, returning the new restart count for logging.
let new_count = StateFile::modify(|s| {
let count = if let Some(rec) = s.find_by_id_mut(&box_id) {
boot::apply_boot_result(rec, result, boot::RestartCountUpdate::Increment);
rec.restart_count
} else {
0
};
Ok::<u32, std::io::Error>(count)
})?;
tracker.record_attempt(&box_id);
println!(
"monitor: box {name} ({short_id}) restarted (count: {})",
state.find_by_id(&box_id).map_or(0, |r| r.restart_count),
"monitor: box {name} ({short_id}) restarted (count: {new_count})",
name = record.name,
short_id = record.short_id,
);
Expand Down Expand Up @@ -288,7 +296,7 @@ fn format_exit_code(exit_code: Option<i32>) -> String {
}

#[cfg(not(windows))]
async fn run_due_health_checks(state: &mut StateFile) -> Result<(), Box<dyn std::error::Error>> {
async fn run_due_health_checks(state: &StateFile) -> Result<(), Box<dyn std::error::Error>> {
let now = chrono::Utc::now();
let probes: Vec<_> = state
.records()
Expand Down Expand Up @@ -316,17 +324,21 @@ async fn run_due_health_checks(state: &mut StateFile) -> Result<(), Box<dyn std:
health::probe_timeout_ns(&health_check),
)
.await;
if let Some(record) = state.find_by_id_mut(&box_id) {
health::apply_probe_result(record, healthy, chrono::Utc::now());
}
// Re-load fresh under the lock and apply only this box's health fields,
// so concurrent CLI/health-checker writes are preserved.
StateFile::modify(|s| {
if let Some(record) = s.find_by_id_mut(&box_id) {
health::apply_probe_result(record, healthy, chrono::Utc::now());
}
Ok::<(), std::io::Error>(())
})?;
}

state.save()?;
Ok(())
}

#[cfg(windows)]
async fn run_due_health_checks(_state: &mut StateFile) -> Result<(), Box<dyn std::error::Error>> {
async fn run_due_health_checks(_state: &StateFile) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

Expand Down
5 changes: 3 additions & 2 deletions src/cli/src/commands/rm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ fn rm_one(
let name = record.name.clone();
cleanup::cleanup_removed_box(&record);

// Remove from state
state.remove(&box_id)?;
// Remove from state atomically under the lock (avoids clobbering concurrent
// monitor/CLI writers that rewrite the whole record vector).
StateFile::remove_record(&box_id)?;
println!("{name}");

Ok(())
Expand Down
28 changes: 16 additions & 12 deletions src/cli/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,23 @@ async fn run_health_loop(box_id: String, exec_socket_path: PathBuf, hc: HealthCh

let healthy = run_probe(&exec_socket_path, &hc.cmd, timeout_ns).await;

let Ok(mut state) = StateFile::load_default() else {
continue;
};
let Some(record) = state.find_by_id_mut(&box_id) else {
break; // Box removed from state
};
if record.status != "running" {
break;
// Reload fresh under the state lock and apply ONLY this box's health
// fields, so concurrent monitor/CLI writers are not clobbered.
let keep_going = StateFile::modify(|state| {
let Some(record) = state.find_by_id_mut(&box_id) else {
return Ok::<bool, std::io::Error>(false); // box removed
};
if record.status != "running" {
return Ok(false); // box stopped
}
apply_probe_result(record, healthy, chrono::Utc::now());
Ok(true)
});
match keep_going {
Ok(true) => {}
Ok(false) => break,
Err(_) => continue,
}

apply_probe_result(record, healthy, chrono::Utc::now());

let _ = state.save();
}
}

Expand Down
54 changes: 52 additions & 2 deletions src/cli/src/state/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,63 @@ impl StateFile {
Self::load(&home.join("boxes.json"))
}

/// Persist the in-memory records to disk while holding the cross-process
/// state lock.
///
/// The write itself is the tmp-file + rename sequence performed by
/// `write_to_disk`; this wrapper only adds the locking around it.
///
/// # Errors
///
/// Returns the I/O error from acquiring the lock or from writing the file.
pub fn save(&self) -> Result<(), std::io::Error> {
    // The guard is owned by the closure parameter, so the lock stays held
    // until `write_to_disk` has returned and is released right after.
    super::lock::StateLock::acquire().and_then(|_guard| self.write_to_disk())
}

/// Serialize the records as pretty-printed JSON and replace the state file
/// via a sibling `.json.tmp` file followed by a rename. Does NOT take the
/// state lock.
///
/// Intended for callers that already hold the lock (`save`, `modify`), so the
/// lock is not acquired a second time from the same process.
///
/// NOTE(review): `reconcile` also reaches this through `load`; a plain
/// `load_default()` outside `modify` does not appear to hold the lock, so that
/// path would write unlocked — confirm whether that is intended.
///
/// # Errors
///
/// Returns an I/O error if serialization, the temp-file write, or the rename
/// fails.
fn write_to_disk(&self) -> Result<(), std::io::Error> {
    let staging = self.path.with_extension("json.tmp");
    let json = match serde_json::to_string_pretty(&self.records) {
        Ok(text) => text,
        Err(err) => return Err(std::io::Error::other(err)),
    };
    std::fs::write(&staging, json.as_bytes())?;
    // The rename's result is the function's result.
    std::fs::rename(&staging, &self.path)
}

/// Read-modify-write primitive for the on-disk state.
///
/// Takes the exclusive cross-process lock, loads a fresh copy of the state,
/// runs `f` on it and, if `f` succeeds, writes the result back — all before
/// the lock is released. Writers should go through this (async callers:
/// gather inputs before the `.await`, then call `modify` afterwards and
/// re-apply only the fields they own) so that the monitor, compose, health
/// checkers and CLI commands do not overwrite each other's changes.
///
/// `f` MUST be synchronous and MUST NOT `.await`: the OS lock is held for
/// the whole call.
///
/// # Errors
///
/// Returns an error if the lock cannot be acquired, the state cannot be
/// loaded, `f` fails (nothing is written in that case), or the write fails.
pub fn modify<R, E>(f: impl FnOnce(&mut StateFile) -> Result<R, E>) -> Result<R, E>
where
    E: From<std::io::Error>,
{
    // Declared first so it is dropped last: the guard outlives load + write.
    let _guard = super::lock::StateLock::acquire()?;
    let mut fresh = Self::load_default()?;
    match f(&mut fresh) {
        Ok(value) => {
            fresh.write_to_disk()?;
            Ok(value)
        }
        // A failed callback leaves the on-disk state as loaded.
        Err(err) => Err(err),
    }
}

/// Append a record atomically under the state lock (load fresh → push →
/// save). Use this instead of `load_default()? + add()` so concurrent
/// appends/removals cannot lose records.
pub fn add_record(record: BoxRecord) -> Result<(), std::io::Error> {
Self::modify(|sf| {
sf.records.push(record);
Ok::<(), std::io::Error>(())
})
}

/// Remove a record by id atomically under the state lock. Returns whether a
/// record was removed.
pub fn remove_record(id: &str) -> Result<bool, std::io::Error> {
Self::modify(|sf| {
let before = sf.records.len();
sf.records.retain(|r| r.id != id);
Ok::<bool, std::io::Error>(sf.records.len() < before)
})
}

/// Add a record and persist.
pub fn add(&mut self, record: BoxRecord) -> Result<(), std::io::Error> {
self.records.push(record);
Expand Down Expand Up @@ -158,7 +206,9 @@ impl StateFile {
}

if changed {
let _ = self.save();
// reconcile runs inside `load`, which `modify` calls while holding
// the state lock; use the unlocked write to avoid re-locking.
let _ = self.write_to_disk();
}

restart_candidates
Expand Down
49 changes: 49 additions & 0 deletions src/cli/src/state/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! Cross-process advisory lock for the box state file.

/// RAII exclusive advisory lock guarding `boxes.json` mutations.
///
/// Held for the duration of a [`StateFile::modify`](super::StateFile::modify)
/// (and each [`save`](super::StateFile::save)) so concurrent processes — the
/// `monitor` daemon, `compose`, per-box health checkers, and plain CLI
/// commands — cannot interleave a read-modify-write and clobber each other's
/// fields (`save` rewrites the whole record vector).
///
/// The lock lives on a sibling `boxes.json.lock` file, never on `boxes.json`
/// itself (whose atomic tmp+rename would swap the inode out from under a held
/// lock). `flock` is released automatically when the holder exits or crashes,
/// so a killed monitor/CLI never leaves a stale lock.
///
/// There is no explicit unlock: dropping the guard closes the file handle,
/// which is what releases the lock. On non-Unix targets the struct has no
/// fields and acquiring it does nothing.
pub(crate) struct StateLock {
// Open handle to the lock file. It is never read; it is stored only so the
// descriptor stays open (and the lock held) until the guard is dropped.
#[cfg(unix)]
_file: std::fs::File,
}

impl StateLock {
    /// Acquire the exclusive advisory lock, blocking until it is available.
    ///
    /// Opens (creating if necessary) `boxes.json.lock` in the home directory
    /// returned by `a3s_box_core::dirs_home()` and takes `flock(LOCK_EX)` on it.
    /// The lock is released when the returned guard is dropped.
    ///
    /// A wait interrupted by a signal (`EINTR`) is retried rather than reported
    /// as a failure, so a signal arriving while another process holds the lock
    /// does not turn into a spurious state-write error.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the parent directory cannot be created, the lock
    /// file cannot be opened, or `flock` fails for a reason other than `EINTR`.
    #[cfg(unix)]
    pub(crate) fn acquire() -> std::io::Result<Self> {
        use std::os::unix::io::AsRawFd;

        let path = a3s_box_core::dirs_home().join("boxes.json.lock");
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        // `truncate(false)`: the file's contents are irrelevant, only its
        // identity as a lock target matters.
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)?;
        loop {
            // SAFETY: `file` is open for the whole call, so the descriptor is
            // valid; `flock` takes no pointers and has no other preconditions.
            let rc = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
            if rc == 0 {
                // Blocking exclusive advisory lock; released when `file` drops.
                return Ok(Self { _file: file });
            }
            let err = std::io::Error::last_os_error();
            if err.kind() != std::io::ErrorKind::Interrupted {
                return Err(err);
            }
            // EINTR: a signal handler ran while we were waiting; try again.
        }
    }

    /// Non-Unix fallback: the atomic tmp+rename in `save` still prevents torn
    /// reads; multi-writer concurrency is not a supported Windows scenario.
    ///
    /// Always succeeds and takes no lock.
    #[cfg(not(unix))]
    pub(crate) fn acquire() -> std::io::Result<Self> {
        Ok(Self {})
    }
}
1 change: 1 addition & 0 deletions src/cli/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! On every load, dead active PIDs are reconciled to mark boxes as dead.

mod file;
mod lock;
pub(crate) mod policy;
#[cfg(test)]
mod tests;
Expand Down
4 changes: 2 additions & 2 deletions src/cli/tests/core_smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl CoreSmoke {
fn open_pty() -> Result<(RawFd, RawFd), String> {
let mut master: libc::c_int = -1;
let mut slave: libc::c_int = -1;
let mut winsize = libc::winsize {
let winsize = libc::winsize {
ws_row: 24,
ws_col: 80,
ws_xpixel: 0,
Expand All @@ -337,7 +337,7 @@ impl CoreSmoke {
&mut slave,
std::ptr::null_mut(),
std::ptr::null_mut(),
&mut winsize,
&winsize,
)
};
if rc != 0 {
Expand Down
Loading