From 8f18721c2af105ccae5251a9727ffda34e2b7f6b Mon Sep 17 00:00:00 2001
From: Aleksandr Pakhomov
Date: Mon, 11 Dec 2023 14:26:25 +0000
Subject: [PATCH] feat: merge LSMT overlays

---
 rs/replicated_state/src/page_map.rs         |  31 +-
 rs/replicated_state/src/page_map/storage.rs | 365 +++++++++++++-
 .../src/page_map/storage/tests.rs           | 457 +++++++++++++++++-
 3 files changed, 801 insertions(+), 52 deletions(-)

diff --git a/rs/replicated_state/src/page_map.rs b/rs/replicated_state/src/page_map.rs
index 63b3d960243..06b12b9449c 100644
--- a/rs/replicated_state/src/page_map.rs
+++ b/rs/replicated_state/src/page_map.rs
@@ -6,7 +6,7 @@ mod storage;
 use bit_vec::BitVec;
 pub use checkpoint::{CheckpointSerialization, MappingSerialization};
 use ic_config::flag_status::FlagStatus;
-use ic_metrics::buckets::decimal_buckets;
+use ic_metrics::buckets::{decimal_buckets, linear_buckets};
 use ic_metrics::MetricsRegistry;
 use ic_sys::PageBytes;
 pub use ic_sys::{PageIndex, PAGE_SIZE};
@@ -15,17 +15,17 @@ pub use page_allocator::{
     allocated_pages_count, PageAllocator, PageAllocatorRegistry, PageAllocatorSerialization,
     PageDeltaSerialization, PageSerialization,
 };
-use storage::{OverlayFile, OverlayVersion, Storage};
 pub use storage::{
-    OverlayFileSerialization, OverlayIndicesSerialization, StorageSerialization,
-    MAX_NUMBER_OF_OVERLAYS,
+    MergeCandidate, OverlayFileSerialization, OverlayIndicesSerialization, StorageSerialization,
+    MAX_NUMBER_OF_FILES,
 };
+use storage::{OverlayFile, OverlayVersion, Storage};
 
 use ic_types::{Height, NumPages, MAX_STABLE_MEMORY_IN_BYTES};
 use int_map::{Bounds, IntMap};
 use libc::off_t;
 use page_allocator::Page;
-use prometheus::{HistogramVec, IntCounter, IntCounterVec};
+use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::fs::{File, OpenOptions};
@@ -52,6 +52,8 @@ pub struct StorageMetrics {
     write_duration: HistogramVec,
     /// Number of overlays not written because they would have been empty.
     empty_delta_writes: IntCounter,
+    /// For each merge, the number of input files merged.
+    num_merged_files: Histogram,
 }
 
 impl StorageMetrics {
@@ -85,10 +87,17 @@ impl StorageMetrics {
             "The number of PageMaps that did not receive any deltas since the last write attempt.",
         );
 
+        let num_merged_files = metrics_registry.histogram(
+            "storage_layer_num_merged_files",
+            "For each merge, the number of input files merged.",
+            linear_buckets(0.0, 1.0, 20),
+        );
+
         Self {
             write_bytes,
             write_duration,
             empty_delta_writes,
+            num_merged_files,
         }
     }
 }
@@ -323,7 +332,8 @@ pub enum MemoryMapOrData<'a> {
     Data(&'a [u8]),
 }
 
-/// For write operations, whether the delta should be written to a base file or an overlay file
+/// For write operations, whether the delta should be written to a base file or an overlay file.
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub enum PersistDestination {
     BaseFile(PathBuf),
     OverlayFile(PathBuf),
@@ -337,9 +347,16 @@ impl PersistDestination {
             FlagStatus::Disabled => PersistDestination::BaseFile(base_file),
         }
     }
+
+    pub fn raw_path(&self) -> &Path {
+        match self {
+            PersistDestination::BaseFile(path) => path,
+            PersistDestination::OverlayFile(path) => path,
+        }
+    }
 }
 
-/// PageMap is a data structure that represents an image of a canister virtual
+/// `PageMap` is a data structure that represents an image of a canister virtual
 /// memory. The memory is viewed as a collection of _pages_. `PageMap` uses
 /// 4KiB host OS pages to track the heap contents, not 64KiB Wasm pages.
 ///
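Context for reviewers: the hunk above shows only the tail of `PersistDestination::new`, so the following is a minimal sketch of a call site, assuming the signature `new(base_file: PathBuf, overlay_file: PathBuf, lsmt_storage: FlagStatus)`; the function name and paths are illustrative, not part of the patch.

    use ic_config::flag_status::FlagStatus;
    use std::path::PathBuf;

    // Hypothetical call site; the overlay naming matches the tests below.
    fn pick_destination(lsmt_storage: FlagStatus, height: u64) -> PersistDestination {
        let base_file = PathBuf::from("vmemory_0.bin");
        let overlay_file = PathBuf::from(format!("{:06}_vmemory_0.overlay", height));
        // Enabled => write the delta to a new overlay; Disabled => rewrite the base file.
        let dst = PersistDestination::new(base_file, overlay_file, lsmt_storage);
        // `raw_path` exposes the target path uniformly for both variants.
        println!("persisting to {}", dst.raw_path().display());
        dst
    }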
diff --git a/rs/replicated_state/src/page_map/storage.rs b/rs/replicated_state/src/page_map/storage.rs
index 283728307e9..ce4c6c1f83c 100644
--- a/rs/replicated_state/src/page_map/storage.rs
+++ b/rs/replicated_state/src/page_map/storage.rs
@@ -13,8 +13,8 @@ use std::{
 use crate::page_map::{
     checkpoint::{Checkpoint, Mapping},
     CheckpointSerialization, FileDescriptor, FileOffset, MappingSerialization, MemoryInstruction,
-    MemoryInstructions, MemoryMapOrData, PageDelta, PersistenceError, StorageMetrics,
-    LABEL_OP_FLUSH, LABEL_TYPE_INDEX, LABEL_TYPE_PAGE_DATA,
+    MemoryInstructions, MemoryMapOrData, PageDelta, PersistDestination, PersistenceError,
+    StorageMetrics, LABEL_OP_FLUSH, LABEL_OP_MERGE, LABEL_TYPE_INDEX, LABEL_TYPE_PAGE_DATA,
 };
 
 use bit_vec::BitVec;
@@ -27,7 +27,7 @@ use strum_macros::{EnumCount, EnumIter};
 /// The (soft) maximum of the number of overlay files.
 /// There is no limit on the number of overlays while reading,
 /// but we target this number with merges.
-pub const MAX_NUMBER_OF_OVERLAYS: usize = 7;
+pub const MAX_NUMBER_OF_FILES: usize = 7;
 
 /// For `get_memory_instructions`, any range with a size of up to that number
 /// of pages will be copied, and larger ranges will be memory mapped instead.
@@ -39,6 +39,9 @@ const CURRENT_OVERLAY_VERSION: OverlayVersion = OverlayVersion::V0;
 /// The maximum supported overlay version for reading.
 const MAX_SUPPORTED_OVERLAY_VERSION: OverlayVersion = OverlayVersion::V0;
 
+/// Buffer size, in bytes, for writing data to disk.
+const BUF_SIZE: usize = 16 * 1024 * 1024;
+
 #[derive(
     Clone,
     Copy,
@@ -224,6 +227,26 @@ pub(crate) struct OverlayFile {
 }
 
 impl OverlayFile {
+    fn iter(&self) -> impl Iterator<Item = (PageIndex, &[u8])> + '_ {
+        self.index
+            .iter()
+            .flat_map(
+                |PageIndexRange {
+                     start_page,
+                     end_page,
+                     start_file_index,
+                 }| {
+                    (start_page.get()..end_page.get()).map(move |index| {
+                        (
+                            PageIndex::new(index),
+                            PageIndex::new(start_file_index.get() + index - start_page.get()),
+                        )
+                    })
+                },
+            )
+            .map(|(index, offset)| (index, self.mapping.get_page(offset).as_slice()))
+    }
+
     /// Get the page at `page_index`.
     /// Returns `None` for pages not contained in this overlay.
     fn get_page(&self, page_index: PageIndex) -> Option<&PageBytes> {
@@ -524,6 +547,225 @@ impl OverlayFile {
     }
 }
 
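The `iter` method above expands each stored `PageIndexRange` into per-page `(page_index, file_index)` pairs before resolving the page data. A self-contained sketch of that arithmetic on plain tuples (a hypothetical helper, for illustration only):

    // (start_page, end_page, start_file_index) -> per-page (page_index, file_index).
    fn flatten(range: (u64, u64, u64)) -> Vec<(u64, u64)> {
        let (start_page, end_page, start_file_index) = range;
        (start_page..end_page)
            .map(|page| (page, start_file_index + page - start_page))
            .collect()
    }

    fn main() {
        // Pages 10..13 stored contiguously in the file at indices 4..7.
        assert_eq!(flatten((10, 13, 4)), vec![(10, 4), (11, 5), (12, 6)]);
    }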
+/// `MergeCandidate` describes a set of files belonging to a single `PageMap` that should be
+/// merged into one file.
+#[derive(Clone, Debug)]
+pub struct MergeCandidate {
+    /// Overlay files to merge.
+    overlays: Vec<PathBuf>,
+    /// Base to merge if any.
+    base: Option<PathBuf>,
+    /// File to create. The format is determined by the `PersistDestination` variant, i.e.
+    /// either a base file or an overlay file.
+    /// We merge all the data from `overlays` and `base` into it, and remove the old files.
+    dst: PersistDestination,
+}
+
+impl MergeCandidate {
+    /// Create a `MergeCandidate` for the given overlays and base. The `MergeCandidate` has as
+    /// `dst` either `dst_base` or `dst_overlay`, depending on whether we decided to do a
+    /// partial (overlay) or a full (base) merge.
+    /// After applying the `MergeCandidate` we must have at most `MAX_NUMBER_OF_FILES` files,
+    /// forming a pyramid, with each file size greater than or equal to the sum of the sizes of
+    /// the newer files on top of it. For example:
+    ///     Overlay_3   |x|
+    ///     Overlay_2   |xx|
+    ///     Overlay_1   |xxxxxx|
+    ///     Base        |xxxxxxxxxxxxxxxxxxxxxxxxxxxx|
+    pub fn new(
+        dst_base: &Path,
+        dst_overlay: &Path,
+        existing_base: &Path,
+        existing_overlays: &[PathBuf],
+    ) -> Result<Option<MergeCandidate>, PersistenceError> {
+        let existing_base = if existing_base.exists() {
+            Some(existing_base.to_path_buf())
+        } else {
+            None
+        };
+
+        // Base file first (if any), then overlays ordered from old to new.
+        let existing_files = existing_base.iter().chain(existing_overlays.iter());
+
+        let file_lengths: Vec<usize> = existing_files
+            .map(|path| Ok(std::fs::metadata(path)?.len() as usize))
+            .collect::<Result<_, std::io::Error>>()
+            .map_err(|err| PersistenceError::FileSystemError {
+                path: dst_overlay.display().to_string(),
+                context: format!("Failed to get existing file length: {}", dst_overlay.display()),
+                internal_error: err.to_string(),
+            })?;
+
+        let Some(num_files_to_merge) = Self::num_files_to_merge(&file_lengths) else {
+            return Ok(None);
+        };
+
+        // If we merge all files including the base, `num_files_to_merge` is larger than the
+        // length of `existing_overlays`, `saturating_sub` returns zero, and we merge all
+        // overlays without skipping any.
+        let overlays: Vec<PathBuf> = existing_overlays
+            .iter()
+            .skip(existing_overlays.len().saturating_sub(num_files_to_merge))
+            .cloned()
+            .collect();
+
+        // If we merge all existing files, we put all the data into a single base file;
+        // otherwise we create an overlay file.
+        let merge_all = num_files_to_merge == file_lengths.len();
+        let base = if merge_all {
+            existing_base.clone()
+        } else {
+            None
+        };
+
+        let dst = if merge_all {
+            PersistDestination::BaseFile(dst_base.to_path_buf())
+        } else {
+            PersistDestination::OverlayFile(dst_overlay.to_path_buf())
+        };
+
+        Ok(Some(MergeCandidate {
+            overlays,
+            base,
+            dst,
+        }))
+    }
+
+    /// Merge data from `overlays` and `base` into `dst` and remove the input files.
+    pub fn apply(&self, metrics: &StorageMetrics) -> Result<(), PersistenceError> {
+        let _timer = metrics
+            .write_duration
+            .with_label_values(&[LABEL_OP_MERGE])
+            .start_timer();
+        let base: Option<Checkpoint> = match self.base {
+            None => None,
+            Some(ref path) => {
+                let checkpoint = Checkpoint::open(path)?;
+                std::fs::remove_file(path).map_err(|io_err| PersistenceError::FileSystemError {
+                    path: path.display().to_string(),
+                    context: "Could not remove base file before merge".to_string(),
+                    internal_error: io_err.to_string(),
+                })?;
+                Some(checkpoint)
+            }
+        };
+
+        let num_merged_files = self.overlays.len() + base.iter().count();
+        metrics.num_merged_files.observe(num_merged_files as f64);
+
+        let overlays: Vec<OverlayFile> = self
+            .overlays
+            .iter()
+            .map(|path| OverlayFile::load(path))
+            .collect::<Result<Vec<_>, PersistenceError>>()?;
+        for path in &self.overlays {
+            std::fs::remove_file(path).map_err(|io_err| PersistenceError::FileSystemError {
+                path: path.display().to_string(),
+                context: "Could not remove overlay file before merge".to_string(),
+                internal_error: io_err.to_string(),
+            })?;
+        }
+        Self::merge_impl(&self.dst, base, &overlays, metrics)
+    }
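Taken together, `new` and `apply` form the entire merge workflow. A sketch of how a caller might drive it (the directory layout and function name are illustrative; error handling abbreviated):

    use std::path::Path;

    // Hypothetical maintenance pass over one PageMap's storage directory.
    fn maybe_merge(metrics: &StorageMetrics) -> Result<(), PersistenceError> {
        let dir = Path::new("/var/lib/state/canister_0"); // illustrative
        let overlays = vec![
            dir.join("000001_vmemory_0.overlay"),
            dir.join("000002_vmemory_0.overlay"),
        ];
        if let Some(candidate) = MergeCandidate::new(
            &dir.join("vmemory_0.bin"),            // dst if everything merges
            &dir.join("000003_vmemory_0.overlay"), // dst for a partial merge
            &dir.join("vmemory_0.bin"),            // existing base, if present
            &overlays,
        )? {
            // Rewrites the selected files into `candidate.dst` and deletes the inputs.
            candidate.apply(metrics)?;
        }
        Ok(())
    }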
+
+    fn merge_impl(
+        dst: &PersistDestination,
+        existing_base: Option<Checkpoint>,
+        existing: &[OverlayFile],
+        metrics: &StorageMetrics,
+    ) -> Result<(), PersistenceError> {
+        let max_size = existing.iter().map(|f| f.num_pages()).sum::<usize>()
+            + existing_base.as_ref().map_or(0, |base| base.num_pages());
+        struct PageWithPriority<'a> {
+            // Page index in the `PageMap`.
+            page_index: PageIndex,
+            page_data: &'a [u8],
+            // Given the same `page_index`, we choose the data with the lowest priority to write.
+            priority: usize,
+        }
+
+        let iterators_with_priority: Vec<Box<dyn Iterator<Item = PageWithPriority> + '_>> = existing
+            .iter()
+            .rev()
+            .enumerate()
+            .map(|(priority, overlay)| {
+                Box::new(
+                    overlay
+                        .iter()
+                        .map(move |(page_index, page_data)| PageWithPriority {
+                            page_index,
+                            page_data,
+                            priority,
+                        }),
+                ) as Box<dyn Iterator<Item = PageWithPriority> + '_>
+            })
+            .chain(existing_base.as_ref().map(|checkpoint| {
+                Box::new((0..checkpoint.num_pages()).map(|index| {
+                    let page_index = PageIndex::new(index as u64);
+                    PageWithPriority {
+                        page_index,
+                        page_data: checkpoint.get_page(page_index).as_slice(),
+                        priority: existing.len(),
+                    }
+                })) as Box<dyn Iterator<Item = PageWithPriority> + '_>
+            }))
+            .collect();
+
+        // Sort all iterators by `(page_index, priority)`. Each sub-iterator in
+        // `iterators_with_priority` is sorted by `page_index` and has a single constant
+        // priority, so every sub-iterator is sorted, and hence so is the merged iterator.
+        let merged_iterator = iterators_with_priority
+            .into_iter()
+            .kmerge_by(|a, b| (a.page_index, a.priority) < (b.page_index, b.priority));
+        let mut pages_data: Vec<&[u8]> = Vec::with_capacity(max_size);
+        let mut pages_indices: Vec<PageIndex> = Vec::with_capacity(max_size);
+        // Group the sorted `merged_iterator` by `page_index`. Elements within a group are
+        // sorted by priority; we need only the first element of each group.
+        for (_, mut group) in
+            &merged_iterator.group_by(|page_with_priority| page_with_priority.page_index)
+        {
+            let page_with_priority = group
+                .next()
+                .expect("group_by is expected to create non-empty groups");
+            pages_data.push(page_with_priority.page_data);
+            pages_indices.push(page_with_priority.page_index);
+        }
+
+        match dst {
+            PersistDestination::OverlayFile(path) => {
+                write_overlay(&pages_data, &pages_indices, path, metrics, LABEL_OP_MERGE)
+            }
+            PersistDestination::BaseFile(path) => {
+                write_base(&pages_data, &pages_indices, path, metrics, LABEL_OP_MERGE)
+            }
+        }
+    }
+
+    /// Number of files to merge to achieve the `MergeCandidate` criteria (see the
+    /// `MergeCandidate::new` documentation).
+    /// If no merge is required, returns `None`.
+    fn num_files_to_merge(existing_lengths: &[usize]) -> Option<usize> {
+        let mut merge_to_get_pyramid = 0;
+        let mut sum = 0;
+        for (i, len) in existing_lengths.iter().rev().enumerate() {
+            if sum > *len {
+                merge_to_get_pyramid = i + 1;
+            }
+            sum += len;
+        }
+
+        let result = std::cmp::max(
+            merge_to_get_pyramid,
+            // +1 because the merge itself is going to create a file.
+            (existing_lengths.len() + 1).saturating_sub(MAX_NUMBER_OF_FILES),
+        );
+        assert!(result <= existing_lengths.len());
+        if result <= 1 {
+            None
+        } else {
+            Some(result)
+        }
+    }
+}
+
 /// A struct describing the index section of an overlay file.
 struct OverlayIndices {
     /// A memory map of the index section of the file.
@@ -571,7 +813,13 @@ impl OverlayIndices {
         }
     }
 
-    /// Open the `StorageIndices` in the given file at the right offset.
+    /// Iterate over all ranges.
+    fn iter(&self) -> impl Iterator<Item = PageIndexRange> + '_ {
+        let slice = self.as_slice();
+        (0..slice.len()).map(|i| index_range(slice, i, self.num_pages))
+    }
+
+    /// Open the `OverlayIndices` in the given file at the right offset.
     fn new(file: File, len: usize, offset: i64, num_pages: u64) -> Result<Self, PersistenceError> {
         assert!(len > 0);
         let mmap =
@@ -789,9 +1037,99 @@ fn group_pages_into_ranges(page_indices: &[PageIndex]) -> Vec<PageIndexRange> {
         .collect()
 }
 
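The heart of `merge_impl` is a k-way merge followed by per-page deduplication. A standalone sketch of the same `kmerge_by` + `group_by` pattern on plain tuples, assuming `itertools` as a dependency:

    use itertools::Itertools;

    fn main() {
        // (page_index, priority, data): lower priority means newer; it wins ties.
        let newer = vec![(1u64, 0usize, "B'"), (4, 0, "E'")];
        let older = vec![(1, 1, "B"), (2, 1, "C")];
        let stream = vec![newer.into_iter(), older.into_iter()]
            .into_iter()
            .kmerge_by(|a, b| (a.0, a.1) < (b.0, b.1));
        let mut merged = Vec::new();
        // Keep only the first (newest) element of each page-index group.
        for (_, mut group) in &stream.group_by(|e| e.0) {
            merged.push(group.next().unwrap());
        }
        // Page 1 takes the newer "B'"; pages 2 and 4 pass through unchanged.
        assert_eq!(merged, vec![(1, 0, "B'"), (2, 1, "C"), (4, 0, "E'")]);
    }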
+/// Create a new file to write `PageMap` data to. Because we write asynchronously with respect
+/// to execution, we open the file with `O_DIRECT | O_DSYNC`; otherwise it tends to cause lock
+/// congestion in QEMU.
+fn create_file_for_write(path: &Path) -> Result<File, PersistenceError> {
+    let mut open_options = OpenOptions::new();
+    open_options.write(true).create_new(true);
+    #[cfg(target_os = "linux")]
+    {
+        use std::os::unix::fs::OpenOptionsExt;
+        // Note: `custom_flags` replaces previously set flags rather than OR-ing
+        // them, so both flags must be passed in a single call.
+        open_options.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
+    }
+    open_options
+        .open(path)
+        .map_err(|err| PersistenceError::FileSystemError {
+            path: path.display().to_string(),
+            context: "Failed to open file".to_string(),
+            internal_error: err.to_string(),
+        })
+}
+
+/// A buffer of contiguous pages to be written to a file at the offset given by the
+/// `PageIndex` of its first page.
+struct LinearWriteBuffer {
+    /// Content to write.
+    content: Vec<u8>,
+    /// `PageIndex` of the offset in the destination file.
+    start_index: PageIndex,
+}
+
+impl LinearWriteBuffer {
+    fn apply_to_file(&mut self, file: &mut File, path: &Path) -> Result<(), PersistenceError> {
+        let offset = self.start_index.get() * PAGE_SIZE as u64;
+        file.seek(SeekFrom::Start(offset))
+            .map_err(|err| PersistenceError::FileSystemError {
+                path: path.display().to_string(),
+                context: format!("Failed to seek to {}", offset),
+                internal_error: err.to_string(),
+            })?;
+
+        file.write_all(&self.content)
+            .map_err(|err| PersistenceError::FileSystemError {
+                path: path.display().to_string(),
+                context: format!(
+                    "Failed to write page range #{}..{}",
+                    self.start_index,
+                    self.start_index.get() + (self.content.len() / PAGE_SIZE) as u64
+                ),
+                internal_error: err.to_string(),
+            })?;
+
+        Ok(())
+    }
+}
+
+/// Write all the pages into their corresponding indices as a base file (dense storage).
+fn write_base(
+    pages: &[&[u8]],
+    indices: &[PageIndex],
+    path: &Path,
+    metrics: &StorageMetrics,
+    op_label: &str, // `LABEL_OP_FLUSH` or `LABEL_OP_MERGE`.
+) -> Result<(), PersistenceError> {
+    assert_eq!(pages.len(), indices.len());
+    if pages.is_empty() {
+        return Ok(());
+    }
+    let mut file = create_file_for_write(path)?;
+    // Count the first page up front; the loop below starts from the second one.
+    let mut data_size = pages[0].len();
+    let mut buffer = LinearWriteBuffer {
+        content: pages[0].to_vec(),
+        start_index: indices[0],
+    };
+    let mut last_index = indices[0];
+    for (page, index) in pages.iter().zip(indices).skip(1) {
+        if index.get() != last_index.get() + 1 || buffer.content.len() + page.len() > BUF_SIZE {
+            buffer.apply_to_file(&mut file, path)?;
+            buffer.start_index = *index;
+            buffer.content.clear();
+        }
+        buffer.content.extend(*page);
+        data_size += page.len();
+        last_index = *index;
+    }
+    buffer.apply_to_file(&mut file, path)?;
+    metrics
+        .write_bytes
+        .with_label_values(&[op_label, LABEL_TYPE_PAGE_DATA])
+        .inc_by(data_size as u64);
+    Ok(())
+}
+
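`write_base` coalesces runs of consecutive page indices into one seek-and-write each. A minimal sketch of just that run detection (a hypothetical helper; the `BUF_SIZE` cap that also splits runs is omitted here):

    /// Split sorted page indices into maximal runs of consecutive indices.
    fn split_runs(indices: &[u64]) -> Vec<Vec<u64>> {
        let mut runs: Vec<Vec<u64>> = Vec::new();
        for &index in indices {
            match runs.last_mut() {
                // Consecutive index: extend the current run.
                Some(run) if *run.last().unwrap() + 1 == index => run.push(index),
                // Gap: start a new run, i.e. a new seek+write in `write_base`.
                _ => runs.push(vec![index]),
            }
        }
        runs
    }

    fn main() {
        // Two runs -> two `LinearWriteBuffer` flushes.
        assert_eq!(split_runs(&[0, 1, 5, 6]), vec![vec![0, 1], vec![5, 6]]);
    }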
 /// Helper function to write the data section of an overlay file.
 fn write_pages(file: &mut File, data: &Vec<&[u8]>) -> std::io::Result<()> {
-    const BUF_SIZE: usize = 16 * 1024 * 1024;
     let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE);
     for page in data {
         if buf.len() + page.len() > BUF_SIZE {
@@ -823,22 +1161,7 @@ fn write_overlay(
         },
     );
 
-    let mut open_options = OpenOptions::new();
-    open_options.write(true).create_new(true);
-    #[cfg(target_os = "linux")]
-    {
-        use std::os::unix::fs::OpenOptionsExt;
-        open_options
-            .custom_flags(libc::O_DIRECT)
-            .custom_flags(libc::O_DSYNC);
-    }
-    let mut file = open_options
-        .open(path)
-        .map_err(|err| PersistenceError::FileSystemError {
-            path: path.display().to_string(),
-            context: "Failed to open file".to_string(),
-            internal_error: err.to_string(),
-        })?;
+    let mut file = create_file_for_write(path)?;
 
     write_pages(&mut file, pages).map_err(|err| PersistenceError::FileSystemError {
         path: path.display().to_string(),
diff --git a/rs/replicated_state/src/page_map/storage/tests.rs b/rs/replicated_state/src/page_map/storage/tests.rs
index d9446f771ba..b7f4e94bd81 100644
--- a/rs/replicated_state/src/page_map/storage/tests.rs
+++ b/rs/replicated_state/src/page_map/storage/tests.rs
@@ -6,9 +6,12 @@ use std::{
 };
 
 use crate::page_map::{
-    storage::{OverlayFile, Storage, INDEX_ENTRY_NUM_BYTES, SIZE_NUM_BYTES, VERSION_NUM_BYTES},
-    FileDescriptor, MemoryInstructions, MemoryMapOrData, PageAllocator, PageDelta,
-    PersistenceError, StorageMetrics,
+    storage::{
+        Checkpoint, MergeCandidate, OverlayFile, Storage, INDEX_ENTRY_NUM_BYTES, SIZE_NUM_BYTES,
+        VERSION_NUM_BYTES,
+    },
+    FileDescriptor, MemoryInstructions, MemoryMapOrData, PageAllocator, PageDelta, PageMap,
+    PersistDestination, PersistenceError, StorageMetrics, MAX_NUMBER_OF_FILES,
 };
 use bit_vec::BitVec;
 use ic_metrics::MetricsRegistry;
@@ -74,6 +77,97 @@ fn verify_overlay_file(path: &Path, expected: &PageDelta) {
     );
 }
 
+/// Read all the data from the input files as a `PageDelta`.
+fn files_as_delta(base: &Option<Checkpoint>, overlays: &[OverlayFile]) -> PageDelta {
+    let allocator = PageAllocator::new_for_testing();
+    let mut pages = Vec::default();
+    let num_logical_pages = overlays
+        .iter()
+        .map(|f| f.num_logical_pages())
+        .chain(base.iter().map(|b| b.num_pages()))
+        .max()
+        .unwrap_or(0);
+    for index in 0..num_logical_pages {
+        let index = PageIndex::new(index as u64);
+        let page = (|| {
+            for file in overlays.iter().rev() {
+                if let Some(data) = file.get_page(index) {
+                    return Some(data);
+                }
+            }
+            base.as_ref().map(|base| base.get_page(index))
+        })();
+        if let Some(data) = page {
+            pages.push((index, data));
+        }
+    }
+    PageDelta::from(allocator.allocate(&pages))
+}
+
+/// Check that we have at most `MAX_NUMBER_OF_FILES` files and that they form a pyramid, i.e.
+/// each file's size is greater than or equal to the sum of the sizes of the files on top of it.
+fn check_post_merge_criteria(storage_files: &StorageFiles) {
+    let file_lengths = storage_files
+        .base
+        .iter()
+        .chain(storage_files.overlays.iter())
+        .map(|p| std::fs::metadata(p).unwrap().len())
+        .collect::<Vec<_>>();
+    assert!(file_lengths.len() <= MAX_NUMBER_OF_FILES);
+    file_lengths
+        .iter()
+        .rev()
+        .fold(0, |size_on_top, current_size| {
+            assert!(size_on_top <= *current_size);
+            size_on_top + current_size
+        });
+}
+
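A worked instance of the pyramid fold above (a sketch): base-to-newest sizes [8, 4, 2, 1] pass, because the running sum from the top never exceeds the next size down.

    fn main() {
        let sizes: [u64; 4] = [8, 4, 2, 1]; // base first, newest overlay last
        // Traversed newest-to-oldest: 0 <= 1, then 1 <= 2, then 3 <= 4, then 7 <= 8.
        sizes.iter().rev().fold(0, |size_on_top, size| {
            assert!(size_on_top <= *size);
            size_on_top + size
        });
    }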
+/// Verify that the data in `new_base` is the same as in `old_base` + `old_overlays`.
+fn verify_merge_to_base(
+    new_base: &Path,
+    old_base: Option<Checkpoint>,
+    old_overlays: Vec<OverlayFile>,
+) {
+    let delta = files_as_delta(&old_base, &old_overlays);
+    let dst = Checkpoint::open(new_base).unwrap();
+    assert_eq!(
+        delta.iter().last().unwrap().0.get() + 1,
+        dst.num_pages() as u64
+    );
+    let zeroes = [0; PAGE_SIZE];
+    for i in 0..dst.num_pages() as u64 {
+        let page_index = PageIndex::new(i);
+        match (delta.get_page(page_index), dst.get_page(page_index)) {
+            (Some(data_delta), data_dst) => assert_eq!(data_delta, data_dst),
+            (None, data_dst) => assert_eq!(&zeroes, data_dst),
+        }
+    }
+}
+
+/// Verify that the data in `new_overlay` is the same as in `old_base` + `old_overlays`.
+fn verify_merge_to_overlay(
+    new_overlay: &Path,
+    old_base: Option<Checkpoint>,
+    old_overlays: Vec<OverlayFile>,
+) {
+    let delta = files_as_delta(&old_base, &old_overlays);
+    let dst = OverlayFile::load(new_overlay).unwrap();
+    assert_eq!(
+        delta.iter().last().unwrap().0.get() + 1,
+        dst.num_logical_pages() as u64
+    );
+    for i in 0..dst.num_logical_pages() as u64 {
+        let page_index = PageIndex::new(i);
+        assert_eq!(
+            delta.get_page(page_index),
+            dst.get_page(page_index),
+            "Failed for page index {:#?}",
+            page_index
+        );
+    }
+}
+
 /// Write the entire data from `delta` into a byte buffer.
 fn page_delta_as_buffer(delta: &PageDelta) -> Vec<u8> {
     let mut result: Vec<u8> =
@@ -135,34 +229,40 @@ fn storage_as_buffer(storage: &Storage) -> Vec<u8> {
     result
 }
 
-/// Base file and storage file in directory `dir`.
-/// These tests use the schema where the base file is called `base.bin`,
-/// and overlays end in `overlay`.
-fn storage_files(dir: &Path) -> (PathBuf, Vec<PathBuf>) {
-    let base_path = dir.join("base.bin");
+#[derive(Eq, Clone, Debug, PartialEq)]
+struct StorageFiles {
+    base: Option<PathBuf>,
+    overlays: Vec<PathBuf>,
+}
 
+/// Base file and overlay files in directory `dir`.
+/// These tests use the schema where the base file ends in `.bin`,
+/// and overlays end in `.overlay`.
+fn storage_files(dir: &Path) -> StorageFiles {
+    let mut bases: Vec<PathBuf> = Default::default();
     let mut overlays: Vec<PathBuf> = Default::default();
     for file in std::fs::read_dir(dir).unwrap() {
         let path = file.unwrap().path();
         if path.to_str().unwrap().ends_with("overlay") {
             overlays.push(path);
+        } else if path.to_str().unwrap().ends_with("bin") {
+            bases.push(path);
         }
     }
     overlays.sort();
+    assert!(bases.len() <= 1);
 
-    (base_path, overlays)
+    StorageFiles {
+        base: bases.first().cloned(),
+        overlays,
+    }
 }
 
 /// Verify that the storage in the `dir` directory is equivalent to `expected`.
 fn verify_storage(dir: &Path, expected: &PageDelta) {
-    let (base_path, overlays) = storage_files(dir);
-    let base = if base_path.exists() {
-        Some(base_path.as_path())
-    } else {
-        None
-    };
+    let StorageFiles { base, overlays } = storage_files(dir);
 
-    let storage = Storage::load(base, &overlays).unwrap();
+    let storage = Storage::load(base.as_deref(), &overlays).unwrap();
 
     // Verify num_host_pages.
     assert_eq!(
@@ -198,10 +298,29 @@ fn verify_storage(dir: &Path, expected: &PageDelta) {
     assert_eq!(expected_buffer, actual_buffer);
 }
 
+/// Assert that `merge` covered exactly `merge_files` input files and that the total file
+/// count shrank accordingly (each merge removes its inputs and creates one output).
+fn merge_assert_num_files(
+    merge_files: usize,
+    merge: &Option<MergeCandidate>,
+    before: &StorageFiles,
+    after: &StorageFiles,
+) {
+    let before_len = before.overlays.len() + before.base.iter().len();
+    let after_len = after.overlays.len() + after.base.iter().len();
+    assert_eq!(
+        merge
+            .as_ref()
+            .map_or(0, |m| m.overlays.len() + m.base.iter().len()),
+        merge_files
+    );
+    assert_eq!(before_len - after_len + 1, merge_files);
+}
+
 /// An instruction to modify a storage.
-// TODO (IC-1306): Add Merge instruction
 enum Instruction {
-    WriteOverlay(Vec<u64>), // With list of PageIndex to overwrite.
+    /// Create an overlay file with the provided list of `PageIndex` to write.
+    WriteOverlay(Vec<u64>),
+    /// Create & apply a `MergeCandidate`; check the number of files merged.
+    Merge { assert_files_merged: Option<usize> },
 }
 use Instruction::*;
 
@@ -216,9 +335,10 @@ fn write_overlays_and_verify_with_tempdir(instructions: Vec<Instruction>, tempdir: &TempDir) {
     let mut combined_delta = PageDelta::default();
 
     for (round, instruction) in instructions.iter().enumerate() {
-        let path = &tempdir
+        let path_overlay = &tempdir
             .path()
             .join(format!("{:06}_vmemory_0.overlay", round));
+        let path_base = &tempdir.path().join("vmemory_0.bin");
         match instruction {
             WriteOverlay(round_indices) => {
                 let data = &[round as u8; PAGE_SIZE];
@@ -229,15 +349,76 @@
                 let delta = PageDelta::from(allocator.allocate(&overlay_pages));
 
-                OverlayFile::write(&delta, path, &metrics).unwrap();
+                OverlayFile::write(&delta, path_overlay, &metrics).unwrap();
 
                 // Check both the file we just wrote and the resulting directory for correctness.
-                verify_overlay_file(path, &delta);
+                verify_overlay_file(path_overlay, &delta);
 
                 combined_delta.update(delta);
 
                 verify_storage(tempdir.path(), &combined_delta);
             }
+
+            Merge {
+                assert_files_merged,
+            } => {
+                let files_before = storage_files(tempdir.path());
+
+                let mut page_map = PageMap::new_for_testing();
+                page_map.update(
+                    combined_delta
+                        .iter()
+                        .map(|(i, p)| (i, p.contents()))
+                        .collect::<Vec<_>>()
+                        .as_slice(),
+                );
+
+                let merge =
+                    MergeCandidate::new(path_base, path_overlay, path_base, &files_before.overlays)
+                        .unwrap();
+                // Open the files before they might get deleted.
+                let merged_overlays: Vec<_> = merge.as_ref().map_or(Vec::new(), |m| {
+                    m.overlays
+                        .iter()
+                        .map(|path| OverlayFile::load(path).unwrap())
+                        .collect()
+                });
+                let merged_base = merge
+                    .as_ref()
+                    .and_then(|m| m.base.as_ref().map(|path| Checkpoint::open(path).unwrap()));
+
+                if let Some(merge) = merge.as_ref() {
+                    merge.apply(&metrics).unwrap();
+                }
+
+                let files_after = storage_files(tempdir.path());
+
+                if let Some(assert_files_merged) = assert_files_merged {
+                    merge_assert_num_files(
+                        *assert_files_merged,
+                        &merge,
+                        &files_before,
+                        &files_after,
+                    );
+                }
+
+                // Check that the new file is equivalent to the deleted files.
+                if let Some(merge) = merge {
+                    match merge.dst {
+                        PersistDestination::OverlayFile(ref path) => {
+                            verify_merge_to_overlay(path, merged_base, merged_overlays);
+                        }
+                        PersistDestination::BaseFile(ref path) => {
+                            verify_merge_to_base(path, merged_base, merged_overlays);
+                        }
+                    }
+                }
+
+                check_post_merge_criteria(&files_after);
+
+                // The merge should not cause any changes to the combined data in the directory.
+                verify_storage(tempdir.path(), &combined_delta);
+            }
         }
     }
 }
@@ -253,9 +434,9 @@ fn write_overlays_and_verify(instructions: Vec<Instruction>) {
 fn corrupt_overlay_is_an_error() {
     let tempdir = tempdir().unwrap();
     write_overlays_and_verify_with_tempdir(vec![WriteOverlay(vec![9, 10])], &tempdir);
-    let files = storage_files(tempdir.path());
-    assert!(files.1.len() == 1);
-    let overlay_path = &files.1[0];
+    let StorageFiles { overlays, .. } = storage_files(tempdir.path());
+    assert!(overlays.len() == 1);
+    let overlay_path = &overlays[0];
     let len = std::fs::metadata(overlay_path).unwrap().len();
     make_mutable(overlay_path).unwrap();
     write_all_at(overlay_path, &[0xff; 4], len - 16).unwrap();
@@ -339,3 +520,231 @@ fn can_overwrite_page() {
 fn can_overwrite_part_of_range() {
     write_overlays_and_verify(vec![WriteOverlay(vec![9, 10]), WriteOverlay(vec![10])]);
 }
+
+#[test]
+fn can_overwrite_and_merge_based_on_number_of_files() {
+    let mut instructions = Vec::new();
+    for i in 0..MAX_NUMBER_OF_FILES {
+        // Create a pyramid.
+        instructions.push(WriteOverlay(
+            (0..2u64.pow((MAX_NUMBER_OF_FILES - i) as u32)).collect(),
+        ));
+    }
+
+    instructions.push(Merge {
+        assert_files_merged: None,
+    });
+
+    for _ in 0..3 {
+        instructions.push(WriteOverlay(vec![0]));
+        // Always merge the top two files to bring the number of files back down to
+        // `MAX_NUMBER_OF_FILES`.
+        instructions.push(Merge {
+            assert_files_merged: Some(2),
+        });
+    }
+
+    write_overlays_and_verify(instructions);
+}
+
+#[test]
+fn can_write_consecutively_and_merge_based_on_number_of_files() {
+    let mut instructions = Vec::new();
+    for i in 0..MAX_NUMBER_OF_FILES * 7 {
+        // Write a new file.
+        instructions.push(WriteOverlay(vec![20 + i as u64]));
+
+        // Merge if needed.
+        instructions.push(Merge {
+            assert_files_merged: None,
+        });
+    }
+
+    write_overlays_and_verify(instructions);
+}
+
+#[test]
+fn can_write_with_gap_and_merge_based_on_number_of_files() {
+    let mut instructions = Vec::new();
+    for i in 0..MAX_NUMBER_OF_FILES * 7 {
+        // Write a new file.
+        instructions.push(WriteOverlay(vec![20 + 2 * i as u64]));
+
+        // Merge if needed.
+        instructions.push(Merge {
+            assert_files_merged: None,
+        });
+    }
+
+    write_overlays_and_verify(instructions);
+}
+
+#[test]
+fn can_merge_all() {
+    let tempdir = tempdir().unwrap();
+    let mut instructions = Vec::new();
+    // Five identical overlays, i.e. a 5x storage overhead.
+    for _ in 0..5 {
+        instructions.push(WriteOverlay((0..10).collect()));
+    }
+
+    // Merge all files, reducing the overhead to 1x.
+    instructions.push(Merge {
+        assert_files_merged: Some(5),
+    });
+
+    write_overlays_and_verify_with_tempdir(instructions, &tempdir);
+    let storage_files = storage_files(tempdir.path());
+    assert!(storage_files.overlays.is_empty());
+    assert!(storage_files.base.is_some());
+}
+
+#[test]
+fn test_num_files_to_merge() {
+    assert_eq!(MergeCandidate::num_files_to_merge(&[1, 2]), Some(2));
+    assert_eq!(MergeCandidate::num_files_to_merge(&[2, 1]), None);
+    let make_pyramid = |levels| {
+        let mut result = Vec::new();
+        for i in 0..levels {
+            result.push(1 << (levels - i));
+        }
+        result
+    };
+    assert_eq!(
+        MergeCandidate::num_files_to_merge(&make_pyramid(MAX_NUMBER_OF_FILES)),
+        None
+    );
+    assert_eq!(
+        MergeCandidate::num_files_to_merge(&make_pyramid(MAX_NUMBER_OF_FILES + 1)),
+        Some(2)
+    );
+    assert_eq!(
+        MergeCandidate::num_files_to_merge(&make_pyramid(MAX_NUMBER_OF_FILES + 2)),
+        Some(3)
+    );
+}
+
+#[test]
+fn test_make_merge_candidate_on_empty_dir() {
+    let tempdir = tempdir().unwrap();
+    let merge_candidate = MergeCandidate::new(
+        &tempdir.path().join("vmemory_0.bin"),
+        &tempdir.path().join("000000_vmemory_0.overlay"),
+        &tempdir.path().join("vmemory_0.bin"),
+        &[],
+    )
+    .unwrap();
+    assert!(merge_candidate.is_none());
+}
+
+#[test]
+fn test_make_none_merge_candidate() {
+    let tempdir = tempdir().unwrap();
+    // Write a single overlay file of 10 pages; there is nothing to merge.
+    let instructions = vec![WriteOverlay((0..10).collect())];
+
+    write_overlays_and_verify_with_tempdir(instructions, &tempdir);
+    let storage_files = storage_files(tempdir.path());
+    assert!(storage_files.base.is_none());
+    assert_eq!(storage_files.overlays.len(), 1);
+
+    let merge_candidate = MergeCandidate::new(
+        &tempdir.path().join("vmemory_0.bin"),
+        &tempdir.path().join("000000_vmemory_0.overlay"),
+        &tempdir.path().join("vmemory_0.bin"),
+        &storage_files.overlays,
+    )
+    .unwrap();
+    assert!(merge_candidate.is_none());
+}
+
+#[test]
+fn test_make_merge_candidate_to_overlay() {
+    let tempdir = tempdir().unwrap();
+    // 000002 |xx|
+    // 000001 |x|
+    // 000000 |xxxxxxxxxx|
+    // The top two files need to be merged to restore the pyramid.
+    let instructions = vec![
+        WriteOverlay((0..10).collect()),
+        WriteOverlay((0..1).collect()),
+        WriteOverlay((0..2).collect()),
+    ];
+
+    write_overlays_and_verify_with_tempdir(instructions, &tempdir);
+    let storage_files = storage_files(tempdir.path());
+    assert!(storage_files.base.is_none());
+    assert_eq!(storage_files.overlays.len(), 3);
+
+    let merge_candidate = MergeCandidate::new(
+        &tempdir.path().join("vmemory_0.bin"),
+        &tempdir.path().join("000003_vmemory_0.overlay"),
+        &tempdir.path().join("vmemory_0.bin"),
+        &storage_files.overlays,
+    )
+    .unwrap()
+    .unwrap();
+    assert_eq!(
+        merge_candidate.dst,
+        PersistDestination::OverlayFile(tempdir.path().join("000003_vmemory_0.overlay"))
+    );
+    assert!(merge_candidate.base.is_none());
+    assert_eq!(merge_candidate.overlays, storage_files.overlays[1..3]);
+}
+
+#[test]
+fn test_make_merge_candidate_to_base() {
+    let tempdir = tempdir().unwrap();
+    // 000001 |xx|
+    // 000000 |x|
+    // Both files need to be merged to restore the pyramid.
+    let instructions = vec![
+        WriteOverlay((0..1).collect()),
+        WriteOverlay((0..2).collect()),
+    ];
+
+    write_overlays_and_verify_with_tempdir(instructions, &tempdir);
+    let storage_files = storage_files(tempdir.path());
+    assert!(storage_files.base.is_none());
+    assert_eq!(storage_files.overlays.len(), 2);
+
+    let merge_candidate = MergeCandidate::new(
+        &tempdir.path().join("vmemory_0.bin"),
+        &tempdir.path().join("000003_vmemory_0.overlay"),
+        &tempdir.path().join("vmemory_0.bin"),
+        &storage_files.overlays,
+    )
+    .unwrap()
+    .unwrap();
+    assert_eq!(
+        merge_candidate.dst,
+        PersistDestination::BaseFile(tempdir.path().join("vmemory_0.bin"))
+    );
+    assert!(merge_candidate.base.is_none());
+    assert_eq!(merge_candidate.overlays, storage_files.overlays);
+}
+
+#[test]
+fn test_two_same_length_files_are_a_pyramid() {
+    let tempdir = tempdir().unwrap();
+    // 000001 |xx|
+    // 000000 |xx|
+    // No merge is needed: two equal-sized files still satisfy the pyramid criteria.
+    let instructions = vec![
+        WriteOverlay((0..2).collect()),
+        WriteOverlay((0..2).collect()),
+    ];
+
+    write_overlays_and_verify_with_tempdir(instructions, &tempdir);
+    let storage_files = storage_files(tempdir.path());
+    assert!(storage_files.base.is_none());
+    assert_eq!(storage_files.overlays.len(), 2);
+
+    let merge_candidate = MergeCandidate::new(
+        &tempdir.path().join("vmemory_0.bin"),
+        &tempdir.path().join("000003_vmemory_0.overlay"),
+        &tempdir.path().join("vmemory_0.bin"),
+        &storage_files.overlays,
+    )
+    .unwrap();
+    assert!(merge_candidate.is_none());
+}
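To see why the pyramid criteria keep the file count bounded, here is a standalone sketch (not part of the patch) that replays the `num_files_to_merge` policy, re-implemented from the function above, over a long run of single-page writes:

    const MAX_NUMBER_OF_FILES: usize = 7;

    // Re-implementation of the policy from `MergeCandidate::num_files_to_merge`.
    fn num_files_to_merge(lengths: &[usize]) -> Option<usize> {
        let mut merge_to_get_pyramid = 0;
        let mut sum = 0;
        for (i, len) in lengths.iter().rev().enumerate() {
            if sum > *len {
                merge_to_get_pyramid = i + 1;
            }
            sum += len;
        }
        let result =
            merge_to_get_pyramid.max((lengths.len() + 1).saturating_sub(MAX_NUMBER_OF_FILES));
        if result <= 1 {
            None
        } else {
            Some(result)
        }
    }

    fn main() {
        let mut lengths: Vec<usize> = Vec::new();
        for _ in 0..1000 {
            lengths.push(1); // one new single-page overlay per round
            if let Some(n) = num_files_to_merge(&lengths) {
                // Merging n files yields one file whose size is the sum of its inputs.
                let merged: usize = lengths.drain(lengths.len() - n..).sum();
                lengths.push(merged);
            }
            // The policy keeps the file count within the soft maximum forever.
            assert!(lengths.len() <= MAX_NUMBER_OF_FILES);
        }
    }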