From d630b5e9c1c120c22953ca4a563259729dac0edd Mon Sep 17 00:00:00 2001 From: Malek Date: Thu, 19 Mar 2026 21:09:52 -0400 Subject: [PATCH 01/19] redid everything, brand new pr effectively. --- Cargo.toml | 11 + crates/bevy_async/Cargo.toml | 44 ++++ crates/bevy_async/LICENSE-APACHE | 176 ++++++++++++++++ crates/bevy_async/LICENSE-MIT | 19 ++ crates/bevy_async/README.md | 7 + crates/bevy_async/src/async_bridge.rs | 203 +++++++++++++++++++ crates/bevy_async/src/ecs_access.rs | 214 ++++++++++++++++++++ crates/bevy_async/src/lib.rs | 26 +++ crates/bevy_async/src/plugin.rs | 121 +++++++++++ crates/bevy_async/src/system_state_store.rs | 110 ++++++++++ crates/bevy_async/src/wake_signal.rs | 67 ++++++ crates/bevy_internal/Cargo.toml | 2 + crates/bevy_internal/src/default_plugins.rs | 2 + crates/bevy_internal/src/lib.rs | 1 + examples/async_tasks/async_ecs_access.rs | 115 +++++++++++ 15 files changed, 1118 insertions(+) create mode 100644 crates/bevy_async/Cargo.toml create mode 100644 crates/bevy_async/LICENSE-APACHE create mode 100644 crates/bevy_async/LICENSE-MIT create mode 100644 crates/bevy_async/README.md create mode 100644 crates/bevy_async/src/async_bridge.rs create mode 100644 crates/bevy_async/src/ecs_access.rs create mode 100644 crates/bevy_async/src/lib.rs create mode 100644 crates/bevy_async/src/plugin.rs create mode 100644 crates/bevy_async/src/system_state_store.rs create mode 100644 crates/bevy_async/src/wake_signal.rs create mode 100644 examples/async_tasks/async_ecs_access.rs diff --git a/Cargo.toml b/Cargo.toml index d7041ff66dcc7..311083527044d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2137,6 +2137,17 @@ description = "How to use `AsyncComputeTaskPool` to complete longer running task category = "Async Tasks" wasm = false +[[example]] +name = "async_ecs_access" +path = "examples/async_tasks/async_ecs_access.rs" +doc-scrape-examples = true + +[package.metadata.example.async_ecs_access] +name = "Async Ecs Access" +description = "An example showing how to offload work to background async tasks using the AsyncBridge primitive." +category = "Async Tasks" +wasm = true + [[example]] name = "async_channel_pattern" path = "examples/async_tasks/async_channel_pattern.rs" diff --git a/crates/bevy_async/Cargo.toml b/crates/bevy_async/Cargo.toml new file mode 100644 index 0000000000000..c9290dc29f73e --- /dev/null +++ b/crates/bevy_async/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "bevy_async" +version = "0.19.0-dev" +edition = "2024" +description = "Provides interop between the ecs and async runtimes" +homepage = "https://bevy.org" +repository = "https://github.com/bevyengine/bevy" +license = "MIT OR Apache-2.0" +keywords = ["bevy", "async"] + +[features] +default = ["std", "bevy_tasks"] + +std = [ + "bevy_app/std", + "bevy_ecs/std", + "bevy_platform/std", + "scoped_static_storage/std", + "keyed_concurrent_queue/std", +] + +[dependencies] +# bevy +bevy_app = { path = "../bevy_app", version = "0.19.0-dev", default-features = false } +bevy_ecs = { path = "../bevy_ecs", version = "0.19.0-dev", default-features = false } +bevy_ecs_macros = { path = "../bevy_ecs/macros", version = "0.19.0-dev" } +bevy_platform = { path = "../bevy_platform", version = "0.19.0-dev", default-features = false } +bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev", default-features = false, optional = true } + +# other +scoped_static_storage = { version = "0.1.2", default-features = false } +keyed_concurrent_queue = { version = "0.1.3", default-features = false } +thiserror = { version = "2", default-features = false } + +[lints] +workspace = true + +[package.metadata.docs.rs] +rustdoc-args = [ + "-Zunstable-options", + "--generate-link-to-definition", + "--generate-macro-expansion", +] +all-features = true \ No newline at end of file diff --git a/crates/bevy_async/LICENSE-APACHE b/crates/bevy_async/LICENSE-APACHE new file mode 100644 index 0000000000000..d9a10c0d8e868 --- /dev/null +++ b/crates/bevy_async/LICENSE-APACHE @@ -0,0 +1,176 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS diff --git a/crates/bevy_async/LICENSE-MIT b/crates/bevy_async/LICENSE-MIT new file mode 100644 index 0000000000000..9cf106272ac3b --- /dev/null +++ b/crates/bevy_async/LICENSE-MIT @@ -0,0 +1,19 @@ +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/crates/bevy_async/README.md b/crates/bevy_async/README.md new file mode 100644 index 0000000000000..9be2d2082f059 --- /dev/null +++ b/crates/bevy_async/README.md @@ -0,0 +1,7 @@ +# Bevy Async + +[![License](https://img.shields.io/badge/license-MIT%2FApache-blue.svg)](https://github.com/bevyengine/bevy#license) +[![Crates.io](https://img.shields.io/crates/v/bevy_async.svg)](https://crates.io/crates/bevy_async) +[![Downloads](https://img.shields.io/crates/d/bevy_async.svg)](https://crates.io/crates/bevy_async) +[![Docs](https://docs.rs/bevy_async/badge.svg)](https://docs.rs/bevy_a11y/latest/bevy_async/) +[![Discord](https://img.shields.io/discord/691052431525675048.svg?label=&logo=discord&logoColor=ffffff&color=7389D8&labelColor=6A7EC2)](https://discord.gg/bevy) diff --git a/crates/bevy_async/src/async_bridge.rs b/crates/bevy_async/src/async_bridge.rs new file mode 100644 index 0000000000000..f5100e90184bb --- /dev/null +++ b/crates/bevy_async/src/async_bridge.rs @@ -0,0 +1,203 @@ +use crate::plugin::{AsyncBridge, MaxAsyncTicksPerSyncPoint}; +use crate::system_state_store::ErasedSystemStateStore; +use bevy_ecs::prelude::{IntoSystemSet, SystemSet, World}; +use bevy_ecs::schedule::InternedSystemSet; +use bevy_platform::sync::Arc; + +/// Drives the queued bridge work for `SyncPoint`. +/// +/// Every queued access request is guaranteed to be *woken*. That wake guarantees the corresponding +/// async future gets a chance to poll. +/// It does *not* however guarantee the poll will finish its ECS work, because that +/// poll may still fail to finish it's work for a *variety* of reasons, i.e. it is unable to acquire +/// the typed `SystemState` lock and returns `Poll::Pending`. +/// +/// This function attempts to drive queued work several times, up to +/// `MaxAsyncTicksPerSyncPoint`. If one internal tick finds no work, we opportunistically tick the +/// global task pool and try once more before returning early. +/// +/// We drive queued work multiple times for two reasons. The first is that serial `.await` calls +/// should try to all be completed within the same `SyncPoint` such as +/// ```rust,ignore +/// let health = task_1.run(|health: Single<&Health, With>| { +/// health.0 +/// }).await; +/// if health == 0 { +/// return; +/// } +/// task_1.run(|commands: Commands| { +/// commands.trigger(PlayerDoesAttack); +/// }).await; +/// ``` +/// The second reason is spoken of prior. Poll may fail to finish for a variety of reasons and +/// should be given several chances before quitting. +pub fn drive_async_bridge(world: &mut World) { + // Derive the stable interned system-set key used to look up requests queued + // for this exact sync point type. + let sync_point = drive_async_bridge::.into_system_set().intern(); + let bridge = world.get_resource::().unwrap().clone(); + // Read the configured maximum number of internal attempts we are willing to + // perform during this `SyncPoint`. + let max_ticks = world.get_resource::().unwrap().0; + for _ in 0..max_ticks { + // Drive once. If no work was found, we may truly be done. + // but we should give external task pools one more opportunity to make newly-woken + // tasks runnable. + if bridge.0.drive_sync_point(sync_point, world) == DriveStatus::Idle { + #[cfg(feature = "bevy_tasks")] + bevy_tasks::cfg::web! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + // Retry once after ticking the global pool. If we are still idle, + // there is no more immediately available progress to make. + if bridge.0.drive_sync_point(sync_point, world) == DriveStatus::Idle { + return; + } + } + } +} + +#[derive(Default)] +pub(crate) struct AsyncBridgeInner { + pub(crate) requests_by_sync_point: + keyed_concurrent_queue::KeyedQueues, + pub(crate) world_scope: scoped_static_storage::ScopedStatic, +} + +impl AsyncBridgeInner { + /// This drives a single sync point, requesting the poll of all tasks in that sync point. + /// None of the tasks are guaranteed to actually return `Poll::Ready`, but all are guaranteed to + /// at least do a `Poll::Pending` + /// + /// The flow of logic is the following: + /// 1. We first drain the queue for our `SyncPoint`. + /// 2. We initialize the request's `SystemState`. (This is idempotent). + /// 3. Expose our `World` through `world_scope`. + /// 4. Wake all our `EcsAccessFuture`s. + /// 5. Apply our `SystemState` back into the `World`. (Things like `Commands`). + fn drive_sync_point(&self, sync_point: InternedSystemSet, world: &mut World) -> DriveStatus { + let mut queued_requests = bevy_platform::prelude::vec![]; + while let Ok(mut queued_task_bridge) = + self.requests_by_sync_point.get_or_create(&sync_point).pop() + { + queued_requests.push(queued_task_bridge.init_system_state(world)); + } + // If no requests were waiting then report idle so the caller can decide whether to stop + // or attempt one more task-pool tick. + if queued_requests.is_empty() { + return DriveStatus::Idle; + } + // Make this `World` temporarily visible to our waking futures. Wake them all and wait + // until they all have at least *attempted* to poll. + // This is contractually obligated by the contract of `.wake()`. We are guaranteed one wake + // per call to our `.wake()`. + let completed_tasks = self + .world_scope + .scope(world, || wake_requests_and_wait(queued_requests)); + for task in completed_tasks { + task.apply(world); + } + DriveStatus::Progress + } +} + +/// Whether a drive attempt made any progress. +#[derive(PartialEq)] +enum DriveStatus { + /// We found and processed at least one queued request. + Progress, + /// There was no queued work available for the `SyncPoint`. + Idle, +} + +/// A queued access request bridging an async task into ECS. +pub(crate) struct QueuedBridgeRequest { + /// Waker for the async future that wants ECS access. + /// When the `SyncPoint` is driven, this waker is fired so the future can + /// poll while `world_scope` exposes the current `World`. + pub(crate) waker: core::task::Waker, + /// Our custom primitive that lets us wait until all the futures have tried to run before + /// continuing. + pub(crate) wake_signal: crate::wake_signal::WakeSignal, + pub(crate) initialized: bool, + pub(crate) system_state: Arc, +} + +/// A queued access request whose waker has already been fired. +struct WokenBridgeRequest { + wake_signal: crate::wake_signal::WakeSignal, + system_state: Arc, +} + +/// A request that has finished its attempted poll and may need to apply deferred world state. +struct CompletedBridgeRequest { + system_state: Arc, +} + +impl CompletedBridgeRequest { + #[inline] + fn apply(self, world: &mut World) { + self.system_state.apply(world); + } +} + +impl QueuedBridgeRequest { + /// Initialize the `SystemStateCell` if it isn't already initialized. + fn init_system_state(mut self, world: &mut World) -> Self { + if self.initialized { + return self; + } + self.system_state.init(world); + self.initialized = true; + self + } +} + +#[inline] +fn wake_requests_and_wait( + queued_requests: bevy_platform::prelude::Vec, +) -> bevy_platform::prelude::Vec { + let bridged_tasks = queued_requests + .into_iter() + .map( + |QueuedBridgeRequest { + system_state, + waker, + wake_signal, + .. + }| { + // Trigger the async future so it can poll while `world_scope` + // is active. + waker.wake(); + WokenBridgeRequest { + system_state, + wake_signal, + } + }, + ) + // we re-collect to ensure we fully exhaust the prior iterator + // we want to have all the wakers call .wake() before the first barrier calls .wait() + .collect::>(); + + #[cfg(feature = "bevy_tasks")] + bevy_tasks::cfg::web! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + + bridged_tasks + .into_iter() + .map( + |WokenBridgeRequest { + system_state, + wake_signal, + }| { + wake_signal.wait(); + CompletedBridgeRequest { system_state } + }, + ) + .collect() +} diff --git a/crates/bevy_async/src/ecs_access.rs b/crates/bevy_async/src/ecs_access.rs new file mode 100644 index 0000000000000..119dfabc86d03 --- /dev/null +++ b/crates/bevy_async/src/ecs_access.rs @@ -0,0 +1,214 @@ +use crate::async_bridge; +use crate::async_bridge::{AsyncBridgeInner, QueuedBridgeRequest}; +use crate::system_state_store::ErasedSystemStateStore; +use crate::wake_signal::WakeSignal; +use bevy_ecs::schedule::{InternedSystemSet, IntoSystemSet, SystemSet}; +use bevy_ecs::system::SystemParam; +use bevy_platform::sync::{Arc, Weak}; +use core::marker::PhantomData; + +/// Handle that lets an async task request temporary access to an ECS +/// `SystemParam` or a tuple of them. +/// +/// `P` is the typed system parameter the caller eventually wants, such as: +/// - [`bevy_ecs::prelude::Commands`] +/// - [`bevy_ecs::prelude::Res`] +/// - [`bevy_ecs::prelude::Query`] +/// - tuples of params +/// +/// It is cheap to clone and intended to be passed into async tasks. +/// You can pass it into *multiple* tasks on separate threads and have them work concurrently +/// off of the same state, sharing `Locals`. +pub struct EcsAccess { + pub(crate) phantom_data: PhantomData

, + + /// A `Weak` is used so tasks do not stay alive if the world is dropped. + /// If the world goes away, upgrading this weak pointer fails and access + /// returns [`EcsAccessError::WorldDropped`]. + pub(crate) bridge: Weak, + + /// Type-erased storage for the underlying `SystemState

`. + /// + /// Each `EcsAccess

` keeps reusing the same typed system state across + /// accesses so repeated operations do not rebuild it from scratch. + /// + /// This is also important not only to persist params like `Local` but *also* so `Changed` and + /// `Added` and other filters can work. + pub(crate) system_state: Arc, +} + +impl Clone for EcsAccess

{ + fn clone(&self) -> Self { + Self { + phantom_data: PhantomData::default(), + bridge: self.bridge.clone(), + system_state: self.system_state.clone(), + } + } +} + +impl EcsAccess

{ + pub async fn access( + &self, + _sync_point: SyncPoint, + access_fn: AccessFn, + ) -> Result + where + for<'w, 's> AccessFn: FnOnce(P::Item<'w, 's>) -> Out, + { + EcsAccessFuture { + phantom_data: PhantomData::default(), + system_set: async_bridge::drive_async_bridge:: + .into_system_set() + .intern(), + system_func: Some(access_fn), + wake_signal: None, + system_state: self.system_state.clone(), + bridge: self.bridge.clone(), + } + .await + } +} + +#[derive(thiserror::Error, Debug)] +pub enum EcsAccessError { + /// The requested `SystemParam` was invalid in the current world context. + /// for example trying to access a param that fails Bevy's usual validation like a missing + /// Resource or using `Single` on something that has 0 or multiple instances. + #[error(transparent)] + SystemParamValidation(bevy_ecs::system::SystemParamValidationError), + /// The world has been dropped, so we should just return. + #[error("World no longer exists")] + WorldDropped, +} + +/// Future representing a single in-flight ECS access request. +struct EcsAccessFuture { + phantom_data: PhantomData<(P, Func, Out)>, + /// Interned system-set key identifying which sync-point queue this future + /// should be sent to. + system_set: InternedSystemSet, + /// This is the pseudo-system that we try to run when we have access to `World`. + /// This is an option just so we can take it out when we run it so we can use `FnOnce` + /// instead of `FnMut`, so it's more flexible than real systems. + system_func: Option, + /// Wake signal for the currently queued wake cycle, if any. + /// + /// The future drops this at the end of `poll` which acts as acknowledgement that the wake + /// has been handled. + wake_signal: Option, + system_state: Arc, + /// Weak bridge pointer so the loss of the world becomes a clean runtime error. + bridge: Weak, +} + +impl Unpin for EcsAccessFuture {} + +impl Future for EcsAccessFuture +where + P: SystemParam + 'static, + for<'w, 's> Func: FnOnce(P::Item<'w, 's>) -> Out, +{ + type Output = Result; + + fn poll( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + use core::task::Poll; + + // If we were previously woken by the sync-point driver, we will have a + // `WakeSignal` stored here. + // + // Dropping that signal at the end of this poll acts as the + // acknowledgement that yes, this wake was observed and this task has + // attempted its run, you may release the waiting on the other side. + let _drop_at_end_of_scope = self.wake_signal.take(); + + // Try to gain a strong reference to the bridge. If this fails, the world is gone, + // so further access is impossible. + let async_bridge = match self.bridge.upgrade() { + None => { + return Poll::Ready(Err(EcsAccessError::WorldDropped)); + } + Some(async_ecs) => async_ecs, + }; + match async_bridge + .world_scope + .try_with(|world| { + let system_state = self.system_state.clone(); + // Attempt to acquire the typed `SystemState

