Skip to content

Commit

Permalink
fix: Create base files efficiently with LSMT [IC-1306]
Browse files Browse the repository at this point in the history
  • Loading branch information
pakhomov-dfinity committed Jan 17, 2024
1 parent 5c80fa2 commit 323fde8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 56 deletions.
2 changes: 1 addition & 1 deletion rs/replicated_state/src/page_map/checkpoint.rs
Expand Up @@ -12,7 +12,7 @@ use std::path::Path;
use std::sync::Arc;

lazy_static! {
static ref ZEROED_PAGE: Box<PageBytes> = Box::new([0; PAGE_SIZE]);
pub(crate) static ref ZEROED_PAGE: Box<PageBytes> = Box::new([0; PAGE_SIZE]);
}

/// Checkpoint represents a full snapshot of the heap of a single Wasm
Expand Down
66 changes: 18 additions & 48 deletions rs/replicated_state/src/page_map/storage.rs
Expand Up @@ -11,7 +11,7 @@ use std::{
};

use crate::page_map::{
checkpoint::{Checkpoint, Mapping},
checkpoint::{Checkpoint, Mapping, ZEROED_PAGE},
CheckpointSerialization, FileDescriptor, FileOffset, MappingSerialization, MemoryInstruction,
MemoryInstructions, MemoryMapOrData, PageDelta, PersistDestination, PersistenceError,
StorageMetrics, LABEL_OP_FLUSH, LABEL_OP_MERGE, LABEL_TYPE_INDEX, LABEL_TYPE_PAGE_DATA,
Expand Down Expand Up @@ -1099,37 +1099,18 @@ fn create_file_for_write(path: &Path) -> Result<File, PersistenceError> {
})
}

/// A buffer to write data to file at offset based on `PageIndex` of the start.
struct LinearWriteBuffer {
/// Content to write.
content: Vec<u8>,
/// `PageIndex` of offset in the destination file.
start_index: PageIndex,
}

impl LinearWriteBuffer {
fn apply_to_file(&mut self, file: &mut File, path: &Path) -> Result<(), PersistenceError> {
let offset = self.start_index.get() * PAGE_SIZE as u64;
file.seek(SeekFrom::Start(offset))
.map_err(|err| PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!("Failed to seek to {}", offset),
internal_error: err.to_string(),
})?;

file.write_all(&self.content)
.map_err(|err| PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!(
"Failed to write page range #{}..{}",
self.start_index,
self.start_index.get() + self.content.len() as u64
),
internal_error: err.to_string(),
})?;
/// Expands a sparse list of pages into a dense one, filling the gaps with
/// zero pages.
///
/// `pages[i]` is the content of the page at `indices[i]`. The result has one
/// entry for every page index from `0` through `*indices.last()`, where each
/// index not present in `indices` maps to the shared `ZEROED_PAGE`.
///
/// Assumes `indices` is sorted in ascending order, so that `indices.last()`
/// is the maximum — NOTE(review): confirm with callers; an unsorted input
/// would under-allocate `result` and panic on the out-of-range write below.
fn expand_with_zeroes<'a>(pages: &[&'a [u8]], indices: &[PageIndex]) -> Vec<&'a [u8]> {
    // Each page must correspond one-to-one with an index; check this even
    // when `indices` is empty so a mismatched non-empty `pages` is caught.
    assert_eq!(pages.len(), indices.len());

    let last = match indices.last() {
        Some(last) => last,
        None => return Vec::new(),
    };

    // Start fully zeroed, then overwrite the slots we actually have data for.
    let mut result = vec![&ZEROED_PAGE as &PageBytes as &[u8]; last.get() as usize + 1];
    for (page, index) in pages.iter().zip(indices) {
        result[index.get() as usize] = page;
    }
    result
}

/// Write all the pages into their corresponding indices as a base file (dense storage).
Expand All @@ -1145,27 +1126,16 @@ fn write_base(
return Ok(());
}
let mut file = create_file_for_write(path)?;
let mut data_size = 0;
let mut buffer = LinearWriteBuffer {
content: pages[0].to_vec(),
start_index: indices[0],
};
let mut last_index = indices[0];
for (page, index) in pages.iter().zip(indices).skip(1) {
if index.get() != last_index.get() + 1 || buffer.content.len() + page.len() > BUF_SIZE {
buffer.apply_to_file(&mut file, path)?;
buffer.start_index = *index;
buffer.content.clear();
}
buffer.content.extend(*page);
data_size += page.len();
last_index = *index;
}
buffer.apply_to_file(&mut file, path)?;
let pages = expand_with_zeroes(pages, indices);
write_pages(&mut file, &pages).map_err(|err| PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!("Failed to write base file {}", path.display()),
internal_error: err.to_string(),
})?;
metrics
.write_bytes
.with_label_values(&[op_label, LABEL_TYPE_PAGE_DATA])
.inc_by(data_size as u64);
.inc_by((pages.len() * PAGE_SIZE) as u64);
Ok(())
}

Expand Down
14 changes: 7 additions & 7 deletions rs/state_manager/src/tip.rs
Expand Up @@ -517,7 +517,11 @@ impl MergeCandidateAndMetrics {
None => 0,
Some(m) => m.input_size_bytes()?,
};
let write_size_bytes = std::cmp::min(page_map_size_bytes, merge_input_bytes as usize);
let write_size_bytes = if merge_all {
page_map_size_bytes
} else {
std::cmp::min(page_map_size_bytes, merge_input_bytes as usize)
};
Ok(MergeCandidateAndMetrics {
merge_candidate,
num_files_before,
Expand Down Expand Up @@ -672,19 +676,15 @@ fn merge(
0
}
});
let storage_to_save = if max_storage < storage_size {
storage_size - max_storage
} else {
0
};
let storage_to_save = storage_size as i64 - max_storage as i64;

This comment has been minimized.

Copy link
@cybrowl

cybrowl Jan 21, 2024

@pakhomov-dfinity Before storage_to_save could NOT be negative but now it can. Or maybe it isn't possible for it to be negative. Just want to make sure that won't be an issue.

// For a full merge the resulting base file can be larger than sum of the overlays,
// so we need a signed accumulator.
let mut storage_saved: i64 = scheduled_merges
.iter()
.map(|m| m.storage_size_bytes_before as i64 - m.storage_size_bytes_after as i64)
.sum();
for m in merges_with_metrics.into_iter() {
if storage_saved >= storage_to_save as i64 {
if storage_saved >= storage_to_save {
break;
}

Expand Down

0 comments on commit 323fde8

Please sign in to comment.