Skip to content

Commit

Permalink
refactor: Pass necessary functionality of StateLayout into Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
pakhomov-dfinity committed Feb 12, 2024
1 parent 75c4492 commit de89c12
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 181 deletions.
4 changes: 2 additions & 2 deletions rs/replicated_state/src/page_map.rs
Expand Up @@ -16,8 +16,8 @@ pub use page_allocator::{
PageDeltaSerialization, PageSerialization,
};
pub use storage::{
MergeCandidate, OverlayFileSerialization, OverlayIndicesSerialization, StorageSerialization,
MAX_NUMBER_OF_FILES,
MergeCandidate, OverlayFileSerialization, OverlayIndicesSerialization, StorageLayout,
StorageSerialization, MAX_NUMBER_OF_FILES,
};
use storage::{OverlayFile, OverlayVersion, Storage};

Expand Down
69 changes: 42 additions & 27 deletions rs/replicated_state/src/page_map/storage.rs
Expand Up @@ -19,6 +19,7 @@ use crate::page_map::{

use bit_vec::BitVec;
use ic_sys::{mmap::ScopedMmap, PageBytes, PageIndex, PAGE_SIZE};
use ic_types::Height;
use itertools::Itertools;
use phantom_newtype::Id;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -547,6 +548,18 @@ impl OverlayFile {
}
}

/// Provide information from `StateLayout` about paths of a specific `PageMap`.
pub trait StorageLayout {
/// Base file path.
fn base(&self) -> PathBuf;

/// Path for overlay of given height.
fn overlay(&self, height: Height) -> PathBuf;

/// All existing overlay files.
fn existing_overlays(&self) -> Result<Vec<PathBuf>, Box<dyn std::error::Error>>;
}