`. + // + // We deliberately use `try_lock` rather than blocking. If + // another bridge request is currently using the same system + // state, we simply yield and let the sync-point driver try again + // on a later internal tick. + let Some(mut system_state_guard) = system_state.try_lock::

() else { + return Poll::Pending; + }; + // This one really shouldn't happen very often. If we created this task *while* + // the sync point driver was running this will occur. In that case the system state + // never actually got initialized, and even though we *have* access to the world, + // for safetyreasons we have to perform our initialization on the main world-thread, + // not here. + let Some(mut system_state) = system_state_guard.as_mut() else { + return Poll::Pending; + }; + if !system_state.meta().is_send() { + return Poll::Ready(Err(EcsAccessError::SystemParamValidation( + bevy_ecs::system::SystemParamValidationError::invalid::< + bevy_ecs::prelude::NonSend<()>, + >("Cannot have your system be non-send / exclusive"), + ))); + } + let state = match system_state.get_mut(world) { + Ok(state) => state, + Err(system_param_validation_error) => { + return Poll::Ready(Err(EcsAccessError::SystemParamValidation( + system_param_validation_error, + ))) + } + }; + // We finally have `P::Item<'w, 's>`, yay!, so consume the stored `FnOnce`, run it, + // and complete the future. + Poll::Ready(Ok(self.system_func.take().unwrap()(state))) + }) + .ok() + { + Some(out) => out, + None => { + // No world is currently exposed. That means we are being polled + // outside the sync-point drive, so we cannot access ECS yet. + // + // Instead, enqueue ourselves to be revisited when the matching + // sync-point system runs. + let wait_barrier = WakeSignal::new(); + // Store one clone locally so dropping it at the end of the next + // poll acknowledges the wake. + self.wake_signal.replace(wait_barrier.clone()); + // Queue the request under this future's target sync point. + // + // The queued payload carries the following! + // 1. The task's waker, so the sync-point driver can wake it. + // 2. The wake handshake signal, so the driver can wait until the wake has actually + // been processed. + // 3. An initialization hint for the typed `SystemState`. + // 4. The erased `SystemState` storage itself. + async_bridge + .requests_by_sync_point + .try_send( + &self.system_set, + QueuedBridgeRequest { + waker: cx.waker().clone(), + wake_signal: wait_barrier, + initialized: self.system_state.is_initialized(), + system_state: self.system_state.clone(), + }, + ) + .ok() + .unwrap(); + Poll::Pending + } + } + } +} diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs new file mode 100644 index 0000000000000..67c878eaa196c --- /dev/null +++ b/crates/bevy_async/src/lib.rs @@ -0,0 +1,26 @@ +#![forbid(unsafe_code)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc( + html_logo_url = "https://bevy.org/assets/icon.png", + html_favicon_url = "https://bevy.org/assets/icon.png" +)] +#![no_std] + +#[cfg(feature = "std")] +extern crate std; + +mod async_bridge; +mod ecs_access; +mod plugin; +mod system_state_store; +mod wake_signal; + +pub use crate::plugin::AsyncPlugin; + +pub mod prelude { + #[doc(hidden)] + pub use crate::{ + async_bridge::drive_async_bridge, + plugin::{AsyncBridge, AsyncPlugin}, + }; +} diff --git a/crates/bevy_async/src/plugin.rs b/crates/bevy_async/src/plugin.rs new file mode 100644 index 0000000000000..009e195c4939f --- /dev/null +++ b/crates/bevy_async/src/plugin.rs @@ -0,0 +1,121 @@ +use crate::ecs_access::EcsAccess; +use crate::system_state_store::SystemStateStore; +use bevy_app::App; +use bevy_ecs::system::SystemParam; +use std::marker::PhantomData; +use std::sync::Arc; + +/// Plugin entry point for the async <-> ECS bridge system. +/// +/// This plugin installs a configuration resource telling the bridge how aggressively to drive work +/// at each sync point. +/// +/// Conceptually, async tasks cannot directly access Bevy ECS state from arbitrary +/// threads or arbitrary times. Instead, they enqueue requests which are later +/// driven from a known ECS `SyncPoint` on the world-owning thread. +/// +/// This supports arbitrary async runtimes as well as multiple Bevy Worlds / Bevy Apps. +pub struct AsyncPlugin { + /// Upper bound on how many internal bridge ticks we perform each time a + /// sync point system runs. + /// + /// A single "bridge tick" means: + /// 1. collect queued access requests for that sync point, + /// 2. wake the corresponding async tasks, + /// 3. wait for each one to attempt a poll, + /// 4. apply any deferred `SystemState` work back into the world. + /// + /// We may need to do this multiple times because one task's progress can + /// unblock another task that previously returned `Poll::Pending`. + pub max_async_ticks_per_sync_point: usize, +} + +impl Default for AsyncPlugin { + fn default() -> Self { + Self { + max_async_ticks_per_sync_point: 100, + } + } +} + +impl bevy_app::Plugin for AsyncPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(MaxAsyncTicksPerSyncPoint( + self.max_async_ticks_per_sync_point, + )) + .init_resource::(); + } +} + +/// Internal resource to manage a limit on how many times we try to drive the async <-> ecs bridge +/// per sync point. +#[derive(bevy_ecs_macros::Resource, Clone)] +pub(crate) struct MaxAsyncTicksPerSyncPoint(pub(crate) usize); + +/// This resource gives one the ability to create a bridge between an async task and the ecs. +/// By calling `AsyncBridge::new(&self)` you create a new bridge between an async task +/// and the ecs. +#[derive(bevy_ecs_macros::Resource, Default, Clone)] +pub struct AsyncBridge(pub(crate) Arc); + +impl AsyncBridge { + /// Creates a reusable async handle for accessing the ECS with the + /// `SystemParam` type `P`. + /// + /// This is the entry-point to let an + /// async task interact with Bevy ECS state. + /// + /// The returned [`EcsAccess

`]: + /// - is cheap to clone, + /// - can be moved into async tasks, + /// - does not access the world immediately, + /// [`EcsAccess

`] waits until a matching sync point drives the bridge and + /// temporarily grants safe ECS access. + /// + /// You create one of these from a cloned [`AsyncBridge`] resource and + /// then call `.access(...)` inside async code whenever you want to access the ECS. + /// + /// # Example + /// ```rust + /// use bevy_app::prelude::*; + /// use bevy_async::prelude::*; + /// use bevy_ecs::prelude::*; + /// use bevy_tasks::AsyncComputeTaskPool; + /// use bevy_platform::sync::atomic::AtomicBool; + /// use bevy_platform::sync::atomic::Ordering; + /// use bevy_platform::sync::Arc; + /// use bevy_app::ScheduleRunnerPlugin; + /// + /// struct MySyncPoint; + /// static ACCESS_RAN: AtomicBool = AtomicBool::new(false); + /// fn main() { + /// let mut app = App::new(); + /// app.add_plugins((AsyncPlugin::default(), ScheduleRunnerPlugin::default(), TaskPoolPlugin::default())); + /// app.add_systems(Update, drive_async_bridge::); + /// app.add_systems(Startup, move |bridge: Res| { + /// let bridge = bridge.clone(); + /// AsyncComputeTaskPool::get().spawn(async move { + /// let ecs_access = bridge.new::(); + /// ecs_access.access(MySyncPoint, |mut commands: Commands| { + /// commands.spawn_empty(); + /// ACCESS_RAN.store(true, Ordering::Relaxed); + /// }).await.unwrap(); + /// }).detach(); + /// }); + /// app.update(); + /// + /// assert!(ACCESS_RAN.load(Ordering::Relaxed)); + /// } + /// + /// ``` + /// + /// `P` is stored lazily, meaning the underlying `SystemState

` is only + /// initialized when the bridge is first driven against a real `World`. + pub fn new(&self) -> EcsAccess

{ + EcsAccess { + phantom_data: PhantomData::default(), + bridge: Arc::downgrade(&self.0), + system_state: Arc::new(SystemStateStore::

::default()), + } + } +} diff --git a/crates/bevy_async/src/system_state_store.rs b/crates/bevy_async/src/system_state_store.rs new file mode 100644 index 0000000000000..8c1fd2be0c7df --- /dev/null +++ b/crates/bevy_async/src/system_state_store.rs @@ -0,0 +1,110 @@ +use bevy_ecs::system::{SystemParam, SystemState}; +use bevy_ecs::world::World; +use bevy_platform::sync::{atomic::AtomicBool, Mutex, MutexGuard}; + +/// Stores a typed `SystemState

` behind a mutex so it can be initialized once +/// and then shared across bridge requests. +/// +/// Why this exists: +/// `SystemState

` is typed, but the bridge queue needs to store heterogeneous +/// requests without knowing `P` at compile time. So each concrete +/// `SystemStateStore

` is later erased behind `dyn ErasedSystemStateStore`. +/// +/// The inner `Option` starts as `None` because we cannot construct the +/// `SystemState

` until we have a mutable `World`. Furthermore, it is not safe to try to +/// initialize the `SystemState

` from a thread *other* than the world-owning thread, so +/// we have to start it as none and have the initialization occur on the world-owning thread before +/// the `SystemState

` is ever used. +pub(crate) struct SystemStateStore( + Mutex>>, + AtomicBool, +); + +impl Default for SystemStateStore

{ + fn default() -> Self { + // Start uninitialized. Initialization is deferred until the request is + // first driven on the world-owning thread with access to `&mut World`. + Self(Mutex::new(None), AtomicBool::new(false)) + } +} + +/// Allows us to erase the `SystemStateCell` so we can pass it to and from the ecs. +/// +/// This lets the bridge store all request state uniformly as `Arc`. +/// +/// This trait exposes the following operations: +/// - initialize the typed `SystemState` if needed, +/// - apply deferred state back into the world, +/// - ask whether initialization has already happened. +pub(crate) trait ErasedSystemStateStore: Send + Sync + core::any::Any + 'static { + /// Lazily initialize the underlying typed `SystemState`. + /// + /// If initialization has already happened, this is idempodent, however it is not 0-cost because + /// of the forced pointer chasing and indirection of a `dyn Trait`. + /// + /// This must run on the world-owning thread because `SystemState::new` + /// requires `&mut World`. + fn init(&self, world: &mut World); + + /// Apply deferred operations accumulated by the `SystemState` back into + /// the world. + /// + /// For example, `Commands` buffers are typically flushed during `apply`. + fn apply(&self, world: &mut World); + + /// Returns `true` if the system is initialized, `false` if it is uninitialized. + fn is_initialized(&self) -> bool; +} + +impl ErasedSystemStateStore for SystemStateStore

{ + fn init(&self, world: &mut World) { + // Lock the store so only one thread/driver path can perform the lazy + // initialization. + let mut system_state = self.0.lock().unwrap(); + // If another earlier request already initialized the state we are done. + if system_state.is_some() { + return; + } + self.1 + .store(true, bevy_platform::sync::atomic::Ordering::Relaxed); + system_state.replace(SystemState::new(world)); + } + + fn apply(&self, world: &mut World) { + // We expect initialization to have already occurred before `apply` is + // ever called. So `unwrap()` here reflects an invariant of the bridge. + // Completed requests only exist for initialized system states. + self.0.lock().unwrap().as_mut().unwrap().apply(world); + } + + fn is_initialized(&self) -> bool { + // If the atomic bool *says* it's loaded then we know for sure it is. + // Otherwise we have to conservatively assume it's not initialized. + // This is okay because our initialization logic is idempotent. + self.1.load(bevy_platform::sync::atomic::Ordering::Relaxed) + } +} + +impl dyn ErasedSystemStateStore { + pub(crate) fn try_lock( + &self, + ) -> Option>>> { + // Recover the concrete typed store from the erased trait object. + // + // This `unwrap()` encodes another invariant of the design, it is the case that every + // call site must ask for the same `P` that was originally used to create the erased store. + // A mismatch here would be a logic bug in the bridge, and should never ever happen. + (self as &dyn core::any::Any) + .downcast_ref::>() + .unwrap() + .0 + // Use `try_lock` rather than blocking: + // if another request currently owns the typed `SystemState

