diff --git a/client/src/main.rs b/client/src/main.rs index 1ecebb5..2728763 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -4,7 +4,7 @@ use clap::{Parser, Subcommand}; use common::{ BLOB_KEY, Hash, Header, INDEX_KEY, Mode, ObjectType, TREE_KEY, archive::{Archive, ArchiveBody, ArchiveEntryData, ArchiveHeaderEntry, Compression, HEADER, RawEntryData, SUPPLEMENTAL_HEADER}, - compute_hash, object_body, read_header_and_body, read_header_from_slice, read_object_into_headers, + compute_hash, object_body, read_header_and_body, read_header_from_slice, read_object_into_headers_parallel, store::Store, }; @@ -83,7 +83,7 @@ fn build_tree_from_dir( } // Process files concurrently with bounded parallelism - let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(256)); let mut file_set = JoinSet::new(); for file_path in file_paths { let store_clone = store.clone(); @@ -96,20 +96,22 @@ fn build_tree_from_dir( .ok_or_else(|| anyhow::anyhow!("invalid file name: {}", file_path.display()))? .to_string(); - let (data, hash) = tokio::task::spawn_blocking({ + // Read file, compute hash, and write to store all in one blocking call + let (hash, size) = tokio::task::spawn_blocking({ let path = file_path.clone(); - move || -> anyhow::Result<(Vec, Hash)> { + move || -> anyhow::Result<(Hash, u64)> { let data = std::fs::read(&path)?; + let size = data.len() as u64; let hash = compute_hash(BLOB_KEY, &data); - Ok((data, hash)) + let header = Header::new(ObjectType::Blob, size); + let header_str = header.to_string(); + let mut buf = Vec::with_capacity(header_str.len() + data.len()); + buf.extend_from_slice(header_str.as_bytes()); + buf.extend(data); + store_clone.put_bytes_direct_blocking(&hash, &buf)?; + Ok((hash, size)) } }).await??; - let size = data.len() as u64; - - if !store_clone.exists(&hash).await? { - let header = Header::new(ObjectType::Blob, size); - store_clone.put_object_bytes(&hash, header, data).await?; - } Ok::<_, anyhow::Error>((name, hash, size)) }); @@ -145,24 +147,20 @@ fn build_tree_from_dir( let tree_data = object_body::Object::to_data(&tree); let hash = compute_hash(TREE_KEY, &tree_data); - if !store.exists(&hash).await? { - let header = Header::new(ObjectType::Tree, tree_data.len() as u64); - store.put_object_bytes(&hash, header, tree_data).await?; - } + let header = Header::new(ObjectType::Tree, tree_data.len() as u64); + store.put_object_bytes(&hash, header, tree_data).await?; Ok((hash, total_size)) }) } -// Helper to read Tree from Store -async fn read_tree_from_store(store: &Store, hash: &Hash) -> anyhow::Result { - let mut store_obj = store.get_object(hash).await?; - anyhow::ensure!(store_obj.header.object_type == ObjectType::Tree, "expected tree object"); - - let mut data = Vec::new(); - store_obj.read_to_end(&mut data).await?; - - object_body::Object::from_data(&data) +// Helper to read Tree from Store using direct blocking I/O +fn read_tree_from_store_blocking(store: &Store, hash: &Hash) -> anyhow::Result { + let raw = store.get_bytes_direct_blocking(hash)?; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse header for tree {}", hash))?; + anyhow::ensure!(header.object_type == ObjectType::Tree, "expected tree object"); + object_body::Object::from_data(body) } async fn commit_directory(store: &Store, path: &Path) -> anyhow::Result<()> { @@ -189,12 +187,13 @@ async fn restore_directory(store: &Store, path: &Path, index_hash: Hash, validat let mut entries = read_dir(path).await?; anyhow::ensure!(entries.next_entry().await?.is_none(), "path must be an empty directory"); - let mut store_obj = store.get_object(&index_hash).await?; - let mut index_data = Vec::new(); - store_obj.read_to_end(&mut index_data).await?; - let index: object_body::Index = object_body::Object::from_data(&index_data)?; + let raw = store.get_bytes_direct_blocking(&index_hash) + .context("index object not found in store")?; + let (_header, index_data) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse index object"))?; + let index: object_body::Index = object_body::Object::from_data(index_data)?; - let tree = read_tree_from_store(store, &index.tree).await?; + let tree = read_tree_from_store_blocking(store, &index.tree)?; write_tree_to_path(store, &tree, path).await?; @@ -212,14 +211,13 @@ fn validate_tree_at_path<'a>( Box::pin(async move { for entry in &tree.contents { let entry_path = path.join(&entry.path); - let header = store.get_object(&entry.hash).await?.header; - match header.object_type { - ObjectType::Tree => { - let subtree = read_tree_from_store(store, &entry.hash).await?; + match entry.mode { + Mode::Tree => { + let subtree = read_tree_from_store_blocking(store, &entry.hash)?; validate_tree_at_path(store, &subtree, &entry_path).await?; } - ObjectType::Blob => { + _ => { let expected_hash = entry.hash.clone(); let entry_name = entry.path.clone(); let computed_hash = tokio::task::spawn_blocking({ @@ -231,7 +229,6 @@ fn validate_tree_at_path<'a>( }).await??; anyhow::ensure!(computed_hash == expected_hash, "hash mismatch for {}", entry_name); } - ObjectType::Index => anyhow::bail!("invalid object type in tree"), } } Ok(()) @@ -244,20 +241,19 @@ fn write_tree_to_path<'a>( path: &'a Path, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { - let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(256)); let mut blob_set = JoinSet::new(); for entry in &tree.contents { let entry_path = path.join(&entry.path); - let header = store.get_object(&entry.hash).await?.header; - match header.object_type { - ObjectType::Tree => { + match entry.mode { + Mode::Tree => { create_dir(&entry_path).await.context("failed to create directory")?; - let subtree = read_tree_from_store(store, &entry.hash).await?; + let subtree = read_tree_from_store_blocking(store, &entry.hash)?; write_tree_to_path(store, &subtree, &entry_path).await?; } - ObjectType::Blob => { + _ => { let store_clone = store.clone(); let hash = entry.hash.clone(); let sem = semaphore.clone(); @@ -265,19 +261,17 @@ fn write_tree_to_path<'a>( let _permit = sem.acquire().await .map_err(|e| anyhow::anyhow!("semaphore: {e}"))?; - let mut store_obj = store_clone.get_object(&hash).await?; - let mut data = Vec::with_capacity(store_obj.header.size as usize); - store_obj.read_to_end(&mut data).await?; - - let path = entry_path; + // Read from store and write to disk in one blocking call tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - std::fs::write(&path, &data) - .context(format!("failed to write file {:?}", path))?; + let raw = store_clone.get_bytes_direct_blocking(&hash)?; + let (_header, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse object {}", hash))?; + std::fs::write(&entry_path, body) + .context(format!("failed to write file {:?}", entry_path))?; Ok(()) }).await? }); } - ObjectType::Index => anyhow::bail!("invalid object type in tree"), } } @@ -315,12 +309,31 @@ async fn push_cache(store: &Store, url: &str, hash: Option) -> anyhow::Res } let hashes = store.list_hashes().await.context("failed to list objects in store")?; + let total = hashes.len(); + tracing::info!("pushing {} objects", total); + + // Upload concurrently with bounded parallelism + let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let mut set = JoinSet::new(); for hash in hashes { - let raw_bytes = store.get_raw_bytes(&hash).await - .context(format!("failed to read object {}", hash))?; - upload_object(&hash, &raw_bytes, url).await?; + let store_clone = store.clone(); + let url = url.to_string(); + let sem = semaphore.clone(); + set.spawn(async move { + let _permit = sem.acquire().await + .map_err(|e| anyhow::anyhow!("semaphore: {e}"))?; + let raw_bytes = store_clone.get_raw_bytes(&hash).await + .context(format!("failed to read object {}", hash))?; + upload_object(&hash, &raw_bytes, &url).await + }); } + + while let Some(result) = set.join_next().await { + result??; + } + + tracing::info!("pushed {} objects", total); Ok(()) } @@ -341,16 +354,40 @@ fn pull_tree<'a>(store: &'a Store, url: &'a str, tree_hash: &'a Hash) -> std::pi let tree_body: object_body::Tree = object_body::Object::from_data(data)?; - for entry in tree_body.contents { - let header = download_object(store, &entry.hash, url).await - .context(format!("unable to download object with hash {}", entry.hash))?; - - anyhow::ensure!(header.object_type != ObjectType::Index, "unexpected index object in tree"); + // Download blob entries concurrently, recurse into tree entries + let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let mut blob_set = JoinSet::new(); - if header.object_type == ObjectType::Tree { - pull_tree(store, url, &entry.hash).await?; + for entry in tree_body.contents { + match entry.mode { + Mode::Tree => { + // Trees must be processed sequentially to discover children + let header = download_object(store, &entry.hash, url).await + .context(format!("unable to download object with hash {}", entry.hash))?; + anyhow::ensure!(header.object_type != ObjectType::Index, "unexpected index object in tree"); + pull_tree(store, url, &entry.hash).await?; + } + _ => { + let store_clone = store.clone(); + let url = url.to_string(); + let hash = entry.hash.clone(); + let sem = semaphore.clone(); + blob_set.spawn(async move { + let _permit = sem.acquire().await + .map_err(|e| anyhow::anyhow!("semaphore: {e}"))?; + let header = download_object(&store_clone, &hash, &url).await + .context(format!("unable to download object with hash {}", hash))?; + anyhow::ensure!(header.object_type != ObjectType::Index, "unexpected index object in tree"); + Ok::<_, anyhow::Error>(()) + }); + } } } + + while let Some(result) = blob_set.join_next().await { + result??; + } + Ok(()) }) } @@ -394,14 +431,12 @@ async fn upload_object(hash: &Hash, raw_bytes: &[u8], url: &str) -> anyhow::Resu } async fn download_object(store: &Store, hash: &Hash, url: &str) -> anyhow::Result
{ - // If already in store, return its header - if store.exists(hash).await? { - let mut store_obj = store.get_object(hash).await - .context("object should exist in store")?; - // Consume body to drop the reader - let mut _discard = Vec::new(); - store_obj.read_to_end(&mut _discard).await?; - return Ok(store_obj.header); + // If already in store, return its header without reading the full object + if store.exists_direct_blocking(hash)? { + let raw = store.get_bytes_direct_blocking(hash)?; + let (header, _) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse header for object {}", hash))?; + return Ok(header); } let url = format!("{url}/object/{hash}"); @@ -455,16 +490,17 @@ async fn pack_archive(store: &Store, path: &Path, index_hash: &Hash, compression ); let index: object_body::Index = { - let mut store_obj = store.get_object(index_hash).await.context("index object not found in store")?; - anyhow::ensure!(store_obj.header.object_type == ObjectType::Index, "expected index object type"); - let mut body = Vec::new(); - store_obj.read_to_end(&mut body).await?; - object_body::Object::from_data(&body)? + let raw = store.get_bytes_direct_blocking(index_hash) + .context("index object not found in store")?; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse index object"))?; + anyhow::ensure!(header.object_type == ObjectType::Index, "expected index object type"); + object_body::Object::from_data(body)? }; #[allow(clippy::mutable_key_type)] let mut headers: HashMap)> = HashMap::new(); - read_object_into_headers(store, &mut headers, &index.tree).await?; + read_object_into_headers_parallel(store, &mut headers, &index.tree).await?; //TODO: Surely there is an algorithm to more efficiently lay out this data let mut i = 0; @@ -515,17 +551,18 @@ async fn push_archive(store: &Store, url: &str, index_hash: &Hash, compression: tracing::debug!(index = %index_hash, url, "starting push-archive"); // 1. Read index from store let index: object_body::Index = { - let mut store_obj = store.get_object(index_hash).await.context("index object not found in store")?; - anyhow::ensure!(store_obj.header.object_type == ObjectType::Index, "expected index object type"); - let mut body = Vec::new(); - store_obj.read_to_end(&mut body).await?; - object_body::Object::from_data(&body)? + let raw = store.get_bytes_direct_blocking(index_hash) + .context("index object not found in store")?; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse index object"))?; + anyhow::ensure!(header.object_type == ObjectType::Index, "expected index object type"); + object_body::Object::from_data(body)? }; // 2. Collect all objects reachable from the tree #[allow(clippy::mutable_key_type)] let mut headers: HashMap)> = HashMap::new(); - read_object_into_headers(store, &mut headers, &index.tree).await?; + read_object_into_headers_parallel(store, &mut headers, &index.tree).await?; // Also include the index object itself let index_data = object_body::Object::to_data(&index); @@ -652,7 +689,7 @@ async fn unpack_archive(store: &Store, path: &Path) -> anyhow::Result<()> { store.put_object_bytes(&archive.hash, index_header, index_data).await?; } - let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(256)); let mut set = JoinSet::new(); for (header, entry) in archive.body.header.into_iter().zip(archive.body.entries.into_iter()) { @@ -663,18 +700,13 @@ async fn unpack_archive(store: &Store, path: &Path) -> anyhow::Result<()> { let _permit = sem.acquire().await .map_err(|e| anyhow::anyhow!("semaphore: {e}"))?; - // Skip if already exists - if store_clone.exists(&hash).await? { - return Ok::<_, anyhow::Error>(()); - } - - let raw = entry.turn_into_vec()?; - let (obj_header, body) = read_header_and_body(&raw) - .ok_or_else(|| anyhow::anyhow!("failed to parse header for object {}", hash))?; + // Write raw bytes using direct I/O to avoid OpenDAL fsync overhead + tokio::task::spawn_blocking(move || { + let raw = entry.turn_into_vec()?; + store_clone.put_bytes_direct_blocking(&hash, &raw) + }).await??; - store_clone.put_object_bytes(&hash, obj_header, body.to_vec()).await?; - - Ok(()) + Ok::<_, anyhow::Error>(()) }); } @@ -740,7 +772,7 @@ fn extract_tree<'a>( let tree: object_body::Tree = object_body::Object::from_data(body)?; - let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(256)); let mut set = JoinSet::new(); let mut file_count: usize = 0; @@ -823,7 +855,7 @@ fn archive_build_tree( } // Process files concurrently with bounded parallelism - let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(256)); let mut file_set = JoinSet::new(); for file_path in file_paths { let objects_clone = objects.clone(); @@ -1139,8 +1171,7 @@ mod tests { use tempfile::TempDir; fn create_store(dir: &std::path::Path) -> Store { - let builder = opendal::services::Fs::default().root(dir.to_str().unwrap()); - Store::from_builder(builder).unwrap() + Store::from_fs_path(dir).unwrap() } fn collect_files_recursive<'a>( @@ -1290,10 +1321,10 @@ mod tests { let (index, _, _) = build_index_from_path(&store, &source_path).await.unwrap(); // Read root tree and collect file hashes from subtrees - let root_tree = read_tree_from_store(&store, &index.tree).await.unwrap(); + let root_tree = read_tree_from_store_blocking(&store, &index.tree).unwrap(); let mut file_hashes = Vec::new(); for entry in &root_tree.contents { - let subtree = read_tree_from_store(&store, &entry.hash).await.unwrap(); + let subtree = read_tree_from_store_blocking(&store, &entry.hash).unwrap(); for sub_entry in &subtree.contents { if sub_entry.path == "file.txt" { file_hashes.push(sub_entry.hash.clone()); @@ -1613,7 +1644,7 @@ mod tests { let (index, _, _) = build_index_from_path(&store, &source_path).await.unwrap(); // Read back the tree and verify entries are sorted - let tree = read_tree_from_store(&store, &index.tree).await.unwrap(); + let tree = read_tree_from_store_blocking(&store, &index.tree).unwrap(); let names: Vec<&str> = tree.contents.iter().map(|e| e.path.as_str()).collect(); let mut sorted = names.clone(); sorted.sort(); @@ -1630,8 +1661,8 @@ async fn run(cli: Cli) -> anyhow::Result<()> { create_dir_all(&store_path).await.context("failed to create store directory")?; } - // Create store from cache directory - let store = Store::from_builder(opendal::services::Fs::default().root(store_path.to_str().context("store path must be valid UTF-8")?)) + // Create store from cache directory with direct I/O support + let store = Store::from_fs_path(&store_path) .context("failed to create store")?; match cli.command { diff --git a/common/src/archive.rs b/common/src/archive.rs index 5065261..e5cef60 100644 --- a/common/src/archive.rs +++ b/common/src/archive.rs @@ -455,7 +455,7 @@ where ); tracing::trace!(hash = %entry.hash, length = amount, "reading archive entry"); - entries.push(RawEntryData(data.to_vec())); + entries.push(RawEntryData(data)); counter += amount; } diff --git a/common/src/lib.rs b/common/src/lib.rs index dd23962..e26fac6 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -8,8 +8,6 @@ use std::{ use sha2::{Digest, Sha512}; -use futures::AsyncReadExt; - pub use crate::constants::{BLOB_KEY, INDEX_KEY, TREE_KEY}; pub use crate::hash::Hash; pub use crate::header::Header; @@ -78,26 +76,20 @@ pub async fn read_object_into_headers( continue; } - let mut object = store.get_object(¤t_hash).await?; - - if object.header.object_type == ObjectType::Index { - return Err(anyhow::anyhow!("indexes cannot exist within a tree (possible hash collision)")); - } - - let header = object.header; + let raw = store.get_raw_bytes(¤t_hash).await?; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse header for object {}", current_hash))?; - let mut data = Vec::new(); - object.read_to_end(&mut data).await?; + anyhow::ensure!(header.object_type != ObjectType::Index, "indexes cannot exist within a tree (possible hash collision)"); if header.object_type == ObjectType::Tree { - let tree = crate::object_body::Tree::from_data(&data)?; - + let tree = crate::object_body::Tree::from_data(body)?; for entry in &tree.contents { stack.push(entry.hash.clone()); } } - headers.insert(current_hash, (header, data)); + headers.insert(current_hash, (header, body.to_vec())); } tracing::debug!(objects_collected = headers.len(), "object tree walk complete"); @@ -105,6 +97,55 @@ pub async fn read_object_into_headers( Ok(()) } +/// Parallel version that reads all objects using direct blocking I/O. +/// Walks tree structure to discover hashes, then reads all objects in a single blocking task. +pub async fn read_object_into_headers_parallel( + store: &Store, + headers: &mut HashMap)>, + object_hash: &Hash, +) -> anyhow::Result<()> { + tracing::debug!(root_hash = %object_hash, "reading object tree into headers (parallel)"); + + // Read everything in a single blocking task to avoid async overhead + let store_clone = store.clone(); + let root_hash = object_hash.clone(); + let result = tokio::task::spawn_blocking(move || { + let mut headers: HashMap)> = HashMap::new(); + let mut stack = vec![root_hash]; + + while let Some(current_hash) = stack.pop() { + if headers.contains_key(¤t_hash) { + continue; + } + + let raw = store_clone.get_bytes_direct_blocking(¤t_hash)?; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("failed to parse header for object {}", current_hash))?; + + anyhow::ensure!(header.object_type != ObjectType::Index, "indexes cannot exist within a tree"); + + if header.object_type == ObjectType::Tree { + let tree = crate::object_body::Tree::from_data(body)?; + for entry in &tree.contents { + if !headers.contains_key(&entry.hash) { + stack.push(entry.hash.clone()); + } + } + } + + headers.insert(current_hash, (header, body.to_vec())); + } + + Ok::<_, anyhow::Error>(headers) + }) + .await??; + + *headers = result; + tracing::debug!(objects_collected = headers.len(), "object tree walk complete"); + + Ok(()) +} + pub fn pipe(reader: &mut dyn Read, writer: &mut dyn Write) -> anyhow::Result<()> { let mut buffer: [u8; 65536] = [0; 65536]; loop { diff --git a/common/src/object_body.rs b/common/src/object_body.rs index 3655233..e19fe4b 100644 --- a/common/src/object_body.rs +++ b/common/src/object_body.rs @@ -74,7 +74,9 @@ impl Object for Index { } fn to_data(&self) -> Vec { - let mut data: Vec = Vec::new(); + // Pre-allocate: "tree: " + 128-char hash + "\n" + "timestamp: " + rfc3339 + "\n" + metadata + "\n" + let estimated = 200 + self.metadata.iter().map(|(k, v)| k.len() + v.len() + 4).sum::(); + let mut data: Vec = Vec::with_capacity(estimated); fn write_kv(data: &mut Vec, key: &str, value: &str) -> anyhow::Result<()> { data.write_all(key.as_bytes())?; @@ -111,7 +113,9 @@ pub struct Tree { } impl Object for Tree { fn from_data(data: &[u8]) -> anyhow::Result { - let mut contents = Vec::new(); + // Estimate entry count: ~92 bytes per entry + let estimated_entries = data.len() / 80; + let mut contents = Vec::with_capacity(estimated_entries); let mut index: usize = 0; loop { @@ -152,7 +156,8 @@ impl Object for Tree { } fn to_data(&self) -> Vec { - let mut data: Vec = Vec::new(); + // Each entry: ~6 bytes mode + 1 space + ~20 bytes name + 1 null + 64 bytes hash = ~92 bytes avg + let mut data: Vec = Vec::with_capacity(self.contents.len() * 92); fn write_entry(data: &mut Vec, entry: &TreeEntry) -> anyhow::Result<()> { data.write_all(entry.mode.as_str().as_bytes())?; diff --git a/common/src/primitives.rs b/common/src/primitives.rs index 2609896..23bd880 100644 --- a/common/src/primitives.rs +++ b/common/src/primitives.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use crate::{BLOB_KEY, INDEX_KEY, TREE_KEY}; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Mode { Tree = 40000, Normal = 100644, diff --git a/common/src/store.rs b/common/src/store.rs index 7a24a53..ccf758b 100644 --- a/common/src/store.rs +++ b/common/src/store.rs @@ -3,6 +3,7 @@ use anyhow::Result; use futures::io::copy; use futures::{AsyncBufRead, AsyncRead, AsyncWriteExt}; use opendal::{Builder, FuturesAsyncReader, Operator}; +use std::path::PathBuf; pub struct StoreObject { pub header: Header, @@ -53,17 +54,31 @@ where #[derive(Clone)] pub struct Store { operator: Operator, + /// Root path for direct filesystem access (bypasses OpenDAL overhead for bulk writes) + root_path: Option, } impl Store { pub fn new(operator: Operator) -> Self { - Self { operator } + Self { operator, root_path: None } } pub fn from_builder(builder: impl Builder) -> Result { Ok(Self::new(Operator::new(builder)?.finish())) } + /// Create a store from a filesystem path with direct I/O support. + /// This enables `put_bytes_direct` for high-performance bulk writes. + pub fn from_fs_path(path: &std::path::Path) -> Result { + let builder = opendal::services::Fs::default() + .root(path.to_str().ok_or_else(|| anyhow::anyhow!("store path must be valid UTF-8"))?); + let operator = Operator::new(builder)?.finish(); + Ok(Self { + operator, + root_path: Some(path.to_path_buf()), + }) + } + pub async fn exists(&self, hash: &Hash) -> Result { let result = self.operator.exists(hash.as_str()).await?; tracing::trace!(hash = %hash, exists = result, "store exists check"); @@ -106,6 +121,49 @@ impl Store { Ok(bytes.to_vec()) } + /// Write raw bytes directly to the store (caller provides complete content). + pub async fn put_raw_bytes(&self, hash: &Hash, data: Vec) -> Result<()> { + self.operator.write(hash.as_str(), data).await?; + Ok(()) + } + + /// Write raw bytes using direct filesystem I/O, bypassing OpenDAL's atomic write/fsync. + /// Safe for content-addressable storage where partial writes are harmless. + pub fn put_bytes_direct_blocking(&self, hash: &Hash, data: &[u8]) -> Result<()> { + if let Some(root) = &self.root_path { + let path = root.join(hash.as_str()); + std::fs::write(&path, data)?; + Ok(()) + } else { + Err(anyhow::anyhow!("direct write not available: store was not created with from_fs_path")) + } + } + + /// Read raw bytes using direct filesystem I/O, bypassing OpenDAL overhead. + pub fn get_bytes_direct_blocking(&self, hash: &Hash) -> Result> { + if let Some(root) = &self.root_path { + let path = root.join(hash.as_str()); + let data = std::fs::read(&path)?; + Ok(data) + } else { + Err(anyhow::anyhow!("direct read not available: store was not created with from_fs_path")) + } + } + + /// Check if an object exists using direct filesystem I/O. + pub fn exists_direct_blocking(&self, hash: &Hash) -> Result { + if let Some(root) = &self.root_path { + Ok(root.join(hash.as_str()).exists()) + } else { + Err(anyhow::anyhow!("direct exists not available: store was not created with from_fs_path")) + } + } + + /// Whether this store supports direct filesystem I/O. + pub fn has_direct_io(&self) -> bool { + self.root_path.is_some() + } + /// List all object hashes in the store. pub async fn list_hashes(&self) -> Result> { use opendal::EntryMode; diff --git a/common/src/tree_walk.rs b/common/src/tree_walk.rs index 3d4586c..9d5d750 100644 --- a/common/src/tree_walk.rs +++ b/common/src/tree_walk.rs @@ -1,6 +1,5 @@ use std::collections::{BTreeMap, HashMap}; -use futures::AsyncReadExt; use serde::Serialize; use crate::object_body::{self, Object as _}; @@ -42,8 +41,9 @@ pub async fn collect_index_metadata( ) -> anyhow::Result { tracing::info!(index_hash = %index_hash, "collecting index metadata"); // 1. Read and parse the Index object - let mut index_obj = store.get_object(index_hash).await?; - let index_header = index_obj.header; + let raw = store.get_raw_bytes(index_hash).await?; + let (index_header, index_body) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid index header"))?; if index_header.object_type != ObjectType::Index { anyhow::bail!( @@ -52,9 +52,7 @@ pub async fn collect_index_metadata( ); } - let mut index_body = Vec::new(); - index_obj.read_to_end(&mut index_body).await?; - let index = object_body::Index::from_data(&index_body)?; + let index = object_body::Index::from_data(index_body)?; let mut objects = vec![ObjectInfo { hash: index_hash.clone(), @@ -85,8 +83,9 @@ async fn walk_tree( objects: &mut Vec, files: &mut Vec, ) -> anyhow::Result<()> { - let mut tree_obj = store.get_object(tree_hash).await?; - let tree_header = tree_obj.header; + let raw = store.get_raw_bytes(tree_hash).await?; + let (tree_header, body) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid tree object header"))?; if tree_header.object_type != ObjectType::Tree { anyhow::bail!( @@ -101,9 +100,10 @@ async fn walk_tree( size: tree_header.size, }); - let mut tree_body = Vec::new(); - tree_obj.read_to_end(&mut tree_body).await?; - let tree = object_body::Tree::from_data(&tree_body)?; + let tree = object_body::Tree::from_data(body)?; + + // Collect blob entries for batch reading + let mut blob_entries: Vec<(Hash, String, Mode)> = Vec::new(); for entry in &tree.contents { let current_path = if prefix.is_empty() { @@ -117,30 +117,45 @@ async fn walk_tree( Box::pin(walk_tree(store, &entry.hash, ¤t_path, objects, files)).await?; } _ => { - // Blob entry — record the object and the file - let blob_header = read_header(store, &entry.hash).await?; - objects.push(ObjectInfo { - hash: entry.hash.clone(), - object_type: ObjectType::Blob, - size: blob_header.size, - }); - files.push(FileInfo { - path: current_path, - hash: entry.hash.clone(), - size: blob_header.size, - mode: entry.mode, - }); + blob_entries.push((entry.hash.clone(), current_path, entry.mode)); } } } - Ok(()) -} + // Batch-read blob headers concurrently + if !blob_entries.is_empty() { + use futures::StreamExt; + let results: Vec> = futures::stream::iter(blob_entries) + .map(|(hash, path, mode)| { + let store = store.clone(); + async move { + let raw = store.get_raw_bytes(&hash).await?; + let (header, _) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid blob header"))?; + Ok((hash, path, mode, header)) + } + }) + .buffer_unordered(256) + .collect() + .await; + + for result in results { + let (hash, path, mode, blob_header) = result?; + objects.push(ObjectInfo { + hash: hash.clone(), + object_type: ObjectType::Blob, + size: blob_header.size, + }); + files.push(FileInfo { + path, + hash, + size: blob_header.size, + mode, + }); + } + } -/// Read just the header of an object (discarding the body). -async fn read_header(store: &Store, hash: &Hash) -> anyhow::Result
{ - let obj = store.get_object(hash).await?; - Ok(obj.header) + Ok(()) } /// Content-addressable metadata: index info + flat hash-keyed object map. @@ -182,25 +197,118 @@ pub enum MetadataObject { } /// Walk an index's tree in the store and build a flat hash-keyed object map. +/// Uses direct I/O in a single blocking task when available for maximum throughput. pub async fn collect_tree_metadata( store: &Store, index_hash: &Hash, ) -> anyhow::Result { tracing::info!(index_hash = %index_hash, "collecting tree metadata"); - let mut index_obj = store.get_object(index_hash).await?; - let index_header = index_obj.header; + if store.has_direct_io() { + // Fast path: do everything in a single blocking task with direct I/O + let store = store.clone(); + let index_hash = index_hash.clone(); + tokio::task::spawn_blocking(move || { + collect_tree_metadata_blocking(&store, &index_hash) + }).await? + } else { + collect_tree_metadata_async(store, index_hash).await + } +} + +/// Blocking implementation using direct filesystem I/O. +#[allow(clippy::mutable_key_type)] +fn collect_tree_metadata_blocking( + store: &Store, + index_hash: &Hash, +) -> anyhow::Result { + let raw = store.get_bytes_direct_blocking(index_hash)?; + let (index_header, index_body) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid index header"))?; if index_header.object_type != ObjectType::Index { - anyhow::bail!( - "expected Index object, got {:?}", - index_header.object_type + anyhow::bail!("expected Index object, got {:?}", index_header.object_type); + } + + let index = object_body::Index::from_data(index_body)?; + + let mut objects = HashMap::new(); + collect_objects_blocking(store, &index.tree, &mut objects)?; + + Ok(TreeMetadata { + index: IndexInfo { + hash: index_hash.clone(), + tree: index.tree, + timestamp: index.timestamp, + metadata: index.metadata, + }, + objects, + }) +} + +#[allow(clippy::mutable_key_type)] +fn collect_objects_blocking( + store: &Store, + tree_hash: &Hash, + objects: &mut HashMap, +) -> anyhow::Result<()> { + if objects.contains_key(tree_hash) { + return Ok(()); + } + + let raw = store.get_bytes_direct_blocking(tree_hash)?; + let (header, body) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid tree object header"))?; + + if header.object_type != ObjectType::Tree { + anyhow::bail!("expected Tree object, got {:?}", header.object_type); + } + + let tree = object_body::Tree::from_data(body)?; + + let mut entries = BTreeMap::new(); + for entry in &tree.contents { + entries.insert( + entry.path.clone(), + TreeDirEntry { + mode: entry.mode.as_str().to_string(), + hash: entry.hash.clone(), + }, ); + + match entry.mode { + Mode::Tree => { + collect_objects_blocking(store, &entry.hash, objects)?; + } + _ => { + if !objects.contains_key(&entry.hash) { + let raw = store.get_bytes_direct_blocking(&entry.hash)?; + let (blob_header, _) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid blob header"))?; + objects.insert(entry.hash.clone(), MetadataObject::Blob { size: blob_header.size }); + } + } + } } - let mut index_body = Vec::new(); - index_obj.read_to_end(&mut index_body).await?; - let index = object_body::Index::from_data(&index_body)?; + objects.insert(tree_hash.clone(), MetadataObject::Tree { entries }); + Ok(()) +} + +/// Async implementation for non-filesystem backends. +async fn collect_tree_metadata_async( + store: &Store, + index_hash: &Hash, +) -> anyhow::Result { + let raw = store.get_raw_bytes(index_hash).await?; + let (index_header, index_body) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid index header"))?; + + if index_header.object_type != ObjectType::Index { + anyhow::bail!("expected Index object, got {:?}", index_header.object_type); + } + + let index = object_body::Index::from_data(index_body)?; let mut objects = HashMap::new(); collect_objects(store, &index.tree, &mut objects).await?; @@ -226,20 +334,22 @@ async fn collect_objects( return Ok(()); } - let mut tree_obj = store.get_object(tree_hash).await?; + let raw = store.get_raw_bytes(tree_hash).await?; + let (header, body) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid tree object header"))?; - if tree_obj.header.object_type != ObjectType::Tree { + if header.object_type != ObjectType::Tree { anyhow::bail!( "expected Tree object, got {:?}", - tree_obj.header.object_type + header.object_type ); } - let mut tree_body = Vec::new(); - tree_obj.read_to_end(&mut tree_body).await?; - let tree = object_body::Tree::from_data(&tree_body)?; + let tree = object_body::Tree::from_data(body)?; let mut entries = BTreeMap::new(); + let mut blob_hashes = Vec::new(); + for entry in &tree.contents { entries.insert( entry.path.clone(), @@ -255,16 +365,35 @@ async fn collect_objects( } _ => { if !objects.contains_key(&entry.hash) { - let blob_header = read_header(store, &entry.hash).await?; - objects.insert( - entry.hash.clone(), - MetadataObject::Blob { size: blob_header.size }, - ); + blob_hashes.push(entry.hash.clone()); } } } } + // Batch-read blob headers concurrently + if !blob_hashes.is_empty() { + use futures::StreamExt; + let results: Vec> = futures::stream::iter(blob_hashes) + .map(|hash| { + let store = store.clone(); + async move { + let raw = store.get_raw_bytes(&hash).await?; + let (header, _) = crate::read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid blob header"))?; + Ok((hash, header)) + } + }) + .buffer_unordered(256) + .collect() + .await; + + for result in results { + let (hash, header) = result?; + objects.insert(hash, MetadataObject::Blob { size: header.size }); + } + } + objects.insert(tree_hash.clone(), MetadataObject::Tree { entries }); Ok(()) diff --git a/server/src/lib.rs b/server/src/lib.rs index 5216c32..59bcceb 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -27,7 +27,7 @@ use serde::Deserialize; use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::ReaderStream; -use futures::{AsyncReadExt, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; const STREAM_CHUNK_SIZE: usize = 64 * 1024; const STREAM_CHANNEL_CAPACITY: usize = 32; @@ -168,16 +168,10 @@ async fn get_object( ) -> Result, ServerError> { tracing::debug!(hash = %object_hash, "GET /object - retrieving object"); - match store.exists(&object_hash).await { - Ok(true) => Ok(()), - Ok(false) => Err(ServerError::NotFound("no object".into())), - Err(err) => Err(ServerError::Internal(err.to_string())), - }?; - let object = store .get_object(&object_hash) .await - .map_err(|err| ServerError::Internal(err.to_string()))?; + .map_err(|_| ServerError::NotFound("no object".into()))?; let Header { object_type, size } = object.header; let reader_stream = ReaderStream::new(object.compat()); @@ -196,36 +190,46 @@ struct CompressionQuery { } async fn read_index_from_store(store: &Store, index_hash: &Hash) -> Result { - match store.exists(index_hash).await { - Ok(true) => Ok(()), - Ok(false) => Err(ServerError::NotFound("index not found".into())), - Err(err) => Err(ServerError::Internal(err.to_string())), - }?; - - let mut store_obj = store - .get_object(index_hash) + let raw = store + .get_raw_bytes(index_hash) .await - .map_err(|err| ServerError::Internal(err.to_string()))?; + .map_err(|_| ServerError::NotFound("index not found".into()))?; + + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| ServerError::Internal("invalid index header".into()))?; - if store_obj.header.object_type != ObjectType::Index { + if header.object_type != ObjectType::Index { return Err(ServerError::BadRequest("object is not an index".into())); } - let mut body = Vec::new(); - store_obj - .read_to_end(&mut body) - .await - .map_err(|err| ServerError::Internal(err.to_string()))?; - - Index::from_data(&body).map_err(|err| ServerError::Internal(err.to_string())) + Index::from_data(body).map_err(|err| ServerError::Internal(err.to_string())) } /// Walk a tree collecting (Hash, Header) pairs and the ordered list of hashes. -/// Only tree bodies are read to discover children; blob bodies are NOT loaded. +/// Uses direct I/O in a single blocking task when available. #[allow(clippy::mutable_key_type)] async fn collect_entry_metadata( store: &Store, tree_hash: &Hash, +) -> Result<(Vec, HashMap), ServerError> { + if store.has_direct_io() { + let store = store.clone(); + let tree_hash = tree_hash.clone(); + tokio::task::spawn_blocking(move || { + collect_entry_metadata_blocking(&store, &tree_hash) + }) + .await + .map_err(|e| ServerError::Internal(e.to_string()))? + } else { + collect_entry_metadata_async(store, tree_hash).await + } +} + +/// Blocking version using direct filesystem I/O. +#[allow(clippy::mutable_key_type)] +fn collect_entry_metadata_blocking( + store: &Store, + tree_hash: &Hash, ) -> Result<(Vec, HashMap), ServerError> { let mut meta: HashMap = HashMap::new(); let mut order: Vec = Vec::new(); @@ -236,20 +240,65 @@ async fn collect_entry_metadata( continue; } - let mut object = store.get_object(¤t_hash).await + let raw = store.get_bytes_direct_blocking(¤t_hash) .map_err(|e| ServerError::Internal(e.to_string()))?; - let header = object.header; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| ServerError::Internal("invalid object header".into()))?; if header.object_type == ObjectType::Tree { - let mut data = Vec::new(); - object.read_to_end(&mut data).await + let tree = Tree::from_data(body) .map_err(|e| ServerError::Internal(e.to_string()))?; - let tree = Tree::from_data(&data) + for entry in &tree.contents { + if !meta.contains_key(&entry.hash) { + stack.push(entry.hash.clone()); + } + } + meta.insert(current_hash.clone(), Header::new(ObjectType::Tree, body.len() as u64)); + order.push(current_hash); + } else { + meta.insert(current_hash.clone(), header); + order.push(current_hash); + } + } + + Ok((order, meta)) +} + +/// Async version for non-filesystem backends. +#[allow(clippy::mutable_key_type)] +async fn collect_entry_metadata_async( + store: &Store, + tree_hash: &Hash, +) -> Result<(Vec, HashMap), ServerError> { + let mut meta: HashMap = HashMap::new(); + let mut order: Vec = Vec::new(); + let mut stack = vec![tree_hash.clone()]; + let mut blob_hashes: Vec = Vec::new(); + + // Phase 1: Walk trees to discover all hashes + while let Some(current_hash) = stack.pop() { + if meta.contains_key(¤t_hash) { + continue; + } + + let raw = store.get_raw_bytes(¤t_hash).await + .map_err(|e| ServerError::Internal(e.to_string()))?; + let (header, body) = read_header_and_body(&raw) + .ok_or_else(|| ServerError::Internal("invalid object header".into()))?; + + if header.object_type == ObjectType::Tree { + let tree = Tree::from_data(body) .map_err(|e| ServerError::Internal(e.to_string()))?; for entry in &tree.contents { - stack.push(entry.hash.clone()); + if !meta.contains_key(&entry.hash) { + if entry.mode == Mode::Tree { + stack.push(entry.hash.clone()); + } else { + blob_hashes.push(entry.hash.clone()); + } + } } - meta.insert(current_hash.clone(), Header::new(ObjectType::Tree, data.len() as u64)); + meta.insert(current_hash.clone(), Header::new(ObjectType::Tree, body.len() as u64)); order.push(current_hash); } else { meta.insert(current_hash.clone(), header); @@ -257,6 +306,30 @@ async fn collect_entry_metadata( } } + // Phase 2: Read blob headers concurrently + let blob_results: Vec> = futures::stream::iter(blob_hashes) + .map(|hash| { + let store = store.clone(); + async move { + let raw = store.get_raw_bytes(&hash).await + .map_err(|e| ServerError::Internal(e.to_string()))?; + let (header, _) = read_header_and_body(&raw) + .ok_or_else(|| ServerError::Internal("invalid blob header".into()))?; + Ok((hash, header)) + } + }) + .buffer_unordered(256) + .collect() + .await; + + for result in blob_results { + let (hash, header) = result?; + if let std::collections::hash_map::Entry::Vacant(e) = meta.entry(hash.clone()) { + order.push(hash); + e.insert(header); + } + } + Ok((order, meta)) } @@ -310,15 +383,18 @@ fn write_streaming_archive( let prefix = header.to_string(); w.write_all(prefix.as_bytes())?; - let body = tokio::task::block_in_place(|| { + // Use direct I/O when available for maximum throughput, + // otherwise fall back to async via block_on + let raw = if store.has_direct_io() { + store.get_bytes_direct_blocking(hash)? + } else { tokio::runtime::Handle::current().block_on(async { - let mut obj = store.get_object(hash).await?; - let mut data = Vec::new(); - obj.read_to_end(&mut data).await?; - Ok::<_, anyhow::Error>(data) - }) - })?; - w.write_all(&body)?; + store.get_raw_bytes(hash).await + })? + }; + let (_, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid object header for {}", hash))?; + w.write_all(body)?; } w.flush()?; Ok(()) @@ -487,10 +563,10 @@ fn stream_zip_tree<'a, W: tokio::io::AsyncWrite + Unpin + Send + 'a>( zip_writer: &'a mut ZipFileWriter, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { - let mut store_obj = store.get_object(tree_hash).await?; - let mut body = Vec::new(); - store_obj.read_to_end(&mut body).await?; - let tree = Tree::from_data(&body)?; + let raw = store.get_raw_bytes(tree_hash).await?; + let (_, body) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid tree header"))?; + let tree = Tree::from_data(body)?; for entry in &tree.contents { let entry_path = if prefix.is_empty() { @@ -504,16 +580,16 @@ fn stream_zip_tree<'a, W: tokio::io::AsyncWrite + Unpin + Send + 'a>( stream_zip_tree(store, &entry.hash, &entry_path, zip_writer).await?; } _ => { - let mut blob_obj = store.get_object(&entry.hash).await?; - let mut data = Vec::new(); - blob_obj.read_to_end(&mut data).await?; + let raw = store.get_raw_bytes(&entry.hash).await?; + let (_, data) = read_header_and_body(&raw) + .ok_or_else(|| anyhow::anyhow!("invalid blob header"))?; let zip_entry = ZipEntryBuilder::new( entry_path.into(), ZipCompression::Deflate, ); let mut entry_writer = zip_writer.write_entry_stream(zip_entry).await?; - futures::AsyncWriteExt::write_all(&mut entry_writer, &data).await?; + futures::AsyncWriteExt::write_all(&mut entry_writer, data).await?; entry_writer.close().await?; } } @@ -574,35 +650,61 @@ async fn upload_archive( let archive = Archive::::from_data(&mut std::io::Cursor::new(&body)) .map_err(|err| ServerError::BadRequest(format!("invalid archive: {}", err)))?; - let mut added: usize = 0; - let mut skipped: usize = 0; - - for (header_entry, raw) in archive.body.header.iter().zip(archive.body.entries.iter()) { - let hash = &header_entry.hash; - - match store.exists(hash).await { - Ok(true) => { - skipped += 1; - continue; + let entries: Vec<_> = archive.body.header.iter().zip(archive.body.entries.iter()) + .map(|(header_entry, raw)| (header_entry.hash.clone(), raw.0.clone())) + .collect(); + + let total = entries.len(); + + // Use direct blocking I/O when available for maximum throughput + if store.has_direct_io() { + let store_clone = store.clone(); + let (added, skipped) = tokio::task::spawn_blocking(move || { + let mut added = 0usize; + let mut skipped = 0usize; + for (hash, raw_data) in &entries { + if store_clone.exists_direct_blocking(hash).unwrap_or(false) { + skipped += 1; + continue; + } + store_clone.put_bytes_direct_blocking(hash, raw_data) + .map_err(|e| ServerError::Internal(e.to_string()))?; + added += 1; } - Ok(false) => {} - Err(err) => return Err(ServerError::Internal(err.to_string())), + Ok::<_, ServerError>((added, skipped)) + }).await.map_err(|e| ServerError::Internal(e.to_string()))??; + + tracing::info!(added, skipped, total, "POST /upload - complete (direct I/O)"); + Ok(format!("Added {} objects, skipped {}", added, skipped)) + } else { + // Async path with exists check for non-fs backends + let results: Vec> = futures::stream::iter(entries) + .map(|(hash, raw_data)| { + let store = store.clone(); + async move { + if store.exists(&hash).await.unwrap_or(false) { + return Ok(false); + } + let (header, body_bytes) = read_header_and_body(&raw_data) + .ok_or_else(|| ServerError::BadRequest("invalid entry data".into()))?; + store.put_object_bytes(&hash, header, body_bytes.to_vec()).await + .map_err(|err| ServerError::Internal(err.to_string()))?; + Ok(true) + } + }) + .buffer_unordered(64) + .collect() + .await; + + let mut added = 0; + let mut skipped = 0; + for result in results { + if result? { added += 1; } else { skipped += 1; } } - let (header, body_bytes) = read_header_and_body(&raw.0) - .ok_or_else(|| ServerError::BadRequest("invalid entry data: could not parse header".into()))?; - - store - .put_object_bytes(hash, header, body_bytes.to_vec()) - .await - .map_err(|err| ServerError::Internal(err.to_string()))?; - - added += 1; + tracing::info!(added, skipped, total, "POST /upload - complete"); + Ok(format!("Added {} objects, skipped {}", added, skipped)) } - - tracing::info!(added = added, skipped = skipped, "POST /upload - complete"); - - Ok(format!("Added {} objects, skipped {}", added, skipped)) } #[derive(serde::Deserialize)] @@ -621,13 +723,33 @@ async fn check_missing( ) -> Result, ServerError> { tracing::debug!(requested = request.hashes.len(), "POST /missing - checking hashes"); - let mut missing = Vec::new(); - for hash in request.hashes { - match store.exists(&hash).await { - Ok(true) => {} - _ => missing.push(hash), - } - } + let missing = if store.has_direct_io() { + // Single blocking task for all exists checks — much faster than concurrent async + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + request.hashes.into_iter() + .filter(|hash| !store_clone.exists_direct_blocking(hash).unwrap_or(false)) + .collect::>() + }) + .await + .map_err(|e| ServerError::Internal(e.to_string()))? + } else { + // Concurrent async for non-fs backends + futures::stream::iter(request.hashes) + .map(|hash| { + let store = store.clone(); + async move { + match store.exists(&hash).await { + Ok(true) => None, + _ => Some(hash), + } + } + }) + .buffer_unordered(256) + .filter_map(|x| async { x }) + .collect() + .await + }; tracing::debug!(missing = missing.len(), "POST /missing - result"); @@ -641,15 +763,16 @@ async fn get_metadata( ) -> Result, ServerError> { tracing::debug!(hash = %index_hash, "GET /metadata - collecting metadata"); - match store.exists(&index_hash).await { - Ok(true) => Ok(()), - Ok(false) => Err(ServerError::NotFound("index not found".into())), - Err(err) => Err(ServerError::Internal(err.to_string())), - }?; - let meta = collect_tree_metadata(&store, &index_hash) .await - .map_err(|err| ServerError::Internal(err.to_string()))?; + .map_err(|err| { + let msg = err.to_string(); + if msg.contains("not found") || msg.contains("NotFound") || msg.contains("No such file") { + ServerError::NotFound("index not found".into()) + } else { + ServerError::Internal(msg) + } + })?; Ok(axum::Json(meta)) } diff --git a/server/src/main.rs b/server/src/main.rs index 3ad4783..b486567 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -95,8 +95,7 @@ fn build_store(config: &StoreConfig) -> anyhow::Result { if !path.exists() { std::fs::create_dir_all(path)?; } - let builder = opendal::services::Fs::default().root(root); - Store::from_builder(builder) + Store::from_fs_path(path) } StoreConfig::S3 { bucket, diff --git a/server/tests/http_integration.rs b/server/tests/http_integration.rs index 1d1d1fb..0be3ace 100644 --- a/server/tests/http_integration.rs +++ b/server/tests/http_integration.rs @@ -11,8 +11,7 @@ use tempfile::TempDir; use tower::ServiceExt; fn make_store(dir: &TempDir) -> Store { - let builder = opendal::services::Fs::default().root(dir.path().to_str().unwrap()); - Store::from_builder(builder).expect("failed to create store") + Store::from_fs_path(dir.path()).expect("failed to create store") } fn put_request(hash_str: &str, body: &[u8], object_type: &str, object_size: u64) -> Request {