Skip to content

Commit

Permalink
fix(core-manager): reassignment (#2252)
Browse files Browse the repository at this point in the history
* fix(core-manager): reassignment

* Update crates/core-manager/src/strict.rs

Co-authored-by: Mike Voronov <michail.vms@gmail.com>

* Update crates/core-manager/src/strict.rs

Co-authored-by: Mike Voronov <michail.vms@gmail.com>

---------

Co-authored-by: Aleksey Proshutinskiy <justprosh@users.noreply.github.com>
Co-authored-by: Mike Voronov <michail.vms@gmail.com>
  • Loading branch information
3 people committed May 24, 2024
1 parent 895c0be commit d903f2a
Showing 1 changed file with 117 additions and 4 deletions.
121 changes: 117 additions & 4 deletions crates/core-manager/src/strict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,23 @@ impl CoreManagerFunctions for StrictCoreManager {

let mut result_physical_core_ids = BTreeSet::new();
let mut result_logical_core_ids = BTreeSet::new();
let worker_unit_type = assign_request.worker_type;

let worker_unit_type = assign_request.worker_type;
let available = lock.available_cores.len();
let required = assign_request.unit_ids.len();

let core_usage = assign_request
.unit_ids
.into_iter()
.map(|unit_id| {
(
unit_id,
lock.unit_id_mapping.get_by_right(&unit_id).cloned(),
)
})
.collect::<Vec<_>>();

let required = core_usage.iter().filter(|(_, core)| core.is_none()).count();

if required > available {
let current_assignment: Vec<(PhysicalCoreId, CUID)> =
lock.unit_id_mapping.iter().map(|(k, v)| (*k, *v)).collect();
Expand All @@ -249,8 +262,7 @@ impl CoreManagerFunctions for StrictCoreManager {
});
}

for unit_id in assign_request.unit_ids {
let physical_core_id = lock.unit_id_mapping.get_by_right(&unit_id).cloned();
for (unit_id, physical_core_id) in core_usage {
let physical_core_id = match physical_core_id {
None => {
// SAFETY: this should never happen because we already checked the availability of cores
Expand Down Expand Up @@ -351,9 +363,11 @@ impl PersistentCoreManagerFunctions for StrictCoreManager {
mod tests {
use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
use hex::FromHex;
use rand::Rng;
use std::collections::BTreeSet;
use std::str::FromStr;

use crate::errors::AcquireError;
use crate::manager::CoreManagerFunctions;
use crate::persistence::PersistentCoreManagerState;
use crate::strict::StrictCoreManager;
Expand Down Expand Up @@ -558,4 +572,103 @@ mod tests {
);
}
}

#[test]
fn test_reassignment() {
if cores_exists() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let system_cpu_count = 1;
let (manager, _task) = StrictCoreManager::from_path(
temp_dir.path().join("test.toml"),
system_cpu_count,
CoreRange::default(),
)
.unwrap();

let unit_ids_count = num_cpus::get_physical() - system_cpu_count;
let unit_ids: Vec<CUID> = (0..unit_ids_count)
.map(|_| {
let mut rng = rand::thread_rng();
let bytes: [u8; 32] = rng.gen();
CUID::new(bytes)
})
.collect();

let assignment = manager
.acquire_worker_core(AcquireRequest {
unit_ids: unit_ids.clone(),
worker_type: WorkType::CapacityCommitment,
})
.unwrap();
assert_eq!(assignment.physical_core_ids.len(), unit_ids_count);
assert_eq!(assignment.cuid_cores.len(), unit_ids_count);

let assignment = manager
.acquire_worker_core(AcquireRequest {
unit_ids: unit_ids.clone(),
worker_type: WorkType::Deal,
})
.unwrap();
assert_eq!(assignment.physical_core_ids.len(), unit_ids_count);
assert_eq!(assignment.cuid_cores.len(), unit_ids_count);
}
}

#[test]
fn test_workload_assign_error() {
if cores_exists() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let system_cpu_count = 1;
let (manager, _task) = StrictCoreManager::from_path(
temp_dir.path().join("test.toml"),
system_cpu_count,
CoreRange::default(),
)
.unwrap();

let unit_ids_count = num_cpus::get_physical() - system_cpu_count;
let unit_ids: Vec<CUID> = (0..unit_ids_count)
.map(|_| {
let mut rng = rand::thread_rng();
let bytes: [u8; 32] = rng.gen();
CUID::new(bytes)
})
.collect();

let assignment = manager
.acquire_worker_core(AcquireRequest {
unit_ids: unit_ids.clone(),
worker_type: WorkType::CapacityCommitment,
})
.unwrap();
assert_eq!(assignment.physical_core_ids.len(), unit_ids_count);
assert_eq!(assignment.cuid_cores.len(), unit_ids_count);
let unit_ids: Vec<CUID> = (0..unit_ids_count)
.map(|_| {
let mut rng = rand::thread_rng();
let bytes: [u8; 32] = rng.gen();
CUID::new(bytes)
})
.collect();

let result = manager.acquire_worker_core(AcquireRequest {
unit_ids: unit_ids.clone(),
worker_type: WorkType::Deal,
});

assert!(result.is_err());
if let Err(err) = result {
match err {
AcquireError::NotFoundAvailableCores {
required,
available,
..
} => {
assert_eq!(required, unit_ids_count);
assert_eq!(available, 0);
}
}
}
}
}
}

0 comments on commit d903f2a

Please sign in to comment.