From 363b3e25dd6b39f4552de83977ae3f51b4e64863 Mon Sep 17 00:00:00 2001 From: Gary McElroy Date: Wed, 24 Jan 2024 17:23:58 +0000 Subject: [PATCH] fix: NODE-1238 - Retry `ethtool` parsing --- Cargo.lock | 1 + rs/ic_os/network/BUILD.bazel | 1 + rs/ic_os/network/Cargo.toml | 1 + rs/ic_os/network/src/interfaces.rs | 204 +++++++++++++++++------------ rs/ic_os/utils/src/lib.rs | 35 +++++ 5 files changed, 160 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab8c8f33326..3f12ced10b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14126,6 +14126,7 @@ dependencies = [ "config", "hex", "ping", + "rayon", "regex", "serde", "serde_json", diff --git a/rs/ic_os/network/BUILD.bazel b/rs/ic_os/network/BUILD.bazel index 3d96c28a7d8..911d165299c 100644 --- a/rs/ic_os/network/BUILD.bazel +++ b/rs/ic_os/network/BUILD.bazel @@ -8,6 +8,7 @@ DEPENDENCIES = [ "@crate_index//:anyhow", "@crate_index//:hex", "@crate_index//:ping", + "@crate_index//:rayon", "@crate_index//:regex", "@crate_index//:serde", "@crate_index//:serde_json", diff --git a/rs/ic_os/network/Cargo.toml b/rs/ic_os/network/Cargo.toml index 9385c1e88cf..42b7dacb4fd 100644 --- a/rs/ic_os/network/Cargo.toml +++ b/rs/ic_os/network/Cargo.toml @@ -9,6 +9,7 @@ utils = {path = "../utils"} anyhow = {version = "^1"} hex = {version = "^0.4.3"} ping = {version = "^0.5.0"} +rayon = {version = "^1.5.1"} regex = {version = "^1.3.9"} serde = {version = "^1.0.171"} serde_json = {version = "^1.0.40"} diff --git a/rs/ic_os/network/src/interfaces.rs b/rs/ic_os/network/src/interfaces.rs index 235f9fa2255..1a0e79440dc 100644 --- a/rs/ic_os/network/src/interfaces.rs +++ b/rs/ic_os/network/src/interfaces.rs @@ -7,8 +7,9 @@ use std::vec::Vec; use anyhow::{Context, Result}; use ping::dgramsock; +use rayon::prelude::*; -use utils::get_command_stdout; +use utils::{get_command_stdout, retry, retry_pred}; static SYSFS_NETWORK_DIR: &str = "/sys/class/net"; @@ -18,23 +19,6 @@ pub struct Interface { pub speed_mbps: Option, } -fn 
retry(attempts: usize, mut f: F) -> Result -where - F: FnMut() -> Result, -{ - let mut attempt = 1; - loop { - match f() { - Ok(result) => return Ok(result), - Err(_) if attempt < attempts => { - eprintln!("Failed, trying again..."); - attempt += 1; - } - Err(err) => return Err(err), - } - } -} - pub fn has_ipv6_connectivity( interface: &Interface, generated_ipv6: &Ipv6Addr, @@ -57,19 +41,23 @@ pub fn has_ipv6_connectivity( get_command_stdout("ip", ["addr", "add", &ip, "dev", &interface.name])?; activate_link(&interface.name)?; - let wait_duration_secs = Duration::from_secs(2); + let wait_time = Duration::from_secs(2); let ping_target = ping_target.parse::()?; let ping_timeout = Duration::from_secs(3); - let result = retry(10, || { - eprintln!( - "Waiting {} seconds and attempting to ping {}", - wait_duration_secs.as_secs(), - ping_target - ); - sleep(wait_duration_secs); - dgramsock::ping(ping_target, Some(ping_timeout), None, None, None, None) - .context("Ping failed.") - }); + let result = retry( + 10, + || { + eprintln!( + "Attempting to ping {}, after {} seconds", + ping_target, + wait_time.as_secs() + ); + dgramsock::ping(ping_target, Some(ping_timeout), None, None, None, None) + .context("Ping failed.") + }, + wait_time, + ); + if result.is_err() { eprintln!("Failed to ping from configured interface."); interface_down_func()?; @@ -92,41 +80,101 @@ pub fn get_interface_name(interface_path: &PathBuf) -> Result { } pub fn has_ipv4_connectivity() -> bool { - let wait_duration_secs: Duration = Duration::from_secs(2); + let wait_time: Duration = Duration::from_secs(2); let dns_servers = ["1.1.1.1", "1.0.0.1"]; - let result = retry(10, || { - eprintln!( + let result = retry( + 10, + || { + eprintln!( "Waiting for {} seconds before attempting a TCP connection with the following DNS servers: {:?}", - wait_duration_secs.as_secs(), + wait_time.as_secs(), dns_servers ); - sleep(wait_duration_secs); - - for &server in &dns_servers { - let connection_target_with_port = 
format!("{}:80", server); - match TcpStream::connect(&connection_target_with_port) { - Ok(_) => { - eprintln!("Successfully connected to {}", connection_target_with_port); - return Ok(true); - } - Err(e) => { - eprintln!( - "Failed to connect to {}: {}", - connection_target_with_port, e - ); + + for &server in &dns_servers { + let connection_target_with_port = format!("{}:80", server); + match TcpStream::connect(&connection_target_with_port) { + Ok(_) => { + eprintln!("Successfully connected to {}", connection_target_with_port); + return Ok(true); + } + Err(e) => { + eprintln!( + "Failed to connect to {}: {}", + connection_target_with_port, e + ); + } } } - } - Err(anyhow::Error::msg("All connection attempts failed")) - }); + Err(anyhow::Error::msg("All connection attempts failed")) + }, + wait_time, + ); matches!(result, Ok(true)) } +fn qualify_and_generate_interface(interface_name: &str) -> Result> { + let ethtool_output = get_command_stdout("ethtool", [interface_name])?; + let link_is_up = is_link_up_from_ethool_output(ðtool_output)?; + + if !link_is_up { + return Ok(None); + } + + let speed = get_speed_from_ethtool_output(ðtool_output); + + Ok(Some(Interface { + name: interface_name.to_string(), + speed_mbps: speed, + })) +} + +fn is_some_or_err(r: &Result>) -> bool { + matches!(r, Ok(Some(_)) | Err(_)) +} + +fn qualify_and_generate_interfaces(interface_names: &[&str]) -> Result> { + // On some hardware ethtool needs time before link status settles. + // Takes 2.3 seconds for recent NP. + // Wait a total of ten seconds across all interfaces, retrying each 2 seconds. 
+ let mut result_vec: Vec = Vec::new(); + let wait_time = Duration::from_secs(2); + let interface_results: Vec>> = interface_names + .par_iter() + .map(|i| { + retry_pred( + 10, + || qualify_and_generate_interface(i), + is_some_or_err, + |_| sleep(wait_time), + ) + }) + .collect(); + + for (name, result) in std::iter::zip(interface_names, interface_results) { + eprintln!("Interface name: {name}"); + match result { + Ok(Some(interface)) => { + eprintln!( + "Cable is ATTACHED. Speed (mbps) detected: {}", + match &interface.speed_mbps { + Some(s) => s.to_string(), + None => "None".to_string(), + } + ); + result_vec.push(interface); + } + Ok(None) => eprintln!("Cable is NOT ATTACHED"), + Err(e) => eprintln!("ERROR: {:#?}", e), + } + } + Ok(result_vec) +} + /// Return vec of Interface's which: /// Have physical links attached /// Do not contain the string 'virtual' pub fn get_interfaces() -> Result> { - let mut result: Vec = Vec::new(); let interfaces = get_interface_paths(); eprintln!("Found raw network interfaces: {:?}", interfaces); @@ -137,39 +185,31 @@ pub fn get_interfaces() -> Result> { .collect(); eprintln!("Found valid network interfaces: {:?}", valid_interfaces); - eprintln!("Gathering info about each interface:"); - for interface_path in valid_interfaces { - let name = get_interface_name(interface_path)?; - eprintln!("Interface name: {:?}", name); - - // Activate the link to see physical cable connectivity. Then deactivate - activate_link(&name)?; - let ethtool_output = get_command_stdout("ethtool", [name.as_str()])?; - let link_result = is_link_up_from_ethool_output(ðtool_output); - deactivate_link(&name)?; - - let link_success = link_result?; - if !link_success { - eprintln!("Network cable seems NOT attached. 
Discarding interface."); - continue; - } else { - eprintln!("Network cable attached."); - } - - let speed = get_speed_from_ethtool_output(ðtool_output); - eprintln!( - "Detected mb/s: {:?}", - match speed { - Some(s) => s.to_string(), - None => "None".to_string(), - } - ); - result.push(Interface { - name, - speed_mbps: speed, - }); + let interface_names = valid_interfaces + .into_iter() + .map(get_interface_name) + .collect::, _>>()?; + + eprintln!("Activating each interface"); + for name in interface_names.iter() { + // Activate the link to see physical cable connectivity. + // Deactivate in the next step. + // If result is error, return. That's an unrecoverable bigger problem. + activate_link(name).context("Error activating interface link!")?; } + + let result = qualify_and_generate_interfaces( + &interface_names + .iter() + .map(AsRef::as_ref) + .collect::>(), + )?; eprintln!("Proceeding with interfaces: {:?}", result); + + for name in interface_names.iter() { + deactivate_link(name).context("Error deactivating interface links!")?; + } + Ok(result) } diff --git a/rs/ic_os/utils/src/lib.rs b/rs/ic_os/utils/src/lib.rs index fda3a348f7e..5e403788d84 100644 --- a/rs/ic_os/utils/src/lib.rs +++ b/rs/ic_os/utils/src/lib.rs @@ -54,6 +54,41 @@ pub fn intersperse(source: &str, to_inject: char, spacing: usize) -> String { result } +// Retry the given function `f` until either: +// * f has been called `attempts` times +// * `stop_pred` returns true when passed the result of `f()` +// `wait_func` is called before each attempt. +// The result returned is either the one held after `stop_pred` returns true or the one held after `attempts` has been breached. 
/// Calls `f` repeatedly until `stop_pred` accepts its result or `attempts` calls have been made.
///
/// * `attempts` - maximum number of calls to `f`. `0` is treated as `1`, so `f` always runs at
///   least once and the attempt arithmetic can never underflow.
/// * `f` - the fallible operation to retry.
/// * `stop_pred` - inspects each result; returning `true` stops retrying and yields that result.
/// * `wait_func` - invoked with the 1-based attempt number immediately before every call to `f`
///   (first and last included); callers typically sleep in it.
///
/// Returns the first result accepted by `stop_pred`; otherwise the result of the final attempt,
/// which is returned as-is without consulting `stop_pred`.
pub fn retry_pred<F, T, P, W>(attempts: usize, f: F, stop_pred: P, wait_func: W) -> Result<T>
where
    F: Fn() -> Result<T>,
    P: Fn(&Result<T>) -> bool,
    W: Fn(usize),
{
    // Clamp so that `attempts == 0` still performs a single try instead of underflowing.
    let attempts = attempts.max(1);

    // Attempts 1..attempts-1 may stop early; `1..attempts` yields exactly `attempts - 1` of them.
    for attempt in 1..attempts {
        wait_func(attempt);
        let result = f();
        if stop_pred(&result) {
            return result;
        }
    }

    // Final attempt: wait like every other attempt, then hand back whatever `f` produces.
    wait_func(attempts);
    f()
}

/// Calls `f` until it returns `Ok` or has been called `attempts` times, sleeping for `wait`
/// before each call. Returns the first `Ok`, or the error from the last attempt.
pub fn retry<F, T>(attempts: usize, f: F, wait: std::time::Duration) -> Result<T>
where
    F: Fn() -> Result<T>,
{
    retry_pred(
        attempts,
        f,
        |result| result.is_ok(),
        |_| std::thread::sleep(wait),
    )
}

// NOTE: the pre-existing `#[cfg(test)] pub mod tests { use super::*; ... }` module follows here
// unchanged; it is cut off in this view and therefore not reproduced.