Skip to content

Commit

Permalink
Merge branch 'stschnei/lsmt-feature-flag' into 'master'
Browse files Browse the repository at this point in the history
chore: LSMT feature flag

This MR introduces a new feature flag for the log-structured merge tree
(LSMT) based storage. The functionality for enabling it is largely missing
at this point, but will be added in upcoming MRs.

See merge request dfinity-lab/public/ic!15608
  • Loading branch information
schneiderstefan committed Oct 31, 2023
2 parents 66e068b + ef7cca4 commit 5ea4437
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions rs/config/src/state_manager.rs
Expand Up @@ -8,13 +8,17 @@ pub struct Config {
/// A feature flag that enables/disables the file backed memory allocator.
#[serde(default = "file_backed_memory_allocator_default")]
pub file_backed_memory_allocator: FlagStatus,
/// A feature flag that enables/disables the log-structured merge tree (LSMT) based storage.
#[serde(default = "lsmt_storage_default")]
pub lsmt_storage: FlagStatus,
}

impl Config {
/// Builds a `Config` rooted at `state_root`.
///
/// Both feature flags are initialised from the same default functions that
/// the `#[serde(default = ...)]` attributes on the struct fields refer to.
pub fn new(state_root: PathBuf) -> Self {
    let file_backed_memory_allocator = file_backed_memory_allocator_default();
    let lsmt_storage = lsmt_storage_default();
    Self {
        state_root,
        file_backed_memory_allocator,
        lsmt_storage,
    }
}

Expand All @@ -32,3 +36,7 @@ impl Config {
/// Default for `Config::file_backed_memory_allocator`, referenced by the
/// field's `#[serde(default = ...)]` attribute and by `Config::new`:
/// the flag is enabled.
fn file_backed_memory_allocator_default() -> FlagStatus {
FlagStatus::Enabled
}

/// Default for `Config::lsmt_storage`, referenced by the field's
/// `#[serde(default = ...)]` attribute and by `Config::new`: the flag is
/// disabled.
pub fn lsmt_storage_default() -> FlagStatus {
FlagStatus::Disabled
}
37 changes: 33 additions & 4 deletions rs/replicated_state/src/page_map.rs
Expand Up @@ -4,6 +4,7 @@ mod page_allocator;
mod storage;

pub use checkpoint::{CheckpointSerialization, MappingSerialization};
use ic_config::flag_status::FlagStatus;
use ic_sys::PageBytes;
pub use ic_sys::{PageIndex, PAGE_SIZE};
use ic_utils::{deterministic_operations::deterministic_copy_from_slice, fs::write_all_vectored};
Expand Down Expand Up @@ -234,6 +235,22 @@ pub enum MemoryMapOrData<'a> {
Data(&'a [u8]),
}

/// For write operations, whether the delta should be written to a base file
/// or to an overlay file. Each variant carries the path of the file to write.
///
/// The common traits are derived so that destinations can be logged,
/// duplicated and compared (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PersistDestination {
    /// Write the delta to the base file at this path.
    BaseFile(PathBuf),
    /// Write the delta to an overlay file at this path.
    OverlayFile(PathBuf),
}

impl PersistDestination {
    /// Selects the destination that corresponds to the `lsmt_storage` flag,
    /// sparing callers the usual `match` on the flag.
    ///
    /// * `FlagStatus::Disabled` yields `BaseFile(base_file)`.
    /// * `FlagStatus::Enabled` yields `OverlayFile(overlay_file)`.
    ///
    /// The path that is not selected is dropped.
    pub fn new(base_file: PathBuf, overlay_file: PathBuf, lsmt_storage: FlagStatus) -> Self {
        match lsmt_storage {
            FlagStatus::Disabled => Self::BaseFile(base_file),
            FlagStatus::Enabled => Self::OverlayFile(overlay_file),
        }
    }
}

/// PageMap is a data structure that represents an image of a canister virtual
/// memory. The memory is viewed as a collection of _pages_. `PageMap` uses
/// 4KiB host OS pages to track the heap contents, not 64KiB Wasm pages.
Expand Down Expand Up @@ -396,14 +413,26 @@ impl PageMap {

/// Persists the heap delta contained in this page map to the specified
/// destination.
pub fn persist_delta(&self, dst: &Path) -> Result<(), PersistenceError> {
self.persist_to_file(&self.page_delta, dst)
pub fn persist_delta(&self, dst: PersistDestination) -> Result<(), PersistenceError> {
    // Resolve the target path first; only base files can be written so far.
    let base_path = match dst {
        PersistDestination::BaseFile(path) => path,
        PersistDestination::OverlayFile(_) => {
            // TODO (IC-1306): writing overlay files is not implemented yet,
            // so reaching this arm panics.
            unimplemented!()
        }
    };
    self.persist_to_file(&self.page_delta, &base_path)
}

/// Persists the unflushed delta contained in this page map to the specified
/// destination.
pub fn persist_unflushed_delta(&self, dst: &Path) -> Result<(), PersistenceError> {
self.persist_to_file(&self.unflushed_delta, dst)
pub fn persist_unflushed_delta(&self, dst: PersistDestination) -> Result<(), PersistenceError> {
    // Resolve the target path first; only base files can be written so far.
    let base_path = match dst {
        PersistDestination::BaseFile(path) => path,
        PersistDestination::OverlayFile(_) => {
            // TODO (IC-1306): writing overlay files is not implemented yet,
            // so reaching this arm panics.
            unimplemented!()
        }
    };
    self.persist_to_file(&self.unflushed_delta, &base_path)
}

/// Returns the iterator over host pages managed by this `PageMap`.
Expand Down
23 changes: 15 additions & 8 deletions rs/replicated_state/src/page_map/tests.rs
@@ -1,10 +1,9 @@
use super::{
checkpoint::{Checkpoint, MappingSerialization},
page_allocator::PageAllocatorSerialization,
Buffer, FileDescriptor, PageAllocatorRegistry, PageIndex, PageMap, PageMapSerialization,
};
use crate::page_map::{
MemoryInstructions, MemoryMapOrData, TestPageAllocatorFileDescriptorImpl, WRITE_BUCKET_PAGES,
Buffer, FileDescriptor, MemoryInstructions, MemoryMapOrData, PageAllocatorRegistry, PageIndex,
PageMap, PageMapSerialization, PersistDestination, TestPageAllocatorFileDescriptorImpl,
WRITE_BUCKET_PAGES,
};
use ic_sys::PAGE_SIZE;
use ic_types::{Height, MAX_STABLE_MEMORY_IN_BYTES};
Expand Down Expand Up @@ -117,7 +116,9 @@ fn persisted_map_is_equivalent_to_the_original() {
.map(|(idx, p)| (*idx, p))
.collect::<Vec<_>>(),
);
pagemap.persist_delta(heap_file).unwrap();
pagemap
.persist_delta(PersistDestination::BaseFile(heap_file.to_path_buf()))
.unwrap();
let persisted_map = PageMap::open(
heap_file,
&[],
Expand Down Expand Up @@ -189,7 +190,9 @@ fn can_persist_and_load_an_empty_page_map() {
let heap_file = tmp.path().join("heap");

let original_map = PageMap::new_for_testing();
original_map.persist_delta(&heap_file).unwrap();
original_map
.persist_delta(PersistDestination::BaseFile(heap_file.clone()))
.unwrap();
let persisted_map = PageMap::open(
&heap_file,
&[],
Expand Down Expand Up @@ -422,7 +425,9 @@ fn get_memory_instructions_returns_deltas() {
page_map.get_memory_instructions(range.clone(), range.clone(), 0)
);

page_map.persist_delta(&heap_file).unwrap();
page_map
.persist_delta(PersistDestination::BaseFile(heap_file.clone()))
.unwrap();

let mut page_map = PageMap::open(
&heap_file,
Expand Down Expand Up @@ -478,7 +483,9 @@ fn get_memory_instructions_returns_deltas() {
page_map.update(pages);

// No trailing zero pages are serialized.
page_map.persist_delta(&heap_file).unwrap();
page_map
.persist_delta(PersistDestination::BaseFile(heap_file.clone()))
.unwrap();
assert_eq!(25 * PAGE_SIZE as u64, heap_file.metadata().unwrap().len());
}

Expand Down
1 change: 1 addition & 0 deletions rs/state_layout/BUILD.bazel
Expand Up @@ -4,6 +4,7 @@ package(default_visibility = ["//visibility:public"])

DEPENDENCIES = [
# Keep sorted.
"//rs/config",
"//rs/monitoring/logger",
"//rs/monitoring/metrics",
"//rs/protobuf",
Expand Down
1 change: 1 addition & 0 deletions rs/state_layout/Cargo.toml
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
hex = "0.4.2"
ic-base-types = { path = "../types/base_types" }
ic-config = { path = "../config" }
ic-ic00-types = { path = "../types/ic00_types" }
ic-logger = { path = "../monitoring/logger" }
ic-metrics = { path = "../monitoring/metrics" }
Expand Down
14 changes: 10 additions & 4 deletions rs/state_layout/src/state_layout.rs
Expand Up @@ -2,6 +2,7 @@ use crate::error::LayoutError;
use crate::utils::do_copy;

use ic_base_types::{NumBytes, NumSeconds};
use ic_config::flag_status::FlagStatus;
use ic_logger::{error, info, warn, ReplicaLogger};
use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
use ic_protobuf::{
Expand Down Expand Up @@ -316,6 +317,7 @@ impl TipHandler {
&mut self,
state_layout: &StateLayout,
cp: &CheckpointLayout<ReadOnly>,
lsmt_storage: FlagStatus,
thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<(), LayoutError> {
let tip = self.tip_path();
Expand All @@ -331,13 +333,17 @@ impl TipHandler {

let file_copy_instruction = |path: &Path| {
if path.extension() == Some(OsStr::new("pbuf")) {
// Do not copy protobufs
// Do not copy protobufs.
CopyInstruction::Skip
} else if path.extension() == Some(OsStr::new("bin")) {
// PageMap files need to be modified in the tip
} else if path.extension() == Some(OsStr::new("bin"))
&& lsmt_storage == FlagStatus::Disabled
{
// PageMap files need to be modified in the tip,
// but only with non-LSMT storage layer that modifies these files.
// With LSMT we always write additional overlay files instead.
CopyInstruction::ReadWrite
} else {
// Everything else should be readonly
// Everything else should be readonly.
CopyInstruction::ReadOnly
}
};
Expand Down
2 changes: 0 additions & 2 deletions rs/state_manager/src/checkpoint.rs
@@ -1,8 +1,6 @@
use crate::{CheckpointError, CheckpointMetrics, TipRequest, NUMBER_OF_CHECKPOINT_THREADS};
use crossbeam_channel::{unbounded, Sender};
use ic_base_types::{subnet_id_try_from_protobuf, CanisterId};
// TODO(MR-412): uncomment
//use ic_protobuf::proxy::try_from_option_field;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use ic_replicated_state::Memory;
Expand Down
10 changes: 10 additions & 0 deletions rs/state_manager/src/checkpoint/tests.rs
@@ -1,6 +1,7 @@
use super::*;
use crate::{spawn_tip_thread, StateManagerMetrics, NUMBER_OF_CHECKPOINT_THREADS};
use ic_base_types::NumSeconds;
use ic_config::state_manager::lsmt_storage_default;
use ic_ic00_types::CanisterStatusType;
use ic_metrics::MetricsRegistry;
use ic_registry_subnet_type::SubnetType;
Expand Down Expand Up @@ -100,6 +101,7 @@ fn can_make_a_checkpoint() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -164,6 +166,7 @@ fn scratchpad_dir_is_deleted_if_checkpointing_failed() {
log,
tip_handler,
layout,
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -214,6 +217,7 @@ fn can_recover_from_a_checkpoint() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -316,6 +320,7 @@ fn can_recover_an_empty_state() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -400,6 +405,7 @@ fn can_recover_a_stopping_canister() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -467,6 +473,7 @@ fn can_recover_a_stopped_canister() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -519,6 +526,7 @@ fn can_recover_a_running_canister() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -571,6 +579,7 @@ fn can_recover_subnet_queues() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down Expand Up @@ -621,6 +630,7 @@ fn empty_protobufs_are_loaded_correctly() {
log,
tip_handler,
layout.clone(),
lsmt_storage_default(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);
Expand Down
41 changes: 40 additions & 1 deletion rs/state_manager/src/lib.rs
Expand Up @@ -746,6 +746,7 @@ pub struct StateManagerImpl {
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
malicious_flags: MaliciousFlags,
latest_height_update_time: Arc<Mutex<Instant>>,
lsmt_storage: FlagStatus,
}

#[cfg(debug_assertions)]
Expand Down Expand Up @@ -1028,6 +1029,39 @@ impl PageMapType {
}
}

/// The path of an overlay file written during round `height`
fn overlay<Access>(
&self,
layout: &CheckpointLayout<Access>,
height: Height,
) -> Result<PathBuf, LayoutError>
where
Access: AccessPolicy,
{
match &self {
PageMapType::WasmMemory(id) => Ok(layout.canister(id)?.vmemory_0_overlay(height)),
PageMapType::StableMemory(id) => Ok(layout.canister(id)?.stable_memory_overlay(height)),
PageMapType::WasmChunkStore(id) => {
Ok(layout.canister(id)?.wasm_chunk_store_overlay(height))
}
}
}

/// List all existing overlay files of a this PageMapType inside `layout`
fn overlays<Access>(
&self,
layout: &CheckpointLayout<Access>,
) -> Result<Vec<PathBuf>, LayoutError>
where
Access: AccessPolicy,
{
match &self {
PageMapType::WasmMemory(id) => layout.canister(id)?.vmemory_0_overlays(),
PageMapType::StableMemory(id) => layout.canister(id)?.stable_memory_overlays(),
PageMapType::WasmChunkStore(id) => layout.canister(id)?.wasm_chunk_store_overlays(),
}
}

/// Maps a PageMapType to the `&PageMap` in `state`
fn get<'a>(&self, state: &'a ReplicatedState) -> Option<&'a PageMap> {
match &self {
Expand Down Expand Up @@ -1365,6 +1399,7 @@ impl StateManagerImpl {
log.clone(),
state_layout.capture_tip_handler(),
state_layout.clone(),
config.lsmt_storage,
metrics.clone(),
malicious_flags.clone(),
);
Expand Down Expand Up @@ -1588,6 +1623,7 @@ impl StateManagerImpl {
fd_factory,
malicious_flags,
latest_height_update_time: Arc::new(Mutex::new(Instant::now())),
lsmt_storage: config.lsmt_storage,
}
}
/// Returns the Page Allocator file descriptor factory. This will then be
Expand Down Expand Up @@ -3008,7 +3044,10 @@ impl StateManager for StateManagerImpl {
checkpointed_state
}
CertificationScope::Metadata => {
if self.tip_channel.is_empty() {
if self.lsmt_storage == FlagStatus::Enabled {
// TODO (IC-1306): Implement LSMT strategy for when to flush page maps
unimplemented!();
} else if self.tip_channel.is_empty() {
self.flush_page_maps(&mut state, height);
} else {
self.metrics.checkpoint_metrics.page_map_flush_skips.inc();
Expand Down

0 comments on commit 5ea4437

Please sign in to comment.