diff --git a/rs/protobuf/def/state/canister_snapshot_bits/v1/canister_snapshot_bits.proto b/rs/protobuf/def/state/canister_snapshot_bits/v1/canister_snapshot_bits.proto new file mode 100644 index 00000000000..35c91041594 --- /dev/null +++ b/rs/protobuf/def/state/canister_snapshot_bits/v1/canister_snapshot_bits.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package state.canister_snapshot_bits.v1; + +import "state/canister_state_bits/v1/canister_state_bits.proto"; +import "types/v1/types.proto"; + +message CanisterSnapshotBits { + uint64 snapshot_id = 1; + types.v1.CanisterId canister_id = 2; + uint64 taken_at_timestamp = 3; + uint64 canister_version = 4; + bytes certified_data = 5; + optional bytes binary_hash = 6; + canister_state_bits.v1.WasmChunkStoreMetadata wasm_chunk_store_metadata = 7; +} diff --git a/rs/protobuf/generator/src/lib.rs b/rs/protobuf/generator/src/lib.rs index f62c01c4000..8837df2d85f 100644 --- a/rs/protobuf/generator/src/lib.rs +++ b/rs/protobuf/generator/src/lib.rs @@ -347,6 +347,7 @@ fn build_state_proto(def: &Path, out: &Path) { def.join("state/ingress/v1/ingress.proto"), def.join("state/metadata/v1/metadata.proto"), def.join("state/canister_state_bits/v1/canister_state_bits.proto"), + def.join("state/canister_snapshot_bits/v1/canister_snapshot_bits.proto"), def.join("state/queues/v1/queues.proto"), def.join("state/sync/v1/manifest.proto"), def.join("state/stats/v1/stats.proto"), diff --git a/rs/protobuf/src/gen/state/state.canister_snapshot_bits.v1.rs b/rs/protobuf/src/gen/state/state.canister_snapshot_bits.v1.rs new file mode 100644 index 00000000000..b02aa66efb6 --- /dev/null +++ b/rs/protobuf/src/gen/state/state.canister_snapshot_bits.v1.rs @@ -0,0 +1,19 @@ +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CanisterSnapshotBits { + #[prost(uint64, tag = "1")] + pub snapshot_id: u64, + #[prost(message, optional, tag = "2")] + pub canister_id: ::core::option::Option, + #[prost(uint64, tag = "3")] + pub taken_at_timestamp: u64, + #[prost(uint64, tag = "4")] + pub canister_version: u64, + #[prost(bytes = "vec", tag = "5")] + pub certified_data: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", optional, tag = "6")] + pub binary_hash: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(message, optional, tag = "7")] + pub wasm_chunk_store_metadata: + ::core::option::Option, +} diff --git a/rs/protobuf/src/state/canister_snapshot_bits/mod.rs b/rs/protobuf/src/state/canister_snapshot_bits/mod.rs new file mode 100644 index 00000000000..dbf56c3af7f --- /dev/null +++ b/rs/protobuf/src/state/canister_snapshot_bits/mod.rs @@ -0,0 +1,3 @@ +#[allow(clippy::all)] +#[path = "../../gen/state/state.canister_snapshot_bits.v1.rs"] +pub mod v1; diff --git a/rs/protobuf/src/state/mod.rs b/rs/protobuf/src/state/mod.rs index bf955f829d8..8b33256c0f5 100644 --- a/rs/protobuf/src/state/mod.rs +++ b/rs/protobuf/src/state/mod.rs @@ -1,3 +1,4 @@ +pub mod canister_snapshot_bits; pub mod canister_state_bits; pub mod ingress; pub mod queues; diff --git a/rs/replicated_state/src/canister_snapshots.rs b/rs/replicated_state/src/canister_snapshots.rs new file mode 100644 index 00000000000..8b5a9b16dde --- /dev/null +++ b/rs/replicated_state/src/canister_snapshots.rs @@ -0,0 +1,205 @@ +use ic_types::{CanisterId, Time}; +use ic_wasm_types::CanisterModule; + +use crate::{canister_state::system_state::wasm_chunk_store::WasmChunkStore, PageMap}; + +use phantom_newtype::Id; +use std::{collections::BTreeMap, sync::Arc}; + +pub struct SnapshotIdTag; +pub type SnapshotId = Id; + +/// A collection of canister snapshots and their IDs. +/// +/// Additionally, keeps track of all the accumulated changes +/// since the last flush to the disk. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CanisterSnapshots { + next_snapshot_id: SnapshotId, + pub(crate) snapshots: BTreeMap>, + pub(crate) unflushed_changes: Vec, +} + +impl Default for CanisterSnapshots { + fn default() -> Self { + Self { + next_snapshot_id: SnapshotId::new(0), + snapshots: BTreeMap::new(), + unflushed_changes: vec![], + } + } +} + +impl CanisterSnapshots { + pub fn new( + next_snapshot_id: SnapshotId, + snapshots: BTreeMap>, + ) -> Self { + Self { + next_snapshot_id, + snapshots, + unflushed_changes: vec![], + } + } + + /// Adds new snapshot in the collection and assigns a `SnapshotId`. + /// + /// Additionally, adds a new item to the `unflushed_changes` + /// which represents the new backup accumulated since the last flush to the disk. + pub fn push(&mut self, snapshot: Arc) -> SnapshotId { + let snapshot_id = self.next_snapshot_id; + self.next_snapshot_id = SnapshotId::new(self.next_snapshot_id.get() + 1); + self.unflushed_changes.push(SnapshotOperation::Backup( + *snapshot.canister_id(), + snapshot_id, + )); + self.snapshots.insert(snapshot_id, snapshot); + snapshot_id + } + + /// Remove snapshot identified by `snapshot_id` from the collection of snapshots. + /// + /// Additionally, adds a new item to the `unflushed_changes` + /// which represents the deleted backup since the last flush to the disk. + pub fn remove(&mut self, snapshot_id: SnapshotId) -> Option> { + let removed_snapshot = self.snapshots.remove(&snapshot_id); + match removed_snapshot { + Some(snapshot) => { + self.unflushed_changes + .push(SnapshotOperation::Delete(snapshot_id)); + Some(snapshot) + } + None => { + // No snapshot found based on the snapshot ID provided. + None + } + } + } + + /// Take the unflushed changes. + pub fn take_unflushed_changes(&mut self) -> Vec { + std::mem::take(&mut self.unflushed_changes) + } +} + +/// Contains all information related to a canister snapshot. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CanisterSnapshot { + /// Identifies the canister to which this snapshot belongs. + canister_id: CanisterId, + /// The timestamp indicating the moment the snapshot was captured. + taken_at_timestamp: Time, + /// The canister version at the time of taking the snapshot. + canister_version: u64, + /// The certified data blob belonging to the canister. + certified_data: Vec, + /// Snapshot of chunked store. + chunk_store: WasmChunkStore, + /// The raw canister module. + /// May not exist depending on whether or not the canister has + /// an actual wasm module. + wasm_binary: Option, + /// Snapshot of stable memory. + stable_memory: Option, + /// Snapshot of wasm memory. + wasm_memory: Option, +} + +impl CanisterSnapshot { + pub fn new( + canister_id: CanisterId, + taken_at_timestamp: Time, + canister_version: u64, + certified_data: Vec, + stable_memory: Option, + wasm_memory: Option, + chunk_store: WasmChunkStore, + wasm_binary: Option, + ) -> CanisterSnapshot { + Self { + canister_id, + taken_at_timestamp, + canister_version, + certified_data, + stable_memory, + wasm_memory, + chunk_store, + wasm_binary, + } + } + + pub fn canister_id(&self) -> &CanisterId { + &self.canister_id + } + + pub fn canister_version(&self) -> u64 { + self.canister_version + } + + pub fn taken_at_timestamp(&self) -> &Time { + &self.taken_at_timestamp + } + + pub fn stable_memory(&self) -> &Option { + &self.stable_memory + } + + pub fn wasm_memory(&self) -> &Option { + &self.wasm_memory + } + + pub fn chunk_store(&self) -> &WasmChunkStore { + &self.chunk_store + } +} + +/// Describes the types of unflushed changes that can be stored by the `SnapshotManager`. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum SnapshotOperation { + Delete(SnapshotId), + Backup(CanisterId, SnapshotId), + Restore(CanisterId, SnapshotId), +} + +#[cfg(test)] +mod tests { + use super::*; + use super::{CanisterSnapshot, CanisterSnapshots, PageMap}; + use ic_test_utilities::types::ids::canister_test_id; + use ic_test_utilities_time::mock_time; + use ic_types::NumBytes; + #[test] + fn test_push_and_remove_snapshot() { + let snapshot = CanisterSnapshot::new( + canister_test_id(0), + mock_time(), + 0, + vec![], + Some(PageMap::new_for_testing()), + Some(PageMap::new_for_testing()), + WasmChunkStore::new_for_testing(NumBytes::from(20)), + Some(CanisterModule::new(vec![1, 2, 3])), + ); + let mut snapshot_manager = CanisterSnapshots::default(); + assert_eq!(snapshot_manager.snapshots.len(), 0); + assert_eq!(snapshot_manager.unflushed_changes.len(), 0); + + // Pushing new snapshot updates the `unflushed_changes` collection. + let snapshot_id = snapshot_manager.push(Arc::::new(snapshot)); + assert_eq!(snapshot_manager.snapshots.len(), 1); + assert_eq!(snapshot_manager.unflushed_changes.len(), 1); + + let unflushed_changes = snapshot_manager.take_unflushed_changes(); + assert_eq!(snapshot_manager.snapshots.len(), 1); + assert_eq!(snapshot_manager.unflushed_changes.len(), 0); + assert_eq!(unflushed_changes.len(), 1); + + // Deleting snapshot updates the `unflushed_changes` collection. + snapshot_manager.remove(snapshot_id); + assert_eq!(snapshot_manager.snapshots.len(), 0); + assert_eq!(snapshot_manager.unflushed_changes.len(), 1); + let unflushed_changes = snapshot_manager.take_unflushed_changes(); + assert_eq!(snapshot_manager.unflushed_changes.len(), 0); + assert_eq!(unflushed_changes.len(), 1); + } +} diff --git a/rs/replicated_state/src/lib.rs b/rs/replicated_state/src/lib.rs index 3797d670dc5..cc400989d66 100644 --- a/rs/replicated_state/src/lib.rs +++ b/rs/replicated_state/src/lib.rs @@ -29,6 +29,7 @@ //! as it could change the past. //! mod bitcoin; +pub mod canister_snapshots; pub mod canister_state; pub(crate) mod hash; pub mod metadata_state; diff --git a/rs/state_layout/src/state_layout.rs b/rs/state_layout/src/state_layout.rs index 44cc7615c5c..fc80fa89f6e 100644 --- a/rs/state_layout/src/state_layout.rs +++ b/rs/state_layout/src/state_layout.rs @@ -9,11 +9,13 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry}; use ic_protobuf::{ proxy::{try_from_option_field, ProxyDecodeError}, state::{ + canister_snapshot_bits::v1 as pb_canister_snapshot_bits, canister_state_bits::v1 as pb_canister_state_bits, ingress::v1 as pb_ingress, queues::v1 as pb_queues, stats::v1 as pb_stats, system_metadata::v1 as pb_metadata, }, }; use ic_replicated_state::{ + canister_snapshots::SnapshotId, canister_state::{ execution_state::{NextScheduledMethod, WasmMetadata}, system_state::{wasm_chunk_store::WasmChunkStoreMetadata, CanisterHistory, CyclesUseCase}, @@ -24,7 +26,7 @@ use ic_sys::{fs::sync_path, mmap::ScopedMmap}; use ic_types::{ batch::TotalQueryStats, nominal_cycles::NominalCycles, AccumulatedPriority, CanisterId, ComputeAllocation, Cycles, ExecutionRound, Height, MemoryAllocation, NumInstructions, - PrincipalId, + PrincipalId, Time, }; use ic_utils::thread::parallel_map; use ic_wasm_types::{CanisterModule, WasmHash}; @@ -164,6 +166,26 @@ pub struct CanisterStateBits { pub log_visibility: LogVisibility, } +/// This struct contains bits of the `CanisterSnapshot` that are not already +/// covered somewhere else and are too small to be serialized separately. +#[derive(Debug, PartialEq, Eq)] +pub struct CanisterSnapshotBits { + /// The ID of the canister snapshot. + pub snapshot_id: SnapshotId, + /// Identifies the canister to which this snapshot belongs. + pub canister_id: CanisterId, + /// The timestamp indicating the moment the snapshot was captured. + pub taken_at_timestamp: Time, + /// The canister version at the time of taking the snapshot. + pub canister_version: u64, + /// The hash of the canister wasm. + pub binary_hash: Option, + /// The certified data blob belonging to the canister. + pub certified_data: Vec, + /// The metadata required for a wasm chunk store. + pub wasm_chunk_store_metadata: WasmChunkStoreMetadata, +} + #[derive(Clone)] struct StateLayoutMetrics { state_layout_error_count: IntCounterVec, @@ -1956,6 +1978,56 @@ impl TryFrom for ExecutionStateBits } } +impl From<&CanisterSnapshotBits> for pb_canister_snapshot_bits::CanisterSnapshotBits { + fn from(item: &CanisterSnapshotBits) -> Self { + Self { + snapshot_id: item.snapshot_id.get(), + canister_id: Some((item.canister_id).into()), + taken_at_timestamp: item.taken_at_timestamp.as_nanos_since_unix_epoch(), + canister_version: item.canister_version, + binary_hash: item.binary_hash.as_ref().map(|h| h.to_vec()), + certified_data: item.certified_data.clone(), + wasm_chunk_store_metadata: Some((&item.wasm_chunk_store_metadata).into()), + } + } +} + +impl TryFrom for CanisterSnapshotBits { + type Error = ProxyDecodeError; + fn try_from( + item: pb_canister_snapshot_bits::CanisterSnapshotBits, + ) -> Result { + let canister_id: CanisterId = + try_from_option_field(item.canister_id, "CanisterSnapshotBits::canister_id")?; + + let binary_hash = match item.binary_hash { + Some(hash) => { + let hash: [u8; 32] = + hash.try_into() + .map_err(|e| ProxyDecodeError::ValueOutOfRange { + typ: "BinaryHash", + err: format!("Expected a 32-byte long module hash, got {:?}", e), + })?; + Some(hash.into()) + } + None => None, + }; + Ok(Self { + snapshot_id: SnapshotId::new(item.snapshot_id), + canister_id, + taken_at_timestamp: Time::from_nanos_since_unix_epoch(item.taken_at_timestamp), + canister_version: item.canister_version, + binary_hash, + certified_data: item.certified_data, + wasm_chunk_store_metadata: try_from_option_field( + item.wasm_chunk_store_metadata, + "CanisterSnapshotBits::wasm_chunk_store_metadata", + ) + .unwrap_or_default(), + }) + } +} + fn dir_file_names(p: &Path) -> std::io::Result> { if !p.exists() { return Ok(vec![]); diff --git a/rs/state_layout/src/state_layout/tests.rs b/rs/state_layout/src/state_layout/tests.rs index 5f84d531763..8e56c4db851 100644 --- a/rs/state_layout/src/state_layout/tests.rs +++ b/rs/state_layout/src/state_layout/tests.rs @@ -193,6 +193,24 @@ fn test_encode_decode_non_empty_history() { assert_eq!(canister_state_bits.canister_history, canister_history); } +#[test] +fn test_canister_snapshots_decode() { + let canister_snapshot_bits = CanisterSnapshotBits { + snapshot_id: SnapshotId::new(5), + canister_id: canister_test_id(7), + taken_at_timestamp: mock_time(), + canister_version: 3, + binary_hash: Some(WasmHash::from(&CanisterModule::new(vec![2, 3, 4]))), + certified_data: vec![3, 4, 7], + wasm_chunk_store_metadata: WasmChunkStoreMetadata::default(), + }; + + let pb_bits = pb_canister_snapshot_bits::CanisterSnapshotBits::from(&canister_snapshot_bits); + let new_canister_snapshot_bits = CanisterSnapshotBits::try_from(pb_bits).unwrap(); + + assert_eq!(canister_snapshot_bits, new_canister_snapshot_bits); +} + #[test] fn test_encode_decode_task_queue() { let ingress = Arc::new(IngressBuilder::new().method_name("test_ingress").build());