Skip to content

Commit

Permalink
Merge branch 'stschnei/mr-294-no-empty-protobufs' into 'master'
Browse files Browse the repository at this point in the history
[MR-294] Allow missing protobuf files

This MR makes it so that some protbuf files can be missing without
crashing. Instead, an missing file is equivalent to an empty file.

There will be a follow-up MR that starts omitting writing of the file
if it would be empty. 

See merge request dfinity-lab/public/ic!12400
  • Loading branch information
schneiderstefan committed May 17, 2023
2 parents b667219 + 3176b01 commit 51085d8
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 18 deletions.
35 changes: 17 additions & 18 deletions rs/state_layout/src/state_layout.rs
Expand Up @@ -1396,9 +1396,19 @@ where
T: prost::Message + std::default::Default,
P: ReadPolicy,
{
/// Deserializes the value from the underlying file.
/// If the file does not exist, deserialize as an empty buffer.
/// Returns an error for all other I/O errors.
pub fn deserialize(&self) -> Result<T, LayoutError> {
let file = open_for_read(&self.path)?;
self.deserialize_file(file)
match open_for_read(&self.path) {
Ok(f) => self.deserialize_file(f),
Err(LayoutError::IoError { io_err, .. })
if io_err.kind() == std::io::ErrorKind::NotFound =>
{
self.deserialize_buffer(&[])
}
Err(err) => Err(err),
}
}

fn deserialize_file(&self, f: std::fs::File) -> Result<T, LayoutError> {
Expand All @@ -1407,7 +1417,11 @@ where
message: "failed to mmap a file".to_string(),
io_err,
})?;
T::decode(mmap.as_slice()).map_err(|err| LayoutError::CorruptedLayout {
self.deserialize_buffer(mmap.as_slice())
}

fn deserialize_buffer(&self, buf: &[u8]) -> Result<T, LayoutError> {
T::decode(buf).map_err(|err| LayoutError::CorruptedLayout {
path: self.path.clone(),
message: format!(
"failed to deserialize an object of type {} from protobuf: {}",
Expand All @@ -1416,21 +1430,6 @@ where
),
})
}

/// Deserializes the value if the underlying file exists.
/// If the proto file does not exist, returns Ok(None).
/// Returns an error for all other I/O errors.
pub fn deserialize_opt(&self) -> Result<Option<T>, LayoutError> {
match open_for_read(&self.path) {
Ok(f) => self.deserialize_file(f).map(Some),
Err(LayoutError::IoError { io_err, .. })
if io_err.kind() == std::io::ErrorKind::NotFound =>
{
Ok(None)
}
Err(err) => Err(err),
}
}
}

impl<T, Permissions> From<PathBuf> for ProtoFileWith<T, Permissions>
Expand Down
69 changes: 69 additions & 0 deletions rs/state_manager/src/checkpoint/tests.rs
Expand Up @@ -607,3 +607,72 @@ fn can_recover_subnet_queues() {
);
});
}

#[test]
fn missing_protobufs_are_loaded_correctly() {
with_test_replica_logger(|log| {
let tmp = tmpdir("checkpoint");
let root = tmp.path().to_path_buf();
let layout = StateLayout::try_new(log.clone(), root, &MetricsRegistry::new()).unwrap();
let tip_handler = layout.capture_tip_handler();
let state_manager_metrics = state_manager_metrics();
let (_tip_thread, tip_channel) = spawn_tip_thread(
log,
tip_handler,
layout.clone(),
state_manager_metrics.clone(),
MaliciousFlags::default(),
);

const HEIGHT: Height = Height::new(42);
let canister_id = canister_test_id(1);

let own_subnet_type = SubnetType::Application;
let subnet_id = subnet_test_id(1);
let mut state = ReplicatedState::new(subnet_id, own_subnet_type);
let canister_state = new_canister_state(
canister_id,
user_test_id(24).get(),
INITIAL_CYCLES,
NumSeconds::from(100_000),
);
state.put_canister_state(canister_state);

let _state = make_checkpoint_and_get_state(&state, HEIGHT, &tip_channel);

let recovered_state = load_checkpoint(
&layout.checkpoint(HEIGHT).unwrap(),
own_subnet_type,
&state_manager_metrics.checkpoint_metrics,
Some(&mut thread_pool()),
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
)
.unwrap();

let checkpoint_layout = layout.checkpoint(HEIGHT).unwrap();
let canister_layout = checkpoint_layout.canister(&canister_id).unwrap();

let empty_protobufs = vec![
checkpoint_layout.subnet_queues().raw_path().to_owned(),
checkpoint_layout.ingress_history().raw_path().to_owned(),
canister_layout.queues().raw_path().to_owned(),
];

for path in empty_protobufs {
assert!(path.exists());
assert_eq!(std::fs::metadata(&path).unwrap().len(), 0);
std::fs::remove_file(&path).unwrap();
}

let recovered_state_altered = load_checkpoint(
&layout.checkpoint(HEIGHT).unwrap(),
own_subnet_type,
&state_manager_metrics.checkpoint_metrics,
Some(&mut thread_pool()),
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
)
.unwrap();

assert_eq!(recovered_state, recovered_state_altered);
});
}

0 comments on commit 51085d8

Please sign in to comment.