Skip to content

Commit

Permalink
feat: EXC-1526: Snapshot related types
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraZapuc committed Feb 9, 2024
1 parent f297ddd commit 7f96d2c
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 1 deletion.
@@ -0,0 +1,15 @@
syntax = "proto3";
package state.canister_snapshot_bits.v1;

import "state/canister_state_bits/v1/canister_state_bits.proto";
import "types/v1/types.proto";

message CanisterSnapshotBits {
uint64 snapshot_id = 1;
types.v1.CanisterId canister_id = 2;
uint64 taken_at_timestamp = 3;
uint64 canister_version = 4;
bytes certified_data = 5;
optional bytes binary_hash = 6;
canister_state_bits.v1.WasmChunkStoreMetadata wasm_chunk_store_metadata = 7;
}
1 change: 1 addition & 0 deletions rs/protobuf/generator/src/lib.rs
Expand Up @@ -347,6 +347,7 @@ fn build_state_proto(def: &Path, out: &Path) {
def.join("state/ingress/v1/ingress.proto"),
def.join("state/metadata/v1/metadata.proto"),
def.join("state/canister_state_bits/v1/canister_state_bits.proto"),
def.join("state/canister_snapshot_bits/v1/canister_snapshot_bits.proto"),
def.join("state/queues/v1/queues.proto"),
def.join("state/sync/v1/manifest.proto"),
def.join("state/stats/v1/stats.proto"),
Expand Down
19 changes: 19 additions & 0 deletions rs/protobuf/src/gen/state/state.canister_snapshot_bits.v1.rs
@@ -0,0 +1,19 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CanisterSnapshotBits {
#[prost(uint64, tag = "1")]
pub snapshot_id: u64,
#[prost(message, optional, tag = "2")]
pub canister_id: ::core::option::Option<super::super::super::types::v1::CanisterId>,
#[prost(uint64, tag = "3")]
pub taken_at_timestamp: u64,
#[prost(uint64, tag = "4")]
pub canister_version: u64,
#[prost(bytes = "vec", tag = "5")]
pub certified_data: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", optional, tag = "6")]
pub binary_hash: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
#[prost(message, optional, tag = "7")]
pub wasm_chunk_store_metadata:
::core::option::Option<super::super::canister_state_bits::v1::WasmChunkStoreMetadata>,
}
3 changes: 3 additions & 0 deletions rs/protobuf/src/state/canister_snapshot_bits/mod.rs
@@ -0,0 +1,3 @@
#[allow(clippy::all)]
#[path = "../../gen/state/state.canister_snapshot_bits.v1.rs"]
pub mod v1;
1 change: 1 addition & 0 deletions rs/protobuf/src/state/mod.rs
@@ -1,3 +1,4 @@
pub mod canister_snapshot_bits;
pub mod canister_state_bits;
pub mod ingress;
pub mod queues;
Expand Down
205 changes: 205 additions & 0 deletions rs/replicated_state/src/canister_snapshots.rs
@@ -0,0 +1,205 @@
use ic_types::{CanisterId, Time};
use ic_wasm_types::CanisterModule;

use crate::{canister_state::system_state::wasm_chunk_store::WasmChunkStore, PageMap};

use phantom_newtype::Id;
use std::{collections::BTreeMap, sync::Arc};

pub struct SnapshotIdTag;
pub type SnapshotId = Id<SnapshotIdTag, u64>;

/// A collection of canister snapshots and their IDs.
///
/// Additionally, keeps track of all the accumulated changes
/// since the last flush to the disk.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CanisterSnapshots {
next_snapshot_id: SnapshotId,
pub(crate) snapshots: BTreeMap<SnapshotId, Arc<CanisterSnapshot>>,
pub(crate) unflushed_changes: Vec<SnapshotOperation>,
}

impl Default for CanisterSnapshots {
fn default() -> Self {
Self {
next_snapshot_id: SnapshotId::new(0),
snapshots: BTreeMap::new(),
unflushed_changes: vec![],
}
}
}

