Skip to content

Commit

Permalink
[RUN-878] Add DTS slicing for messages that touch many pages.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandru-uta committed Jan 11, 2024
1 parent e4cfd55 commit 8f2ae8e
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 1 deletion.
34 changes: 34 additions & 0 deletions rs/canister_sandbox/backend_lib/src/dts.rs
Expand Up @@ -210,6 +210,29 @@ impl DeterministicTimeSlicing {
})
}

// Performance improvement:
// - The function is called from the execution thread when too many dirty
// pages are created during message execution.
// - The function transitions to `Paused` if it is possible to continue the
// dirty page copying in the next slice.
fn try_yield_for_dirty_memory_copy(
&self,
_instruction_counter: i64,
) -> Result<SliceExecutionOutput, HypervisorError> {
let mut state = self.state.lock().unwrap();
assert_eq!(state.execution_status, ExecutionStatus::Running);

state.execution_status = ExecutionStatus::Paused;
Ok(SliceExecutionOutput {
// Since this is a performance optimization we can consider
// that the extra round for copying takes 1 instruction.
// The overall behavior (i.e., number of executed instructions)
// will be the (almost) same as if we didn't pause. We don't return 0
// to avoid the confusion that no progress was made.
executed_instructions: NumInstructions::from(1),
})
}

// Sleeps while the current execution state is `Paused`.
// Returns the instruction limit for the next slice if execution was resumed.
// Otherwise, returns an error that indicates that execution was aborted.
Expand Down Expand Up @@ -288,6 +311,17 @@ impl OutOfInstructionsHandler for DeterministicTimeSlicingHandler {
(self.pause_callback)(slice, paused);
self.dts.wait_for_resume_or_abort()
}

