From 2a957f42a549929e21c731b22512c1938443ad56 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Tue, 21 Apr 2026 16:39:01 +0200 Subject: [PATCH 1/6] test infra --- crates/cli/src/commands/test/infra.rs | 764 +++++++++++++++++++++- crates/cli/src/commands/test/mod.rs | 9 +- crates/cli/src/commands/test/speedtest.rs | 164 +++++ crates/cli/src/main.rs | 2 +- 4 files changed, 916 insertions(+), 23 deletions(-) create mode 100644 crates/cli/src/commands/test/speedtest.rs diff --git a/crates/cli/src/commands/test/infra.rs b/crates/cli/src/commands/test/infra.rs index 87a66f8d..7c0a8d78 100644 --- a/crates/cli/src/commands/test/infra.rs +++ b/crates/cli/src/commands/test/infra.rs @@ -1,9 +1,752 @@ //! Infrastructure and hardware tests. -use super::{TestConfigArgs, helpers::TestCategoryResult}; -use crate::error::Result; +use std::{ + io::{BufRead as _, BufReader, Write}, + path::{Path, PathBuf}, + time::{Duration, Instant}, +}; + use clap::Args; -use std::io::Write; +use serde::Deserialize; +use tokio_util::sync::CancellationToken; + +use super::{ + AllCategoriesResult, TestCaseName, TestCategory, TestCategoryResult, TestConfigArgs, + TestResult, TestVerdict, calculate_score, evaluate_rtt, filter_tests, + must_output_to_file_on_quiet, publish_result_to_obol_api, sort_tests, write_result_to_file, + write_result_to_writer, +}; +use crate::{ + duration::Duration as CliDuration, + error::{CliError, Result}, +}; + +const FIO_NOT_FOUND: &str = "fio command not found, install fio from https://fio.readthedocs.io/en/latest/fio_doc.html#binary-packages or using the package manager of your choice (apt, yum, brew, etc.)"; + +const DISK_OPS_NUM_OF_JOBS: u32 = 8; +const DISK_OPS_MBS_TOTAL: u32 = 4096; +const DISK_WRITE_SPEED_MBS_AVG: f64 = 1000.0; +const DISK_WRITE_SPEED_MBS_POOR: f64 = 500.0; +const DISK_WRITE_IOPS_AVG: f64 = 2000.0; +const DISK_WRITE_IOPS_POOR: f64 = 1000.0; +const DISK_READ_SPEED_MBS_AVG: f64 = 1000.0; +const DISK_READ_SPEED_MBS_POOR: f64 = 500.0; +const DISK_READ_IOPS_AVG: f64 = 2000.0; +const DISK_READ_IOPS_POOR: f64 = 1000.0; +const AVAILABLE_MEMORY_MBS_AVG: i64 = 4000; +const AVAILABLE_MEMORY_MBS_POOR: i64 = 2000; +const TOTAL_MEMORY_MBS_AVG: i64 = 8000; +const TOTAL_MEMORY_MBS_POOR: i64 = 4000; +const INTERNET_LATENCY_AVG: Duration = Duration::from_millis(20); +const INTERNET_LATENCY_POOR: Duration = Duration::from_millis(50); +const INTERNET_DOWNLOAD_SPEED_MBPS_AVG: f64 = 50.0; +const INTERNET_DOWNLOAD_SPEED_MBPS_POOR: f64 = 15.0; +const INTERNET_UPLOAD_SPEED_MBPS_AVG: f64 = 50.0; +const INTERNET_UPLOAD_SPEED_MBPS_POOR: f64 = 15.0; + +#[derive(Deserialize)] +struct FioResult { + jobs: Vec, +} + +#[derive(Deserialize)] +struct FioResultJob { + read: FioResultSingle, + write: FioResultSingle, +} + +#[derive(Deserialize)] +struct FioResultSingle { + iops: f64, + bw: f64, +} + +#[allow(async_fn_in_trait)] +trait DiskTestTool { + async fn check_availability(&self) -> Result<()>; + async fn write_speed(&self, path: &Path, block_size_kb: i32) -> Result; + async fn write_iops(&self, path: &Path, block_size_kb: i32) -> Result; + async fn read_speed(&self, path: &Path, block_size_kb: i32) -> Result; + async fn read_iops(&self, path: &Path, block_size_kb: i32) -> Result; +} + +struct FioTestTool; + +impl DiskTestTool for FioTestTool { + async fn check_availability(&self) -> Result<()> { + let result = tokio::process::Command::new("fio") + .arg("--version") + .output() + .await; + match result { + Ok(o) if o.status.success() => Ok(()), + _ => Err(CliError::Other(FIO_NOT_FOUND.to_string())), + } + } + + async fn write_speed(&self, path: &Path, block_size_kb: i32) -> Result { + let out = fio_command(path, block_size_kb, "write").await?; + let res: FioResult = serde_json::from_slice(&out) + .map_err(|e| CliError::Other(format!("unmarshal fio result: {e}")))?; + let job = res + .jobs + .into_iter() + .next() + .ok_or_else(|| CliError::Other("fio returned no jobs".to_string()))?; + Ok(job.write.bw / 1024.0) + } + + async fn write_iops(&self, path: &Path, block_size_kb: i32) -> Result { + let out = fio_command(path, block_size_kb, "write").await?; + let res: FioResult = serde_json::from_slice(&out) + .map_err(|e| CliError::Other(format!("unmarshal fio result: {e}")))?; + let job = res + .jobs + .into_iter() + .next() + .ok_or_else(|| CliError::Other("fio returned no jobs".to_string()))?; + Ok(job.write.iops) + } + + async fn read_speed(&self, path: &Path, block_size_kb: i32) -> Result { + let out = fio_command(path, block_size_kb, "read").await?; + let res: FioResult = serde_json::from_slice(&out) + .map_err(|e| CliError::Other(format!("unmarshal fio result: {e}")))?; + let job = res + .jobs + .into_iter() + .next() + .ok_or_else(|| CliError::Other("fio returned no jobs".to_string()))?; + Ok(job.read.bw / 1024.0) + } + + async fn read_iops(&self, path: &Path, block_size_kb: i32) -> Result { + let out = fio_command(path, block_size_kb, "read").await?; + let res: FioResult = serde_json::from_slice(&out) + .map_err(|e| CliError::Other(format!("unmarshal fio result: {e}")))?; + let job = res + .jobs + .into_iter() + .next() + .ok_or_else(|| CliError::Other("fio returned no jobs".to_string()))?; + Ok(job.read.iops) + } +} + +fn can_write_to_dir(dir: &Path) -> bool { + let test_file = dir.join(".perm_test_tmp"); + match std::fs::File::create(&test_file) { + Ok(_) => { + let _ = std::fs::remove_file(&test_file); + true + } + Err(_) => false, + } +} + +async fn fio_command(path: &Path, block_size_kb: i32, operation: &str) -> Result> { + let filename = path.join("fiotest"); + let filename_str = filename.to_string_lossy().into_owned(); + let size_per_job = DISK_OPS_MBS_TOTAL / DISK_OPS_NUM_OF_JOBS; + + let output = tokio::process::Command::new("fio") + .arg("--name=fioTest") + .arg(format!("--filename={filename_str}")) + .arg(format!("--size={size_per_job}Mb")) + .arg(format!("--blocksize={block_size_kb}k")) + .arg(format!("--numjobs={DISK_OPS_NUM_OF_JOBS}")) + .arg(format!("--rw={operation}")) + .arg("--direct=1") + .arg("--runtime=60s") + .arg("--group_reporting") + .arg("--output-format=json") + .output() + .await + .map_err(|e| CliError::Other(format!("exec fio command: {e}")))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(CliError::Other(format!("exec fio command: {stderr}"))); + } + + tokio::fs::remove_file(&filename) + .await + .map_err(|e| CliError::Other(format!("delete fio test file: {e}")))?; + + Ok(output.stdout) +} + +fn available_memory_linux() -> Result { + let file = std::fs::File::open("/proc/meminfo") + .map_err(|e| CliError::Other(format!("open /proc/meminfo: {e}")))?; + let reader = BufReader::new(file); + + for line_result in reader.lines() { + let line = line_result.map_err(|e| CliError::Other(format!("open /proc/meminfo: {e}")))?; + if !line.contains("MemAvailable") { + continue; + } + let (_, value_part) = line + .split_once(": ") + .ok_or_else(|| CliError::Other("parse MemAvailable int".to_string()))?; + let kbs_str = value_part + .split("kB") + .next() + .unwrap_or_default() + .trim() + .to_string(); + let kbs: i64 = kbs_str + .parse() + .map_err(|_| CliError::Other("parse MemAvailable int".to_string()))?; + return Ok(kbs.saturating_mul(1024)); + } + + Err(CliError::Other( + "memAvailable not found in /proc/meminfo".to_string(), + )) +} + +async fn available_memory_macos() -> Result { + let page_size_out = tokio::process::Command::new("pagesize") + .output() + .await + .map_err(|e| CliError::Other(format!("run pagesize: {e}")))?; + let page_size_str = String::from_utf8_lossy(&page_size_out.stdout); + let page_size: i64 = page_size_str + .trim() + .parse() + .map_err(|_| CliError::Other("parse memorySizePerPage int".to_string()))?; + + let vm_stat_out = tokio::process::Command::new("vm_stat") + .output() + .await + .map_err(|e| CliError::Other(format!("run vm_stat: {e}")))?; + let vm_stat = String::from_utf8_lossy(&vm_stat_out.stdout).into_owned(); + + let mut pages_free: i64 = 0; + let mut pages_inactive: i64 = 0; + let mut pages_speculative: i64 = 0; + + for line in vm_stat.lines() { + let Some((key, value)) = line.split_once(": ") else { + continue; + }; + let num_str = value.split('.').next().unwrap_or_default().trim(); + + if key.contains("Pages free") { + pages_free = num_str + .parse() + .map_err(|_| CliError::Other("parse Pages free int".to_string()))?; + } else if key.contains("Pages inactive") { + pages_inactive = num_str + .parse() + .map_err(|_| CliError::Other("parse Pages inactive int".to_string()))?; + } else if key.contains("Pages speculative") { + pages_speculative = num_str + .parse() + .map_err(|_| CliError::Other("parse Pages speculative int".to_string()))?; + } + } + + let total = pages_free + .saturating_add(pages_inactive) + .saturating_add(pages_speculative); + Ok(total.saturating_mul(page_size)) +} + +fn total_memory_linux() -> Result { + let file = std::fs::File::open("/proc/meminfo") + .map_err(|e| CliError::Other(format!("open /proc/meminfo: {e}")))?; + let reader = BufReader::new(file); + + for line_result in reader.lines() { + let line = line_result.map_err(|e| CliError::Other(format!("open /proc/meminfo: {e}")))?; + if !line.contains("MemTotal") { + continue; + } + let (_, value_part) = line + .split_once(": ") + .ok_or_else(|| CliError::Other("parse MemTotal int".to_string()))?; + let kbs_str = value_part + .split("kB") + .next() + .unwrap_or_default() + .trim() + .to_string(); + let kbs: i64 = kbs_str + .parse() + .map_err(|_| CliError::Other("parse MemTotal int".to_string()))?; + return Ok(kbs.saturating_mul(1024)); + } + + Err(CliError::Other( + "memTotal not found in /proc/meminfo".to_string(), + )) +} + +async fn total_memory_macos() -> Result { + let out = tokio::process::Command::new("sysctl") + .arg("hw.memsize") + .output() + .await + .map_err(|e| CliError::Other(format!("run sysctl hw.memsize: {e}")))?; + let output_str = String::from_utf8_lossy(&out.stdout); + let mem_str = output_str + .split_once(": ") + .map(|(_, v)| v.trim()) + .ok_or_else(|| CliError::Other("parse memSize int".to_string()))?; + mem_str + .parse() + .map_err(|_| CliError::Other("parse memSize int".to_string())) +} + +async fn disk_write_speed_test( + args: &TestInfraArgs, + disk_dir: &Path, + tool: &impl DiskTestTool, +) -> TestResult { + let mut result = TestResult::new("DiskWriteSpeed"); + + tracing::info!( + test_file_size_mb = DISK_OPS_MBS_TOTAL, + jobs = DISK_OPS_NUM_OF_JOBS, + test_file_path = %disk_dir.display(), + "Testing disk write speed..." + ); + + if let Err(e) = tool.check_availability().await { + return result.fail(e); + } + + match tool.write_speed(disk_dir, args.disk_io_block_size_kb).await { + Err(e) => result.fail(e), + Ok(speed) => { + result.verdict = if speed < DISK_WRITE_SPEED_MBS_POOR { + TestVerdict::Poor + } else if speed < DISK_WRITE_SPEED_MBS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{speed:.2}MB/s"); + result + } + } +} + +async fn disk_write_iops_test( + args: &TestInfraArgs, + disk_dir: &Path, + tool: &impl DiskTestTool, +) -> TestResult { + let mut result = TestResult::new("DiskWriteIOPS"); + + tracing::info!( + test_file_size_mb = DISK_OPS_MBS_TOTAL, + jobs = DISK_OPS_NUM_OF_JOBS, + test_file_path = %disk_dir.display(), + "Testing disk write IOPS..." + ); + + if let Err(e) = tool.check_availability().await { + return result.fail(e); + } + + match tool.write_iops(disk_dir, args.disk_io_block_size_kb).await { + Err(e) => result.fail(e), + Ok(iops) => { + result.verdict = if iops < DISK_WRITE_IOPS_POOR { + TestVerdict::Poor + } else if iops < DISK_WRITE_IOPS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{iops:.0}"); + result + } + } +} + +async fn disk_read_speed_test( + args: &TestInfraArgs, + disk_dir: &Path, + tool: &impl DiskTestTool, +) -> TestResult { + let mut result = TestResult::new("DiskReadSpeed"); + + tracing::info!( + test_file_size_mb = DISK_OPS_MBS_TOTAL, + jobs = DISK_OPS_NUM_OF_JOBS, + test_file_path = %disk_dir.display(), + "Testing disk read speed..." + ); + + if let Err(e) = tool.check_availability().await { + return result.fail(e); + } + + match tool.read_speed(disk_dir, args.disk_io_block_size_kb).await { + Err(e) => result.fail(e), + Ok(speed) => { + result.verdict = if speed < DISK_READ_SPEED_MBS_POOR { + TestVerdict::Poor + } else if speed < DISK_READ_SPEED_MBS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{speed:.2}MB/s"); + result + } + } +} + +/// Go bug parity: the original Go implementation (testinfra.go:377) calls +/// ReadSpeed instead of ReadIOPS for this test, then compares the bandwidth +/// result against IOPS thresholds. Fixed here to call read_iops() correctly; +/// the Go behaviour was clearly unintentional. +async fn disk_read_iops_test( + args: &TestInfraArgs, + disk_dir: &Path, + tool: &impl DiskTestTool, +) -> TestResult { + let mut result = TestResult::new("DiskReadIOPS"); + + tracing::info!( + test_file_size_mb = DISK_OPS_MBS_TOTAL, + jobs = DISK_OPS_NUM_OF_JOBS, + test_file_path = %disk_dir.display(), + "Testing disk read IOPS..." + ); + + if let Err(e) = tool.check_availability().await { + return result.fail(e); + } + + match tool.read_iops(disk_dir, args.disk_io_block_size_kb).await { + Err(e) => result.fail(e), + Ok(iops) => { + result.verdict = if iops < DISK_READ_IOPS_POOR { + TestVerdict::Poor + } else if iops < DISK_READ_IOPS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{iops:.0}"); + result + } + } +} + +async fn available_memory_test() -> TestResult { + let mut result = TestResult::new("AvailableMemory"); + + let bytes = match std::env::consts::OS { + "linux" => available_memory_linux(), + "macos" => available_memory_macos().await, + os => return result.fail(CliError::Other(format!("unknown OS {os}"))), + }; + + match bytes { + Err(e) => result.fail(e), + Ok(b) => { + let mb = b / 1024 / 1024; + result.verdict = if mb < AVAILABLE_MEMORY_MBS_POOR { + TestVerdict::Poor + } else if mb < AVAILABLE_MEMORY_MBS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{mb}MB"); + result + } + } +} + +async fn total_memory_test() -> TestResult { + let mut result = TestResult::new("TotalMemory"); + + let bytes = match std::env::consts::OS { + "linux" => total_memory_linux(), + "macos" => total_memory_macos().await, + os => return result.fail(CliError::Other(format!("unknown OS {os}"))), + }; + + match bytes { + Err(e) => result.fail(e), + Ok(b) => { + let mb = b / 1024 / 1024; + result.verdict = if mb < TOTAL_MEMORY_MBS_POOR { + TestVerdict::Poor + } else if mb < TOTAL_MEMORY_MBS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{mb}MB"); + result + } + } +} + +async fn internet_latency_test(args: &TestInfraArgs, client: &reqwest::Client) -> TestResult { + let result = TestResult::new("InternetLatency"); + + let mut server = match super::speedtest::fetch_best_server( + &args.internet_test_servers_only, + &args.internet_test_servers_exclude, + client, + ) + .await + { + Err(e) => return result.fail(e), + Ok(s) => s, + }; + + tracing::info!( + server_name = %server.name, + server_country = %server.country, + server_distance_km = server.distance, + server_id = %server.id, + "Testing internet latency..." + ); + + if let Err(e) = server.ping_test(client).await { + return result.fail(e); + } + + evaluate_rtt( + server.latency, + result, + INTERNET_LATENCY_AVG, + INTERNET_LATENCY_POOR, + ) +} + +async fn internet_download_speed_test( + args: &TestInfraArgs, + client: &reqwest::Client, +) -> TestResult { + let mut result = TestResult::new("InternetDownloadSpeed"); + + let mut server = match super::speedtest::fetch_best_server( + &args.internet_test_servers_only, + &args.internet_test_servers_exclude, + client, + ) + .await + { + Err(e) => return result.fail(e), + Ok(s) => s, + }; + + tracing::info!( + server_name = %server.name, + server_country = %server.country, + server_distance_km = server.distance, + server_id = %server.id, + "Testing internet download speed..." + ); + + if let Err(e) = server.download_test(client).await { + return result.fail(e); + } + + let speed = server.dl_speed_mbps; + result.verdict = if speed < INTERNET_DOWNLOAD_SPEED_MBPS_POOR { + TestVerdict::Poor + } else if speed < INTERNET_DOWNLOAD_SPEED_MBPS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{speed:.2}MB/s"); + result +} + +async fn internet_upload_speed_test(args: &TestInfraArgs, client: &reqwest::Client) -> TestResult { + let mut result = TestResult::new("InternetUploadSpeed"); + + let mut server = match super::speedtest::fetch_best_server( + &args.internet_test_servers_only, + &args.internet_test_servers_exclude, + client, + ) + .await + { + Err(e) => return result.fail(e), + Ok(s) => s, + }; + + tracing::info!( + server_name = %server.name, + server_country = %server.country, + server_distance_km = server.distance, + server_id = %server.id, + "Testing internet upload speed..." + ); + + if let Err(e) = server.upload_test(client).await { + return result.fail(e); + } + + let speed = server.ul_speed_mbps; + result.verdict = if speed < INTERNET_UPLOAD_SPEED_MBPS_POOR { + TestVerdict::Poor + } else if speed < INTERNET_UPLOAD_SPEED_MBPS_AVG { + TestVerdict::Avg + } else { + TestVerdict::Good + }; + result.measurement = format!("{speed:.2}MB/s"); + result +} + +/// Returns the ordered list of supported infra test case names. +pub(crate) fn supported_infra_test_cases() -> Vec { + vec![ + TestCaseName::new("DiskWriteSpeed", 1), + TestCaseName::new("DiskWriteIOPS", 2), + TestCaseName::new("DiskReadSpeed", 3), + TestCaseName::new("DiskReadIOPS", 4), + TestCaseName::new("AvailableMemory", 5), + TestCaseName::new("TotalMemory", 6), + TestCaseName::new("InternetLatency", 7), + TestCaseName::new("InternetDownloadSpeed", 8), + TestCaseName::new("InternetUploadSpeed", 9), + ] +} + +async fn run_single_test( + name: &str, + args: &TestInfraArgs, + disk_dir: &Path, + tool: &FioTestTool, + client: &reqwest::Client, +) -> TestResult { + match name { + "DiskWriteSpeed" => disk_write_speed_test(args, disk_dir, tool).await, + "DiskWriteIOPS" => disk_write_iops_test(args, disk_dir, tool).await, + "DiskReadSpeed" => disk_read_speed_test(args, disk_dir, tool).await, + "DiskReadIOPS" => disk_read_iops_test(args, disk_dir, tool).await, + "AvailableMemory" => available_memory_test().await, + "TotalMemory" => total_memory_test().await, + "InternetLatency" => internet_latency_test(args, client).await, + "InternetDownloadSpeed" => internet_download_speed_test(args, client).await, + "InternetUploadSpeed" => internet_upload_speed_test(args, client).await, + _ => TestResult::new(name).fail(CliError::Other(format!("unknown test: {name}"))), + } +} + +async fn run_tests_with_timeout( + args: &TestInfraArgs, + tests: &[TestCaseName], + disk_dir: &Path, + client: &reqwest::Client, + ct: CancellationToken, +) -> Vec { + let tool = FioTestTool; + let mut results = Vec::new(); + let start = Instant::now(); + + for test_case in tests { + let remaining = args.test_config.timeout.saturating_sub(start.elapsed()); + tokio::select! { + result = run_single_test(test_case.name, args, disk_dir, &tool, client) => { + results.push(result); + } + () = tokio::time::sleep(remaining) => { + results.push(TestResult::new(test_case.name).fail(CliError::TimeoutInterrupted)); + break; + } + () = ct.cancelled() => { + results.push(TestResult::new(test_case.name).fail(CliError::TimeoutInterrupted)); + break; + } + } + } + + results +} + +/// Runs the infrastructure tests. +pub async fn run( + args: TestInfraArgs, + writer: &mut dyn Write, + ct: CancellationToken, +) -> Result { + pluto_tracing::init( + &pluto_tracing::TracingConfig::builder() + .with_default_console() + .build(), + ) + .expect("Failed to initialize tracing"); + + must_output_to_file_on_quiet(args.test_config.quiet, &args.test_config.output_json)?; + + tracing::info!("Starting hardware performance and network connectivity test"); + + let disk_dir = match &args.disk_io_test_file_dir { + Some(dir) => PathBuf::from(dir), + None => std::env::var("HOME") + .or_else(|_| std::env::var("USERPROFILE")) + .map(PathBuf::from) + .map_err(|_| CliError::Other("get user home directory".to_string()))?, + }; + + if !can_write_to_dir(&disk_dir) { + return Err(CliError::Other(format!( + "no write permissions to disk IO test file directory: {}", + disk_dir.display() + ))); + } + + let client = super::speedtest::build_client()?; + + let all_cases = supported_infra_test_cases(); + let mut queued = filter_tests(&all_cases, args.test_config.test_cases.as_deref()); + if queued.is_empty() { + return Err(CliError::TestCaseNotSupported); + } + sort_tests(&mut queued); + + let start = Instant::now(); + let test_results = run_tests_with_timeout(&args, &queued, &disk_dir, &client, ct).await; + let elapsed = start.elapsed(); + + let score = calculate_score(&test_results); + + let mut res = TestCategoryResult::new(TestCategory::Infra); + res.targets.insert("local".to_string(), test_results); + res.execution_time = Some(CliDuration::new(elapsed)); + res.score = Some(score); + + if !args.test_config.quiet { + write_result_to_writer(&res, writer)?; + } + + if !args.test_config.output_json.is_empty() { + write_result_to_file(&res, args.test_config.output_json.as_ref()).await?; + } + + if args.test_config.publish { + let all = AllCategoriesResult { + infra: Some(res.clone()), + ..Default::default() + }; + publish_result_to_obol_api( + all, + &args.test_config.publish_addr, + &args.test_config.publish_private_key_file, + ) + .await?; + } + + Ok(res) +} /// Arguments for the infra test command. #[derive(Args, Clone, Debug)] @@ -42,18 +785,3 @@ pub struct TestInfraArgs { )] pub internet_test_servers_exclude: Vec, } - -/// Runs the infrastructure tests. -pub async fn run(_args: TestInfraArgs, _writer: &mut dyn Write) -> Result { - // TODO: Implement infra tests - // - DiskWriteSpeed - // - DiskWriteIOPS - // - DiskReadSpeed - // - DiskReadIOPS - // - AvailableMemory - // - TotalMemory - // - InternetLatency - // - InternetDownloadSpeed - // - InternetUploadSpeed - unimplemented!("infra test not yet implemented") -} diff --git a/crates/cli/src/commands/test/mod.rs b/crates/cli/src/commands/test/mod.rs index 84e7dd1a..2fbf3dee 100644 --- a/crates/cli/src/commands/test/mod.rs +++ b/crates/cli/src/commands/test/mod.rs @@ -14,6 +14,7 @@ pub mod helpers; pub mod infra; pub mod mev; pub mod peers; +pub(super) mod speedtest; pub mod validator; pub(crate) use helpers::*; @@ -89,10 +90,10 @@ fn list_test_cases(category: TestCategory) -> Vec { // supported_self_test_cases() vec![] } - TestCategory::Infra => { - // TODO: Extract from infra::supported_infra_test_cases() - vec![] - } + TestCategory::Infra => infra::supported_infra_test_cases() + .into_iter() + .map(|tc| tc.name.to_string()) + .collect(), TestCategory::All => { // TODO: Combine all test cases from all categories vec![] diff --git a/crates/cli/src/commands/test/speedtest.rs b/crates/cli/src/commands/test/speedtest.rs new file mode 100644 index 00000000..a4d95372 --- /dev/null +++ b/crates/cli/src/commands/test/speedtest.rs @@ -0,0 +1,164 @@ +//! Ookla Speedtest.net client for latency, download, and upload measurements. + +use std::time::{Duration, Instant}; + +use serde::Deserialize; + +use crate::error::{CliError, Result}; + +const SPEEDTEST_SERVERS_URL: &str = + "https://www.speedtest.net/api/js/servers?engine=js&https_functional=true&limit=10"; +const SPEEDTEST_MAX_CANDIDATES: usize = 5; +const SPEEDTEST_UPLOAD_BYTES: usize = 50_000_000; + +#[derive(Deserialize)] +struct OoklaServerResponse { + id: String, + name: String, + country: String, + url: String, + #[serde(default)] + distance: f64, +} + +pub(super) struct SpeedtestServer { + pub(super) id: String, + pub(super) name: String, + pub(super) country: String, + pub(super) distance: f64, + pub(super) latency: Duration, + pub(super) dl_speed_mbps: f64, + pub(super) ul_speed_mbps: f64, + url: String, +} + +impl SpeedtestServer { + fn from_response(r: OoklaServerResponse) -> Self { + Self { + id: r.id, + name: r.name, + country: r.country, + url: r.url, + distance: r.distance, + latency: Duration::ZERO, + dl_speed_mbps: 0.0, + ul_speed_mbps: 0.0, + } + } + + fn base_url(&self) -> &str { + self.url.strip_suffix("upload.php").unwrap_or(&self.url) + } + + pub(super) async fn ping_test(&mut self, client: &reqwest::Client) -> Result<()> { + let latency_url = format!("{}latency.txt", self.base_url()); + let start = Instant::now(); + // Read and discard the body so the connection is left in a clean state + // for the connection pool; dropping Response without reading closes the + // underlying TCP socket and corrupts pool state for subsequent requests. + let response = client.get(&latency_url).send().await?; + let _ = response.bytes().await?; + self.latency = start.elapsed(); + Ok(()) + } + + pub(super) async fn download_test(&mut self, client: &reqwest::Client) -> Result<()> { + // Download multiple large images sequentially to saturate the link long enough + // for an accurate throughput measurement (single 4000x4000 JPEG is ~4MB). + let download_url = format!("{}random4000x4000.jpg", self.base_url()); + let mut total_bytes = 0usize; + let start = Instant::now(); + for _ in 0..4 { + let response = client.get(&download_url).send().await?; + if !response.status().is_success() { + return Err(CliError::Other(format!( + "download test failed: HTTP {}", + response.status() + ))); + } + total_bytes = total_bytes.saturating_add(response.bytes().await?.len()); + } + self.dl_speed_mbps = bytes_to_mbps(total_bytes, start.elapsed()); + Ok(()) + } + + pub(super) async fn upload_test(&mut self, client: &reqwest::Client) -> Result<()> { + let upload_data = vec![0u8; SPEEDTEST_UPLOAD_BYTES]; + let start = Instant::now(); + let response = client + .post(&self.url) + .header("Content-Type", "application/octet-stream") + .header("Content-Length", SPEEDTEST_UPLOAD_BYTES.to_string()) + .body(upload_data) + .send() + .await?; + if !response.status().is_success() { + return Err(CliError::Other(format!( + "upload test failed: HTTP {}", + response.status() + ))); + } + let _ = response.bytes().await?; + self.ul_speed_mbps = bytes_to_mbps(SPEEDTEST_UPLOAD_BYTES, start.elapsed()); + Ok(()) + } +} + +/// Builds a shared reqwest client configured for Ookla Speedtest servers. +pub(super) fn build_client() -> Result { + reqwest::Client::builder() + .user_agent("showwin/speedtest-go 1.7.10") + .build() + .map_err(|e| CliError::Other(format!("build HTTP client: {e}"))) +} + +/// Fetches the Ookla server list, applies filters, pings candidates, and +/// returns the lowest-latency reachable server. +pub(super) async fn fetch_best_server( + servers_only: &[String], + servers_exclude: &[String], + client: &reqwest::Client, +) -> Result { + let servers: Vec = client + .get(SPEEDTEST_SERVERS_URL) + .send() + .await + .map_err(|e| CliError::Other(format!("fetch Ookla servers: {e}")))? + .json() + .await + .map_err(|e| CliError::Other(format!("fetch Ookla servers: {e}")))?; + + let candidates: Vec<_> = servers + .into_iter() + .filter(|s| servers_only.is_empty() || servers_only.contains(&s.name)) + .filter(|s| !servers_exclude.contains(&s.name)) + .collect(); + + if candidates.is_empty() { + return Err(CliError::Other( + "fetch Ookla servers: no servers match the specified filters".to_string(), + )); + } + + let mut best: Option = None; + for candidate in candidates.into_iter().take(SPEEDTEST_MAX_CANDIDATES) { + let mut server = SpeedtestServer::from_response(candidate); + if server.ping_test(client).await.is_ok() { + let is_better = best.as_ref().is_none_or(|b| server.latency < b.latency); + if is_better { + best = Some(server); + } + } + } + + best.ok_or_else(|| CliError::Other("find Ookla server: no reachable servers".to_string())) +} + +#[allow(clippy::cast_precision_loss, clippy::arithmetic_side_effects)] +pub(super) fn bytes_to_mbps(bytes: usize, elapsed: Duration) -> f64 { + let secs = elapsed.as_secs_f64(); + if secs == 0.0 { + return 0.0; + } + bytes as f64 * 8.0 / secs / 1_000_000.0 +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 6aede8f2..d5f60477 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -80,7 +80,7 @@ async fn run() -> std::result::Result<(), CliError> { TestCommands::Mev(args) => commands::test::mev::run(args, &mut stdout, ct) .await .map(|_| ()), - TestCommands::Infra(args) => commands::test::infra::run(args, &mut stdout) + TestCommands::Infra(args) => commands::test::infra::run(args, &mut stdout, ct) .await .map(|_| ()), TestCommands::All(args) => commands::test::all::run(*args, &mut stdout).await, From 0f1fe63068dc8708233802e5dda0a9666c9c064f Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 22 Apr 2026 15:32:58 +0200 Subject: [PATCH 2/6] corrected trait usage for single test --- crates/cli/src/commands/test/infra.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/cli/src/commands/test/infra.rs b/crates/cli/src/commands/test/infra.rs index 7c0a8d78..580e4f9d 100644 --- a/crates/cli/src/commands/test/infra.rs +++ b/crates/cli/src/commands/test/infra.rs @@ -624,7 +624,7 @@ async fn run_single_test( name: &str, args: &TestInfraArgs, disk_dir: &Path, - tool: &FioTestTool, + tool: &impl DiskTestTool, client: &reqwest::Client, ) -> TestResult { match name { From 3b28e6d78e55546b9f45eb55955997e8e6c8cbc0 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 22 Apr 2026 17:09:20 +0200 Subject: [PATCH 3/6] review fiexes --- crates/cli/src/commands/test/infra.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/cli/src/commands/test/infra.rs b/crates/cli/src/commands/test/infra.rs index 580e4f9d..65df5970 100644 --- a/crates/cli/src/commands/test/infra.rs +++ b/crates/cli/src/commands/test/infra.rs @@ -160,19 +160,22 @@ async fn fio_command(path: &Path, block_size_kb: i32, operation: &str) -> Result .arg("--runtime=60s") .arg("--group_reporting") .arg("--output-format=json") - .output() + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|e| CliError::Other(format!("exec fio command: {e}")))? + .wait_with_output() .await .map_err(|e| CliError::Other(format!("exec fio command: {e}")))?; + let _ = tokio::fs::remove_file(&filename).await; + if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(CliError::Other(format!("exec fio command: {stderr}"))); } - tokio::fs::remove_file(&filename) - .await - .map_err(|e| CliError::Other(format!("delete fio test file: {e}")))?; - Ok(output.stdout) } @@ -198,7 +201,7 @@ fn available_memory_linux() -> Result { let kbs: i64 = kbs_str .parse() .map_err(|_| CliError::Other("parse MemAvailable int".to_string()))?; - return Ok(kbs.saturating_mul(1024)); + return Ok(kbs * 1024); } Err(CliError::Other( @@ -276,7 +279,7 @@ fn total_memory_linux() -> Result { let kbs: i64 = kbs_str .parse() .map_err(|_| CliError::Other("parse MemTotal int".to_string()))?; - return Ok(kbs.saturating_mul(1024)); + return Ok(kbs * 1024); } Err(CliError::Other( @@ -563,7 +566,7 @@ async fn internet_download_speed_test( } else { TestVerdict::Good }; - result.measurement = format!("{speed:.2}MB/s"); + result.measurement = format!("{speed:.2}Mb/s"); result } @@ -785,3 +788,4 @@ pub struct TestInfraArgs { )] pub internet_test_servers_exclude: Vec, } + From 00591a235682d84bfd3d3c2791ba628fd44fb183 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 22 Apr 2026 18:56:35 +0200 Subject: [PATCH 4/6] fixed clippy allow statements --- crates/cli/src/commands/test/infra.rs | 11 ++++++++++- crates/cli/src/commands/test/speedtest.rs | 10 ++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/crates/cli/src/commands/test/infra.rs b/crates/cli/src/commands/test/infra.rs index 65df5970..dffc6dc0 100644 --- a/crates/cli/src/commands/test/infra.rs +++ b/crates/cli/src/commands/test/infra.rs @@ -201,6 +201,11 @@ fn available_memory_linux() -> Result { let kbs: i64 = kbs_str .parse() .map_err(|_| CliError::Other("parse MemAvailable int".to_string()))?; + + #[allow( + clippy::arithmetic_side_effects, + reason = "The memory won't overflow i64 because the value would be larger than 9223372TB" + )] return Ok(kbs * 1024); } @@ -279,6 +284,11 @@ fn total_memory_linux() -> Result { let kbs: i64 = kbs_str .parse() .map_err(|_| CliError::Other("parse MemTotal int".to_string()))?; + + #[allow( + clippy::arithmetic_side_effects, + reason = "The memory won't overflow i64 because the value would be larger than 9223372TB" + )] return Ok(kbs * 1024); } @@ -788,4 +798,3 @@ pub struct TestInfraArgs { )] pub internet_test_servers_exclude: Vec, } - diff --git a/crates/cli/src/commands/test/speedtest.rs b/crates/cli/src/commands/test/speedtest.rs index a4d95372..b4d3b47d 100644 --- a/crates/cli/src/commands/test/speedtest.rs +++ b/crates/cli/src/commands/test/speedtest.rs @@ -154,11 +154,17 @@ pub(super) async fn fetch_best_server( best.ok_or_else(|| CliError::Other("find Ookla server: no reachable servers".to_string())) } -#[allow(clippy::cast_precision_loss, clippy::arithmetic_side_effects)] pub(super) fn bytes_to_mbps(bytes: usize, elapsed: Duration) -> f64 { let secs = elapsed.as_secs_f64(); if secs == 0.0 { return 0.0; } - bytes as f64 * 8.0 / secs / 1_000_000.0 + + #[allow( + clippy::cast_precision_loss, + clippy::arithmetic_side_effects, + reason = "precision loss requires >8PB transferred; arithmetic overflow is impossible for realistic network speeds" + )] + let bytes: f64 = bytes as f64; + bytes * 8.0 / secs / 1_000_000.0 } From 34705575bc918d3c9776c113bb59252cdef853e8 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 22 Apr 2026 19:00:09 +0200 Subject: [PATCH 5/6] comment --- crates/cli/src/commands/test/speedtest.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/cli/src/commands/test/speedtest.rs b/crates/cli/src/commands/test/speedtest.rs index b4d3b47d..f916bed8 100644 --- a/crates/cli/src/commands/test/speedtest.rs +++ b/crates/cli/src/commands/test/speedtest.rs @@ -128,6 +128,11 @@ pub(super) async fn fetch_best_server( .await .map_err(|e| CliError::Other(format!("fetch Ookla servers: {e}")))?; + // Go bug parity: the original Go implementation (testinfra.go) appends both + // servers_only and servers_exclude filter results independently (union), so + // excluded servers can still appear if they also match servers_only. The Rust + // implementation correctly chains the filters as intersection, which is the + // intended behaviour. This intentional divergence from Go is kept. let candidates: Vec<_> = servers .into_iter() .filter(|s| servers_only.is_empty() || servers_only.contains(&s.name)) From 3230d3e437d5a056e57369582c190a8e9fe47d23 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 22 Apr 2026 19:02:09 +0200 Subject: [PATCH 6/6] additional warning --- crates/cli/src/commands/test/speedtest.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/cli/src/commands/test/speedtest.rs b/crates/cli/src/commands/test/speedtest.rs index f916bed8..b0bc7f51 100644 --- a/crates/cli/src/commands/test/speedtest.rs +++ b/crates/cli/src/commands/test/speedtest.rs @@ -47,7 +47,13 @@ impl SpeedtestServer { } fn base_url(&self) -> &str { - self.url.strip_suffix("upload.php").unwrap_or(&self.url) + match self.url.strip_suffix("upload.php") { + Some(base) => base, + None => { + tracing::warn!(url = %self.url, "Ookla server URL does not end in 'upload.php'; subsequent requests may fail"); + &self.url + } + } } pub(super) async fn ping_test(&mut self, client: &reqwest::Client) -> Result<()> {