Skip to content

Commit

Permalink
refactor: Optimize and simplify heartbeat and global timer scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
alin-at-dfinity committed Feb 6, 2024
1 parent 293b694 commit 0e23b0a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 85 deletions.
123 changes: 55 additions & 68 deletions rs/execution_environment/src/scheduler.rs
Expand Up @@ -593,8 +593,15 @@ impl SchedulerImpl {
}
}

let global_timer_has_reached_deadline =
canister.system_state.global_timer.has_reached_deadline(now);
let may_schedule_heartbeat = canister.exports_heartbeat_method();
let may_schedule_global_timer = canister.exports_global_timer_method()
&& canister.system_state.global_timer.has_reached_deadline(now);

if !may_schedule_heartbeat && !may_schedule_global_timer {
// Canister has no heartbeat and no (schedulable) global timer.
continue;
}

match canister.next_execution() {
NextExecution::ContinueLong | NextExecution::ContinueInstallCode => {
// Do not add a heartbeat task if a long execution
Expand All @@ -605,7 +612,8 @@ impl SchedulerImpl {
let method_chosen = is_next_method_chosen(
canister,
&mut heartbeat_and_timer_canister_ids,
global_timer_has_reached_deadline,
may_schedule_heartbeat,
may_schedule_global_timer,
);

canister.inc_next_scheduled_method();
Expand Down Expand Up @@ -2342,84 +2350,63 @@ fn get_instructions_limits_for_subnet_message(
}
}

/// If the next execution method (`Message`, `Heartbeat` or `GlobalTimer) may be
/// scheduled, it is added to the front of the canister's task queue, the
/// canister ID is added to `heartbeat_and_timer_canister_ids` and `true` is
/// returned. Otherwise, no mutations are made and `false` is returned.
///
/// If either `Heartbeat` or `GlobalTimer` is enqueued, then the other one is
/// also enqueued in the second position, if it may be scheduled.
fn is_next_method_chosen(
canister: &mut CanisterState,
heartbeat_and_timer_canister_ids: &mut BTreeSet<CanisterId>,
global_timer_has_reached_deadline: bool,
may_schedule_heartbeat: bool,
may_schedule_global_timer: bool,
) -> bool {
let mut tasks_added = false;
let method_chosen = match canister.get_next_scheduled_method() {
match canister.get_next_scheduled_method() {
NextScheduledMethod::Message => canister.has_input(),

NextScheduledMethod::Heartbeat => {
tasks_added = try_add_tasks(
canister,
ExecutionTask::Heartbeat,
global_timer_has_reached_deadline,
);
tasks_added
if may_schedule_heartbeat {
enqueue_tasks(
ExecutionTask::Heartbeat,
may_schedule_global_timer.then_some(ExecutionTask::GlobalTimer),
canister,
);
heartbeat_and_timer_canister_ids.insert(canister.canister_id());
}
may_schedule_heartbeat
}

NextScheduledMethod::GlobalTimer => {
tasks_added = try_add_tasks(
canister,
ExecutionTask::GlobalTimer,
global_timer_has_reached_deadline,
);
tasks_added
if may_schedule_global_timer {
enqueue_tasks(
ExecutionTask::GlobalTimer,
may_schedule_heartbeat.then_some(ExecutionTask::Heartbeat),
canister,
);
heartbeat_and_timer_canister_ids.insert(canister.canister_id());
}
may_schedule_global_timer
}
};

if tasks_added {
heartbeat_and_timer_canister_ids.insert(canister.canister_id());
}

method_chosen
}

fn try_add_tasks(
/// Enqueues `task` (optionally followed by `other_task`) at the front of
/// `canister`'s task queue.
fn enqueue_tasks(
task: ExecutionTask,
other_task: Option<ExecutionTask>,
canister: &mut CanisterState,
scheduled_task: ExecutionTask,
global_timer_has_reached_deadline: bool,
) -> bool {
if should_add_task(canister, &scheduled_task, global_timer_has_reached_deadline) {
// If the conditions for the 'other_task' are satisfied, then we are
// adding it as well, because we want to execute as many tasks as
// possible on the single canister to avoid context switching.
// We are first adding the 'other_task' to the front of the queue and after it
// 'scheduled_task' so that the 'scheduled_task' is the first executed.
let other_task = get_other_task(scheduled_task.clone());
if should_add_task(canister, &other_task, global_timer_has_reached_deadline) {
canister.system_state.task_queue.push_front(other_task);
}
canister.system_state.task_queue.push_front(scheduled_task);
return true;
}
false
}

fn should_add_task(
canister: &CanisterState,
task: &ExecutionTask,
global_timer_has_reached_deadline: bool,
) -> bool {
match task {
ExecutionTask::Heartbeat => canister.exports_heartbeat_method(),
ExecutionTask::GlobalTimer => {
global_timer_has_reached_deadline && canister.exports_global_timer_method()
}
ExecutionTask::AbortedExecution { .. }
| ExecutionTask::AbortedInstallCode { .. }
| ExecutionTask::PausedExecution(..)
| ExecutionTask::PausedInstallCode(..) => unreachable!("Unexpected ExecutionTask variant."),
) {
// If the conditions for the 'other_task' are satisfied, then we are
// adding it as well, because we want to execute as many tasks as
// possible on the single canister to avoid context switching.
// We first push the 'other_task' to the front of the queue and then
// in front of it 'task' so that 'task' is executed first.
if let Some(other_task) = other_task {
canister.system_state.task_queue.push_front(other_task);
}
}

fn get_other_task(task: ExecutionTask) -> ExecutionTask {
match task {
ExecutionTask::Heartbeat => ExecutionTask::GlobalTimer,
ExecutionTask::GlobalTimer => ExecutionTask::Heartbeat,
ExecutionTask::AbortedExecution { .. }
| ExecutionTask::AbortedInstallCode { .. }
| ExecutionTask::PausedExecution(..)
| ExecutionTask::PausedInstallCode(..) => unreachable!("Unexpected ExecutionTask variant."),
}
canister.system_state.task_queue.push_front(task);
}
32 changes: 15 additions & 17 deletions rs/execution_environment/src/scheduler/tests.rs
Expand Up @@ -25,7 +25,6 @@ use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::canister_state::system_state::PausedExecutionId;
use ic_replicated_state::testing::CanisterQueuesTesting;
use ic_replicated_state::testing::SystemStateTesting;
use ic_replicated_state::ExportedFunctions;
use ic_state_machine_tests::{PayloadBuilder, StateMachineBuilder};
use ic_test_utilities::types::ids::message_test_id;
use ic_test_utilities::{
Expand All @@ -41,7 +40,6 @@ use ic_test_utilities_metrics::{
};
use ic_test_utilities_time::mock_time;
use ic_types::methods::SystemMethod;
use ic_types::methods::WasmMethod;
use ic_types::time::expiry_time_from_now;
use ic_types::{
messages::{
Expand Down Expand Up @@ -602,7 +600,7 @@ fn test_message_limit_from_message_overhead() {
let mut callee = canister0;
let mut call = other_side(callee, 0);

for _ in 0..expected_number_of_messages * 10 {
for _ in 0..expected_number_of_messages * 3 {
callee = if callee == canister1 {
canister0
} else {
Expand Down Expand Up @@ -4685,10 +4683,12 @@ fn test_is_next_method_added_to_task_queue() {
Cycles::new(1_000_000_000_000),
ComputeAllocation::zero(),
MemoryAllocation::BestEffort,
Some(SystemMethod::CanisterGlobalTimer),
None,
None,
None,
);
let may_schedule_heartbeat = false;
let may_schedule_global_timer = false;

let mut heartbeat_and_timer_canister_ids = BTreeSet::new();
assert!(!test
Expand All @@ -4703,22 +4703,17 @@ fn test_is_next_method_added_to_task_queue() {
assert!(!is_next_method_chosen(
test.canister_state_mut(canister),
&mut heartbeat_and_timer_canister_ids,
false
may_schedule_heartbeat,
may_schedule_global_timer,
));
assert_eq!(heartbeat_and_timer_canister_ids, BTreeSet::new());
test.canister_state_mut(canister)
.inc_next_scheduled_method();
}

// Make canister export heartbeat and global timer.
test.canister_state_mut(canister)
.execution_state
.as_mut()
.unwrap()
.exports = ExportedFunctions::new(BTreeSet::from([
WasmMethod::System(SystemMethod::CanisterHeartbeat),
WasmMethod::System(SystemMethod::CanisterGlobalTimer),
]));
// Make canister able to schedule both heartbeat and global timer.
let may_schedule_heartbeat = true;
let may_schedule_global_timer = true;

// Set input.
test.canister_state_mut(canister)
Expand Down Expand Up @@ -4752,7 +4747,8 @@ fn test_is_next_method_added_to_task_queue() {
assert!(is_next_method_chosen(
test.canister_state_mut(canister),
&mut heartbeat_and_timer_canister_ids,
true
may_schedule_heartbeat,
may_schedule_global_timer,
));

// Since NextScheduledMethod is Message it is not expected that Heartbeat
Expand Down Expand Up @@ -4788,7 +4784,8 @@ fn test_is_next_method_added_to_task_queue() {
assert!(is_next_method_chosen(
test.canister_state_mut(canister),
&mut heartbeat_and_timer_canister_ids,
true
may_schedule_heartbeat,
may_schedule_global_timer,
));

assert_eq!(heartbeat_and_timer_canister_ids, BTreeSet::from([canister]));
Expand Down Expand Up @@ -4835,7 +4832,8 @@ fn test_is_next_method_added_to_task_queue() {
assert!(is_next_method_chosen(
test.canister_state_mut(canister),
&mut heartbeat_and_timer_canister_ids,
true
may_schedule_heartbeat,
may_schedule_global_timer,
));

assert_eq!(heartbeat_and_timer_canister_ids, BTreeSet::from([canister]));
Expand Down

0 comments on commit 0e23b0a

Please sign in to comment.