diff --git a/Cargo.lock b/Cargo.lock
index 797eacdcff5..e3046230505 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2664,8 +2664,7 @@ checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
 [[package]]
 name = "uluru"
 version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "308dcc9d947b227796f851adb99936fb060681a89c002c9c1928404a3b6c2d72"
+source = "git+https://github.com/Byron/uluru?branch=master#43fed348af8c391419354a0829e67e44a6a24f8a"
 dependencies = [
  "arrayvec",
 ]
diff --git a/git-features/src/interrupt.rs b/git-features/src/interrupt.rs
index 3b86433a07d..d689e3915ea 100644
--- a/git-features/src/interrupt.rs
+++ b/git-features/src/interrupt.rs
@@ -10,7 +10,6 @@ pub struct Iter<'a, I, EFN> {
     pub inner: I,
     make_err: Option<EFN>,
     should_interrupt: &'a AtomicBool,
-    is_done: bool,
 }
 
 impl<'a, I, EFN, E> Iter<'a, I, EFN>
@@ -25,7 +24,6 @@ where
             inner,
             make_err: Some(make_err),
             should_interrupt,
-            is_done: false,
         }
     }
 }
@@ -38,17 +36,14 @@ where
     type Item = Result<I::Item, E>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if self.is_done {
-            return None;
-        }
+        self.make_err.as_ref()?;
         if self.should_interrupt.load(Ordering::Relaxed) {
-            self.is_done = true;
             return Some(Err(self.make_err.take().expect("no bug")()));
         }
         match self.inner.next() {
             Some(next) => Some(Ok(next)),
             None => {
-                self.is_done = true;
+                self.make_err = None;
                 None
             }
         }
diff --git a/git-features/src/parallel/mod.rs b/git-features/src/parallel/mod.rs
index 58f7083d3a2..7d6a38e8aa0 100644
--- a/git-features/src/parallel/mod.rs
+++ b/git-features/src/parallel/mod.rs
@@ -35,11 +35,11 @@
 #[cfg(feature = "parallel")]
 mod in_parallel;
 #[cfg(feature = "parallel")]
-pub use in_parallel::*;
+pub use in_parallel::{in_parallel, join};
 
 mod serial;
 #[cfg(not(feature = "parallel"))]
-pub use serial::*;
+pub use serial::{in_parallel, join};
 
 mod eager_iter;
 pub use eager_iter::{EagerIter, EagerIterIf};
diff --git a/git-features/src/parallel/serial.rs b/git-features/src/parallel/serial.rs
index cddde00db49..ea22a37bb5b 100644
--- a/git-features/src/parallel/serial.rs
+++ b/git-features/src/parallel/serial.rs
@@ -1,7 +1,7 @@
 use crate::parallel::Reduce;
 
-#[cfg(not(feature = "parallel"))]
 /// Runs `left` and then `right`, one after another, returning their output when both are done.
+#[cfg(not(feature = "parallel"))]
 pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
     (left(), right())
 }
diff --git a/git-pack/Cargo.toml b/git-pack/Cargo.toml
index 8e940e55c85..a4bcc844171 100644
--- a/git-pack/Cargo.toml
+++ b/git-pack/Cargo.toml
@@ -48,7 +48,7 @@ itoa = "0.4.6"
 bytesize = "1.0.1"
 parking_lot = { version = "0.11.0", default-features = false }
 thiserror = "1.0.26"
-uluru = { version = "2.2.0", optional = true }
+uluru = { version = "2.2.0", optional = true, git = "https://github.com/Byron/uluru", features = ["std"], branch = "master" }
 memory-lru = { version = "0.1.0", optional = true }
 dashmap = "4.0.2"
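Reviewer note: the `interrupt.rs` change above folds the `is_done` flag into the `Option` that already holds the one-shot error factory, so `as_ref()?` fuses the iterator once the option is emptied. A minimal sketch of the idiom with toy names and the interrupt check omitted (this is not the crate's exact code):

```rust
// The Option doubles as the "done" flag: it is emptied on natural exhaustion
// (or, in the real type, when its FnOnce is taken to emit the interruption
// error), after which `as_ref()?` returns None forever.
struct Fused<I, F> {
    inner: I,
    make_err: Option<F>,
}

impl<I, F, E> Iterator for Fused<I, F>
where
    I: Iterator,
    F: FnOnce() -> E,
{
    type Item = Result<I::Item, E>;

    fn next(&mut self) -> Option<Self::Item> {
        self.make_err.as_ref()?; // already finished: stay finished
        match self.inner.next() {
            Some(item) => Some(Ok(item)),
            None => {
                self.make_err = None; // fuse on end of input
                None
            }
        }
    }
}
```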
diff --git a/git-pack/src/cache.rs b/git-pack/src/cache.rs
index b42f66725fd..7ddbac5336f 100644
--- a/git-pack/src/cache.rs
+++ b/git-pack/src/cache.rs
@@ -106,17 +106,28 @@ pub mod lru {
     /// The cache must be small as the search is 'naive' and the underlying data structure is a linked list.
     /// Values of 64 seem to improve performance.
     #[derive(Default)]
-    pub struct StaticLinkedList<const SIZE: usize>(uluru::LRUCache<Entry, SIZE>);
+    pub struct StaticLinkedList<const SIZE: usize>(uluru::LRUCache<Entry, SIZE>, Vec<Vec<u8>>);
 
     impl<const SIZE: usize> DecodeEntry for StaticLinkedList<SIZE> {
         fn put(&mut self, pack_id: u32, offset: u64, data: &[u8], kind: git_object::Kind, compressed_size: usize) {
-            self.0.insert(Entry {
+            if let Some(previous) = self.0.insert(Entry {
                 offset,
                 pack_id,
-                data: Vec::from(data),
+                data: self
+                    .1
+                    .pop()
+                    .map(|mut v| {
+                        v.clear();
+                        v.resize(data.len(), 0);
+                        v.copy_from_slice(data);
+                        v
+                    })
+                    .unwrap_or_else(|| Vec::from(data)),
                 kind,
                 compressed_size,
-            })
+            }) {
+                self.1.push(previous.data)
+            }
         }
 
         fn get(&mut self, pack_id: u32, offset: u64, out: &mut Vec<u8>) -> Option<(git_object::Kind, usize)> {
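The `StaticLinkedList` change above recycles the allocation of evicted cache entries instead of allocating a fresh `Vec` on every `put()`. The same free-list idea in miniature, using `extend_from_slice` in place of the diff's `resize` plus `copy_from_slice` pair (hypothetical type, not part of git-pack):

```rust
// Keep evicted buffers and refill them: the Vec's capacity survives `clear()`,
// so steady-state puts stop hitting the allocator.
struct BufPool(Vec<Vec<u8>>);

impl BufPool {
    fn alloc(&mut self, data: &[u8]) -> Vec<u8> {
        match self.0.pop() {
            Some(mut buf) => {
                buf.clear();
                buf.extend_from_slice(data); // reuses existing capacity if it fits
                buf
            }
            None => data.to_vec(),
        }
    }

    fn free(&mut self, buf: Vec<u8>) {
        self.0.push(buf); // capacity is retained for the next alloc()
    }
}
```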
diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs
index e37c1aa7a5b..07d2b040f2f 100644
--- a/git-pack/src/data/output/count/iter_from_objects.rs
+++ b/git-pack/src/data/output/count/iter_from_objects.rs
@@ -1,14 +1,18 @@
 use std::sync::Arc;
 
-use dashmap::DashSet;
 use git_features::{parallel, progress::Progress};
 use git_hash::{oid, ObjectId};
 use git_object::immutable;
 
 use crate::{data::output, find, FindExt};
+use std::sync::atomic::{AtomicBool, Ordering};
 
-/// Generate [`Count`][output::Count] from input `objects` with object expansion based on [`options`][Options]
-/// to learn which objects would be part of a pack.
+/// The return type used by [`objects()`].
+pub type Result<E1, E2> = std::result::Result<(Vec<output::Count>, Outcome), Error<E1, E2>>;
+
+/// Generate [`Count`][output::Count]s from input `objects` with object expansion based on [`options`][Options]
+/// to learn which objects would constitute a pack. This step is required to know exactly how many objects would
+/// be in a pack while keeping data around to minimize object database access.
 ///
 /// A [`Count`][output::Count] object maintains enough state to greatly accelerate future access of packed objects.
 ///
@@ -19,25 +23,28 @@ use crate::{data::output, find, FindExt};
 /// * Objects may be expanded based on the provided [`options`][Options]
 /// * `progress`
 ///   * a way to obtain progress information
+/// * `should_interrupt`
+///   * A flag that is set to true if the operation should stop
 /// * `options`
 ///   * more configuration
-pub fn iter_from_objects<Find, Iter, Oid, Cache>(
+pub fn objects<Find, Iter, IterErr, Oid, Cache>(
     db: Find,
-    make_cache: impl Fn() -> Cache + Send + Clone + Sync + 'static,
+    make_cache: impl Fn() -> Cache + Send + Sync,
     objects_ids: Iter,
     progress: impl Progress,
+    should_interrupt: &AtomicBool,
     Options {
         thread_limit,
         input_object_expansion,
         chunk_size,
     }: Options,
-) -> impl Iterator<Item = Result<Vec<output::Count>, Error<find::existing::Error<Find::Error>>>>
-    + parallel::reduce::Finalize<Reduce = reduce::Statistics<Error<find::existing::Error<Find::Error>>>>
+) -> Result<find::existing::Error<Find::Error>, IterErr>
 where
-    Find: crate::Find + Clone + Send + Sync + 'static,
+    Find: crate::Find + Send + Sync,
     <Find as crate::Find>::Error: Send,
-    Iter: Iterator<Item = Oid> + Send + 'static,
-    Oid: AsRef<oid> + Send + 'static,
+    Iter: Iterator<Item = std::result::Result<Oid, IterErr>> + Send,
+    Oid: Into<ObjectId> + Send,
+    IterErr: std::error::Error + Send,
     Cache: crate::cache::DecodeEntry,
 {
     let lower_bound = objects_ids.size_hint().0;
@@ -51,10 +58,10 @@ where
         iter: objects_ids,
         size: chunk_size,
     };
-    let seen_objs = Arc::new(dashmap::DashSet::<ObjectId>::new());
+    let seen_objs = dashmap::DashSet::<ObjectId>::new();
     let progress = Arc::new(parking_lot::Mutex::new(progress));
 
-    parallel::reduce::Stepwise::new(
+    parallel::in_parallel(
         chunks,
         thread_limit,
         {
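With the new signature, callers receive `(Vec<Count>, Outcome)` from one blocking call instead of driving a lazy iterator and finalizing it, and the input iterator must now yield `Result`s. A hypothetical call-site sketch (the `db` and `ids` bindings are assumed to exist in scope, inside a function returning a compatible `Result`; names come from this module and its re-exports):

```rust
let should_interrupt = AtomicBool::new(false);
let (counts, outcome) = objects(
    db,
    || pack::cache::Never,
    // an infallible source is adapted by pinning an impossible error type
    ids.into_iter().map(Ok::<_, std::convert::Infallible>),
    progress::Discard,
    &should_interrupt,
    Options {
        thread_limit: None,
        chunk_size: 10,
        input_object_expansion: ObjectExpansion::TreeContents,
    },
)?;
```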
@@ -73,188 +80,245 @@ where
             }
         },
         {
-            let seen_objs = Arc::clone(&seen_objs);
-            move |oids: Vec<Oid>, (buf1, buf2, cache, progress)| {
-                use ObjectExpansion::*;
-                let mut out = Vec::new();
-                let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default();
-                let mut tree_diff_state = git_diff::tree::State::default();
-                let mut parent_commit_ids = Vec::new();
-                let seen_objs = seen_objs.as_ref();
-                let mut traverse_delegate = tree::traverse::AllUnseen::new(seen_objs);
-                let mut changes_delegate = tree::changes::AllNew::new(seen_objs);
-                let mut outcome = Outcome::default();
-                let stats = &mut outcome;
-
-                for id in oids.into_iter() {
-                    let id = id.as_ref();
-                    let obj = db.find_existing(id, buf1, cache)?;
-                    stats.input_objects += 1;
-                    match input_object_expansion {
-                        TreeAdditionsComparedToAncestor => {
-                            use git_object::Kind::*;
-                            let mut obj = obj;
-                            let mut id = id.to_owned();
-
-                            loop {
-                                push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false);
-                                match obj.kind {
-                                    Tree | Blob => break,
-                                    Tag => {
-                                        id = immutable::TagIter::from_bytes(obj.data)
-                                            .target_id()
-                                            .expect("every tag has a target");
-                                        obj = db.find_existing(id, buf1, cache)?;
-                                        stats.expanded_objects += 1;
-                                        continue;
-                                    }
-                                    Commit => {
-                                        let current_tree_iter = {
-                                            let mut commit_iter = immutable::CommitIter::from_bytes(obj.data);
-                                            let tree_id = commit_iter.tree_id().expect("every commit has a tree");
-                                            parent_commit_ids.clear();
-                                            for token in commit_iter {
-                                                match token {
-                                                    Ok(immutable::commit::iter::Token::Parent { id }) => {
-                                                        parent_commit_ids.push(id)
-                                                    }
-                                                    Ok(_) => break,
-                                                    Err(err) => return Err(Error::CommitDecode(err)),
-                                                }
-                                            }
-                                            let obj = db.find_existing(tree_id, buf1, cache)?;
-                                            push_obj_count_unique(
-                                                &mut out, seen_objs, &tree_id, &obj, progress, stats, true,
-                                            );
-                                            immutable::TreeIter::from_bytes(obj.data)
-                                        };
-
-                                        let objects = if parent_commit_ids.is_empty() {
-                                            traverse_delegate.clear();
-                                            git_traverse::tree::breadthfirst(
-                                                current_tree_iter,
-                                                &mut tree_traversal_state,
-                                                |oid, buf| {
-                                                    stats.decoded_objects += 1;
-                                                    db.find_existing_tree_iter(oid, buf, cache).ok()
-                                                },
-                                                &mut traverse_delegate,
-                                            )
-                                            .map_err(Error::TreeTraverse)?;
-                                            &traverse_delegate.objects
-                                        } else {
-                                            for commit_id in &parent_commit_ids {
-                                                let parent_tree_id = {
-                                                    let parent_commit_obj = db.find_existing(commit_id, buf2, cache)?;
-
-                                                    push_obj_count_unique(
-                                                        &mut out,
-                                                        seen_objs,
-                                                        commit_id,
-                                                        &parent_commit_obj,
-                                                        progress,
-                                                        stats,
-                                                        true,
-                                                    );
-                                                    immutable::CommitIter::from_bytes(parent_commit_obj.data)
-                                                        .tree_id()
-                                                        .expect("every commit has a tree")
-                                                };
-                                                let parent_tree = {
-                                                    let parent_tree_obj =
-                                                        db.find_existing(parent_tree_id, buf2, cache)?;
-                                                    push_obj_count_unique(
-                                                        &mut out,
-                                                        seen_objs,
-                                                        &parent_tree_id,
-                                                        &parent_tree_obj,
-                                                        progress,
-                                                        stats,
-                                                        true,
-                                                    );
-                                                    immutable::TreeIter::from_bytes(parent_tree_obj.data)
-                                                };
-
-                                                changes_delegate.clear();
-                                                git_diff::tree::Changes::from(Some(parent_tree))
-                                                    .needed_to_obtain(
-                                                        current_tree_iter.clone(),
-                                                        &mut tree_diff_state,
-                                                        |oid, buf| {
-                                                            stats.decoded_objects += 1;
-                                                            db.find_existing_tree_iter(oid, buf, cache).ok()
-                                                        },
-                                                        &mut changes_delegate,
-                                                    )
-                                                    .map_err(Error::TreeChanges)?;
-                                            }
-                                            &changes_delegate.objects
-                                        };
-                                        for id in objects.iter() {
-                                            out.push(id_to_count(&db, buf2, id, progress, stats));
-                                        }
-                                        break;
-                                    }
-                                }
-                            }
-                        }
-                        TreeContents => {
-                            use git_object::Kind::*;
-                            let mut id: ObjectId = id.into();
-                            let mut obj = obj;
-                            loop {
-                                push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false);
-                                match obj.kind {
-                                    Tree => {
-                                        traverse_delegate.clear();
-                                        git_traverse::tree::breadthfirst(
-                                            git_object::immutable::TreeIter::from_bytes(obj.data),
-                                            &mut tree_traversal_state,
-                                            |oid, buf| {
-                                                stats.decoded_objects += 1;
-                                                db.find_existing_tree_iter(oid, buf, cache).ok()
-                                            },
-                                            &mut traverse_delegate,
-                                        )
-                                        .map_err(Error::TreeTraverse)?;
-                                        for id in traverse_delegate.objects.iter() {
-                                            out.push(id_to_count(&db, buf1, id, progress, stats));
-                                        }
-                                        break;
-                                    }
-                                    Commit => {
-                                        id = immutable::CommitIter::from_bytes(obj.data)
-                                            .tree_id()
-                                            .expect("every commit has a tree");
-                                        stats.expanded_objects += 1;
-                                        obj = db.find_existing(id, buf1, cache)?;
-                                        continue;
-                                    }
-                                    Blob => break,
-                                    Tag => {
-                                        id = immutable::TagIter::from_bytes(obj.data)
-                                            .target_id()
-                                            .expect("every tag has a target");
-                                        stats.expanded_objects += 1;
-                                        obj = db.find_existing(id, buf1, cache)?;
-                                        continue;
-                                    }
-                                }
-                            }
-                        }
-                        AsIs => push_obj_count_unique(&mut out, seen_objs, id, &obj, progress, stats, false),
-                    }
-                }
-                Ok((out, outcome))
-            }
-        },
-        reduce::Statistics::default(),
-    )
+            move |oids: Vec<std::result::Result<Oid, IterErr>>, (buf1, buf2, cache, progress)| {
+                expand_inner(
+                    &db,
+                    input_object_expansion,
+                    &seen_objs,
+                    oids,
+                    buf1,
+                    buf2,
+                    cache,
+                    progress,
+                    should_interrupt,
+                )
+            }
+        },
+        reduce::Statistics::new(progress),
+    )
+}
+
+/// Like [`objects()`], but using a single thread to avoid the overhead threads would otherwise incur.
+pub fn objects_unthreaded<Find, IterErr, Oid>(
+    db: Find,
+    pack_cache: &mut impl crate::cache::DecodeEntry,
+    object_ids: impl Iterator<Item = std::result::Result<Oid, IterErr>>,
+    mut progress: impl Progress,
+    should_interrupt: &AtomicBool,
+    input_object_expansion: ObjectExpansion,
+) -> Result<find::existing::Error<Find::Error>, IterErr>
+where
+    Find: crate::Find + Send + Sync,
+    Oid: Into<ObjectId> + Send,
+    IterErr: std::error::Error + Send,
+{
+    let seen_objs = RefCell::new(HashSet::<ObjectId>::new());
+
+    let (mut buf1, mut buf2) = (Vec::new(), Vec::new());
+    expand_inner(
+        &db,
+        input_object_expansion,
+        &seen_objs,
+        object_ids,
+        &mut buf1,
+        &mut buf2,
+        pack_cache,
+        &mut progress,
+        should_interrupt,
+    )
+}
+
+#[allow(clippy::too_many_arguments)]
+fn expand_inner<Find, IterErr, Oid>(
+    db: &Find,
+    input_object_expansion: ObjectExpansion,
+    seen_objs: &impl util::InsertImmutable<ObjectId>,
+    oids: impl IntoIterator<Item = std::result::Result<Oid, IterErr>>,
+    buf1: &mut Vec<u8>,
+    buf2: &mut Vec<u8>,
+    cache: &mut impl crate::cache::DecodeEntry,
+    progress: &mut impl Progress,
+    should_interrupt: &AtomicBool,
+) -> Result<find::existing::Error<Find::Error>, IterErr>
+where
+    Find: crate::Find + Send + Sync,
+    Oid: Into<ObjectId> + Send,
+    IterErr: std::error::Error + Send,
+{
+    use ObjectExpansion::*;
+
+    let mut out = Vec::new();
+    let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default();
+    let mut tree_diff_state = git_diff::tree::State::default();
+    let mut parent_commit_ids = Vec::new();
+    let mut traverse_delegate = tree::traverse::AllUnseen::new(seen_objs);
+    let mut changes_delegate = tree::changes::AllNew::new(seen_objs);
+    let mut outcome = Outcome::default();
+    let stats = &mut outcome;
+    for id in oids.into_iter() {
+        if should_interrupt.load(Ordering::Relaxed) {
+            return Err(Error::Interrupted);
+        }
+
+        let id = id.map(|oid| oid.into()).map_err(Error::InputIteration)?;
+        let obj = db.find_existing(id, buf1, cache)?;
+        stats.input_objects += 1;
+        match input_object_expansion {
+            TreeAdditionsComparedToAncestor => {
+                use git_object::Kind::*;
+                let mut obj = obj;
+                let mut id = id.to_owned();
+
+                loop {
+                    push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false);
+                    match obj.kind {
+                        Tree | Blob => break,
+                        Tag => {
+                            id = immutable::TagIter::from_bytes(obj.data)
+                                .target_id()
+                                .expect("every tag has a target");
+                            obj = db.find_existing(id, buf1, cache)?;
+                            stats.expanded_objects += 1;
+                            continue;
+                        }
+                        Commit => {
+                            let current_tree_iter = {
+                                let mut commit_iter = immutable::CommitIter::from_bytes(obj.data);
+                                let tree_id = commit_iter.tree_id().expect("every commit has a tree");
+                                parent_commit_ids.clear();
+                                for token in commit_iter {
+                                    match token {
+                                        Ok(immutable::commit::iter::Token::Parent { id }) => parent_commit_ids.push(id),
+                                        Ok(_) => break,
+                                        Err(err) => return Err(Error::CommitDecode(err)),
+                                    }
+                                }
+                                let obj = db.find_existing(tree_id, buf1, cache)?;
+                                push_obj_count_unique(&mut out, seen_objs, &tree_id, &obj, progress, stats, true);
+                                immutable::TreeIter::from_bytes(obj.data)
+                            };
+
+                            let objects = if parent_commit_ids.is_empty() {
+                                traverse_delegate.clear();
+                                git_traverse::tree::breadthfirst(
+                                    current_tree_iter,
+                                    &mut tree_traversal_state,
+                                    |oid, buf| {
+                                        stats.decoded_objects += 1;
+                                        db.find_existing_tree_iter(oid, buf, cache).ok()
+                                    },
+                                    &mut traverse_delegate,
+                                )
+                                .map_err(Error::TreeTraverse)?;
+                                &traverse_delegate.objects
+                            } else {
+                                for commit_id in &parent_commit_ids {
+                                    let parent_tree_id = {
+                                        let parent_commit_obj = db.find_existing(commit_id, buf2, cache)?;
+
+                                        push_obj_count_unique(
+                                            &mut out,
+                                            seen_objs,
+                                            commit_id,
+                                            &parent_commit_obj,
+                                            progress,
+                                            stats,
+                                            true,
+                                        );
+                                        immutable::CommitIter::from_bytes(parent_commit_obj.data)
+                                            .tree_id()
+                                            .expect("every commit has a tree")
+                                    };
+                                    let parent_tree = {
+                                        let parent_tree_obj = db.find_existing(parent_tree_id, buf2, cache)?;
+                                        push_obj_count_unique(
+                                            &mut out,
+                                            seen_objs,
+                                            &parent_tree_id,
+                                            &parent_tree_obj,
+                                            progress,
+                                            stats,
+                                            true,
+                                        );
+                                        immutable::TreeIter::from_bytes(parent_tree_obj.data)
+                                    };
+
+                                    changes_delegate.clear();
+                                    git_diff::tree::Changes::from(Some(parent_tree))
+                                        .needed_to_obtain(
+                                            current_tree_iter.clone(),
+                                            &mut tree_diff_state,
+                                            |oid, buf| {
+                                                stats.decoded_objects += 1;
+                                                db.find_existing_tree_iter(oid, buf, cache).ok()
+                                            },
+                                            &mut changes_delegate,
+                                        )
+                                        .map_err(Error::TreeChanges)?;
+                                }
+                                &changes_delegate.objects
+                            };
+                            for id in objects.iter() {
+                                out.push(id_to_count(db, buf2, id, progress, stats));
+                            }
+                            break;
+                        }
+                    }
+                }
+            }
+            TreeContents => {
+                use git_object::Kind::*;
+                let mut id = id;
+                let mut obj = obj;
+                loop {
+                    push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false);
+                    match obj.kind {
+                        Tree => {
+                            traverse_delegate.clear();
+                            git_traverse::tree::breadthfirst(
+                                git_object::immutable::TreeIter::from_bytes(obj.data),
+                                &mut tree_traversal_state,
+                                |oid, buf| {
+                                    stats.decoded_objects += 1;
+                                    db.find_existing_tree_iter(oid, buf, cache).ok()
+                                },
+                                &mut traverse_delegate,
+                            )
+                            .map_err(Error::TreeTraverse)?;
+                            for id in traverse_delegate.objects.iter() {
+                                out.push(id_to_count(db, buf1, id, progress, stats));
+                            }
+                            break;
+                        }
+                        Commit => {
+                            id = immutable::CommitIter::from_bytes(obj.data)
+                                .tree_id()
+                                .expect("every commit has a tree");
+                            stats.expanded_objects += 1;
+                            obj = db.find_existing(id, buf1, cache)?;
+                            continue;
+                        }
+                        Blob => break,
+                        Tag => {
+                            id = immutable::TagIter::from_bytes(obj.data)
+                                .target_id()
+                                .expect("every tag has a target");
+                            stats.expanded_objects += 1;
+                            obj = db.find_existing(id, buf1, cache)?;
+                            continue;
+                        }
+                    }
+                }
+            }
+            AsIs => push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false),
+        }
+    }
+    Ok((out, outcome))
 }
 
 mod tree {
     pub mod changes {
-        use dashmap::DashSet;
+        use crate::data::output::count::iter_from_objects::util::InsertImmutable;
         use git_diff::tree::{
             visit::{Action, Change},
             Visit,
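`expand_inner` checks the interruption flag once per input object, which bounds the latency of a cancellation request by the cost of a single expansion. The pattern in isolation (self-contained sketch with a toy payload, not crate code):

```rust
// Cooperative cancellation: a shared flag is polled once per unit of work.
// Relaxed ordering suffices because the flag only gates an early exit and
// carries no data dependencies.
use std::sync::atomic::{AtomicBool, Ordering};

fn process(items: &[u32], should_interrupt: &AtomicBool) -> Result<u64, &'static str> {
    let mut sum = 0u64;
    for it in items {
        if should_interrupt.load(Ordering::Relaxed) {
            return Err("interrupted");
        }
        sum += u64::from(*it);
    }
    Ok(sum)
}
```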
@@ -262,13 +326,16 @@ mod tree {
         use git_hash::ObjectId;
         use git_object::bstr::BStr;
 
-        pub struct AllNew<'a> {
+        pub struct AllNew<'a, H> {
             pub objects: Vec<ObjectId>,
-            all_seen: &'a DashSet<ObjectId>,
+            all_seen: &'a H,
         }
 
-        impl<'a> AllNew<'a> {
-            pub fn new(all_seen: &'a DashSet<ObjectId>) -> Self {
+        impl<'a, H> AllNew<'a, H>
+        where
+            H: InsertImmutable<ObjectId>,
+        {
+            pub fn new(all_seen: &'a H) -> Self {
                 AllNew {
                     objects: Default::default(),
                     all_seen,
@@ -279,7 +346,10 @@ mod tree {
             }
         }
 
-        impl<'a> Visit for AllNew<'a> {
+        impl<'a, H> Visit for AllNew<'a, H>
+        where
+            H: InsertImmutable<ObjectId>,
+        {
             fn pop_front_tracked_path_and_set_current(&mut self) {}
 
             fn push_back_tracked_path_component(&mut self, _component: &BStr) {}
@@ -304,18 +374,21 @@ mod tree {
     }
 
     pub mod traverse {
-        use dashmap::DashSet;
+        use crate::data::output::count::iter_from_objects::util::InsertImmutable;
         use git_hash::ObjectId;
         use git_object::{bstr::BStr, immutable::tree::Entry};
         use git_traverse::tree::visit::{Action, Visit};
 
-        pub struct AllUnseen<'a> {
+        pub struct AllUnseen<'a, H> {
             pub objects: Vec<ObjectId>,
-            all_seen: &'a DashSet<ObjectId>,
+            all_seen: &'a H,
         }
 
-        impl<'a> AllUnseen<'a> {
-            pub fn new(all_seen: &'a DashSet<ObjectId>) -> Self {
+        impl<'a, H> AllUnseen<'a, H>
+        where
+            H: InsertImmutable<ObjectId>,
+        {
+            pub fn new(all_seen: &'a H) -> Self {
                 AllUnseen {
                     objects: Default::default(),
                     all_seen,
@@ -326,7 +399,10 @@ mod tree {
             }
         }
 
-        impl<'a> Visit for AllUnseen<'a> {
+        impl<'a, H> Visit for AllUnseen<'a, H>
+        where
+            H: InsertImmutable<ObjectId>,
+        {
             fn pop_front_tracked_path_and_set_current(&mut self) {}
 
             fn push_back_tracked_path_component(&mut self, _component: &BStr) {}
@@ -358,7 +434,7 @@
 fn push_obj_count_unique(
     out: &mut Vec<output::Count>,
-    all_seen: &DashSet<ObjectId>,
+    all_seen: &impl util::InsertImmutable<ObjectId>,
     id: &oid,
     obj: &crate::data::Object<'_>,
     progress: &mut impl Progress,
@@ -392,6 +468,30 @@ fn id_to_count(
 }
 
 mod util {
+    pub trait InsertImmutable<Item: Eq + std::hash::Hash> {
+        fn insert(&self, item: Item) -> bool;
+    }
+
+    mod trait_impls {
+        use super::InsertImmutable;
+        use dashmap::DashSet;
+        use std::cell::RefCell;
+        use std::collections::HashSet;
+        use std::hash::Hash;
+
+        impl<T: Eq + Hash> InsertImmutable<T> for DashSet<T> {
+            fn insert(&self, item: T) -> bool {
+                self.insert(item)
+            }
+        }
+
+        impl<T: Eq + Hash> InsertImmutable<T> for RefCell<HashSet<T>> {
+            fn insert(&self, item: T) -> bool {
+                self.borrow_mut().insert(item)
+            }
+        }
+    }
+
     pub struct Chunks<I> {
         pub size: usize,
         pub iter: I,
     }
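The `InsertImmutable` abstraction above is what lets the exact same delegate types serve both entry points: `DashSet` for `objects()` and `RefCell<HashSet<_>>` for `objects_unthreaded()`. The key property is insertion through `&self`, shown here in miniature (toy example, not crate code):

```rust
// A shared reference to the set can be handed to many consumers at once,
// while insert() still reports first sightings.
use std::cell::RefCell;
use std::collections::HashSet;

fn count_first_sightings(ids: &[u32]) -> usize {
    let seen = RefCell::new(HashSet::new());
    let insert = |id: u32| seen.borrow_mut().insert(id); // captures `seen` by `&`
    ids.iter().copied().filter(|id| insert(*id)).count()
}
```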
@@ -431,7 +531,7 @@ mod types {
         pub expanded_objects: usize,
         /// The amount of fully decoded objects. These are the most expensive as they are fully decoded
         pub decoded_objects: usize,
-        /// The total amount of objects seed. Should be `expanded_objects + input_objects`.
+        /// The total amount of encountered objects. Should be `expanded_objects + input_objects`.
         pub total_objects: usize,
     }
@@ -488,7 +588,6 @@ mod types {
         /// especially when tree traversal is involved. Thus deterministic ordering requires `Some(1)` to be set.
         pub thread_limit: Option<usize>,
         /// The amount of objects per chunk or unit of work to be sent to threads for processing
-        /// TODO: could this become the window size?
         pub chunk_size: usize,
         /// The way input objects are handled
         pub input_object_expansion: ObjectExpansion,
@@ -507,20 +606,27 @@ mod types {
     /// The error returned by the pack generation iterator [bytes::FromEntriesIter][crate::data::output::bytes::FromEntriesIter].
     #[derive(Debug, thiserror::Error)]
     #[allow(missing_docs)]
-    pub enum Error<FindErr>
+    pub enum Error<FindErr, IterErr>
     where
         FindErr: std::error::Error + 'static,
+        IterErr: std::error::Error + 'static,
     {
         #[error(transparent)]
         CommitDecode(git_object::immutable::object::decode::Error),
         #[error(transparent)]
         FindExisting(#[from] FindErr),
         #[error(transparent)]
+        InputIteration(IterErr),
+        #[error(transparent)]
         TreeTraverse(git_traverse::tree::breadthfirst::Error),
         #[error(transparent)]
         TreeChanges(git_diff::tree::changes::Error),
+        #[error("Operation interrupted")]
+        Interrupted,
     }
 }
+use std::cell::RefCell;
+use std::collections::HashSet;
 pub use types::{Error, ObjectExpansion, Options, Outcome};
 
 mod reduce {
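Making interruption and input failure first-class `Error` variants lets callers tell a user abort apart from a real failure without side channels. A toy version of the shape (illustrative only):

```rust
#[derive(Debug)]
enum CountError<FindErr, IterErr> {
    FindExisting(FindErr),
    InputIteration(IterErr),
    Interrupted,
}

fn describe(err: &CountError<std::io::Error, std::io::Error>) -> &'static str {
    match err {
        CountError::Interrupted => "stopped on user request",
        CountError::InputIteration(_) => "the input stream failed",
        CountError::FindExisting(_) => "an object could not be found",
    }
}
```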
@@ -530,37 +636,50 @@
     use super::Outcome;
     use crate::data::output;
+    use git_features::progress::Progress;
+    use std::sync::Arc;
 
-    pub struct Statistics<E> {
+    pub struct Statistics<E, P> {
         total: Outcome,
+        counts: Vec<output::Count>,
+        progress: Arc<parking_lot::Mutex<P>>,
         _err: PhantomData<E>,
     }
 
-    impl<E> Default for Statistics<E> {
-        fn default() -> Self {
+    impl<E, P> Statistics<E, P>
+    where
+        P: Progress,
+    {
+        pub fn new(progress: Arc<parking_lot::Mutex<P>>) -> Self {
             Statistics {
                 total: Default::default(),
+                counts: Default::default(),
+                progress,
                 _err: PhantomData::default(),
             }
         }
     }
 
-    impl<E> parallel::Reduce for Statistics<E> {
-        type Input = Result<(Vec<output::Count>, Outcome), Error<E>>;
-        type FeedProduce = Vec<output::Count>;
-        type Output = Outcome;
-        type Error = Error<E>;
+    impl<E, P> parallel::Reduce for Statistics<E, P>
+    where
+        P: Progress,
+    {
+        type Input = Result<(Vec<output::Count>, Outcome), E>;
+        type FeedProduce = ();
+        type Output = (Vec<output::Count>, Outcome);
+        type Error = E;
 
         fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
-            item.map(|(counts, mut stats)| {
-                stats.total_objects = counts.len();
-                self.total.aggregate(stats);
-                counts
-            })
+            let (counts, mut stats) = item?;
+            stats.total_objects = counts.len();
+            self.total.aggregate(stats);
+            self.progress.lock().inc_by(counts.len());
+            self.counts.extend(counts);
+            Ok(())
         }
 
         fn finalize(self) -> Result<Self::Output, Self::Error> {
-            Ok(self.total)
+            Ok((self.counts, self.total))
         }
     }
 }
diff --git a/git-pack/src/data/output/count/mod.rs b/git-pack/src/data/output/count/mod.rs
index 65382445314..f3cf5c2c697 100644
--- a/git-pack/src/data/output/count/mod.rs
+++ b/git-pack/src/data/output/count/mod.rs
@@ -27,4 +27,4 @@ impl Count {
 
 ///
 pub mod iter_from_objects;
-pub use iter_from_objects::iter_from_objects;
+pub use iter_from_objects::{objects, objects_unthreaded};
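`Statistics` now fills the reducer role by accumulating counts and pumping progress in `feed()`, then handing everything back in `finalize()`. The contract it satisfies, reduced to its essentials (simplified sketch: the real `Reduce` trait is fallible and also has a `FeedProduce` type):

```rust
// Fold-as-you-go plus a terminal summary: feed() consumes each worker's
// output as it arrives, finalize() yields the aggregate.
trait Reduce {
    type Input;
    type Output;
    fn feed(&mut self, item: Self::Input);
    fn finalize(self) -> Self::Output;
}

struct CountAndSum {
    n: usize,
    sum: u64,
}

impl Reduce for CountAndSum {
    type Input = u64;
    type Output = (usize, u64);
    fn feed(&mut self, item: u64) {
        self.n += 1; // progress reporting would go here, as Statistics does
        self.sum += item;
    }
    fn finalize(self) -> (usize, u64) {
        (self.n, self.sum)
    }
}
```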
diff --git a/git-pack/tests/pack/data/output/count_and_entries.rs b/git-pack/tests/pack/data/output/count_and_entries.rs
index 285c14ca3b4..09de2008ae5 100644
--- a/git-pack/tests/pack/data/output/count_and_entries.rs
+++ b/git-pack/tests/pack/data/output/count_and_entries.rs
@@ -12,6 +12,7 @@ use crate::pack::{
     data::output::{db, DbKind},
     hex_to_id,
 };
+use std::convert::Infallible;
 
 #[test]
 fn traversals() -> crate::Result {
@@ -240,7 +241,7 @@
         }
 
         let deterministic_count_needs_single_thread = Some(1);
-        let mut counts_iter = output::count::iter_from_objects(
+        let (counts, stats) = output::count::objects(
             db.clone(),
             || pack::cache::Never,
             commits
@@ -250,20 +251,16 @@
                 } else {
                     "e3fb53cbb4c346d48732a24f09cf445e49bc63d6"
                 })))
-                .filter(|o| !o.is_null()),
+                .filter(|o| !o.is_null())
+                .map(Ok::<_, Infallible>),
             progress::Discard,
+            &AtomicBool::new(false),
             count::iter_from_objects::Options {
                 input_object_expansion: expansion_mode,
                 thread_limit: deterministic_count_needs_single_thread,
                 ..Default::default()
             },
-        );
-        let counts: Vec<_> = counts_iter
-            .by_ref()
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .flatten()
-            .collect();
+        )?;
         let actual_count = counts.iter().fold(ObjectCount::default(), |mut c, e| {
             let mut buf = Vec::new();
             if let Some(obj) = db.find_existing(e.id, &mut buf, &mut pack::cache::Never).ok() {
@@ -275,7 +272,7 @@
 
         let counts_len = counts.len();
         assert_eq!(counts_len, expected_obj_count.total());
-        let stats = counts_iter.finalize()?;
         assert_eq!(stats, expected_counts_outcome);
         assert_eq!(stats.total_objects, expected_obj_count.total());
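The test adapts its infallible ID source with `map(Ok::<_, Infallible>)`; that turbofish is the whole trick for feeding an error-free iterator into the now-fallible input. A self-contained demonstration:

```rust
use std::convert::Infallible;

fn main() {
    let ids = vec![1u32, 2, 3];
    // `Ok::<_, Infallible>` pins an error type that can never be constructed,
    // so downstream code may unwrap with a clear conscience.
    let mut fallible = ids.into_iter().map(Ok::<_, Infallible>);
    assert_eq!(fallible.next(), Some(Ok(1)));
}
```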
diff --git a/gitoxide-core/src/pack/create.rs b/gitoxide-core/src/pack/create.rs
index ba0b06ca83e..7fa2e1783fb 100644
--- a/gitoxide-core/src/pack/create.rs
+++ b/gitoxide-core/src/pack/create.rs
@@ -97,17 +97,22 @@ pub fn create<W>(
 where
     W: std::io::Write,
 {
+    // TODO: review need for Arc for the counting part, use different kinds of Easy depending on need
     let repo = Arc::new(git_repository::discover(repository_path)?);
     progress.init(Some(2), progress::steps());
     let tips = tips.into_iter();
     let make_cancellation_err = || anyhow!("Cancelled by user");
-    let (db, input): (_, Box<dyn Iterator<Item = ObjectId> + Send + 'static>) = match input {
+    let (db, input): (
+        _,
+        Box<dyn Iterator<Item = Result<ObjectId, input_iteration::Error>> + Send>,
+    ) = match input {
         None => {
             let mut progress = progress.add_child("traversing");
             progress.init(None, progress::count("commits"));
             let tips = tips
                 .map(|tip| {
                     ObjectId::from_hex(&Vec::from_os_str_lossy(tip.as_ref())).or_else({
+                        // TODO: Use Easy when ready…
                         let packed = repo.refs.packed().ok().flatten();
                         let refs = &repo.refs;
                         let repo = Arc::clone(&repo);
@@ -132,13 +137,14 @@ where
                 Err(_) => unreachable!("there is only one instance left here"),
             };
             let iter = Box::new(
+                // TODO: Easy-based traversal
                 traverse::commit::Ancestors::new(tips, traverse::commit::ancestors::State::default(), {
                     let db = Arc::clone(&db);
                     move |oid, buf| db.find_existing_commit_iter(oid, buf, &mut pack::cache::Never).ok()
                 })
-                .inspect(move |_| progress.inc())
-                .map(Result::unwrap),
-            ); // todo: should there be a better way, maybe let input support Option or Result
+                .map(|res| res.map_err(Into::into))
+                .inspect(move |_| progress.inc()),
+            );
             (db, iter)
         }
         Some(input) => {
@@ -146,55 +152,61 @@ where
                 Ok(repo) => Arc::new(repo.odb),
                 Err(_) => unreachable!("there is only one instance left here"),
             };
-            let iter = Box::new(input.lines().filter_map(|hex_id| {
+            let iter = Box::new(input.lines().map(|hex_id| {
                 hex_id
-                    .ok()
-                    .and_then(|hex_id| ObjectId::from_hex(hex_id.as_bytes()).ok())
-                // todo: should there be a better way, maybe let input support Option or Result
+                    .map_err(Into::into)
+                    .and_then(|hex_id| ObjectId::from_hex(hex_id.as_bytes()).map_err(Into::into))
             }));
             (db, iter)
         }
     };
 
     let mut stats = Statistics::default();
-    let chunk_size = 200;
-    let start = Instant::now();
+    let chunk_size = 1000; // What's a good value for this?
     let counts = {
         let mut progress = progress.add_child("counting");
         progress.init(None, progress::count("objects"));
-        let mut interruptible_counts_iter = interrupt::Iter::new(
-            pack::data::output::count::iter_from_objects(
-                Arc::clone(&db),
-                move || {
-                    if nondeterministic_count {
-                        Box::new(pack::cache::lru::StaticLinkedList::<64>::default()) as Box<dyn DecodeEntry>
-                    } else {
-                        Box::new(pack::cache::lru::MemoryCappedHashmap::new(400 * 1024 * 1024)) as Box<dyn DecodeEntry>
-                        // todo: Make that configurable
-                    }
-                },
+        let may_use_multiple_threads = nondeterministic_count || matches!(expansion, ObjectExpansion::None);
+        let thread_limit = if may_use_multiple_threads {
+            thread_limit
+        } else {
+            Some(1)
+        };
+        let make_cache = move || {
+            if may_use_multiple_threads {
+                Box::new(pack::cache::lru::StaticLinkedList::<64>::default()) as Box<dyn DecodeEntry>
+            } else {
+                Box::new(pack::cache::lru::MemoryCappedHashmap::new(400 * 1024 * 1024)) as Box<dyn DecodeEntry>
+                // todo: Make that configurable
+            }
+        };
+        let db = Arc::clone(&db);
+        let progress = progress::ThroughputOnDrop::new(progress);
+        let input_object_expansion = expansion.into();
+        let (mut counts, count_stats) = if may_use_multiple_threads {
+            pack::data::output::count::objects(
+                db,
+                make_cache,
                 input,
-                progress.add_child("threads"),
+                progress,
+                &interrupt::IS_INTERRUPTED,
                 pack::data::output::count::iter_from_objects::Options {
-                    thread_limit: if nondeterministic_count || matches!(expansion, ObjectExpansion::None) {
-                        thread_limit
-                    } else {
-                        Some(1)
-                    },
+                    thread_limit,
                     chunk_size,
-                    input_object_expansion: expansion.into(),
+                    input_object_expansion,
                 },
-            ),
-            make_cancellation_err,
-        );
-        let mut counts = Vec::new();
-        for c in interruptible_counts_iter.by_ref() {
-            let c = c??;
-            progress.inc_by(c.len());
-            counts.extend(c.into_iter());
-        }
-        stats.counts = interruptible_counts_iter.into_inner().finalize()?;
-        progress.show_throughput(start);
+            )?
+        } else {
+            pack::data::output::count::objects_unthreaded(
+                db,
+                &mut make_cache(),
+                input,
+                progress,
+                &interrupt::IS_INTERRUPTED,
+                input_object_expansion,
+            )?
+        };
+        stats.counts = count_stats;
         counts.shrink_to_fit();
         counts
     };
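The counting block above picks between the threaded and single-threaded paths and builds per-thread caches through a factory closure that returns boxed trait objects. The factory pattern in isolation (stand-in trait and types, not the real `DecodeEntry`):

```rust
// A closure returning Box<dyn Trait> lets each worker own its cache without
// the caller naming the concrete type, and lets the choice vary at runtime.
trait DecodeEntry {}
struct SmallLru;
struct BigMap;
impl DecodeEntry for SmallLru {}
impl DecodeEntry for BigMap {}

fn make_cache_factory(multi_threaded: bool) -> impl Fn() -> Box<dyn DecodeEntry> {
    move || {
        if multi_threaded {
            Box::new(SmallLru) as Box<dyn DecodeEntry> // small per-thread cache
        } else {
            Box::new(BigMap) as Box<dyn DecodeEntry> // one big cache for one thread
        }
    }
}
```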
@@ -218,7 +230,7 @@ where
         ))
     };
 
-    let mut entries_progress = progress.add_child("consumed");
+    let mut entries_progress = progress.add_child("consuming");
     entries_progress.init(Some(num_objects), progress::count("entries"));
     let mut write_progress = progress.add_child("writing");
     write_progress.init(None, progress::bytes());
@@ -334,3 +346,29 @@ struct Statistics {
     counts: pack::data::output::count::iter_from_objects::Outcome,
     entries: pack::data::output::entry::iter_from_counts::Outcome,
 }
+
+pub mod input_iteration {
+    use git_repository::{hash, traverse};
+
+    use quick_error::quick_error;
+    quick_error! {
+        #[derive(Debug)]
+        pub enum Error {
+            Iteration(err: traverse::commit::ancestors::Error) {
+                display("input objects couldn't be iterated completely")
+                from()
+                source(err)
+            }
+            InputLinesIo(err: std::io::Error) {
+                display("An error occurred while reading hashes from standard input")
+                from()
+                source(err)
+            }
+            HashDecode(err: hash::decode::Error) {
+                display("Could not decode hex hash provided on standard input")
+                from()
+                source(err)
+            }
+        }
+    }
+}
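The `quick_error!` block generates a `From` impl for each variant marked `from()`, which is what keeps the `map_err(Into::into)` and `?` conversions earlier in this file terse. Roughly what the macro produces for one variant, written by hand (a sketch, not the macro's literal expansion):

```rust
use std::fmt;

#[derive(Debug)]
enum Error {
    InputLinesIo(std::io::Error),
}

impl From<std::io::Error> for Error {
    fn from(err: std::io::Error) -> Self {
        Error::InputLinesIo(err)
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "An error occurred while reading hashes from standard input")
    }
}

// `?` now converts io::Error into Error automatically:
fn first_line(input: &mut impl std::io::BufRead) -> Result<String, Error> {
    let mut line = String::new();
    input.read_line(&mut line)?;
    Ok(line)
}
```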