Skip to content

Commit

Permalink
Merge branch 'elias/pic-locking-fix' into 'master'
Browse files Browse the repository at this point in the history
feat(VER-2430): Fix PocketIC daemon startup

Make sure only one PocketIC instance is running 

See merge request dfinity-lab/public/ic!14000
  • Loading branch information
fxgst committed Aug 8, 2023
2 parents 27ee689 + 7b4d1be commit 99d52f9
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 97 deletions.
74 changes: 30 additions & 44 deletions packages/pocket-ic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,6 @@ const POCKET_IC_BIN_PATH: &str = "../../target/debug/pocket-ic-backend";

type InstanceId = String;

fn get_service_port(lock_file: PathBuf, bin_path: PathBuf) -> u16 {
let start = Instant::now();
loop {
match lock_file.try_exists() {
Ok(true) => {
// If the file exists, we expect it to have content, otherwise we need not continue.
let line = std::fs::read_to_string(lock_file)
.expect("Failed to read port from lock file.");
return line.parse().expect("Failed to parse port to number");
}
_ => {
println!("Attempting to start PocketIC server...");
let _process = Command::new(bin_path.clone())
.arg("--lock-file")
.arg(lock_file.clone().as_os_str())
.spawn()
.unwrap_or_else(|_| {
panic!(
"Failed to launch PocketIC server process from bin_path {:?}",
bin_path
)
});
std::thread::sleep(Duration::from_millis(20));
// Now, try again. the child process will try to acquire a lock on the lock_file,
// so even though we may launch a second child process, it will shutdown immediately
// if it fails to get the lock.
}
}
if start.elapsed() > Duration::from_secs(5) {
panic!("Failed to start PocketIC service in time.");
}
}
}

