Skip to content

Commit

Permalink
fix: NODE-1238 - Retry ethtool parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
garym-dfinity committed Jan 24, 2024
1 parent 8474c83 commit 363b3e2
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/ic_os/network/BUILD.bazel
Expand Up @@ -8,6 +8,7 @@ DEPENDENCIES = [
"@crate_index//:anyhow",
"@crate_index//:hex",
"@crate_index//:ping",
"@crate_index//:rayon",
"@crate_index//:regex",
"@crate_index//:serde",
"@crate_index//:serde_json",
Expand Down
1 change: 1 addition & 0 deletions rs/ic_os/network/Cargo.toml
Expand Up @@ -9,6 +9,7 @@ utils = {path = "../utils"}
anyhow = {version = "^1"}
hex = {version = "^0.4.3"}
ping = {version = "^0.5.0"}
rayon = {version = "^1.5.1"}
regex = {version = "^1.3.9"}
serde = {version = "^1.0.171"}
serde_json = {version = "^1.0.40"}
Expand Down
204 changes: 122 additions & 82 deletions rs/ic_os/network/src/interfaces.rs
Expand Up @@ -7,8 +7,9 @@ use std::vec::Vec;

use anyhow::{Context, Result};
use ping::dgramsock;
use rayon::prelude::*;

use utils::get_command_stdout;
use utils::{get_command_stdout, retry, retry_pred};

static SYSFS_NETWORK_DIR: &str = "/sys/class/net";

Expand All @@ -18,23 +19,6 @@ pub struct Interface {
pub speed_mbps: Option<u64>,
}

fn retry<F, T>(attempts: usize, mut f: F) -> Result<T>
where
F: FnMut() -> Result<T, anyhow::Error>,
{
let mut attempt = 1;
loop {
match f() {
Ok(result) => return Ok(result),
Err(_) if attempt < attempts => {
eprintln!("Failed, trying again...");
attempt += 1;
}
Err(err) => return Err(err),
}
}
}

pub fn has_ipv6_connectivity(
interface: &Interface,
generated_ipv6: &Ipv6Addr,
Expand All @@ -57,19 +41,23 @@ pub fn has_ipv6_connectivity(
get_command_stdout("ip", ["addr", "add", &ip, "dev", &interface.name])?;
activate_link(&interface.name)?;

let wait_duration_secs = Duration::from_secs(2);
let wait_time = Duration::from_secs(2);
let ping_target = ping_target.parse::<IpAddr>()?;
let ping_timeout = Duration::from_secs(3);
let result = retry(10, || {
eprintln!(
"Waiting {} seconds and attempting to ping {}",
wait_duration_secs.as_secs(),
ping_target
);
sleep(wait_duration_secs);
dgramsock::ping(ping_target, Some(ping_timeout), None, None, None, None)
.context("Ping failed.")
});
let result = retry(
10,
|| {
eprintln!(
"Attempting to ping {}, after {} seconds",
ping_target,
wait_time.as_secs()
);
dgramsock::ping(ping_target, Some(ping_timeout), None, None, None, None)
.context("Ping failed.")
},
wait_time,
);

if result.is_err() {
eprintln!("Failed to ping from configured interface.");
interface_down_func()?;
Expand All @@ -92,41 +80,101 @@ pub fn get_interface_name(interface_path: &PathBuf) -> Result<String> {
}

pub fn has_ipv4_connectivity() -> bool {
let wait_duration_secs: Duration = Duration::from_secs(2);
let wait_time: Duration = Duration::from_secs(2);
let dns_servers = ["1.1.1.1", "1.0.0.1"];
let result = retry(10, || {
eprintln!(
let result = retry(
10,
|| {
eprintln!(
"Waiting for {} seconds before attempting a TCP connection with the following DNS servers: {:?}",
wait_duration_secs.as_secs(),
wait_time.as_secs(),
dns_servers
);
sleep(wait_duration_secs);

for &server in &dns_servers {
let connection_target_with_port = format!("{}:80", server);
match TcpStream::connect(&connection_target_with_port) {
Ok(_) => {
eprintln!("Successfully connected to {}", connection_target_with_port);
return Ok(true);
}
Err(e) => {
eprintln!(
"Failed to connect to {}: {}",
connection_target_with_port, e
);

for &server in &dns_servers {
let connection_target_with_port = format!("{}:80", server);
match TcpStream::connect(&connection_target_with_port) {
Ok(_) => {
eprintln!("Successfully connected to {}", connection_target_with_port);
return Ok(true);
}
Err(e) => {
eprintln!(
"Failed to connect to {}: {}",
connection_target_with_port, e
);
}
}
}
}
Err(anyhow::Error::msg("All connection attempts failed"))
});
Err(anyhow::Error::msg("All connection attempts failed"))
},
wait_time,
);
matches!(result, Ok(true))
}

fn qualify_and_generate_interface(interface_name: &str) -> Result<Option<Interface>> {
let ethtool_output = get_command_stdout("ethtool", [interface_name])?;
let link_is_up = is_link_up_from_ethool_output(&ethtool_output)?;

if !link_is_up {
return Ok(None);
}

let speed = get_speed_from_ethtool_output(&ethtool_output);

Ok(Some(Interface {
name: interface_name.to_string(),
speed_mbps: speed,
}))
}

fn is_some_or_err<T>(r: &Result<Option<T>>) -> bool {
matches!(r, Ok(Some(_)) | Err(_))
}

fn qualify_and_generate_interfaces(interface_names: &[&str]) -> Result<Vec<Interface>> {
// On some hardware ethtool needs time before link status settles.
// Takes 2.3 seconds for recent NP.
// Wait a total of ten seconds across all interfaces, retrying each 2 seconds.
let mut result_vec: Vec<Interface> = Vec::new();
let wait_time = Duration::from_secs(2);
let interface_results: Vec<Result<Option<Interface>>> = interface_names
.par_iter()
.map(|i| {
retry_pred(
10,
|| qualify_and_generate_interface(i),
is_some_or_err,
|_| sleep(wait_time),
)
})
.collect();

for (name, result) in std::iter::zip(interface_names, interface_results) {
eprintln!("Interface name: {name}");
match result {
Ok(Some(interface)) => {
eprintln!(
"Cable is ATTACHED. Speed (mbps) detected: {}",
match &interface.speed_mbps {
Some(s) => s.to_string(),
None => "None".to_string(),
}
);
result_vec.push(interface);
}
Ok(None) => eprintln!("Cable is NOT ATTACHED"),
Err(e) => eprintln!("ERROR: {:#?}", e),
}
}
Ok(result_vec)
}

/// Return vec of Interface's which:
/// Have physical links attached
/// Do not contain the string 'virtual'
pub fn get_interfaces() -> Result<Vec<Interface>> {
let mut result: Vec<Interface> = Vec::new();
let interfaces = get_interface_paths();
eprintln!("Found raw network interfaces: {:?}", interfaces);

Expand All @@ -137,39 +185,31 @@ pub fn get_interfaces() -> Result<Vec<Interface>> {
.collect();
eprintln!("Found valid network interfaces: {:?}", valid_interfaces);

eprintln!("Gathering info about each interface:");
for interface_path in valid_interfaces {
let name = get_interface_name(interface_path)?;
eprintln!("Interface name: {:?}", name);

// Activate the link to see physical cable connectivity. Then deactivate
activate_link(&name)?;
let ethtool_output = get_command_stdout("ethtool", [name.as_str()])?;
let link_result = is_link_up_from_ethool_output(&ethtool_output);
deactivate_link(&name)?;

let link_success = link_result?;
if !link_success {
eprintln!("Network cable seems NOT attached. Discarding interface.");
continue;
} else {
eprintln!("Network cable attached.");
}

let speed = get_speed_from_ethtool_output(&ethtool_output);
eprintln!(
"Detected mb/s: {:?}",
match speed {
Some(s) => s.to_string(),
None => "None".to_string(),
}
);
result.push(Interface {
name,
speed_mbps: speed,
});
let interface_names = valid_interfaces
.into_iter()
.map(get_interface_name)
.collect::<Result<Vec<_>, _>>()?;

eprintln!("Activating each interface");
for name in interface_names.iter() {
// Activate the link to see physical cable connectivity.
// Deactivate in the next step.
// If result is error, return. That's an unrecoverable bigger problem.
activate_link(name).context("Error activating interface link!")?;
}

let result = qualify_and_generate_interfaces(
&interface_names
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>(),
)?;
eprintln!("Proceeding with interfaces: {:?}", result);

for name in interface_names.iter() {
deactivate_link(name).context("Error deactivating interface links!")?;
}

Ok(result)
}

Expand Down
35 changes: 35 additions & 0 deletions rs/ic_os/utils/src/lib.rs
Expand Up @@ -54,6 +54,41 @@ pub fn intersperse(source: &str, to_inject: char, spacing: usize) -> String {
result
}

// Retry the given function `f` until either:
// * f has been called `attempts` times
// * `stop_pred` returns true when passed the result of `f()`
// `wait_func` is called before each attempt.
// The result returned is either the one held after `stop_pred` returns true or the one held after `attempts` has been breached.
pub fn retry_pred<F, P, T, W>(attempts: usize, f: F, stop_pred: P, wait_func: W) -> Result<T>
where
F: Fn() -> Result<T, anyhow::Error>,
P: Fn(&Result<T>) -> bool,
W: Fn(usize),
{
for attempt in 1..attempts - 1 {
// Final attempt is last line of function
wait_func(attempt);
let result = f();
if stop_pred(&result) {
return result;
}
}
f()
}

// Retry until `f` returns ok() or has been called `attempts` times
pub fn retry<F, T>(attempts: usize, f: F, wait: std::time::Duration) -> Result<T>
where
F: Fn() -> Result<T, anyhow::Error>,
{
retry_pred(
attempts,
f,
|result| result.is_ok(),
|_| std::thread::sleep(wait),
)
}

#[cfg(test)]
pub mod tests {
use super::*;
Expand Down

0 comments on commit 363b3e2

Please sign in to comment.