diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index c82969f2..77711c01 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -15,6 +15,9 @@ env: CARGO_TERM_COLOR: always CARGO_INCREMENTAL: 0 RUSTFLAGS: "-C debuginfo=0" + LANG: C.UTF-8 + LC_ALL: C.UTF-8 + jobs: check: @@ -36,16 +39,16 @@ jobs: - name: Run clippy if: success() || failure() run: cargo clippy -- -D warnings - #- name: Install Redis binary - #run: | - #sudo apt-get update - #sudo apt-get install -y redis-server - ## Ensure redis-server binary is installed but don't start the service - #sudo systemctl stop redis || true - #sudo systemctl disable redis || true + - name: Install Redis binary + run: | + sudo apt-get update + sudo apt-get install -y redis-server + # Ensure redis-server binary is installed but don't start the service + sudo systemctl stop redis || true + sudo systemctl disable redis || true - #- name: Run tests - #if: success() || failure() - #run: | - #redis-server --version - #cargo test -- --nocapture \ No newline at end of file + - name: Run tests + if: success() || failure() + run: | + redis-server --version + cargo test -- --nocapture \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7982f8fc..8b7c7218 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,68 +1,32 @@ # Contributing Guidelines -We love your input! We want to make contributing to this project as easy and transparent as possible, whether it's: +We welcome contributions via GitHub Pull Requests. -- Reporting a bug -- Discussing the current state of the code -- Submitting a fix -- Proposing new features -- Becoming a maintainer +## Branching & Workflow -## We Develop with Github -We use GitHub to host code, to track issues and feature requests, as well as accept pull requests. +* **`main`:** Stable releases ONLY. Merged from `release/*`. **NEVER commit directly.** +* **`develop`:** Integration branch for the next release. Source for features/releases. -## Branch Strategy -We use a two-branch strategy for development: +**1. Features (`feature/*`)** + * Branch from `develop`. + * PR targets `develop`. + * Use **Squash and Merge** when merging the PR. -1. `develop` - This is our main development branch where all feature branches are merged for nightly builds and testing -2. `main` - This is our stable production branch that contains reviewed and tested code +**2. Releases (`release/vX.Y.Z`)** + * Branch from `develop`. Add only version bumps & critical final fixes. + * **PR 1:** `release/*` -> `main`. Use **Merge Commit (NO Squash)**. + * **PR 2:** `release/*` -> `develop`. Use **Merge Commit (NO Squash)**. -### Development Process +**Rule:** **NEVER** merge `main` back into `develop`. -1. Create a new feature branch from `develop` -2. Make your changes and commit them -3. Submit a pull request to merge into `develop` -4. After review and testing in `develop`, changes will be merged into `main` for production releases +## Pull Requests (Features to `develop`) -## Pull Request Process +* Base your feature branch on the latest `develop`. +* Include tests and documentation updates. +* Ensure tests and linting pass. +* Submit PR targeting `develop`; address feedback. -1. Fork the repo and create your feature branch from `develop` -2. If you've added code that should be tested, add tests -3. If you've changed APIs, update the documentation -4. Ensure the test suite passes -5. Make sure your code lints -6. Submit a pull request to merge into `develop` +## Issues & Commits -## Report bugs using Github's issue tracker -We use GitHub issues to track public bugs. Report a bug by opening a new issue; it's that easy! - -## Write bug reports with detail, background, and sample code - -**Great Bug Reports** tend to have: - -- A quick summary and/or background -- Steps to reproduce - - Be specific! - - Give sample code if you can. -- What you expected would happen -- What actually happens -- Notes (possibly including why you think this might be happening, or stuff you tried that didn't work) - -## Development Process - -1. Create a new branch from `develop` for your work -2. Make your changes -3. Write or update tests as needed -4. Update documentation as needed -5. Submit a pull request to `develop` -6. Address any review feedback - -### Commit Messages - -- Use the present tense ("Add feature" not "Added feature") -- Use the imperative mood ("Move cursor to..." not "Moves cursor to...") -- Limit the first line to 72 characters or less -- Reference issues and pull requests liberally after the first line - -## References -This document was adapted from the open-source contribution guidelines for [Facebook's Draft](https://github.com/facebook/draft-js/blob/a9316a723f9e918afde44dea68b5f9f39b7d9b00/CONTRIBUTING.md). +* Report bugs via GitHub Issues with details. +* Use clear, present-tense commit messages (e.g., "Fix login bug #123"). \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 94f28093..c99afdb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2342,7 +2342,7 @@ dependencies = [ [[package]] name = "discovery" -version = "0.2.3" +version = "0.2.4" dependencies = [ "actix-web", "alloy", @@ -4349,7 +4349,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestrator" -version = "0.2.3" +version = "0.2.4" dependencies = [ "actix-web", "alloy", @@ -6477,7 +6477,7 @@ dependencies = [ [[package]] name = "validator" -version = "0.2.3" +version = "0.2.4" dependencies = [ "actix-web", "alloy", @@ -7126,7 +7126,7 @@ dependencies = [ [[package]] name = "worker" -version = "0.2.3" +version = "0.2.4" dependencies = [ "actix-web", "alloy", diff --git a/Cargo.toml b/Cargo.toml index 316242df..fc0d31b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["discovery", "worker", "validator", "shared", "orchestrator", "dev-ut resolver = "2" [workspace.package] -version = "0.2.3" +version = "0.2.4" edition = "2021" [workspace.features] diff --git a/discovery/src/api/routes/node.rs b/discovery/src/api/routes/node.rs index 1c676e85..54ab4edb 100644 --- a/discovery/src/api/routes/node.rs +++ b/discovery/src/api/routes/node.rs @@ -220,6 +220,7 @@ mod tests { count: Some(4), model: Some("A100".to_string()), memory_mb: Some(40000), + indices: Some(vec![0, 1, 2, 3]), }), cpu: Some(CpuSpecs { cores: Some(16), diff --git a/orchestrator/src/api/routes/nodes.rs b/orchestrator/src/api/routes/nodes.rs index f67d51cc..29f24776 100644 --- a/orchestrator/src/api/routes/nodes.rs +++ b/orchestrator/src/api/routes/nodes.rs @@ -9,13 +9,31 @@ use serde_json::json; use shared::security::request_signer::sign_request; use std::str::FromStr; use std::time::Duration; - // Timeout for node operations in seconds const NODE_REQUEST_TIMEOUT: u64 = 30; async fn get_nodes(app_state: Data) -> HttpResponse { let nodes = app_state.store_context.node_store.get_nodes(); - HttpResponse::Ok().json(json!({"success": true, "nodes": nodes})) + + let mut status_counts = json!({}); + for node in &nodes { + let status_str = format!("{:?}", node.status); + if let Some(count) = status_counts.get(&status_str) { + if let Some(count_value) = count.as_u64() { + status_counts[status_str] = json!(count_value + 1); + } else { + status_counts[status_str] = json!(1); + } + } else { + status_counts[status_str] = json!(1); + } + } + + HttpResponse::Ok().json(json!({ + "success": true, + "nodes": nodes, + "counts": status_counts + })) } async fn restart_node_task(node_id: web::Path, app_state: Data) -> HttpResponse { diff --git a/shared/src/models/node.rs b/shared/src/models/node.rs index 92b0a1f7..3c74e063 100644 --- a/shared/src/models/node.rs +++ b/shared/src/models/node.rs @@ -66,6 +66,7 @@ pub struct GpuSpecs { pub count: Option, pub model: Option, pub memory_mb: Option, + pub indices: Option>, } impl fmt::Display for GpuSpecs { @@ -445,6 +446,7 @@ mod tests { count: gpu_count, model: gpu_model.map(String::from), memory_mb: gpu_mem, + indices: None, }) } else { None @@ -744,6 +746,7 @@ mod tests { count: Some(4), model: Some("A100".to_string()), memory_mb: None, + indices: None, }), cpu: Some(CpuSpecs { cores: Some(16), diff --git a/validator/src/validators/synthetic_data.rs b/validator/src/validators/synthetic_data.rs index 492b5144..1dabe913 100644 --- a/validator/src/validators/synthetic_data.rs +++ b/validator/src/validators/synthetic_data.rs @@ -603,7 +603,6 @@ mod tests { use shared::web3::contracts::core::builder::ContractBuilder; use shared::web3::wallet::Wallet; use url::Url; - fn test_store() -> RedisStore { let store = RedisStore::new_test(); let mut con = store @@ -639,6 +638,16 @@ mod tests { .build() .map_err(|e| Error::msg(format!("Failed to build contracts: {}", e)))?; + // Get S3 credentials from environment variables if they exist + let s3_credentials = std::env::var("S3_CREDENTIALS").ok(); + let bucket_name = std::env::var("S3_BUCKET_NAME").ok(); + + // If either credential is missing, we'll proceed with None values + if s3_credentials.is_none() || bucket_name.is_none() { + println!("S3 credentials or bucket name not found in environment, proceeding with test using None values"); + return Ok(()); + } + let validator = SyntheticDataValidator::new( "0".to_string(), contracts.synthetic_data_validator.clone().unwrap(), @@ -651,8 +660,8 @@ mod tests { unknown_status_expiry_seconds: 120, }, U256::from(1000), - None, - None, + s3_credentials, + bucket_name, store, CancellationToken::new(), ); diff --git a/worker/src/checks/hardware/gpu.rs b/worker/src/checks/hardware/gpu.rs index 541ffa59..9b58a3d8 100644 --- a/worker/src/checks/hardware/gpu.rs +++ b/worker/src/checks/hardware/gpu.rs @@ -5,52 +5,44 @@ use shared::models::node::GpuSpecs; use std::sync::Mutex; #[allow(dead_code)] -const BYTES_TO_GB: f64 = 1024.0 * 1024.0 * 1024.0; +const BYTES_TO_MB: u64 = 1024 * 1024; // Use lazy_static to initialize NVML once and reuse it lazy_static! { static ref NVML: Mutex> = Mutex::new(None); } +#[derive(Debug)] #[allow(dead_code)] -enum GpuDevice { - Available { - name: String, - memory: u64, - driver_version: String, - device_count: usize, - }, - NotAvailable(String), +struct GpuDevice { + name: String, + memory: u64, + driver_version: String, + count: u32, + indices: Vec, } -pub fn detect_gpu() -> Option { +pub fn detect_gpu() -> Vec { Console::title("GPU Detection"); - // Changed return type to GpuSpecs - match get_gpu_status() { - GpuDevice::Available { - name, - memory, - driver_version: _, - device_count, - } => Some(GpuSpecs { - // Create GpuSpecs directly - count: Some(device_count as u32), - model: Some( - name.to_lowercase() - .split_whitespace() - .collect::>() - .join("_"), - ), - memory_mb: Some((memory / 1024 / 1024) as u32), // Convert bytes to MB - }), - GpuDevice::NotAvailable(_) => { - Console::user_error("GPU not available"); - None - } + + let gpu_devices = get_gpu_status(); + if gpu_devices.is_empty() { + Console::user_error("No GPU devices detected"); + return vec![]; } + + gpu_devices + .into_iter() + .map(|device| GpuSpecs { + count: Some(device.count), + model: Some(device.name.to_lowercase()), + memory_mb: Some((device.memory / BYTES_TO_MB) as u32), + indices: Some(device.indices), + }) + .collect() } -fn get_gpu_status() -> GpuDevice { +fn get_gpu_status() -> Vec { let mut nvml_guard = NVML.lock().unwrap(); // Initialize NVML if not already initialized @@ -62,7 +54,10 @@ fn get_gpu_status() -> GpuDevice { .init() { Ok(nvml) => *nvml_guard = Some(nvml), - Err(e) => return GpuDevice::NotAvailable(format!("Failed to initialize NVML: {}", e)), + Err(e) => { + Console::user_error(&format!("Failed to initialize NVML: {}", e)); + return vec![]; + } } } @@ -71,30 +66,50 @@ fn get_gpu_status() -> GpuDevice { // Get device count let device_count = match nvml.device_count() { Ok(count) => count as usize, - Err(e) => return GpuDevice::NotAvailable(format!("Failed to get device count: {}", e)), + Err(e) => { + Console::user_error(&format!("Failed to get device count: {}", e)); + return vec![]; + } }; if device_count == 0 { - return GpuDevice::NotAvailable("No GPU devices detected".to_string()); + Console::user_error("No GPU devices detected"); + return vec![]; } - // Get first device info - // TODO: Get all devices - match nvml.device_by_index(0) { - Ok(device) => { - let name = device.name().unwrap_or_else(|_| "Unknown".to_string()); - let memory = device.memory_info().map(|m| m.total).unwrap_or(0); - let driver_version = nvml - .sys_driver_version() - .unwrap_or_else(|_| "Unknown".to_string()); + let mut device_map: std::collections::HashMap = + std::collections::HashMap::new(); + + for i in 0..device_count { + match nvml.device_by_index(i as u32) { + Ok(device) => { + let name = device.name().unwrap_or_else(|_| "Unknown".to_string()); + let memory = device.memory_info().map(|m| m.total).unwrap_or(0); + let driver_version = nvml + .sys_driver_version() + .unwrap_or_else(|_| "Unknown".to_string()); - GpuDevice::Available { - name, - memory, - driver_version, - device_count, + if let Some(existing_device) = device_map.get_mut(&name) { + existing_device.count += 1; + existing_device.indices.push(i as u32); + } else { + device_map.insert( + name.clone(), + GpuDevice { + name, + memory, + driver_version, + count: 1, + indices: vec![i as u32], + }, + ); + } + } + Err(e) => { + Console::user_error(&format!("Failed to get device {}: {}", i, e)); } } - Err(e) => GpuDevice::NotAvailable(format!("Failed to get device: {}", e)), } + + device_map.into_values().collect() } diff --git a/worker/src/checks/hardware/hardware_check.rs b/worker/src/checks/hardware/hardware_check.rs index 3d35d35b..dac3f807 100644 --- a/worker/src/checks/hardware/hardware_check.rs +++ b/worker/src/checks/hardware/hardware_check.rs @@ -141,11 +141,16 @@ impl HardwareChecker { } fn collect_gpu_specs(&self) -> Result, Box> { - Ok(detect_gpu().map(|gpu| GpuSpecs { - count: Some(gpu.count.unwrap_or(0)), - model: gpu.model, - memory_mb: gpu.memory_mb, - })) + let gpu_specs = detect_gpu(); + if gpu_specs.is_empty() { + return Ok(None); + } + + let main_gpu = gpu_specs + .into_iter() + .max_by_key(|gpu| gpu.count.unwrap_or(0)); + + Ok(main_gpu) } fn collect_memory_specs(&self) -> Result<(u32, u32), Box> { diff --git a/worker/src/cli/command.rs b/worker/src/cli/command.rs index a1e56283..f4af1111 100644 --- a/worker/src/cli/command.rs +++ b/worker/src/cli/command.rs @@ -347,14 +347,6 @@ pub async fn execute_command( } } - let has_gpu = match node_config.compute_specs { - Some(ref specs) => specs.gpu.is_some(), - None => { - Console::warning("Compute specs are not available, assuming no GPU."); - false - } - }; - let metrics_store = Arc::new(MetricsStore::new()); let heartbeat_metrics_clone = metrics_store.clone(); let bridge_contracts = contracts.clone(); @@ -380,9 +372,13 @@ pub async fn execute_command( .as_ref() .map(|specs| specs.ram_mb.unwrap_or(0)); + let gpu = node_config + .compute_specs + .clone() + .and_then(|specs| specs.gpu.clone()); let docker_service = Arc::new(DockerService::new( cancellation_token.clone(), - has_gpu, + gpu, system_memory, task_bridge.socket_path.clone(), docker_storage_path, diff --git a/worker/src/docker/docker_manager.rs b/worker/src/docker/docker_manager.rs index 96469d43..d5aefe51 100644 --- a/worker/src/docker/docker_manager.rs +++ b/worker/src/docker/docker_manager.rs @@ -11,6 +11,7 @@ use bollard::volume::CreateVolumeOptions; use bollard::Docker; use futures_util::StreamExt; use log::{debug, error, info}; +use shared::models::node::GpuSpecs; use std::collections::HashMap; use std::time::Duration; use strip_ansi_escapes::strip; @@ -104,7 +105,7 @@ impl DockerManager { name: &str, env_vars: Option>, command: Option>, - gpu_enabled: bool, + gpu: Option, // Simple Vec of (host_path, container_path, read_only) volumes: Option>, shm_size: Option, @@ -189,13 +190,25 @@ impl DockerManager { Some(binds) }; - let host_config = if gpu_enabled { + let host_config = if gpu.is_some() { + let gpu = gpu.unwrap(); + let device_ids = match &gpu.indices { + Some(indices) if !indices.is_empty() => { + // Use specific GPU indices if available + indices.iter().map(|i| i.to_string()).collect() + } + _ => { + // Request all available GPUs if no specific indices + vec!["all".to_string()] + } + }; + Some(HostConfig { extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]), device_requests: Some(vec![DeviceRequest { - driver: Some("".into()), - count: Some(-1), - device_ids: None, + driver: Some("nvidia".into()), + count: None, + device_ids: Some(device_ids), capabilities: Some(vec![vec!["gpu".into()]]), options: Some(HashMap::new()), }]), diff --git a/worker/src/docker/service.rs b/worker/src/docker/service.rs index 69f349de..122941d9 100644 --- a/worker/src/docker/service.rs +++ b/worker/src/docker/service.rs @@ -4,6 +4,7 @@ use super::DockerState; use crate::console::Console; use bollard::models::ContainerStateStatusEnum; use chrono::{DateTime, Utc}; +use shared::models::node::GpuSpecs; use shared::models::task::Task; use shared::models::task::TaskState; use std::collections::HashMap; @@ -17,7 +18,7 @@ pub struct DockerService { docker_manager: Arc, cancellation_token: CancellationToken, pub state: Arc, - has_gpu: bool, + gpu: Option, system_memory_mb: Option, task_bridge_socket_path: String, node_address: String, @@ -28,7 +29,7 @@ const TASK_PREFIX: &str = "prime-task"; impl DockerService { pub fn new( cancellation_token: CancellationToken, - has_gpu: bool, + gpu: Option, system_memory_mb: Option, task_bridge_socket_path: String, storage_path: Option, @@ -39,7 +40,7 @@ impl DockerService { docker_manager, cancellation_token, state: Arc::new(DockerState::new()), - has_gpu, + gpu, system_memory_mb, task_bridge_socket_path, node_address, @@ -151,7 +152,7 @@ impl DockerService { Console::info("DockerService", "Starting new container ..."); let manager_clone = manager_clone.clone(); let state_clone = task_state_clone.clone(); - let has_gpu = self.has_gpu; + let gpu = self.gpu.clone(); let system_memory_mb = self.system_memory_mb; let task_bridge_socket_path = self.task_bridge_socket_path.clone(); let node_address = self.node_address.clone(); @@ -195,7 +196,7 @@ impl DockerService { 67108864 // Default to 64MB in bytes } }; - match manager_clone.start_container(&payload.image, &container_task_id, Some(env_vars), Some(cmd), has_gpu, Some(volumes), Some(shm_size)).await { + match manager_clone.start_container(&payload.image, &container_task_id, Some(env_vars), Some(cmd), gpu, Some(volumes), Some(shm_size)).await { Ok(container_id) => { Console::info("DockerService", &format!("Container started with id: {}", container_id)); }, @@ -320,9 +321,9 @@ mod tests { let cancellation_token = CancellationToken::new(); let docker_service = DockerService::new( cancellation_token.clone(), - false, + None, Some(1024), - "/tmp/com.prime.worker/metrics.sock".to_string(), + "/tmp/com.prime.miner/metrics.sock".to_string(), None, Address::ZERO.to_string(), ); @@ -365,9 +366,9 @@ mod tests { let cancellation_token = CancellationToken::new(); let docker_service = DockerService::new( cancellation_token.clone(), - false, + None, Some(1024), - "/tmp/com.prime.worker/metrics.sock".to_string(), + "/tmp/com.prime.miner/metrics.sock".to_string(), None, Address::ZERO.to_string(), ); diff --git a/worker/src/operations/provider.rs b/worker/src/operations/provider.rs index 20e6432e..dfc22957 100644 --- a/worker/src/operations/provider.rs +++ b/worker/src/operations/provider.rs @@ -262,6 +262,57 @@ impl ProviderOperations { Console::info("Registration tx", &format!("{:?}", register_tx)); } + // Get provider details again - cleanup later + Console::progress("Getting provider details"); + let _ = self + .contracts + .compute_registry + .get_provider(address) + .await + .map_err(|_| ProviderError::Other)?; + + let provider_exists = self.check_provider_exists().await?; + + if !provider_exists { + Console::info( + "AI Token Balance", + &format!("{} tokens", balance / U256::from(10u128.pow(18))), + ); + Console::info( + "ETH Balance", + &format!("{:.6} ETH", { f64::from(eth_balance) / 10f64.powf(18.0) }), + ); + if balance < stake { + Console::user_error(&format!( + "Insufficient AI Token balance for stake: {} tokens", + stake / U256::from(10u128.pow(18)) + )); + return Err(ProviderError::InsufficientBalance); + } + if !self.prompt_user_confirmation(&format!( + "Do you want to approve staking {} tokens?", + stake / U256::from(10u128.pow(18)) + )) { + Console::info("Operation cancelled by user", "Staking approval declined"); + return Err(ProviderError::UserCancelled); + } + + Console::progress("Approving AI Token for Stake transaction"); + self.contracts + .ai_token + .approve(stake) + .await + .map_err(|_| ProviderError::Other)?; + Console::progress("Registering Provider"); + let register_tx = match self.contracts.prime_network.register_provider(stake).await { + Ok(tx) => tx, + Err(_) => { + return Err(ProviderError::Other); + } + }; + Console::info("Registration tx", &format!("{:?}", register_tx)); + } + // Get provider details again - cleanup later Console::progress("Getting provider details"); let provider = self