Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add ParallelCommands system parameter #4749

Closed
1 change: 1 addition & 0 deletions crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.8.0-dev" }

async-channel = "1.4"
thread_local = "1.1.4"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this already in our tree?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but it is a nice crate that another PR (#4663) depends on as well.

fixedbitset = "0.4"
fxhash = "0.2"
downcast-rs = "1.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod prelude {
},
system::{
Commands, In, IntoChainSystem, IntoExclusiveSystem, IntoSystem, Local, NonSend,
NonSendMut, ParamSet, Query, RemovedComponents, Res, ResMut, System,
NonSendMut, ParallelCommands, ParamSet, Query, RemovedComponents, Res, ResMut, System,
SystemParamFunction,
},
world::{FromWorld, Mut, World},
Expand Down
7 changes: 7 additions & 0 deletions crates/bevy_ecs/src/system/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod command_queue;
mod parallel_scope;

use crate::{
bundle::Bundle,
Expand All @@ -8,6 +9,7 @@ use crate::{
};
use bevy_utils::tracing::{error, warn};
pub use command_queue::CommandQueue;
pub use parallel_scope::*;
use std::marker::PhantomData;

use super::Resource;
Expand Down Expand Up @@ -55,6 +57,11 @@ impl<'w, 's> Commands<'w, 's> {
}
}

/// Create a new `Commands` from a queue and an [`Entities`] reference.
pub fn new_from_entities(queue: &'s mut CommandQueue, entities: &'w Entities) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a quick doc example of how you'd actually access this in e.g. a test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, it seems that so far this method cannot be effectively used, as SystemParam derives cannot currently implement custom apply strategies, and I don't think it'd be a great idea to add an entire manual SystemParam impl here to use as an example. I'll make a separate PR for custom apply strategies in SystemParam derives later on, which could then be used as an example here.

Self { queue, entities }
}

/// Creates a new empty [`Entity`] and returns an [`EntityCommands`] builder for it.
///
/// To directly spawn an entity with a [`Bundle`] included, you can use
Expand Down
99 changes: 99 additions & 0 deletions crates/bevy_ecs/src/system/commands/parallel_scope.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::cell::Cell;

use thread_local::ThreadLocal;

use crate::{
entity::Entities,
prelude::World,
system::{SystemParam, SystemParamFetch, SystemParamState},
};

use super::{CommandQueue, Commands};

#[doc(hidden)]
#[derive(Default)]
TheRawMeatball marked this conversation as resolved.
Show resolved Hide resolved
/// The internal [`SystemParamState`] of the [`ParallelCommands`] type
pub struct ParallelCommandsState {
TheRawMeatball marked this conversation as resolved.
Show resolved Hide resolved
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should generalize this and add it to bevy_utils. This is seemingly a very useful abstraction that showed up in #4899 too.

}

/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_for_each`](crate::system::Query::par_for_each)
///
/// Note: Because command application order will depend on how many threads are ran, non-commutative commands may result in non-deterministic results.
///
TheRawMeatball marked this conversation as resolved.
Show resolved Hide resolved
/// Example:
/// ```
/// # use bevy_ecs::prelude::*;
/// # use bevy_tasks::ComputeTaskPool;
/// #
/// # #[derive(Component)]
/// # struct Velocity;
/// # impl Velocity { fn magnitude(&self) -> f32 { 42.0 } }
/// fn parallel_command_system(
/// mut query: Query<(Entity, &Velocity)>,
/// pool: Res<ComputeTaskPool>,
/// par_commands: ParallelCommands
/// ) {
/// query.par_for_each(&pool, 32, |(entity, velocity)| {
/// if velocity.magnitude() > 10.0 {
/// par_commands.command_scope(|mut commands| {
/// commands.entity(entity).despawn();
/// });
/// }
/// });
/// }
/// # bevy_ecs::system::assert_is_system(parallel_command_system);
///```
pub struct ParallelCommands<'w, 's> {
state: &'s mut ParallelCommandsState,
entities: &'w Entities,
}

impl SystemParam for ParallelCommands<'_, '_> {
type Fetch = ParallelCommandsState;
}

impl<'w, 's> SystemParamFetch<'w, 's> for ParallelCommandsState {
type Item = ParallelCommands<'w, 's>;

unsafe fn get_param(
state: &'s mut Self,
_: &crate::system::SystemMeta,
world: &'w World,
_: u32,
) -> Self::Item {
ParallelCommands {
state,
entities: world.entities(),
}
}
}

// SAFE: no component or resource access to report
unsafe impl SystemParamState for ParallelCommandsState {
fn init(_: &mut World, _: &mut crate::system::SystemMeta) -> Self {
Self::default()
}

fn apply(&mut self, world: &mut World) {
for cq in self.thread_local_storage.iter_mut() {
cq.get_mut().apply(world);
}
}
}

impl<'w, 's> ParallelCommands<'w, 's> {
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
let store = &self.state.thread_local_storage;
let command_queue_cell = store.get_or_default();
let mut command_queue = command_queue_cell.take();

let r = f(Commands::new_from_entities(
&mut command_queue,
self.entities,
));

command_queue_cell.set(command_queue);
r
}
}