`, the + // caller should yield with `Poll::Pending` instead of stalling a + // thread. We get ticked optimistically many times so it's okay. We aren't guaranteed to + // run everytime so we can return Poll::Pending instead of blocking an async task + // which would be very bad. + .try_lock() + .ok() + } +} diff --git a/crates/bevy_async/src/wake_signal.rs b/crates/bevy_async/src/wake_signal.rs new file mode 100644 index 0000000000000..53cf5eb882ddd --- /dev/null +++ b/crates/bevy_async/src/wake_signal.rs @@ -0,0 +1,67 @@ +use bevy_platform::sync::{Arc, Mutex}; + +/// WakeSignal is a custom signaling primitive used in order to fufill our specific requirements for +/// our async bridge. We need to wait at the sync point, after waking all the futures and only when +/// all the futures have had a chance to run we stop waiting. +/// We need this signaling to occur also if the future is dropped, or if the future panics +/// so we implement the signaling *on* the Drop implementation. +/// This also makes replacing the wake signal automatically drop and signal the previous one. +#[derive(Clone)] +#[cfg(feature = "std")] +pub(crate) struct WakeSignal( + #[cfg(feature = "std")] Arc<(Mutex, std::sync::Condvar)>, + #[cfg(not(feature = "std"))] Arc<(Mutex)>, +); + +impl WakeSignal { + #[inline] + pub(crate) fn new() -> Self { + #[cfg(feature = "std")] + { + WakeSignal(Arc::new((Mutex::new(false), std::sync::Condvar::new()))) + } + #[cfg(not(feature = "std"))] + { + WakeSignal(Arc::new(Mutex::new(false))) + } + } + + /// Waits until another cloned instance of `WakeSignal` has been dropped. + /// If any cloned instance of `WakeSignal` is dropped then this wait stops waiting. + #[cfg(feature = "std")] + #[inline] + pub(crate) fn wait(&self) { + #[cfg(feature = "std")] + { + let (lock, cv) = &*self.0; + let mut signaled = lock.lock().unwrap(); + while !*signaled { + signaled = cv.wait(signaled).unwrap(); + } + } + #[cfg(not(feature = "std"))] + { + loop { + if self.0.lock().unwrap() { + break; + } + } + } + } +} +impl Drop for WakeSignal { + #[cfg(feature = "std")] + #[inline] + fn drop(&mut self) { + let (lock, cv) = &*self.0; + let mut signaled = lock.lock().unwrap(); + *signaled = true; + cv.notify_one(); + } + + #[cfg(not(feature = "std"))] + #[inline] + fn drop(&mut self) { + *self.0.lock().unwrap() = true; + } +} diff --git a/crates/bevy_internal/Cargo.toml b/crates/bevy_internal/Cargo.toml index 9840056b62ecb..917aed6be19bf 100644 --- a/crates/bevy_internal/Cargo.toml +++ b/crates/bevy_internal/Cargo.toml @@ -396,6 +396,7 @@ std = [ "bevy_color?/std", "bevy_diagnostic/std", "bevy_ecs/std", + "bevy_async/std", "bevy_input/std", "bevy_input_focus?/std", "bevy_math/std", @@ -468,6 +469,7 @@ bevy_diagnostic = { path = "../bevy_diagnostic", version = "0.19.0-dev", default bevy_ecs = { path = "../bevy_ecs", version = "0.19.0-dev", default-features = false, features = [ "bevy_reflect", ] } +bevy_async = { path = "../bevy_async", version = "0.19.0-dev", default-features = false } bevy_input = { path = "../bevy_input", version = "0.19.0-dev", default-features = false, features = [ "bevy_reflect", ] } diff --git a/crates/bevy_internal/src/default_plugins.rs b/crates/bevy_internal/src/default_plugins.rs index 0f75594b7bf91..2e23a2b41cfb8 100644 --- a/crates/bevy_internal/src/default_plugins.rs +++ b/crates/bevy_internal/src/default_plugins.rs @@ -7,6 +7,7 @@ plugin_group! { #[cfg(feature = "bevy_log")] bevy_log:::LogPlugin, bevy_app:::TaskPoolPlugin, + bevy_async:::AsyncPlugin, bevy_diagnostic:::FrameCountPlugin, bevy_time:::TimePlugin, bevy_transform:::TransformPlugin, @@ -141,6 +142,7 @@ plugin_group! { bevy_diagnostic:::FrameCountPlugin, bevy_time:::TimePlugin, bevy_app:::ScheduleRunnerPlugin, + bevy_async:::AsyncPlugin, #[cfg(feature = "bevy_ci_testing")] bevy_dev_tools::ci_testing:::CiTestingPlugin, } diff --git a/crates/bevy_internal/src/lib.rs b/crates/bevy_internal/src/lib.rs index 859ebec55307c..e1adad0091780 100644 --- a/crates/bevy_internal/src/lib.rs +++ b/crates/bevy_internal/src/lib.rs @@ -25,6 +25,7 @@ pub use bevy_anti_alias as anti_alias; pub use bevy_app as app; #[cfg(feature = "bevy_asset")] pub use bevy_asset as asset; +pub use bevy_async as async_bridge; #[cfg(feature = "bevy_audio")] pub use bevy_audio as audio; #[cfg(feature = "bevy_camera")] diff --git a/examples/async_tasks/async_ecs_access.rs b/examples/async_tasks/async_ecs_access.rs new file mode 100644 index 0000000000000..2d2e92b6b9553 --- /dev/null +++ b/examples/async_tasks/async_ecs_access.rs @@ -0,0 +1,115 @@ +//! This example demonstrates how to use Bevy's ECS and the [`AsyncComputeTaskPool`] +//! to offload computationally intensive tasks to a background thread pool and process them +//! asynchronously. +//! +//! Unlike the channel-based approach (where tasks send results directly via a communication +//! channel) or the direct approach in async_compute, this example uses the ecs <-> async bridge. + +use bevy::async_bridge::prelude::{drive_async_bridge, AsyncBridge}; +use bevy::{prelude::*, tasks::AsyncComputeTaskPool}; +use rand::RngExt; + +struct MySyncPoint; + +fn main() { + App::new() + .add_plugins(DefaultPlugins) + .add_systems(Startup, setup) + .add_systems(Update, (drive_async_bridge::, rotate_light)) + .run(); +} + +// Number of cubes to spawn across the x, y, and z axis +const NUM_CUBES: i32 = 6; + +const LIGHT_RADIUS: f32 = 8.0; + +/// This system generates tasks simulating computationally intensive +/// work that potentially spans multiple frames/ticks. A separate +/// system, [`handle_tasks`], will track the spawned tasks on subsequent +/// frames/ticks, and use the results to spawn cubes. +/// +/// The task is offloaded to the `AsyncComputeTaskPool`, allowing heavy computation +/// to be handled asynchronously, without blocking the main game thread. +fn setup( + mut commands: Commands, + bridge: Res, + mut meshes: ResMut>, + mut materials: ResMut>, +) { + commands.spawn(( + Mesh3d(meshes.add(Circle::new(1.618 * NUM_CUBES as f32))), + MeshMaterial3d(materials.add(Color::WHITE)), + Transform::from_rotation(Quat::from_rotation_x(-std::f32::consts::FRAC_PI_2)), + )); + + // Spawn a point light with shadows enabled + commands.spawn(( + PointLight { + shadow_maps_enabled: true, + ..default() + }, + Transform::from_xyz(0.0, LIGHT_RADIUS, 4.0), + )); + + // Spawn a camera looking at the origin + commands.spawn(( + Camera3d::default(), + Transform::from_xyz(-6.5, 5.5, 12.0).looking_at(Vec3::ZERO, Vec3::Y), + )); + + let pool = bevy::tasks::AsyncComputeTaskPool::get(); + + // Reuse tasks so you don't have to pay the system init cost every time it runs. + let task = bridge.new::<( + Commands, + Local>>, + Local>>, + ResMut>, + ResMut>, + )>(); + for x in -NUM_CUBES..NUM_CUBES { + for z in -NUM_CUBES..NUM_CUBES { + // Spawn a task on the async compute pool + let task = task.clone(); + pool.spawn(async move { + let delay = std::time::Duration::from_secs_f32(rand::rng().random_range(2.0..8.0)); + // Simulate a delay before task completion + futures_timer::Delay::new(delay).await; + task.access( + MySyncPoint, + |(mut commands, mut box_mesh, mut box_material, mut meshes, mut materials)| { + if box_mesh.is_none() { + box_mesh.replace(meshes.add(Cuboid::new(0.25, 0.25, 0.25))); + } + if box_material.is_none() { + box_material.replace(materials.add(Color::srgb(1.0, 0.2, 0.3))); + } + let (box_mesh, box_material) = + (box_mesh.as_ref().unwrap(), box_material.as_ref().unwrap()); + commands.spawn(( + Mesh3d(box_mesh.clone()), + MeshMaterial3d(box_material.clone()), + Transform::from_xyz(x as f32, 0.5, z as f32), + )); + }, + ) + .await + .unwrap(); + }) + .detach(); + } + } +} + +/// Rotates the point light around the origin (0, 0, 0) +fn rotate_light(mut query: Query<&mut Transform, With>, time: Res

, +pub struct AsyncSystemState { + pub(crate) _p: PhantomData

, /// A `Weak` is used so tasks do not stay alive if the world is dropped. /// If the world goes away, upgrading this weak pointer fails and access /// returns [`EcsAccessError::WorldDropped`]. - pub(crate) bridge: Weak, + pub(crate) world: AsyncWorld, /// Type-erased storage for the underlying `SystemState

`. /// @@ -34,37 +35,45 @@ pub struct EcsAccess { /// /// This is also important not only to persist params like `Local` but *also* so `Changed` and /// `Added` and other filters can work. - pub(crate) system_state: Arc, + pub(crate) inner: Arc, } -impl Clone for EcsAccess

{ +impl Clone for AsyncSystemState

{ fn clone(&self) -> Self { Self { - phantom_data: PhantomData::default(), - bridge: self.bridge.clone(), - system_state: self.system_state.clone(), + _p: PhantomData::default(), + world: self.world.clone(), + inner: self.inner.clone(), } } } -impl EcsAccess

{ - pub async fn access( +impl AsyncSystemState

{ + pub fn new(world: AsyncWorld) -> Self { + Self { + _p: PhantomData::default(), + world, + inner: Arc::new(SystemStateCell::

::default()), + } + } + + pub async fn bridge( &self, _sync_point: SyncPoint, - access_fn: AccessFn, + bridge_fn: BridgeFn, ) -> Result where - for<'w, 's> AccessFn: FnOnce(P::Item<'w, 's>) -> Out, + for<'w, 's> BridgeFn: FnOnce(P::Item<'w, 's>) -> Out, { - EcsAccessFuture { - phantom_data: PhantomData::default(), - system_set: async_bridge::drive_async_bridge:: + BridgeFuture { + _p: PhantomData::default(), + system_set: async_bridge::async_world_sync_point:: .into_system_set() .intern(), - system_func: Some(access_fn), + bridge_fn: Some(bridge_fn), wake_signal: None, - system_state: self.system_state.clone(), - bridge: self.bridge.clone(), + system_state: self.inner.clone(), + world: self.world.clone(), } .await } @@ -83,28 +92,28 @@ pub enum EcsAccessError { } /// Future representing a single in-flight ECS access request. -struct EcsAccessFuture { - phantom_data: PhantomData<(P, Func, Out)>, +struct BridgeFuture { + _p: PhantomData<(P, Func, Out)>, /// Interned system-set key identifying which sync-point queue this future /// should be sent to. system_set: InternedSystemSet, /// This is the pseudo-system that we try to run when we have access to `World`. /// This is an option just so we can take it out when we run it so we can use `FnOnce` /// instead of `FnMut`, so it's more flexible than real systems. - system_func: Option, + bridge_fn: Option, /// Wake signal for the currently queued wake cycle, if any. /// /// The future drops this at the end of `poll` which acts as acknowledgement that the wake /// has been handled. wake_signal: Option, - system_state: Arc, + system_state: Arc, /// Weak bridge pointer so the loss of the world becomes a clean runtime error. - bridge: Weak, + world: AsyncWorld, } -impl Unpin for EcsAccessFuture {} +impl Unpin for BridgeFuture {} -impl Future for EcsAccessFuture +impl Future for BridgeFuture where P: SystemParam + 'static, for<'w, 's> Func: FnOnce(P::Item<'w, 's>) -> Out, @@ -127,23 +136,27 @@ where // Try to gain a strong reference to the bridge. If this fails, the world is gone, // so further access is impossible. - let async_bridge = match self.bridge.upgrade() { + let strong_world = match self.world.0.upgrade() { None => { return Poll::Ready(Err(EcsAccessError::WorldDropped)); } - Some(async_ecs) => async_ecs, + Some(strong_world) => strong_world, }; - match async_bridge + match strong_world .world_scope .try_with(|world| { - let system_state = self.system_state.clone(); + let Self { + ref system_state, + ref mut bridge_fn, + .. + } = *self; // Attempt to acquire the typed `SystemState

`. // // We deliberately use `try_lock` rather than blocking. If // another bridge request is currently using the same system // state, we simply yield and let the sync-point driver try again // on a later internal tick. - let Some(mut system_state_guard) = system_state.try_lock::

() else { + let Some(mut system_state) = system_state.try_lock::

(world) else { return Poll::Pending; }; // This one really shouldn't happen very often. If we created this task *while* @@ -151,9 +164,6 @@ where // never actually got initialized, and even though we *have* access to the world, // for safetyreasons we have to perform our initialization on the main world-thread, // not here. - let Some(mut system_state) = system_state_guard.as_mut() else { - return Poll::Pending; - }; if !system_state.meta().is_send() { return Poll::Ready(Err(EcsAccessError::SystemParamValidation( bevy_ecs::system::SystemParamValidationError::invalid::< @@ -161,8 +171,9 @@ where >("Cannot have your system be non-send / exclusive"), ))); } - let state = match system_state.get_mut(world) { - Ok(state) => state, + + let param = match system_state.get_mut(world) { + Ok(param) => param, Err(system_param_validation_error) => { return Poll::Ready(Err(EcsAccessError::SystemParamValidation( system_param_validation_error, @@ -171,7 +182,7 @@ where }; // We finally have `P::Item<'w, 's>`, yay!, so consume the stored `FnOnce`, run it, // and complete the future. - Poll::Ready(Ok(self.system_func.take().unwrap()(state))) + Poll::Ready(Ok(bridge_fn.take().unwrap()(param))) }) .ok() { @@ -194,14 +205,13 @@ where // been processed. // 3. An initialization hint for the typed `SystemState`. // 4. The erased `SystemState` storage itself. - async_bridge - .requests_by_sync_point + strong_world + .bridge_requests .try_send( &self.system_set, - QueuedBridgeRequest { + BridgeRequest { waker: cx.waker().clone(), wake_signal: wait_barrier, - initialized: self.system_state.is_initialized(), system_state: self.system_state.clone(), }, ) diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index 67c878eaa196c..27c8992ec295f 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -15,12 +15,13 @@ mod plugin; mod system_state_store; mod wake_signal; -pub use crate::plugin::AsyncPlugin; +pub use crate::async_bridge::async_world_sync_point; +pub use crate::ecs_access::{AsyncSystemState, EcsAccessError}; +pub use crate::plugin::{AsyncPlugin, AsyncWorld}; pub mod prelude { #[doc(hidden)] pub use crate::{ - async_bridge::drive_async_bridge, - plugin::{AsyncBridge, AsyncPlugin}, + async_world_sync_point, AsyncPlugin, AsyncSystemState, AsyncWorld, EcsAccessError, }; } diff --git a/crates/bevy_async/src/plugin.rs b/crates/bevy_async/src/plugin.rs index 009e195c4939f..408383a14d11c 100644 --- a/crates/bevy_async/src/plugin.rs +++ b/crates/bevy_async/src/plugin.rs @@ -1,9 +1,7 @@ -use crate::ecs_access::EcsAccess; -use crate::system_state_store::SystemStateStore; +use crate::ecs_access::AsyncSystemState; use bevy_app::App; use bevy_ecs::system::SystemParam; -use std::marker::PhantomData; -use std::sync::Arc; +use std::sync::{Arc, Weak}; /// Plugin entry point for the async <-> ECS bridge system. /// @@ -40,39 +38,44 @@ impl Default for AsyncPlugin { impl bevy_app::Plugin for AsyncPlugin { fn build(&self, app: &mut App) { - app.insert_resource(MaxAsyncTicksPerSyncPoint( - self.max_async_ticks_per_sync_point, - )) - .init_resource::(); + let strong_world = StrongAsyncWorld::default(); + let weak_world = AsyncWorld(Arc::downgrade(&strong_world.0)); + app.insert_resource(AsyncTickBudget(self.max_async_ticks_per_sync_point)) + .insert_resource(strong_world) + .insert_resource(weak_world); } } /// Internal resource to manage a limit on how many times we try to drive the async <-> ecs bridge /// per sync point. #[derive(bevy_ecs_macros::Resource, Clone)] -pub(crate) struct MaxAsyncTicksPerSyncPoint(pub(crate) usize); +pub(crate) struct AsyncTickBudget(pub(crate) usize); /// This resource gives one the ability to create a bridge between an async task and the ecs. /// By calling `AsyncBridge::new(&self)` you create a new bridge between an async task /// and the ecs. #[derive(bevy_ecs_macros::Resource, Default, Clone)] -pub struct AsyncBridge(pub(crate) Arc); +pub struct AsyncWorld(pub(crate) Weak); -impl AsyncBridge { +/// StrongAsyncWorld is the singular strong handle to the Inner that lives in a private Resource. +#[derive(bevy_ecs_macros::Resource, Default, Clone)] +pub(crate) struct StrongAsyncWorld(pub(crate) Arc); + +impl AsyncWorld { /// Creates a reusable async handle for accessing the ECS with the /// `SystemParam` type `P`. /// /// This is the entry-point to let an /// async task interact with Bevy ECS state. /// - /// The returned [`EcsAccess

`]: + /// The returned [`AsyncSystemState

`]: /// - is cheap to clone, /// - can be moved into async tasks, /// - does not access the world immediately, - /// [`EcsAccess

`] waits until a matching sync point drives the bridge and + /// [`AsyncSystemState

`] waits until a matching sync point drives the bridge and /// temporarily grants safe ECS access. /// - /// You create one of these from a cloned [`AsyncBridge`] resource and + /// You create one of these from a cloned [`AsyncWorld`] resource and /// then call `.access(...)` inside async code whenever you want to access the ECS. /// /// # Example @@ -91,12 +94,12 @@ impl AsyncBridge { /// fn main() { /// let mut app = App::new(); /// app.add_plugins((AsyncPlugin::default(), ScheduleRunnerPlugin::default(), TaskPoolPlugin::default())); - /// app.add_systems(Update, drive_async_bridge::); - /// app.add_systems(Startup, move |bridge: Res| { - /// let bridge = bridge.clone(); + /// app.add_systems(Update, async_world_sync_point::); + /// app.add_systems(Startup, move |world: Res| { + /// let world = world.clone(); /// AsyncComputeTaskPool::get().spawn(async move { - /// let ecs_access = bridge.new::(); - /// ecs_access.access(MySyncPoint, |mut commands: Commands| { + /// let system_state = world.system_state::(); + /// system_state.bridge(MySyncPoint, |mut commands: Commands| { /// commands.spawn_empty(); /// ACCESS_RAN.store(true, Ordering::Relaxed); /// }).await.unwrap(); @@ -111,11 +114,7 @@ impl AsyncBridge { /// /// `P` is stored lazily, meaning the underlying `SystemState

