Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rs/rust_canisters/statesync_test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use candid::CandidType;
use candid::{CandidType, Principal};
use serde::{Deserialize, Serialize};

#[derive(Copy, Clone, CandidType, Deserialize, Serialize)]
#[derive(Clone, Debug, CandidType, Deserialize, Serialize)]
pub enum CanisterCreationStatus {
#[serde(rename = "idle")]
Idle,
#[serde(rename = "in_progress")]
InProgress(u64),
#[serde(rename = "done")]
Done(u64),
Done(Vec<Principal>),
}
47 changes: 40 additions & 7 deletions rs/rust_canisters/statesync_test/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use candid::Nat;
use futures::{StreamExt, stream};
use ic_cdk::futures::spawn;
use ic_cdk::management_canister::{
ProvisionalCreateCanisterWithCyclesArgs, provisional_create_canister_with_cycles,
CanisterSettings, ProvisionalCreateCanisterWithCyclesArgs, UpdateSettingsArgs,
provisional_create_canister_with_cycles, update_settings,
};
use ic_cdk::stable::{
WASM_PAGE_SIZE_IN_BYTES as PAGE_SIZE, stable_grow, stable_size, stable_write,
Expand Down Expand Up @@ -88,12 +90,13 @@ async fn read_state(index: usize) -> Result<u8, String> {

fn set_canister_creation_status(n: u64) -> bool {
let mut canister_creation_status_guard = CANISTER_CREATION_STATUS.lock().unwrap();
match *canister_creation_status_guard {
match &*canister_creation_status_guard {
CanisterCreationStatus::Idle => {
*canister_creation_status_guard = CanisterCreationStatus::InProgress(n);
true
}
CanisterCreationStatus::InProgress(num_canisters) => {
let num_canisters = *num_canisters;
if n == num_canisters {
false
} else {
Expand All @@ -102,7 +105,8 @@ fn set_canister_creation_status(n: u64) -> bool {
);
}
}
CanisterCreationStatus::Done(num_canisters) => {
CanisterCreationStatus::Done(canister_ids) => {
let num_canisters = canister_ids.len() as u64;
if n == num_canisters {
false
} else {
Expand Down Expand Up @@ -133,24 +137,53 @@ async fn create_many_canisters(n: u64) {
};
provisional_create_canister_with_cycles(&create_args)
.await
.expect("Failed to create canister");
.expect("Failed to create canister")
.canister_id
};
futs.push(fut);
}

stream::iter(futs)
let canister_ids = stream::iter(futs)
.buffer_unordered(500) // limit concurrency to 500 (inter-canister queue capacity)
.collect::<Vec<_>>()
.await;

let mut canister_creation_status_guard = CANISTER_CREATION_STATUS.lock().unwrap();
*canister_creation_status_guard = CanisterCreationStatus::Done(n);
*canister_creation_status_guard = CanisterCreationStatus::Done(canister_ids);
});
}

#[update]
async fn update_many_canisters() {
    // Snapshot the ids of the created canisters; traps unless
    // `create_many_canisters` has already completed (status `Done`).
    let canister_ids = match &*CANISTER_CREATION_STATUS.lock().unwrap() {
        CanisterCreationStatus::Done(ids) => ids.clone(),
        _ => ic_cdk::trap("Canister creation is not done yet"),
    };

    // Detached background task: this endpoint replies immediately while the
    // task below keeps issuing `update_settings` calls indefinitely.
    #[allow(clippy::disallowed_methods)]
    spawn(async move {
        // `cycle()` turns the id list into an infinite stream, so every
        // created canister is updated over and over again. `enumerate`
        // supplies a monotonically increasing counter `i`, making each
        // `update_settings` call write a distinct freezing threshold and
        // therefore a genuine state change on the target canister.
        stream::iter(canister_ids.into_iter().cycle().enumerate().map(
            |(i, canister_id)| async move {
                update_settings(&UpdateSettingsArgs {
                    canister_id,
                    settings: CanisterSettings {
                        // 2_592_000 s = 30 days — presumably the default
                        // freezing threshold (TODO confirm); adding `i`
                        // makes every call a new value.
                        freezing_threshold: Some(Nat::from(2592000_u64 + i as u64)),
                        ..Default::default()
                    },
                })
                .await
                .expect("Failed to update settings");
            },
        ))
        .buffer_unordered(500) // limit concurrency to 500 (inter-canister queue capacity)
        .for_each(|()| async {})
        .await;
    });
}

/// Returns the current canister creation status.
///
/// The status is cloned out of the mutex because `CanisterCreationStatus`
/// now carries a `Vec<Principal>` in its `Done` variant and is no longer
/// `Copy`.
#[query]
fn canister_creation_status() -> CanisterCreationStatus {
    CANISTER_CREATION_STATUS.lock().unwrap().clone()
}

// Intentionally empty: the canister's endpoints are exported by the
// `#[update]`/`#[query]` attribute macros; `main` only satisfies the
// binary target.
fn main() {}
Expand Down
3 changes: 2 additions & 1 deletion rs/rust_canisters/statesync_test/statesync_test.did
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// Mirrors the Rust enum `CanisterCreationStatus`:
// `done` carries the principals of all created canisters.
type canister_creation_status = variant {
    idle;
    in_progress : nat64;
    done : vec principal;
};

service : {
change_state : (nat32) -> (variant { Ok : nat64; Err : text });
read_state : (nat64) -> (variant { Ok : nat8; Err : text }) query;
write_random_data : (nat64, nat64, nat64) -> (variant { Ok : null; Err : text });
create_many_canisters : (nat64) -> ();
update_many_canisters : () -> ();
canister_creation_status : () -> (canister_creation_status) query;
}
61 changes: 60 additions & 1 deletion rs/rust_canisters/statesync_test/test/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,19 @@ fn test_statesync_test_canisters() {
}

#[test]
fn test_create_many_canisters() {
fn test_create_and_update_many_canisters() {
let env = StateMachine::new();

let seed_canister_id = deploy_state_sync_test_canister(&env);

// Canister version of a canister created/controlled by the seed canister.
let canister_version = |canister_id| {
env.canister_status_query_as(seed_canister_id.into(), canister_id)
.unwrap()
.unwrap()
.version()
};

let canister_creation_status = || {
let result = env
.query(
Expand Down Expand Up @@ -160,6 +168,57 @@ fn test_create_many_canisters() {

// We created `num_canisters` in addition to the seed canister.
assert_eq!(env.num_running_canisters(), num_canisters + 1);

let created_canister_ids: Vec<CanisterId> = match canister_creation_status() {
CanisterCreationStatus::Done(ids) => ids
.into_iter()
.map(|canister_candid_principal| {
CanisterId::unchecked_from_principal(PrincipalId::from(canister_candid_principal))
})
.collect(),
s => panic!("Expected Done, got {s:?}"),
};
assert_eq!(created_canister_ids.len(), num_canisters as usize);

// Kick off canister state updates for the created canisters.
// The call returns immediately, but keeps cycling through
// the created canisters in the background and bumping their
// freezing threshold so that their canister state keeps changing
// and, consequently, their canister "version" keeps increasing.
let result = env
.execute_ingress(
seed_canister_id,
"update_many_canisters",
Encode!(&()).unwrap(),
)
.unwrap();
let _ = assert_reply(result);

// Capture the current canister version as the baseline version
// to check increase against.
let baseline_versions: Vec<(_, u64)> = created_canister_ids
.iter()
.copied()
.map(|canister_id| (canister_id, canister_version(canister_id)))
.collect();

// Execute rounds until the canister version of all created canisters increases
// by at least 2 w.r.t. the baseline version observed above.
// This property ensures that the created canisters are indeed being
// updated repeatedly in the background.
loop {
let all_versions_increased =
baseline_versions
.iter()
.copied()
.all(|(canister_id, baseline_version)| {
canister_version(canister_id) >= baseline_version + 2
});
if all_versions_increased {
break;
}
env.tick();
}
}

fn assert_reply(res: WasmResult) -> Vec<u8> {
Expand Down
19 changes: 19 additions & 0 deletions rs/state_machine_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4733,6 +4733,25 @@ impl StateMachine {
})
}

/// Queries the `canister_status` endpoint on the management canister,
/// impersonating `sender` as the caller.
///
/// Use this when `canister_id` is controlled by `sender` rather than by
/// the anonymous principal.
///
/// The outer `Result` reports transport/user errors; the inner one
/// distinguishes a reply (decoded status) from a reject message.
pub fn canister_status_query_as(
    &self,
    sender: PrincipalId,
    canister_id: CanisterId,
) -> Result<Result<CanisterStatusResultV2, String>, UserError> {
    let payload = CanisterIdRecord::from(canister_id).encode();
    let wasm_result = self.query_as(sender, CanisterId::ic_00(), "canister_status", payload)?;
    Ok(match wasm_result {
        WasmResult::Reply(reply) => Ok(Decode!(&reply, CanisterStatusResultV2).unwrap()),
        WasmResult::Reject(reject_msg) => Err(reject_msg),
    })
}

/// Deletes the canister with the specified ID.
pub fn delete_canister(&self, canister_id: CanisterId) -> Result<WasmResult, UserError> {
self.execute_ingress(
Expand Down
81 changes: 40 additions & 41 deletions rs/tests/message_routing/rejoin_test_lib/rejoin_test_lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use candid::{Decode, Encode, Principal};
use canister_test::{Canister, Runtime, Wasm};
use futures::future::join_all;
use ic_agent::Agent;
use ic_system_test_driver::driver::test_env::TestEnv;
use ic_system_test_driver::driver::test_env_api::get_dependency_path_from_env;
use ic_system_test_driver::driver::test_env_api::retry_async;
use ic_system_test_driver::driver::test_env_api::{HasPublicApiUrl, HasVm, IcNodeSnapshot};
use ic_system_test_driver::util::{MetricsFetcher, UniversalCanister, block_on, runtime_from_url};
use ic_types::PrincipalId;
use ic_universal_canister::wasm;
use ic_utils::interfaces::management_canister::ManagementCanister;
use slog::Logger;
use slog::info;
use statesync_test::CanisterCreationStatus;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -303,24 +300,6 @@ async fn deploy_seed_canister(
seed_canister_id
}

/// Installs a universal canister and programs its heartbeat to execute at
/// least 1.8B instructions per round, keeping a scheduler thread busy.
async fn deploy_busy_canister(agent: &Agent, effective_canister_id: PrincipalId, logger: &Logger) {
    let canister = UniversalCanister::new_with_retries(agent, effective_canister_id, logger).await;
    // Heartbeat payload: burn at least 1.8B instructions every round.
    let heartbeat_program = wasm()
        .instruction_counter_is_at_least(1_800_000_000)
        .build();
    // Install the heartbeat via a one-shot update call.
    let setup_payload = wasm().set_heartbeat(heartbeat_program).reply().build();
    canister
        .update(setup_payload)
        .await
        .expect("Failed to set up a busy canister.");
}

async fn deploy_canisters_for_long_rounds(
logger: &slog::Logger,
nodes: Vec<IcNodeSnapshot>,
Expand Down Expand Up @@ -354,7 +333,7 @@ async fn deploy_canisters_for_long_rounds(
num_canisters_per_seed_canister * num_seed_canisters,
);
let mut create_many_canisters_futs = vec![];
for seed_canister_id in seed_canisters {
for seed_canister_id in seed_canisters.iter() {
let seed_canister_id_str = seed_canister_id.to_string();
info!(
logger,
Expand All @@ -366,7 +345,7 @@ async fn deploy_canisters_for_long_rounds(
let bytes = Encode!(&num_canisters_per_seed_canister)
.expect("Failed to candid encode argument for a seed canister");
let res = agent
.update(&seed_canister_id, "create_many_canisters")
.update(seed_canister_id, "create_many_canisters")
.with_arg(bytes)
.call_and_wait()
.await;
Expand All @@ -388,7 +367,7 @@ async fn deploy_canisters_for_long_rounds(
loop {
let bytes = Encode!(&()).expect("Failed to candid encode unit type");
let res = agent
.query(&seed_canister_id, "canister_creation_status")
.query(seed_canister_id, "canister_creation_status")
.with_arg(bytes)
.call()
.await;
Expand All @@ -409,7 +388,12 @@ async fn deploy_canisters_for_long_rounds(
"Canister creation on seed canister {seed_canister_id_str:?} is in progress ({n}). Retrying canister_creation_status query ...",
);
}
CanisterCreationStatus::Done(_) => {
CanisterCreationStatus::Done(canister_ids) => {
info!(
logger,
"Canister creation on seed canister {seed_canister_id_str:?} is done ({} canisters created).",
canister_ids.len(),
);
break;
}
}
Expand All @@ -428,26 +412,41 @@ async fn deploy_canisters_for_long_rounds(
}
join_all(create_many_canisters_futs).await;

// We deploy 8 "busy" canisters: this way,
// there are 2 canisters per each of the 4 scheduler threads
// and thus every thread executes 2 x 1.8B = 3.6B instructions.
let num_busy_canisters = 8;
info!(
logger,
"Deploying {} busy canisters on a node {} ({}) ...",
num_busy_canisters,
init_node.node_id,
init_node.get_public_url()
"Calling update_many_canisters on all seed canisters ..."
);
let mut create_busy_canisters_futs = vec![];
for _ in 0..num_busy_canisters {
create_busy_canisters_futs.push(deploy_busy_canister(
&agent,
init_node.effective_canister_id(),
logger,
));
let mut update_many_canisters_futs = vec![];
for seed_canister_id in seed_canisters.iter() {
let seed_canister_id_str = seed_canister_id.to_string();
let agent = agent.clone();
let fut = async move {
loop {
let bytes = Encode!(&()).expect("Failed to candid encode unit type");
let res = agent
.update(seed_canister_id, "update_many_canisters")
.with_arg(bytes)
.call_and_wait()
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(
logger,
"Calling update_many_canisters on seed canister {seed_canister_id_str:?} failed because {err:?}. Retrying ...",
);
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
info!(
logger,
"Successfully called update_many_canisters on seed canister {seed_canister_id_str:?}.",
);
};
update_many_canisters_futs.push(fut);
}
join_all(create_busy_canisters_futs).await;
join_all(update_many_canisters_futs).await;
}

fn no_state_clone_count(node: IcNodeSnapshot, logger: &slog::Logger) -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion rs/tests/message_routing/rejoin_test_long_rounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Runbook::
. setup the testnet of 3f + 1 nodes with f = 4 (like on mainnet)
. pick a random node and install 4 "seed" canisters through it (the state sync test canister is used as "seed")
. create 100,000 canisters via the "seed" canisters (in parallel)
. deploy 8 "busy" canisters (universal canister with heartbeats executing 1.8B instructions)
. make the "seed" canisters cycle through those 100,000 canisters (in parallel) and keep changing their canister state
. pick the slowest node required for consensus in terms of batch processing time and kill that node
. wait for the subnet producing a CUP
. start the killed node
Expand Down
Loading