diff --git a/Cargo.toml b/Cargo.toml index 0c160b3aea850..cf7dc3836b16c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2211,6 +2211,17 @@ description = "How to use `AsyncComputeTaskPool` to complete longer running task category = "Async Tasks" wasm = false +[[example]] +name = "async_bridge" +path = "examples/async_tasks/async_bridge_primitive.rs" +doc-scrape-examples = true + +[package.metadata.example.async_bridge] +name = "Async Bridge" +description = "An example showing how to offload work to background async tasks using the `AsyncBridge` primitive." +category = "Async Tasks" +wasm = false + [[example]] name = "async_channel_pattern" path = "examples/async_tasks/async_channel_pattern.rs" diff --git a/crates/bevy_async/Cargo.toml b/crates/bevy_async/Cargo.toml new file mode 100644 index 0000000000000..c81b79bf253d4 --- /dev/null +++ b/crates/bevy_async/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "bevy_async" +version = "0.19.0-dev" +edition = "2024" +description = "Provides interop between the ecs and async runtimes" +homepage = "https://bevy.org" +repository = "https://github.com/bevyengine/bevy" +license = "MIT OR Apache-2.0" +keywords = ["bevy", "async"] + +[features] +default = ["std", "bevy_tasks"] + +std = [ + "bevy_app/std", + "bevy_ecs/std", + "bevy_platform/std", + "scoped_static_storage/std", + "keyed_concurrent_queue/std", +] + +[dependencies] +# bevy +bevy_app = { path = "../bevy_app", version = "0.19.0-dev", default-features = false } +bevy_ecs = { path = "../bevy_ecs", version = "0.19.0-dev", default-features = false } +bevy_ecs_macros = { path = "../bevy_ecs/macros", version = "0.19.0-dev" } +bevy_platform = { path = "../bevy_platform", version = "0.19.0-dev", default-features = false } +bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev", default-features = false, optional = true } + +# other +scoped_static_storage = { version = "0.1.2", default-features = false } +keyed_concurrent_queue = { version = "0.1.3", default-features = false } +thiserror = { version = "2", default-features = false } + +[lints] +workspace = true + +[package.metadata.docs.rs] +rustdoc-args = [ + "-Zunstable-options", + "--generate-link-to-definition", + "--generate-macro-expansion", +] +all-features = true diff --git a/crates/bevy_async/LICENSE-APACHE b/crates/bevy_async/LICENSE-APACHE new file mode 100644 index 0000000000000..d9a10c0d8e868 --- /dev/null +++ b/crates/bevy_async/LICENSE-APACHE @@ -0,0 +1,176 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS diff --git a/crates/bevy_async/LICENSE-MIT b/crates/bevy_async/LICENSE-MIT new file mode 100644 index 0000000000000..9cf106272ac3b --- /dev/null +++ b/crates/bevy_async/LICENSE-MIT @@ -0,0 +1,19 @@ +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/crates/bevy_async/README.md b/crates/bevy_async/README.md new file mode 100644 index 0000000000000..9be2d2082f059 --- /dev/null +++ b/crates/bevy_async/README.md @@ -0,0 +1,7 @@ +# Bevy Async + +[![License](https://img.shields.io/badge/license-MIT%2FApache-blue.svg)](https://github.com/bevyengine/bevy#license) +[![Crates.io](https://img.shields.io/crates/v/bevy_async.svg)](https://crates.io/crates/bevy_async) +[![Downloads](https://img.shields.io/crates/d/bevy_async.svg)](https://crates.io/crates/bevy_async) +[![Docs](https://docs.rs/bevy_async/badge.svg)](https://docs.rs/bevy_a11y/latest/bevy_async/) +[![Discord](https://img.shields.io/discord/691052431525675048.svg?label=&logo=discord&logoColor=ffffff&color=7389D8&labelColor=6A7EC2)](https://discord.gg/bevy) diff --git a/crates/bevy_async/src/bridge_future.rs b/crates/bevy_async/src/bridge_future.rs new file mode 100644 index 0000000000000..71a19705e3b90 --- /dev/null +++ b/crates/bevy_async/src/bridge_future.rs @@ -0,0 +1,240 @@ +use crate::bridge_request::BridgeRequest; +use crate::plugin::AsyncWorld; +use crate::system_state::{ErasedSystemStateCell, SystemStateCell}; +use crate::wake_signal::WakeSignaler; +use crate::{bridge_request, wake_signal}; +use bevy_ecs::schedule::{InternedSystemSet, IntoSystemSet, SystemSet}; +use bevy_ecs::system::SystemParam; +use bevy_platform::sync::Arc; +use core::marker::PhantomData; + +/// Handle that lets an async task request temporary access to an ECS +/// `SystemParam` or a tuple of them. +/// +/// `P` is the typed system parameter the caller eventually wants, such as: +/// - [`bevy_ecs::prelude::Commands`] +/// - [`bevy_ecs::prelude::Res`] +/// - [`bevy_ecs::prelude::Query`] +/// - tuples of params +/// +/// It is cheap to clone and intended to be passed into async tasks. +/// You can pass it into *multiple* tasks on separate threads and have them work concurrently +/// off of the same state, sharing `Locals`. +pub struct AsyncSystemState { + pub(crate) _p: PhantomData

, + + /// A `Weak` is used so tasks do not stay alive if the world is dropped. + /// If the world goes away, upgrading this weak pointer fails and access + /// returns [`BridgeError::WorldDropped`]. + pub(crate) world: AsyncWorld, + + /// Type-erased storage for the underlying `SystemState

`. + /// + /// Each `EcsAccess