// ======================================================================================================
// Code borrowed from https://github.com/dfinity/test-state-machine-client/blob/main/src/lib.rs
// The StateMachine struct is renamed to `PocketIc` and given new interface.
Expand All @@ -67,8 +33,16 @@ pub struct PocketIc {

impl PocketIc {
pub fn new() -> Self {
// Attempt to start new PocketIC backend if it's not already running.
let parent_pid = std::os::unix::process::parent_id();
Command::new(PathBuf::from(POCKET_IC_BIN_PATH))
.arg("--pid")
.arg(parent_pid.to_string())
.spawn()
.expect("Failed to start PocketIC binary");
// Use the parent process ID to find the PocketIC backend port for this `cargo test` run.
let daemon_url = Self::get_daemon_url(parent_pid);
let reqwest_client = reqwest::blocking::Client::new();
let daemon_url = Self::start_or_reuse_daemon();
let instance_id = reqwest_client
.post(daemon_url.join("instance").unwrap())
.send()
Expand All @@ -90,15 +64,27 @@ impl PocketIc {
}
}

fn start_or_reuse_daemon() -> Url {
// use the parent process ID, so that we have one PocketIC per `cargo test` invocation
let ppid = std::os::unix::process::parent_id();
let lock_file = std::env::temp_dir().join(format!("pocket_ic.{}.port", ppid));
let port = get_service_port(lock_file, PathBuf::from(POCKET_IC_BIN_PATH));
let url =
Url::parse(&format!("http://{}:{}/", LOCALHOST, port)).expect("Failed to parse url.");
println!("Found PocketIC server running on {}", &url);
url
fn get_daemon_url(parent_pid: u32) -> Url {
let port_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.port", parent_pid));
let ready_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.ready", parent_pid));
let start = Instant::now();
loop {
match ready_file_path.try_exists() {
Ok(true) => {
let port_string = std::fs::read_to_string(port_file_path)
.expect("Failed to read port from port file");
let port: u16 = port_string.parse().expect("Failed to parse port to number");
let daemon_url =
Url::parse(&format!("http://{}:{}/", LOCALHOST, port)).unwrap();
println!("Found PocketIC running at {}", daemon_url);
return daemon_url;
}
_ => std::thread::sleep(Duration::from_millis(20)),
}
if start.elapsed() > Duration::from_secs(5) {
panic!("Failed to start PocketIC service in time");
}
}
}

pub fn list_instances(&self) -> Vec<InstanceId> {
Expand Down
132 changes: 79 additions & 53 deletions rs/pocket_ic_backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use axum::extract::State;
use axum::routing::post;
use axum::{extract::Path, http::StatusCode, routing::get, Router, Server};
use clap::Parser;
use file_lock::{FileLock, FileOptions};
use ic_config::execution_environment;
use ic_config::subnet_config::SubnetConfig;
use ic_crypto::threshold_sig_public_key_to_der;
Expand All @@ -17,6 +16,7 @@ use itertools::Itertools;
use pocket_ic::{CanisterCall, RawCanisterId, Request, Request::*};
use serde::Serialize;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -52,71 +52,97 @@ impl FromRef<AppState> for Arc<RwLock<Instant>> {
}
}

// Command line arguments to PocketIC service
// Command line arguments to PocketIC service.
#[derive(Parser)]
struct Args {
#[clap(long)]
lock_file: String,
pid: u32,
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let file_opts = FileOptions::new().write(true).create(true);
let mut lock_file = FileLock::lock(args.lock_file, false, file_opts)
.expect("Failed to acquire lock on port file. Shutting down.");
let port_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.port", args.pid));
let ready_file_path = std::env::temp_dir().join(format!("pocket_ic_{}.ready", args.pid));

// The shared, mutable state of the PocketIC process.
let instance_map: InstanceMap = Arc::new(RwLock::new(HashMap::new()));
// A time-to-live mechanism: Requests bump this value, and the server
// gracefully shuts down when the value wasn't bumped for a while
// TODO: Implement ttl increase for every handler.
let last_request = Arc::new(RwLock::new(Instant::now()));
let app_state = AppState {
instance_map,
last_request,
};
let port_file = File::options()
.read(true)
.write(true)
.create_new(true)
.open(&port_file_path);

let app = Router::new()
//
// Get health of service.
.route("/status", get(status))
//
// List all IC instances.
.route("/instance", get(list_instances))
//
// Create a new IC instance. Returns an InstanceId.
// Body is currently ignored.
.route("/instance", post(create_instance))
//
// Call the specified IC instance.
// Body contains a Request.
// Returns the IC's Response.
.route("/instance/:id", post(call_instance))
.with_state(app_state.clone());
// .create_new(true) ensures atomically that this file was created newly, and gives an error otherwise.
if let Ok(mut new_port_file) = port_file {
// This process is the one to start PocketIC.
println!("New PocketIC will be started");
// The shared, mutable state of the PocketIC process.
let instance_map: InstanceMap = Arc::new(RwLock::new(HashMap::new()));
// A time-to-live mechanism: Requests bump this value, and the server
// gracefully shuts down when the value wasn't bumped for a while
// TODO: Implement ttl increase for every handler.
let last_request = Arc::new(RwLock::new(Instant::now()));
let app_state = AppState {
instance_map,
last_request,
};

// bind to port 0; the OS will give a specific port; communicate that to parent process via stdout
let server = Server::bind(&"127.0.0.1:0".parse().expect("Failed to parse address"))
.serve(app.into_make_service());
let real_port = server.local_addr().port();
let _ = lock_file.file.write_all(real_port.to_string().as_bytes());
let _ = lock_file.file.flush();
let app = Router::new()
//
// Get health of service.
.route("/status", get(status))
//
// List all IC instances.
.route("/instance", get(list_instances))
//
// Create a new IC instance. Returns an InstanceId.
// Body is currently ignored.
.route("/instance", post(create_instance))
//
// Call the specified IC instance.
// Body contains a Request.
// Returns the IC's Response.
.route("/instance/:id", post(call_instance))
.with_state(app_state.clone());

// This is a safeguard against orphaning this child process.
let shutdown_signal = async {
loop {
let guard = app_state.last_request.read().await;
// TODO: implement ttl increase for every handler.
if guard.elapsed() > Duration::from_secs(TTL_SEC) {
break;
}
drop(guard);
tokio::time::sleep(Duration::from_millis(1000)).await;
// bind to port 0; the OS will give a specific port; communicate that to parent process via stdout
let server = Server::bind(&"127.0.0.1:0".parse().expect("Failed to parse address"))
.serve(app.into_make_service());
let real_port = server.local_addr().port();
let _ = new_port_file.write_all(real_port.to_string().as_bytes());
let _ = new_port_file.flush();

let ready_file = File::options()
.read(true)
.write(true)
.create_new(true)
.open(&ready_file_path);
if ready_file.is_ok() {
println!("The PocketIC backend port can now be safely read by others");
} else {
eprintln!("The .ready file already exists; This should not happen unless the PID has been reused, and/or the tmp dir has not been properly cleaned up");
}
eprintln!("PocketIC process will exit.");
};
let server = server.with_graceful_shutdown(shutdown_signal);
server.await.expect("Failed to launch pocketIC.");

// This is a safeguard against orphaning this child process.
let shutdown_signal = async {
loop {
let guard = app_state.last_request.read().await;
// TODO: implement ttl increase for every handler.
if guard.elapsed() > Duration::from_secs(TTL_SEC) {
break;
}
drop(guard);
tokio::time::sleep(Duration::from_secs(1)).await;
}
println!("PocketIC process will exit");
// Clean up tmpfiles.
let _ = std::fs::remove_file(ready_file_path);
let _ = std::fs::remove_file(port_file_path);
};
let server = server.with_graceful_shutdown(shutdown_signal);
server.await.expect("Failed to launch PocketIC");
} else {
println!("PocketIC will be reused");
}
}

async fn status() -> StatusCode {
Expand Down

0 comments on commit 99d52f9

Please sign in to comment.