Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ base64 = "0.21"
clap = { version = "4.4", features = ["cargo", "string"] }
dirs = "5.0"
futures-util = "0.3"
nix = { version = "0.27", features = ["term"] }
nix = { version = "0.27", features = ["process", "signal", "term"] }
reqwest = { version = "0.11", features = ["json"] }
rmp-serde = "1.1.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "sync"] }
tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "signal", "sync", "time"] }
tokio-tungstenite = "*"
toml = "0.8"
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "77ebb26" }
Expand Down
71 changes: 51 additions & 20 deletions src/boot_fake_node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::io::Read;
use std::cell::RefCell;
use std::{fs, io, thread, time};
use std::os::fd::AsRawFd;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::io::{FromRawFd, OwnedFd};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::rc::Rc;
use std::sync::Arc;
use zip::read::ZipArchive;

use tokio::sync::Mutex;

use super::build;
use super::run_tests::cleanup::{cleanup, cleanup_on_signal};
use super::run_tests::network_router;
use super::run_tests::types::*;

Expand Down Expand Up @@ -218,6 +220,7 @@ pub async fn execute(
password: &str,
mut args: Vec<&str>,
) -> anyhow::Result<()> {
let detached = false; // TODO: to argument?
// TODO: factor out with run_tests?
let runtime_path = match runtime_path {
None => get_runtime_binary(&version).await?,
Expand All @@ -239,44 +242,72 @@ pub async fn execute(
},
};

let (send_to_kill_router, recv_kill_in_router) = tokio::sync::mpsc::unbounded_channel();
tokio::task::spawn(network_router::execute(
network_router_port.clone(),
NetworkRouterDefects::None,
recv_kill_in_router,
));
let mut task_handles = Vec::new();

thread::sleep(time::Duration::from_secs(1));
let node_cleanup_infos = Arc::new(Mutex::new(Vec::new()));

let nodes = Rc::new(RefCell::new(Vec::new()));
let _cleanup_context = CleanupContext::new(Rc::clone(&nodes), send_to_kill_router);
let (send_to_cleanup, recv_in_cleanup) = tokio::sync::mpsc::unbounded_channel();
let (send_to_kill, _recv_kill) = tokio::sync::broadcast::channel(1);
let recv_kill_in_cos = send_to_kill.subscribe();
let recv_kill_in_router = send_to_kill.subscribe();

let node_cleanup_infos_for_cleanup = Arc::clone(&node_cleanup_infos);
let handle = tokio::spawn(cleanup(
recv_in_cleanup,
send_to_kill,
node_cleanup_infos_for_cleanup,
None,
detached,
));
task_handles.push(handle);
let send_to_cleanup_for_signal = send_to_cleanup.clone();
let handle = tokio::spawn(cleanup_on_signal(send_to_cleanup_for_signal, recv_kill_in_cos));
task_handles.push(handle);
let send_to_cleanup_for_cleanup = send_to_cleanup.clone();
let _cleanup_context = CleanupContext::new(send_to_cleanup_for_cleanup);

let network_router_port_for_router = network_router_port.clone();
let handle = tokio::spawn(async move {
let _ = network_router::execute(
network_router_port_for_router,
NetworkRouterDefects::None,
recv_kill_in_router,
).await;
});
task_handles.push(handle);

// TODO: can remove?
thread::sleep(time::Duration::from_secs(1));

if let Some(ref rpc) = rpc {
args.extend_from_slice(&["--rpc", rpc]);
};
args.extend_from_slice(&["--fake-node-name", fake_node_name]);
args.extend_from_slice(&["--password", password]);

let (runtime_process, master_fd) = run_runtime(
let (mut runtime_process, master_fd) = run_runtime(
&runtime_path,
&node_home,
node_port,
network_router_port,
&args[..],
true,
false,
detached,
)?;

let node_info = NodeInfo {
process_handle: runtime_process,
let mut node_cleanup_infos = node_cleanup_infos.lock().await;
node_cleanup_infos.push(NodeCleanupInfo {
master_fd,
port: node_port,
process_id: runtime_process.id() as i32,
home: node_home.clone(),
};
});
drop(node_cleanup_infos);

nodes.borrow_mut().push(node_info);

nodes.borrow_mut()[0].process_handle.wait().unwrap();
runtime_process.wait().unwrap();
let _ = send_to_cleanup.send(true);
for handle in task_handles {
handle.await.unwrap();
}

Ok(())
}
88 changes: 88 additions & 0 deletions src/run_tests/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::fs;
use std::os::fd::AsRawFd;

use tokio::signal::unix::{signal, SignalKind};

use crate::run_tests::types::{BroadcastRecvBool, BroadcastSendBool, NodeCleanupInfo, NodeCleanupInfos, NodeHandles, RecvBool, SendBool};

/// trigger cleanup if receive signal to kill process
pub async fn cleanup_on_signal(
send_to_cleanup: SendBool,
mut recv_kill_in_cos: BroadcastRecvBool,
) {
let mut sigalrm = signal(SignalKind::alarm()).expect("uqdev run-tests: failed to set up SIGALRM handler");
let mut sighup = signal(SignalKind::hangup()).expect("uqdev run-tests: failed to set up SIGHUP handler");
let mut sigint = signal(SignalKind::interrupt()).expect("uqdev run-tests: failed to set up SIGINT handler");
let mut sigpipe = signal(SignalKind::pipe()).expect("uqdev run-tests: failed to set up SIGPIPE handler");
let mut sigquit = signal(SignalKind::quit()).expect("uqdev run-tests: failed to set up SIGQUIT handler");
let mut sigterm = signal(SignalKind::terminate()).expect("uqdev run-tests: failed to set up SIGTERM handler");
let mut sigusr1 = signal(SignalKind::user_defined1()).expect("uqdev run-tests: failed to set up SIGUSR1 handler");
let mut sigusr2 = signal(SignalKind::user_defined2()).expect("uqdev run-tests: failed to set up SIGUSR2 handler");

tokio::select! {
_ = sigalrm.recv() => println!("uqdev cleanup got SIGALRM\r"),
_ = sighup.recv() => println!("uqdev cleanup got SIGHUP\r"),
_ = sigint.recv() => println!("uqdev cleanup got SIGINT\r"),
_ = sigpipe.recv() => println!("uqdev cleanup got SIGPIPE\r"),
_ = sigquit.recv() => println!("uqdev cleanup got SIGQUIT\r"),
_ = sigterm.recv() => println!("uqdev cleanup got SIGTERM\r"),
_ = sigusr1.recv() => println!("uqdev cleanup got SIGUSR1\r"),
_ = sigusr2.recv() => println!("uqdev cleanup got SIGUSR2\r"),
_ = recv_kill_in_cos.recv() => {},
}

let _ = send_to_cleanup.send(true);
}

pub async fn cleanup(
mut recv_in_cleanup: RecvBool,
send_to_kill: BroadcastSendBool,
node_cleanup_infos: NodeCleanupInfos,
node_handles: Option<NodeHandles>,
detached: bool,
) {
// Block until get cleanup request.
recv_in_cleanup.recv().await;

let mut node_cleanup_infos = node_cleanup_infos.lock().await;

for (i, NodeCleanupInfo { master_fd, process_id, home }) in node_cleanup_infos.iter_mut().enumerate() {
// Send Ctrl-C to the process
println!("Cleaning up {:?}...\r", home);
if detached {
// 231222 Note: I (hf) tried to use the `else` method for
// both detached and non-detached processes and found it
// did not work properly for detached processes; specifically
// for `run-tests` that exited early by, e.g., a user input
// Ctrl+C.
nix::unistd::write(master_fd.as_raw_fd(), b"\x03").unwrap();
} else {
let pid = nix::unistd::Pid::from_raw(*process_id);
match nix::sys::wait::waitpid(pid, Some(nix::sys::wait::WaitPidFlag::WNOHANG)) {
Ok(nix::sys::wait::WaitStatus::StillAlive) |
Ok(nix::sys::wait::WaitStatus::Stopped(_, _)) |
Ok(nix::sys::wait::WaitStatus::Continued(_)) => {
nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGINT)
.expect("SIGINT failed");
}
_ => {}
}
}

if let Some(ref node_handles) = node_handles {
let mut nh = node_handles.lock().await;
nh[i].wait().unwrap();
}

if home.exists() {
for dir in &["kernel", "kv", "sqlite", "vfs"] {
let dir = home.join(dir);
if dir.exists() {
fs::remove_dir_all(&home.join(dir)).unwrap();
}
}
}
println!("Done cleaning up {:?}.\r", home);
}
let _ = send_to_kill.send(true);
}
Loading