fn yield_for_dirty_memory_copy(&self, instruction_counter: i64) -> HypervisorResult<i64> {
let slice = self
.dts
.try_yield_for_dirty_memory_copy(instruction_counter)?;
let paused = PausedExecution {
dts: self.dts.clone(),
};
(self.pause_callback)(slice, paused);
self.dts.wait_for_resume_or_abort()
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions rs/config/src/embedders.rs
Expand Up @@ -51,6 +51,11 @@ pub(crate) const DEFAULT_MAX_SANDBOX_COUNT: usize = 2_000;
/// duration and sandbox process eviction is activated.
pub(crate) const DEFAULT_MAX_SANDBOX_IDLE_TIME: Duration = Duration::from_secs(30 * 60);

/// Dirty-page threshold above which a message's dirty-page copying is
/// optimized by triggering a dedicated extra execution slice for the copy and
/// using prefaulting. The default corresponds to 1 GiB, i.e. 262_144 pages of
/// 4 KiB each.
pub(crate) const DEFAULT_MAX_DIRTY_PAGES_WITHOUT_OPTIMIZATION: usize = 262_144;

#[allow(non_upper_case_globals)]
const KiB: u64 = 1024;
#[allow(non_upper_case_globals)]
Expand Down Expand Up @@ -169,6 +174,10 @@ pub struct Config {
/// If this flag is enabled, then execution of a slice will produce a log
/// entry with the number of executed instructions and the duration.
pub trace_execution: FlagStatus,

/// The maximum number of pages that a message dirties without optimizing dirty
/// page copying by triggering a new execution slice for copying and using prefaulting.
pub max_dirty_pages_without_optimization: usize,
}

impl Config {
Expand All @@ -194,6 +203,7 @@ impl Config {
subnet_type: SubnetType::Application,
dirty_page_overhead: NumInstructions::new(0),
trace_execution: FlagStatus::Disabled,
max_dirty_pages_without_optimization: DEFAULT_MAX_DIRTY_PAGES_WITHOUT_OPTIMIZATION,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rs/embedders/src/lib.rs
Expand Up @@ -29,7 +29,7 @@ pub struct WasmExecutionInput {
pub compilation_cache: Arc<CompilationCache>,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InstanceRunResult {
pub dirty_pages: Vec<PageIndex>,
pub stable_memory_dirty_pages: Vec<PageIndex>,
Expand Down
26 changes: 26 additions & 0 deletions rs/embedders/src/wasm_executor.rs
Expand Up @@ -654,6 +654,32 @@ pub fn process(
.min(message_instruction_limit);
let message_instructions_left = message_instruction_limit - message_instructions_executed;

// In case the message dirtied too many pages, as a performance optimization we will
// yield the control to the replica and then resume copying dirty pages in a new execution slice.
if let Ok(ref res) = run_result {
if res.dirty_pages.len() > embedder.config().max_dirty_pages_without_optimization {
if let Err(err) = system_api.yield_for_dirty_memory_copy(instruction_counter) {
// If there was an error slicing, propagate this error to the main result and return.
// Otherwise, the regular message path takes place.
return (
SliceExecutionOutput {
executed_instructions: slice_instructions_executed,
},
WasmExecutionOutput {
wasm_result: Err(err),
num_instructions_left: message_instructions_left,
allocated_bytes: NumBytes::from(0),
allocated_message_bytes: NumBytes::from(0),
instance_stats,
system_api_call_counters,
},
None,
Ok(instance),
);
}
}
}

// Has the side effect of deallocating memory if message failed and
// returning cycles from a request that wasn't sent.
let mut wasm_result = system_api.take_execution_result(run_result.as_ref().err());
Expand Down
151 changes: 151 additions & 0 deletions rs/execution_environment/src/hypervisor/tests.rs
Expand Up @@ -6533,3 +6533,154 @@ fn stable_memory_grow_does_not_reserve_cycles_on_out_of_memory() {
.reserved_balance();
assert_eq!(reserved_cycles_before, reserved_cycles_after);
}

fn generate_wat_to_touch_pages(pages_to_touch: usize) -> String {
    // One OS page is 4 KiB. The generated update method walks the heap one
    // page at a time and writes a marker into each visited page, dirtying
    // `pages_to_touch` pages in total before replying.
    let bytes_to_touch = pages_to_touch * 4096;
    format!(
        r#"
        (module
            (import "ic0" "msg_reply" (func $msg_reply))
            (import "ic0" "msg_reply_data_append" (func $msg_reply_data_append (param i32 i32)))
            (func (export "canister_update test")
                (local $j i32)
                (loop $my_loop
                    ;; add one OS page to $j
                    local.get $j
                    i32.const 4096
                    i32.add
                    local.set $j
                    ;; store 1 to heap[$j]
                    (i32.store (local.get $j) (i32.const 1))
                    ;; loop if $j is less than number of OS pages that we want to touch
                    local.get $j
                    i32.const {bytes_to_touch}
                    i32.lt_s
                    br_if $my_loop
                )
                (call $msg_reply_data_append
                    (i32.const 0)
                    (i32.const 8))
                (call $msg_reply)
            )
            (memory $memory 128)
        )"#,
        bytes_to_touch = bytes_to_touch
    )
}

#[test]
fn yield_triggers_dts_slice_with_many_dirty_pages() {
    const CYCLES: Cycles = Cycles::new(20_000_000_000_000);
    let pages_to_touch = 100;

    // Configure the embedder to yield once `pages_to_touch - 1` pages are
    // dirty, so touching `pages_to_touch` pages must split the message into
    // two execution slices.
    let mut test = ExecutionTestBuilder::new()
        .with_deterministic_time_slicing()
        .with_manual_execution()
        .with_max_dirty_pages_optimization_embedder_config(pages_to_touch - 1)
        .build();

    let wasm = wat::parse_str(generate_wat_to_touch_pages(pages_to_touch)).unwrap();
    let canister_id = test.canister_from_cycles_and_binary(CYCLES, wasm).unwrap();
    let _msg_id = test.ingress_raw(canister_id, "test", vec![]);

    // First slice: execution pauses for the dirty-page copy round.
    test.execute_slice(canister_id);
    assert_eq!(
        test.canister_state(canister_id).next_execution(),
        NextExecution::ContinueLong
    );
    // Second slice: the copy finishes and the message completes.
    test.execute_slice(canister_id);
    assert_eq!(
        test.canister_state(canister_id).next_execution(),
        NextExecution::None
    );
}

#[test]
fn yield_does_not_trigger_dts_slice_without_enough_dirty_pages() {
    const CYCLES: Cycles = Cycles::new(20_000_000_000_000);
    let pages_to_touch = 100;

    // The yield threshold is set above `pages_to_touch`, so the dirty-page
    // optimization must not kick in and the message runs in one slice.
    let mut test = ExecutionTestBuilder::new()
        .with_deterministic_time_slicing()
        .with_manual_execution()
        .with_max_dirty_pages_optimization_embedder_config(pages_to_touch + 1)
        .build();

    let wasm = wat::parse_str(generate_wat_to_touch_pages(pages_to_touch)).unwrap();
    let canister_id = test.canister_from_cycles_and_binary(CYCLES, wasm).unwrap();
    let _msg_id = test.ingress_raw(canister_id, "test", vec![]);

    // A single slice finishes the whole message.
    test.execute_slice(canister_id);
    assert_eq!(
        test.canister_state(canister_id).next_execution(),
        NextExecution::None
    );
}

#[test]
fn yield_abort_does_not_modify_state() {
    const CYCLES: Cycles = Cycles::new(20_000_000_000_000);
    let pages_to_touch = 100;

    // The embedder yields at `pages_to_touch - 1` dirty pages, so the message
    // needs two slices to complete.
    let mut test = ExecutionTestBuilder::new()
        .with_deterministic_time_slicing()
        .with_manual_execution()
        .with_max_dirty_pages_optimization_embedder_config(pages_to_touch - 1)
        .build();

    let wasm = wat::parse_str(generate_wat_to_touch_pages(pages_to_touch)).unwrap();
    let canister_id = test.canister_from_cycles_and_binary(CYCLES, wasm).unwrap();
    let _msg_id = test.ingress_raw(canister_id, "test", vec![]);

    // Run the first slice; the execution pauses for the copy round.
    test.execute_slice(canister_id);
    assert_eq!(
        test.canister_state(canister_id).next_execution(),
        NextExecution::ContinueLong
    );
    // Abort before the final slice gets a chance to run.
    test.abort_all_paused_executions();

    // An aborted execution must leave the canister's memory untouched.
    let dirty_pages_after_abort = test
        .execution_state(canister_id)
        .wasm_memory
        .page_map
        .delta_pages_iter()
        .count();
    assert_eq!(dirty_pages_after_abort, 0);

    // Restart from scratch and let both slices run to completion.
    test.execute_slice(canister_id);
    assert_eq!(
        test.canister_state(canister_id).next_execution(),
        NextExecution::ContinueLong
    );
    test.execute_slice(canister_id);
    assert_eq!(
        test.canister_state(canister_id).next_execution(),
        NextExecution::None
    );

    // After a completed execution every touched page shows up as dirty.
    let dirty_pages_after_completion = test
        .execution_state(canister_id)
        .wasm_memory
        .page_map
        .delta_pages_iter()
        .count();
    assert_eq!(dirty_pages_after_completion, pages_to_touch);
}
9 changes: 9 additions & 0 deletions rs/interfaces/src/execution_environment.rs
Expand Up @@ -499,6 +499,10 @@ pub trait OutOfInstructionsHandler {
// the function returns `Err(HypervisorError::InstructionLimitExceeded)`.
// Otherwise, the function returns a new positive instruction counter.
fn out_of_instructions(&self, instruction_counter: i64) -> HypervisorResult<i64>;

// Invoked only when a long execution dirties many memory pages to yield control
// and start the copy only in a new slice. This is a performance improvement.
fn yield_for_dirty_memory_copy(&self, instruction_counter: i64) -> HypervisorResult<i64>;
}

/// Indicates the type of stable memory API being used.
Expand Down Expand Up @@ -896,6 +900,11 @@ pub trait SystemApi {
/// Otherwise, the function return a new non-negative instruction counter.
fn out_of_instructions(&mut self, instruction_counter: i64) -> HypervisorResult<i64>;

/// This system call is not part of the public spec and it is invoked when
/// Wasm execution has a large number of dirty pages that, for performance reasons,
/// should be copied in a new execution slice.
fn yield_for_dirty_memory_copy(&mut self, instruction_counter: i64) -> HypervisorResult<i64>;

/// This system call is not part of the public spec. It's called after a
/// native `memory.grow` or `table.grow` has been called to check whether
/// there's enough available memory left.
Expand Down
31 changes: 31 additions & 0 deletions rs/system_api/src/lib.rs
Expand Up @@ -2386,6 +2386,33 @@ impl SystemApi for SystemApiImpl {
result
}

    /// Performance improvement:
    /// This function is called after a message execution succeeded but the number of
    /// dirty pages is large enough to warrant an extra round of execution.
    /// Therefore, we yield control back to the replica and we wait for the
    /// next round to start copying dirty pages.
    ///
    /// On success, returns the instruction limit of the newly started slice
    /// and updates this API's slice bookkeeping accordingly. Errors from the
    /// handler are propagated unchanged (the default non-DTS handler always
    /// returns `InstructionLimitExceeded`).
    fn yield_for_dirty_memory_copy(&mut self, instruction_counter: i64) -> HypervisorResult<i64> {
        // Delegate the actual pause/resume to the out-of-instructions handler.
        let result = self
            .out_of_instructions_handler
            .yield_for_dirty_memory_copy(instruction_counter);
        if let Ok(new_slice_instruction_limit) = result {
            // A new slice has started, update the instruction sum and limit.
            // The finished slice consumed `limit - counter` instructions,
            // clamped at zero in case the counter dropped below the limit's
            // complement (e.g. went negative).
            let slice_instructions = self
                .current_slice_instruction_limit
                .saturating_sub(instruction_counter)
                .max(0);
            self.instructions_executed_before_current_slice += slice_instructions;
            self.current_slice_instruction_limit = new_slice_instruction_limit;
        }
        trace_syscall!(
            self,
            yield_for_dirty_memory_copy,
            result,
            instruction_counter
        );
        result
    }

fn update_available_memory(
&mut self,
native_memory_grow_res: i64,
Expand Down Expand Up @@ -2960,6 +2987,10 @@ impl OutOfInstructionsHandler for DefaultOutOfInstructionsHandler {
    fn out_of_instructions(&self, _instruction_counter: i64) -> HypervisorResult<i64> {
        // The default handler has no DTS: there is no next slice, so hitting
        // the instruction limit simply aborts the execution.
        Err(HypervisorError::InstructionLimitExceeded)
    }

    fn yield_for_dirty_memory_copy(&self, _instruction_counter: i64) -> HypervisorResult<i64> {
        // The default handler has no DTS, so yielding for a dirty-page copy
        // round is never possible; report the same error as out_of_instructions.
        Err(HypervisorError::InstructionLimitExceeded)
    }
}

pub(crate) fn copy_cycles_to_heap(
Expand Down
7 changes: 7 additions & 0 deletions rs/test_utilities/execution_environment/src/lib.rs
Expand Up @@ -1917,6 +1917,13 @@ impl ExecutionTestBuilder {
self
}

pub fn with_max_dirty_pages_optimization_embedder_config(mut self, no_pages: usize) -> Self {
self.execution_config
.embedders_config
.max_dirty_pages_without_optimization = no_pages;
self
}

pub fn build(self) -> ExecutionTest {
let own_range = CanisterIdRange {
start: CanisterId::from(CANISTER_IDS_PER_SUBNET),
Expand Down

0 comments on commit 8f2ae8e

Please sign in to comment.