/// `MergeCandidate` shows which files to merge into a single `PageMap`.
#[derive(Clone, Debug)]
pub struct MergeCandidate {
Expand All @@ -571,28 +584,30 @@ impl MergeCandidate {
/// Overlay_1 |xxxxxx|
/// Base |xxxxxxxxxxxxxxxxxxxxxxxxxxxx|
pub fn new(
dst_base: &Path,
dst_overlay: &Path,
existing_base: &Path,
existing_overlays: &[PathBuf],
) -> Result<Option<MergeCandidate>, PersistenceError> {
let existing_base = if existing_base.exists() {
Some(existing_base.to_path_buf())
layout: &dyn StorageLayout,
height: Height,
) -> Result<Option<MergeCandidate>, Box<dyn std::error::Error>> {
let existing_base = if layout.base().exists() {
Some(layout.base().to_path_buf())
} else {
None
};

let existing_overlays = layout.existing_overlays()?;
// base if any; then overlays old to new.
let existing_files = existing_base.iter().chain(existing_overlays.iter());

let file_lengths: Vec<usize> = existing_files
.map(|path| Ok(std::fs::metadata(path)?.len() as usize))
.collect::<Result<_, std::io::Error>>()
.map_err(|err: _| PersistenceError::FileSystemError {
path: dst_overlay.display().to_string(),
context: format!("Failed get existing file length: {}", dst_overlay.display()),
internal_error: err.to_string(),
})?;
.map(|path| {
Ok(std::fs::metadata(path)
.map_err(|err: _| PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!("Failed get existing file length: {}", path.display()),
internal_error: err.to_string(),
})?
.len() as usize)
})
.collect::<Result<_, PersistenceError>>()?;

let Some(num_files_to_merge) = Self::num_files_to_merge(&file_lengths) else {
return Ok(None);
Expand All @@ -617,9 +632,9 @@ impl MergeCandidate {
};

let dst = if merge_all {
PersistDestination::BaseFile(dst_base.to_path_buf())
PersistDestination::BaseFile(layout.base().to_path_buf())
} else {
PersistDestination::OverlayFile(dst_overlay.to_path_buf())
PersistDestination::OverlayFile(layout.overlay(height).to_path_buf())
};

Ok(Some(MergeCandidate {
Expand All @@ -630,22 +645,22 @@ impl MergeCandidate {
}

pub fn full_merge(
dst_base: &Path,
existing_base: &Path,
existing_overlays: &[PathBuf],
) -> Option<MergeCandidate> {
if dst_base == existing_base && existing_overlays.is_empty() {
None
layout: &dyn StorageLayout,
) -> Result<Option<MergeCandidate>, Box<dyn std::error::Error>> {
let existing_overlays = layout.existing_overlays()?;
let base_path = layout.base();
if existing_overlays.is_empty() {
Ok(None)
} else {
Some(MergeCandidate {
Ok(Some(MergeCandidate {
overlays: existing_overlays.to_vec(),
base: if existing_base.exists() {
Some(existing_base.to_path_buf())
base: if base_path.exists() {
Some(base_path.clone())
} else {
None
},
dst: PersistDestination::BaseFile(dst_base.to_path_buf()),
})
dst: PersistDestination::BaseFile(base_path),
}))
}
}

Expand Down
83 changes: 59 additions & 24 deletions rs/replicated_state/src/page_map/storage/tests.rs
Expand Up @@ -11,15 +11,34 @@ use crate::page_map::{
VERSION_NUM_BYTES,
},
FileDescriptor, MemoryInstructions, MemoryMapOrData, PageAllocator, PageDelta, PageMap,
PersistDestination, PersistenceError, StorageMetrics, MAX_NUMBER_OF_FILES,
PersistDestination, PersistenceError, StorageLayout, StorageMetrics, MAX_NUMBER_OF_FILES,
};
use assert_matches::assert_matches;
use bit_vec::BitVec;
use ic_metrics::MetricsRegistry;
use ic_sys::{PageIndex, PAGE_SIZE};
use ic_test_utilities::io::{make_mutable, make_readonly, write_all_at};
use ic_types::Height;
use tempfile::{tempdir, TempDir};

struct TestStorageLayout {
base: PathBuf,
overlay_dst: PathBuf,
existing_overlays: Vec<PathBuf>,
}

impl StorageLayout for TestStorageLayout {
fn base(&self) -> PathBuf {
self.base.clone()
}
fn overlay(&self, _height: Height) -> PathBuf {
self.overlay_dst.clone()
}
fn existing_overlays(&self) -> Result<Vec<PathBuf>, Box<dyn std::error::Error>> {
Ok(self.existing_overlays.clone())
}
}

/// The expected size of an overlay file.
///
/// The expectation is based on how many pages the overlay contains and how many distinct
Expand Down Expand Up @@ -385,9 +404,15 @@ fn write_overlays_and_verify_with_tempdir(instructions: Vec<Instruction>, tempdi
.as_slice(),
);

let merge =
MergeCandidate::new(path_base, path_overlay, path_base, &files_before.overlays)
.unwrap();
let merge = MergeCandidate::new(
&TestStorageLayout {
base: path_base.to_path_buf(),
overlay_dst: path_overlay.to_path_buf(),
existing_overlays: files_before.overlays.clone(),
},
Height::from(0),
)
.unwrap();
// Open the files before they might get deleted.
let merged_overlays: Vec<_> = merge.as_ref().map_or(Vec::new(), |m| {
m.overlays
Expand Down Expand Up @@ -639,10 +664,12 @@ fn test_num_files_to_merge() {
fn test_make_merge_candidate_on_empty_dir() {
let tempdir = tempdir().unwrap();
let merge_candidate = MergeCandidate::new(
&tempdir.path().join("vmemory_0.bin"),
&tempdir.path().join("000000_vmemory_0.overlay"),
&tempdir.path().join("vmemory_0.bin"),
&[],
&TestStorageLayout {
base: tempdir.path().join("vmemory_0.bin"),
overlay_dst: tempdir.path().join("000000_vmemory_0.overlay"),
existing_overlays: Vec::new(),
},
Height::from(0),
)
.unwrap();
assert!(merge_candidate.is_none());
Expand All @@ -660,10 +687,12 @@ fn test_make_none_merge_candidate() {
assert_eq!(storage_files.overlays.len(), 1);

let merge_candidate = MergeCandidate::new(
&tempdir.path().join("vmemory_0.bin"),
&tempdir.path().join("000000_vmemory_0.overlay"),
&tempdir.path().join("vmemory_0.bin"),
&storage_files.overlays,
&TestStorageLayout {
base: tempdir.path().join("vmemory_0.bin"),
overlay_dst: tempdir.path().join("000000_vmemory_0.overlay"),
existing_overlays: storage_files.overlays.clone(),
},
Height::from(0),
)
.unwrap();
assert!(merge_candidate.is_none());
Expand All @@ -688,10 +717,12 @@ fn test_make_merge_candidate_to_overlay() {
assert_eq!(storage_files.overlays.len(), 3);

let merge_candidate = MergeCandidate::new(
&tempdir.path().join("vmemory_0.bin"),
&tempdir.path().join("000003_vmemory_0.overlay"),
&tempdir.path().join("vmemory_0.bin"),
&storage_files.overlays,
&TestStorageLayout {
base: tempdir.path().join("vmemory_0.bin"),
overlay_dst: tempdir.path().join("000003_vmemory_0.overlay"),
existing_overlays: storage_files.overlays.clone(),
},
Height::from(3),
)
.unwrap()
.unwrap();
Expand Down Expand Up @@ -720,10 +751,12 @@ fn test_make_merge_candidate_to_base() {
assert_eq!(storage_files.overlays.len(), 2);

let merge_candidate = MergeCandidate::new(
&tempdir.path().join("vmemory_0.bin"),
&tempdir.path().join("000003_vmemory_0.overlay"),
&tempdir.path().join("vmemory_0.bin"),
&storage_files.overlays,
&TestStorageLayout {
base: tempdir.path().join("vmemory_0.bin"),
overlay_dst: tempdir.path().join("000003_vmemory_0.overlay"),
existing_overlays: storage_files.overlays.clone(),
},
Height::from(3),
)
.unwrap()
.unwrap();
Expand Down Expand Up @@ -752,10 +785,12 @@ fn test_two_same_length_files_are_a_pyramid() {
assert_eq!(storage_files.overlays.len(), 2);

let merge_candidate = MergeCandidate::new(
&tempdir.path().join("vmemory_0.bin"),
&tempdir.path().join("000003_vmemory_0.overlay"),
&tempdir.path().join("vmemory_0.bin"),
&storage_files.overlays,
&TestStorageLayout {
base: tempdir.path().join("vmemory_0.bin"),
overlay_dst: tempdir.path().join("000003_vmemory_0.overlay"),
existing_overlays: storage_files.overlays.clone(),
},
Height::from(0),
)
.unwrap();
assert!(merge_candidate.is_none());
Expand Down
9 changes: 9 additions & 0 deletions rs/state_layout/src/error.rs
Expand Up @@ -60,3 +60,12 @@ impl fmt::Display for LayoutError {
}
}
}

impl std::error::Error for LayoutError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
LayoutError::IoError { io_err, .. } => Some(io_err),
_ => None,
}
}
}
43 changes: 9 additions & 34 deletions rs/state_manager/src/lib.rs
Expand Up @@ -1067,48 +1067,23 @@ impl PageMapType {
result
}

/// Maps a PageMapType to its location in a checkpoint according to `layout`
fn path<Access>(&self, layout: &CheckpointLayout<Access>) -> Result<PathBuf, LayoutError>
where
Access: AccessPolicy,
{
match &self {
PageMapType::WasmMemory(id) => Ok(layout.canister(id)?.vmemory_0()),
PageMapType::StableMemory(id) => Ok(layout.canister(id)?.stable_memory_blob()),
PageMapType::WasmChunkStore(id) => Ok(layout.canister(id)?.wasm_chunk_store()),
}
}

/// The path of an overlay file written during round `height`.
fn overlay<Access>(
&self,
layout: &CheckpointLayout<Access>,
height: Height,
) -> Result<PathBuf, LayoutError>
where
Access: AccessPolicy,
{
fn id(&self) -> CanisterId {
match &self {
PageMapType::WasmMemory(id) => Ok(layout.canister(id)?.vmemory_0_overlay(height)),
PageMapType::StableMemory(id) => Ok(layout.canister(id)?.stable_memory_overlay(height)),
PageMapType::WasmChunkStore(id) => {
Ok(layout.canister(id)?.wasm_chunk_store_overlay(height))
}
PageMapType::WasmMemory(id) => *id,
PageMapType::StableMemory(id) => *id,
PageMapType::WasmChunkStore(id) => *id,
}
}

/// List all existing overlay files of a this PageMapType inside `layout`.
fn overlays<Access>(
&self,
layout: &CheckpointLayout<Access>,
) -> Result<Vec<PathBuf>, LayoutError>
/// Maps a PageMapType to its location in a checkpoint according to `layout`
fn base<Access>(&self, layout: &CheckpointLayout<Access>) -> Result<PathBuf, LayoutError>
where
Access: AccessPolicy,
{
match &self {
PageMapType::WasmMemory(id) => layout.canister(id)?.vmemory_0_overlays(),
PageMapType::StableMemory(id) => layout.canister(id)?.stable_memory_overlays(),
PageMapType::WasmChunkStore(id) => layout.canister(id)?.wasm_chunk_store_overlays(),
PageMapType::WasmMemory(id) => Ok(layout.canister(id)?.vmemory_0()),
PageMapType::StableMemory(id) => Ok(layout.canister(id)?.stable_memory_blob()),
PageMapType::WasmChunkStore(id) => Ok(layout.canister(id)?.wasm_chunk_store()),
}
}

Expand Down
2 changes: 1 addition & 1 deletion rs/state_manager/src/manifest.rs
Expand Up @@ -799,7 +799,7 @@ fn dirty_pages_to_dirty_chunks(
continue;
}

let path = dirty_page.page_type.path(checkpoint);
let path = dirty_page.page_type.base(checkpoint);

if let Ok(path) = path {
let relative_path = path
Expand Down

0 comments on commit de89c12

Please sign in to comment.