Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Extract parallel queue abstraction #7348

Merged
merged 13 commits into from
Feb 19, 2024
1 change: 0 additions & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.12.0" }
bevy_ecs_macros = { path = "macros", version = "0.12.0" }

async-channel = "2.1.0"
thread_local = "1.1.4"
fixedbitset = "0.4.2"
rustc-hash = "1.1"
downcast-rs = "1.2"
Expand Down
25 changes: 8 additions & 17 deletions crates/bevy_ecs/src/system/commands/parallel_scope.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::cell::Cell;

use thread_local::ThreadLocal;
use bevy_utils::Parallel;

use crate::{
self as bevy_ecs,
Expand All @@ -13,7 +11,7 @@ use super::{CommandQueue, Commands};

#[derive(Default)]
struct ParallelCommandQueue {
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
thread_queues: Parallel<CommandQueue>,
}

alice-i-cecile marked this conversation as resolved.
Show resolved Hide resolved
/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_iter`](crate::system::Query::par_iter)
Expand Down Expand Up @@ -53,8 +51,8 @@ impl SystemBuffer for ParallelCommandQueue {
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
#[cfg(feature = "trace")]
let _system_span = _system_meta.commands_span.enter();
for cq in &mut self.thread_local_storage {
cq.get_mut().apply(world);
for cq in self.thread_queues.iter_mut() {
cq.apply(world);
}
}
}
Expand All @@ -64,16 +62,9 @@ impl<'w, 's> ParallelCommands<'w, 's> {
///
/// For an example, see the type-level documentation for [`ParallelCommands`].
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
let store = &self.state.thread_local_storage;
let command_queue_cell = store.get_or_default();
let mut command_queue = command_queue_cell.take();

let r = f(Commands::new_from_entities(
&mut command_queue,
self.entities,
));

command_queue_cell.set(command_queue);
r
self.state.thread_queues.scope(|queue| {
let commands = Commands::new_from_entities(queue, self.entities);
f(commands)
})
}
}
17 changes: 7 additions & 10 deletions crates/bevy_render/src/view/visibility/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use bevy_ecs::prelude::*;
use bevy_hierarchy::{Children, Parent};
use bevy_reflect::{std_traits::ReflectDefault, Reflect};
use bevy_transform::{components::GlobalTransform, TransformSystem};
use std::cell::Cell;
use thread_local::ThreadLocal;
use bevy_utils::Parallel;

use crate::deterministic::DeterministicRenderingConfig;
use crate::{
Expand Down Expand Up @@ -370,7 +369,7 @@ fn reset_view_visibility(mut query: Query<&mut ViewVisibility>) {
/// [`ViewVisibility`] of all entities, and for each view also compute the [`VisibleEntities`]
/// for that view.
pub fn check_visibility(
mut thread_queues: Local<ThreadLocal<Cell<Vec<Entity>>>>,
mut thread_queues: Local<Parallel<Vec<Entity>>>,
mut view_query: Query<(
&mut VisibleEntities,
&Frustum,
Expand Down Expand Up @@ -438,15 +437,13 @@ pub fn check_visibility(
}

view_visibility.set();
let cell = thread_queues.get_or_default();
let mut queue = cell.take();
queue.push(entity);
cell.set(queue);
thread_queues.scope(|queue| {
queue.push(entity);
});
});

for cell in &mut thread_queues {
visible_entities.entities.append(cell.get_mut());
}
visible_entities.entities.clear();
thread_queues.drain_into(&mut visible_entities.entities);
if deterministic_rendering_config.stable_sort_z_fighting {
// We can use the faster unstable sort here because
// the values (`Entity`) are guaranteed to be unique.
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hashbrown = { version = "0.14", features = ["serde"] }
bevy_utils_proc_macros = { version = "0.12.0", path = "macros" }
petgraph = "0.6"
thiserror = "1.0"
thread_local = "1.0"
nonmax = "0.5"
smallvec = { version = "1.11", features = ["serde", "union", "const_generics"] }

Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod default;
mod float_ord;
pub mod intern;
mod once;
mod parallel_queue;

pub use crate::uuid::Uuid;
pub use ahash::{AHasher, RandomState};
Expand All @@ -30,6 +31,7 @@ pub use cow_arc::*;
pub use default::default;
pub use float_ord::*;
pub use hashbrown;
pub use parallel_queue::*;
pub use petgraph;
pub use smallvec;
pub use thiserror;
Expand Down
72 changes: 72 additions & 0 deletions crates/bevy_utils/src/parallel_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use core::cell::Cell;
use thread_local::ThreadLocal;

/// A cohesive set of thread-local values of a given type.
///
/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
#[derive(Default)]
pub struct Parallel<T: Send> {
locals: ThreadLocal<Cell<T>>,
}

impl<T: Send> Parallel<T> {
/// Gets a mutable iterator over all of the per-thread queues.
pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
self.locals.iter_mut().map(|cell| cell.get_mut())
}

/// Clears all of the stored thread local values.
pub fn clear(&mut self) {
self.locals.clear();
}
}

impl<T: Default + Send> Parallel<T> {
/// Retrieves the thread-local value for the current thread and runs `f` on it.
///
/// If there is no thread-local value, it will be initialized to it's default.
pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
let cell = self.locals.get_or_default();
let mut value = cell.take();
let ret = f(&mut value);
cell.set(value);
Copy link
Member

@JoJoJet JoJoJet Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a panic occurs in the scope and gets caught, any commands added during the scope will be lost. This is probably desirable but I think it's worth mentioning in a comment.

ret
}
}

impl<T, I> Parallel<I>
where
I: IntoIterator<Item = T> + Default + Send + 'static,
{
/// Drains all enqueued items from all threads and returns an iterator over them.
///
/// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored.
/// If iteration is terminated part way, the rest of the enqueued items in the same
/// chunk will be dropped, and the rest of the undrained elements will remain.
///
/// The ordering is not guaranteed.
pub fn drain<B>(&mut self) -> impl Iterator<Item = T> + '_
james7132 marked this conversation as resolved.
Show resolved Hide resolved
where
B: FromIterator<T>,
{
self.locals.iter_mut().flat_map(|item| item.take())
}
}

impl<T: Send> Parallel<Vec<T>> {
/// Collect all enqueued items from all threads and appends them to the end of a
/// single Vec.
///
/// The ordering is not guarenteed.
pub fn drain_into(&mut self, out: &mut Vec<T>) {
let size = self
.locals
.iter_mut()
.map(|queue| queue.get_mut().len())
.sum();
out.reserve(size);
for queue in self.locals.iter_mut() {
out.append(queue.get_mut());
}
}
}