Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 131 additions & 100 deletions client/src/main.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ where
);

tracing::trace!(hash = %entry.hash, length = amount, "reading archive entry");
entries.push(RawEntryData(data.to_vec()));
entries.push(RawEntryData(data));

counter += amount;
}
Expand Down
69 changes: 55 additions & 14 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use std::{

use sha2::{Digest, Sha512};

use futures::AsyncReadExt;

pub use crate::constants::{BLOB_KEY, INDEX_KEY, TREE_KEY};
pub use crate::hash::Hash;
pub use crate::header::Header;
Expand Down Expand Up @@ -78,33 +76,76 @@ pub async fn read_object_into_headers(
continue;
}

let mut object = store.get_object(&current_hash).await?;

if object.header.object_type == ObjectType::Index {
return Err(anyhow::anyhow!("indexes cannot exist within a tree (possible hash collision)"));
}

let header = object.header;
let raw = store.get_raw_bytes(&current_hash).await?;
let (header, body) = read_header_and_body(&raw)
.ok_or_else(|| anyhow::anyhow!("failed to parse header for object {}", current_hash))?;

let mut data = Vec::new();
object.read_to_end(&mut data).await?;
anyhow::ensure!(header.object_type != ObjectType::Index, "indexes cannot exist within a tree (possible hash collision)");

if header.object_type == ObjectType::Tree {
let tree = crate::object_body::Tree::from_data(&data)?;

let tree = crate::object_body::Tree::from_data(body)?;
for entry in &tree.contents {
stack.push(entry.hash.clone());
}
}

headers.insert(current_hash, (header, data));
headers.insert(current_hash, (header, body.to_vec()));
}

tracing::debug!(objects_collected = headers.len(), "object tree walk complete");

Ok(())
}

/// Eager variant of `read_object_into_headers` that performs the whole tree walk
/// with direct blocking filesystem I/O inside a single `tokio::task::spawn_blocking`
/// task, avoiding per-object async overhead.
///
/// NOTE: despite the name, objects are read sequentially within that one blocking
/// task; the walk itself is not parallelised.
///
/// Requires a store created with `Store::from_fs_path` (direct I/O); otherwise every
/// `get_bytes_direct_blocking` call fails. Entries already present in `headers` are
/// kept and their subtrees are not re-read, matching the sequential version.
/// On error the caller's map may be left empty (it is moved into the blocking task).
pub async fn read_object_into_headers_parallel(
    store: &Store,
    headers: &mut HashMap<Hash, (Header, Vec<u8>)>,
    object_hash: &Hash,
) -> anyhow::Result<()> {
    tracing::debug!(root_hash = %object_hash, "reading object tree into headers (parallel)");

    let store_clone = store.clone();
    let root_hash = object_hash.clone();

    // Move the caller's map into the blocking task so previously collected objects
    // are preserved and skipped. The sequential version extends the map in place;
    // building a fresh map here and assigning it back would silently drop those
    // entries. Tradeoff: if the task errors, the taken map is not restored.
    let seed = std::mem::take(headers);

    let result = tokio::task::spawn_blocking(move || {
        let mut headers = seed;
        let mut stack = vec![root_hash];

        while let Some(current_hash) = stack.pop() {
            if headers.contains_key(&current_hash) {
                continue;
            }

            let raw = store_clone.get_bytes_direct_blocking(&current_hash)?;
            let (header, body) = read_header_and_body(&raw)
                .ok_or_else(|| anyhow::anyhow!("failed to parse header for object {}", current_hash))?;

            // Same invariant as the sequential walk: an index reachable from a tree
            // indicates corruption or a hash collision.
            anyhow::ensure!(header.object_type != ObjectType::Index, "indexes cannot exist within a tree (possible hash collision)");

            if header.object_type == ObjectType::Tree {
                let tree = crate::object_body::Tree::from_data(body)?;
                for entry in &tree.contents {
                    // Cheap pre-check keeps the stack from accumulating duplicates.
                    if !headers.contains_key(&entry.hash) {
                        stack.push(entry.hash.clone());
                    }
                }
            }

            headers.insert(current_hash, (header, body.to_vec()));
        }

        Ok::<_, anyhow::Error>(headers)
    })
    .await??;

    *headers = result;
    tracing::debug!(objects_collected = headers.len(), "object tree walk complete");

    Ok(())
}

