Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cc): Continue CCP calculation if cpu_cores < cu_number [fixes NET-808] #2207

Merged
merged 16 commits into from
Apr 10, 2024
215 changes: 139 additions & 76 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use chain_connector::{
ConnectorError, Deal, PEER_NOT_EXISTS,
};
use chain_data::{parse_log, peer_id_to_hex, Log};
use core_manager::types::{AcquireRequest, WorkType};
use core_manager::errors::AcquireError;
use core_manager::types::{AcquireRequest, Assignment, WorkType};
use core_manager::{CoreManager, CoreManagerFunctions, CUID};
use peer_metrics::ChainListenerMetrics;
use server_config::{ChainConfig, ChainListenerConfig};
Expand Down Expand Up @@ -813,46 +814,31 @@ impl ChainListener {
Ok(())
}

/// Return already started units involved in CC and having fewer than MIN_PROOFS_PER_EPOCH proofs in the current epoch
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
/// Started CC units that still have fewer than `min_proofs_per_epoch`
/// proofs recorded for the current epoch (highest scheduling priority).
fn get_priority_units(&self) -> Vec<CUID> {
    self.cc_compute_units
        .iter()
        // CU is already started
        .filter(|(_, cu)| cu.startEpoch <= self.current_epoch)
        .filter(|(cuid, _)| {
            // No counter entry means no proofs yet -> still priority.
            self.proof_counter
                .get(cuid)
                .map(|count| *count < self.min_proofs_per_epoch)
                .unwrap_or(true)
        })
        .map(|(cuid, _)| *cuid)
        .collect()
}

/// Return already started units involved in CC and found at least MIN_PROOFS_PER_EPOCH proofs,
/// but less than MAX_PROOFS_PER_EPOCH proofs in the current epoch
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
/// Started CC units that already found at least `min_proofs_per_epoch`
/// proofs but fewer than `max_proofs_per_epoch` in the current epoch.
fn get_non_priority_units(&self) -> Vec<CUID> {
    self.cc_compute_units
        .iter()
        // CU is already started
        .filter(|(_, cu)| cu.startEpoch <= self.current_epoch)
        .filter(|(cuid, _)| {
            self.proof_counter
                .get(cuid)
                .map(|count| {
                    *count >= self.min_proofs_per_epoch && *count < self.max_proofs_per_epoch
                })
                // NOTE(review): a unit with no counter entry matches here AND in
                // get_priority_units (both default to true) — looks like this
                // default should be `false`; confirm intended behavior.
                .unwrap_or(true)
        })
        .map(|(cuid, _)| *cuid)
        .collect()
}