` is only /// initialized when the bridge is first driven against a real `World`. - pub fn new(&self) -> EcsAccess

{ - EcsAccess { - phantom_data: PhantomData::default(), - bridge: Arc::downgrade(&self.0), - system_state: Arc::new(SystemStateStore::

::default()), - } + pub fn system_state(&self) -> AsyncSystemState

{ + AsyncSystemState::new(self.clone()) } } diff --git a/crates/bevy_async/src/system_state_store.rs b/crates/bevy_async/src/system_state_store.rs index 8c1fd2be0c7df..f29c023a24815 100644 --- a/crates/bevy_async/src/system_state_store.rs +++ b/crates/bevy_async/src/system_state_store.rs @@ -1,6 +1,6 @@ use bevy_ecs::system::{SystemParam, SystemState}; use bevy_ecs::world::World; -use bevy_platform::sync::{atomic::AtomicBool, Mutex, MutexGuard}; +use bevy_platform::sync::{Mutex, MutexGuard, OnceLock}; /// Stores a typed `SystemState

` behind a mutex so it can be initialized once /// and then shared across bridge requests. @@ -15,16 +15,13 @@ use bevy_platform::sync::{atomic::AtomicBool, Mutex, MutexGuard}; /// initialize the `SystemState

` from a thread *other* than the world-owning thread, so /// we have to start it as none and have the initialization occur on the world-owning thread before /// the `SystemState

` is ever used. -pub(crate) struct SystemStateStore( - Mutex>>, - AtomicBool, -); +pub(crate) struct SystemStateCell(OnceLock>>); -impl Default for SystemStateStore

{ +impl Default for SystemStateCell

{ fn default() -> Self { // Start uninitialized. Initialization is deferred until the request is // first driven on the world-owning thread with access to `&mut World`. - Self(Mutex::new(None), AtomicBool::new(false)) + Self(OnceLock::default()) } } @@ -36,68 +33,37 @@ impl Default for SystemStateStore

{ /// - initialize the typed `SystemState` if needed, /// - apply deferred state back into the world, /// - ask whether initialization has already happened. -pub(crate) trait ErasedSystemStateStore: Send + Sync + core::any::Any + 'static { - /// Lazily initialize the underlying typed `SystemState`. - /// - /// If initialization has already happened, this is idempodent, however it is not 0-cost because - /// of the forced pointer chasing and indirection of a `dyn Trait`. - /// - /// This must run on the world-owning thread because `SystemState::new` - /// requires `&mut World`. - fn init(&self, world: &mut World); - +pub(crate) trait ErasedSystemStateCell: Send + Sync + core::any::Any + 'static { /// Apply deferred operations accumulated by the `SystemState` back into /// the world. /// /// For example, `Commands` buffers are typically flushed during `apply`. fn apply(&self, world: &mut World); - - /// Returns `true` if the system is initialized, `false` if it is uninitialized. - fn is_initialized(&self) -> bool; } -impl ErasedSystemStateStore for SystemStateStore

{ - fn init(&self, world: &mut World) { - // Lock the store so only one thread/driver path can perform the lazy - // initialization. - let mut system_state = self.0.lock().unwrap(); - // If another earlier request already initialized the state we are done. - if system_state.is_some() { - return; - } - self.1 - .store(true, bevy_platform::sync::atomic::Ordering::Relaxed); - system_state.replace(SystemState::new(world)); - } - +impl ErasedSystemStateCell for SystemStateCell

{ fn apply(&self, world: &mut World) { // We expect initialization to have already occurred before `apply` is // ever called. So `unwrap()` here reflects an invariant of the bridge. // Completed requests only exist for initialized system states. - self.0.lock().unwrap().as_mut().unwrap().apply(world); - } - - fn is_initialized(&self) -> bool { - // If the atomic bool *says* it's loaded then we know for sure it is. - // Otherwise we have to conservatively assume it's not initialized. - // This is okay because our initialization logic is idempotent. - self.1.load(bevy_platform::sync::atomic::Ordering::Relaxed) + self.0.get().unwrap().lock().unwrap().apply(world); } } -impl dyn ErasedSystemStateStore { - pub(crate) fn try_lock( - &self, - ) -> Option>>> { - // Recover the concrete typed store from the erased trait object. - // - // This `unwrap()` encodes another invariant of the design, it is the case that every - // call site must ask for the same `P` that was originally used to create the erased store. - // A mismatch here would be a logic bug in the bridge, and should never ever happen. +impl dyn ErasedSystemStateCell { + pub(crate) fn try_lock<'w, 'a, P: SystemParam + 'static>( + &'a self, + world: &'w mut World, + ) -> Option>> + where + 'a: 'w, + { (self as &dyn core::any::Any) - .downcast_ref::>() + .downcast_ref::>() + // Caller must use the same `Params` that created this cell. .unwrap() .0 + .get_or_init(|| Mutex::new(SystemState::new(world))) // Use `try_lock` rather than blocking: // if another request currently owns the typed `SystemState

`, the // caller should yield with `Poll::Pending` instead of stalling a diff --git a/examples/async_tasks/async_ecs_access.rs b/examples/async_tasks/async_ecs_access.rs index 2d2e92b6b9553..36a123e00e4f6 100644 --- a/examples/async_tasks/async_ecs_access.rs +++ b/examples/async_tasks/async_ecs_access.rs @@ -5,7 +5,7 @@ //! Unlike the channel-based approach (where tasks send results directly via a communication //! channel) or the direct approach in async_compute, this example uses the ecs <-> async bridge. -use bevy::async_bridge::prelude::{drive_async_bridge, AsyncBridge}; +use bevy::async_bridge::prelude::{async_world_sync_point, AsyncWorld}; use bevy::{prelude::*, tasks::AsyncComputeTaskPool}; use rand::RngExt; @@ -15,7 +15,10 @@ fn main() { App::new() .add_plugins(DefaultPlugins) .add_systems(Startup, setup) - .add_systems(Update, (drive_async_bridge::, rotate_light)) + .add_systems( + Update, + (async_world_sync_point::, rotate_light), + ) .run(); } @@ -33,7 +36,7 @@ const LIGHT_RADIUS: f32 = 8.0; /// to be handled asynchronously, without blocking the main game thread. fn setup( mut commands: Commands, - bridge: Res, + bridge: Res, mut meshes: ResMut>, mut materials: ResMut>, ) { From 0b1374937f9b49f3353a211decba97efc70ccca2 Mon Sep 17 00:00:00 2001 From: Malek Date: Sun, 29 Mar 2026 15:35:42 -0400 Subject: [PATCH 03/19] reorganized some types including wait_barrier and added more documentation and fixed cit --- Cargo.toml | 4 +- .../src/{ecs_access.rs => bridge_future.rs} | 77 ++++++++-------- .../{async_bridge.rs => bridge_request.rs} | 53 +++++------ crates/bevy_async/src/lib.rs | 87 +++++++++++++++++-- crates/bevy_async/src/plugin.rs | 27 +++--- ...{system_state_store.rs => system_state.rs} | 23 +++-- crates/bevy_async/src/wake_signal.rs | 52 +++++------ crates/bevy_solari/src/realtime/mod.rs | 8 +- ...cs_access.rs => async_bridge_primitive.rs} | 39 ++++----- 9 files changed, 223 insertions(+), 147 deletions(-) rename crates/bevy_async/src/{ecs_access.rs => bridge_future.rs} (76%) rename crates/bevy_async/src/{async_bridge.rs => bridge_request.rs} (76%) rename crates/bevy_async/src/{system_state_store.rs => system_state.rs} (75%) rename examples/async_tasks/{async_ecs_access.rs => async_bridge_primitive.rs} (73%) diff --git a/Cargo.toml b/Cargo.toml index 311083527044d..f5b11de1f3187 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2138,8 +2138,8 @@ category = "Async Tasks" wasm = false [[example]] -name = "async_ecs_access" -path = "examples/async_tasks/async_ecs_access.rs" +name = "async_bridge" +path = "examples/async_tasks/async_bridge_primitive.rs" doc-scrape-examples = true [package.metadata.example.async_ecs_access] diff --git a/crates/bevy_async/src/ecs_access.rs b/crates/bevy_async/src/bridge_future.rs similarity index 76% rename from crates/bevy_async/src/ecs_access.rs rename to crates/bevy_async/src/bridge_future.rs index 0228eb84cb73a..7ef67b606aa80 100644 --- a/crates/bevy_async/src/ecs_access.rs +++ b/crates/bevy_async/src/bridge_future.rs @@ -1,8 +1,8 @@ -use crate::async_bridge; -use crate::async_bridge::BridgeRequest; +use crate::bridge_request::BridgeRequest; use crate::plugin::AsyncWorld; -use crate::system_state_store::{ErasedSystemStateCell, SystemStateCell}; -use crate::wake_signal::WakeSignal; +use crate::system_state::{ErasedSystemStateCell, SystemStateCell}; +use crate::wake_signal::WakeSignaler; +use crate::{bridge_request, wake_signal}; use bevy_ecs::schedule::{InternedSystemSet, IntoSystemSet, SystemSet}; use bevy_ecs::system::SystemParam; use bevy_platform::sync::Arc; @@ -25,7 +25,7 @@ pub struct AsyncSystemState { /// A `Weak` is used so tasks do not stay alive if the world is dropped. /// If the world goes away, upgrading this weak pointer fails and access - /// returns [`EcsAccessError::WorldDropped`]. + /// returns [`BridgeError::WorldDropped`]. pub(crate) world: AsyncWorld, /// Type-erased storage for the underlying `SystemState