` keeps reusing the same typed system state across + /// accesses so repeated operations do not rebuild it from scratch. + /// + /// This is also important not only to persist params like `Local` but *also* so `Changed` and + /// `Added` and other filters can work. + pub(crate) system_state: Arc, +} + +impl Clone for AsyncSystemState

{ + fn clone(&self) -> Self { + Self { + _p: PhantomData, + world: self.world.clone(), + system_state: self.system_state.clone(), + } + } +} + +impl AsyncSystemState

{ + /// Create a new system state from an [`AsyncWorld`] matching the API surface of [`SystemState`] + /// with [`World`]. + /// + /// [`SystemState`]: bevy_ecs::system::SystemState + /// [`World`]: bevy_ecs::world::World + pub(crate) fn new(world: AsyncWorld) -> Self { + Self { + _p: PhantomData, + world, + #[cfg(feature = "std")] + system_state: Arc::new(SystemStateCell::

::default()), + #[cfg(not(feature = "std"))] + system_state: Arc::from( + bevy_platform::prelude::Box::new(SystemStateCell::

::default()) + as bevy_platform::prelude::Box, + ), + } + } + + /// This function allows us to create a bridge between the async task we are in and the ecs + /// world we want access to, effectively running a system from an async task. The systems run + /// here are able to take in `&` and `&mut` variables from the surrounding context unlike + /// standard Bevy systems. + /// + /// We bridge *at* the `sync_point` `SyncPoint` with our `bridge_fn`. + pub async fn bridge( + &self, + sync_point: SyncPoint, + bridge_fn: BridgeFn, + ) -> Result + where + for<'w, 's> BridgeFn: FnOnce(P::Item<'w, 's>) -> Out, + { + // We only need the type of `sync_point`, not the actual value, so this value goes unused. + // Dropping it explicitly avoids needing to name it `_sync_point` which makes the API less + // clear. + drop(sync_point); + BridgeFuture { + _p: PhantomData, + system_set: bridge_request::async_world_sync_point:: + .into_system_set() + .intern(), + bridge_fn: Some(bridge_fn), + wake_signal: None, + system_state: self.system_state.clone(), + world: self.world.clone(), + } + .await + } +} + +/// If the bridge cannot run, either because the system params were invalid, or because the world it +/// was referencing no longer exists, we return this error. +#[derive(thiserror::Error, Debug)] +pub enum BridgeError { + /// The requested `SystemParam` was invalid in the current world context. + /// for example trying to access a param that fails Bevy's usual validation like a missing + /// Resource or using `Single` on something that has 0 or multiple instances. + #[error(transparent)] + SystemParamValidation(bevy_ecs::system::SystemParamValidationError), + /// The world has been dropped, so we should just return. + #[error("World no longer exists")] + WorldDropped, +} + +/// Future representing a single in-flight bridging request between our async task and our `World`. +struct BridgeFuture { + _p: PhantomData<(P, Func, Out)>, + /// Interned system-set key identifying which sync-point queue this future + /// should be sent to. + system_set: InternedSystemSet, + /// This is the pseudo-system that we try to run when we have access to `World`. + /// This is an option just so we can take it out when we run it so we can use `FnOnce` + /// instead of `FnMut`, so it's more flexible than true systems. + bridge_fn: Option, + /// Wake signal for the currently queued wake cycle, if any. + /// + /// The future drops this at the end of `poll` which acts as acknowledgement that the wake + /// has been handled. + wake_signal: Option, + system_state: Arc, + /// Weak bridge pointer so the loss of the world becomes a clean runtime error. + world: AsyncWorld, +} + +impl Unpin for BridgeFuture {} + +impl Future for BridgeFuture +where + P: SystemParam + 'static, + for<'w, 's> Func: FnOnce(P::Item<'w, 's>) -> Out, +{ + type Output = Result; + + fn poll( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + use core::task::Poll; + + // If we were previously woken by the sync-point driver, we will have a + // `WakeSignaler` stored here. + // + // Dropping that signal at the end of this poll acts as the + // acknowledgement that yes, this wake was observed and this task has + // attempted its run, you may release the waiting on the other side. + let _drop_at_end_of_scope = self.wake_signal.take(); + + // Try to gain a strong reference to the bridge. If this fails, the world is gone, + // so further access is impossible. + let Some(strong_world) = self.world.0.upgrade() else { + return Poll::Ready(Err(BridgeError::WorldDropped)); + }; + match strong_world + .world_scope + .try_with(|world| { + let Self { + ref system_state, + ref mut bridge_fn, + .. + } = *self; + // Attempt to acquire the typed `SystemState

