Skip to content

Commit

Permalink
fix(core-manager)!: use hex format for CUIDs [fixes NET-804] (#2188)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Mar 20, 2024
1 parent e061413 commit 415f917
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 57 deletions.
15 changes: 9 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/core-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ tracing.workspace = true
tokio-stream.workspace = true
futures.workspace = true
rand = "0.8.5"
hex.workspace = true
serde_with = { workspace = true }
hex-utils = { workspace = true, features = ["serde_with"] }


[dev-dependencies]
Expand Down
8 changes: 8 additions & 0 deletions crates/core-manager/src/core_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,12 @@ mod tests {
let core_range_1: CoreRange = "0-2,5,7-9".parse().unwrap();
assert_eq!(format!("{}", core_range_1), "0-2,5,7-9");
}

#[test]
fn range_is_inclusive() {
let core_range_1: CoreRange = "1-3".parse().unwrap();
let actual: Vec<usize> = core_range_1.0.iter().collect();
let expected = vec![1, 2, 3];
assert_eq!(actual, expected)
}
}
33 changes: 10 additions & 23 deletions crates/core-manager/src/dev.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fs::File;
use std::hash::BuildHasherDefault;
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;

Expand All @@ -10,11 +8,12 @@ use cpu_utils::CPUTopology;
use fxhash::{FxBuildHasher, FxHasher};
use parking_lot::RwLock;
use range_set_blaze::RangeSetBlaze;
use serde::{Deserialize, Serialize};

use crate::errors::{AcquireError, CreateError, LoadingError, PersistError};
use crate::errors::{AcquireError, CreateError, CurrentAssignment, LoadingError, PersistError};
use crate::manager::CoreManagerFunctions;
use crate::persistence::{PersistenceTask, PersistentCoreManagerFunctions};
use crate::persistence::{
PersistenceTask, PersistentCoreManagerFunctions, PersistentCoreManagerState,
};
use crate::types::{AcquireRequest, Assignment, WorkType};
use crate::CoreRange;

Expand Down Expand Up @@ -196,15 +195,6 @@ struct CoreManagerState {
work_type_mapping: Map<CUID, WorkType>,
}

#[derive(Serialize, Deserialize)]
struct PersistentCoreManagerState {
cores_mapping: Vec<(PhysicalCoreId, LogicalCoreId)>,
system_cores: Vec<PhysicalCoreId>,
available_cores: Vec<PhysicalCoreId>,
unit_id_mapping: Vec<(PhysicalCoreId, CUID)>,
work_type_mapping: Vec<(CUID, WorkType)>,
}

impl From<&CoreManagerState> for PersistentCoreManagerState {
fn from(value: &CoreManagerState) -> Self {
Self {
Expand All @@ -214,12 +204,12 @@ impl From<&CoreManagerState> for PersistentCoreManagerState {
unit_id_mapping: value
.core_unit_id_mapping
.iter()
.map(|(k, v)| (*k, *v))
.map(|(k, v)| (*k, (*v)))
.collect(),
work_type_mapping: value
.work_type_mapping
.iter()
.map(|(k, v)| (*k, v.clone()))
.map(|(k, v)| ((*k), v.clone()))
.collect(),
}
}
Expand Down Expand Up @@ -261,7 +251,9 @@ impl CoreManagerFunctions for DevCoreManager {
.iter()
.map(|(k, v)| (*k, *v))
.collect();
AcquireError::NotFoundAvailableCores { current_assignment }
AcquireError::NotFoundAvailableCores {
current_assignment: CurrentAssignment::new(current_assignment),
}
})?;
lock.core_unit_id_mapping.insert(core_id, unit_id);
lock.unit_id_core_mapping.insert(unit_id, core_id);
Expand Down Expand Up @@ -342,12 +334,7 @@ impl PersistentCoreManagerFunctions for DevCoreManager {
let inner_state = lock.deref();
let persistent_state: PersistentCoreManagerState = inner_state.into();
drop(lock);
let toml = toml::to_string_pretty(&persistent_state)
.map_err(|err| PersistError::SerializationError { err })?;
let mut file =
File::create(self.file_path.clone()).map_err(|err| PersistError::IoError { err })?;
file.write(toml.as_bytes())
.map_err(|err| PersistError::IoError { err })?;
persistent_state.persist(self.file_path.as_path())?;
Ok(())
}
}
Expand Down
37 changes: 33 additions & 4 deletions crates/core-manager/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use ccp_shared::types::CUID;
use cpu_utils::{CPUTopologyError, PhysicalCoreId};
use std::fmt::{Display, Formatter, Write};
use std::str::Utf8Error;
use thiserror::Error;