impl CanisterSnapshots {
pub fn new(
next_snapshot_id: SnapshotId,
snapshots: BTreeMap<SnapshotId, Arc<CanisterSnapshot>>,
) -> Self {
Self {
next_snapshot_id,
snapshots,
unflushed_changes: vec![],
}
}

/// Adds new snapshot in the collection and assigns a `SnapshotId`.
///
/// Additionally, adds a new item to the `unflushed_changes`
/// which represents the new backup accumulated since the last flush to the disk.
pub fn push(&mut self, snapshot: Arc<CanisterSnapshot>) -> SnapshotId {
let snapshot_id = self.next_snapshot_id;
self.next_snapshot_id = SnapshotId::new(self.next_snapshot_id.get() + 1);
self.unflushed_changes.push(SnapshotOperation::Backup(
*snapshot.canister_id(),
snapshot_id,
));
self.snapshots.insert(snapshot_id, snapshot);
snapshot_id
}

/// Remove snapshot identified by `snapshot_id` from the collection of snapshots.
///
/// Additionally, adds a new item to the `unflushed_changes`
/// which represents the deleted backup since the last flush to the disk.
pub fn remove(&mut self, snapshot_id: SnapshotId) -> Option<Arc<CanisterSnapshot>> {
let removed_snapshot = self.snapshots.remove(&snapshot_id);
match removed_snapshot {
Some(snapshot) => {
self.unflushed_changes
.push(SnapshotOperation::Delete(snapshot_id));
Some(snapshot)
}
None => {
// No snapshot found based on the snapshot ID provided.
None
}
}
}

/// Take the unflushed changes.
pub fn take_unflushed_changes(&mut self) -> Vec<SnapshotOperation> {
std::mem::take(&mut self.unflushed_changes)
}
}

/// Contains all information related to a canister snapshot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CanisterSnapshot {
/// Identifies the canister to which this snapshot belongs.
canister_id: CanisterId,
/// The timestamp indicating the moment the snapshot was captured.
taken_at_timestamp: Time,
/// The canister version at the time of taking the snapshot.
canister_version: u64,
/// The certified data blob belonging to the canister.
certified_data: Vec<u8>,
/// Snapshot of chunked store.
chunk_store: WasmChunkStore,
/// The raw canister module.
/// May not exist depending on whether or not the canister has
/// an actual wasm module.
wasm_binary: Option<CanisterModule>,
/// Snapshot of stable memory.
stable_memory: Option<PageMap>,
/// Snapshot of wasm memory.
wasm_memory: Option<PageMap>,
}

impl CanisterSnapshot {
pub fn new(
canister_id: CanisterId,
taken_at_timestamp: Time,
canister_version: u64,
certified_data: Vec<u8>,
stable_memory: Option<PageMap>,
wasm_memory: Option<PageMap>,
chunk_store: WasmChunkStore,
wasm_binary: Option<CanisterModule>,
) -> CanisterSnapshot {
Self {
canister_id,
taken_at_timestamp,
canister_version,
certified_data,
stable_memory,
wasm_memory,
chunk_store,
wasm_binary,
}
}

pub fn canister_id(&self) -> &CanisterId {
&self.canister_id
}

pub fn canister_version(&self) -> u64 {
self.canister_version
}

pub fn taken_at_timestamp(&self) -> &Time {
&self.taken_at_timestamp
}

pub fn stable_memory(&self) -> &Option<PageMap> {
&self.stable_memory
}

pub fn wasm_memory(&self) -> &Option<PageMap> {
&self.wasm_memory
}

pub fn chunk_store(&self) -> &WasmChunkStore {
&self.chunk_store
}
}

/// Describes the types of unflushed changes that can be stored by the `SnapshotManager`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SnapshotOperation {
Delete(SnapshotId),
Backup(CanisterId, SnapshotId),
Restore(CanisterId, SnapshotId),
}