`. @@ -35,52 +35,62 @@ pub struct AsyncSystemState { /// /// This is also important not only to persist params like `Local` but *also* so `Changed` and /// `Added` and other filters can work. - pub(crate) inner: Arc, + pub(crate) system_state: Arc, } impl Clone for AsyncSystemState

{ fn clone(&self) -> Self { Self { - _p: PhantomData::default(), + _p: PhantomData, world: self.world.clone(), - inner: self.inner.clone(), + system_state: self.system_state.clone(), } } } impl AsyncSystemState

{ + /// Create a new `AsyncSystemState` from an `AsyncWorld` matching the Api surface of + /// `SystemState` with `World`. pub fn new(world: AsyncWorld) -> Self { Self { - _p: PhantomData::default(), + _p: PhantomData, world, - inner: Arc::new(SystemStateCell::

::default()), + system_state: Arc::new(SystemStateCell::

::default()), } } + /// This function allows us to create a bridge between the async task we are in and the ecs + /// world we want access to, effectively running a system from an async task. The systems run + /// here are able to take in `&` and `&mut` variables from the surrounding context unlike + /// standard Bevy systems. + /// + /// We bridge *at* the `_sync_point` `SyncPoint` with our `bridge_fn`. pub async fn bridge( &self, _sync_point: SyncPoint, bridge_fn: BridgeFn, - ) -> Result + ) -> Result where for<'w, 's> BridgeFn: FnOnce(P::Item<'w, 's>) -> Out, { BridgeFuture { - _p: PhantomData::default(), - system_set: async_bridge::async_world_sync_point:: + _p: PhantomData, + system_set: bridge_request::async_world_sync_point:: .into_system_set() .intern(), bridge_fn: Some(bridge_fn), wake_signal: None, - system_state: self.inner.clone(), + system_state: self.system_state.clone(), world: self.world.clone(), } .await } } +/// If the bridge cannot run, either because the system params were invalid, or because the world it +/// was referencing no longer exists, we return this error. #[derive(thiserror::Error, Debug)] -pub enum EcsAccessError { +pub enum BridgeError { /// The requested `SystemParam` was invalid in the current world context. /// for example trying to access a param that fails Bevy's usual validation like a missing /// Resource or using `Single` on something that has 0 or multiple instances. @@ -91,7 +101,7 @@ pub enum EcsAccessError { WorldDropped, } -/// Future representing a single in-flight ECS access request. +/// Future representing a single in-flight bridging request between our async task and our `World`. struct BridgeFuture { _p: PhantomData<(P, Func, Out)>, /// Interned system-set key identifying which sync-point queue this future @@ -99,13 +109,13 @@ struct BridgeFuture { system_set: InternedSystemSet, /// This is the pseudo-system that we try to run when we have access to `World`. /// This is an option just so we can take it out when we run it so we can use `FnOnce` - /// instead of `FnMut`, so it's more flexible than real systems. + /// instead of `FnMut`, so it's more flexible than true systems. bridge_fn: Option, /// Wake signal for the currently queued wake cycle, if any. /// /// The future drops this at the end of `poll` which acts as acknowledgement that the wake /// has been handled. - wake_signal: Option, + wake_signal: Option, system_state: Arc, /// Weak bridge pointer so the loss of the world becomes a clean runtime error. world: AsyncWorld, @@ -118,7 +128,7 @@ where P: SystemParam + 'static, for<'w, 's> Func: FnOnce(P::Item<'w, 's>) -> Out, { - type Output = Result; + type Output = Result; fn poll( mut self: core::pin::Pin<&mut Self>, @@ -127,7 +137,7 @@ where use core::task::Poll; // If we were previously woken by the sync-point driver, we will have a - // `WakeSignal` stored here. + // `WakeSignaler` stored here. // // Dropping that signal at the end of this poll acts as the // acknowledgement that yes, this wake was observed and this task has @@ -136,11 +146,8 @@ where // Try to gain a strong reference to the bridge. If this fails, the world is gone, // so further access is impossible. - let strong_world = match self.world.0.upgrade() { - None => { - return Poll::Ready(Err(EcsAccessError::WorldDropped)); - } - Some(strong_world) => strong_world, + let Some(strong_world) = self.world.0.upgrade() else { + return Poll::Ready(Err(BridgeError::WorldDropped)); }; match strong_world .world_scope @@ -159,13 +166,9 @@ where let Some(mut system_state) = system_state.try_lock::

(world) else { return Poll::Pending; }; - // This one really shouldn't happen very often. If we created this task *while* - // the sync point driver was running this will occur. In that case the system state - // never actually got initialized, and even though we *have* access to the world, - // for safetyreasons we have to perform our initialization on the main world-thread, - // not here. + if !system_state.meta().is_send() { - return Poll::Ready(Err(EcsAccessError::SystemParamValidation( + return Poll::Ready(Err(BridgeError::SystemParamValidation( bevy_ecs::system::SystemParamValidationError::invalid::< bevy_ecs::prelude::NonSend<()>, >("Cannot have your system be non-send / exclusive"), @@ -175,7 +178,7 @@ where let param = match system_state.get_mut(world) { Ok(param) => param, Err(system_param_validation_error) => { - return Poll::Ready(Err(EcsAccessError::SystemParamValidation( + return Poll::Ready(Err(BridgeError::SystemParamValidation( system_param_validation_error, ))) } @@ -189,14 +192,14 @@ where Some(out) => out, None => { // No world is currently exposed. That means we are being polled - // outside the sync-point drive, so we cannot access ECS yet. + // outside the `async_world_sync_point`, so we cannot access ECS yet. // // Instead, enqueue ourselves to be revisited when the matching // sync-point system runs. - let wait_barrier = WakeSignal::new(); - // Store one clone locally so dropping it at the end of the next + let (wake_signal, wake_waiter) = wake_signal::pair(); + // Store the wake_signal locally so dropping it at the end of the next // poll acknowledges the wake. - self.wake_signal.replace(wait_barrier.clone()); + self.wake_signal.replace(wake_signal); // Queue the request under this future's target sync point. // // The queued payload carries the following! @@ -211,7 +214,7 @@ where &self.system_set, BridgeRequest { waker: cx.waker().clone(), - wake_signal: wait_barrier, + wake_waiter, system_state: self.system_state.clone(), }, ) diff --git a/crates/bevy_async/src/async_bridge.rs b/crates/bevy_async/src/bridge_request.rs similarity index 76% rename from crates/bevy_async/src/async_bridge.rs rename to crates/bevy_async/src/bridge_request.rs index 32db395081e37..ff20af466d7ef 100644 --- a/crates/bevy_async/src/async_bridge.rs +++ b/crates/bevy_async/src/bridge_request.rs @@ -1,22 +1,26 @@ use crate::plugin::{AsyncTickBudget, StrongAsyncWorld}; -use crate::system_state_store::ErasedSystemStateCell; +use crate::system_state::ErasedSystemStateCell; use bevy_ecs::prelude::{IntoSystemSet, SystemSet, World}; use bevy_ecs::schedule::InternedSystemSet; use bevy_platform::sync::Arc; /// Drives the queued bridge work for `SyncPoint`. /// -/// Every queued access request is guaranteed to be *woken*. That wake guarantees the corresponding +/// Every queued bridge request is guaranteed to be *woken*. That wake guarantees the corresponding /// async future gets a chance to poll. /// It does *not* however guarantee the poll will finish its ECS work, because that /// poll may still fail to finish it's work for a *variety* of reasons, i.e. it is unable to acquire /// the typed `SystemState` lock and returns `Poll::Pending`. /// -/// This function attempts to drive queued work several times, up to -/// `MaxAsyncTicksPerSyncPoint`. If one internal tick finds no work, we opportunistically tick the -/// global task pool and try once more before returning early. +/// For [`bevy_tasks::TaskPool::spawn_local`] we *are* actually guaranteed that the poll will finish +/// it's ECS work, because it's single threaded, so you can use `spawn_local` if you want +/// determinism. /// -/// We drive queued work multiple times for two reasons. The first is that serial `.await` calls +/// This function attempts to tick queued work several times, up to `MaxAsyncTicksPerSyncPoint`. +/// If one internal tick finds no work, we opportunistically tick the local global task pool and +/// try once more before returning early. +/// +/// We tick queued work multiple times for two reasons. The first is that serial `.await` calls /// should try to all be completed within the same `SyncPoint` such as /// ```rust,ignore /// let health = task_1.run(|health: Single<&Health, With>| { @@ -30,14 +34,14 @@ use bevy_platform::sync::Arc; /// }).await; /// ``` /// The second reason is spoken of prior. Poll may fail to finish for a variety of reasons and -/// should be given several chances before quitting. +/// should be given several chances before giving up. pub fn async_world_sync_point(world: &mut World) { // Derive the stable interned system-set key used to look up requests queued // for this exact sync point type. let sync_point = async_world_sync_point:: .into_system_set() .intern(); - let strong_world = world.get_resource::().unwrap().clone(); + let async_world = world.get_resource::().unwrap().clone(); // Read the configured maximum number of internal attempts we are willing to // perform during this `SyncPoint`. let max_ticks = world.get_resource::().unwrap().0; @@ -45,7 +49,7 @@ pub fn async_world_sync_point(world: &mut World) { // Drive once. If no work was found, we may truly be done. // but we should give external task pools one more opportunity to make newly-woken // tasks runnable. - if strong_world.0.tick_sync_point(sync_point, world) == TickResult::NoWork { + if async_world.0.tick_sync_point(sync_point, world) == TickResult::NoWork { #[cfg(feature = "bevy_tasks")] bevy_tasks::cfg::web! { if {} else { @@ -54,7 +58,7 @@ pub fn async_world_sync_point(world: &mut World) { } // Retry once after ticking the global pool. If we are still idle, // there is no more immediately available progress to make. - if strong_world.0.tick_sync_point(sync_point, world) == TickResult::NoWork { + if async_world.0.tick_sync_point(sync_point, world) == TickResult::NoWork { return; } } @@ -69,16 +73,15 @@ pub(crate) struct AsyncWorldInner { } impl AsyncWorldInner { - /// This drives a single sync point, requesting the poll of all tasks in that sync point. + /// This ticks a single sync point, requesting the poll of all tasks in that sync point. /// None of the tasks are guaranteed to actually return `Poll::Ready`, but all are guaranteed to /// at least do a `Poll::Pending` /// /// The flow of logic is the following: /// 1. We first drain the queue for our `SyncPoint`. - /// 2. We initialize the request's `SystemState`. (This is idempotent). - /// 3. Expose our `World` through `world_scope`. - /// 4. Wake all our `EcsAccessFuture`s. - /// 5. Apply our `SystemState` back into the `World`. (Things like `Commands`). + /// 2. Expose our `World` through `world_scope`. + /// 3. Wake all our `BridgeFuture`s. + /// 4. Apply our `SystemState` back into the `World`. (Things like `Commands`). fn tick_sync_point(&self, sync_point: InternedSystemSet, world: &mut World) -> TickResult { let mut queued_requests = bevy_platform::prelude::vec![]; while let Ok(queued_task_bridge) = self.bridge_requests.get_or_create(&sync_point).pop() { @@ -103,10 +106,10 @@ impl AsyncWorldInner { } } -/// Whether a drive attempt made any progress. +/// Whether a tick attempt made any progress. #[derive(PartialEq)] enum TickResult { - /// We found and processed at least one queued request. + /// We found and processed at least one queued bridge request. DidWork, /// There was no queued work available for the `SyncPoint`. NoWork, @@ -114,19 +117,19 @@ enum TickResult { /// A queued access request bridging an async task into ECS. pub(crate) struct BridgeRequest { - /// Waker for the async future that wants ECS access. + /// Waker for the async future ([`crate::bridge_future::BridgeFuture`]) that wants ECS access. /// When the `SyncPoint` is driven, this waker is fired so the future can /// poll while `world_scope` exposes the current `World`. pub(crate) waker: core::task::Waker, /// Our custom primitive that lets us wait until all the futures have tried to run before /// continuing. - pub(crate) wake_signal: crate::wake_signal::WakeSignal, + pub(crate) wake_waiter: crate::wake_signal::WakeWaiter, pub(crate) system_state: Arc, } -/// A queued access request whose waker has already been fired. +/// A queued bridge request whose waker has already been fired. struct WokenBridgeRequest { - wake_signal: crate::wake_signal::WakeSignal, + wake_signal: crate::wake_signal::WakeWaiter, system_state: Arc, } @@ -146,16 +149,16 @@ impl CompletedBridgeRequest { fn wake_requests_and_wait( queued_requests: bevy_platform::prelude::Vec, ) -> bevy_platform::prelude::Vec { - let bridged_tasks = queued_requests + let bridged_futures = queued_requests .into_iter() .map( |BridgeRequest { system_state, waker, - wake_signal, + wake_waiter: wake_signal, .. }| { - // Trigger the async future so it can poll while `world_scope` + // Trigger the `BridgeFuture` so it can poll while `world_scope` // is active. waker.wake(); WokenBridgeRequest { @@ -175,7 +178,7 @@ fn wake_requests_and_wait( } } - bridged_tasks + bridged_futures .into_iter() .map( |WokenBridgeRequest { diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index 27c8992ec295f..b21e4c139139a 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -1,27 +1,96 @@ +//! The objective here is to coordinate two participants that want to share World access: +//! +//! - The main Bevy schedule +//! - Futures and async tasks running on other threads +//! +//! This is done through the bridge primitive introduced in this crate +//! +//! +//! Invariants of this crate: +//! +//! - Normal rust safety invariants for &mut World (aliasing) +//! - At most one future has world access at a time +//! - Futures only access the world while the scoped pointer (managed by the bridge driver) is live +//! - `SystemState` is always initialized before use +//! - Deferred ops are only applied after every future finishes polling and releases world access +//! - The driver can't deadlock +//! - All futures that want world access can eventually complete (assuming fair scheduling by the async runtime) +//! - If the world is dropped, futures don't leak and eventually finish (in an error state) +//! +//! +//! The protocol: +//! +//! Futures (tasks on worker threads) +//! - enqueue requests (create signal guard clones: one kept, one sent) +//! +//! - Driver([`async_world_sync_point`]) (exclusive system, world-owning thread) +//! 1. Drain request queue for this sync point +//! 2. Publish World pointer (via `scoped_static_storage`). Future access scope begins +//! 3. Wake all drained futures +//! +//! -> Futures race for locks (non-blocking) +//! +//! -> Success: acquire both locks, do work, complete +//! +//! -> Failure: signal driver (Drop signal guard), re-enqueue later +//! +//! -> Direct access: non-queued future polled during scope, +//! bypasses queue, acquires locks, completes (no signal) +//! 4. Wait for all signal guards to drop + scope mutex released +//! 5. Unpublish pointer, scope ends. +//! 6. Apply any deferred ops from `SystemState` of polled futures +//! 7. Loop (up to [`AsyncTickBudget`]) or return +//! 8. Schedule resumes (normal systems run) +//! +//! +//! Dual locking: +//! +//! The published World pointer lock is managed by the `ScopedStatic` primitive in `scoped_static_storage` (only one future can lock this at a time) +//! `SystemState` locks are managed by the `SystemStateCell` primitive of this crate (futures using different `SystemState` types can work in parallel) +//! +//! +//! Preventing driver deadlocks when futures panic: +//! +//! If a future panics while holding locks, rust's panic unwinding drops destructors in reverse scope order +//! - First, the `SystemState` `MutexGuard` drops (releasing the lock) +//! - Second, the World pointer's scope `MutexGuard` drops (releasing the lock) +//! - Finally, the guard signal constructed by the future during `poll()` drops, and the driver is notified +//! +//! How futures can fail cleanly: +//! +//! If the [`AsyncWorld`] cannot be reached ([`bevy_platform::sync::Weak::upgrade`] fails during `poll()`), the world has been dropped and the future cannot complete. +//! +//! If `SystemState`s are invalid, they can't be used and the future cannot complete +//! +//! Regardless, the future returns Ready(Err) and completes permanently #![forbid(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] #![doc( - html_logo_url = "https://bevy.org/assets/icon.png", - html_favicon_url = "https://bevy.org/assets/icon.png" + html_logo_url = "https://!bevy.org/assets/icon.png", + html_favicon_url = "https://!bevy.org/assets/icon.png" )] #![no_std] #[cfg(feature = "std")] extern crate std; -mod async_bridge; -mod ecs_access; +mod bridge_future; +mod bridge_request; mod plugin; -mod system_state_store; +mod system_state; mod wake_signal; -pub use crate::async_bridge::async_world_sync_point; -pub use crate::ecs_access::{AsyncSystemState, EcsAccessError}; -pub use crate::plugin::{AsyncPlugin, AsyncWorld}; +pub use crate::bridge_future::{AsyncSystemState, BridgeError}; +pub use crate::bridge_request::async_world_sync_point; +pub use crate::plugin::{AsyncPlugin, AsyncTickBudget, AsyncWorld}; +/// The async prelude. +/// +/// This includes the most common types in this crate, re-exported for your convenience. pub mod prelude { #[doc(hidden)] pub use crate::{ - async_world_sync_point, AsyncPlugin, AsyncSystemState, AsyncWorld, EcsAccessError, + async_world_sync_point, AsyncPlugin, AsyncSystemState, AsyncTickBudget, AsyncWorld, + BridgeError, }; } diff --git a/crates/bevy_async/src/plugin.rs b/crates/bevy_async/src/plugin.rs index 408383a14d11c..12a46c16669e5 100644 --- a/crates/bevy_async/src/plugin.rs +++ b/crates/bevy_async/src/plugin.rs @@ -1,24 +1,24 @@ -use crate::ecs_access::AsyncSystemState; +use crate::bridge_future::AsyncSystemState; use bevy_app::App; use bevy_ecs::system::SystemParam; -use std::sync::{Arc, Weak}; +use bevy_platform::sync::{Arc, Weak}; /// Plugin entry point for the async <-> ECS bridge system. /// -/// This plugin installs a configuration resource telling the bridge how aggressively to drive work +/// This plugin installs a configuration resource telling the driver([`crate::async_world_sync_point`]) how aggressively to drive work /// at each sync point. /// /// Conceptually, async tasks cannot directly access Bevy ECS state from arbitrary /// threads or arbitrary times. Instead, they enqueue requests which are later -/// driven from a known ECS `SyncPoint` on the world-owning thread. +/// driven from a known `SyncPoint` on the world-owning thread. /// /// This supports arbitrary async runtimes as well as multiple Bevy Worlds / Bevy Apps. pub struct AsyncPlugin { - /// Upper bound on how many internal bridge ticks we perform each time a + /// Upper bound on how many internal async world ticks we perform each time a /// sync point system runs. /// - /// A single "bridge tick" means: - /// 1. collect queued access requests for that sync point, + /// A single "tick" means: + /// 1. collect queued bridge requests for that sync point, /// 2. wake the corresponding async tasks, /// 3. wait for each one to attempt a poll, /// 4. apply any deferred `SystemState` work back into the world. @@ -49,17 +49,19 @@ impl bevy_app::Plugin for AsyncPlugin { /// Internal resource to manage a limit on how many times we try to drive the async <-> ecs bridge /// per sync point. #[derive(bevy_ecs_macros::Resource, Clone)] -pub(crate) struct AsyncTickBudget(pub(crate) usize); +pub struct AsyncTickBudget(pub usize); /// This resource gives one the ability to create a bridge between an async task and the ecs. /// By calling `AsyncBridge::new(&self)` you create a new bridge between an async task /// and the ecs. #[derive(bevy_ecs_macros::Resource, Default, Clone)] -pub struct AsyncWorld(pub(crate) Weak); +pub struct AsyncWorld(pub(crate) Weak); -/// StrongAsyncWorld is the singular strong handle to the Inner that lives in a private Resource. +/// [`StrongAsyncWorld`] is the singular strong handle to the Inner that lives in a private Resource. +/// We only expose [`Weak`] handles publicly so we can rely on the behavior that if the `World` +/// is dropped then we can detect it by a failing [`Weak::upgrade`] #[derive(bevy_ecs_macros::Resource, Default, Clone)] -pub(crate) struct StrongAsyncWorld(pub(crate) Arc); +pub(crate) struct StrongAsyncWorld(pub(crate) Arc); impl AsyncWorld { /// Creates a reusable async handle for accessing the ECS with the @@ -72,8 +74,9 @@ impl AsyncWorld { /// - is cheap to clone, /// - can be moved into async tasks, /// - does not access the world immediately, + /// /// [`AsyncSystemState

`] waits until a matching sync point drives the bridge and - /// temporarily grants safe ECS access. + /// temporarily grants safe ECS access. /// /// You create one of these from a cloned [`AsyncWorld`] resource and /// then call `.access(...)` inside async code whenever you want to access the ECS. diff --git a/crates/bevy_async/src/system_state_store.rs b/crates/bevy_async/src/system_state.rs similarity index 75% rename from crates/bevy_async/src/system_state_store.rs rename to crates/bevy_async/src/system_state.rs index f29c023a24815..633ce384b040d 100644 --- a/crates/bevy_async/src/system_state_store.rs +++ b/crates/bevy_async/src/system_state.rs @@ -2,19 +2,16 @@ use bevy_ecs::system::{SystemParam, SystemState}; use bevy_ecs::world::World; use bevy_platform::sync::{Mutex, MutexGuard, OnceLock}; -/// Stores a typed `SystemState

` behind a mutex so it can be initialized once -/// and then shared across bridge requests. +/// Stores a typed `SystemState

` behind a `OnceLock` so it can be initialized once +/// and then mutably shared across bridge requests. /// /// Why this exists: /// `SystemState

` is typed, but the bridge queue needs to store heterogeneous /// requests without knowing `P` at compile time. So each concrete -/// `SystemStateStore

` is later erased behind `dyn ErasedSystemStateStore`. +/// `SystemStateCell

` is later erased behind `dyn ErasedSystemStateCell`. /// -/// The inner `Option` starts as `None` because we cannot construct the -/// `SystemState

` until we have a mutable `World`. Furthermore, it is not safe to try to -/// initialize the `SystemState

` from a thread *other* than the world-owning thread, so -/// we have to start it as none and have the initialization occur on the world-owning thread before -/// the `SystemState

` is ever used. +/// We use a `OnceLock` because we cannot construct the `SystemState

` until we have a mutable +/// `World`. So we initialize it `SystemStateCell