Expand Down Expand Up @@ -58,12 +59,40 @@ pub enum PersistError {
},
}

#[derive(Debug)]
pub struct CurrentAssignment {
data: Vec<(PhysicalCoreId, CUID)>,
}

impl CurrentAssignment {
pub fn new(data: Vec<(PhysicalCoreId, CUID)>) -> Self {
Self { data }
}
}

impl Display for CurrentAssignment {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_char('[')?;
for (core, cuid) in &self.data[0..self.data.len() - 1] {
f.write_str(core.to_string().as_str())?;
f.write_str(" -> ")?;
f.write_str(format!("{}", cuid).as_str())?;
f.write_str(", ")?;
}
let (core, cuid) = &self.data[self.data.len() - 1];
f.write_str(core.to_string().as_str())?;
f.write_str(" -> ")?;
f.write_str(format!("{}", cuid).as_str())?;

f.write_char(']')?;
Ok(())
}
}

#[derive(Debug, Error)]
pub enum AcquireError {
#[error(
"Couldn't assign core: no free cores left. Current assignment: {current_assignment:?}"
)]
#[error("Couldn't assign core: no free cores left. Current assignment: {current_assignment}")]
NotFoundAvailableCores {
current_assignment: Vec<(PhysicalCoreId, CUID)>,
current_assignment: CurrentAssignment,
},
}
69 changes: 69 additions & 0 deletions crates/core-manager/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;

use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
use futures::StreamExt;
use hex_utils::serde_as::Hex;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;

use crate::errors::PersistError;
use crate::types::WorkType;
use crate::CoreManager;

pub trait PersistentCoreManagerFunctions {
Expand Down Expand Up @@ -60,3 +68,64 @@ impl PersistenceTask {
.expect("Could not spawn persist task");
}
}

#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct PersistentCoreManagerState {
pub cores_mapping: Vec<(PhysicalCoreId, LogicalCoreId)>,
pub system_cores: Vec<PhysicalCoreId>,
pub available_cores: Vec<PhysicalCoreId>,
#[serde_as(as = "Vec<(_, Hex)>")]
pub unit_id_mapping: Vec<(PhysicalCoreId, CUID)>,
#[serde_as(as = "Vec<(Hex, _)>")]
pub work_type_mapping: Vec<(CUID, WorkType)>,
}

impl PersistentCoreManagerState {
pub fn persist(&self, file_path: &Path) -> Result<(), PersistError> {
let toml = toml::to_string_pretty(&self)
.map_err(|err| PersistError::SerializationError { err })?;
let mut file = File::create(file_path).map_err(|err| PersistError::IoError { err })?;
file.write(toml.as_bytes())
.map_err(|err| PersistError::IoError { err })?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::persistence::PersistentCoreManagerState;
use crate::types::WorkType;
use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
use hex::FromHex;

#[test]
fn test_serde() {
let init_id_1 =
<CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
.unwrap();
let persistent_state = PersistentCoreManagerState {
cores_mapping: vec![
(PhysicalCoreId::new(1), LogicalCoreId::new(1)),
(PhysicalCoreId::new(1), LogicalCoreId::new(2)),
(PhysicalCoreId::new(2), LogicalCoreId::new(3)),
(PhysicalCoreId::new(2), LogicalCoreId::new(4)),
(PhysicalCoreId::new(3), LogicalCoreId::new(5)),
(PhysicalCoreId::new(3), LogicalCoreId::new(6)),
(PhysicalCoreId::new(4), LogicalCoreId::new(7)),
(PhysicalCoreId::new(4), LogicalCoreId::new(8)),
],
system_cores: vec![PhysicalCoreId::new(1)],
available_cores: vec![PhysicalCoreId::new(2), PhysicalCoreId::new(3)],
unit_id_mapping: vec![(PhysicalCoreId::new(4), init_id_1)],
work_type_mapping: vec![(init_id_1, WorkType::Deal)],
};
let actual = toml::to_string(&persistent_state).unwrap();
let expected = "cores_mapping = [[1, 1], [1, 2], [2, 3], [2, 4], [3, 5], [3, 6], [4, 7], [4, 8]]\n\
system_cores = [1]\n\
available_cores = [2, 3]\n\
unit_id_mapping = [[4, \"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\"]]\n\
work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n";
assert_eq!(expected, actual)
}
}
Loading

0 comments on commit 415f917

Please sign in to comment.