Skip to content

Commit

Permalink
Improve safety for the multi-threaded executor using UnsafeWorldCell (
Browse files Browse the repository at this point in the history
#8292)

# Objective

Fix #7833.

Safety comments in the multi-threaded executor don't really talk about
system world accesses, which makes it unclear if the code is actually
valid.

## Solution

Update the `System` trait to use `UnsafeWorldCell`. This type's API is
written in a way that makes it much easier to cleanly maintain safety
invariants. Use this type throughout the multi-threaded executor, with a
liberal use of safety comments.

---

## Migration Guide

The `System` trait now uses `UnsafeWorldCell` instead of `&World`. This
type provides a robust API for interior mutable world access.
- The method `run_unsafe` uses this type to manage world mutations
across multiple threads.
- The method `update_archetype_component_access` uses this type to
ensure that only world metadata can be used.

```rust
let mut system = IntoSystem::into_system(my_system);
system.initialize(&mut world);

// Before:
system.update_archetype_component_access(&world);
unsafe { system.run_unsafe(&world) }

// After:
system.update_archetype_component_access(world.as_unsafe_world_cell_readonly());
unsafe { system.run_unsafe(world.as_unsafe_world_cell()) }
```

---------

Co-authored-by: James Liu <contact@jamessliu.com>
  • Loading branch information
JoJoJet and james7132 committed May 29, 2023
1 parent 4465f25 commit 85a918a
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 69 deletions.
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/heavy_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn heavy_compute(c: &mut Criterion) {

let mut system = IntoSystem::into_system(sys);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());

b.iter(move || system.run((), &mut world));
});
Expand Down
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/iter_simple_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Benchmark {

let mut system = IntoSystem::into_system(query_system);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
Self(world, Box::new(system))
}

Expand Down
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/world/world_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub fn query_get_component_simple(criterion: &mut Criterion) {

let mut system = IntoSystem::into_system(query_system);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());

bencher.iter(|| system.run(entity, &mut world));
});
Expand Down
5 changes: 3 additions & 2 deletions crates/bevy_ecs/src/schedule/condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::ops::Not;
use crate::component::{self, ComponentId};
use crate::query::Access;
use crate::system::{CombinatorSystem, Combine, IntoSystem, ReadOnlySystem, System};
use crate::world::unsafe_world_cell::UnsafeWorldCell;
use crate::world::World;

pub type BoxedCondition = Box<dyn ReadOnlySystem<In = (), Out = bool>>;
Expand Down Expand Up @@ -990,7 +991,7 @@ where
self.condition.is_exclusive()
}

unsafe fn run_unsafe(&mut self, input: Self::In, world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, input: Self::In, world: UnsafeWorldCell) -> Self::Out {
// SAFETY: The inner condition system asserts its own safety.
!self.condition.run_unsafe(input, world)
}
Expand All @@ -1007,7 +1008,7 @@ where
self.condition.initialize(world);
}

fn update_archetype_component_access(&mut self, world: &World) {
fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
self.condition.update_archetype_component_access(world);
}

