From 53db461430ebfb951898d7b01480fceaa0617fdc Mon Sep 17 00:00:00 2001 From: andriyDev Date: Tue, 2 Dec 2025 22:30:55 -0800 Subject: [PATCH 1/2] Release the asset infos lock before acquiring the file transaction lock. --- crates/bevy_asset/src/processor/mod.rs | 231 ++++++++++++++----------- 1 file changed, 129 insertions(+), 102 deletions(-) diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 516f83448be1f..3f8a790ba400a 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -642,15 +642,20 @@ impl AssetProcessor { async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) { let asset_path = AssetPath::from(path).with_source(source.id()); debug!("Removing processed {asset_path} because source was removed"); - let mut infos = self.data.processing_state.asset_infos.write().await; - if let Some(info) = infos.get(&asset_path) { - // we must wait for uncontested write access to the asset source to ensure existing readers / writers - // can finish their operations - let _write_lock = info.file_transaction_lock.write(); - self.remove_processed_asset_and_meta(source, asset_path.path()) - .await; - } - infos.remove(&asset_path).await; + let lock = { + // Scope the infos lock so we don't hold up other processing for too long. + let mut infos = self.data.processing_state.asset_infos.write().await; + infos.remove(&asset_path).await + }; + let Some(lock) = lock else { + return; + }; + + // we must wait for uncontested write access to the asset source to ensure existing + // readers/writers can finish their operations + let _write_lock = lock.write(); + self.remove_processed_asset_and_meta(source, asset_path.path()) + .await; } /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata. @@ -662,24 +667,29 @@ impl AssetProcessor { new: PathBuf, new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>, ) { - let mut infos = self.data.processing_state.asset_infos.write().await; let old = AssetPath::from(old).with_source(source.id()); let new = AssetPath::from(new).with_source(source.id()); let processed_writer = source.processed_writer().unwrap(); - if let Some(info) = infos.get(&old) { - // we must wait for uncontested write access to the asset source to ensure existing readers / writers - // can finish their operations - let _write_lock = info.file_transaction_lock.write(); - processed_writer - .rename(old.path(), new.path()) - .await - .unwrap(); - processed_writer - .rename_meta(old.path(), new.path()) - .await - .unwrap(); - } - infos.rename(&old, &new, new_task_sender).await; + let result = { + // Scope the infos lock so we don't hold up other processing for too long. + let mut infos = self.data.processing_state.asset_infos.write().await; + infos.rename(&old, &new, new_task_sender).await + }; + let Some((old_lock, new_lock)) = result else { + return; + }; + // we must wait for uncontested write access to both assets to ensure existing + // readers/writers can finish their operations + let _old_write_lock = old_lock.write(); + let _new_write_lock = new_lock.write(); + processed_writer + .rename(old.path(), new.path()) + .await + .unwrap(); + processed_writer + .rename_meta(old.path(), new.path()) + .await + .unwrap(); } async fn queue_processing_tasks_for_folder( @@ -1069,9 +1079,15 @@ impl AssetProcessor { // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed) // See ProcessedAssetInfo::file_transaction_lock docs for more info let _transaction_lock = { - let mut infos = self.data.processing_state.asset_infos.write().await; - let info = infos.get_or_insert(asset_path.clone()); - info.file_transaction_lock.write_arc().await + let lock = { + let mut infos = self.data.processing_state.asset_infos.write().await; + let info = infos.get_or_insert(asset_path.clone()); + // Clone out the transaction lock first and then lock after we've dropped the + // asset_infos. Otherwise, trying to lock a single path can block all other paths to + // (leading to deadlocks). + info.file_transaction_lock.clone() + }; + lock.write_arc().await }; // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run. @@ -1317,11 +1333,17 @@ impl ProcessingState { &self, path: &AssetPath<'static>, ) -> Result, AssetReaderError> { - let infos = self.asset_infos.read().await; - let info = infos - .get(path) - .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?; - Ok(info.file_transaction_lock.read_arc().await) + let lock = { + let infos = self.asset_infos.read().await; + let info = infos + .get(path) + .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?; + // Clone out the transaction lock first and then lock after we've dropped the + // asset_infos. Otherwise, trying to lock a single path can block all other paths to + // (leading to deadlocks). + info.file_transaction_lock.clone() + }; + Ok(lock.read_arc().await) } /// Returns a future that will not finish until the path has been processed. @@ -1603,95 +1625,100 @@ impl ProcessorAssetInfos { } } - /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent - async fn remove(&mut self, asset_path: &AssetPath<'static>) { - let info = self.infos.remove(asset_path); - if let Some(info) = info { - if let Some(processed_info) = info.processed_info { - self.clear_dependencies(asset_path, processed_info); - } - // Tell all listeners this asset does not exist - info.status_sender - .broadcast(ProcessStatus::NonExistent) - .await - .unwrap(); - if !info.dependents.is_empty() { - error!( + /// Remove the info for the given path. This should only happen if an asset's source is + /// removed/non-existent. Returns the transaction lock for the asset. + async fn remove( + &mut self, + asset_path: &AssetPath<'static>, + ) -> Option>> { + let info = self.infos.remove(asset_path)?; + if let Some(processed_info) = info.processed_info { + self.clear_dependencies(asset_path, processed_info); + } + // Tell all listeners this asset does not exist + info.status_sender + .broadcast(ProcessStatus::NonExistent) + .await + .unwrap(); + if !info.dependents.is_empty() { + error!( "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}", info.dependents ); - self.non_existent_dependents - .insert(asset_path.clone(), info.dependents); - } + self.non_existent_dependents + .insert(asset_path.clone(), info.dependents); } + + Some(info.file_transaction_lock) } - /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent + /// Remove the info for the given path. This should only happen if an asset's source is + /// removed/non-existent. Returns the transaction locks for the old and new assets respectively. async fn rename( &mut self, old: &AssetPath<'static>, new: &AssetPath<'static>, new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>, - ) { - let info = self.infos.remove(old); - if let Some(mut info) = info { - if !info.dependents.is_empty() { - // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath - // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term, - // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache. - // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename. - // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call). - // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed. - // (see the remove impl). - error!( + ) -> Option<(Arc>, Arc>)> { + let mut info = self.infos.remove(old)?; + if !info.dependents.is_empty() { + // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath + // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term, + // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache. + // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename. + // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call). + // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed. + // (see the remove impl). + error!( "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}", info.dependents ); - self.non_existent_dependents - .insert(old.clone(), core::mem::take(&mut info.dependents)); - } - if let Some(processed_info) = &info.processed_info { - // Update "dependent" lists for this asset's "process dependencies" to use new path. - for dep in &processed_info.process_dependencies { - if let Some(info) = self.infos.get_mut(&dep.path) { - info.dependents.remove(old); - info.dependents.insert(new.clone()); - } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) - { - dependents.remove(old); - dependents.insert(new.clone()); - } + self.non_existent_dependents + .insert(old.clone(), core::mem::take(&mut info.dependents)); + } + if let Some(processed_info) = &info.processed_info { + // Update "dependent" lists for this asset's "process dependencies" to use new path. + for dep in &processed_info.process_dependencies { + if let Some(info) = self.infos.get_mut(&dep.path) { + info.dependents.remove(old); + info.dependents.insert(new.clone()); + } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) { + dependents.remove(old); + dependents.insert(new.clone()); } } - // Tell all listeners this asset no longer exists - info.status_sender - .broadcast(ProcessStatus::NonExistent) - .await - .unwrap(); - let dependents: Vec> = { - let new_info = self.get_or_insert(new.clone()); - new_info.processed_info = info.processed_info; - new_info.status = info.status; - // Ensure things waiting on the new path are informed of the status of this asset - if let Some(status) = new_info.status { - new_info.status_sender.broadcast(status).await.unwrap(); - } - new_info.dependents.iter().cloned().collect() - }; - // Queue the asset for a reprocess check, in case it needs new meta. + } + // Tell all listeners this asset no longer exists + info.status_sender + .broadcast(ProcessStatus::NonExistent) + .await + .unwrap(); + let new_info = self.get_or_insert(new.clone()); + new_info.processed_info = info.processed_info; + new_info.status = info.status; + // Ensure things waiting on the new path are informed of the status of this asset + if let Some(status) = new_info.status { + new_info.status_sender.broadcast(status).await.unwrap(); + } + let dependents = new_info.dependents.iter().cloned().collect::>(); + // Queue the asset for a reprocess check, in case it needs new meta. + let _ = new_task_sender + .send((new.source().clone_owned(), new.path().to_owned())) + .await; + for dependent in dependents { + // Queue dependents for reprocessing because they might have been waiting for this asset. let _ = new_task_sender - .send((new.source().clone_owned(), new.path().to_owned())) + .send(( + dependent.source().clone_owned(), + dependent.path().to_owned(), + )) .await; - for dependent in dependents { - // Queue dependents for reprocessing because they might have been waiting for this asset. - let _ = new_task_sender - .send(( - dependent.source().clone_owned(), - dependent.path().to_owned(), - )) - .await; - } } + + Some(( + info.file_transaction_lock, + new_info.file_transaction_lock.clone(), + )) } fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) { From 2245d41591c0d00899a9083833163ee6aa72f32f Mon Sep 17 00:00:00 2001 From: andriyDev Date: Wed, 3 Dec 2025 17:03:56 -0800 Subject: [PATCH 2/2] Improve the doc comments on some functions. --- crates/bevy_asset/src/processor/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 3f8a790ba400a..8e58b120fb934 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -1626,7 +1626,8 @@ impl ProcessorAssetInfos { } /// Remove the info for the given path. This should only happen if an asset's source is - /// removed/non-existent. Returns the transaction lock for the asset. + /// removed/non-existent. Returns the transaction lock for the asset, or [`None`] if the asset + /// path does not exist. async fn remove( &mut self, asset_path: &AssetPath<'static>, @@ -1652,8 +1653,9 @@ impl ProcessorAssetInfos { Some(info.file_transaction_lock) } - /// Remove the info for the given path. This should only happen if an asset's source is - /// removed/non-existent. Returns the transaction locks for the old and new assets respectively. + /// Remove the info for the old path, and move over its info to the new path. This should only + /// happen if an asset's source is removed/non-existent. Returns the transaction locks for the + /// old and new assets respectively, or [`None`] if the old path does not exist. async fn rename( &mut self, old: &AssetPath<'static>,