diff --git a/packages/pocket-ic/src/lib.rs b/packages/pocket-ic/src/lib.rs index ba2023246ea..a184d1f8591 100644 --- a/packages/pocket-ic/src/lib.rs +++ b/packages/pocket-ic/src/lib.rs @@ -18,40 +18,6 @@ const POCKET_IC_BIN_PATH: &str = "../../target/debug/pocket-ic-backend"; type InstanceId = String; -fn get_service_port(lock_file: PathBuf, bin_path: PathBuf) -> u16 { - let start = Instant::now(); - loop { - match lock_file.try_exists() { - Ok(true) => { - // If the file exists, we expect it to have content, otherwise we need not continue. - let line = std::fs::read_to_string(lock_file) - .expect("Failed to read port from lock file."); - return line.parse().expect("Failed to parse port to number"); - } - _ => { - println!("Attempting to start PocketIC server..."); - let _process = Command::new(bin_path.clone()) - .arg("--lock-file") - .arg(lock_file.clone().as_os_str()) - .spawn() - .unwrap_or_else(|_| { - panic!( - "Failed to launch PocketIC server process from bin_path {:?}", - bin_path - ) - }); - std::thread::sleep(Duration::from_millis(20)); - // Now, try again. the child process will try to acquire a lock on the lock_file, - // so even though we may launch a second child process, it will shutdown immediately - // if it fails to get the lock. - } - } - if start.elapsed() > Duration::from_secs(5) { - panic!("Failed to start PocketIC service in time."); - } - } -} - // ====================================================================================================== // Code borrowed from https://github.com/dfinity/test-state-machine-client/blob/main/src/lib.rs // The StateMachine struct is renamed to `PocketIc` and given new interface. @@ -67,8 +33,16 @@ pub struct PocketIc { impl PocketIc { pub fn new() -> Self { + // Attempt to start new PocketIC backend if it's not already running. + let parent_pid = std::os::unix::process::parent_id(); + Command::new(PathBuf::from(POCKET_IC_BIN_PATH)) + .arg("--pid") + .arg(parent_pid.to_string()) + .spawn() + .expect("Failed to start PocketIC binary"); + // Use the parent process ID to find the PocketIC backend port for this `cargo test` run. + let daemon_url = Self::get_daemon_url(parent_pid); let reqwest_client = reqwest::blocking::Client::new(); - let daemon_url = Self::start_or_reuse_daemon(); let instance_id = reqwest_client .post(daemon_url.join("instance").unwrap()) .send() @@ -90,15 +64,27 @@ impl PocketIc { } } - fn start_or_reuse_daemon() -> Url { - // use the parent process ID, so that we have one PocketIC per `cargo test` invocation - let ppid = std::os::unix::process::parent_id(); - let lock_file = std::env::temp_dir().join(format!("pocket_ic.{}.port", ppid)); - let port = get_service_port(lock_file, PathBuf::from(POCKET_IC_BIN_PATH)); - let url = - Url::parse(&format!("http://{}:{}/", LOCALHOST, port)).expect("Failed to parse url."); - println!("Found PocketIC server running on {}", &url); - url + fn get_daemon_url(parent_pid: u32) -> Url { + let port_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.port", parent_pid)); + let ready_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.ready", parent_pid)); + let start = Instant::now(); + loop { + match ready_file_path.try_exists() { + Ok(true) => { + let port_string = std::fs::read_to_string(port_file_path) + .expect("Failed to read port from port file"); + let port: u16 = port_string.parse().expect("Failed to parse port to number"); + let daemon_url = + Url::parse(&format!("http://{}:{}/", LOCALHOST, port)).unwrap(); + println!("Found PocketIC running at {}", daemon_url); + return daemon_url; + } + _ => std::thread::sleep(Duration::from_millis(20)), + } + if start.elapsed() > Duration::from_secs(5) { + panic!("Failed to start PocketIC service in time"); + } + } } pub fn list_instances(&self) -> Vec { diff --git a/rs/pocket_ic_backend/src/main.rs b/rs/pocket_ic_backend/src/main.rs index 7935893f392..0569a871b14 100644 --- a/rs/pocket_ic_backend/src/main.rs +++ b/rs/pocket_ic_backend/src/main.rs @@ -3,7 +3,6 @@ use axum::extract::State; use axum::routing::post; use axum::{extract::Path, http::StatusCode, routing::get, Router, Server}; use clap::Parser; -use file_lock::{FileLock, FileOptions}; use ic_config::execution_environment; use ic_config::subnet_config::SubnetConfig; use ic_crypto::threshold_sig_public_key_to_der; @@ -17,6 +16,7 @@ use itertools::Itertools; use pocket_ic::{CanisterCall, RawCanisterId, Request, Request::*}; use serde::Serialize; use std::collections::HashMap; +use std::fs::File; use std::io::Write; use std::sync::Arc; use std::time::Instant; @@ -52,71 +52,97 @@ impl FromRef for Arc> { } } -// Command line arguments to PocketIC service +// Command line arguments to PocketIC service. #[derive(Parser)] struct Args { #[clap(long)] - lock_file: String, + pid: u32, } #[tokio::main] async fn main() { let args = Args::parse(); - let file_opts = FileOptions::new().write(true).create(true); - let mut lock_file = FileLock::lock(args.lock_file, false, file_opts) - .expect("Failed to acquire lock on port file. Shutting down."); + let port_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.port", args.pid)); + let ready_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.ready", args.pid)); - // The shared, mutable state of the PocketIC process. - let instance_map: InstanceMap = Arc::new(RwLock::new(HashMap::new())); - // A time-to-live mechanism: Requests bump this value, and the server - // gracefully shuts down when the value wasn't bumped for a while - // TODO: Implement ttl increase for every handler. - let last_request = Arc::new(RwLock::new(Instant::now())); - let app_state = AppState { - instance_map, - last_request, - }; + let port_file = File::options() + .read(true) + .write(true) + .create_new(true) + .open(&port_file_path); - let app = Router::new() - // - // Get health of service. - .route("/status", get(status)) - // - // List all IC instances. - .route("/instance", get(list_instances)) - // - // Create a new IC instance. Returns an InstanceId. - // Body is currently ignored. - .route("/instance", post(create_instance)) - // - // Call the specified IC instance. - // Body contains a Request. - // Returns the IC's Response. - .route("/instance/:id", post(call_instance)) - .with_state(app_state.clone()); + // .create_new(true) ensures atomically that this file was created newly, and gives an error otherwise. + if let Ok(mut new_port_file) = port_file { + // This process is the one to start PocketIC. + println!("New PocketIC will be started"); + // The shared, mutable state of the PocketIC process. + let instance_map: InstanceMap = Arc::new(RwLock::new(HashMap::new())); + // A time-to-live mechanism: Requests bump this value, and the server + // gracefully shuts down when the value wasn't bumped for a while + // TODO: Implement ttl increase for every handler. + let last_request = Arc::new(RwLock::new(Instant::now())); + let app_state = AppState { + instance_map, + last_request, + }; - // bind to port 0; the OS will give a specific port; communicate that to parent process via stdout - let server = Server::bind(&"127.0.0.1:0".parse().expect("Failed to parse address")) - .serve(app.into_make_service()); - let real_port = server.local_addr().port(); - let _ = lock_file.file.write_all(real_port.to_string().as_bytes()); - let _ = lock_file.file.flush(); + let app = Router::new() + // + // Get health of service. + .route("/status", get(status)) + // + // List all IC instances. + .route("/instance", get(list_instances)) + // + // Create a new IC instance. Returns an InstanceId. + // Body is currently ignored. + .route("/instance", post(create_instance)) + // + // Call the specified IC instance. + // Body contains a Request. + // Returns the IC's Response. + .route("/instance/:id", post(call_instance)) + .with_state(app_state.clone()); - // This is a safeguard against orphaning this child process. - let shutdown_signal = async { - loop { - let guard = app_state.last_request.read().await; - // TODO: implement ttl increase for every handler. - if guard.elapsed() > Duration::from_secs(TTL_SEC) { - break; - } - drop(guard); - tokio::time::sleep(Duration::from_millis(1000)).await; + // bind to port 0; the OS will give a specific port; communicate that to parent process via stdout + let server = Server::bind(&"127.0.0.1:0".parse().expect("Failed to parse address")) + .serve(app.into_make_service()); + let real_port = server.local_addr().port(); + let _ = new_port_file.write_all(real_port.to_string().as_bytes()); + let _ = new_port_file.flush(); + + let ready_file = File::options() + .read(true) + .write(true) + .create_new(true) + .open(&ready_file_path); + if ready_file.is_ok() { + println!("The PocketIC backend port can now be safely read by others"); + } else { + eprintln!("The .ready file already exists; This should not happen unless the PID has been reused, and/or the tmp dir has not been properly cleaned up"); } - eprintln!("PocketIC process will exit."); - }; - let server = server.with_graceful_shutdown(shutdown_signal); - server.await.expect("Failed to launch pocketIC."); + + // This is a safeguard against orphaning this child process. + let shutdown_signal = async { + loop { + let guard = app_state.last_request.read().await; + // TODO: implement ttl increase for every handler. + if guard.elapsed() > Duration::from_secs(TTL_SEC) { + break; + } + drop(guard); + tokio::time::sleep(Duration::from_secs(1)).await; + } + println!("PocketIC process will exit"); + // Clean up tmpfiles. + let _ = std::fs::remove_file(ready_file_path); + let _ = std::fs::remove_file(port_file_path); + }; + let server = server.with_graceful_shutdown(shutdown_signal); + server.await.expect("Failed to launch PocketIC"); + } else { + println!("PocketIC will be reused"); + } } async fn status() -> StatusCode {