`. + // + // We deliberately use `try_lock` rather than blocking. If + // another bridge request is currently using the same system + // state, we simply yield and let the sync-point driver try again + // on a later internal tick. + let Some(mut system_state) = system_state.try_lock::

(world) else { + return Poll::Pending; + }; + + if !system_state.meta().is_send() { + return Poll::Ready(Err(BridgeError::SystemParamValidation( + bevy_ecs::system::SystemParamValidationError::invalid::< + bevy_ecs::prelude::NonSend<()>, + >("Cannot have your system be non-send / exclusive"), + ))); + } + + let param = match system_state.get_mut(world) { + Ok(param) => param, + Err(system_param_validation_error) => { + return Poll::Ready(Err(BridgeError::SystemParamValidation( + system_param_validation_error, + ))) + } + }; + // We finally have `P::Item<'w, 's>`, yay!, so consume the stored `FnOnce`, run it, + // and complete the future. + Poll::Ready(Ok(bridge_fn.take().unwrap()(param))) + }) + .ok() + { + Some(out) => out, + None => { + // No world is currently exposed. That means we are being polled + // outside the `async_world_sync_point`, so we cannot access ECS yet. + // + // Instead, enqueue ourselves to be revisited when the matching + // sync-point system runs. + let (wake_signal, wake_waiter) = wake_signal::pair(); + // Store the wake_signal locally so dropping it at the end of the next + // poll acknowledges the wake. + self.wake_signal.replace(wake_signal); + // Queue the request under this future's target sync point. + // + // The queued payload carries the following! + // 1. The task's waker, so the sync-point driver can wake it. + // 2. The wake handshake signal, so the driver can wait until the wake has actually + // been processed. + // 3. An initialization hint for the typed `SystemState`. + // 4. The erased `SystemState` storage itself. + strong_world + .bridge_requests + .try_send( + &self.system_set, + BridgeRequest { + waker: cx.waker().clone(), + wake_waiter, + system_state: self.system_state.clone(), + }, + ) + .ok() + .unwrap(); + Poll::Pending + } + } + } +} diff --git a/crates/bevy_async/src/bridge_request.rs b/crates/bevy_async/src/bridge_request.rs new file mode 100644 index 0000000000000..6ebf33f502e44 --- /dev/null +++ b/crates/bevy_async/src/bridge_request.rs @@ -0,0 +1,239 @@ +use crate::plugin::{AsyncTickBudget, StrongAsyncWorld}; +use crate::system_state::ErasedSystemStateCell; +use bevy_ecs::prelude::{IntoSystemSet, SystemSet, World}; +use bevy_ecs::schedule::InternedSystemSet; +use bevy_platform::sync::Arc; + +/// An exclusive system that drives the queued bridge work for `SyncPoint`. +/// +/// Every queued bridge request is guaranteed to be *woken*. That wake guarantees the corresponding +/// async future gets a chance to poll. +/// It does *not* however guarantee the poll will finish its ECS work, because that +/// poll may still fail to finish its work for a *variety* of reasons, i.e. it is unable to acquire +/// the typed [`SystemState`] lock and returns [`Poll::Pending`]. +/// +/// For [`TaskPool::spawn_local`] we *are* actually guaranteed that the poll will finish +/// its ECS work, because it's single threaded, so you can use [`TaskPool::spawn_local`] if you want +/// determinism. +/// +/// This function attempts to tick queued work several times, up to [`AsyncTickBudget`]. If one +/// internal tick finds no work, we opportunistically tick the local global task pool and try once +/// more before returning early. +/// +/// We tick queued work multiple times for two reasons. The first is that serial `.await` calls +/// should try to all be completed within the same `SyncPoint` such as: +/// +/// ```rust,ignore +/// let health = task.run(|health: Single<&Health, With>| { +/// health.0 +/// }).await; +/// if health == 0 { +/// return; +/// } +/// task.run(|commands: Commands| { +/// commands.trigger(PlayerDoesAttack); +/// }).await; +/// ``` +/// +/// The second reason was mentioned earlier: poll may fail to finish for a variety of reasons and +/// should be given several chances before giving up. +/// +/// The `SyncPoint` is an arbitrary, user-defined type that acts as a "label" for a sync point. +/// Async tasks must use the same type in [`AsyncSystemState::bridge`] to use this sync point. This +/// allows bridging to different points in the ECS schedule so the actions apply at correct points. +/// +/// [`SystemState`]: bevy_ecs::system::SystemState +/// [`Poll::Pending`]: core::task::Poll::Pending +/// [`TaskPool::spawn_local`]: bevy_tasks::TaskPool::spawn_local +/// [`AsyncSystemState::bridge`]: crate::AsyncSystemState::bridge +pub fn async_world_sync_point(world: &mut World) { + // The protocol: + // + // Futures (tasks on worker threads) + // - enqueue requests (create signal guard clones: one kept, one sent) + // + // - Driver(`async_world_sync_point`) (exclusive system, world-owning thread) + // 1. Drain request queue for this sync point + // 2. Publish World pointer (via `scoped_static_storage`). Future access scope begins + // 3. Wake all drained futures + // + // -> Futures race for locks (non-blocking) + // + // -> Success: acquire both locks, do work, complete + // + // -> Failure: signal driver (Drop signal guard), re-enqueue later + // + // -> Direct access: non-queued future polled during scope, + // bypasses queue, acquires locks, completes (no signal) + // 4. Wait for all signal guards to drop + scope mutex released + // 5. Unpublish pointer, scope ends. + // 6. Apply any deferred ops from `SystemState` of polled futures + // 7. Loop (up to `AsyncTickBudget`) or return + // 8. Schedule resumes (normal systems run) + + // Derive the stable interned system-set key used to look up requests queued + // for this exact sync point type. + let sync_point = async_world_sync_point:: + .into_system_set() + .intern(); + let async_world = world.get_resource::().unwrap().clone(); + // Read the configured maximum number of internal attempts we are willing to + // perform during this `SyncPoint`. + let max_ticks = world.get_resource::().unwrap().0; + for _ in 0..max_ticks { + // Drive once. If no work was found, we may truly be done. + // but we should give external task pools one more opportunity to make newly-woken + // tasks runnable. + if async_world.0.tick_sync_point(sync_point, world) == TickResult::NoWork { + #[cfg(feature = "bevy_tasks")] + bevy_tasks::cfg::web! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + // Retry once after ticking the global pool. If we are still idle, + // there is no more immediately available progress to make. + if async_world.0.tick_sync_point(sync_point, world) == TickResult::NoWork { + return; + } + } + } +} + +#[derive(Default)] +pub(crate) struct AsyncWorldInner { + pub(crate) bridge_requests: + keyed_concurrent_queue::KeyedQueues, + pub(crate) world_scope: scoped_static_storage::ScopedStatic, +} + +impl AsyncWorldInner { + /// This ticks a single sync point, requesting the poll of all tasks in that sync point. + /// None of the tasks are guaranteed to actually return `Poll::Ready`, but all are guaranteed to + /// at least do a `Poll::Pending` + /// + /// The flow of logic is the following: + /// 1. We first drain the queue for our `SyncPoint`. + /// 2. Expose our `World` through `world_scope`. + /// 3. Wake all our `BridgeFuture`s. + /// 4. Apply our `SystemState` back into the `World`. (Things like `Commands`). + fn tick_sync_point(&self, sync_point: InternedSystemSet, world: &mut World) -> TickResult { + let mut queued_requests = bevy_platform::prelude::vec![]; + while let Ok(queued_task_bridge) = self.bridge_requests.get_or_create(&sync_point).pop() { + queued_requests.push(queued_task_bridge); + } + // If no requests were waiting then report idle so the caller can decide whether to stop + // or attempt one more task-pool tick. + if queued_requests.is_empty() { + return TickResult::NoWork; + } + // Make this `World` temporarily visible to our waking futures. Wake them all and wait + // until they all have at least *attempted* to poll. + // This is contractually obligated by the contract of `.wake()`. We are guaranteed one wake + // per call to our `.wake()`. + let completed_tasks = self + .world_scope + .scope(world, || wake_requests_and_wait(queued_requests)); + for task in completed_tasks { + task.apply(world); + } + TickResult::DidWork + } +} + +/// We need to notify all our Wakers that have queued that we've dropped so they can error +impl Drop for AsyncWorldInner { + fn drop(&mut self) { + for bridge_requests in self.bridge_requests.inner().read().unwrap().values() { + while let Ok(request) = bridge_requests.pop() { + request.waker.wake(); + } + } + } +} + +/// Whether a tick attempt made any progress. +#[derive(PartialEq)] +enum TickResult { + /// We found and processed at least one queued bridge request. + DidWork, + /// There was no queued work available for the `SyncPoint`. + NoWork, +} + +/// A queued access request bridging an async task into ECS. +pub(crate) struct BridgeRequest { + /// Waker for the async future (`crate::bridge_future::BridgeFuture`) that wants ECS access. + /// When the `SyncPoint` is driven, this waker is fired so the future can + /// poll while `world_scope` exposes the current `World`. + pub(crate) waker: core::task::Waker, + /// Our custom primitive that lets us wait until all the futures have tried to run before + /// continuing. + pub(crate) wake_waiter: crate::wake_signal::WakeWaiter, + pub(crate) system_state: Arc, +} + +/// A queued bridge request whose waker has already been fired. +struct WokenBridgeRequest { + wake_signal: crate::wake_signal::WakeWaiter, + system_state: Arc, +} + +/// A request that has finished its attempted poll and may need to apply deferred world state. +struct CompletedBridgeRequest { + system_state: Arc, +} + +impl CompletedBridgeRequest { + #[inline] + fn apply(self, world: &mut World) { + self.system_state.apply(world); + } +} + +#[inline] +fn wake_requests_and_wait( + queued_requests: bevy_platform::prelude::Vec, +) -> bevy_platform::prelude::Vec { + let bridged_futures = queued_requests + .into_iter() + .map( + |BridgeRequest { + system_state, + waker, + wake_waiter: wake_signal, + .. + }| { + // Trigger the `BridgeFuture` so it can poll while `world_scope` + // is active. + waker.wake(); + WokenBridgeRequest { + system_state, + wake_signal, + } + }, + ) + // we re-collect to ensure we fully exhaust the prior iterator + // we want to have all the wakers call .wake() before the first barrier calls .wait() + .collect::>(); + + #[cfg(feature = "bevy_tasks")] + bevy_tasks::cfg::web! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + + bridged_futures + .into_iter() + .map( + |WokenBridgeRequest { + system_state, + wake_signal, + }| { + wake_signal.wait(); + CompletedBridgeRequest { system_state } + }, + ) + .collect() +} diff --git a/crates/bevy_async/src/lib.rs b/crates/bevy_async/src/lib.rs new file mode 100644 index 0000000000000..381652bfdef0d --- /dev/null +++ b/crates/bevy_async/src/lib.rs @@ -0,0 +1,432 @@ +//! `bevy_async` allows async tasks to synchronize with the main Bevy schedule, allowing futures to +//! access the ECS. This crate provides a "bridge" that performs this synchronization. +//! +//! # How does bridging occur? +//! +//! Users need to: +//! +//! - Create a "sync point" type. This is a marker type to indicate which sync point will be used +//! when accessing the ECS. This should generally just be as simple as `struct MySyncPoint;`. +//! - Add an [`async_world_sync_point`] system somewhere in your schedule. For example, adding +//! `app.add_systems(Update, async_world_sync_point::);` will allow your async tasks +//! to access the ECS during the [`Update`] schedule. This system can also have ordering +//! constraints to ensure its place in the schedule. +//! - Users call [`AsyncWorld::system_state`] to create the state they need to access the ECS. This +//! state should be reused whenever possible - features like [`Local`] or [`Changed`] rely on the +//! state being preserved between usages, and queries remain cached which can be more performant. +//! [`AsyncWorld::system_state`] can be called inside or **outside** the async task. +//! - Inside the async task, call [`AsyncSystemState::bridge`] with the sync point type you'd like +//! to use and the closure to run ECS access with, and then await this future. +//! +//! # Alternatives +//! +//! It is possible to access the ECS **without** this crate (in limited ways). For example, you can +//! use a channel as demonstrated in the [`async_channel_pattern`] example, or you can simply +//! [`check_ready`] on the [`Task`] as demonstrated in the [`async_compute`] example. +//! +//! ## Advantages to using this crate +//! +//! This crate: +//! +//! - Provides an out-of-the-box solution for all ECS accesses. The alternatives above require +//! manual setup (e.g., you need to create your own channel, your own systems), and requires you +//! to "hard-code" what ECS access you use (the system that provides the ECS access needs to +//! decide whether it will provide `&mut World` or only specific accesses). +//! - Allows the closure with ECS access to borrow from the async task itself. Most other solutions +//! require passing `'static` types, which prevents borrowing data from the async task. +//! - Allows you to await ECS access, allowing other futures to run concurrently, and +//! (more importantly) allow later code to happen after the ECS access. +//! - Allows you to reuse the [`AsyncSystemState`] which maintains [`SystemParam`] state, including +//! [`QueryState`]. This allows tools like [`Changed`] to work correctly across multiple ECS +//! accesses. +//! +//! [`Update`]: bevy_app::Update +//! [`Local`]: bevy_ecs::system::Local +//! [`Changed`]: bevy_ecs::query::Changed +//! [`async_channel_pattern`]: https://github.com/bevyengine/bevy/blob/main/examples/async_tasks/async_channel_pattern.rs +//! [`check_ready`]: bevy_tasks::futures::check_ready +//! [`Task`]: bevy_tasks::Task +//! [`async_compute`]: https://github.com/bevyengine/bevy/blob/main/examples/async_tasks/async_compute.rs +//! [`QueryState`]: bevy_ecs::query::QueryState +//! [`SystemParam`]: bevy_ecs::system::SystemParam +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc( + html_logo_url = "https://!bevy.org/assets/icon.png", + html_favicon_url = "https://!bevy.org/assets/icon.png" +)] +#![no_std] + +#[cfg(feature = "std")] +extern crate std; + +// Forbid unsafe_code in every module except the tests, which need some unsafe for Future Pins. +#[forbid(unsafe_code)] +mod bridge_future; +#[forbid(unsafe_code)] +mod bridge_request; +#[forbid(unsafe_code)] +mod plugin; +#[forbid(unsafe_code)] +mod system_state; +#[forbid(unsafe_code)] +mod wake_signal; + +pub use crate::bridge_future::{AsyncSystemState, BridgeError}; +pub use crate::bridge_request::async_world_sync_point; +pub use crate::plugin::{AsyncPlugin, AsyncTickBudget, AsyncWorld}; + +/// The async prelude. +/// +/// This includes the most common types in this crate, re-exported for your convenience. +pub mod prelude { + #[doc(hidden)] + pub use crate::{ + async_world_sync_point, AsyncPlugin, AsyncSystemState, AsyncTickBudget, AsyncWorld, + BridgeError, + }; +} + +#[cfg(test)] +mod tests { + extern crate alloc; + + use core::pin::Pin; + + use alloc::{sync::Arc, vec::Vec}; + + use crate::prelude::*; + use bevy_app::prelude::*; + use bevy_app::ScheduleRunnerPlugin; + use bevy_ecs::prelude::*; + use bevy_platform::sync::{ + atomic::{AtomicBool, Ordering}, + Mutex, + }; + use bevy_tasks::futures::check_ready; + use bevy_tasks::AsyncComputeTaskPool; + + /// A future wrapper around `F` that **first** polls the `F` future, then increments `counter`. + /// + /// This allows tests to wait until async bridge futures are polled once (and therefore are + /// waiting for ECS access) before allowing ECS access, which we can then track since the ECS + /// cannot proceed until all async tasks have moved on. + struct PollThenCount { + future: F, + counted: bool, + counter: Arc>, + } + + impl Future for PollThenCount { + type Output = F::Output; + + fn poll( + self: Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + #[expect( + unsafe_code, + reason = "we need to access all fields independently to update the future's state" + )] + // SAFETY: We don't move out of `this` - we just create a pin to the future (which + // we poll), then assign to `counted` and update `counter`. + let this = unsafe { self.get_unchecked_mut() }; + #[expect(unsafe_code, reason = "we need to poll the future for !Unpin types")] + // SAFETY: We never move this.future, so it is pinned in place, so this pin is + // valid. + let result = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx); + if !this.counted { + this.counted = true; + *this.counter.lock().unwrap() += 1; + } + result + } + } + + #[test] + fn more_tasks_than_threads() { + struct MySyncPoint; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )) + .insert_resource(AsyncTickBudget(3)) + .add_systems(Update, async_world_sync_point::); + + let system_state = app + .world() + .resource::() + .system_state::(); + + let task_pool = AsyncComputeTaskPool::get(); + let desired_tasks = task_pool.thread_num() * 10; + + let barrier_counter = Arc::new(Mutex::new(0)); + let mut tasks = Vec::new(); + for _ in 0..desired_tasks { + let barrier_counter = barrier_counter.clone(); + let system_state = system_state.clone(); + tasks.push(task_pool.spawn(async move { + let future = system_state.bridge(MySyncPoint, |_: Commands| {}); + PollThenCount { + future, + counted: false, + counter: barrier_counter, + } + .await + .unwrap() + })); + } + + // Spinloop until all the tasks are waiting for ECS access. + while *barrier_counter.lock().unwrap() != desired_tasks { + // If we're configured to be single-threaded, tick the task pools. + bevy_tasks::cfg::multi_threaded! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + } + + // Clear the barrier counters. + *barrier_counter.lock().unwrap() = 0; + + app.update(); + + 'outer: { + for _ in 0..10000 { + bevy_tasks::cfg::multi_threaded! { + if {} else { + bevy_tasks::tick_global_task_pools_on_main_thread(); + } + } + tasks.retain_mut(|task| check_ready(task).is_none()); + if tasks.is_empty() { + break 'outer; + } + } + + panic!("Ran out of iterations waiting for tasks to complete"); + } + } + + #[test] + fn different_sync_points_allow_different_tasks() { + struct Sync1; + struct Sync2; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + let system_state = app + .world() + .resource::() + .system_state::(); + + let system_state_clone = system_state.clone(); + let mut task_1 = AsyncComputeTaskPool::get().spawn(async move { + system_state_clone + .bridge(Sync1, |_: Commands| {}) + .await + .unwrap(); + 1 + }); + let mut task_2 = AsyncComputeTaskPool::get().spawn(async move { + system_state.bridge(Sync2, |_: Commands| {}).await.unwrap(); + 2 + }); + + assert!(check_ready(&mut task_1).is_none()); + assert!(check_ready(&mut task_2).is_none()); + + app.world_mut() + .run_system_cached(async_world_sync_point::) + .unwrap(); + + assert_eq!(check_ready(&mut task_1).unwrap(), 1); + assert!(check_ready(&mut task_2).is_none()); + + app.world_mut() + .run_system_cached(async_world_sync_point::) + .unwrap(); + + assert_eq!(check_ready(&mut task_2).unwrap(), 2); + } + + /// This tests that if a world is dropped we return an error from attempting to run it and + /// that everything cleans up nicely + /// Because of a quirk of how bevy's task pools work we have to always have at least one + /// active world for anything to progress on them. + /// That's what `other_app` is for. + #[test] + fn dropped_world() { + struct MySyncPoint; + static WORLD_WAS_DROPPED: AtomicBool = AtomicBool::new(false); + let mut other_app = App::new(); + other_app.add_plugins((TaskPoolPlugin::default(), ScheduleRunnerPlugin::default())); + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin::default(), + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + app.add_systems(Startup, move |world: Res| { + let world = world.clone(); + AsyncComputeTaskPool::get() + .spawn(async move { + let system_state = world.system_state::(); + match system_state + .bridge(MySyncPoint, |mut commands: Commands| { + commands.spawn_empty(); + }) + .await + { + Err(BridgeError::WorldDropped) => { + WORLD_WAS_DROPPED.store(true, Ordering::Relaxed); + } + _ => unreachable!("World should have Dropped"), + } + }) + .detach(); + }); + app.update(); + drop(app); + other_app.update(); + assert!(WORLD_WAS_DROPPED.load(Ordering::Relaxed)); + } + + bevy_tasks::cfg::multi_threaded! { + #[test] + fn ecs_then_stuck() { + use bevy_platform::sync::{Arc, Mutex}; + + // We want to make sure that the implementation here doesn't block the ECS thread. So we + // spawn a task that does some ECS work and then immediately blocks (not awaits). Since + // this test blocks a thread, we cannot run this test unless we are multi_threaded. + + struct MySyncPoint; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin, + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + let mutex = Arc::new(Mutex::new(())); + + let mutex_clone = mutex.clone(); + app.add_systems(Startup, move |world: Res| { + let system_state = world.system_state::(); + + let mutex_clone = mutex_clone.clone(); + AsyncComputeTaskPool::get() + .spawn(async move { + system_state + .bridge(MySyncPoint, |mut commands| { + commands.spawn_empty(); + }) + .await + .unwrap(); + let _guard = mutex_clone.lock().unwrap(); + }) + .detach(); + }) + .add_systems(Update, async_world_sync_point::); + + // Lock the guard while we update - this makes it impossible to get the lock, and + // therefore, makes the task stuck. That's fine as long as this happens in another + // thread. + let _guard = mutex.lock().unwrap(); + + app.update(); + } + } + + #[test] + fn invalid_parameters() { + struct MySyncPoint; + static FAILED_VALIDATION: AtomicBool = AtomicBool::new(false); + + #[derive(Resource)] + struct MyResource; + + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin, + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + app.add_systems(Update, async_world_sync_point::); + + app.add_systems(Startup, move |world: Res| { + let world = world.clone(); + AsyncComputeTaskPool::get() + .spawn(async move { + let system_state = world.system_state::>(); + match system_state.bridge(MySyncPoint, |_| unreachable!()).await { + Err(BridgeError::SystemParamValidation(_)) => { + FAILED_VALIDATION.store(true, Ordering::Relaxed); + } + _ => unreachable!("Parameter validation should have failed"), + } + }) + .detach(); + }); + + app.update(); + + assert!(FAILED_VALIDATION.load(Ordering::Relaxed)); + } + + #[test] + #[cfg(not(feature = "std"))] + fn no_std_test() { + use crate::prelude::*; + use bevy_app::prelude::*; + use bevy_app::ScheduleRunnerPlugin; + use bevy_ecs::prelude::*; + use bevy_platform::sync::atomic::AtomicBool; + use bevy_platform::sync::atomic::Ordering; + use bevy_tasks::AsyncComputeTaskPool; + + struct MySyncPoint; + static ACCESS_RAN: AtomicBool = AtomicBool::new(false); + let mut app = App::new(); + app.add_plugins(( + AsyncPlugin, + ScheduleRunnerPlugin::default(), + TaskPoolPlugin::default(), + )); + + app.add_systems(Update, async_world_sync_point::); + + app.add_systems(Startup, move |world: Res| { + let world = world.clone(); + AsyncComputeTaskPool::get() + .spawn_local(async move { + let system_state = world.system_state::(); + system_state + .bridge(MySyncPoint, |mut commands: Commands| { + commands.spawn_empty(); + ACCESS_RAN.store(true, Ordering::Relaxed); + }) + .await + .unwrap(); + }) + .detach(); + }); + + app.update(); + + assert!(ACCESS_RAN.load(Ordering::Relaxed)); + } +} diff --git a/crates/bevy_async/src/plugin.rs b/crates/bevy_async/src/plugin.rs new file mode 100644 index 0000000000000..4b3b4c8effcc8 --- /dev/null +++ b/crates/bevy_async/src/plugin.rs @@ -0,0 +1,124 @@ +use crate::bridge_future::AsyncSystemState; +use bevy_app::App; +use bevy_ecs::system::SystemParam; +use bevy_platform::sync::{Arc, Weak}; + +/// Plugin entry point for the async <-> ECS bridge system. +/// +/// Conceptually, async tasks cannot directly access Bevy ECS state from arbitrary +/// threads or arbitrary times. Instead, they enqueue requests which are later +/// driven from a known `SyncPoint` on the world-owning thread. +/// +/// This supports arbitrary async runtimes as well as multiple Bevy Worlds / Bevy Apps. +/// +/// To configure how "aggressively" sync points drive work, see [`AsyncTickBudget`]. +#[derive(Default)] +pub struct AsyncPlugin; + +impl bevy_app::Plugin for AsyncPlugin { + fn build(&self, app: &mut App) { + let strong_world = StrongAsyncWorld::default(); + let weak_world = AsyncWorld(Arc::downgrade(&strong_world.0)); + app.init_resource::() + .insert_resource(strong_world) + .insert_resource(weak_world); + } +} + +/// Resource to manage a limit on how many times we try to drive the async <-> ecs bridge per sync +/// point. +/// +/// This holds the upper-bound on how many async world ticks to perform each time a sync-point +/// system runs. +/// +/// A single "tick" means: +/// +/// 1. Collect queued bridge requests for that sync point. +/// 2. Wake the corresponding async tasks. +/// 3. Wait for each one to attempt a poll. +/// 4. Apply any deferred [`SystemState`] work back into the world. +/// +/// We may need to do this multiple times because one task's progress can unblock another task that +/// previously returned [`Poll::Pending`]. +/// +/// [`SystemState`]: bevy_ecs::system::SystemState +/// [`Poll::Pending`]: core::task::Poll::Pending +#[derive(bevy_ecs_macros::Resource, Clone)] +pub struct AsyncTickBudget(pub usize); + +impl Default for AsyncTickBudget { + fn default() -> Self { + Self(100) + } +} + +/// This resource gives one the ability to create a bridge between an async task and the ecs. +/// By calling `AsyncBridge::new(&self)` you create a new bridge between an async task +/// and the ecs. +#[derive(bevy_ecs_macros::Resource, Default, Clone)] +pub struct AsyncWorld(pub(crate) Weak); + +/// [`StrongAsyncWorld`] is the singular strong handle to the Inner that lives in a private Resource. +/// We only expose [`Weak`] handles publicly so we can rely on the behavior that if the `World` +/// is dropped then we can detect it by a failing [`Weak::upgrade`] +#[derive(bevy_ecs_macros::Resource, Default, Clone)] +pub(crate) struct StrongAsyncWorld(pub(crate) Arc); + +impl AsyncWorld { + /// Creates a reusable async handle for accessing the ECS with the + /// `SystemParam` type `P`. + /// + /// This is the entry-point to let an + /// async task interact with Bevy ECS state. + /// + /// The returned [`AsyncSystemState