Expand Down
89 changes: 55 additions & 34 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule,
},
system::BoxedSystem,
world::World,
world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use crate as bevy_ecs;
Expand Down Expand Up @@ -184,7 +184,6 @@ impl SystemExecutor for MultiThreadedExecutor {
.map(|e| e.0.clone());
let thread_executor = thread_executor.as_deref();

let world = SyncUnsafeCell::from_mut(world);
let SyncUnsafeSchedule {
systems,
mut conditions,
Expand All @@ -197,10 +196,13 @@ impl SystemExecutor for MultiThreadedExecutor {
// the executor itself is a `Send` future so that it can run
// alongside systems that claim the local thread
let executor = async {
let world_cell = world.as_unsafe_world_cell();
while self.num_completed_systems < self.num_systems {
// SAFETY: self.ready_systems does not contain running systems
// SAFETY:
// - self.ready_systems does not contain running systems.
// - `world_cell` has mutable access to the entire world.
unsafe {
self.spawn_system_tasks(scope, systems, &mut conditions, world);
self.spawn_system_tasks(scope, systems, &mut conditions, world_cell);
}

if self.num_running_systems > 0 {
Expand Down Expand Up @@ -231,7 +233,7 @@ impl SystemExecutor for MultiThreadedExecutor {
if self.apply_final_buffers {
// Do one final apply buffers after all systems have completed
// Commands should be applied while on the scope's thread, not the executor's thread
let res = apply_system_buffers(&self.unapplied_systems, systems, world.get_mut());
let res = apply_system_buffers(&self.unapplied_systems, systems, world);
if let Err(payload) = res {
let mut panic_payload = self.panic_payload.lock().unwrap();
*panic_payload = Some(payload);
Expand Down Expand Up @@ -283,14 +285,16 @@ impl MultiThreadedExecutor {
}

/// # Safety
/// Caller must ensure that `self.ready_systems` does not contain any systems that
/// have been mutably borrowed (such as the systems currently running).
/// - Caller must ensure that `self.ready_systems` does not contain any systems that
/// have been mutably borrowed (such as the systems currently running).
/// - `world_cell` must have permission to access all world data (not counting
/// any world data that is claimed by systems currently running on this executor).
unsafe fn spawn_system_tasks<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
conditions: &mut Conditions,
cell: &'scope SyncUnsafeCell<World>,
world_cell: UnsafeWorldCell<'scope>,
) {
if self.exclusive_running {
return;
Expand All @@ -307,10 +311,7 @@ impl MultiThreadedExecutor {
// Therefore, no other reference to this system exists and there is no aliasing.
let system = unsafe { &mut *systems[system_index].get() };

// SAFETY: No exclusive system is running.
// Therefore, there is no existing mutable reference to the world.
let world = unsafe { &*cell.get() };
if !self.can_run(system_index, system, conditions, world) {
if !self.can_run(system_index, system, conditions, world_cell) {
// NOTE: exclusive systems with ambiguities are susceptible to
// being significantly displaced here (compared to single-threaded order)
// if systems after them in topological order can run
Expand All @@ -320,9 +321,10 @@ impl MultiThreadedExecutor {

self.ready_systems.set(system_index, false);

// SAFETY: Since `self.can_run` returned true earlier, it must have called
// `update_archetype_component_access` for each run condition.
if !self.should_run(system_index, system, conditions, world) {
// SAFETY: `can_run` returned true, which means that:
// - It must have called `update_archetype_component_access` for each run condition.
// - There can be no systems running whose accesses would conflict with any conditions.
if !self.should_run(system_index, system, conditions, world_cell) {
self.skip_system_and_signal_dependents(system_index);
continue;
}
Expand All @@ -331,20 +333,23 @@ impl MultiThreadedExecutor {
self.num_running_systems += 1;

if self.system_task_metadata[system_index].is_exclusive {
// SAFETY: `can_run` confirmed that no systems are running.
// Therefore, there is no existing reference to the world.
// SAFETY: `can_run` returned true for this system, which means
// that no other systems currently have access to the world.
let world = unsafe { world_cell.world_mut() };
// SAFETY: `can_run` returned true for this system,
// which means no systems are currently borrowed.
unsafe {
let world = &mut *cell.get();
self.spawn_exclusive_system_task(scope, system_index, systems, world);
}
break;
}

// SAFETY:
// - No other reference to this system exists.
// - `self.can_run` has been called, which calls `update_archetype_component_access` with this system.
// - `can_run` has been called, which calls `update_archetype_component_access` with this system.
// - `can_run` returned true, so no systems with conflicting world access are running.
unsafe {
self.spawn_system_task(scope, system_index, systems, world);
self.spawn_system_task(scope, system_index, systems, world_cell);
}
}

Expand All @@ -357,7 +362,7 @@ impl MultiThreadedExecutor {
system_index: usize,
system: &mut BoxedSystem,
conditions: &mut Conditions,
world: &World,
world: UnsafeWorldCell,
) -> bool {
let system_meta = &self.system_task_metadata[system_index];
if system_meta.is_exclusive && self.num_running_systems > 0 {
Expand Down Expand Up @@ -413,15 +418,17 @@ impl MultiThreadedExecutor {
}

/// # Safety
///
/// `update_archetype_component` must have been called with `world`
/// for each run condition in `conditions`.
/// * `world` must have permission to read any world data required by
/// the system's conditions: this includes conditions for the system
/// itself, and conditions for any of the system's sets.
/// * `update_archetype_component` must have been called with `world`
/// for each run condition in `conditions`.
unsafe fn should_run(
&mut self,
system_index: usize,
_system: &BoxedSystem,
conditions: &mut Conditions,
world: &World,
world: UnsafeWorldCell,
) -> bool {
let mut should_run = !self.skipped_systems.contains(system_index);
for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
Expand All @@ -430,7 +437,10 @@ impl MultiThreadedExecutor {
}

// Evaluate the system set's conditions.
// SAFETY: `update_archetype_component_access` has been called for each run condition.
// SAFETY:
// - The caller ensures that `world` has permission to read any data
// required by the conditions.
// - `update_archetype_component_access` has been called for each run condition.
let set_conditions_met =
evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world);

Expand All @@ -444,7 +454,10 @@ impl MultiThreadedExecutor {
}

// Evaluate the system's conditions.
// SAFETY: `update_archetype_component_access` has been called for each run condition.
// SAFETY:
// - The caller ensures that `world` has permission to read any data
// required by the conditions.
// - `update_archetype_component_access` has been called for each run condition.
let system_conditions_met =
evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world);

Expand All @@ -459,14 +472,16 @@ impl MultiThreadedExecutor {

/// # Safety
/// - Caller must not alias systems that are running.
/// - `world` must have permission to access the world data
/// used by the specified system.
/// - `update_archetype_component_access` must have been called with `world`
/// on the system assocaited with `system_index`.
unsafe fn spawn_system_task<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
system_index: usize,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
world: &'scope World,
world: UnsafeWorldCell<'scope>,
) {
// SAFETY: this system is not running, no other reference exists
let system = unsafe { &mut *systems[system_index].get() };
Expand All @@ -483,7 +498,8 @@ impl MultiThreadedExecutor {
let system_guard = system_span.enter();
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
// SAFETY:
// - Access: TODO.
// - The caller ensures that we have permission to
// access the world data used by the system.
// - `update_archetype_component_access` has been called.
unsafe { system.run_unsafe((), world) };
}));
Expand Down Expand Up @@ -688,18 +704,23 @@ fn apply_system_buffers(
}

/// # Safety
///
/// `update_archetype_component_access` must have been called
/// with `world` for each condition in `conditions`.
unsafe fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World) -> bool {
/// - `world` must have permission to read any world data
/// required by `conditions`.
/// - `update_archetype_component_access` must have been called
/// with `world` for each condition in `conditions`.
unsafe fn evaluate_and_fold_conditions(
conditions: &mut [BoxedCondition],
world: UnsafeWorldCell,
) -> bool {
// not short-circuiting is intentional
#[allow(clippy::unnecessary_fold)]
conditions
.iter_mut()
.map(|condition| {
#[cfg(feature = "trace")]
let _condition_span = info_span!("condition", name = &*condition.name()).entered();
// SAFETY: caller ensures system access is compatible
// SAFETY: The caller ensures that `world` has permission to
// access any data required by the condition.
unsafe { condition.run_unsafe((), world) }
})
.fold(true, |acc, res| acc && res)
Expand Down
5 changes: 3 additions & 2 deletions crates/bevy_ecs/src/system/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
component::{ComponentId, Tick},
prelude::World,
query::Access,
world::unsafe_world_cell::UnsafeWorldCell,
};

use super::{ReadOnlySystem, System};
Expand Down Expand Up @@ -157,7 +158,7 @@ where
self.a.is_exclusive() || self.b.is_exclusive()
}

unsafe fn run_unsafe(&mut self, input: Self::In, world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, input: Self::In, world: UnsafeWorldCell) -> Self::Out {
Func::combine(
input,
// SAFETY: The world accesses for both underlying systems have been registered,
Expand Down Expand Up @@ -198,7 +199,7 @@ where
self.component_access.extend(self.b.component_access());
}

fn update_archetype_component_access(&mut self, world: &World) {
fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
self.a.update_archetype_component_access(world);
self.b.update_archetype_component_access(world);

Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/system/exclusive_function_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
check_system_change_tick, ExclusiveSystemParam, ExclusiveSystemParamItem, In, IntoSystem,
System, SystemMeta,
},
world::World,
world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use bevy_utils::all_tuples;
Expand Down Expand Up @@ -86,7 +86,7 @@ where
}

#[inline]
unsafe fn run_unsafe(&mut self, _input: Self::In, _world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, _input: Self::In, _world: UnsafeWorldCell) -> Self::Out {
panic!("Cannot run exclusive systems with a shared World reference");
}

Expand Down Expand Up @@ -134,7 +134,7 @@ where
self.param_state = Some(F::Param::init(world, &mut self.system_meta));
}

fn update_archetype_component_access(&mut self, _world: &World) {}
fn update_archetype_component_access(&mut self, _world: UnsafeWorldCell) {}

#[inline]
fn check_change_tick(&mut self, change_tick: Tick) {
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_ecs/src/system/function_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
prelude::FromWorld,
query::{Access, FilteredAccessSet},
system::{check_system_change_tick, ReadOnlySystemParam, System, SystemParam, SystemParamItem},
world::{World, WorldId},
world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
};

use bevy_utils::all_tuples;
Expand Down Expand Up @@ -417,7 +417,7 @@ where
}

#[inline]
unsafe fn run_unsafe(&mut self, input: Self::In, world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, input: Self::In, world: UnsafeWorldCell) -> Self::Out {
let change_tick = world.increment_change_tick();

// SAFETY:
Expand All @@ -428,7 +428,7 @@ where
let params = F::Param::get_param(
self.param_state.as_mut().expect(Self::PARAM_MESSAGE),
&self.system_meta,
world.as_unsafe_world_cell_migration_internal(),
world,
change_tick,
);
let out = self.func.run(input, params);
Expand Down Expand Up @@ -457,7 +457,7 @@ where
self.param_state = Some(F::Param::init_state(world, &mut self.system_meta));
}

fn update_archetype_component_access(&mut self, world: &World) {
fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
assert!(self.world_id == Some(world.id()), "Encountered a mismatched World. A System cannot be used with Worlds other than the one it was initialized with.");
let archetypes = world.archetypes();
let new_generation = archetypes.generation();
Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ mod tests {

// set up system and verify its access is empty
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
assert_eq!(
system
.archetype_component_access()
Expand Down Expand Up @@ -1640,7 +1640,7 @@ mod tests {
world.spawn((B, C));

// update system and verify its accesses are correct
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
assert_eq!(
system
.archetype_component_access()
Expand All @@ -1658,7 +1658,7 @@ mod tests {
.unwrap(),
);
world.spawn((A, B, D));
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
assert_eq!(
system
.archetype_component_access()
Expand Down

0 comments on commit 85a918a

Please sign in to comment.