From 2bd8efd413512e22fbb3223c693d00f496fed135 Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Tue, 11 Jun 2024 15:39:33 -0700 Subject: [PATCH 01/29] Created an EventMutator for when you want to mutate an event. --- crates/bevy_ecs/src/event/collections.rs | 16 +- crates/bevy_ecs/src/event/mod.rs | 247 ++++++++++++++++++- crates/bevy_ecs/src/event/mut_iterators.rs | 264 +++++++++++++++++++++ crates/bevy_ecs/src/event/mutator.rs | 255 ++++++++++++++++++++ crates/bevy_ecs/src/event/reader.rs | 2 +- crates/bevy_ecs/src/lib.rs | 2 +- 6 files changed, 782 insertions(+), 4 deletions(-) create mode 100644 crates/bevy_ecs/src/event/mut_iterators.rs create mode 100644 crates/bevy_ecs/src/event/mutator.rs diff --git a/crates/bevy_ecs/src/event/collections.rs b/crates/bevy_ecs/src/event/collections.rs index 4370119c065db..a5750b4128518 100644 --- a/crates/bevy_ecs/src/event/collections.rs +++ b/crates/bevy_ecs/src/event/collections.rs @@ -1,6 +1,6 @@ use crate as bevy_ecs; use bevy_ecs::{ - event::{Event, EventId, EventInstance, ManualEventReader}, + event::{Event, EventId, EventInstance, ManualEventMutator, ManualEventReader}, system::Resource, }; #[cfg(feature = "bevy_reflect")] @@ -167,6 +167,20 @@ impl Events { } } + /// Gets a new [`ManualEventReader`]. This will include all events already in the event buffers. + pub fn get_mutator(&self) -> ManualEventMutator { + ManualEventMutator::default() + } + + /// Gets a new [`ManualEventReader`]. This will ignore all events already in the event buffers. + /// It will read all future events. + pub fn get_mutator_current(&self) -> ManualEventMutator { + ManualEventMutator { + last_event_count: self.event_count, + ..Default::default() + } + } + /// Swaps the event buffers and clears the oldest event buffer. In general, this should be /// called once per frame/update. /// diff --git a/crates/bevy_ecs/src/event/mod.rs b/crates/bevy_ecs/src/event/mod.rs index 4aae28587d6eb..84c00af8325e7 100644 --- a/crates/bevy_ecs/src/event/mod.rs +++ b/crates/bevy_ecs/src/event/mod.rs @@ -2,6 +2,8 @@ mod base; mod collections; mod iterators; +mod mut_iterators; +mod mutator; mod reader; mod registry; mod update; @@ -12,6 +14,8 @@ pub use base::{Event, EventId}; pub use bevy_ecs_macros::Event; pub use collections::{Events, SendBatchIds}; pub use iterators::{EventIterator, EventIteratorWithId, EventParIter}; +pub use mut_iterators::{EventMutatorIterator, EventMutatorIteratorWithId, EventMutatorParIter}; +pub use mutator::{EventMutator, ManualEventMutator}; pub use reader::{EventReader, ManualEventReader}; pub use registry::EventRegistry; pub use update::{ @@ -446,5 +450,246 @@ mod tests { schedule.run(&mut world); } - // Peak tests + // Mutation tests + fn events_clear_and_mutate_impl(clear_func: impl FnOnce(&mut Events)) { + let mut events = Events::::default(); + let mut mutator = events.get_mutator(); + let mut reader = events.get_reader(); + assert!(mutator.read(&mut events).next().is_none()); + assert!(reader.read(&events).next().is_none()); + + events.send(E(0)); + let sent_event = mutator.read(&mut events).next().unwrap(); + assert_eq!(sent_event, &mut E(0)); + *sent_event = E(1); // Mutate whole event + assert_eq!(reader.read(&events).next().unwrap(), &E(1)); + + events.send(E(2)); + let sent_event = mutator.read(&mut events).next().unwrap(); + assert_eq!(sent_event, &mut E(2)); + sent_event.0 = 3; // Mutate sub value + assert_eq!(reader.read(&events).next().unwrap(), &E(3)); + + clear_func(&mut events); + assert!(mutator.read(&mut events).next().is_none()); + assert!(reader.read(&events).next().is_none()); + } + + #[test] + fn test_events_clear_and_mutate() { + events_clear_and_mutate_impl(|events| events.clear()); + } + + #[test] + fn test_event_mutator_len_empty() { + let events = Events::::default(); + assert_eq!(events.get_mutator().len(&events), 0); + assert!(events.get_mutator().is_empty(&events)); + } + + #[test] + fn test_event_mutator_len_filled() { + let mut events = Events::::default(); + events.send(TestEvent { i: 0 }); + assert_eq!(events.get_mutator().len(&events), 1); + assert!(!events.get_mutator().is_empty(&events)); + } + + #[test] + fn test_event_mutator_iter_len_updated() { + let mut events = Events::::default(); + events.send(TestEvent { i: 0 }); + events.send(TestEvent { i: 1 }); + events.send(TestEvent { i: 2 }); + let mut mutator = events.get_mutator(); + let mut iter = mutator.read(&mut events); + assert_eq!(iter.len(), 3); + iter.next(); + assert_eq!(iter.len(), 2); + iter.next(); + assert_eq!(iter.len(), 1); + iter.next(); + assert_eq!(iter.len(), 0); + } + + #[test] + fn test_event_mutator_len_current() { + let mut events = Events::::default(); + events.send(TestEvent { i: 0 }); + let mutator = events.get_mutator_current(); + dbg!(&mutator); + dbg!(&events); + assert!(mutator.is_empty(&events)); + events.send(TestEvent { i: 0 }); + assert_eq!(mutator.len(&events), 1); + assert!(!mutator.is_empty(&events)); + } + + #[test] + fn test_event_mutator_len_update() { + let mut events = Events::::default(); + events.send(TestEvent { i: 0 }); + events.send(TestEvent { i: 0 }); + let mutator = events.get_mutator(); + assert_eq!(mutator.len(&events), 2); + events.update(); + events.send(TestEvent { i: 0 }); + assert_eq!(mutator.len(&events), 3); + events.update(); + assert_eq!(mutator.len(&events), 1); + events.update(); + assert!(mutator.is_empty(&events)); + } + + #[test] + fn test_event_mutator_clear() { + use bevy_ecs::prelude::*; + + let mut world = World::new(); + let mut events = Events::::default(); + events.send(TestEvent { i: 0 }); + world.insert_resource(events); + + let mut mutator = IntoSystem::into_system(|mut events: EventMutator| -> bool { + if !events.is_empty() { + events.clear(); + false + } else { + true + } + }); + mutator.initialize(&mut world); + + let is_empty = mutator.run((), &mut world); + assert!(!is_empty, "EventMutator should not be empty"); + let is_empty = mutator.run((), &mut world); + assert!(is_empty, "EventMutator should be empty"); + } + + #[test] + fn test_mutator_update_drain() { + let mut events = Events::::default(); + let mut mutator = events.get_mutator(); + + events.send(TestEvent { i: 0 }); + events.send(TestEvent { i: 1 }); + assert_eq!(mutator.read(&mut events).count(), 2); + + let mut old_events = Vec::from_iter(events.update_drain()); + assert!(old_events.is_empty()); + + events.send(TestEvent { i: 2 }); + assert_eq!(mutator.read(&mut events).count(), 1); + + old_events.extend(events.update_drain()); + assert_eq!(old_events.len(), 2); + + old_events.extend(events.update_drain()); + assert_eq!( + old_events, + &[TestEvent { i: 0 }, TestEvent { i: 1 }, TestEvent { i: 2 }] + ); + } + + #[allow(clippy::iter_nth_zero)] + #[test] + fn test_event_mutator_read_nth() { + use bevy_ecs::prelude::*; + + let mut world = World::new(); + world.init_resource::>(); + + world.send_event(TestEvent { i: 0 }); + world.send_event(TestEvent { i: 1 }); + world.send_event(TestEvent { i: 2 }); + world.send_event(TestEvent { i: 3 }); + + let mut schedule = Schedule::default(); + schedule.add_systems(|mut events: EventMutator| { + let mut iter = events.read(); + + // Peek does not consume events put should still advance the iterator + assert_eq!(iter.next(), Some(&mut TestEvent { i: 0 })); + assert_eq!(iter.nth(0), Some(&mut TestEvent { i: 1 })); + assert_eq!(iter.nth(1), Some(&mut TestEvent { i: 3 })); + assert_eq!(iter.nth(2), None); + + assert!(events.is_empty()); + }); + schedule.run(&mut world); + } + + #[test] + fn test_event_mutator_read_last() { + use bevy_ecs::prelude::*; + + let mut world = World::new(); + world.init_resource::>(); + + let mut mutator = + IntoSystem::into_system(|mut events: EventMutator| -> Option { + events.read().last().copied() + }); + mutator.initialize(&mut world); + + let last = mutator.run((), &mut world); + assert!(last.is_none(), "EventMutator should be empty"); + + world.send_event(TestEvent { i: 0 }); + let last = mutator.run((), &mut world); + assert_eq!(last, Some(TestEvent { i: 0 })); + + world.send_event(TestEvent { i: 1 }); + world.send_event(TestEvent { i: 2 }); + world.send_event(TestEvent { i: 3 }); + let last = mutator.run((), &mut world); + assert_eq!(last, Some(TestEvent { i: 3 })); + + let last = mutator.run((), &mut world); + assert!(last.is_none(), "EventMutator should be empty"); + } + + #[cfg(feature = "multi_threaded")] + #[test] + fn test_events_mutator_par_read() { + use std::{collections::HashSet, sync::mpsc}; + + use crate::prelude::*; + + let mut world = World::new(); + world.init_resource::>(); + for i in 0..100 { + world.send_event(TestEvent { i }); + } + + let mut schedule = Schedule::default(); + + schedule.add_systems( + |mut events: EventMutator, mut second_run: Local| { + events.par_read().for_each(|event| { + event.i += 1; // Mutate every event + }); + + let (tx, rx) = mpsc::channel(); + events.par_read().for_each(|event| { + tx.send(event.i).unwrap(); + }); + drop(tx); + + if *second_run { + let observed: HashSet<_> = rx.into_iter().collect(); + assert!( + observed.is_empty(), + "par_read didn't consume events after first run but should" + ); + } else { + let observed: HashSet<_> = rx.into_iter().collect(); + assert_eq!(observed, HashSet::from_iter(1..102)); + *second_run = true; + }; + }, + ); + schedule.run(&mut world); + schedule.run(&mut world); + } } diff --git a/crates/bevy_ecs/src/event/mut_iterators.rs b/crates/bevy_ecs/src/event/mut_iterators.rs new file mode 100644 index 0000000000000..0f54150c7a8da --- /dev/null +++ b/crates/bevy_ecs/src/event/mut_iterators.rs @@ -0,0 +1,264 @@ +use crate as bevy_ecs; +use bevy_ecs::{ + batching::BatchingStrategy, + event::{Event, EventId, EventInstance, Events, ManualEventMutator}, +}; +use bevy_utils::detailed_trace; +use std::{iter::Chain, slice::IterMut}; + +/// An iterator that yields any unread events from an [`EventMutator`] or [`ManualEventMutator`]. +#[derive(Debug)] +pub struct EventMutatorIterator<'a, E: Event> { + iter: EventMutatorIteratorWithId<'a, E>, +} + +impl<'a, E: Event> Iterator for EventMutatorIterator<'a, E> { + type Item = &'a mut E; + fn next(&mut self) -> Option { + self.iter.next().map(|(event, _)| event) + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + + fn count(self) -> usize { + self.iter.count() + } + + fn last(self) -> Option + where + Self: Sized, + { + self.iter.last().map(|(event, _)| event) + } + + fn nth(&mut self, n: usize) -> Option { + self.iter.nth(n).map(|(event, _)| event) + } +} + +impl<'a, E: Event> ExactSizeIterator for EventMutatorIterator<'a, E> { + fn len(&self) -> usize { + self.iter.len() + } +} + +/// An iterator that yields any unread events (and their IDs) from an [`EventMutator`] or [`ManualEventMutator`]. +#[derive(Debug)] +pub struct EventMutatorIteratorWithId<'a, E: Event> { + mutator: &'a mut ManualEventMutator, + chain: Chain>, IterMut<'a, EventInstance>>, + unread: usize, +} + +impl<'a, E: Event> EventMutatorIteratorWithId<'a, E> { + /// Creates a new iterator that yields any `events` that have not yet been seen by `mutator`. + pub fn new(mutator: &'a mut ManualEventMutator, events: &'a mut Events) -> Self { + let a_index = mutator + .last_event_count + .saturating_sub(events.events_a.start_event_count); + let b_index = mutator + .last_event_count + .saturating_sub(events.events_b.start_event_count); + let a = events.events_a.get_mut(a_index..).unwrap_or_default(); + let b = events.events_b.get_mut(b_index..).unwrap_or_default(); + + let unread_count = a.len() + b.len(); + + mutator.last_event_count = events.event_count - unread_count; + // Iterate the oldest first, then the newer events + let chain = a.iter_mut().chain(b.iter_mut()); + + Self { + mutator, + chain, + unread: unread_count, + } + } + + /// Iterate over only the events. + pub fn without_id(self) -> EventMutatorIterator<'a, E> { + EventMutatorIterator { iter: self } + } +} + +impl<'a, E: Event> Iterator for EventMutatorIteratorWithId<'a, E> { + type Item = (&'a mut E, EventId); + fn next(&mut self) -> Option { + match self + .chain + .next() + .map(|instance| (&mut instance.event, instance.event_id)) + { + Some(item) => { + detailed_trace!("EventReader::iter() -> {}", item.1); + self.mutator.last_event_count += 1; + self.unread -= 1; + Some(item) + } + None => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.chain.size_hint() + } + + fn count(self) -> usize { + self.mutator.last_event_count += self.unread; + self.unread + } + + fn last(self) -> Option + where + Self: Sized, + { + let EventInstance { event_id, event } = self.chain.last()?; + self.mutator.last_event_count += self.unread; + Some((event, *event_id)) + } + + fn nth(&mut self, n: usize) -> Option { + if let Some(EventInstance { event_id, event }) = self.chain.nth(n) { + self.mutator.last_event_count += n + 1; + self.unread -= n + 1; + Some((event, *event_id)) + } else { + self.mutator.last_event_count += self.unread; + self.unread = 0; + None + } + } +} + +impl<'a, E: Event> ExactSizeIterator for EventMutatorIteratorWithId<'a, E> { + fn len(&self) -> usize { + self.unread + } +} + +/// A parallel iterator over `Event`s. +#[derive(Debug)] +pub struct EventMutatorParIter<'a, E: Event> { + reader: &'a mut ManualEventMutator, + slices: [&'a mut [EventInstance]; 2], + batching_strategy: BatchingStrategy, +} + +impl<'a, E: Event> EventMutatorParIter<'a, E> { + /// Creates a new parallel iterator over `events` that have not yet been seen by `mutator`. + pub fn new(mutator: &'a mut ManualEventMutator, events: &'a mut Events) -> Self { + let a_index = mutator + .last_event_count + .saturating_sub(events.events_a.start_event_count); + let b_index = mutator + .last_event_count + .saturating_sub(events.events_b.start_event_count); + let a = events.events_a.get_mut(a_index..).unwrap_or_default(); + let b = events.events_b.get_mut(b_index..).unwrap_or_default(); + + let unread_count = a.len() + b.len(); + mutator.last_event_count = events.event_count - unread_count; + + Self { + reader: mutator, + slices: [a, b], + batching_strategy: BatchingStrategy::default(), + } + } + + /// Changes the batching strategy used when iterating. + /// + /// For more information on how this affects the resultant iteration, see + /// [`BatchingStrategy`]. + pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self { + self.batching_strategy = strategy; + self + } + + /// Runs the provided closure for each unread event in parallel. + /// + /// Unlike normal iteration, the event order is not guaranteed in any form. + /// + /// # Panics + /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being + /// initialized and run from the ECS scheduler, this should never panic. + /// + /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool + pub fn for_each(self, func: FN) { + self.for_each_with_id(move |e, _| func(e)); + } + + /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each), + /// but additionally provides the `EventId` to the closure. + /// + /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order. + /// + /// # Panics + /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being + /// initialized and run from the ECS scheduler, this should never panic. + /// + /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool + pub fn for_each_with_id) + Send + Sync + Clone>(self, func: FN) { + #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] + { + self.into_iter().for_each(|(e, i)| func(e, i)); + } + + #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] + { + let pool = bevy_tasks::ComputeTaskPool::get(); + let thread_count = pool.thread_num(); + if thread_count <= 1 { + return self.into_iter().for_each(|(e, i)| func(e, i)); + } + + let batch_size = self + .batching_strategy + .calc_batch_size(|| self.len(), thread_count); + let chunks = self.slices.map(|s| s.chunks_mut(batch_size)); + + pool.scope(|scope| { + for batch in chunks.into_iter().flatten() { + let func = func.clone(); + scope.spawn(async move { + for event in batch { + func(&mut event.event, event.event_id); + } + }); + } + }); + } + } + + /// Returns the number of [`Event`]s to be iterated. + pub fn len(&self) -> usize { + self.slices.iter().map(|s| s.len()).sum() + } + + /// Returns [`true`] if there are no events remaining in this iterator. + pub fn is_empty(&self) -> bool { + self.slices.iter().all(|x| x.is_empty()) + } +} + +impl<'a, E: Event> IntoIterator for EventMutatorParIter<'a, E> { + type IntoIter = EventMutatorIteratorWithId<'a, E>; + type Item = ::Item; + + fn into_iter(self) -> Self::IntoIter { + let EventMutatorParIter { + reader, + slices: [a, b], + .. + } = self; + let unread = a.len() + b.len(); + let chain = a.iter_mut().chain(b); + EventMutatorIteratorWithId { + mutator: reader, + chain, + unread, + } + } +} diff --git a/crates/bevy_ecs/src/event/mutator.rs b/crates/bevy_ecs/src/event/mutator.rs new file mode 100644 index 0000000000000..330f595c92667 --- /dev/null +++ b/crates/bevy_ecs/src/event/mutator.rs @@ -0,0 +1,255 @@ +use crate as bevy_ecs; +use bevy_ecs::{ + event::{Event, EventMutatorIterator, EventMutatorIteratorWithId, EventMutatorParIter, Events}, + system::{Local, ResMut, SystemParam}, +}; +use std::marker::PhantomData; + +/// Mutably reads events of type `T` keeping track of which events have already been read +/// by each system allowing multiple systems to read the same events. Ideal for chains of systems +/// that all want to modify the same events. +/// +/// # Usage +/// +/// `EventMutators`s are usually declared as a [`SystemParam`]. +/// ``` +/// # use bevy_ecs::prelude::*; +/// +/// #[derive(Event, Debug)] +/// pub struct MyEvent(pub u32); // Custom event type. +/// fn my_system(mut reader: EventMutator) { +/// for event in reader.read() { +/// event.0 += 1; +/// println!("received event: {:?}", event); +/// } +/// } +/// ``` +/// +/// # Concurrency +/// +/// Multiple systems with `EventMutator` of the same event type can not run concurrently. +/// They also can not be executed in parallel with [`EventReader`] and [`EventWriter`]. +/// +/// # Clearing, Reading, and Peeking +/// +/// Events are stored in a double buffered queue that switches each frame. This switch also clears the previous +/// frame's events. Events should be read each frame otherwise they may be lost. For manual control over this +/// behavior, see [`Events`]. +/// +/// Most of the time systems will want to use [`EventMutator::read()`]. This function creates an iterator over +/// all events that haven't been read yet by this system, marking the event as read in the process. +/// +#[derive(SystemParam, Debug)] +pub struct EventMutator<'w, 's, E: Event> { + pub(super) reader: Local<'s, ManualEventMutator>, + events: ResMut<'w, Events>, +} + +impl<'w, 's, E: Event> EventMutator<'w, 's, E> { + /// Iterates over the events this [`EventReader`] has not seen yet. This updates the + /// [`EventReader`]'s event counter, which means subsequent event reads will not include events + /// that happened before now. + pub fn read(&mut self) -> EventMutatorIterator<'_, E> { + self.reader.read(&mut self.events) + } + + /// Like [`read`](Self::read), except also returning the [`EventId`] of the events. + pub fn read_with_id(&mut self) -> EventMutatorIteratorWithId<'_, E> { + self.reader.read_with_id(&mut self.events) + } + + /// Returns a parallel iterator over the events this [`EventReader`] has not seen yet. + /// See also [`for_each`](EventParIter::for_each). + /// + /// # Example + /// ``` + /// # use bevy_ecs::prelude::*; + /// # use std::sync::atomic::{AtomicUsize, Ordering}; + /// + /// #[derive(Event)] + /// struct MyEvent { + /// value: usize, + /// } + /// + /// #[derive(Resource, Default)] + /// struct Counter(AtomicUsize); + /// + /// // setup + /// let mut world = World::new(); + /// world.init_resource::>(); + /// world.insert_resource(Counter::default()); + /// + /// let mut schedule = Schedule::default(); + /// schedule.add_systems(|mut events: EventReader, counter: Res| { + /// events.par_read().for_each(|MyEvent { value }| { + /// counter.0.fetch_add(*value, Ordering::Relaxed); + /// }); + /// }); + /// for value in 0..100 { + /// world.send_event(MyEvent { value }); + /// } + /// schedule.run(&mut world); + /// let Counter(counter) = world.remove_resource::().unwrap(); + /// // all events were processed + /// assert_eq!(counter.into_inner(), 4950); + /// ``` + /// + pub fn par_read(&mut self) -> EventMutatorParIter<'_, E> { + self.reader.par_read(&mut self.events) + } + + /// Determines the number of events available to be read from this [`EventReader`] without consuming any. + pub fn len(&self) -> usize { + self.reader.len(&self.events) + } + + /// Returns `true` if there are no events available to read. + /// + /// # Example + /// + /// The following example shows a useful pattern where some behavior is triggered if new events are available. + /// [`EventReader::clear()`] is used so the same events don't re-trigger the behavior the next time the system runs. + /// + /// ``` + /// # use bevy_ecs::prelude::*; + /// # + /// #[derive(Event)] + /// struct CollisionEvent; + /// + /// fn play_collision_sound(mut events: EventReader) { + /// if !events.is_empty() { + /// events.clear(); + /// // Play a sound + /// } + /// } + /// # bevy_ecs::system::assert_is_system(play_collision_sound); + /// ``` + pub fn is_empty(&self) -> bool { + self.reader.is_empty(&self.events) + } + + /// Consumes all available events. + /// + /// This means these events will not appear in calls to [`EventReader::read()`] or + /// [`EventReader::read_with_id()`] and [`EventReader::is_empty()`] will return `true`. + /// + /// For usage, see [`EventReader::is_empty()`]. + pub fn clear(&mut self) { + self.reader.clear(&self.events); + } +} + +/// Stores the state for an [`EventReader`]. +/// +/// Access to the [`Events`] resource is required to read any incoming events. +/// +/// In almost all cases, you should just use an [`EventReader`], +/// which will automatically manage the state for you. +/// +/// However, this type can be useful if you need to manually track events, +/// such as when you're attempting to send and receive events of the same type in the same system. +/// +/// # Example +/// +/// ``` +/// use bevy_ecs::prelude::*; +/// use bevy_ecs::event::{Event, Events, ManualEventReader}; +/// +/// #[derive(Event, Clone, Debug)] +/// struct MyEvent; +/// +/// /// A system that both sends and receives events using a [`Local`] [`ManualEventReader`]. +/// fn send_and_receive_manual_event_reader( +/// // The `Local` `SystemParam` stores state inside the system itself, rather than in the world. +/// // `ManualEventReader` is the internal state of `EventReader`, which tracks which events have been seen. +/// mut local_event_reader: Local>, +/// // We can access the `Events` resource mutably, allowing us to both read and write its contents. +/// mut events: ResMut>, +/// ) { +/// // We must collect the events to resend, because we can't mutate events while we're iterating over the events. +/// let mut events_to_resend = Vec::new(); +/// +/// for event in local_event_reader.read(&events) { +/// events_to_resend.push(event.clone()); +/// } +/// +/// for event in events_to_resend { +/// events.send(MyEvent); +/// } +/// } +/// +/// # bevy_ecs::system::assert_is_system(send_and_receive_manual_event_reader); +/// ``` +#[derive(Debug)] +pub struct ManualEventMutator { + pub(super) last_event_count: usize, + pub(super) _marker: PhantomData, +} + +impl Default for ManualEventMutator { + fn default() -> Self { + ManualEventMutator { + last_event_count: 0, + _marker: Default::default(), + } + } +} + +impl Clone for ManualEventMutator { + fn clone(&self) -> Self { + ManualEventMutator { + last_event_count: self.last_event_count, + _marker: PhantomData, + } + } +} + +#[allow(clippy::len_without_is_empty)] // Check fails since the is_empty implementation has a signature other than `(&self) -> bool` +impl ManualEventMutator { + /// See [`EventReader::read`] + pub fn read<'a>(&'a mut self, events: &'a mut Events) -> EventMutatorIterator<'a, E> { + self.read_with_id(events).without_id() + } + + /// See [`EventReader::read_with_id`] + pub fn read_with_id<'a>( + &'a mut self, + events: &'a mut Events, + ) -> EventMutatorIteratorWithId<'a, E> { + EventMutatorIteratorWithId::new(self, events) + } + + /// See [`EventReader::par_read`] + pub fn par_read<'a>(&'a mut self, events: &'a mut Events) -> EventMutatorParIter<'a, E> { + EventMutatorParIter::new(self, events) + } + + /// See [`EventReader::len`] + pub fn len(&self, events: &Events) -> usize { + // The number of events in this reader is the difference between the most recent event + // and the last event seen by it. This will be at most the number of events contained + // with the events (any others have already been dropped) + // TODO: Warn when there are dropped events, or return e.g. a `Result` + events + .event_count + .saturating_sub(self.last_event_count) + .min(events.len()) + } + + /// Amount of events we missed. + pub fn missed_events(&self, events: &Events) -> usize { + events + .oldest_event_count() + .saturating_sub(self.last_event_count) + } + + /// See [`EventReader::is_empty()`] + pub fn is_empty(&self, events: &Events) -> bool { + self.len(events) == 0 + } + + /// See [`EventReader::clear()`] + pub fn clear(&mut self, events: &Events) { + self.last_event_count = events.event_count; + } +} diff --git a/crates/bevy_ecs/src/event/reader.rs b/crates/bevy_ecs/src/event/reader.rs index f939a6faa664d..a9cb2a1da694c 100644 --- a/crates/bevy_ecs/src/event/reader.rs +++ b/crates/bevy_ecs/src/event/reader.rs @@ -10,7 +10,7 @@ use std::marker::PhantomData; /// # Concurrency /// /// Unlike [`EventWriter`], systems with `EventReader` param can be executed concurrently -/// (but not concurrently with `EventWriter` systems for the same event type). +/// (but not concurrently with `EventWriter` or `EventMutator` systems for the same event type). #[derive(SystemParam, Debug)] pub struct EventReader<'w, 's, E: Event> { pub(super) reader: Local<'s, ManualEventReader>, diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index e6cabde398153..ef2b3dbefcc4e 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -45,7 +45,7 @@ pub mod prelude { change_detection::{DetectChanges, DetectChangesMut, Mut, Ref}, component::Component, entity::{Entity, EntityMapper}, - event::{Event, EventReader, EventWriter, Events}, + event::{Event, EventMutator, EventReader, EventWriter, Events}, query::{Added, AnyOf, Changed, Has, Or, QueryBuilder, QueryState, With, Without}, removal_detection::RemovedComponents, schedule::{ From 7b27f9079da53a63411df012061754b1bbd08fb3 Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Tue, 11 Jun 2024 16:54:42 -0700 Subject: [PATCH 02/29] Changed tests for par_read to match behavior for now. --- crates/bevy_ecs/src/event/mod.rs | 38 +++++++++----------------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/crates/bevy_ecs/src/event/mod.rs b/crates/bevy_ecs/src/event/mod.rs index 84c00af8325e7..c4ba68de19985 100644 --- a/crates/bevy_ecs/src/event/mod.rs +++ b/crates/bevy_ecs/src/event/mod.rs @@ -424,7 +424,7 @@ mod tests { #[cfg(feature = "multi_threaded")] #[test] - fn test_events_par_iter() { + fn test_events_par_read() { use std::{collections::HashSet, sync::mpsc}; use crate::prelude::*; @@ -664,32 +664,16 @@ mod tests { let mut schedule = Schedule::default(); - schedule.add_systems( - |mut events: EventMutator, mut second_run: Local| { - events.par_read().for_each(|event| { - event.i += 1; // Mutate every event - }); - - let (tx, rx) = mpsc::channel(); - events.par_read().for_each(|event| { - tx.send(event.i).unwrap(); - }); - drop(tx); - - if *second_run { - let observed: HashSet<_> = rx.into_iter().collect(); - assert!( - observed.is_empty(), - "par_read didn't consume events after first run but should" - ); - } else { - let observed: HashSet<_> = rx.into_iter().collect(); - assert_eq!(observed, HashSet::from_iter(1..102)); - *second_run = true; - }; - }, - ); - schedule.run(&mut world); + schedule.add_systems(|mut events: EventMutator| { + let (tx, rx) = mpsc::channel(); + events.par_read().for_each(|event| { + tx.send(event.i).unwrap(); + }); + drop(tx); + + let observed: HashSet<_> = rx.into_iter().collect(); + assert_eq!(observed, HashSet::from_iter(0..100)); + }); schedule.run(&mut world); } } From 6c496831449f53478a6fc6872ae01d4772ec46df Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Thu, 13 Jun 2024 14:29:45 -0700 Subject: [PATCH 03/29] Fixed par_read for mutator --- crates/bevy_ecs/.vscode/launch.json | 51 ++++++++++++++++++++++ crates/bevy_ecs/.vscode/settings.json | 5 +++ crates/bevy_ecs/.vscode/tasks.json | 14 ++++++ crates/bevy_ecs/src/event/mod.rs | 40 ++++++++++------- crates/bevy_ecs/src/event/mut_iterators.rs | 17 ++++++-- 5 files changed, 108 insertions(+), 19 deletions(-) create mode 100644 crates/bevy_ecs/.vscode/launch.json create mode 100644 crates/bevy_ecs/.vscode/settings.json create mode 100644 crates/bevy_ecs/.vscode/tasks.json diff --git a/crates/bevy_ecs/.vscode/launch.json b/crates/bevy_ecs/.vscode/launch.json new file mode 100644 index 0000000000000..12b2377d20ea9 --- /dev/null +++ b/crates/bevy_ecs/.vscode/launch.json @@ -0,0 +1,51 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "(MacOS) Launch", + "cargo": { + "args": [ + "build", + "--bin=zombies", + "--package=zombies" + ], + "filter": { + "name": "zombies", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}", + "env": { + "CARGO_MANIFEST_DIR": "${workspaceFolder}", + "DYLD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/nightly-aarch64-apple-darwin/lib", + } + }, + { + "name": "(Windows) Launch", + "type": "cppvsdbg", + "request": "launch", + "program": "${workspaceFolder}/target/debug/zombies.exe", + "args": [], + "stopAtEntry": false, + "cwd": "${workspaceFolder}", + "environment": [ + { + "name": "CARGO_MANIFEST_DIR", + "value": "${workspaceFolder}" + }, + { + "name": "PATH", + "value": "%USERPROFILE%/scoop/persist/rustup-msvc/.rustup/toolchains/nightly-x86_64-pc-windows-msvc/bin;${workspaceFolder}/target/debug/deps;%PATH%" + } + ], + "console": "integratedTerminal", + "preLaunchTask": "rust: cargo build", + }, + ] +} \ No newline at end of file diff --git a/crates/bevy_ecs/.vscode/settings.json b/crates/bevy_ecs/.vscode/settings.json new file mode 100644 index 0000000000000..acde288cdc29e --- /dev/null +++ b/crates/bevy_ecs/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "rust-analyzer.cargo.targetDir": true, + "rust-analyzer.diagnostics.styleLints.enable": true, + "rust-analyzer.imports.preferPrelude": true, +} \ No newline at end of file diff --git a/crates/bevy_ecs/.vscode/tasks.json b/crates/bevy_ecs/.vscode/tasks.json new file mode 100644 index 0000000000000..f721f5acc1cd1 --- /dev/null +++ b/crates/bevy_ecs/.vscode/tasks.json @@ -0,0 +1,14 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "type": "cargo", + "command": "build", + "problemMatcher": [ + "$rustc" + ], + "group": "build", + "label": "rust: cargo build" + } + ] +} \ No newline at end of file diff --git a/crates/bevy_ecs/src/event/mod.rs b/crates/bevy_ecs/src/event/mod.rs index c4ba68de19985..ff712650df882 100644 --- a/crates/bevy_ecs/src/event/mod.rs +++ b/crates/bevy_ecs/src/event/mod.rs @@ -652,28 +652,38 @@ mod tests { #[cfg(feature = "multi_threaded")] #[test] fn test_events_mutator_par_read() { - use std::{collections::HashSet, sync::mpsc}; - use crate::prelude::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Resource)] + struct Counter(AtomicUsize); let mut world = World::new(); world.init_resource::>(); - for i in 0..100 { - world.send_event(TestEvent { i }); + for _ in 0..100 { + world.send_event(TestEvent { i: 1 }); } - let mut schedule = Schedule::default(); + schedule.add_systems( + |mut events: EventMutator, counter: ResMut| { + events.par_read().for_each(|event| { + event.i += 1; + counter.0.fetch_add(event.i, Ordering::Relaxed); + }); + }, + ); + world.insert_resource(Counter(AtomicUsize::new(0))); + schedule.run(&mut world); + let counter = world.remove_resource::().unwrap(); + assert_eq!(counter.0.into_inner(), 200, "Initial run failed"); - schedule.add_systems(|mut events: EventMutator| { - let (tx, rx) = mpsc::channel(); - events.par_read().for_each(|event| { - tx.send(event.i).unwrap(); - }); - drop(tx); - - let observed: HashSet<_> = rx.into_iter().collect(); - assert_eq!(observed, HashSet::from_iter(0..100)); - }); + world.insert_resource(Counter(AtomicUsize::new(0))); schedule.run(&mut world); + let counter = world.remove_resource::().unwrap(); + assert_eq!( + counter.0.into_inner(), + 0, + "par_read should have consumed events but didn't" + ); } } diff --git a/crates/bevy_ecs/src/event/mut_iterators.rs b/crates/bevy_ecs/src/event/mut_iterators.rs index 0f54150c7a8da..ba4452b4cda4f 100644 --- a/crates/bevy_ecs/src/event/mut_iterators.rs +++ b/crates/bevy_ecs/src/event/mut_iterators.rs @@ -141,9 +141,10 @@ impl<'a, E: Event> ExactSizeIterator for EventMutatorIteratorWithId<'a, E> { /// A parallel iterator over `Event`s. #[derive(Debug)] pub struct EventMutatorParIter<'a, E: Event> { - reader: &'a mut ManualEventMutator, + mutator: &'a mut ManualEventMutator, slices: [&'a mut [EventInstance]; 2], batching_strategy: BatchingStrategy, + unread: usize, } impl<'a, E: Event> EventMutatorParIter<'a, E> { @@ -162,9 +163,10 @@ impl<'a, E: Event> EventMutatorParIter<'a, E> { mutator.last_event_count = events.event_count - unread_count; Self { - reader: mutator, + mutator, slices: [a, b], batching_strategy: BatchingStrategy::default(), + unread: unread_count, } } @@ -200,7 +202,10 @@ impl<'a, E: Event> EventMutatorParIter<'a, E> { /// initialized and run from the ECS scheduler, this should never panic. /// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool - pub fn for_each_with_id) + Send + Sync + Clone>(self, func: FN) { + pub fn for_each_with_id) + Send + Sync + Clone>( + mut self, + func: FN, + ) { #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] { self.into_iter().for_each(|(e, i)| func(e, i)); @@ -229,6 +234,10 @@ impl<'a, E: Event> EventMutatorParIter<'a, E> { }); } }); + + // At this point we're gaurenteed to have seen all events + self.mutator.last_event_count += self.unread; + self.unread = 0; } } @@ -249,7 +258,7 @@ impl<'a, E: Event> IntoIterator for EventMutatorParIter<'a, E> { fn into_iter(self) -> Self::IntoIter { let EventMutatorParIter { - reader, + mutator: reader, slices: [a, b], .. } = self; From 0013e86551f1b28fbb8c6065cb931893b5cad1c7 Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Thu, 13 Jun 2024 14:38:04 -0700 Subject: [PATCH 04/29] Delete crates/bevy_ecs/.vscode directory I keep accidentally committing this >_< --- crates/bevy_ecs/.vscode/launch.json | 51 --------------------------- crates/bevy_ecs/.vscode/settings.json | 5 --- crates/bevy_ecs/.vscode/tasks.json | 14 -------- 3 files changed, 70 deletions(-) delete mode 100644 crates/bevy_ecs/.vscode/launch.json delete mode 100644 crates/bevy_ecs/.vscode/settings.json delete mode 100644 crates/bevy_ecs/.vscode/tasks.json diff --git a/crates/bevy_ecs/.vscode/launch.json b/crates/bevy_ecs/.vscode/launch.json deleted file mode 100644 index 12b2377d20ea9..0000000000000 --- a/crates/bevy_ecs/.vscode/launch.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "type": "lldb", - "request": "launch", - "name": "(MacOS) Launch", - "cargo": { - "args": [ - "build", - "--bin=zombies", - "--package=zombies" - ], - "filter": { - "name": "zombies", - "kind": "bin" - } - }, - "args": [], - "cwd": "${workspaceFolder}", - "env": { - "CARGO_MANIFEST_DIR": "${workspaceFolder}", - "DYLD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/nightly-aarch64-apple-darwin/lib", - } - }, - { - "name": "(Windows) Launch", - "type": "cppvsdbg", - "request": "launch", - "program": "${workspaceFolder}/target/debug/zombies.exe", - "args": [], - "stopAtEntry": false, - "cwd": "${workspaceFolder}", - "environment": [ - { - "name": "CARGO_MANIFEST_DIR", - "value": "${workspaceFolder}" - }, - { - "name": "PATH", - "value": "%USERPROFILE%/scoop/persist/rustup-msvc/.rustup/toolchains/nightly-x86_64-pc-windows-msvc/bin;${workspaceFolder}/target/debug/deps;%PATH%" - } - ], - "console": "integratedTerminal", - "preLaunchTask": "rust: cargo build", - }, - ] -} \ No newline at end of file diff --git a/crates/bevy_ecs/.vscode/settings.json b/crates/bevy_ecs/.vscode/settings.json deleted file mode 100644 index acde288cdc29e..0000000000000 --- a/crates/bevy_ecs/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "rust-analyzer.cargo.targetDir": true, - "rust-analyzer.diagnostics.styleLints.enable": true, - "rust-analyzer.imports.preferPrelude": true, -} \ No newline at end of file diff --git a/crates/bevy_ecs/.vscode/tasks.json b/crates/bevy_ecs/.vscode/tasks.json deleted file mode 100644 index f721f5acc1cd1..0000000000000 --- a/crates/bevy_ecs/.vscode/tasks.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "version": "2.0.0", - "tasks": [ - { - "type": "cargo", - "command": "build", - "problemMatcher": [ - "$rustc" - ], - "group": "build", - "label": "rust: cargo build" - } - ] -} \ No newline at end of file From c6dcad7f8f38c3caf1d2747315d440968107d07f Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Thu, 13 Jun 2024 14:55:35 -0700 Subject: [PATCH 05/29] Fix typo --- crates/bevy_ecs/src/event/mut_iterators.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/event/mut_iterators.rs b/crates/bevy_ecs/src/event/mut_iterators.rs index ba4452b4cda4f..c356597323a50 100644 --- a/crates/bevy_ecs/src/event/mut_iterators.rs +++ b/crates/bevy_ecs/src/event/mut_iterators.rs @@ -235,7 +235,7 @@ impl<'a, E: Event> EventMutatorParIter<'a, E> { } }); - // At this point we're gaurenteed to have seen all events + // At this point we're guaranteed to have seen all events self.mutator.last_event_count += self.unread; self.unread = 0; } From 670bfae6dd143e1bfcb9a1ed28ee336b71a5b31a Mon Sep 17 00:00:00 2001 From: Bob Gardner Date: Mon, 17 Jun 2024 13:36:27 -0700 Subject: [PATCH 06/29] Updated event example --- examples/ecs/event.rs | 162 +++++++++++++++++++++++++++++++----------- 1 file changed, 121 insertions(+), 41 deletions(-) diff --git a/examples/ecs/event.rs b/examples/ecs/event.rs index 7d5abd1856629..1b970e8c3cde4 100644 --- a/examples/ecs/event.rs +++ b/examples/ecs/event.rs @@ -1,63 +1,143 @@ -//! This example creates a new event, a system that triggers the event once per second, -//! and a system that prints a message whenever the event is received. +//! This example shows how to send, mutate, and receive, events. As well as showing +//! how to you might want to control system ordering so that events are processed +//! in a specific order. It does this by simulating a damage over time effect that you might +//! find any a game. use bevy::prelude::*; -fn main() { - App::new() - .add_plugins(DefaultPlugins) - .add_event::() - .add_event::() - .init_resource::() - .add_systems(Update, (event_trigger, event_listener, sound_player)) - .run(); +// In order to send or receive events first you must define them +// This event should be sent when something attempts to deal damage to another entity. +#[derive(Event, Debug)] +struct DealDamage { + pub amount: i32, } -#[derive(Event)] -struct MyEvent { - pub message: String, -} +// This event should be sent when an entity receives damage. +#[derive(Event, Debug, Default)] +struct DamageReceived; -#[derive(Event, Default)] -struct PlaySound; +// This event should be sent when an entity blocks damage with armor. +#[derive(Event, Debug, Default)] +struct ArmorBlockedDamage; -#[derive(Resource)] -struct EventTriggerState { - event_timer: Timer, -} +// This resource represents a timer used to determine when to deal damage +// By default it repeats once per second +#[derive(Resource, Deref, DerefMut)] +struct DamageTimer(pub Timer); -impl Default for EventTriggerState { +impl Default for DamageTimer { fn default() -> Self { - EventTriggerState { - event_timer: Timer::from_seconds(1.0, TimerMode::Repeating), - } + DamageTimer(Timer::from_seconds(1.0, TimerMode::Repeating)) } } -// sends MyEvent and PlaySound every second -fn event_trigger( +// Next we define systems that send, mutate, and receive events +// This system reads 'DamageTimer', updates it, then sends a 'DealDamage' event +// if the timer has finished. +// +// Events are sent using an 'EventWriter' by calling 'send' or 'send_default'. +// The 'send_default' method will send the event with the default value if the event +// has a 'Default' implementation. +fn deal_damage_over_time( time: Res