`]: + /// - is cheap to clone, + /// - can be moved into async tasks, + /// - does not access the world immediately, + /// + /// [`AsyncSystemState

`] waits until a matching sync point drives the bridge and + /// temporarily grants safe ECS access. + /// + /// You create one of these from a cloned [`AsyncWorld`] resource and + /// then call `.access(...)` inside async code whenever you want to access the ECS. + /// + /// # Example + /// ```rust + /// use bevy_app::prelude::*; + /// use bevy_async::prelude::*; + /// use bevy_ecs::prelude::*; + /// use bevy_tasks::AsyncComputeTaskPool; + /// use bevy_platform::sync::atomic::AtomicBool; + /// use bevy_platform::sync::atomic::Ordering; + /// use bevy_platform::sync::Arc; + /// use bevy_app::ScheduleRunnerPlugin; + /// + /// struct MySyncPoint; + /// static ACCESS_RAN: AtomicBool = AtomicBool::new(false); + /// fn main() { + /// let mut app = App::new(); + /// app.add_plugins((AsyncPlugin::default(), ScheduleRunnerPlugin::default(), TaskPoolPlugin::default())); + /// app.add_systems(Update, async_world_sync_point::); + /// app.add_systems(Startup, move |world: Res| { + /// let world = world.clone(); + /// AsyncComputeTaskPool::get().spawn(async move { + /// let system_state = world.system_state::(); + /// system_state.bridge(MySyncPoint, |mut commands: Commands| { + /// commands.spawn_empty(); + /// ACCESS_RAN.store(true, Ordering::Relaxed); + /// }).await.unwrap(); + /// }).detach(); + /// }); + /// app.update(); + /// + /// assert!(ACCESS_RAN.load(Ordering::Relaxed)); + /// } + /// + /// ``` + /// + /// `P` is stored lazily, meaning the underlying `SystemState

