From 6bfb99f39637d1f771864dbac1de722238e384c2 Mon Sep 17 00:00:00 2001
From: hosted-fornet
Date: Thu, 16 Oct 2025 15:53:50 -0700
Subject: [PATCH] kernel: improve Restart exp backoff

---
 hyperdrive/src/kernel/mod.rs     |  15 ++--
 hyperdrive/src/kernel/process.rs | 101 +++++++++++++++++++++++--------
 2 files changed, 83 insertions(+), 33 deletions(-)

diff --git a/hyperdrive/src/kernel/mod.rs b/hyperdrive/src/kernel/mod.rs
index 0d2f9f19c..802f0f44d 100644
--- a/hyperdrive/src/kernel/mod.rs
+++ b/hyperdrive/src/kernel/mod.rs
@@ -4,6 +4,7 @@ use std::{
     collections::{HashMap, HashSet},
     path::PathBuf,
     sync::Arc,
+    time::Duration,
 };
 use tokio::{
     sync::{mpsc, Mutex},
@@ -43,16 +44,14 @@ enum ProcessSender {
 pub type ProcessRestartBackoffs = HashMap<t::ProcessId, Arc<Mutex<Option<RestartBackoff>>>>;
 
 pub struct RestartBackoff {
-    /// if try to restart before this:
-    /// * wait till `next_soonest_restart_time`
-    /// * increment `consecutive_attempts`
-    /// else if try to restart after this:
-    /// * set `consecutive_attempts = 0`,
-    /// and in either case:
-    /// set `next_soonest_restart_time += 2 ** consecutive_attempts` seconds
+    /// earliest time at which the kernel may attempt another restart
     next_soonest_restart_time: tokio::time::Instant,
-    /// how many times has process tried to restart in a row
+    /// count of consecutive unhealthy exits (starts at 1)
     consecutive_attempts: u32,
+    /// delay that gated the launch of the currently running attempt
+    current_backoff: Duration,
+    /// instant when the current process instance began executing
+    last_start_time: tokio::time::Instant,
     /// task that will do the restart after wait time has elapsed
     _restart_handle: Option<JoinHandle<()>>,
 }
diff --git a/hyperdrive/src/kernel/process.rs b/hyperdrive/src/kernel/process.rs
index dea471723..98b9b0aec 100644
--- a/hyperdrive/src/kernel/process.rs
+++ b/hyperdrive/src/kernel/process.rs
@@ -7,6 +7,7 @@ use std::{
         atomic::{AtomicUsize, Ordering},
         Arc, Mutex as StdMutex,
     },
+    time::Duration,
 };
 use tokio::{sync::Mutex, task::JoinHandle};
 use wasmtime::{
@@ -21,6 +22,10 @@ use wasmtime_wasi_io::{async_trait, poll::Pollable, streams::OutputStream};
 use super::RestartBackoff;
 
 const STACK_TRACE_SIZE: usize = 5000;
+const BASE_BACKOFF_SECS: u64 = 1;
+const MAX_BACKOFF_SECS: u64 = 10 * 60;
+const MAX_HEALTHY_DURATION_SECS: u64 = 50 * 60;
+const HEALTHY_RUN_MULTIPLIER: u32 = 5;
 
 pub struct ProcessContext {
     // store predecessor in order to set prompting message when popped
@@ -186,6 +191,24 @@ pub async fn make_process_loop(
         send_to_process.send(message).await?;
     }
 
+    let run_started_at = tokio::time::Instant::now();
+    if let Some(restart_backoff) = &maybe_restart_backoff {
+        let mut restart_backoff_lock = restart_backoff.lock().await;
+        if restart_backoff_lock.is_none() {
+            let base_backoff = Duration::from_secs(BASE_BACKOFF_SECS);
+            *restart_backoff_lock = Some(RestartBackoff {
+                next_soonest_restart_time: run_started_at + base_backoff,
+                consecutive_attempts: 1,
+                current_backoff: base_backoff,
+                last_start_time: run_started_at,
+                _restart_handle: None,
+            });
+        } else if let Some(ref mut rb) = *restart_backoff_lock {
+            rb.last_start_time = run_started_at;
+            rb._restart_handle = None;
+        }
+    }
+
     let our = metadata.our.clone();
     let wit_version = metadata.wit_version.clone();
 
@@ -286,27 +309,58 @@ pub async fn make_process_loop(
             let restart_backoff = maybe_restart_backoff.unwrap();
             let mut restart_backoff_lock = restart_backoff.lock().await;
             let now = tokio::time::Instant::now();
-            let (wait_till, next_soonest_restart_time, consecutive_attempts) =
-                match *restart_backoff_lock {
-                    None => (None, now + tokio::time::Duration::from_secs(1), 0),
-                    Some(ref rb) => {
-                        if rb.next_soonest_restart_time <= now {
-                            // no need to wait
-                            (None, now + tokio::time::Duration::from_secs(1), 0)
-                        } else {
-                            // must wait
-                            let base: u64 = 2;
-                            (
-                                Some(rb.next_soonest_restart_time.clone()),
-                                rb.next_soonest_restart_time.clone()
-                                    + tokio::time::Duration::from_secs(
-                                        base.pow(rb.consecutive_attempts),
-                                    ),
-                                rb.consecutive_attempts.clone() + 1,
-                            )
-                        }
-                    }
+            let base_backoff = Duration::from_secs(BASE_BACKOFF_SECS);
+            let mut restart_state = restart_backoff_lock
+                .take()
+                .unwrap_or_else(|| RestartBackoff {
+                    next_soonest_restart_time: now + base_backoff,
+                    consecutive_attempts: 1,
+                    current_backoff: base_backoff,
+                    last_start_time: now,
+                    _restart_handle: None,
+                });
+
+            let uptime = now
+                .checked_duration_since(restart_state.last_start_time)
+                .unwrap_or_default();
+            let max_healthy_duration = Duration::from_secs(MAX_HEALTHY_DURATION_SECS);
+            let healthy_duration = restart_state
+                .current_backoff
+                .checked_mul(HEALTHY_RUN_MULTIPLIER)
+                .unwrap_or(max_healthy_duration)
+                .min(max_healthy_duration);
+            let compute_backoff = |attempt: u32| -> Duration {
+                let exponent = attempt.saturating_sub(1).min(63);
+                let secs = if exponent >= 63 {
+                    u64::MAX
+                } else {
+                    1u64 << exponent
                 };
+                let secs = secs.max(BASE_BACKOFF_SECS).min(MAX_BACKOFF_SECS);
+                Duration::from_secs(secs)
+            };
+
+            if uptime >= healthy_duration {
+                restart_state.consecutive_attempts = 1;
+                restart_state.current_backoff = base_backoff;
+            } else {
+                restart_state.consecutive_attempts =
+                    restart_state.consecutive_attempts.saturating_add(1);
+                restart_state.current_backoff = compute_backoff(restart_state.consecutive_attempts);
+            }
+
+            let wait_duration = restart_state
+                .current_backoff
+                .min(Duration::from_secs(MAX_BACKOFF_SECS));
+            restart_state.current_backoff = wait_duration;
+            restart_state.next_soonest_restart_time = now + wait_duration;
+            restart_state._restart_handle = None;
+
+            let wait_till = if restart_state.consecutive_attempts == 1 {
+                None
+            } else {
+                Some(restart_state.next_soonest_restart_time)
+            };
 
             // get caps before killing
             let (tx, rx) = tokio::sync::oneshot::channel();
@@ -404,11 +458,8 @@ pub async fn make_process_loop(
                     reinitialize.await;
                 })),
             };
-            *restart_backoff_lock = Some(RestartBackoff {
-                next_soonest_restart_time,
-                consecutive_attempts,
-                _restart_handle: restart_handle,
-            });
+            restart_state._restart_handle = restart_handle;
+            *restart_backoff_lock = Some(restart_state);
         }
         // if requests, fire them
         t::OnExit::Requests(requests) => {
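Backoff schedule sketch (not part of the patch above): the snippet below restates the new restart policy as two standalone functions so the numbers are easy to check. Delays double per consecutive unhealthy exit and cap at MAX_BACKOFF_SECS (10 minutes); a run that survives HEALTHY_RUN_MULTIPLIER times the backoff that gated its launch, capped at MAX_HEALTHY_DURATION_SECS (50 minutes), resets the counter, so that exit restarts immediately and the schedule starts over. The constants are copied from process.rs as patched; the function names `delay_for_attempt` and `resets_counter` are illustrative only and do not appear in hyperdrive.

// Standalone sketch of the backoff schedule; constants mirror the patch,
// function names are illustrative only.
use std::time::Duration;

const BASE_BACKOFF_SECS: u64 = 1;
const MAX_BACKOFF_SECS: u64 = 10 * 60;
const MAX_HEALTHY_DURATION_SECS: u64 = 50 * 60;
const HEALTHY_RUN_MULTIPLIER: u32 = 5;

/// Delay before restart attempt `attempt` (1-based): min(2^(attempt-1), 600) seconds.
/// Mirrors the `compute_backoff` closure added in process.rs.
fn delay_for_attempt(attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1).min(63);
    let secs = if exponent >= 63 { u64::MAX } else { 1u64 << exponent };
    Duration::from_secs(secs.max(BASE_BACKOFF_SECS).min(MAX_BACKOFF_SECS))
}

/// A run resets the attempt counter once it stays up for HEALTHY_RUN_MULTIPLIER
/// times the backoff that gated its launch, capped at 50 minutes.
fn resets_counter(uptime: Duration, current_backoff: Duration) -> bool {
    let max_healthy = Duration::from_secs(MAX_HEALTHY_DURATION_SECS);
    let healthy = current_backoff
        .checked_mul(HEALTHY_RUN_MULTIPLIER)
        .unwrap_or(max_healthy)
        .min(max_healthy);
    uptime >= healthy
}

fn main() {
    // Crash loop: 1s, 2s, 4s, ... capped at 600s from attempt 11 onward.
    // (The patch seeds the counter at 1 on launch, so the first unhealthy
    // exit computes the delay for attempt 2, i.e. a 2-second wait.)
    for attempt in 1..=12 {
        println!("attempt {attempt}: wait {:?}", delay_for_attempt(attempt));
    }
    // 30s of uptime after a 4s backoff (threshold 20s) clears the streak;
    // 10s does not.
    assert!(resets_counter(Duration::from_secs(30), Duration::from_secs(4)));
    assert!(!resets_counter(Duration::from_secs(10), Duration::from_secs(4)));
}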