/// Return units in CC that is not active yet and can't produce proofs in the current epoch
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
fn get_pending_units(&self) -> Vec<CUID> {
self.cc_compute_units
.values()
.filter(|cu| cu.startEpoch > self.current_epoch) // CU hasn't yet started
.map(|cu| CUID::new(cu.id.0))
.collect()
/// Partition all CC compute units into scheduling groups based on the
/// current epoch and the number of proofs each unit produced so far:
/// - priority: started, fewer than `min_proofs_per_epoch` proofs
/// - non-priority: started, between min and max proofs
/// - finished: started, reached `max_proofs_per_epoch` proofs
/// - pending: start epoch not reached yet
fn get_cu_groups(&self) -> CUGroups {
    let mut groups = CUGroups {
        priority_units: Vec::new(),
        non_priority_units: Vec::new(),
        pending_units: Vec::new(),
        finished_units: Vec::new(),
    };

    for (cuid, cu) in &self.cc_compute_units {
        // A unit whose start epoch lies in the future can't produce proofs yet.
        if cu.startEpoch > self.current_epoch {
            groups.pending_units.push(*cuid);
            continue;
        }

        // Units without a counter entry are treated as having zero proofs.
        let proofs = *self.proof_counter.get(cuid).unwrap_or(&U256::ZERO);
        if proofs < self.min_proofs_per_epoch {
            groups.priority_units.push(*cuid);
        } else if proofs >= self.max_proofs_per_epoch {
            groups.finished_units.push(*cuid);
        } else {
            groups.non_priority_units.push(*cuid);
        }
    }

    groups
}

/// Send GlobalNonce, Difficulty and Core<>CUID mapping (full commitment info) to CCP
Expand All @@ -878,44 +864,54 @@ impl ChainListener {
None => return Ok(()),
};

let priority_units = self.get_priority_units();
let non_priority_units = self.get_non_priority_units();
let pending_units = self.get_pending_units();
let group_result = self.get_cu_groups();
gurinderu marked this conversation as resolved.
Show resolved Hide resolved

let cc_cores = self.acquire_cores_for_cc(&group_result)?;

let mut cu_allocation = HashMap::new();
let priority_cores = self.acquire_cores_for_cc(&priority_units)?;
let non_priority_cores = self.acquire_cores_for_cc(&non_priority_units)?;
let pending_cores = self.acquire_cores_for_cc(&pending_units)?;
let mut cu_allocation: HashMap<PhysicalCoreId, CUID> = HashMap::new();

if all_min_proofs_found(&priority_units) {
if group_result.all_min_proofs_found() {
tracing::info!(target: "chain-listener", "All CUs found minimal number of proofs {} in current epoch {}", self.min_proofs_per_epoch, self.current_epoch);
if all_max_proofs_found(&non_priority_units) {
if group_result.all_max_proofs_found() {
tracing::info!(target: "chain-listener", "All CUs found max number of proofs {} in current epoch {}", self.max_proofs_per_epoch ,self.current_epoch);
self.stop_commitment().await?;
return Ok(());
} else {
// All CUs were proven, now let's work on submitting proofs for every CU until MAX_PROOF_COUNT is reached
cu_allocation.extend(non_priority_cores.iter().zip(non_priority_units.iter()));

let mut units = non_priority_units.iter().cycle();
cu_allocation.extend(
cc_cores
.non_priority_cores
.iter()
.cloned()
.zip(group_result.non_priority_units.iter().cloned()),
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
);

let mut units = group_result.non_priority_units.iter().cycle();
// Assign "pending cores" to help generate proofs for "non priority units"
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
pending_cores.iter().for_each(|core| {
cc_cores.pending_cores.iter().for_each(|core| {
if let Some(unit) = units.next() {
cu_allocation.insert(*core, *unit);
}
});
}
} else {
// Use assigned cores to calculate proofs for CUs who haven't reached MIN_PROOF_COUNT yet
cu_allocation.extend(priority_cores.iter().zip(priority_units.iter()));
cu_allocation.extend(
cc_cores
.priority_cores
.iter()
.zip(group_result.priority_units.iter()),
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
);

// Use all spare cores to help CUs to reach MIN_PROOF_COUNT
let spare_cores: BTreeSet<_> = non_priority_cores
let spare_cores: BTreeSet<_> = cc_cores
.non_priority_cores
.into_iter()
.chain(pending_cores.into_iter())
.chain(cc_cores.pending_cores.into_iter())
.chain(cc_cores.finished_cores.into_iter())
.collect();

let mut units = priority_units.iter().cycle();
let mut units = group_result.priority_units.iter().cycle();
spare_cores.iter().for_each(|core| {
if let Some(unit) = units.next() {
cu_allocation.insert(*core, *unit);
Expand Down Expand Up @@ -949,19 +945,71 @@ impl ChainListener {
Ok(())
}

fn acquire_cores_for_cc(&self, units: &[CUID]) -> eyre::Result<BTreeSet<PhysicalCoreId>> {
let cores = self
.core_manager
.acquire_worker_core(AcquireRequest::new(
units.to_vec(),
WorkType::CapacityCommitment,
))
.map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to acquire cores for active units: {err}");
eyre::eyre!("Failed to acquire cores for active units: {err}")
})?;
fn acquire_cores_for_cc(
&self,
cu_group_result: &CUGroups,
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
) -> eyre::Result<PhysicalCoreGroupResult> {
let mut units = cu_group_result.priority_units.clone();
units.extend(cu_group_result.non_priority_units.clone());
units.extend(cu_group_result.pending_units.clone());
units.extend(cu_group_result.finished_units.clone());
gurinderu marked this conversation as resolved.
Show resolved Hide resolved

let cores = self.core_manager.acquire_worker_core(AcquireRequest::new(
units.to_vec(),
WorkType::CapacityCommitment,
));

fn filter(units: &[CUID], assignment: &Assignment) -> Vec<PhysicalCoreId> {
units
.iter()
.filter_map(|cuid| {
assignment
.cuid_core_data
.get(cuid)
.map(|data| data.physical_core_id)
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
})
.collect()
}

Ok(cores.physical_core_ids)
match cores {
Ok(assignment) => {
let priority_units = filter(&cu_group_result.priority_units, &assignment);
let non_priority_units = filter(&cu_group_result.non_priority_units, &assignment);
let pending_units = filter(&cu_group_result.pending_units, &assignment);
let finished_units = filter(&cu_group_result.finished_units, &assignment);

Ok(PhysicalCoreGroupResult {
priority_cores: priority_units,
non_priority_cores: non_priority_units,
pending_cores: pending_units,
finished_cores: finished_units,
})
}
Err(AcquireError::NotFoundAvailableCores {
required,
available,
..
}) => {
tracing::warn!("Found {required} CUs in the Capacity Commitment, but Nox has only {available} Cores available for CC");
let assign_units = units.iter().take(available).cloned().collect();
self.core_manager.release(units);
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
let assignment = self.core_manager.acquire_worker_core(AcquireRequest::new(
assign_units,
WorkType::CapacityCommitment,
))?;
let priority_units = filter(&cu_group_result.priority_units, &assignment);
let non_priority_units = filter(&cu_group_result.non_priority_units, &assignment);
let pending_units = filter(&cu_group_result.pending_units, &assignment);
let finished_units = filter(&cu_group_result.finished_units, &assignment);

Ok(PhysicalCoreGroupResult {
priority_cores: priority_units,
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
non_priority_cores: non_priority_units,
pending_cores: pending_units,
finished_cores: finished_units,
})
}
}
}

fn acquire_core_for_deal(&self, unit_id: CUID) -> eyre::Result<()> {
Expand Down Expand Up @@ -1036,7 +1084,7 @@ impl ChainListener {
.filter(|p| p.id.global_nonce == self.global_nonce)
.collect();

if proofs.len() > 0 {
if !proofs.is_empty() {
tracing::info!(target: "chain-listener", "Found {} proofs from polling", proofs.len());
}

Expand Down Expand Up @@ -1258,7 +1306,7 @@ impl ChainListener {
}

if stats_updated {
tracing::info!(target: "chain-listener", "Confirmed proofs count: {:?}", self.proof_counter.iter().map(|(cu, count)| format!("{}: {}", cu, count.to_string())).collect::<Vec<_>>());
tracing::info!(target: "chain-listener", "Confirmed proofs count: {:?}", self.proof_counter.iter().map(|(cu, count)| format!("{}: {}", cu, count)).collect::<Vec<_>>());
}

Ok(())
Expand All @@ -1274,12 +1322,27 @@ impl ChainListener {
}
}

fn all_min_proofs_found(priority_units: &[CUID]) -> bool {
priority_units.is_empty()
/// Compute units of the current Capacity Commitment, partitioned by start
/// epoch and by how many proofs each unit submitted in the current epoch.
struct CUGroups {
    // Started units with fewer than MIN_PROOFS_PER_EPOCH proofs.
    pub priority_units: Vec<CUID>,
    // Started units with at least MIN_PROOFS_PER_EPOCH but fewer than
    // MAX_PROOFS_PER_EPOCH proofs.
    pub non_priority_units: Vec<CUID>,
    // Units whose start epoch hasn't been reached yet.
    pub pending_units: Vec<CUID>,
    // Units that already reached MAX_PROOFS_PER_EPOCH proofs.
    pub finished_units: Vec<CUID>,
}

fn all_max_proofs_found(non_priority_units: &[CUID]) -> bool {
non_priority_units.is_empty()
impl CUGroups {
    /// True when no started unit is still below its minimal proof count,
    /// i.e. every started unit found at least MIN_PROOFS_PER_EPOCH proofs.
    fn all_min_proofs_found(&self) -> bool {
        self.priority_units.is_empty()
    }

    /// True when no started unit still needs proofs below the maximum,
    /// i.e. every started unit reached MAX_PROOFS_PER_EPOCH proofs.
    fn all_max_proofs_found(&self) -> bool {
        self.non_priority_units.is_empty()
    }
}
struct PhysicalCoreGroupResult {
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
pub priority_cores: Vec<PhysicalCoreId>,
pub non_priority_cores: Vec<PhysicalCoreId>,
pub pending_cores: Vec<PhysicalCoreId>,
pub finished_cores: Vec<PhysicalCoreId>,
}

// measure the request execution time and store it in the metrics
Expand Down
46 changes: 25 additions & 21 deletions crates/core-manager/src/dev.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::hash::BuildHasherDefault;
use std::ops::Deref;
use std::path::PathBuf;

use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
use cpu_utils::CPUTopology;
use fxhash::{FxBuildHasher, FxHasher};
use fxhash::FxBuildHasher;
use parking_lot::RwLock;
use range_set_blaze::RangeSetBlaze;

use crate::errors::{AcquireError, CreateError, CurrentAssignment, LoadingError, PersistError};
use crate::errors::{AcquireError, CreateError, LoadingError, PersistError};
use crate::manager::CoreManagerFunctions;
use crate::persistence::{
PersistenceTask, PersistentCoreManagerFunctions, PersistentCoreManagerState,
};
use crate::types::{AcquireRequest, Assignment, WorkType};
use crate::CoreRange;

type Map<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher>>;
pub(crate) type MultiMap<K, V> = multimap::MultiMap<K, V, BuildHasherDefault<FxHasher>>;
use crate::types::{AcquireRequest, Assignment, CoreData, WorkType};
use crate::{CoreRange, Map, MultiMap};

/// `DevCoreManager` is a CPU core manager that provides a more flexible approach to
/// core allocation compared to `StrictCoreManager`.
Expand Down Expand Up @@ -240,21 +236,19 @@ impl CoreManagerFunctions for DevCoreManager {
let mut lock = self.state.write();
let mut result_physical_core_ids = BTreeSet::new();
let mut result_logical_core_ids = BTreeSet::new();
let mut cuid_core_data: Map<CUID, CoreData> = HashMap::with_capacity_and_hasher(
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
assign_request.unit_ids.len(),
FxBuildHasher::default(),
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
);
let worker_unit_type = assign_request.worker_type;
for unit_id in assign_request.unit_ids {
let physical_core_id = lock.unit_id_core_mapping.get(&unit_id).cloned();
let physical_core_id = match physical_core_id {
None => {
let core_id = lock.available_cores.pop_front().ok_or({
let current_assignment: Vec<(PhysicalCoreId, CUID)> = lock
.core_unit_id_mapping
.iter()
.map(|(k, v)| (*k, *v))
.collect();
AcquireError::NotFoundAvailableCores {
current_assignment: CurrentAssignment::new(current_assignment),
}
})?;
let core_id = lock
.available_cores
.pop_front()
.expect("Unexpected state. Should not be empty never");
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
lock.core_unit_id_mapping.insert(core_id, unit_id);
lock.unit_id_core_mapping.insert(unit_id, core_id);
lock.work_type_mapping
Expand All @@ -270,15 +264,23 @@ impl CoreManagerFunctions for DevCoreManager {
};
result_physical_core_ids.insert(physical_core_id);

let physical_core_ids = lock
let logical_core_ids = lock
.cores_mapping
.get_vec(&physical_core_id)
.cloned()
.expect("Unexpected state. Should not be empty never");

for physical_core_id in physical_core_ids {
result_logical_core_ids.insert(physical_core_id);
for logical_core in logical_core_ids.iter() {
result_logical_core_ids.insert(*logical_core);
}

cuid_core_data.insert(
unit_id,
CoreData {
physical_core_id,
logical_core_ids,
},
);
}

// We are trying to notify a persistence task that the state has been changed.
Expand All @@ -288,6 +290,7 @@ impl CoreManagerFunctions for DevCoreManager {
Ok(Assignment {
physical_core_ids: result_physical_core_ids,
logical_core_ids: result_logical_core_ids,
cuid_core_data,
})
}

Expand Down Expand Up @@ -324,6 +327,7 @@ impl CoreManagerFunctions for DevCoreManager {
Assignment {
physical_core_ids: lock.system_cores.clone(),
logical_core_ids,
cuid_core_data: Map::with_hasher(FxBuildHasher::default()),
}
}
}
Expand Down
Loading
Loading