Skip to content

Commit

Permalink
Merge branch 'kpop/CON-1124/unit_tests' into 'master'
Browse files Browse the repository at this point in the history
chore(ic-backup): [CON-1124] split `do_move_cold_storage` into two functions so they can be reused and add some unit tests

 

See merge request dfinity-lab/public/ic!14679
  • Loading branch information
kpop-dfinity committed Sep 8, 2023
2 parents 3a6c72b + 81d5e62 commit 0b83d19
Showing 1 changed file with 238 additions and 22 deletions.
260 changes: 238 additions & 22 deletions rs/backup/src/backup_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ impl BackupHelper {
Ok(ReplayResult::UpgradeRequired(upgrade_version)) => {
// replayed the current version, but if there is upgrade try to do it again
self.notification_client.message_slack(format!(
"Replica version upgrade detected (current: {} new: {}): upgrading the ic-replay tool to retry... 🤞",
"Replica version upgrade detected (current: {} new: {}): \
upgrading the ic-replay tool to retry... 🤞",
current_replica_version, upgrade_version
));
current_replica_version = upgrade_version;
Expand Down Expand Up @@ -665,6 +666,39 @@ impl BackupHelper {
}

/// Moves old artifacts and states of this subnet into cold storage (or merely
/// cleans them up when `do_cold_storage` is off), then posts a Slack summary
/// of how much disk space and how many inodes were reclaimed.
pub fn do_move_cold_storage(&self) -> Result<(), String> {
    // Snapshot disk usage before the move. Kept as `i32` because the
    // difference computed below may legitimately be negative.
    let space_before = self.get_disk_stats(DiskStats::Space)? as i32;
    let inodes_before = self.get_disk_stats(DiskStats::Inodes)? as i32;

    let max_height = self.cold_store_artifacts()?;
    self.cold_store_states(max_height)?;

    // Snapshot again after both phases to report the delta.
    let space_after = self.get_disk_stats(DiskStats::Space)? as i32;
    let inodes_after = self.get_disk_stats(DiskStats::Inodes)? as i32;

    let action_text = if self.do_cold_storage { "Moved to cold storage" } else { "Cleaned up" };
    self.notification_client.message_slack(format!(
        "✅ {} artifacts of subnet {:?} and states up to height *{}*, \
        saved {}% of space and {}% of inodes.",
        action_text,
        self.subnet_id,
        max_height,
        space_before - space_after,
        inodes_before - inodes_after
    ));
    info!(
        self.log,
        "Finished moving old artifacts and states of subnet {:?} to the cold storage",
        self.subnet_id
    );
    Ok(())
}

fn cold_store_artifacts(&self) -> Result<u64, String> {
let guard = self
.artifacts_guard
.lock()
Expand All @@ -674,8 +708,6 @@ impl BackupHelper {
"Start moving old artifacts and states of subnet {:?} to the cold storage",
self.subnet_id
);
let old_space = self.get_disk_stats(DiskStats::Space)? as i32;
let old_inodes = self.get_disk_stats(DiskStats::Inodes)? as i32;
let spool_dirs = collect_only_dirs(&self.spool_dir())?;
let mut dir_heights = BTreeMap::new();
spool_dirs.iter().for_each(|replica_version_dir| {
Expand Down Expand Up @@ -705,7 +737,8 @@ impl BackupHelper {
debug!(self.log, "Will execute: {:?}", cmd);
exec_cmd(&mut cmd).map_err(|err| format!("Error moving artifacts: {:?}", err))?;
}
// we have moved all the artifacts from the spool directory, so don't need the mutex guard anymore
// we have moved all the artifacts from the spool directory,
// so don't need the mutex guard anymore
drop(guard);

if self.do_cold_storage {
Expand Down Expand Up @@ -757,6 +790,10 @@ impl BackupHelper {
);
remove_dir_all(work_dir).map_err(|err| format!("Error deleting leftovers: {:?}", err))?;

Ok(max_height)
}

fn cold_store_states(&self, max_height: u64) -> Result<(), String> {
info!(
self.log,
"Moving states with height up to: {:?} from the archive to the cold storage",
Expand Down Expand Up @@ -784,7 +821,8 @@ impl BackupHelper {
exec_cmd(&mut cmd).map_err(|err| format!("Error copying states: {:?}", err))?;
// skip some of the states if we replay more than one per day
if self.daily_replays > 1 {
// one element is consumed in the next() call above, and one in the nth(), hence the substract 2
// one element is consumed in the next() call above,
// and one in the nth(), hence the subtract 2
reversed.nth(self.daily_replays - 2);
}
}
Expand All @@ -807,23 +845,6 @@ impl BackupHelper {

remove_dir_all(trash_dir).map_err(|err| format!("Error deleting trashdir: {:?}", err))?;

let new_space = self.get_disk_stats(DiskStats::Space)? as i32; // i32 to calculate negative difference bellow
let new_inodes = self.get_disk_stats(DiskStats::Inodes)? as i32;

let action_text = if self.do_cold_storage {
"Moved to cold storage"
} else {
"Cleaned up"
};
self.notification_client.message_slack(format!(
"✅ {} artifacts of subnet {:?} and states up to height *{}*, saved {}% of space and {}% of inodes.",
action_text, self.subnet_id, max_height, old_space - new_space, old_inodes - new_inodes
));
debug!(
self.log,
"Finished moving old artifacts and states of subnet {:?} to the cold storage",
self.subnet_id
);
Ok(())
}
}
Expand Down Expand Up @@ -954,3 +975,198 @@ pub fn retrieve_replica_version_last_replayed(

current_replica_version
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use ic_registry_local_store::LocalStoreImpl;
    use ic_test_utilities_tmpdir::tmpdir;
    use ic_types::PrincipalId;

    use super::*;

    /// Subnet id shared by every test below; the exact value is irrelevant as
    /// long as it parses into a valid `PrincipalId`.
    const FAKE_SUBNET_ID: &str = "gpvux-2ejnk-3hgmh-cegwf-iekfc-b7rzs-hrvep-5euo2-3ywz3-k3hcb-cqe";

    #[test]
    fn need_cold_storage_move_test() {
        let tmp = tmpdir("test_dir");
        let backup_helper = fake_backup_helper(tmp.as_ref(), /*versions_hot=*/ 2);

        // Three hot replica versions while only two are allowed to stay hot,
        // so a cold-storage move is required.
        for replica_version in ["replica_version_1", "replica_version_2", "replica_version_3"] {
            create_dir_all(backup_helper.spool_dir().join(replica_version)).unwrap();
        }

        let need_cold_storage_move = backup_helper
            .need_cold_storage_move()
            .expect("should execute successfully");

        assert!(need_cold_storage_move);
    }

    #[test]
    fn does_not_need_cold_storage_move_test() {
        let tmp = tmpdir("test_dir");
        let backup_helper = fake_backup_helper(tmp.as_ref(), /*versions_hot=*/ 2);

        // Exactly as many hot replica versions as allowed — nothing to move.
        for replica_version in ["replica_version_1", "replica_version_2"] {
            create_dir_all(backup_helper.spool_dir().join(replica_version)).unwrap();
        }

        let need_cold_storage_move = backup_helper
            .need_cold_storage_move()
            .expect("should execute successfully");

        assert!(!need_cold_storage_move);
    }

    #[test]
    fn cold_store_artifacts_test() {
        let tmp = tmpdir("test_dir");
        let backup_helper = fake_backup_helper(tmp.as_ref(), /*versions_hot=*/ 2);

        // Populate three replica versions in the spool; with `versions_hot == 2`
        // only the earliest one should get cold-stored.
        let spool_layout = [
            ("replica_version_1", vec![0, 50, 100, 150]),
            ("replica_version_2", vec![200, 250]),
            ("replica_version_3", vec![300, 350, 400, 450, 500, 550]),
        ];
        for (replica_version, heights) in spool_layout {
            create_artifacts_dir_with_heights(
                &backup_helper.spool_dir().join(replica_version),
                heights,
            );
        }

        let max_height = backup_helper
            .cold_store_artifacts()
            .expect("should execute successfully");

        assert_eq!(max_height, 150);

        let cold_storage_dirs = collect_dir_entries(
            &backup_helper
                .cold_storage_dir
                .join(FAKE_SUBNET_ID)
                .join("artifacts"),
        );

        // Only the artifacts of the earliest replica version were packed away.
        assert_eq!(cold_storage_dirs.len(), 1);
        assert!(cold_storage_dirs[0].ends_with("_000000000150_replica_version_1.tgz"));

        // ... and that version is gone from the hot spool directory.
        let artifacts_dirs = collect_dir_entries(&backup_helper.spool_dir());
        assert_eq!(artifacts_dirs.len(), 2);
        assert!(!artifacts_dirs.contains(&"replica_version_1".to_string()));
        assert!(artifacts_dirs.contains(&"replica_version_2".to_string()));
        assert!(artifacts_dirs.contains(&"replica_version_3".to_string()));
    }

    #[test]
    fn cold_store_states_test() {
        let tmp = tmpdir("test_dir");
        let backup_helper = fake_backup_helper(tmp.as_ref(), /*versions_hot=*/ 2);

        for height in [0, 10, 20, 30, 40, 50] {
            create_dir_all(backup_helper.archive_dir().join(height.to_string())).unwrap();
        }

        backup_helper
            .cold_store_states(30)
            .expect("should execute successfully");

        let cold_storage_dirs = collect_dir_entries(
            &backup_helper
                .cold_storage_dir
                .join(FAKE_SUBNET_ID)
                .join("states"),
        );

        // With `daily_replays == 2`, every other state up to the requested
        // height (i.e. 30 and 10) ends up in the cold storage.
        assert_eq!(cold_storage_dirs.len(), 2);
        assert!(cold_storage_dirs.contains(&"30".to_string()));
        assert!(cold_storage_dirs.contains(&"10".to_string()));

        // Every state at or below the requested height is removed from the
        // hot archive.
        let archives_dirs = collect_dir_entries(&backup_helper.archive_dir());
        assert_eq!(archives_dirs.len(), 2);
        assert!(archives_dirs.contains(&"40".to_string()));
        assert!(archives_dirs.contains(&"50".to_string()));
    }

    // Utility functions below

    /// Creates one `<shard>/<height>` directory per height under the given
    /// replica-version spool directory (shards are buckets of 100 heights).
    fn create_artifacts_dir_with_heights(replica_version_dir: &Path, heights: Vec<u64>) {
        for height in heights {
            let shard = height - height % 100;
            let artifact_dir = replica_version_dir
                .join(shard.to_string())
                .join(height.to_string());
            create_dir_all(artifact_dir).unwrap();
        }
    }

    /// Builds a `BackupHelper` whose directories all live under `temp_dir` and
    /// whose external collaborators are stubbed with fake values.
    fn fake_backup_helper(temp_dir: &Path, versions_hot: usize) -> BackupHelper {
        let data_provider = Arc::new(LocalStoreImpl::new(
            temp_dir.join("ic_registry_local_store"),
        ));
        let registry_client = Arc::new(RegistryClientImpl::new(
            data_provider,
            /*metrics_registry=*/ None,
        ));

        let notification_client = NotificationClient {
            push_metrics: false,
            metrics_urls: vec![],
            network_name: "fake_network_name".into(),
            backup_instance: "fake_backup_instance".into(),
            slack_token: "fake_slack_token".into(),
            subnet: "fake_subnet".into(),
            log: ic_recovery::util::make_logger(),
        };

        let subnet_id = SubnetId::from(PrincipalId::from_str(FAKE_SUBNET_ID).unwrap());

        BackupHelper {
            subnet_id,
            initial_replica_version: ReplicaVersion::try_from("fake_replica_version").unwrap(),
            root_dir: temp_dir.join("backup"),
            excluded_dirs: vec![],
            ssh_private_key: "fake_ssh_private_key".into(),
            registry_client,
            notification_client,
            downloads_guard: Arc::new(Mutex::new(true)),
            disk_threshold_warn: 75,
            cold_storage_dir: temp_dir.join("cold_storage"),
            versions_hot,
            artifacts_guard: Mutex::new(true),
            daily_replays: 2,
            do_cold_storage: true,
            thread_id: 1,
            blacklisted_nodes: Arc::new(vec![]),
            log: ic_recovery::util::make_logger(),
        }
    }

    /// Returns the names of all entries directly inside `dir`.
    fn collect_dir_entries(dir: &Path) -> Vec<String> {
        let mut names = Vec::new();
        for entry in std::fs::read_dir(dir).unwrap() {
            names.push(entry.unwrap().file_name().to_string_lossy().into_owned());
        }
        names
    }
}

0 comments on commit 0b83d19

Please sign in to comment.