#[cfg(test)]
mod tests {
use super::*;
use super::{CanisterSnapshot, CanisterSnapshots, PageMap};
use ic_test_utilities::types::ids::canister_test_id;
use ic_test_utilities_time::mock_time;
use ic_types::NumBytes;
#[test]
fn test_push_and_remove_snapshot() {
let snapshot = CanisterSnapshot::new(
canister_test_id(0),
mock_time(),
0,
vec![],
Some(PageMap::new_for_testing()),
Some(PageMap::new_for_testing()),
WasmChunkStore::new_for_testing(NumBytes::from(20)),
Some(CanisterModule::new(vec![1, 2, 3])),
);
let mut snapshot_manager = CanisterSnapshots::default();
assert_eq!(snapshot_manager.snapshots.len(), 0);
assert_eq!(snapshot_manager.unflushed_changes.len(), 0);

// Pushing new snapshot updates the `unflushed_changes` collection.
let snapshot_id = snapshot_manager.push(Arc::<CanisterSnapshot>::new(snapshot));
assert_eq!(snapshot_manager.snapshots.len(), 1);
assert_eq!(snapshot_manager.unflushed_changes.len(), 1);

let unflushed_changes = snapshot_manager.take_unflushed_changes();
assert_eq!(snapshot_manager.snapshots.len(), 1);
assert_eq!(snapshot_manager.unflushed_changes.len(), 0);
assert_eq!(unflushed_changes.len(), 1);

// Deleting snapshot updates the `unflushed_changes` collection.
snapshot_manager.remove(snapshot_id);
assert_eq!(snapshot_manager.snapshots.len(), 0);
assert_eq!(snapshot_manager.unflushed_changes.len(), 1);
let unflushed_changes = snapshot_manager.take_unflushed_changes();
assert_eq!(snapshot_manager.unflushed_changes.len(), 0);
assert_eq!(unflushed_changes.len(), 1);
}
}
1 change: 1 addition & 0 deletions rs/replicated_state/src/lib.rs
Expand Up @@ -29,6 +29,7 @@
//! as it could change the past.
//!
mod bitcoin;
pub mod canister_snapshots;
pub mod canister_state;
pub(crate) mod hash;
pub mod metadata_state;
Expand Down
74 changes: 73 additions & 1 deletion rs/state_layout/src/state_layout.rs
Expand Up @@ -9,11 +9,13 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
use ic_protobuf::{
proxy::{try_from_option_field, ProxyDecodeError},
state::{
canister_snapshot_bits::v1 as pb_canister_snapshot_bits,
canister_state_bits::v1 as pb_canister_state_bits, ingress::v1 as pb_ingress,
queues::v1 as pb_queues, stats::v1 as pb_stats, system_metadata::v1 as pb_metadata,
},
};
use ic_replicated_state::{
canister_snapshots::SnapshotId,
canister_state::{
execution_state::{NextScheduledMethod, WasmMetadata},
system_state::{wasm_chunk_store::WasmChunkStoreMetadata, CanisterHistory, CyclesUseCase},
Expand All @@ -24,7 +26,7 @@ use ic_sys::{fs::sync_path, mmap::ScopedMmap};
use ic_types::{
batch::TotalQueryStats, nominal_cycles::NominalCycles, AccumulatedPriority, CanisterId,
ComputeAllocation, Cycles, ExecutionRound, Height, MemoryAllocation, NumInstructions,
PrincipalId,
PrincipalId, Time,
};
use ic_utils::thread::parallel_map;
use ic_wasm_types::{CanisterModule, WasmHash};
Expand Down Expand Up @@ -164,6 +166,26 @@ pub struct CanisterStateBits {
pub log_visibility: LogVisibility,
}

/// This struct contains bits of the `CanisterSnapshot` that are not already
/// covered somewhere else and are too small to be serialized separately.
#[derive(Debug, PartialEq, Eq)]
pub struct CanisterSnapshotBits {
/// The ID of the canister snapshot.
pub snapshot_id: SnapshotId,
/// Identifies the canister to which this snapshot belongs.
pub canister_id: CanisterId,
/// The timestamp indicating the moment the snapshot was captured.
pub taken_at_timestamp: Time,
/// The canister version at the time of taking the snapshot.
pub canister_version: u64,
/// The hash of the canister wasm.
pub binary_hash: Option<WasmHash>,
/// The certified data blob belonging to the canister.
pub certified_data: Vec<u8>,

This comment has been minimized.

Copy link
@AleDema

AleDema Feb 25, 2024

Why is certified data included both here and in CanisterSnapshot?

/// The metadata required for a wasm chunk store.
pub wasm_chunk_store_metadata: WasmChunkStoreMetadata,
}

#[derive(Clone)]
struct StateLayoutMetrics {
state_layout_error_count: IntCounterVec,
Expand Down Expand Up @@ -1956,6 +1978,56 @@ impl TryFrom<pb_canister_state_bits::ExecutionStateBits> for ExecutionStateBits
}
}

impl From<&CanisterSnapshotBits> for pb_canister_snapshot_bits::CanisterSnapshotBits {
fn from(item: &CanisterSnapshotBits) -> Self {
Self {
snapshot_id: item.snapshot_id.get(),
canister_id: Some((item.canister_id).into()),
taken_at_timestamp: item.taken_at_timestamp.as_nanos_since_unix_epoch(),
canister_version: item.canister_version,
binary_hash: item.binary_hash.as_ref().map(|h| h.to_vec()),
certified_data: item.certified_data.clone(),
wasm_chunk_store_metadata: Some((&item.wasm_chunk_store_metadata).into()),
}
}
}

impl TryFrom<pb_canister_snapshot_bits::CanisterSnapshotBits> for CanisterSnapshotBits {
type Error = ProxyDecodeError;
fn try_from(
item: pb_canister_snapshot_bits::CanisterSnapshotBits,
) -> Result<Self, Self::Error> {
let canister_id: CanisterId =
try_from_option_field(item.canister_id, "CanisterSnapshotBits::canister_id")?;

let binary_hash = match item.binary_hash {
Some(hash) => {
let hash: [u8; 32] =
hash.try_into()
.map_err(|e| ProxyDecodeError::ValueOutOfRange {
typ: "BinaryHash",
err: format!("Expected a 32-byte long module hash, got {:?}", e),
})?;
Some(hash.into())
}
None => None,
};
Ok(Self {
snapshot_id: SnapshotId::new(item.snapshot_id),
canister_id,
taken_at_timestamp: Time::from_nanos_since_unix_epoch(item.taken_at_timestamp),
canister_version: item.canister_version,
binary_hash,
certified_data: item.certified_data,
wasm_chunk_store_metadata: try_from_option_field(
item.wasm_chunk_store_metadata,
"CanisterSnapshotBits::wasm_chunk_store_metadata",
)
.unwrap_or_default(),
})
}
}

fn dir_file_names(p: &Path) -> std::io::Result<Vec<String>> {
if !p.exists() {
return Ok(vec![]);
Expand Down
18 changes: 18 additions & 0 deletions rs/state_layout/src/state_layout/tests.rs
Expand Up @@ -193,6 +193,24 @@ fn test_encode_decode_non_empty_history() {
assert_eq!(canister_state_bits.canister_history, canister_history);
}

#[test]
fn test_canister_snapshots_decode() {
let canister_snapshot_bits = CanisterSnapshotBits {
snapshot_id: SnapshotId::new(5),
canister_id: canister_test_id(7),
taken_at_timestamp: mock_time(),
canister_version: 3,
binary_hash: Some(WasmHash::from(&CanisterModule::new(vec![2, 3, 4]))),
certified_data: vec![3, 4, 7],
wasm_chunk_store_metadata: WasmChunkStoreMetadata::default(),
};

let pb_bits = pb_canister_snapshot_bits::CanisterSnapshotBits::from(&canister_snapshot_bits);
let new_canister_snapshot_bits = CanisterSnapshotBits::try_from(pb_bits).unwrap();

assert_eq!(canister_snapshot_bits, new_canister_snapshot_bits);
}

#[test]
fn test_encode_decode_task_queue() {
let ingress = Arc::new(IngressBuilder::new().method_name("test_ingress").build());
Expand Down

0 comments on commit 7f96d2c

Please sign in to comment.