pub fn pipe(reader: &mut dyn Read, writer: &mut dyn Write) -> anyhow::Result<()> {
let mut buffer: [u8; 65536] = [0; 65536];
loop {
Expand Down
11 changes: 8 additions & 3 deletions common/src/object_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ impl Object for Index {
}

fn to_data(&self) -> Vec<u8> {
let mut data: Vec<u8> = Vec::new();
// Pre-allocate: "tree: " + 128-char hash + "\n" + "timestamp: " + rfc3339 + "\n" + metadata + "\n"
let estimated = 200 + self.metadata.iter().map(|(k, v)| k.len() + v.len() + 4).sum::<usize>();
let mut data: Vec<u8> = Vec::with_capacity(estimated);

fn write_kv(data: &mut Vec<u8>, key: &str, value: &str) -> anyhow::Result<()> {
data.write_all(key.as_bytes())?;
Expand Down Expand Up @@ -111,7 +113,9 @@ pub struct Tree {
}
impl Object for Tree {
fn from_data(data: &[u8]) -> anyhow::Result<Self> {
let mut contents = Vec::new();
// Estimate entry count: ~92 bytes per entry
let estimated_entries = data.len() / 80;
let mut contents = Vec::with_capacity(estimated_entries);

let mut index: usize = 0;
loop {
Expand Down Expand Up @@ -152,7 +156,8 @@ impl Object for Tree {
}

fn to_data(&self) -> Vec<u8> {
let mut data: Vec<u8> = Vec::new();
// Each entry: ~6 bytes mode + 1 space + ~20 bytes name + 1 null + 64 bytes hash = ~92 bytes avg
let mut data: Vec<u8> = Vec::with_capacity(self.contents.len() * 92);

fn write_entry(data: &mut Vec<u8>, entry: &TreeEntry) -> anyhow::Result<()> {
data.write_all(entry.mode.as_str().as_bytes())?;
Expand Down
2 changes: 1 addition & 1 deletion common/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;
use crate::{BLOB_KEY, INDEX_KEY, TREE_KEY};


#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
Tree = 40000,
Normal = 100644,
Expand Down
60 changes: 59 additions & 1 deletion common/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use anyhow::Result;
use futures::io::copy;
use futures::{AsyncBufRead, AsyncRead, AsyncWriteExt};
use opendal::{Builder, FuturesAsyncReader, Operator};
use std::path::PathBuf;

pub struct StoreObject<T> {
pub header: Header,
Expand Down Expand Up @@ -53,17 +54,31 @@ where
/// Content-addressable object store: objects are written and read under their
/// `Hash` key through an OpenDAL operator.
#[derive(Clone)]
pub struct Store {
    operator: Operator,
    /// Root path for direct filesystem access (bypasses OpenDAL overhead for bulk writes)
    root_path: Option<PathBuf>,
}

impl Store {
/// Wrap an existing OpenDAL operator. Direct filesystem I/O is disabled;
/// use `from_fs_path` instead to enable the `*_direct_blocking` methods.
pub fn new(operator: Operator) -> Self {
    Self { operator, root_path: None }
}

/// Build a `Store` from any OpenDAL service builder.
/// Direct filesystem I/O is not enabled by this constructor.
pub fn from_builder(builder: impl Builder) -> Result<Self> {
    let operator = Operator::new(builder)?.finish();
    Ok(Self::new(operator))
}

/// Create a store rooted at a local filesystem path, with direct I/O support.
/// This enables `put_bytes_direct` for high-performance bulk writes.
pub fn from_fs_path(path: &std::path::Path) -> Result<Self> {
    // OpenDAL's Fs builder takes a UTF-8 root string; reject non-UTF-8 paths up front.
    let root = path
        .to_str()
        .ok_or_else(|| anyhow::anyhow!("store path must be valid UTF-8"))?;
    let operator = Operator::new(opendal::services::Fs::default().root(root))?.finish();
    Ok(Self {
        operator,
        // Remember the root so the *_direct_blocking methods can bypass OpenDAL.
        root_path: Some(path.to_path_buf()),
    })
}

pub async fn exists(&self, hash: &Hash) -> Result<bool> {
let result = self.operator.exists(hash.as_str()).await?;
tracing::trace!(hash = %hash, exists = result, "store exists check");
Expand Down Expand Up @@ -106,6 +121,49 @@ impl Store {
Ok(bytes.to_vec())
}

/// Store a fully-formed object body under `hash` (caller provides complete content).
pub async fn put_raw_bytes(&self, hash: &Hash, data: Vec<u8>) -> Result<()> {
    let key = hash.as_str();
    self.operator.write(key, data).await?;
    Ok(())
}

/// Write raw bytes with plain `std::fs`, bypassing OpenDAL's atomic write/fsync.
/// Safe for content-addressable storage where partial writes are harmless.
///
/// Errors unless the store was created with `from_fs_path`.
pub fn put_bytes_direct_blocking(&self, hash: &Hash, data: &[u8]) -> Result<()> {
    let Some(root) = &self.root_path else {
        return Err(anyhow::anyhow!("direct write not available: store was not created with from_fs_path"));
    };
    std::fs::write(root.join(hash.as_str()), data)?;
    Ok(())
}

/// Read raw bytes with plain `std::fs`, bypassing OpenDAL overhead.
///
/// Errors unless the store was created with `from_fs_path`.
pub fn get_bytes_direct_blocking(&self, hash: &Hash) -> Result<Vec<u8>> {
    match &self.root_path {
        Some(root) => Ok(std::fs::read(root.join(hash.as_str()))?),
        None => Err(anyhow::anyhow!("direct read not available: store was not created with from_fs_path")),
    }
}

/// Check whether an object exists, using a direct filesystem path lookup.
///
/// Errors unless the store was created with `from_fs_path`.
pub fn exists_direct_blocking(&self, hash: &Hash) -> Result<bool> {
    let Some(root) = &self.root_path else {
        return Err(anyhow::anyhow!("direct exists not available: store was not created with from_fs_path"));
    };
    Ok(root.join(hash.as_str()).exists())
}

/// Whether this store was created via `from_fs_path` and therefore supports
/// the `*_direct_blocking` methods.
pub fn has_direct_io(&self) -> bool {
    matches!(self.root_path, Some(_))
}

/// List all object hashes in the store.
pub async fn list_hashes(&self) -> Result<Vec<Hash>> {
use opendal::EntryMode;
Expand Down
Loading