From d001e039846bf2b52aad7c264ad1ce89a63f9ab9 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 31 Oct 2025 16:37:48 -0700 Subject: [PATCH] Fix asset processing tests being flaky from hot reloading. --- crates/bevy_asset/src/processor/tests.rs | 148 ++++++++++++++++++----- 1 file changed, 115 insertions(+), 33 deletions(-) diff --git a/crates/bevy_asset/src/processor/tests.rs b/crates/bevy_asset/src/processor/tests.rs index 133fde43ee3fd..6097ac8a26a12 100644 --- a/crates/bevy_asset/src/processor/tests.rs +++ b/crates/bevy_asset/src/processor/tests.rs @@ -6,6 +6,7 @@ use alloc::{ vec, vec::Vec, }; +use async_lock::{RwLock, RwLockWriteGuard}; use bevy_platform::{ collections::HashMap, sync::{Mutex, PoisonError}, @@ -24,7 +25,8 @@ use bevy_tasks::BoxedFuture; use crate::{ io::{ memory::{Dir, MemoryAssetReader, MemoryAssetWriter}, - AssetSource, AssetSourceEvent, AssetSourceId, AssetWatcher, Reader, + AssetReader, AssetReaderError, AssetSource, AssetSourceEvent, AssetSourceId, AssetWatcher, + PathStream, Reader, }, processor::{ AssetProcessor, LoadTransformAndSave, LogEntry, ProcessorState, ProcessorTransactionLog, @@ -45,12 +47,55 @@ struct ProcessingDirs { struct AppWithProcessor { app: App, + source_gate: Arc>, default_source_dirs: ProcessingDirs, extra_sources_dirs: HashMap, } +/// Similar to [`crate::io::gated::GatedReader`], but uses a lock instead of a channel to avoid +/// needing to send the "correct" number of messages. +#[derive(Clone)] +struct LockGatedReader { + reader: R, + gate: Arc>, +} + +impl LockGatedReader { + /// Creates a new [`GatedReader`], which wraps the given `reader`. Also returns a [`GateOpener`] which + /// can be used to open "path gates" for this [`GatedReader`]. + fn new(gate: Arc>, reader: R) -> Self { + Self { gate, reader } + } +} + +impl AssetReader for LockGatedReader { + async fn read<'a>(&'a self, path: &'a Path) -> Result { + let _guard = self.gate.read().await; + self.reader.read(path).await + } + + async fn read_meta<'a>(&'a self, path: &'a Path) -> Result { + let _guard = self.gate.read().await; + self.reader.read_meta(path).await + } + + async fn read_directory<'a>( + &'a self, + path: &'a Path, + ) -> Result, AssetReaderError> { + let _guard = self.gate.read().await; + self.reader.read_directory(path).await + } + + async fn is_directory<'a>(&'a self, path: &'a Path) -> Result { + let _guard = self.gate.read().await; + self.reader.is_directory(path).await + } +} + fn create_app_with_asset_processor(extra_sources: &[String]) -> AppWithProcessor { let mut app = App::new(); + let source_gate = Arc::new(RwLock::new(())); struct UnfinishedProcessingDirs { source: Dir, @@ -72,13 +117,20 @@ fn create_app_with_asset_processor(extra_sources: &[String]) -> AppWithProcessor } } - fn create_source(app: &mut App, source_id: AssetSourceId<'static>) -> UnfinishedProcessingDirs { + fn create_source( + app: &mut App, + source_id: AssetSourceId<'static>, + source_gate: Arc>, + ) -> UnfinishedProcessingDirs { let source_dir = Dir::default(); let processed_dir = Dir::default(); - let source_memory_reader = MemoryAssetReader { - root: source_dir.clone(), - }; + let source_memory_reader = LockGatedReader::new( + source_gate, + MemoryAssetReader { + root: source_dir.clone(), + }, + ); let processed_memory_reader = MemoryAssetReader { root: processed_dir.clone(), }; @@ -111,14 +163,18 @@ fn create_app_with_asset_processor(extra_sources: &[String]) -> AppWithProcessor } } - let default_source_dirs = create_source(&mut app, AssetSourceId::Default); + let default_source_dirs = create_source(&mut app, AssetSourceId::Default, source_gate.clone()); let extra_sources_dirs = extra_sources .iter() .map(|source_name| { ( source_name.clone(), - create_source(&mut app, AssetSourceId::Name(source_name.clone().into())), + create_source( + &mut app, + AssetSourceId::Name(source_name.clone().into()), + source_gate.clone(), + ), ) }) .collect::>(); @@ -183,6 +239,7 @@ fn create_app_with_asset_processor(extra_sources: &[String]) -> AppWithProcessor AppWithProcessor { app, + source_gate, default_source_dirs: default_source_dirs.finish(), extra_sources_dirs: extra_sources_dirs .into_iter() @@ -191,21 +248,18 @@ fn create_app_with_asset_processor(extra_sources: &[String]) -> AppWithProcessor } } -fn run_app_until_finished_processing(app: &mut App) { - // If the original source changes through an AssetSourceEvent, we'll be racing (on - // multithreaded) between this and processor thread switching the state to `Processing`. So do a - // fixed number of iterations so the processor thread is likely to win. - for _ in 0..5 { - app.update(); - } - run_app_until(app, |world| { - if bevy_tasks::block_on(world.resource::().get_state()) - == ProcessorState::Finished - { - Some(()) - } else { - None - } +fn run_app_until_finished_processing(app: &mut App, guard: RwLockWriteGuard<'_, ()>) { + let processor = app.world().resource::().clone(); + // We can't just wait for the processor state to be finished since we could have already + // finished before, but now that something has changed, we may not have restarted processing + // yet. So wait for processing to start, then finish. + run_app_until(app, |_| { + let state = bevy_tasks::block_on(processor.get_state()); + (state == ProcessorState::Processing || state == ProcessorState::Initializing).then_some(()) + }); + drop(guard); + run_app_until(app, |_| { + (bevy_tasks::block_on(processor.get_state()) == ProcessorState::Finished).then_some(()) }); } @@ -299,6 +353,7 @@ fn no_meta_or_default_processor_copies_asset() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: source_dir, @@ -308,6 +363,8 @@ fn no_meta_or_default_processor_copies_asset() { .. } = create_app_with_asset_processor(&[]); + let guard = source_gate.write_blocking(); + let path = Path::new("abc.cool.ron"); let source_asset = r#"( text: "abc", @@ -318,7 +375,7 @@ fn no_meta_or_default_processor_copies_asset() { source_dir.insert_asset_text(path, source_asset); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); let processed_asset = processed_dir.get_asset(path).unwrap(); let processed_asset = str::from_utf8(processed_asset.value()).unwrap(); @@ -329,6 +386,7 @@ fn no_meta_or_default_processor_copies_asset() { fn asset_processor_transforms_asset_default_processor() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: source_dir, @@ -350,6 +408,8 @@ fn asset_processor_transforms_asset_default_processor() { )) .set_default_asset_processor::("cool.ron"); + let guard = source_gate.write_blocking(); + let path = Path::new("abc.cool.ron"); source_dir.insert_asset_text( path, @@ -361,7 +421,7 @@ fn asset_processor_transforms_asset_default_processor() { )"#, ); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); let processed_asset = processed_dir.get_asset(path).unwrap(); let processed_asset = str::from_utf8(processed_asset.value()).unwrap(); @@ -380,6 +440,7 @@ fn asset_processor_transforms_asset_default_processor() { fn asset_processor_transforms_asset_with_meta() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: source_dir, @@ -400,6 +461,8 @@ fn asset_processor_transforms_asset_with_meta() { CoolTextSaver, )); + let guard = source_gate.write_blocking(); + let path = Path::new("abc.cool.ron"); source_dir.insert_asset_text( path, @@ -422,7 +485,7 @@ fn asset_processor_transforms_asset_with_meta() { ), )"#); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); let processed_asset = processed_dir.get_asset(path).unwrap(); let processed_asset = str::from_utf8(processed_asset.value()).unwrap(); @@ -574,6 +637,7 @@ fn asset_processor_loading_can_read_processed_assets() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: source_dir, @@ -599,6 +663,8 @@ fn asset_processor_loading_can_read_processed_assets() { .set_default_asset_processor::("gltf") .set_default_asset_processor::("bsn"); + let guard = source_gate.write_blocking(); + let gltf_path = Path::new("abc.gltf"); source_dir.insert_asset_text( gltf_path, @@ -623,7 +689,7 @@ fn asset_processor_loading_can_read_processed_assets() { )"#, ); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); let processed_bsn = processed_dir.get_asset(bsn_path).unwrap(); let processed_bsn = str::from_utf8(processed_bsn.value()).unwrap(); @@ -645,6 +711,7 @@ fn asset_processor_loading_can_read_processed_assets() { fn asset_processor_loading_can_read_source_assets() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: source_dir, @@ -754,6 +821,8 @@ fn asset_processor_loading_can_read_source_assets() { .set_default_asset_processor::("gltf") .set_default_asset_processor::("gltfx"); + let guard = source_gate.write_blocking(); + let gltf_path_1 = Path::new("abc.gltf"); source_dir.insert_asset_text( gltf_path_1, @@ -783,7 +852,7 @@ fn asset_processor_loading_can_read_source_assets() { )"#, ); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); // Sanity check that the two gltf files were actually processed. let processed_gltf_1 = processed_dir.get_asset(gltf_path_1).unwrap(); @@ -838,6 +907,7 @@ fn asset_processor_loading_can_read_source_assets() { fn asset_processor_processes_all_sources() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: default_source_dir, @@ -871,6 +941,8 @@ fn asset_processor_processes_all_sources() { )) .set_default_asset_processor::("cool.ron"); + let guard = source_gate.write_blocking(); + // All the assets will have the same path, but they will still be separately processed since // they are in different sources. let path = Path::new("asset.cool.ron"); @@ -887,7 +959,7 @@ fn asset_processor_processes_all_sources() { custom_1_source_dir.insert_asset_text(path, &serialize_as_cool_text("custom 1 asset")); custom_2_source_dir.insert_asset_text(path, &serialize_as_cool_text("custom 2 asset")); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); // Check that all the assets are processed. assert_eq!( @@ -903,13 +975,15 @@ fn asset_processor_processes_all_sources() { serialize_as_cool_text("custom 2 asset processed") ); + let guard = source_gate.write_blocking(); + // Update the default source asset and notify the watcher. default_source_dir.insert_asset_text(path, &serialize_as_cool_text("default asset changed")); default_source_events .send_blocking(AssetSourceEvent::ModifiedAsset(path.to_path_buf())) .unwrap(); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); // Check that all the assets are processed again. assert_eq!( @@ -925,6 +999,8 @@ fn asset_processor_processes_all_sources() { serialize_as_cool_text("custom 2 asset processed") ); + let guard = source_gate.write_blocking(); + // Update the custom source assets and notify the watchers. custom_1_source_dir.insert_asset_text(path, &serialize_as_cool_text("custom 1 asset changed")); custom_2_source_dir.insert_asset_text(path, &serialize_as_cool_text("custom 2 asset changed")); @@ -935,7 +1011,7 @@ fn asset_processor_processes_all_sources() { .send_blocking(AssetSourceEvent::ModifiedAsset(path.to_path_buf())) .unwrap(); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); // Check that all the assets are processed again. assert_eq!( @@ -956,6 +1032,7 @@ fn asset_processor_processes_all_sources() { fn nested_loads_of_processed_asset_reprocesses_on_reload() { let AppWithProcessor { mut app, + source_gate, default_source_dirs: ProcessingDirs { source: default_source_dir, @@ -1061,6 +1138,8 @@ fn nested_loads_of_processed_asset_reprocesses_on_reload() { )) .set_default_asset_processor::("nest"); + let guard = source_gate.write_blocking(); + // This test also checks that processing of nested assets can occur across asset sources. custom_source_dir.insert_asset_text( Path::new("top.nest"), @@ -1077,7 +1156,7 @@ fn nested_loads_of_processed_asset_reprocesses_on_reload() { &serialize_as_leaf("unrelated".into()), ); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); // The initial processing step should have processed all assets. assert_eq!( @@ -1106,6 +1185,7 @@ fn nested_loads_of_processed_asset_reprocesses_on_reload() { // Now we will only send a single source event, but that should still result in all related // assets being reprocessed. + let guard = source_gate.write_blocking(); custom_source_dir.insert_asset_text( Path::new("bottom.nest"), @@ -1115,7 +1195,7 @@ fn nested_loads_of_processed_asset_reprocesses_on_reload() { .send_blocking(AssetSourceEvent::ModifiedAsset("bottom.nest".into())) .unwrap(); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); assert_eq!( read_asset_as_string(&custom_processed_dir, Path::new("bottom.nest")), @@ -1138,11 +1218,13 @@ fn nested_loads_of_processed_asset_reprocesses_on_reload() { // Send a modify event to the middle asset without changing the asset bytes. This should do // **nothing** since neither its dependencies nor its bytes have changed. + let guard = source_gate.write_blocking(); + default_source_events .send_blocking(AssetSourceEvent::ModifiedAsset("middle.nest".into())) .unwrap(); - run_app_until_finished_processing(&mut app); + run_app_until_finished_processing(&mut app, guard); assert_eq!( read_asset_as_string(&custom_processed_dir, Path::new("bottom.nest")),