Skip to content

Commit

Permalink
feat: EXC-1529: Implement take canister snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraZapuc committed Mar 12, 2024
1 parent d9e4d6e commit 1d9661c
Show file tree
Hide file tree
Showing 8 changed files with 875 additions and 53 deletions.
193 changes: 192 additions & 1 deletion rs/execution_environment/src/canister_manager.rs
Expand Up @@ -22,12 +22,13 @@ use ic_logger::{error, fatal, info, ReplicaLogger};
use ic_management_canister_types::{
CanisterChangeDetails, CanisterChangeOrigin, CanisterInstallModeV2, CanisterStatusResultV2,
CanisterStatusType, ChunkHash, InstallChunkedCodeArgs, InstallCodeArgsV2, Method as Ic00Method,
StoredChunksReply, UploadChunkReply,
StoredChunksReply, TakeCanisterSnapshotResponse, UploadChunkReply,
};
use ic_registry_provisional_whitelist::ProvisionalWhitelist;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::canister_state::system_state::ReservationError;
use ic_replicated_state::{
canister_snapshots::{CanisterSnapshot, SnapshotId},
canister_state::system_state::{
wasm_chunk_store::{self, WasmChunkStore},
CyclesUseCase,
Expand Down Expand Up @@ -1784,6 +1785,161 @@ impl CanisterManager {
.collect();
Ok(StoredChunksReply(keys))
}

/// Creates a new canister snapshot.
///
/// A canister snapshot can only be initiated by the controllers.
/// In addition, if the `replace_snapshot` parameter is `Some`,
/// the system will attempt to identify the snapshot based on the provided ID,
/// and delete it before creating a new one.
/// Failure to do so will result in the creation of a new snapshot being unsuccessful.
///
/// If the new snapshot cannot be created, an appropiate error will be returned.

This comment has been minimized.

Copy link
@ilbertt

ilbertt Mar 23, 2024

Typo: appropiate -> appropriate

pub(crate) fn take_canister_snapshot(
&self,
subnet_size: usize,
sender: PrincipalId,
canister: &mut CanisterState,
replace_snapshot: Option<SnapshotId>,
state: &mut ReplicatedState,
round_limits: &mut RoundLimits,
resource_saturation: &ResourceSaturation,
) -> Result<TakeCanisterSnapshotResponse, CanisterManagerError> {
// Check sender is a controller.
validate_controller(canister, &sender)?;

// Check that replace snapshot ID exists if provided.
if let Some(replace_snapshot) = replace_snapshot {
match state.canister_snapshots.get(replace_snapshot) {
None => {
// If not found, the operation fails due to invalid parameters.
return Err(CanisterManagerError::CanisterSnapshotNotFound {
canister_id: canister.canister_id(),
snapshot_id: replace_snapshot,
});
}
Some(snapshot) => {
// Verify the provided replace snapshot belongs to this canister.
if snapshot.canister_id() != canister.canister_id() {
return Err(CanisterManagerError::CanisterSnapshotInvalidOwnership {
canister_id: canister.canister_id(),
snapshot_id: replace_snapshot,
});
}
}
}
}

if self.config.rate_limiting_of_heap_delta == FlagStatus::Enabled
&& canister.scheduler_state.heap_delta_debit >= self.config.heap_delta_rate_limit
{
return Err(CanisterManagerError::CanisterHeapDeltaRateLimited {
canister_id: canister.canister_id(),
value: canister.scheduler_state.heap_delta_debit,
limit: self.config.heap_delta_rate_limit,
});
}

let new_snapshot_size = canister.snapshot_memory_usage();

{
// Run the following checks on memory usage and return an error
// if any fails:
// 1. Check new usage will not freeze canister
// 2. Check subnet has available memory
// 3. Reserve cycles on canister
// 4. Actually deduct memory from subnet (asserting it won't fail)

// Calculate if any cycles will need to be reserved.
let reservation_cycles = self.cycles_account_manager.storage_reservation_cycles(
new_snapshot_size,
resource_saturation,
subnet_size,
);

// Memory usage will increase by the snapshot size.
// Check that it doesn't bump the canister over the freezing threshold.
let threshold = self.cycles_account_manager.freeze_threshold_cycles(
canister.system_state.freeze_threshold,
canister.memory_allocation(),
canister.memory_usage() + new_snapshot_size,
canister.message_memory_usage(),
canister.compute_allocation(),
subnet_size,
canister.system_state.reserved_balance(),
);

if canister.system_state.balance() < threshold + reservation_cycles {
return Err(CanisterManagerError::InsufficientCyclesInMemoryGrow {
bytes: new_snapshot_size,
available: canister.system_state.balance(),
threshold,
});
}
// Verify that the subnet has enough memory.
round_limits
.subnet_available_memory
.check_available_memory(new_snapshot_size, NumBytes::from(0), NumBytes::from(0))
.map_err(
|_| CanisterManagerError::SubnetMemoryCapacityOverSubscribed {
requested: new_snapshot_size,
available: NumBytes::from(
round_limits
.subnet_available_memory
.get_execution_memory()
.max(0) as u64,
),
},
)?;
// Reserve needed cycles if the subnet is becoming saturated.
canister
.system_state
.reserve_cycles(reservation_cycles)
.map_err(|err| match err {
ReservationError::InsufficientCycles {
requested,
available,
} => CanisterManagerError::InsufficientCyclesInMemoryGrow {
bytes: new_snapshot_size,
available,
threshold: requested,
},
ReservationError::ReservedLimitExceed { requested, limit } => {
CanisterManagerError::ReservedCyclesLimitExceededInMemoryGrow {
bytes: new_snapshot_size,
requested,
limit,
}
}
})?;
// Actually deduct memory from the subnet. It's safe to unwrap
// here because we already checked the available memory above.
round_limits.subnet_available_memory
.try_decrement(new_snapshot_size, NumBytes::from(0), NumBytes::from(0))
.expect("Error: Cannot fail to decrement SubnetAvailableMemory after checking for availability");
}

// Create new snapshot.
let new_snapshot = CanisterSnapshot::from(canister, state.time());

// Delete old snapshot identified by `replace_snapshot` ID.
if let Some(replace_snapshot) = replace_snapshot {
// Already confirmed that `replace_snapshot` exists.
let is_removed = state.canister_snapshots.remove(replace_snapshot);
debug_assert!(is_removed.is_some());
}

if self.config.rate_limiting_of_heap_delta == FlagStatus::Enabled {
canister.scheduler_state.heap_delta_debit += NumBytes::from(new_snapshot_size);
}

let snapshot_id = state.canister_snapshots.push(Arc::new(new_snapshot));
Ok(TakeCanisterSnapshotResponse::new(
snapshot_id.get(),
state.time().as_nanos_since_unix_epoch(),
new_snapshot_size,
))
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -1865,6 +2021,19 @@ pub(crate) enum CanisterManagerError {
WasmChunkStoreError {
message: String,
},
CanisterSnapshotNotFound {
canister_id: CanisterId,
snapshot_id: SnapshotId,
},
CanisterHeapDeltaRateLimited {
canister_id: CanisterId,
value: NumBytes,
limit: NumBytes,
},
CanisterSnapshotInvalidOwnership {
canister_id: CanisterId,
snapshot_id: SnapshotId,
},
}

impl From<CanisterManagerError> for UserError {
Expand Down Expand Up @@ -2101,6 +2270,28 @@ impl From<CanisterManagerError> for UserError {
)
)
}
CanisterSnapshotNotFound { canister_id, snapshot_id } => {
Self::new(
ErrorCode::CanisterSnapshotNotFound,
format!(
"Could not find the snapshot ID {} for canister {}", snapshot_id, canister_id,
)
)
}
CanisterHeapDeltaRateLimited { canister_id, value, limit } => {
Self::new(
ErrorCode::CanisterHeapDeltaRateLimited,
format!("Canister {} is heap delta rate limited: current delta debit {}, limit {}", canister_id, value, limit)
)
}
CanisterSnapshotInvalidOwnership { canister_id, snapshot_id } => {
Self::new(
ErrorCode::CanisterRejectedMessage,
format!(
" The snapshot {} does not belong to canister {}", snapshot_id, canister_id,
)
)
}
}
}
}
Expand Down
65 changes: 55 additions & 10 deletions rs/execution_environment/src/execution_environment.rs
Expand Up @@ -47,6 +47,7 @@ use ic_metrics::MetricsRegistry;
use ic_registry_provisional_whitelist::ProvisionalWhitelist;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::{
canister_snapshots::SnapshotId,
canister_state::system_state::PausedExecutionId,
canister_state::{system_state::CyclesUseCase, NextExecution},
metadata_state::subnet_call_context_manager::{
Expand Down Expand Up @@ -1245,16 +1246,15 @@ impl ExecutionEnvironment {

Ok(Ic00Method::TakeCanisterSnapshot) => match self.config.canister_snapshots {
FlagStatus::Enabled => {
let res = match TakeCanisterSnapshotArgs::decode(payload) {
Err(err) => Err(err),
Ok(_) => {
// TODO(EXC-1529): Implement take_canister_snapshot.
Err(UserError::new(
ErrorCode::CanisterRejectedMessage,
"Canister snapshotting API is not yet implemented.",
))
}
};
let res = TakeCanisterSnapshotArgs::decode(payload).and_then(|args| {
self.take_canister_snapshot(
*msg.sender(),
&mut state,
args,
registry_settings.subnet_size,
round_limits,
)
});
ExecuteSubnetMessageResult::Finished {
response: res,
refund: msg.take_cycles(),
Expand Down Expand Up @@ -1884,6 +1884,51 @@ impl ExecutionEnvironment {
.map_err(|err| err.into())
}

/// Creates a new canister snapshot and inserts it into `ReplicatedState`.
fn take_canister_snapshot(
&self,
sender: PrincipalId,
state: &mut ReplicatedState,
args: TakeCanisterSnapshotArgs,
subnet_size: usize,
round_limits: &mut RoundLimits,
) -> Result<Vec<u8>, UserError> {
let canister_id = args.get_canister_id();
// Take canister out.
let mut canister = match state.take_canister_state(&canister_id) {
None => {
return Err(UserError::new(
ErrorCode::CanisterNotFound,
format!("Canister {} not found.", &canister_id),
))
}
Some(canister) => canister,
};

let resource_saturation =
self.subnet_memory_saturation(&round_limits.subnet_available_memory);
let replace_snapshot = args.replace_snapshot().map(SnapshotId::new);
let result = self
.canister_manager
.take_canister_snapshot(
subnet_size,
sender,
&mut canister,
replace_snapshot,
state,
round_limits,
&resource_saturation,
)
.map(|response| {
state.metadata.heap_delta_estimate += NumBytes::from(response.total_size());
response.encode()
})
.map_err(|err| err.into());
// Put canister back.
state.put_canister_state(canister);
result
}

fn node_metrics_history(
&self,
state: &ReplicatedState,
Expand Down

0 comments on commit 1d9661c

Please sign in to comment.