` is only + /// initialized when the bridge is first driven against a real `World`. + pub fn system_state(&self) -> AsyncSystemState

{ + AsyncSystemState::new(self.clone()) + } +} diff --git a/crates/bevy_async/src/system_state.rs b/crates/bevy_async/src/system_state.rs new file mode 100644 index 0000000000000..633ce384b040d --- /dev/null +++ b/crates/bevy_async/src/system_state.rs @@ -0,0 +1,73 @@ +use bevy_ecs::system::{SystemParam, SystemState}; +use bevy_ecs::world::World; +use bevy_platform::sync::{Mutex, MutexGuard, OnceLock}; + +/// Stores a typed `SystemState

` behind a `OnceLock` so it can be initialized once +/// and then mutably shared across bridge requests. +/// +/// Why this exists: +/// `SystemState

` is typed, but the bridge queue needs to store heterogeneous +/// requests without knowing `P` at compile time. So each concrete +/// `SystemStateCell

` is later erased behind `dyn ErasedSystemStateCell`. +/// +/// We use a `OnceLock` because we cannot construct the `SystemState

` until we have a mutable +/// `World`. So we initialize it `SystemStateCell

` the first time it is used. +pub(crate) struct SystemStateCell(OnceLock>>); + +impl Default for SystemStateCell

{ + fn default() -> Self { + // Start uninitialized. Initialization is deferred until the request is + // first driven on the world-owning thread with access to `&mut World`. + Self(OnceLock::default()) + } +} + +/// Allows us to erase the `SystemStateCell` so we can pass it to and from the ecs. +/// +/// This lets the bridge store all request state uniformly as `Arc`. +/// +/// This trait exposes a single operation: to apply deferred state back into the `World`. +/// The second operation the trait is used for is in it's `impl dyn` implementation below. +pub(crate) trait ErasedSystemStateCell: Send + Sync + core::any::Any + 'static { + /// Apply deferred operations accumulated by the `SystemState` back into + /// the world. + /// + /// For example, `Commands` buffers are typically flushed during `apply`. + fn apply(&self, world: &mut World); +} + +impl ErasedSystemStateCell for SystemStateCell

{ + fn apply(&self, world: &mut World) { + // We expect initialization to have already occurred before `apply` is + // ever called. So `unwrap()` here reflects an invariant of the bridge. + // Completed requests only exist for initialized system states. + self.0.get().unwrap().lock().unwrap().apply(world); + } +} + +impl dyn ErasedSystemStateCell { + /// This function initializes the [`SystemStateCell`] if it hasn't already been initialized, and + /// then returns the [`MutexGuard`] of the `SystemState` if it isn't being used by another thread. + pub(crate) fn try_lock<'w, 'a, P: SystemParam + 'static>( + &'a self, + world: &'w mut World, + ) -> Option>> + where + 'a: 'w, + { + (self as &dyn core::any::Any) + .downcast_ref::>() + // Caller must use the same `Params` that created this cell. + .unwrap() + .0 + .get_or_init(|| Mutex::new(SystemState::new(world))) + // Use `try_lock` rather than blocking: + // if another request currently owns the typed `SystemState

