Merged
15 changes: 7 additions & 8 deletions hyperdrive/src/kernel/mod.rs
@@ -4,6 +4,7 @@ use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::Arc,
time::Duration,
};
use tokio::{
sync::{mpsc, Mutex},
@@ -43,16 +44,14 @@ enum ProcessSender {
pub type ProcessRestartBackoffs = HashMap<t::ProcessId, Arc<Mutex<Option<RestartBackoff>>>>;

pub struct RestartBackoff {
/// if try to restart before this:
/// * wait till `next_soonest_restart_time`
/// * increment `consecutive_attempts`
/// else if try to restart after this:
/// * set `consecutive_attempts = 0`,
/// and in either case:
/// set `next_soonest_restart_time += 2 ** consecutive_attempts` seconds
/// earliest time at which the kernel may attempt another restart
next_soonest_restart_time: tokio::time::Instant,
/// how many times has process tried to restart in a row
/// count of consecutive unhealthy exits (starts at 1)
consecutive_attempts: u32,
/// delay that gated the launch of the currently running attempt
current_backoff: Duration,
/// instant when the current process instance began executing
last_start_time: tokio::time::Instant,
/// task that will do the restart after wait time has elapsed
_restart_handle: Option<JoinHandle<()>>,
}
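
As a rough illustration of how these fields interact (a sketch, not code from this PR; the constants live in process.rs below): after the n-th consecutive quick crash the kernel ends up waiting about 2^n seconds before relaunching, capped at ten minutes, and a sufficiently long run resets the count.

use std::time::Duration;

/// Sketch only: wait imposed before the n-th consecutive restart after
/// quick crashes (n >= 1), assuming the 1 s base and 10-minute cap.
fn wait_before_restart(n: u32) -> Duration {
    let secs = 1u64 << n.min(63);           // 2^n seconds
    Duration::from_secs(secs.min(10 * 60))  // capped at 600 s
}

// wait_before_restart(1) == 2 s, wait_before_restart(2) == 4 s, ...,
// wait_before_restart(10) and beyond == 600 s (10 minutes)
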
101 changes: 76 additions & 25 deletions hyperdrive/src/kernel/process.rs
@@ -7,6 +7,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex as StdMutex,
},
time::Duration,
};
use tokio::{sync::Mutex, task::JoinHandle};
use wasmtime::{
@@ -21,6 +22,10 @@ use wasmtime_wasi_io::{async_trait, poll::Pollable, streams::OutputStream};
use super::RestartBackoff;

const STACK_TRACE_SIZE: usize = 5000;
const BASE_BACKOFF_SECS: u64 = 1;
const MAX_BACKOFF_SECS: u64 = 10 * 60;
const MAX_HEALTHY_DURATION_SECS: u64 = 50 * 60;
const HEALTHY_RUN_MULTIPLIER: u32 = 5;
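// with these values the backoff starts at 1 s and doubles on every consecutive
// unhealthy exit, capped at 10 minutes; a run counts as healthy (and resets
// the ladder) once it survives HEALTHY_RUN_MULTIPLIER times the backoff that
// gated it, never requiring more than MAX_HEALTHY_DURATION_SECS of uptime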

pub struct ProcessContext {
// store predecessor in order to set prompting message when popped
@@ -186,6 +191,24 @@ pub async fn make_process_loop(
send_to_process.send(message).await?;
}

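// record when this run started and seed/refresh the shared backoff state so
// that the exit handling below can measure how long the process stayed up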
let run_started_at = tokio::time::Instant::now();
if let Some(restart_backoff) = &maybe_restart_backoff {
let mut restart_backoff_lock = restart_backoff.lock().await;
if restart_backoff_lock.is_none() {
let base_backoff = Duration::from_secs(BASE_BACKOFF_SECS);
*restart_backoff_lock = Some(RestartBackoff {
next_soonest_restart_time: run_started_at + base_backoff,
consecutive_attempts: 1,
current_backoff: base_backoff,
last_start_time: run_started_at,
_restart_handle: None,
});
} else if let Some(ref mut rb) = *restart_backoff_lock {
rb.last_start_time = run_started_at;
rb._restart_handle = None;
}
}

let our = metadata.our.clone();
let wit_version = metadata.wit_version.clone();

@@ -286,27 +309,58 @@ pub async fn make_process_loop(
let restart_backoff = maybe_restart_backoff.unwrap();
let mut restart_backoff_lock = restart_backoff.lock().await;
let now = tokio::time::Instant::now();
let (wait_till, next_soonest_restart_time, consecutive_attempts) =
match *restart_backoff_lock {
None => (None, now + tokio::time::Duration::from_secs(1), 0),
Some(ref rb) => {
if rb.next_soonest_restart_time <= now {
// no need to wait
(None, now + tokio::time::Duration::from_secs(1), 0)
} else {
// must wait
let base: u64 = 2;
(
Some(rb.next_soonest_restart_time.clone()),
rb.next_soonest_restart_time.clone()
+ tokio::time::Duration::from_secs(
base.pow(rb.consecutive_attempts),
),
rb.consecutive_attempts.clone() + 1,
)
}
}
let base_backoff = Duration::from_secs(BASE_BACKOFF_SECS);
let mut restart_state = restart_backoff_lock
.take()
.unwrap_or_else(|| RestartBackoff {
next_soonest_restart_time: now + base_backoff,
consecutive_attempts: 1,
current_backoff: base_backoff,
last_start_time: now,
_restart_handle: None,
});

let uptime = now
.checked_duration_since(restart_state.last_start_time)
.unwrap_or_default();
let max_healthy_duration = Duration::from_secs(MAX_HEALTHY_DURATION_SECS);
let healthy_duration = restart_state
.current_backoff
.checked_mul(HEALTHY_RUN_MULTIPLIER)
.unwrap_or(max_healthy_duration)
.min(max_healthy_duration);
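// backoff for attempt N is 2^(N-1) seconds, clamped to
// [BASE_BACKOFF_SECS, MAX_BACKOFF_SECS]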
let compute_backoff = |attempt: u32| -> Duration {
let exponent = attempt.saturating_sub(1).min(63);
let secs = if exponent >= 63 {
u64::MAX
} else {
1u64 << exponent
};
let secs = secs.max(BASE_BACKOFF_SECS).min(MAX_BACKOFF_SECS);
Duration::from_secs(secs)
};

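// a sufficiently long run resets the ladder; a quick crash escalates it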
if uptime >= healthy_duration {
restart_state.consecutive_attempts = 1;
restart_state.current_backoff = base_backoff;
} else {
restart_state.consecutive_attempts =
restart_state.consecutive_attempts.saturating_add(1);
restart_state.current_backoff = compute_backoff(restart_state.consecutive_attempts);
}

let wait_duration = restart_state
.current_backoff
.min(Duration::from_secs(MAX_BACKOFF_SECS));
restart_state.current_backoff = wait_duration;
restart_state.next_soonest_restart_time = now + wait_duration;
restart_state._restart_handle = None;

let wait_till = if restart_state.consecutive_attempts == 1 {
None
} else {
Some(restart_state.next_soonest_restart_time)
};

// get caps before killing
let (tx, rx) = tokio::sync::oneshot::channel();
@@ -404,11 +458,8 @@ pub async fn make_process_loop(
reinitialize.await;
})),
};
*restart_backoff_lock = Some(RestartBackoff {
next_soonest_restart_time,
consecutive_attempts,
_restart_handle: restart_handle,
});
restart_state._restart_handle = restart_handle;
*restart_backoff_lock = Some(restart_state);
}
// if requests, fire them
t::OnExit::Requests(requests) => {
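
Taken together, the exit handling above amounts to the following restart policy, shown here as a condensed sketch (illustrative names, not code from this PR): on an exit that triggers a restart, the kernel either resets the backoff ladder, if the run lasted at least HEALTHY_RUN_MULTIPLIER times the backoff that gated it (at most 50 minutes), or escalates it, doubling the delay up to the 10-minute cap.

use std::time::Duration;

const BASE: Duration = Duration::from_secs(1);          // BASE_BACKOFF_SECS
const MAX: Duration = Duration::from_secs(10 * 60);     // MAX_BACKOFF_SECS
const MAX_HEALTHY: Duration = Duration::from_secs(50 * 60);
const HEALTHY_MULTIPLIER: u32 = 5;

/// Given the previous (consecutive_attempts, current_backoff) pair and how
/// long the process stayed up, return the pair stored for the next run.
fn next_state(prev: (u32, Duration), uptime: Duration) -> (u32, Duration) {
    let (attempts, backoff) = prev;
    let healthy = backoff
        .checked_mul(HEALTHY_MULTIPLIER)
        .unwrap_or(MAX_HEALTHY)
        .min(MAX_HEALTHY);
    if uptime >= healthy {
        // long enough run: reset the ladder (the restart is then immediate)
        (1, BASE)
    } else {
        // quick crash: escalate 2 s, 4 s, 8 s, ... capped at 10 minutes
        let attempts = attempts.saturating_add(1);
        let secs = 1u64 << attempts.saturating_sub(1).min(63);
        (attempts, Duration::from_secs(secs).clamp(BASE, MAX))
    }
}

// next_state((1, BASE), Duration::from_secs(2))        == (2, 2 s)
// next_state((2, 2 * BASE), Duration::from_secs(3))    == (3, 4 s)
// next_state((3, 4 * BASE), Duration::from_secs(3600)) == (1, 1 s)

Because the wait is skipped whenever the attempt count is back at 1, a process that has been running healthily restarts immediately after a one-off crash.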