` the first time it is used. pub(crate) struct SystemStateCell(OnceLock>>); impl Default for SystemStateCell

{ @@ -27,12 +24,10 @@ impl Default for SystemStateCell

{ /// Allows us to erase the `SystemStateCell` so we can pass it to and from the ecs. /// -/// This lets the bridge store all request state uniformly as `Arc`. +/// This lets the bridge store all request state uniformly as `Arc`. /// -/// This trait exposes the following operations: -/// - initialize the typed `SystemState` if needed, -/// - apply deferred state back into the world, -/// - ask whether initialization has already happened. +/// This trait exposes a single operation: to apply deferred state back into the `World`. +/// The second operation the trait is used for is in it's `impl dyn` implementation below. pub(crate) trait ErasedSystemStateCell: Send + Sync + core::any::Any + 'static { /// Apply deferred operations accumulated by the `SystemState` back into /// the world. @@ -51,6 +46,8 @@ impl ErasedSystemStateCell for SystemStateCell

{ } impl dyn ErasedSystemStateCell { + /// This function initializes the [`SystemStateCell`] if it hasn't already been initialized, and + /// then returns the [`MutexGuard`] of the `SystemState` if it isn't being used by another thread. pub(crate) fn try_lock<'w, 'a, P: SystemParam + 'static>( &'a self, world: &'w mut World, diff --git a/crates/bevy_async/src/wake_signal.rs b/crates/bevy_async/src/wake_signal.rs index 53cf5eb882ddd..70d64f0f6126b 100644 --- a/crates/bevy_async/src/wake_signal.rs +++ b/crates/bevy_async/src/wake_signal.rs @@ -1,33 +1,33 @@ use bevy_platform::sync::{Arc, Mutex}; -/// WakeSignal is a custom signaling primitive used in order to fufill our specific requirements for +/// [`WakeSignaler`] is a custom signaling primitive used in order to fufill our specific requirements for /// our async bridge. We need to wait at the sync point, after waking all the futures and only when /// all the futures have had a chance to run we stop waiting. /// We need this signaling to occur also if the future is dropped, or if the future panics /// so we implement the signaling *on* the Drop implementation. /// This also makes replacing the wake signal automatically drop and signal the previous one. -#[derive(Clone)] -#[cfg(feature = "std")] -pub(crate) struct WakeSignal( +pub(crate) struct WakeSignaler( #[cfg(feature = "std")] Arc<(Mutex, std::sync::Condvar)>, - #[cfg(not(feature = "std"))] Arc<(Mutex)>, + #[cfg(not(feature = "std"))] (), +); +/// Counterpart to the [`WakeSignaler`], the [`WakeWaiter`] waits for the [`WakeSignaler`] to drop and notify. +pub(crate) struct WakeWaiter( + #[cfg(feature = "std")] Arc<(Mutex, std::sync::Condvar)>, + #[cfg(not(feature = "std"))] (), ); -impl WakeSignal { - #[inline] - pub(crate) fn new() -> Self { - #[cfg(feature = "std")] - { - WakeSignal(Arc::new((Mutex::new(false), std::sync::Condvar::new()))) - } - #[cfg(not(feature = "std"))] - { - WakeSignal(Arc::new(Mutex::new(false))) - } - } +#[inline] +pub(crate) fn pair() -> (WakeSignaler, WakeWaiter) { + #[cfg(feature = "std")] + let inner = Arc::new((Mutex::new(false), std::sync::Condvar::new())); + #[cfg(not(feature = "std"))] + let inner = (); + (WakeSignaler(inner.clone()), WakeWaiter(inner)) +} - /// Waits until another cloned instance of `WakeSignal` has been dropped. - /// If any cloned instance of `WakeSignal` is dropped then this wait stops waiting. +impl WakeWaiter { + /// Waits until another cloned instance of [`WakeSignaler`] has been dropped. + /// If any cloned instance of [`WakeSignaler`] is dropped then this wait stops waiting. #[cfg(feature = "std")] #[inline] pub(crate) fn wait(&self) { @@ -41,15 +41,13 @@ impl WakeSignal { } #[cfg(not(feature = "std"))] { - loop { - if self.0.lock().unwrap() { - break; - } - } + // No-op on std, since we are only using local futures we should tick them + // prior to reaching this point. + return; } } } -impl Drop for WakeSignal { +impl Drop for WakeSignaler { #[cfg(feature = "std")] #[inline] fn drop(&mut self) { @@ -61,7 +59,5 @@ impl Drop for WakeSignal { #[cfg(not(feature = "std"))] #[inline] - fn drop(&mut self) { - *self.0.lock().unwrap() = true; - } + fn drop(&mut self) {} } diff --git a/crates/bevy_solari/src/realtime/mod.rs b/crates/bevy_solari/src/realtime/mod.rs index 15a41fcba62f8..bf8350eb16138 100644 --- a/crates/bevy_solari/src/realtime/mod.rs +++ b/crates/bevy_solari/src/realtime/mod.rs @@ -7,6 +7,7 @@ use bevy_app::{App, Plugin}; use bevy_asset::embedded_asset; use bevy_camera::Hdr; use bevy_core_pipeline::{ + core_3d::main_opaque_pass_3d, prepass::{ DeferredPrepass, DeferredPrepassDoubleBuffer, DepthPrepass, DepthPrepassDoubleBuffer, MotionVectorPrepass, @@ -68,7 +69,12 @@ impl Plugin for SolariLightingPlugin { Render, prepare_solari_lighting_resources.in_set(RenderSystems::PrepareResources), ) - .add_systems(Core3d, solari_lighting.in_set(Core3dSystems::MainPass)); + .add_systems( + Core3d, + solari_lighting + .before(main_opaque_pass_3d) + .in_set(Core3dSystems::MainPass), + ); } } diff --git a/examples/async_tasks/async_ecs_access.rs b/examples/async_tasks/async_bridge_primitive.rs similarity index 73% rename from examples/async_tasks/async_ecs_access.rs rename to examples/async_tasks/async_bridge_primitive.rs index 36a123e00e4f6..6da9371843f1c 100644 --- a/examples/async_tasks/async_ecs_access.rs +++ b/examples/async_tasks/async_bridge_primitive.rs @@ -3,10 +3,10 @@ //! asynchronously. //! //! Unlike the channel-based approach (where tasks send results directly via a communication -//! channel) or the direct approach in async_compute, this example uses the ecs <-> async bridge. +//! channel) or the direct approach in `async_compute`, this example uses the ecs <-> async bridge. use bevy::async_bridge::prelude::{async_world_sync_point, AsyncWorld}; -use bevy::{prelude::*, tasks::AsyncComputeTaskPool}; +use bevy::prelude::*; use rand::RngExt; struct MySyncPoint; @@ -28,15 +28,13 @@ const NUM_CUBES: i32 = 6; const LIGHT_RADIUS: f32 = 8.0; /// This system generates tasks simulating computationally intensive -/// work that potentially spans multiple frames/ticks. A separate -/// system, [`handle_tasks`], will track the spawned tasks on subsequent -/// frames/ticks, and use the results to spawn cubes. +/// work that potentially spans multiple frames/ticks. /// /// The task is offloaded to the `AsyncComputeTaskPool`, allowing heavy computation /// to be handled asynchronously, without blocking the main game thread. fn setup( mut commands: Commands, - bridge: Res, + async_world: Res, mut meshes: ResMut>, mut materials: ResMut>, ) { @@ -64,10 +62,9 @@ fn setup( let pool = bevy::tasks::AsyncComputeTaskPool::get(); // Reuse tasks so you don't have to pay the system init cost every time it runs. - let task = bridge.new::<( + let task = async_world.system_state::<( Commands, - Local>>, - Local>>, + Local, Handle)>>, ResMut>, ResMut>, )>(); @@ -79,20 +76,22 @@ fn setup( let delay = std::time::Duration::from_secs_f32(rand::rng().random_range(2.0..8.0)); // Simulate a delay before task completion futures_timer::Delay::new(delay).await; - task.access( + task.bridge( MySyncPoint, - |(mut commands, mut box_mesh, mut box_material, mut meshes, mut materials)| { - if box_mesh.is_none() { - box_mesh.replace(meshes.add(Cuboid::new(0.25, 0.25, 0.25))); + |(mut commands, mut box_handles, mut meshes, mut materials)| { + // The first time this bridge runs it will initialize the box mesh and box material, and then it will reuse them from then on. + if box_handles.is_none() { + box_handles.replace(( + meshes.add(Cuboid::new(0.25, 0.25, 0.25)), + materials.add(Color::srgb(1.0, 0.2, 0.3)), + )); } - if box_material.is_none() { - box_material.replace(materials.add(Color::srgb(1.0, 0.2, 0.3))); - } - let (box_mesh, box_material) = - (box_mesh.as_ref().unwrap(), box_material.as_ref().unwrap()); + + let (box_mesh, box_material) = box_handles.clone().unwrap(); + commands.spawn(( - Mesh3d(box_mesh.clone()), - MeshMaterial3d(box_material.clone()), + Mesh3d(box_mesh), + MeshMaterial3d(box_material), Transform::from_xyz(x as f32, 0.5, z as f32), )); }, From 5cc22f507c108319632cd2039b922e86b4fc55af Mon Sep 17 00:00:00 2001 From: Malek Date: Sun, 29 Mar 2026 16:57:39 -0400 Subject: [PATCH 04/19] fix ci --- Cargo.toml | 5 +++-- crates/bevy_async/Cargo.toml | 18 +++++++++--------- crates/bevy_async/src/wake_signal.rs | 2 +- examples/README.md | 1 + 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f5b11de1f3187..44c29632a6a04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2141,9 +2141,10 @@ wasm = false name = "async_bridge" path = "examples/async_tasks/async_bridge_primitive.rs" doc-scrape-examples = true +wasm = false -[package.metadata.example.async_ecs_access] -name = "Async Ecs Access" +[package.metadata.example.async_bridge] +name = "Async Bridge" description = "An example showing how to offload work to background async tasks using the AsyncBridge primitive." category = "Async Tasks" wasm = true diff --git a/crates/bevy_async/Cargo.toml b/crates/bevy_async/Cargo.toml index c9290dc29f73e..c81b79bf253d4 100644 --- a/crates/bevy_async/Cargo.toml +++ b/crates/bevy_async/Cargo.toml @@ -12,11 +12,11 @@ keywords = ["bevy", "async"] default = ["std", "bevy_tasks"] std = [ - "bevy_app/std", - "bevy_ecs/std", - "bevy_platform/std", - "scoped_static_storage/std", - "keyed_concurrent_queue/std", + "bevy_app/std", + "bevy_ecs/std", + "bevy_platform/std", + "scoped_static_storage/std", + "keyed_concurrent_queue/std", ] [dependencies] @@ -37,8 +37,8 @@ workspace = true [package.metadata.docs.rs] rustdoc-args = [ - "-Zunstable-options", - "--generate-link-to-definition", - "--generate-macro-expansion", + "-Zunstable-options", + "--generate-link-to-definition", + "--generate-macro-expansion", ] -all-features = true \ No newline at end of file +all-features = true diff --git a/crates/bevy_async/src/wake_signal.rs b/crates/bevy_async/src/wake_signal.rs index 70d64f0f6126b..0cd50bb579115 100644 --- a/crates/bevy_async/src/wake_signal.rs +++ b/crates/bevy_async/src/wake_signal.rs @@ -1,6 +1,6 @@ use bevy_platform::sync::{Arc, Mutex}; -/// [`WakeSignaler`] is a custom signaling primitive used in order to fufill our specific requirements for +/// [`WakeSignaler`] is a custom signaling primitive used in order to fulfill our specific requirements for /// our async bridge. We need to wait at the sync point, after waking all the futures and only when /// all the futures have had a chance to run we stop waiting. /// We need this signaling to occur also if the future is dropped, or if the future panics diff --git a/examples/README.md b/examples/README.md index d52964858c231..07a0378328a64 100644 --- a/examples/README.md +++ b/examples/README.md @@ -276,6 +276,7 @@ Example | Description Example | Description --- | --- +[Async Bridge](../examples/async_tasks/async_bridge_primitive.rs) | An example showing how to offload work to background async tasks using the AsyncBridge primitive. [Async Channel Pattern](../examples/async_tasks/async_channel_pattern.rs) | An example showing how to offload work to background async tasks using channels for communication. [Async Compute](../examples/async_tasks/async_compute.rs) | How to use `AsyncComputeTaskPool` to complete longer running tasks [External Source of Data on an External Thread](../examples/async_tasks/external_source_external_thread.rs) | How to use an external thread to run an infinite task and communicate with a channel From 89e2f61c1aea035d956984eb9939125a54f6adb9 Mon Sep 17 00:00:00 2001 From: Malek Date: Sun, 29 Mar 2026 16:58:20 -0400 Subject: [PATCH 05/19] change the example to not compile on wasm ( for now ) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 44c29632a6a04..1b7309c700c29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2147,7 +2147,7 @@ wasm = false name = "Async Bridge" description = "An example showing how to offload work to background async tasks using the AsyncBridge primitive." category = "Async Tasks" -wasm = true +wasm = false [[example]] name = "async_channel_pattern" From 1b78b9b6f1390fb59f8d1aa4e6dc422a204809d3 Mon Sep 17 00:00:00 2001 From: Malek Date: Sun, 29 Mar 2026 17:58:53 -0400 Subject: [PATCH 06/19] fix no_std support to make sure it compiles. --- crates/bevy_async/src/bridge_future.rs | 6 ++ crates/bevy_async/src/bridge_request.rs | 13 +++- crates/bevy_async/src/lib.rs | 91 +++++++++++++++++++++++++ crates/bevy_async/src/wake_signal.rs | 2 +- 4 files changed, 110 insertions(+), 2 deletions(-) diff --git a/crates/bevy_async/src/bridge_future.rs b/crates/bevy_async/src/bridge_future.rs index 7ef67b606aa80..1ae05ae205e0a 100644 --- a/crates/bevy_async/src/bridge_future.rs +++ b/crates/bevy_async/src/bridge_future.rs @@ -55,7 +55,13 @@ impl AsyncSystemState

{ Self { _p: PhantomData, world, + #[cfg(feature = "std")] system_state: Arc::new(SystemStateCell::

::default()), + #[cfg(not(feature = "std"))] + system_state: Arc::from( + bevy_platform::prelude::Box::new(SystemStateCell::

::default()) + as bevy_platform::prelude::Box, + ), } } diff --git a/crates/bevy_async/src/bridge_request.rs b/crates/bevy_async/src/bridge_request.rs index ff20af466d7ef..606686df7943a 100644 --- a/crates/bevy_async/src/bridge_request.rs +++ b/crates/bevy_async/src/bridge_request.rs @@ -106,6 +106,17 @@ impl AsyncWorldInner { } } +/// We need to notify all our Wakers that have queued that we've dropped so they can error +impl Drop for AsyncWorldInner { + fn drop(&mut self) { + for bridge_requests in self.bridge_requests.inner().read().unwrap().values() { + while let Ok(request) = bridge_requests.pop() { + request.waker.wake(); + } + } + } +} + /// Whether a tick attempt made any progress. #[derive(PartialEq)] enum TickResult { @@ -117,7 +128,7 @@ enum TickResult { /// A queued access request bridging an async task into ECS. pub(crate) struct BridgeRequest { - /// Waker for the async future ([`crate::bridge_future::BridgeFuture`]) that wants ECS access. + /// Waker for the async future (`crate::bridge_future::BridgeFuture`) that wants ECS access. /// When the `SyncPoint` is driven, this waker is fired so the future can /// poll while `world_scope` exposes the current `World`. pub(crate) waker: core::task::Waker, diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index b21e4c139139a..6f2631926b5e7 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -94,3 +94,94 @@ pub mod prelude { BridgeError, }; } + +#[cfg(test)] +mod tests { + use crate::prelude::*; + use bevy_app::prelude::*; + use bevy_app::ScheduleRunnerPlugin; + use bevy_ecs::prelude::*; + use bevy_platform::sync::atomic::AtomicBool; + use bevy_platform::sync::atomic::Ordering; + use bevy_tasks::AsyncComputeTaskPool; + + /// This tests that if a world is dropped we return an error from attempting to run it and + /// that everything cleans up nicely + /// Because of a quirk of how bevy's task pools work we have to always have at least one + /// active world for anything to progress on them. + /// That's what `other_app` is for. + #[test] + fn dropped_world() { + struct MySyncPoint; + static WORLD_WAS_DROPPED: AtomicBool = AtomicBool::new(false); + let mut other_app = App::new(); + other_app.add_plugins((TaskPoolPlugin::default(), ScheduleRunnerPlugin::default())); + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + app.add_systems(Startup, move |world: Res| { + let world = world.clone(); + AsyncComputeTaskPool::get() + .spawn(async move { + let system_state = world.system_state::(); + match system_state + .bridge(MySyncPoint, |mut commands: Commands| { + commands.spawn_empty(); + }) + .await + { + Err(BridgeError::WorldDropped) => { + WORLD_WAS_DROPPED.store(true, Ordering::Relaxed); + } + _ => unreachable!("World should have Dropped"), + } + }) + .detach(); + }); + app.update(); + drop(app); + other_app.update(); + assert!(WORLD_WAS_DROPPED.load(Ordering::Relaxed)); + } + + #[test] + fn invalid_parameters() { + struct MySyncPoint; + static FAILED_VALIDATION: AtomicBool = AtomicBool::new(false); + + #[derive(Resource)] + struct MyResource; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + app.add_systems(Update, async_world_sync_point::); + + app.add_systems(Startup, move |world: Res| { + let world = world.clone(); + AsyncComputeTaskPool::get() + .spawn(async move { + let system_state = world.system_state::>(); + match system_state.bridge(MySyncPoint, |_| unreachable!()).await { + Err(BridgeError::SystemParamValidation(_)) => { + FAILED_VALIDATION.store(true, Ordering::Relaxed); + } + _ => unreachable!("Parameter validation should have failed"), + } + }) + .detach(); + }); + + app.update(); + + assert!(FAILED_VALIDATION.load(Ordering::Relaxed)); + } +} diff --git a/crates/bevy_async/src/wake_signal.rs b/crates/bevy_async/src/wake_signal.rs index 0cd50bb579115..fbdbebc304f5f 100644 --- a/crates/bevy_async/src/wake_signal.rs +++ b/crates/bevy_async/src/wake_signal.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "std")] use bevy_platform::sync::{Arc, Mutex}; /// [`WakeSignaler`] is a custom signaling primitive used in order to fulfill our specific requirements for @@ -28,7 +29,6 @@ pub(crate) fn pair() -> (WakeSignaler, WakeWaiter) { impl WakeWaiter { /// Waits until another cloned instance of [`WakeSignaler`] has been dropped. /// If any cloned instance of [`WakeSignaler`] is dropped then this wait stops waiting. - #[cfg(feature = "std")] #[inline] pub(crate) fn wait(&self) { #[cfg(feature = "std")] From fb71499238acdc58333e32eab84e4d999b0582ae Mon Sep 17 00:00:00 2001 From: Malek Date: Sun, 29 Mar 2026 18:08:15 -0400 Subject: [PATCH 07/19] added a unit test for no_std --- crates/bevy_async/src/lib.rs | 43 ++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index 6f2631926b5e7..818419e8a0bea 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -184,4 +184,47 @@ mod tests { assert!(FAILED_VALIDATION.load(Ordering::Relaxed)); } + + #[test] + #[cfg(not(feature = "std"))] + fn no_std_test() { + use crate::prelude::*; + use bevy_app::prelude::*; + use bevy_app::ScheduleRunnerPlugin; + use bevy_ecs::prelude::*; + use bevy_platform::sync::atomic::AtomicBool; + use bevy_platform::sync::atomic::Ordering; + use bevy_tasks::AsyncComputeTaskPool; + + struct MySyncPoint; + static ACCESS_RAN: AtomicBool = AtomicBool::new(false); + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + app.add_systems(Update, async_world_sync_point::); + + app.add_systems(Startup, move |world: Res| { + let world = world.clone(); + AsyncComputeTaskPool::get() + .spawn_local(async move { + let system_state = world.system_state::(); + system_state + .bridge(MySyncPoint, |mut commands: Commands| { + commands.spawn_empty(); + ACCESS_RAN.store(true, Ordering::Relaxed); + }) + .await + .unwrap(); + }) + .detach(); + }); + + app.update(); + + assert!(ACCESS_RAN.load(Ordering::Relaxed)); + } } From c7806eb36378d1b9edf2fef99f78a773c4157522 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 1 Jun 2026 11:02:32 -0700 Subject: [PATCH 08/19] Fix nits for the async_bridge_primitive example. --- .../async_tasks/async_bridge_primitive.rs | 133 +++++++++--------- 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/examples/async_tasks/async_bridge_primitive.rs b/examples/async_tasks/async_bridge_primitive.rs index 6da9371843f1c..1d01fd16201c5 100644 --- a/examples/async_tasks/async_bridge_primitive.rs +++ b/examples/async_tasks/async_bridge_primitive.rs @@ -3,7 +3,15 @@ //! asynchronously. //! //! Unlike the channel-based approach (where tasks send results directly via a communication -//! channel) or the direct approach in `async_compute`, this example uses the ecs <-> async bridge. +//! channel) or the direct approach in the `async_compute` example, this example uses the +//! ecs<->async bridge. +//! +//! This approach allows for arbitrary ECS mutations throughout the async task, as well as awaiting +//! changes (which allows for ECS mutations to happen concurrently with other async operations). +//! Both the channel-based approach and the direct approach involve bespoke systems to handle task +//! communication (polling the channel, or polling for task completion) and therefore cannot perform +//! arbitrary ECS mutations unless explicitly implemented. These options also cannot await ECS +//! operations. use bevy::async_bridge::prelude::{async_world_sync_point, AsyncWorld}; use bevy::prelude::*; @@ -14,55 +22,29 @@ struct MySyncPoint; fn main() { App::new() .add_plugins(DefaultPlugins) - .add_systems(Startup, setup) - .add_systems( - Update, - (async_world_sync_point::, rotate_light), - ) + .add_systems(Startup, (spawn_cube_tasks, setup_scene)) + .add_systems(Update, async_world_sync_point::) .run(); } // Number of cubes to spawn across the x, y, and z axis const NUM_CUBES: i32 = 6; -const LIGHT_RADIUS: f32 = 8.0; - /// This system generates tasks simulating computationally intensive /// work that potentially spans multiple frames/ticks. /// /// The task is offloaded to the `AsyncComputeTaskPool`, allowing heavy computation /// to be handled asynchronously, without blocking the main game thread. -fn setup( - mut commands: Commands, - async_world: Res, - mut meshes: ResMut>, - mut materials: ResMut>, -) { - commands.spawn(( - Mesh3d(meshes.add(Circle::new(1.618 * NUM_CUBES as f32))), - MeshMaterial3d(materials.add(Color::WHITE)), - Transform::from_rotation(Quat::from_rotation_x(-std::f32::consts::FRAC_PI_2)), - )); - - // Spawn a point light with shadows enabled - commands.spawn(( - PointLight { - shadow_maps_enabled: true, - ..default() - }, - Transform::from_xyz(0.0, LIGHT_RADIUS, 4.0), - )); - - // Spawn a camera looking at the origin - commands.spawn(( - Camera3d::default(), - Transform::from_xyz(-6.5, 5.5, 12.0).looking_at(Vec3::ZERO, Vec3::Y), - )); - +fn spawn_cube_tasks(async_world: Res) { let pool = bevy::tasks::AsyncComputeTaskPool::get(); - // Reuse tasks so you don't have to pay the system init cost every time it runs. - let task = async_world.system_state::<( + // Create a system state that is shared across all our tasks. SystemParams with local state + // (e.g., `Local`, `Changed` QueryFilter) will be reused between async tasks that bridge with + // this state. For example, if you have two tasks reusing a query with `Changed`, + // the second task that runs will only see changes the occurred between the two tasks. + // Generally, you should reuse the system state for similar tasks. In this example, we only + // spawn tasks in this system, so we don't need to cache this for reuse later. + let system_state = async_world.system_state::<( Commands, Local, Handle)>>, ResMut>, @@ -71,47 +53,64 @@ fn setup( for x in -NUM_CUBES..NUM_CUBES { for z in -NUM_CUBES..NUM_CUBES { // Spawn a task on the async compute pool - let task = task.clone(); + let system_state = system_state.clone(); pool.spawn(async move { let delay = std::time::Duration::from_secs_f32(rand::rng().random_range(2.0..8.0)); // Simulate a delay before task completion futures_timer::Delay::new(delay).await; - task.bridge( - MySyncPoint, - |(mut commands, mut box_handles, mut meshes, mut materials)| { - // The first time this bridge runs it will initialize the box mesh and box material, and then it will reuse them from then on. - if box_handles.is_none() { - box_handles.replace(( - meshes.add(Cuboid::new(0.25, 0.25, 0.25)), - materials.add(Color::srgb(1.0, 0.2, 0.3)), - )); - } + system_state + .bridge( + MySyncPoint, + |(mut commands, mut box_handles, mut meshes, mut materials)| { + // The first time this bridge runs it will initialize the box mesh and box material, and then it will reuse them from then on. + if box_handles.is_none() { + box_handles.replace(( + meshes.add(Cuboid::new(0.25, 0.25, 0.25)), + materials.add(Color::srgb(1.0, 0.2, 0.3)), + )); + } - let (box_mesh, box_material) = box_handles.clone().unwrap(); + let (box_mesh, box_material) = box_handles.clone().unwrap(); - commands.spawn(( - Mesh3d(box_mesh), - MeshMaterial3d(box_material), - Transform::from_xyz(x as f32, 0.5, z as f32), - )); - }, - ) - .await - .unwrap(); + commands.spawn(( + Mesh3d(box_mesh), + MeshMaterial3d(box_material), + Transform::from_xyz(x as f32, 0.5, z as f32), + )); + }, + ) + .await + .unwrap(); }) .detach(); } } } -/// Rotates the point light around the origin (0, 0, 0) -fn rotate_light(mut query: Query<&mut Transform, With>, time: Res

{ } impl AsyncSystemState

{ - /// Create a new `AsyncSystemState` from an `AsyncWorld` matching the Api surface of - /// `SystemState` with `World`. - pub fn new(world: AsyncWorld) -> Self { + /// Create a new system state from an [`AsyncWorld`] matching the API surface of [`SystemState`] + /// with [`World`]. + /// + /// [`SystemState`]: bevy_ecs::system::SystemState + /// [`World`]: bevy_ecs::world::World + pub(crate) fn new(world: AsyncWorld) -> Self { Self { _p: PhantomData, world, @@ -70,15 +73,19 @@ impl AsyncSystemState

{ /// here are able to take in `&` and `&mut` variables from the surrounding context unlike /// standard Bevy systems. /// - /// We bridge *at* the `_sync_point` `SyncPoint` with our `bridge_fn`. + /// We bridge *at* the `sync_point` `SyncPoint` with our `bridge_fn`. pub async fn bridge( &self, - _sync_point: SyncPoint, + sync_point: SyncPoint, bridge_fn: BridgeFn, ) -> Result where for<'w, 's> BridgeFn: FnOnce(P::Item<'w, 's>) -> Out, { + // We only need the type of `sync_point`, not the actual value, so this value goes unused. + // Dropping it explicitly avoids needing to name it `_sync_point` which makes the API less + // clear. + drop(sync_point); BridgeFuture { _p: PhantomData, system_set: bridge_request::async_world_sync_point:: From 67a29976fa24571327c6aa2d01419a7be5265751 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 1 Jun 2026 17:00:35 -0700 Subject: [PATCH 11/19] Improve docs on bridge_request. --- crates/bevy_async/src/bridge_request.rs | 35 ++++++++++++++++--------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/crates/bevy_async/src/bridge_request.rs b/crates/bevy_async/src/bridge_request.rs index 606686df7943a..bcfd68e58afc0 100644 --- a/crates/bevy_async/src/bridge_request.rs +++ b/crates/bevy_async/src/bridge_request.rs @@ -4,37 +4,48 @@ use bevy_ecs::prelude::{IntoSystemSet, SystemSet, World}; use bevy_ecs::schedule::InternedSystemSet; use bevy_platform::sync::Arc; -/// Drives the queued bridge work for `SyncPoint`. +/// An exclusive system that drives the queued bridge work for `SyncPoint`. /// /// Every queued bridge request is guaranteed to be *woken*. That wake guarantees the corresponding /// async future gets a chance to poll. /// It does *not* however guarantee the poll will finish its ECS work, because that -/// poll may still fail to finish it's work for a *variety* of reasons, i.e. it is unable to acquire -/// the typed `SystemState` lock and returns `Poll::Pending`. +/// poll may still fail to finish its work for a *variety* of reasons, i.e. it is unable to acquire +/// the typed [`SystemState`] lock and returns [`Poll::Pending`]. /// -/// For [`bevy_tasks::TaskPool::spawn_local`] we *are* actually guaranteed that the poll will finish -/// it's ECS work, because it's single threaded, so you can use `spawn_local` if you want +/// For [`TaskPool::spawn_local`] we *are* actually guaranteed that the poll will finish +/// its ECS work, because it's single threaded, so you can use [`TaskPool::spawn_local`] if you want /// determinism. /// -/// This function attempts to tick queued work several times, up to `MaxAsyncTicksPerSyncPoint`. -/// If one internal tick finds no work, we opportunistically tick the local global task pool and -/// try once more before returning early. +/// This function attempts to tick queued work several times, up to [`AsyncTickBudget`]. If one +/// internal tick finds no work, we opportunistically tick the local global task pool and try once +/// more before returning early. /// /// We tick queued work multiple times for two reasons. The first is that serial `.await` calls -/// should try to all be completed within the same `SyncPoint` such as +/// should try to all be completed within the same `SyncPoint` such as: +/// /// ```rust,ignore -/// let health = task_1.run(|health: Single<&Health, With>| { +/// let health = task.run(|health: Single<&Health, With>| { /// health.0 /// }).await; /// if health == 0 { /// return; /// } -/// task_1.run(|commands: Commands| { +/// task.run(|commands: Commands| { /// commands.trigger(PlayerDoesAttack); /// }).await; /// ``` -/// The second reason is spoken of prior. Poll may fail to finish for a variety of reasons and +/// +/// The second reason was mentioned earlier: poll may fail to finish for a variety of reasons and /// should be given several chances before giving up. +/// +/// The `SyncPoint` is an arbitrary, user-defined type that acts as a "label" for a sync point. +/// Async tasks must use the same type in [`AsyncSystemState::bridge`] to use this sync point. This +/// allows bridging to different points in the ECS schedule so the actions apply at correct points. +/// +/// [`SystemState`]: bevy_ecs::system::SystemState +/// [`Poll::Pending`]: core::task::Poll::Pending +/// [`TaskPool::spawn_local`]: bevy_tasks::TaskPool::spawn_local +/// [`AsyncSystemState::bridge`]: crate::AsyncSystemState::bridge pub fn async_world_sync_point(world: &mut World) { // Derive the stable interned system-set key used to look up requests queued // for this exact sync point type. From c80a50d000b9a13b0c63ef857760547dd018988e Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 5 Jun 2026 11:19:21 -0700 Subject: [PATCH 12/19] Fix comment referencing std behavior in a no_std block! --- crates/bevy_async/src/wake_signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_async/src/wake_signal.rs b/crates/bevy_async/src/wake_signal.rs index fbdbebc304f5f..b32dbcb80232c 100644 --- a/crates/bevy_async/src/wake_signal.rs +++ b/crates/bevy_async/src/wake_signal.rs @@ -41,7 +41,7 @@ impl WakeWaiter { } #[cfg(not(feature = "std"))] { - // No-op on std, since we are only using local futures we should tick them + // No-op on no_std, since we are only using local futures we should tick them // prior to reaching this point. return; } From c2dd072cf703445bb72745e578b2e7ac7b9e2204 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 5 Jun 2026 11:22:16 -0700 Subject: [PATCH 13/19] Code-format `AsyncBridge` in the example description. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8a459ad919313..7ea04591feaa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2219,7 +2219,7 @@ wasm = false [package.metadata.example.async_bridge] name = "Async Bridge" -description = "An example showing how to offload work to background async tasks using the AsyncBridge primitive." +description = "An example showing how to offload work to background async tasks using the `AsyncBridge` primitive." category = "Async Tasks" wasm = false From cf8026fc23347e146494fa1edfc51e74e0628768 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 5 Jun 2026 11:23:35 -0700 Subject: [PATCH 14/19] Remove unused wasm key on [[example]]. --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7ea04591feaa0..cf7dc3836b16c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2215,7 +2215,6 @@ wasm = false name = "async_bridge" path = "examples/async_tasks/async_bridge_primitive.rs" doc-scrape-examples = true -wasm = false [package.metadata.example.async_bridge] name = "Async Bridge" From 51ac125c250625e54a51eb114ced4c3fb6cfbb80 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 1 Jun 2026 18:05:15 -0700 Subject: [PATCH 15/19] Add a test to show that async access won't block the ECS **after** the access. --- crates/bevy_async/src/lib.rs | 51 ++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index eb3dfec602247..50940b030dc9a 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -101,8 +101,7 @@ mod tests { use bevy_app::prelude::*; use bevy_app::ScheduleRunnerPlugin; use bevy_ecs::prelude::*; - use bevy_platform::sync::atomic::AtomicBool; - use bevy_platform::sync::atomic::Ordering; + use bevy_platform::sync::atomic::{AtomicBool, Ordering}; use bevy_tasks::AsyncComputeTaskPool; /// This tests that if a world is dropped we return an error from attempting to run it and @@ -148,6 +147,54 @@ mod tests { assert!(WORLD_WAS_DROPPED.load(Ordering::Relaxed)); } + bevy_tasks::cfg::multi_threaded! { + #[test] + fn ecs_then_stuck() { + use bevy_platform::sync::{Arc, Mutex}; + + // We want to make sure that the implementation here doesn't block the ECS thread. So we + // spawn a task that does some ECS work and then immediately blocks (not awaits). Since + // this test blocks a thread, we cannot run this test unless we are multi_threaded. + + struct MySyncPoint; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin, + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + let mutex = Arc::new(Mutex::new(())); + + let mutex_clone = mutex.clone(); + app.add_systems(Startup, move |world: Res| { + let system_state = world.system_state::(); + + let mutex_clone = mutex_clone.clone(); + AsyncComputeTaskPool::get() + .spawn(async move { + system_state + .bridge(MySyncPoint, |mut commands| { + commands.spawn_empty(); + }) + .await + .unwrap(); + let _guard = mutex_clone.lock().unwrap(); + }) + .detach(); + }) + .add_systems(Update, async_world_sync_point::); + + // Lock the guard while we update - this makes it impossible to get the lock, and + // therefore, makes the task stuck. That's fine as long as this happens in another + // thread. + let _guard = mutex.lock().unwrap(); + + app.update(); + } + } + #[test] fn invalid_parameters() { struct MySyncPoint; From daac94684861c654eb847700df5956274df25d48 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 1 Jun 2026 19:26:29 -0700 Subject: [PATCH 16/19] Add a test to show that different sync points will progress different tasks. --- crates/bevy_async/src/lib.rs | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index 50940b030dc9a..c457624ade68f 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -102,8 +102,56 @@ mod tests { use bevy_app::ScheduleRunnerPlugin; use bevy_ecs::prelude::*; use bevy_platform::sync::atomic::{AtomicBool, Ordering}; + use bevy_tasks::futures::check_ready; use bevy_tasks::AsyncComputeTaskPool; + #[test] + fn different_sync_points_allow_different_tasks() { + struct Sync1; + struct Sync2; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + let system_state = app + .world() + .resource::() + .system_state::(); + + let system_state_clone = system_state.clone(); + let mut task_1 = AsyncComputeTaskPool::get().spawn(async move { + system_state_clone + .bridge(Sync1, |_: Commands| {}) + .await + .unwrap(); + 1 + }); + let mut task_2 = AsyncComputeTaskPool::get().spawn(async move { + system_state.bridge(Sync2, |_: Commands| {}).await.unwrap(); + 2 + }); + + assert!(check_ready(&mut task_1).is_none()); + assert!(check_ready(&mut task_2).is_none()); + + app.world_mut() + .run_system_cached(async_world_sync_point::) + .unwrap(); + + assert_eq!(check_ready(&mut task_1).unwrap(), 1); + assert!(check_ready(&mut task_2).is_none()); + + app.world_mut() + .run_system_cached(async_world_sync_point::) + .unwrap(); + + assert_eq!(check_ready(&mut task_2).unwrap(), 2); + } + /// This tests that if a world is dropped we return an error from attempting to run it and /// that everything cleans up nicely /// Because of a quirk of how bevy's task pools work we have to always have at least one From f09a7a979f12becc989109e009ec147324079419 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 1 Jun 2026 22:20:12 -0700 Subject: [PATCH 17/19] Add a test to show that we can spawn many more tasks than threads. --- crates/bevy_async/src/lib.rs | 125 ++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index c457624ade68f..2c59e62a36940 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -63,7 +63,6 @@ //! If `SystemState`s are invalid, they can't be used and the future cannot complete //! //! Regardless, the future returns Ready(Err) and completes permanently -#![forbid(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] #![doc( html_logo_url = "https://!bevy.org/assets/icon.png", @@ -74,10 +73,16 @@ #[cfg(feature = "std")] extern crate std; +// Forbid unsafe_code in every module except the tests, which need some unsafe for Future Pins. +#[forbid(unsafe_code)] mod bridge_future; +#[forbid(unsafe_code)] mod bridge_request; +#[forbid(unsafe_code)] mod plugin; +#[forbid(unsafe_code)] mod system_state; +#[forbid(unsafe_code)] mod wake_signal; pub use crate::bridge_future::{AsyncSystemState, BridgeError}; @@ -97,14 +102,130 @@ pub mod prelude { #[cfg(test)] mod tests { + extern crate alloc; + + use core::pin::Pin; + + use alloc::{sync::Arc, vec::Vec}; + use crate::prelude::*; use bevy_app::prelude::*; use bevy_app::ScheduleRunnerPlugin; use bevy_ecs::prelude::*; - use bevy_platform::sync::atomic::{AtomicBool, Ordering}; + use bevy_platform::sync::{ + atomic::{AtomicBool, Ordering}, + Mutex, + }; use bevy_tasks::futures::check_ready; use bevy_tasks::AsyncComputeTaskPool; + /// A future wrapper around `F` that **first** polls the `F` future, then increments `counter`. + /// + /// This allows tests to wait until async bridge futures are polled once (and therefore are + /// waiting for ECS access) before allowing ECS access, which we can then track since the ECS + /// cannot proceed until all async tasks have moved on. + struct PollThenCount { + future: F, + counted: bool, + counter: Arc>, + } + + impl Future for PollThenCount { + type Output = F::Output; + + fn poll( + self: Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + #[expect( + unsafe_code, + reason = "we need to access all fields independently to update the future's state" + )] + // SAFETY: We don't move out of `this` - we just create a pin to the future (which + // we poll), then assign to `counted` and update `counter`. + let this = unsafe { self.get_unchecked_mut() }; + #[expect(unsafe_code, reason = "we need to poll the future for !Unpin types")] + // SAFETY: We never move this.future, so it is pinned in place, so this pin is + // valid. + let result = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx); + if !this.counted { + this.counted = true; + *this.counter.lock().unwrap() += 1; + } + result + } + } + + #[test] + fn more_tasks_than_threads() { + struct MySyncPoint; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )) + .insert_resource(AsyncTickBudget(3)) + .add_systems(Update, async_world_sync_point::); + + let system_state = app + .world() + .resource::() + .system_state::(); + + let task_pool = AsyncComputeTaskPool::get(); + let desired_tasks = task_pool.thread_num() * 10; + + let barrier_counter = Arc::new(Mutex::new(0)); + let mut tasks = Vec::new(); + for _ in 0..desired_tasks { + let barrier_counter = barrier_counter.clone(); + let system_state = system_state.clone(); + tasks.push(task_pool.spawn(async move { + let future = system_state.bridge(MySyncPoint, |_: Commands| {}); + PollThenCount { + future, + counted: false, + counter: barrier_counter, + } + .await + .unwrap() + })); + } + + // Spinloop until all the tasks are waiting for ECS access. + while *barrier_counter.lock().unwrap() != desired_tasks { + // If we're configured to be single-threaded, tick the task pools. + bevy_tasks::cfg::multi_threaded! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + } + + // Clear the barrier counters. + *barrier_counter.lock().unwrap() = 0; + + app.update(); + + 'outer: { + for _ in 0..10000 { + bevy_tasks::cfg::multi_threaded! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + tasks.retain_mut(|task| check_ready(task).is_none()); + if tasks.is_empty() { + break 'outer; + } + } + + panic!("Ran out of iterations waiting for tasks to complete"); + } + } + #[test] fn different_sync_points_allow_different_tasks() { struct Sync1; From b7be14be0c11e3329272da803099eb6db0323f1a Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 5 Jun 2026 11:12:21 -0700 Subject: [PATCH 18/19] Rewrite the docs for bevy_async to be usage-focused. --- crates/bevy_async/src/lib.rs | 98 ++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 56 deletions(-) diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs index 2c59e62a36940..381652bfdef0d 100644 --- a/crates/bevy_async/src/lib.rs +++ b/crates/bevy_async/src/lib.rs @@ -1,68 +1,54 @@ -//! The objective here is to coordinate two participants that want to share World access: +//! `bevy_async` allows async tasks to synchronize with the main Bevy schedule, allowing futures to +//! access the ECS. This crate provides a "bridge" that performs this synchronization. //! -//! - The main Bevy schedule -//! - Futures and async tasks running on other threads +//! # How does bridging occur? //! -//! This is done through the bridge primitive introduced in this crate +//! Users need to: //! +//! - Create a "sync point" type. This is a marker type to indicate which sync point will be used +//! when accessing the ECS. This should generally just be as simple as `struct MySyncPoint;`. +//! - Add an [`async_world_sync_point`] system somewhere in your schedule. For example, adding +//! `app.add_systems(Update, async_world_sync_point::);` will allow your async tasks +//! to access the ECS during the [`Update`] schedule. This system can also have ordering +//! constraints to ensure its place in the schedule. +//! - Users call [`AsyncWorld::system_state`] to create the state they need to access the ECS. This +//! state should be reused whenever possible - features like [`Local`] or [`Changed`] rely on the +//! state being preserved between usages, and queries remain cached which can be more performant. +//! [`AsyncWorld::system_state`] can be called inside or **outside** the async task. +//! - Inside the async task, call [`AsyncSystemState::bridge`] with the sync point type you'd like +//! to use and the closure to run ECS access with, and then await this future. //! -//! Invariants of this crate: +//! # Alternatives //! -//! - Normal rust safety invariants for &mut World (aliasing) -//! - At most one future has world access at a time -//! - Futures only access the world while the scoped pointer (managed by the bridge driver) is live -//! - `SystemState` is always initialized before use -//! - Deferred ops are only applied after every future finishes polling and releases world access -//! - The driver can't deadlock -//! - All futures that want world access can eventually complete (assuming fair scheduling by the async runtime) -//! - If the world is dropped, futures don't leak and eventually finish (in an error state) +//! It is possible to access the ECS **without** this crate (in limited ways). For example, you can +//! use a channel as demonstrated in the [`async_channel_pattern`] example, or you can simply +//! [`check_ready`] on the [`Task`] as demonstrated in the [`async_compute`] example. //! +//! ## Advantages to using this crate //! -//! The protocol: +//! This crate: //! -//! Futures (tasks on worker threads) -//! - enqueue requests (create signal guard clones: one kept, one sent) +//! - Provides an out-of-the-box solution for all ECS accesses. The alternatives above require +//! manual setup (e.g., you need to create your own channel, your own systems), and requires you +//! to "hard-code" what ECS access you use (the system that provides the ECS access needs to +//! decide whether it will provide `&mut World` or only specific accesses). +//! - Allows the closure with ECS access to borrow from the async task itself. Most other solutions +//! require passing `'static` types, which prevents borrowing data from the async task. +//! - Allows you to await ECS access, allowing other futures to run concurrently, and +//! (more importantly) allow later code to happen after the ECS access. +//! - Allows you to reuse the [`AsyncSystemState`] which maintains [`SystemParam`] state, including +//! [`QueryState`]. This allows tools like [`Changed`] to work correctly across multiple ECS +//! accesses. //! -//! - Driver([`async_world_sync_point`]) (exclusive system, world-owning thread) -//! 1. Drain request queue for this sync point -//! 2. Publish World pointer (via `scoped_static_storage`). Future access scope begins -//! 3. Wake all drained futures -//! -//! -> Futures race for locks (non-blocking) -//! -//! -> Success: acquire both locks, do work, complete -//! -//! -> Failure: signal driver (Drop signal guard), re-enqueue later -//! -//! -> Direct access: non-queued future polled during scope, -//! bypasses queue, acquires locks, completes (no signal) -//! 4. Wait for all signal guards to drop + scope mutex released -//! 5. Unpublish pointer, scope ends. -//! 6. Apply any deferred ops from `SystemState` of polled futures -//! 7. Loop (up to [`AsyncTickBudget`]) or return -//! 8. Schedule resumes (normal systems run) -//! -//! -//! Dual locking: -//! -//! The published World pointer lock is managed by the `ScopedStatic` primitive in `scoped_static_storage` (only one future can lock this at a time) -//! `SystemState` locks are managed by the `SystemStateCell` primitive of this crate (futures using different `SystemState` types can work in parallel) -//! -//! -//! Preventing driver deadlocks when futures panic: -//! -//! If a future panics while holding locks, rust's panic unwinding drops destructors in reverse scope order -//! - First, the `SystemState` `MutexGuard` drops (releasing the lock) -//! - Second, the World pointer's scope `MutexGuard` drops (releasing the lock) -//! - Finally, the guard signal constructed by the future during `poll()` drops, and the driver is notified -//! -//! How futures can fail cleanly: -//! -//! If the [`AsyncWorld`] cannot be reached ([`bevy_platform::sync::Weak::upgrade`] fails during `poll()`), the world has been dropped and the future cannot complete. -//! -//! If `SystemState`s are invalid, they can't be used and the future cannot complete -//! -//! Regardless, the future returns Ready(Err) and completes permanently +//! [`Update`]: bevy_app::Update +//! [`Local`]: bevy_ecs::system::Local +//! [`Changed`]: bevy_ecs::query::Changed +//! [`async_channel_pattern`]: https://github.com/bevyengine/bevy/blob/main/examples/async_tasks/async_channel_pattern.rs +//! [`check_ready`]: bevy_tasks::futures::check_ready +//! [`Task`]: bevy_tasks::Task +//! [`async_compute`]: https://github.com/bevyengine/bevy/blob/main/examples/async_tasks/async_compute.rs +//! [`QueryState`]: bevy_ecs::query::QueryState +//! [`SystemParam`]: bevy_ecs::system::SystemParam #![cfg_attr(docsrs, feature(doc_cfg))] #![doc( html_logo_url = "https://!bevy.org/assets/icon.png", From 74fec1870fbf84e8485774c5531d83e87612b4b4 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 6 Jun 2026 08:11:48 -0700 Subject: [PATCH 19/19] Revive the "protocol" doc comment as a big block comment inside of async_world_sync_point. --- crates/bevy_async/src/bridge_request.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/crates/bevy_async/src/bridge_request.rs b/crates/bevy_async/src/bridge_request.rs index bcfd68e58afc0..6ebf33f502e44 100644 --- a/crates/bevy_async/src/bridge_request.rs +++ b/crates/bevy_async/src/bridge_request.rs @@ -47,6 +47,30 @@ use bevy_platform::sync::Arc; /// [`TaskPool::spawn_local`]: bevy_tasks::TaskPool::spawn_local /// [`AsyncSystemState::bridge`]: crate::AsyncSystemState::bridge pub fn async_world_sync_point(world: &mut World) { + // The protocol: + // + // Futures (tasks on worker threads) + // - enqueue requests (create signal guard clones: one kept, one sent) + // + // - Driver(`async_world_sync_point`) (exclusive system, world-owning thread) + // 1. Drain request queue for this sync point + // 2. Publish World pointer (via `scoped_static_storage`). Future access scope begins + // 3. Wake all drained futures + // + // -> Futures race for locks (non-blocking) + // + // -> Success: acquire both locks, do work, complete + // + // -> Failure: signal driver (Drop signal guard), re-enqueue later + // + // -> Direct access: non-queued future polled during scope, + // bypasses queue, acquires locks, completes (no signal) + // 4. Wait for all signal guards to drop + scope mutex released + // 5. Unpublish pointer, scope ends. + // 6. Apply any deferred ops from `SystemState` of polled futures + // 7. Loop (up to `AsyncTickBudget`) or return + // 8. Schedule resumes (normal systems run) + // Derive the stable interned system-set key used to look up requests queued // for this exact sync point type. let sync_point = async_world_sync_point::