From 5cfcb486de8260f7076134ac8eba46019a4d3512 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 22 Nov 2025 16:11:38 -0800 Subject: [PATCH 1/5] Avoid reading the whole asset into memory for asset processing. --- crates/bevy_asset/src/meta.rs | 26 ++++++++--- crates/bevy_asset/src/processor/mod.rs | 53 +++++++++++++--------- crates/bevy_asset/src/processor/process.rs | 22 +++++---- 3 files changed, 65 insertions(+), 36 deletions(-) diff --git a/crates/bevy_asset/src/meta.rs b/crates/bevy_asset/src/meta.rs index 0e972261198cc..f18449ce88573 100644 --- a/crates/bevy_asset/src/meta.rs +++ b/crates/bevy_asset/src/meta.rs @@ -3,10 +3,13 @@ use alloc::{ string::{String, ToString}, vec::Vec, }; +use futures_lite::AsyncReadExt; use crate::{ - loader::AssetLoader, processor::Process, Asset, AssetPath, DeserializeMetaError, - VisitAssetDependencies, + io::{AssetReaderError, Reader}, + loader::AssetLoader, + processor::Process, + Asset, AssetPath, DeserializeMetaError, VisitAssetDependencies, }; use downcast_rs::{impl_downcast, Downcast}; use ron::ser::PrettyConfig; @@ -204,7 +207,7 @@ impl AssetLoader for () { type Error = std::io::Error; async fn load( &self, - _reader: &mut dyn crate::io::Reader, + _reader: &mut dyn Reader, _settings: &Self::Settings, _load_context: &mut crate::LoadContext<'_>, ) -> Result { @@ -241,11 +244,22 @@ pub(crate) fn loader_settings_meta_transform( pub type AssetHash = [u8; 32]; /// NOTE: changing the hashing logic here is a _breaking change_ that requires a [`META_FORMAT_VERSION`] bump. -pub(crate) fn get_asset_hash(meta_bytes: &[u8], asset_bytes: &[u8]) -> AssetHash { +pub(crate) async fn get_asset_hash( + meta_bytes: &[u8], + asset_reader: &mut impl Reader, +) -> Result { let mut hasher = blake3::Hasher::new(); hasher.update(meta_bytes); - hasher.update(asset_bytes); - *hasher.finalize().as_bytes() + let mut buffer = [0; blake3::CHUNK_LEN]; + loop { + let bytes_read = asset_reader.read(&mut buffer).await?; + if bytes_read == 0 { + // This means we've reached EOF, so we're done consume asset bytes. + break; + } + hasher.update(&buffer[..bytes_read]); + } + Ok(*hasher.finalize().as_bytes()) } /// NOTE: changing the hashing logic here is a _breaking change_ that requires a [`META_FORMAT_VERSION`] bump. diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 516f83448be1f..1d7c57c58194a 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -64,7 +64,7 @@ use bevy_platform::{ }; use bevy_tasks::IoTaskPool; use futures_io::ErrorKind; -use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use futures_lite::{AsyncWriteExt, StreamExt}; use futures_util::{select_biased, FutureExt}; use std::{ path::{Path, PathBuf}, @@ -966,9 +966,6 @@ impl AssetProcessor { err, }; - // Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files - let mut byte_reader = reader.read(path).await.map_err(reader_err)?; - let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await { Ok(meta_bytes) => { let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| { @@ -1023,19 +1020,14 @@ impl AssetProcessor { let processed_writer = source.processed_writer()?; - let mut asset_bytes = Vec::new(); - byte_reader - .read_to_end(&mut asset_bytes) - .await - .map_err(|e| ProcessError::AssetReaderError { - path: asset_path.clone(), - err: AssetReaderError::Io(e.into()), - })?; - - // PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset. - // The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader) - // Hard to say which is worse - let new_hash = get_asset_hash(&meta_bytes, &asset_bytes); + let new_hash = { + // Create a reader just for computing the hash. Keep this scoped here so that we drop it + // as soon as the hash is computed. + let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?; + get_asset_hash(&meta_bytes, &mut reader_for_hash) + .await + .map_err(reader_err)? + }; let mut new_processed_info = ProcessedInfo { hash: new_hash, full_hash: new_hash, @@ -1066,6 +1058,16 @@ impl AssetProcessor { } } + // Create a reader just for the actual process. Note: this means that we're performing two + // reads for the same file (but we avoid having to load the whole file into memory). For + // some sources (like local file systems), this is not a big deal, but for other sources + // like an HTTP asset sources, this could be an entire additional download (if the asset + // source doesn't do any caching). In practice, most sources being processed are likely to + // be local, and processing in general is a publish-time operation, so it's not likely to be + // too big a deal. If in the future, we decide we want to avoid this repeated read, we could + // "ask" the asset source if it prefers avoiding repeated reads or not. + let mut reader_for_process = reader.read(path).await.map_err(reader_err)?; + // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed) // See ProcessedAssetInfo::file_transaction_lock docs for more info let _transaction_lock = { @@ -1081,8 +1083,12 @@ impl AssetProcessor { if let Some(processor) = processor { let mut writer = processed_writer.write(path).await.map_err(writer_err)?; let mut processed_meta = { - let mut context = - ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info); + let mut context = ProcessContext::new( + self, + asset_path, + reader_for_process, + &mut new_processed_info, + ); processor .process(&mut context, source_meta, &mut *writer) .await? @@ -1112,10 +1118,13 @@ impl AssetProcessor { .await .map_err(writer_err)?; } else { - processed_writer - .write_bytes(path, &asset_bytes) + let mut writer = processed_writer.write(path).await.map_err(writer_err)?; + futures_lite::io::copy(&mut reader_for_process, &mut writer) .await - .map_err(writer_err)?; + .map_err(|err| ProcessError::AssetWriterError { + path: asset_path.clone_owned(), + err: err.into(), + })?; *source_meta.processed_info_mut() = Some(new_processed_info.clone()); let meta_bytes = source_meta.serialize(); processed_writer diff --git a/crates/bevy_asset/src/processor/process.rs b/crates/bevy_asset/src/processor/process.rs index b37265d0fb660..402acd240fd22 100644 --- a/crates/bevy_asset/src/processor/process.rs +++ b/crates/bevy_asset/src/processor/process.rs @@ -1,7 +1,7 @@ use crate::{ io::{ AssetReaderError, AssetWriterError, MissingAssetWriterError, - MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, SliceReader, Writer, + MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, Reader, Writer, }, meta::{AssetAction, AssetMeta, AssetMetaDyn, ProcessDependencyInfo, ProcessedInfo, Settings}, processor::AssetProcessor, @@ -280,20 +280,20 @@ pub struct ProcessContext<'a> { /// [`AssetServer`]: crate::server::AssetServer processor: &'a AssetProcessor, path: &'a AssetPath<'static>, - asset_bytes: &'a [u8], + reader: Box, } impl<'a> ProcessContext<'a> { pub(crate) fn new( processor: &'a AssetProcessor, path: &'a AssetPath<'static>, - asset_bytes: &'a [u8], + reader: Box, new_processed_info: &'a mut ProcessedInfo, ) -> Self { Self { processor, path, - asset_bytes, + reader, new_processed_info, } } @@ -309,9 +309,15 @@ impl<'a> ProcessContext<'a> { let server = &self.processor.server; let loader_name = core::any::type_name::(); let loader = server.get_asset_loader_with_type_name(loader_name).await?; - let mut reader = SliceReader::new(self.asset_bytes); let loaded_asset = server - .load_with_meta_loader_and_reader(self.path, &meta, &*loader, &mut reader, false, true) + .load_with_meta_loader_and_reader( + self.path, + &meta, + &*loader, + &mut self.reader, + false, + true, + ) .await?; for (path, full_hash) in &loaded_asset.loader_dependencies { self.new_processed_info @@ -332,7 +338,7 @@ impl<'a> ProcessContext<'a> { /// The source bytes of the asset being processed. #[inline] - pub fn asset_bytes(&self) -> &[u8] { - self.asset_bytes + pub fn asset_reader(&mut self) -> &mut dyn Reader { + &mut self.reader } } From 85432e71fb23e271811508d99ff465eea06aa473 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 22 Nov 2025 17:39:55 -0800 Subject: [PATCH 2/5] Create a migration guide for changes to `ProcessContext`. --- .../migration-guides/process_trait_changes.md | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 release-content/migration-guides/process_trait_changes.md diff --git a/release-content/migration-guides/process_trait_changes.md b/release-content/migration-guides/process_trait_changes.md new file mode 100644 index 0000000000000..7fbceba59b8aa --- /dev/null +++ b/release-content/migration-guides/process_trait_changes.md @@ -0,0 +1,30 @@ +--- +title: Changes to the `Process` trait. +pull_requests: [] +--- + +`ProcessContext` no longer includes `asset_bytes`. This has been replaced by `asset_reader`. To +maintain current behavior in a `Process` implementation, you can read all the bytes into memory. +If previously, you did: + +```rust +// Inside `impl Process for Type` +let bytes = context.asset_bytes(); +// Use bytes here! +``` + +Then now, it should be: + +```rust +// Inside `impl Process for Type` +let reader = context.asset_reader(); +let mut bytes = vec![]; +reader + .read_to_end(&mut bytes) + .await + .map_err(|err| ProcessError::AssetReaderError { + path: context.path().clone_owned(), + err: err.into(), + })?; +// Use bytes here! +``` From 8248adb5b962a9d53b555bf1eec952b63a66ef5c Mon Sep 17 00:00:00 2001 From: andriyDev Date: Tue, 25 Nov 2025 19:30:16 -0800 Subject: [PATCH 3/5] Check for buffer < len to find EOF. --- crates/bevy_asset/src/meta.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_asset/src/meta.rs b/crates/bevy_asset/src/meta.rs index f18449ce88573..b6b41d36833cb 100644 --- a/crates/bevy_asset/src/meta.rs +++ b/crates/bevy_asset/src/meta.rs @@ -253,11 +253,11 @@ pub(crate) async fn get_asset_hash( let mut buffer = [0; blake3::CHUNK_LEN]; loop { let bytes_read = asset_reader.read(&mut buffer).await?; - if bytes_read == 0 { - // This means we've reached EOF, so we're done consume asset bytes. + hasher.update(&buffer[..bytes_read]); + if bytes_read < buffer.len() { + // This means we've reached EOF, so we're done consuming asset bytes. break; } - hasher.update(&buffer[..bytes_read]); } Ok(*hasher.finalize().as_bytes()) } From 7110da51bf4e35b3a64efbf5e94c9e0d307e1774 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Tue, 25 Nov 2025 19:31:35 -0800 Subject: [PATCH 4/5] Reword migration guide title. --- release-content/migration-guides/process_trait_changes.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/release-content/migration-guides/process_trait_changes.md b/release-content/migration-guides/process_trait_changes.md index 7fbceba59b8aa..0b043a60e0845 100644 --- a/release-content/migration-guides/process_trait_changes.md +++ b/release-content/migration-guides/process_trait_changes.md @@ -1,6 +1,6 @@ --- -title: Changes to the `Process` trait. -pull_requests: [] +title: Changes to the `Process` trait in `bevy_asset`. +pull_requests: [21925] --- `ProcessContext` no longer includes `asset_bytes`. This has been replaced by `asset_reader`. To From 5dd402a8fd1651b6aff9257a6bcd68d9eaf4cbf9 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sun, 30 Nov 2025 16:14:02 -0800 Subject: [PATCH 5/5] Apply suggestion from @kfc35 Co-authored-by: Kevin Chen --- crates/bevy_asset/src/processor/process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/processor/process.rs b/crates/bevy_asset/src/processor/process.rs index 402acd240fd22..a29b6abcab1a5 100644 --- a/crates/bevy_asset/src/processor/process.rs +++ b/crates/bevy_asset/src/processor/process.rs @@ -336,7 +336,7 @@ impl<'a> ProcessContext<'a> { self.path } - /// The source bytes of the asset being processed. + /// The reader for the asset being processed. #[inline] pub fn asset_reader(&mut self) -> &mut dyn Reader { &mut self.reader