From d4504dd5bdef48d823a11075940e0d8b8f0eaa9e Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Thu, 13 Jun 2024 14:49:58 -0700 Subject: [PATCH 1/4] Fix for 13821 par_read not marking events as read --- crates/bevy_ecs/src/event/iterators.rs | 8 +++++- crates/bevy_ecs/src/event/mod.rs | 37 ++++++++++++++------------ 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/crates/bevy_ecs/src/event/iterators.rs b/crates/bevy_ecs/src/event/iterators.rs index 03d57ad2bce65..60750f7f1c923 100644 --- a/crates/bevy_ecs/src/event/iterators.rs +++ b/crates/bevy_ecs/src/event/iterators.rs @@ -145,6 +145,7 @@ pub struct EventParIter<'a, E: Event> { reader: &'a mut ManualEventReader, slices: [&'a [EventInstance]; 2], batching_strategy: BatchingStrategy, + unread: usize, } impl<'a, E: Event> EventParIter<'a, E> { @@ -168,6 +169,7 @@ impl<'a, E: Event> EventParIter<'a, E> { reader, slices: [a, b], batching_strategy: BatchingStrategy::default(), + unread: unread_count, } } @@ -203,7 +205,7 @@ impl<'a, E: Event> EventParIter<'a, E> { /// initialized and run from the ECS scheduler, this should never panic. /// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool - pub fn for_each_with_id) + Send + Sync + Clone>(self, func: FN) { + pub fn for_each_with_id) + Send + Sync + Clone>(mut self, func: FN) { #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] { self.into_iter().for_each(|(e, i)| func(e, i)); @@ -233,6 +235,10 @@ impl<'a, E: Event> EventParIter<'a, E> { }); } }); + + // If we get to hear we're gaurenteed to have seen all events + self.reader.last_event_count += self.unread; + self.unread = 0; } } diff --git a/crates/bevy_ecs/src/event/mod.rs b/crates/bevy_ecs/src/event/mod.rs index 4aae28587d6eb..d47c813578bb9 100644 --- a/crates/bevy_ecs/src/event/mod.rs +++ b/crates/bevy_ecs/src/event/mod.rs @@ -421,30 +421,33 @@ mod tests { #[cfg(feature = "multi_threaded")] #[test] fn test_events_par_iter() { - use std::{collections::HashSet, sync::mpsc}; - use crate::prelude::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Resource)] + struct Counter(AtomicUsize); let mut world = World::new(); world.init_resource::>(); - for i in 0..100 { - world.send_event(TestEvent { i }); + for _ in 0..100 { + world.send_event(TestEvent { i: 1 }); } - let mut schedule = Schedule::default(); + schedule.add_systems( + |mut events: EventReader, counter: ResMut| { + events.par_read().for_each(|event| { + counter.0.fetch_add(event.i, Ordering::Relaxed); + }); + }, + ); + world.insert_resource(Counter(AtomicUsize::new(0))); + schedule.run(&mut world); + let counter = world.remove_resource::().unwrap(); + assert_eq!(counter.0.into_inner(), 100); - schedule.add_systems(|mut events: EventReader| { - let (tx, rx) = mpsc::channel(); - events.par_read().for_each(|event| { - tx.send(event.i).unwrap(); - }); - drop(tx); - - let observed: HashSet<_> = rx.into_iter().collect(); - assert_eq!(observed, HashSet::from_iter(0..100)); - }); + world.insert_resource(Counter(AtomicUsize::new(0))); schedule.run(&mut world); + let counter = world.remove_resource::().unwrap(); + assert_eq!(counter.0.into_inner(), 0); } - - // Peak tests } From d65a7cdfdf18789b208d9ba8c03c0f0a5b2acc96 Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Thu, 13 Jun 2024 14:54:24 -0700 Subject: [PATCH 2/4] Fix typo --- crates/bevy_ecs/src/event/iterators.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/event/iterators.rs b/crates/bevy_ecs/src/event/iterators.rs index 60750f7f1c923..89f6f9e5f7e4e 100644 --- a/crates/bevy_ecs/src/event/iterators.rs +++ b/crates/bevy_ecs/src/event/iterators.rs @@ -236,7 +236,7 @@ impl<'a, E: Event> EventParIter<'a, E> { } }); - // If we get to hear we're gaurenteed to have seen all events + // If we get to hear we're guaranteed to have seen all events self.reader.last_event_count += self.unread; self.unread = 0; } From 75a241a8a1ac482ed1288ad396e531768785da7e Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Fri, 14 Jun 2024 08:04:55 -0700 Subject: [PATCH 3/4] Update crates/bevy_ecs/src/event/iterators.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Martín Maita <47983254+mnmaita@users.noreply.github.com> --- crates/bevy_ecs/src/event/iterators.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/event/iterators.rs b/crates/bevy_ecs/src/event/iterators.rs index 89f6f9e5f7e4e..30f0f8e1edbf0 100644 --- a/crates/bevy_ecs/src/event/iterators.rs +++ b/crates/bevy_ecs/src/event/iterators.rs @@ -236,7 +236,7 @@ impl<'a, E: Event> EventParIter<'a, E> { } }); - // If we get to hear we're guaranteed to have seen all events + // Events are guaranteed to be read at this point. self.reader.last_event_count += self.unread; self.unread = 0; } From 80e451be304b1563b232ef55452e4efbb9342e3b Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Tue, 25 Jun 2024 08:27:29 -0700 Subject: [PATCH 4/4] Removed par_iter when multi_threaded is not an active feature --- crates/bevy_ecs/src/event/iterators.rs | 14 ++++++++------ crates/bevy_ecs/src/event/mod.rs | 4 +++- crates/bevy_ecs/src/event/reader.rs | 6 +++++- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/bevy_ecs/src/event/iterators.rs b/crates/bevy_ecs/src/event/iterators.rs index 30f0f8e1edbf0..adcc1fdac0baa 100644 --- a/crates/bevy_ecs/src/event/iterators.rs +++ b/crates/bevy_ecs/src/event/iterators.rs @@ -1,8 +1,7 @@ use crate as bevy_ecs; -use bevy_ecs::{ - batching::BatchingStrategy, - event::{Event, EventId, EventInstance, Events, ManualEventReader}, -}; +#[cfg(feature = "multi_threaded")] +use bevy_ecs::batching::BatchingStrategy; +use bevy_ecs::event::{Event, EventId, EventInstance, Events, ManualEventReader}; use bevy_utils::detailed_trace; use std::{iter::Chain, slice::Iter}; @@ -141,6 +140,7 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> { /// A parallel iterator over `Event`s. #[derive(Debug)] +#[cfg(feature = "multi_threaded")] pub struct EventParIter<'a, E: Event> { reader: &'a mut ManualEventReader, slices: [&'a [EventInstance]; 2], @@ -148,6 +148,7 @@ pub struct EventParIter<'a, E: Event> { unread: usize, } +#[cfg(feature = "multi_threaded")] impl<'a, E: Event> EventParIter<'a, E> { /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`. pub fn new(reader: &'a mut ManualEventReader, events: &'a Events) -> Self { @@ -206,12 +207,12 @@ impl<'a, E: Event> EventParIter<'a, E> { /// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool pub fn for_each_with_id) + Send + Sync + Clone>(mut self, func: FN) { - #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] + #[cfg(target_arch = "wasm32")] { self.into_iter().for_each(|(e, i)| func(e, i)); } - #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] + #[cfg(not(target_arch = "wasm32"))] { let pool = bevy_tasks::ComputeTaskPool::get(); let thread_count = pool.thread_num(); @@ -253,6 +254,7 @@ impl<'a, E: Event> EventParIter<'a, E> { } } +#[cfg(feature = "multi_threaded")] impl<'a, E: Event> IntoIterator for EventParIter<'a, E> { type IntoIter = EventIteratorWithId<'a, E>; type Item = ::Item; diff --git a/crates/bevy_ecs/src/event/mod.rs b/crates/bevy_ecs/src/event/mod.rs index 33897b1257bca..d2366551f37b6 100644 --- a/crates/bevy_ecs/src/event/mod.rs +++ b/crates/bevy_ecs/src/event/mod.rs @@ -11,7 +11,9 @@ pub(crate) use base::EventInstance; pub use base::{Event, EventId}; pub use bevy_ecs_macros::Event; pub use collections::{Events, SendBatchIds}; -pub use iterators::{EventIterator, EventIteratorWithId, EventParIter}; +#[cfg(feature = "multi_threaded")] +pub use iterators::EventParIter; +pub use iterators::{EventIterator, EventIteratorWithId}; pub use reader::{EventReader, ManualEventReader}; pub use registry::{EventRegistry, ShouldUpdateEvents}; pub use update::{ diff --git a/crates/bevy_ecs/src/event/reader.rs b/crates/bevy_ecs/src/event/reader.rs index f939a6faa664d..2152613fb9ab4 100644 --- a/crates/bevy_ecs/src/event/reader.rs +++ b/crates/bevy_ecs/src/event/reader.rs @@ -1,6 +1,8 @@ use crate as bevy_ecs; +#[cfg(feature = "multi_threaded")] +use bevy_ecs::event::EventParIter; use bevy_ecs::{ - event::{Event, EventIterator, EventIteratorWithId, EventParIter, Events}, + event::{Event, EventIterator, EventIteratorWithId, Events}, system::{Local, Res, SystemParam}, }; use std::marker::PhantomData; @@ -66,6 +68,7 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> { /// assert_eq!(counter.into_inner(), 4950); /// ``` /// + #[cfg(feature = "multi_threaded")] pub fn par_read(&mut self) -> EventParIter<'_, E> { self.reader.par_read(&self.events) } @@ -189,6 +192,7 @@ impl ManualEventReader { } /// See [`EventReader::par_read`] + #[cfg(feature = "multi_threaded")] pub fn par_read<'a>(&'a mut self, events: &'a Events) -> EventParIter<'a, E> { EventParIter::new(self, events) }