Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Start running systems while prepare_systems is running #4919

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.9.0-dev" }

async-channel = "1.4"
event-listener = "2.5"
thread_local = "1.1.4"
fixedbitset = "0.4"
fxhash = "0.2"
Expand Down
156 changes: 113 additions & 43 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use event_listener::Event;
use fixedbitset::FixedBitSet;

#[cfg(test)]
use scheduling_event::*;

struct SystemSchedulingMetadata {
/// Used to signal the system's task to start the system.
start_sender: Sender<()>,
/// Receives the signal to start the system.
start_receiver: Receiver<()>,
start: Event,
/// Indices of systems that depend on this one, used to decrement their
/// dependency counters when this system finishes.
dependants: Vec<usize>,
Expand Down Expand Up @@ -56,7 +55,11 @@ pub struct ParallelExecutor {

impl Default for ParallelExecutor {
fn default() -> Self {
let (finish_sender, finish_receiver) = async_channel::unbounded();
// Using a bounded channel here as it avoids allocations when signaling
// and generally remains hotter in memory. It would take 128 systems completing
// between runs of the parallel executor for this channel to overflow. If it
// does overflow, the sending systems will simply suspend until the parallel
// executor runs and drains it.
let (finish_sender, finish_receiver) = async_channel::bounded(128);
james7132 marked this conversation as resolved.
Show resolved Hide resolved
Self {
system_metadata: Default::default(),
finish_sender,
Expand Down Expand Up @@ -84,10 +87,8 @@ impl ParallelSystemExecutor for ParallelExecutor {
for container in systems {
let dependencies_total = container.dependencies().len();
let system = container.system();
let (start_sender, start_receiver) = async_channel::bounded(1);
self.system_metadata.push(SystemSchedulingMetadata {
start_sender,
start_receiver,
start: Event::new(),
dependants: vec![],
dependencies_total,
dependencies_now: 0,
Expand Down Expand Up @@ -125,10 +126,13 @@ impl ParallelSystemExecutor for ParallelExecutor {

ComputeTaskPool::init(TaskPool::default).scope(|scope| {
self.prepare_systems(scope, systems, world);
if self.should_run.count_ones(..) == 0 {
return;
}
let parallel_executor = async {
// All systems have been run if there are no queued or running systems.
while 0 != self.queued.count_ones(..) + self.running.count_ones(..) {
self.process_queued_systems().await;
self.process_queued_systems();
// Avoid deadlocking if no systems were actually started.
if self.running.count_ones(..) != 0 {
// Wait until at least one system has finished.
Expand Down Expand Up @@ -166,34 +170,96 @@ impl ParallelExecutor {
systems: &'scope mut [ParallelSystemContainer],
world: &'scope World,
) {
// These are used as a part of a unit test.
#[cfg(test)]
let mut started_systems = 0;
james7132 marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "trace")]
let _span = bevy_utils::tracing::info_span!("prepare_systems").entered();
self.should_run.clear();
for (index, (system_data, system)) in
self.system_metadata.iter_mut().zip(systems).enumerate()
{
let should_run = system.should_run();
let can_start = should_run
&& system_data.dependencies_total == 0
&& Self::can_start_now(
self.non_send_running,
system_data,
&self.active_archetype_component_access,
);

// Queue the system if it has no dependencies, otherwise reset its dependency counter.
if system_data.dependencies_total == 0 {
if !can_start {
self.queued.insert(index);
}
} else {
system_data.dependencies_now = system_data.dependencies_total;
}

if !should_run {
continue;
}

// Spawn the system task.
if system.should_run() {
self.should_run.set(index, true);
let start_receiver = system_data.start_receiver.clone();
let finish_sender = self.finish_sender.clone();
let system = system.system_mut();
#[cfg(feature = "trace")] // NB: outside the task to get the TLS current span
let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
self.should_run.insert(index);
let finish_sender = self.finish_sender.clone();
let system = system.system_mut();
#[cfg(feature = "trace")] // NB: outside the task to get the TLS current span
let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
#[cfg(feature = "trace")]
let overhead_span =
bevy_utils::tracing::info_span!("system overhead", name = &*system.name());

let mut run = move || {
#[cfg(feature = "trace")]
let overhead_span =
bevy_utils::tracing::info_span!("system overhead", name = &*system.name());
let _system_guard = system_span.enter();
// SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
unsafe { system.run_unsafe((), world) };
};

if can_start {
let task = async move {
start_receiver
.recv()
run();
// This will never panic:
// - The channel is never closed or dropped.
// - Overflowing the bounded size will just suspend until
// there is capacity.
finish_sender
.send(index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
// SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
unsafe { system.run_unsafe((), world) };
#[cfg(feature = "trace")]
drop(system_guard);
};

#[cfg(feature = "trace")]
let task = task.instrument(overhead_span);
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_local(task);
}

#[cfg(test)]
{
started_systems += 1;
}

self.running.insert(index);
if !system_data.is_send {
self.non_send_running = true;
}
// Add this system's access information to the active access information.
self.active_archetype_component_access
.extend(&system_data.archetype_component_access);
} else {
let start_listener = system_data.start.listen();
let task = async move {
start_listener.await;
run();
// This will never panic:
// - The channel is never closed or dropped.
// - Overflowing the bounded size will just suspend until
// there is capacity.
finish_sender
.send(index)
.await
Expand All @@ -208,29 +274,33 @@ impl ParallelExecutor {
scope.spawn_local(task);
}
}
// Queue the system if it has no dependencies, otherwise reset its dependency counter.
if system_data.dependencies_total == 0 {
self.queued.insert(index);
} else {
system_data.dependencies_now = system_data.dependencies_total;
}
}
#[cfg(test)]
if started_systems != 0 {
self.emit_event(SchedulingEvent::StartedSystems(started_systems));
}
}

/// Determines if the system with given index has no conflicts with already running systems.
fn can_start_now(&self, index: usize) -> bool {
let system_data = &self.system_metadata[index];
#[inline]
fn can_start_now(
non_send_running: bool,
system_data: &SystemSchedulingMetadata,
active_archetype_component_access: &Access<ArchetypeComponentId>,
) -> bool {
// Non-send systems are considered conflicting with each other.
(!self.non_send_running || system_data.is_send)
(!non_send_running || system_data.is_send)
&& system_data
.archetype_component_access
.is_compatible(&self.active_archetype_component_access)
.is_compatible(active_archetype_component_access)
}

/// Starts all non-conflicting queued systems, moves them from `queued` to `running`,
/// adds their access information to active access information;
/// processes queued systems that shouldn't run this iteration as completed immediately.
async fn process_queued_systems(&mut self) {
fn process_queued_systems(&mut self) {
// These counters are used as part of a unit test that observes
// `process_queued_systems`; removing them will cause that test to fail.
#[cfg(test)]
let mut started_systems = 0;
for index in self.queued.ones() {
Expand All @@ -239,17 +309,17 @@ impl ParallelExecutor {
let system_metadata = &self.system_metadata[index];
if !self.should_run[index] {
self.dependants_scratch.extend(&system_metadata.dependants);
} else if self.can_start_now(index) {
} else if Self::can_start_now(
self.non_send_running,
system_metadata,
&self.active_archetype_component_access,
) {
#[cfg(test)]
{
started_systems += 1;
}
system_metadata
.start_sender
.send(())
.await
.unwrap_or_else(|error| unreachable!("{}", error));
self.running.set(index, true);
system_metadata.start.notify_additional_relaxed(1);
self.running.insert(index);
if !system_metadata.is_send {
self.non_send_running = true;
}
Expand Down