`, the + // caller should yield with `Poll::Pending` instead of stalling a + // thread. We get ticked optimistically many times so it's okay. We aren't guaranteed to + // run everytime so we can return Poll::Pending instead of blocking an async task + // which would be very bad. + .try_lock() + .ok() + } +} diff --git a/crates/bevy_async/src/wake_signal.rs b/crates/bevy_async/src/wake_signal.rs new file mode 100644 index 0000000000000..b32dbcb80232c --- /dev/null +++ b/crates/bevy_async/src/wake_signal.rs @@ -0,0 +1,63 @@ +#[cfg(feature = "std")] +use bevy_platform::sync::{Arc, Mutex}; + +/// [`WakeSignaler`] is a custom signaling primitive used in order to fulfill our specific requirements for +/// our async bridge. We need to wait at the sync point, after waking all the futures and only when +/// all the futures have had a chance to run we stop waiting. +/// We need this signaling to occur also if the future is dropped, or if the future panics +/// so we implement the signaling *on* the Drop implementation. +/// This also makes replacing the wake signal automatically drop and signal the previous one. +pub(crate) struct WakeSignaler( + #[cfg(feature = "std")] Arc<(Mutex, std::sync::Condvar)>, + #[cfg(not(feature = "std"))] (), +); +/// Counterpart to the [`WakeSignaler`], the [`WakeWaiter`] waits for the [`WakeSignaler`] to drop and notify. +pub(crate) struct WakeWaiter( + #[cfg(feature = "std")] Arc<(Mutex, std::sync::Condvar)>, + #[cfg(not(feature = "std"))] (), +); + +#[inline] +pub(crate) fn pair() -> (WakeSignaler, WakeWaiter) { + #[cfg(feature = "std")] + let inner = Arc::new((Mutex::new(false), std::sync::Condvar::new())); + #[cfg(not(feature = "std"))] + let inner = (); + (WakeSignaler(inner.clone()), WakeWaiter(inner)) +} + +impl WakeWaiter { + /// Waits until another cloned instance of [`WakeSignaler`] has been dropped. + /// If any cloned instance of [`WakeSignaler`] is dropped then this wait stops waiting. + #[inline] + pub(crate) fn wait(&self) { + #[cfg(feature = "std")] + { + let (lock, cv) = &*self.0; + let mut signaled = lock.lock().unwrap(); + while !*signaled { + signaled = cv.wait(signaled).unwrap(); + } + } + #[cfg(not(feature = "std"))] + { + // No-op on no_std, since we are only using local futures we should tick them + // prior to reaching this point. + return; + } + } +} +impl Drop for WakeSignaler { + #[cfg(feature = "std")] + #[inline] + fn drop(&mut self) { + let (lock, cv) = &*self.0; + let mut signaled = lock.lock().unwrap(); + *signaled = true; + cv.notify_one(); + } + + #[cfg(not(feature = "std"))] + #[inline] + fn drop(&mut self) {} +} diff --git a/crates/bevy_internal/Cargo.toml b/crates/bevy_internal/Cargo.toml index eeca37f7c484d..33ff05b889d81 100644 --- a/crates/bevy_internal/Cargo.toml +++ b/crates/bevy_internal/Cargo.toml @@ -406,6 +406,7 @@ std = [ "bevy_color?/std", "bevy_diagnostic/std", "bevy_ecs/std", + "bevy_async/std", "bevy_input/std", "bevy_input_focus?/std", "bevy_math/std", @@ -488,6 +489,7 @@ bevy_diagnostic = { path = "../bevy_diagnostic", version = "0.19.0-dev", default bevy_ecs = { path = "../bevy_ecs", version = "0.19.0-dev", default-features = false, features = [ "bevy_reflect", ] } +bevy_async = { path = "../bevy_async", version = "0.19.0-dev", default-features = false } bevy_input = { path = "../bevy_input", version = "0.19.0-dev", default-features = false, features = [ "bevy_reflect", ] } diff --git a/crates/bevy_internal/src/default_plugins.rs b/crates/bevy_internal/src/default_plugins.rs index 26df25b475326..4430998e0d06c 100644 --- a/crates/bevy_internal/src/default_plugins.rs +++ b/crates/bevy_internal/src/default_plugins.rs @@ -7,6 +7,7 @@ plugin_group! { #[cfg(feature = "bevy_log")] bevy_log:::LogPlugin, bevy_app:::TaskPoolPlugin, + bevy_async:::AsyncPlugin, bevy_diagnostic:::FrameCountPlugin, bevy_time:::TimePlugin, bevy_transform:::TransformPlugin, @@ -165,6 +166,7 @@ plugin_group! { bevy_diagnostic:::FrameCountPlugin, bevy_time:::TimePlugin, bevy_app:::ScheduleRunnerPlugin, + bevy_async:::AsyncPlugin, #[cfg(feature = "bevy_ci_testing")] bevy_dev_tools::ci_testing:::CiTestingPlugin, } diff --git a/crates/bevy_internal/src/lib.rs b/crates/bevy_internal/src/lib.rs index bc278b9b4f125..a907def563bf7 100644 --- a/crates/bevy_internal/src/lib.rs +++ b/crates/bevy_internal/src/lib.rs @@ -25,6 +25,7 @@ pub use bevy_anti_alias as anti_alias; pub use bevy_app as app; #[cfg(feature = "bevy_asset")] pub use bevy_asset as asset; +pub use bevy_async as async_bridge; #[cfg(feature = "bevy_audio")] pub use bevy_audio as audio; #[cfg(feature = "bevy_camera")] diff --git a/examples/README.md b/examples/README.md index 0871132574246..b1ce17950db6a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -276,6 +276,7 @@ Example | Description Example | Description --- | --- +[Async Bridge](../examples/async_tasks/async_bridge_primitive.rs) | An example showing how to offload work to background async tasks using the AsyncBridge primitive. [Async Channel Pattern](../examples/async_tasks/async_channel_pattern.rs) | An example showing how to offload work to background async tasks using channels for communication. [Async Compute](../examples/async_tasks/async_compute.rs) | How to use `AsyncComputeTaskPool` to complete longer running tasks [External Source of Data on an External Thread](../examples/async_tasks/external_source_external_thread.rs) | How to use an external thread to run an infinite task and communicate with a channel diff --git a/examples/async_tasks/async_bridge_primitive.rs b/examples/async_tasks/async_bridge_primitive.rs new file mode 100644 index 0000000000000..1d01fd16201c5 --- /dev/null +++ b/examples/async_tasks/async_bridge_primitive.rs @@ -0,0 +1,116 @@ +//! This example demonstrates how to use Bevy's ECS and the [`AsyncComputeTaskPool`] +//! to offload computationally intensive tasks to a background thread pool and process them +//! asynchronously. +//! +//! Unlike the channel-based approach (where tasks send results directly via a communication +//! channel) or the direct approach in the `async_compute` example, this example uses the +//! ecs<->async bridge. +//! +//! This approach allows for arbitrary ECS mutations throughout the async task, as well as awaiting +//! changes (which allows for ECS mutations to happen concurrently with other async operations). +//! Both the channel-based approach and the direct approach involve bespoke systems to handle task +//! communication (polling the channel, or polling for task completion) and therefore cannot perform +//! arbitrary ECS mutations unless explicitly implemented. These options also cannot await ECS +//! operations. + +use bevy::async_bridge::prelude::{async_world_sync_point, AsyncWorld}; +use bevy::prelude::*; +use rand::RngExt; + +struct MySyncPoint; + +fn main() { + App::new() + .add_plugins(DefaultPlugins) + .add_systems(Startup, (spawn_cube_tasks, setup_scene)) + .add_systems(Update, async_world_sync_point::) + .run(); +} + +// Number of cubes to spawn across the x, y, and z axis +const NUM_CUBES: i32 = 6; + +/// This system generates tasks simulating computationally intensive +/// work that potentially spans multiple frames/ticks. +/// +/// The task is offloaded to the `AsyncComputeTaskPool`, allowing heavy computation +/// to be handled asynchronously, without blocking the main game thread. +fn spawn_cube_tasks(async_world: Res) { + let pool = bevy::tasks::AsyncComputeTaskPool::get(); + + // Create a system state that is shared across all our tasks. SystemParams with local state + // (e.g., `Local`, `Changed` QueryFilter) will be reused between async tasks that bridge with + // this state. For example, if you have two tasks reusing a query with `Changed`, + // the second task that runs will only see changes the occurred between the two tasks. + // Generally, you should reuse the system state for similar tasks. In this example, we only + // spawn tasks in this system, so we don't need to cache this for reuse later. + let system_state = async_world.system_state::<( + Commands, + Local, Handle)>>, + ResMut>, + ResMut>, + )>(); + for x in -NUM_CUBES..NUM_CUBES { + for z in -NUM_CUBES..NUM_CUBES { + // Spawn a task on the async compute pool + let system_state = system_state.clone(); + pool.spawn(async move { + let delay = std::time::Duration::from_secs_f32(rand::rng().random_range(2.0..8.0)); + // Simulate a delay before task completion + futures_timer::Delay::new(delay).await; + system_state + .bridge( + MySyncPoint, + |(mut commands, mut box_handles, mut meshes, mut materials)| { + // The first time this bridge runs it will initialize the box mesh and box material, and then it will reuse them from then on. + if box_handles.is_none() { + box_handles.replace(( + meshes.add(Cuboid::new(0.25, 0.25, 0.25)), + materials.add(Color::srgb(1.0, 0.2, 0.3)), + )); + } + + let (box_mesh, box_material) = box_handles.clone().unwrap(); + + commands.spawn(( + Mesh3d(box_mesh), + MeshMaterial3d(box_material), + Transform::from_xyz(x as f32, 0.5, z as f32), + )); + }, + ) + .await + .unwrap(); + }) + .detach(); + } + } +} + +/// Setup a generic scene for our cubes to spawn into. +fn setup_scene( + mut commands: Commands, + mut meshes: ResMut>, + mut materials: ResMut>, +) { + commands.spawn(( + Mesh3d(meshes.add(Circle::new(1.618 * NUM_CUBES as f32))), + MeshMaterial3d(materials.add(Color::WHITE)), + Transform::from_rotation(Quat::from_rotation_x(-std::f32::consts::FRAC_PI_2)), + )); + + // Spawn a point light with shadows enabled + commands.spawn(( + PointLight { + shadow_maps_enabled: true, + ..default() + }, + Transform::from_xyz(0.0, 8.0, 4.0), + )); + + // Spawn a camera looking at the origin + commands.spawn(( + Camera3d::default(), + Transform::from_xyz(-6.5, 5.5, 12.0).looking_at(Vec3::ZERO, Vec3::Y), + )); +}