From d4605cde6d825c0bfaf4282c4cbd85d9f07dc43f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 09:54:18 +0800 Subject: [PATCH 01/11] [features] refactor --- git-features/src/parallel/mod.rs | 4 ++-- git-features/src/parallel/serial.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/git-features/src/parallel/mod.rs b/git-features/src/parallel/mod.rs index 58f7083d3a2..7d6a38e8aa0 100644 --- a/git-features/src/parallel/mod.rs +++ b/git-features/src/parallel/mod.rs @@ -35,11 +35,11 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::*; +pub use in_parallel::{in_parallel, join}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::*; +pub use serial::{in_parallel, join}; mod eager_iter; pub use eager_iter::{EagerIter, EagerIterIf}; diff --git a/git-features/src/parallel/serial.rs b/git-features/src/parallel/serial.rs index cddde00db49..ea22a37bb5b 100644 --- a/git-features/src/parallel/serial.rs +++ b/git-features/src/parallel/serial.rs @@ -1,7 +1,7 @@ use crate::parallel::Reduce; -#[cfg(not(feature = "parallel"))] /// Runs `left` and then `right`, one after another, returning their output when both are done. +#[cfg(not(feature = "parallel"))] pub fn join(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { (left(), right()) } From 04fe855a37577d3da5bbd619807b44e449947893 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 10:22:49 +0800 Subject: [PATCH 02/11] =?UTF-8?q?[pack]=20A=20non-iterator=20version=20of?= =?UTF-8?q?=20parallel=20object=20counting=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …which is good enough for tests but the real world example shows that it needs some additional changes. --- .../data/output/count/iter_from_objects.rs | 279 +++++++++++++++++- git-pack/src/data/output/count/mod.rs | 2 +- .../pack/data/output/count_and_entries.rs | 11 +- gitoxide-core/src/pack/create.rs | 2 +- 4 files changed, 282 insertions(+), 12 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index e37c1aa7a5b..0fa3765bcbc 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -252,6 +252,248 @@ where ) } +/// Generate [`Count`][output::Count]s from input `objects` with object expansion based on [`options`][Options] +/// to learn which objects would would constitute a pack. This step is required to know exactly how many objects would +/// be in a pack while keeping data around to avoid minimize object database access. +/// +/// A [`Count`][output::Count] object maintains enough state to greatly accelerate future access of packed objects. +/// +/// * `db` - the object store to use for accessing objects. +/// * `make_cache` - a function to create thread-local pack caches +/// * `objects_ids` +/// * A list of objects ids to add to the pack. Duplication checks are performed so no object is ever added to a pack twice. +/// * Objects may be expanded based on the provided [`options`][Options] +/// * `progress` +/// * a way to obtain progress information +/// * `options` +/// * more configuration +pub fn objects( + db: Find, + make_cache: impl Fn() -> Cache + Send + Sync, + objects_ids: Iter, + progress: impl Progress, + Options { + thread_limit, + input_object_expansion, + chunk_size, + }: Options, +) -> Result<(Vec, Outcome), Error>> +where + Find: crate::Find + Send + Sync, + ::Error: Send, + Iter: Iterator + Send, + Oid: AsRef + Send, + Cache: crate::cache::DecodeEntry, +{ + let lower_bound = objects_ids.size_hint().0; + let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit( + chunk_size, + if lower_bound == 0 { None } else { Some(lower_bound) }, + thread_limit, + None, + ); + let chunks = util::Chunks { + iter: objects_ids, + size: chunk_size, + }; + let seen_objs = dashmap::DashSet::::new(); + let progress = parking_lot::Mutex::new(progress); + + parallel::in_parallel( + chunks, + thread_limit, + { + move |n| { + ( + Vec::new(), // object data buffer + Vec::new(), // object data buffer 2 to hold two objects at a time + make_cache(), // cache to speed up pack operations + { + let mut p = progress.lock().add_child(format!("thread {}", n)); + p.init(None, git_features::progress::count("objects")); + p + }, + ) + } + }, + { + move |oids: Vec, (buf1, buf2, cache, progress)| { + use ObjectExpansion::*; + let mut out = Vec::new(); + let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default(); + let mut tree_diff_state = git_diff::tree::State::default(); + let mut parent_commit_ids = Vec::new(); + let mut traverse_delegate = tree::traverse::AllUnseen::new(&seen_objs); + let mut changes_delegate = tree::changes::AllNew::new(&seen_objs); + let mut outcome = Outcome::default(); + let stats = &mut outcome; + + for id in oids.into_iter() { + let id = id.as_ref(); + let obj = db.find_existing(id, buf1, cache)?; + stats.input_objects += 1; + match input_object_expansion { + TreeAdditionsComparedToAncestor => { + use git_object::Kind::*; + let mut obj = obj; + let mut id = id.to_owned(); + + loop { + push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false); + match obj.kind { + Tree | Blob => break, + Tag => { + id = immutable::TagIter::from_bytes(obj.data) + .target_id() + .expect("every tag has a target"); + obj = db.find_existing(id, buf1, cache)?; + stats.expanded_objects += 1; + continue; + } + Commit => { + let current_tree_iter = { + let mut commit_iter = immutable::CommitIter::from_bytes(obj.data); + let tree_id = commit_iter.tree_id().expect("every commit has a tree"); + parent_commit_ids.clear(); + for token in commit_iter { + match token { + Ok(immutable::commit::iter::Token::Parent { id }) => { + parent_commit_ids.push(id) + } + Ok(_) => break, + Err(err) => return Err(Error::CommitDecode(err)), + } + } + let obj = db.find_existing(tree_id, buf1, cache)?; + push_obj_count_unique( + &mut out, &seen_objs, &tree_id, &obj, progress, stats, true, + ); + immutable::TreeIter::from_bytes(obj.data) + }; + + let objects = if parent_commit_ids.is_empty() { + traverse_delegate.clear(); + git_traverse::tree::breadthfirst( + current_tree_iter, + &mut tree_traversal_state, + |oid, buf| { + stats.decoded_objects += 1; + db.find_existing_tree_iter(oid, buf, cache).ok() + }, + &mut traverse_delegate, + ) + .map_err(Error::TreeTraverse)?; + &traverse_delegate.objects + } else { + for commit_id in &parent_commit_ids { + let parent_tree_id = { + let parent_commit_obj = db.find_existing(commit_id, buf2, cache)?; + + push_obj_count_unique( + &mut out, + &seen_objs, + commit_id, + &parent_commit_obj, + progress, + stats, + true, + ); + immutable::CommitIter::from_bytes(parent_commit_obj.data) + .tree_id() + .expect("every commit has a tree") + }; + let parent_tree = { + let parent_tree_obj = + db.find_existing(parent_tree_id, buf2, cache)?; + push_obj_count_unique( + &mut out, + &seen_objs, + &parent_tree_id, + &parent_tree_obj, + progress, + stats, + true, + ); + immutable::TreeIter::from_bytes(parent_tree_obj.data) + }; + + changes_delegate.clear(); + git_diff::tree::Changes::from(Some(parent_tree)) + .needed_to_obtain( + current_tree_iter.clone(), + &mut tree_diff_state, + |oid, buf| { + stats.decoded_objects += 1; + db.find_existing_tree_iter(oid, buf, cache).ok() + }, + &mut changes_delegate, + ) + .map_err(Error::TreeChanges)?; + } + &changes_delegate.objects + }; + for id in objects.iter() { + out.push(id_to_count(&db, buf2, id, progress, stats)); + } + break; + } + } + } + } + TreeContents => { + use git_object::Kind::*; + let mut id: ObjectId = id.into(); + let mut obj = obj; + loop { + push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false); + match obj.kind { + Tree => { + traverse_delegate.clear(); + git_traverse::tree::breadthfirst( + git_object::immutable::TreeIter::from_bytes(obj.data), + &mut tree_traversal_state, + |oid, buf| { + stats.decoded_objects += 1; + db.find_existing_tree_iter(oid, buf, cache).ok() + }, + &mut traverse_delegate, + ) + .map_err(Error::TreeTraverse)?; + for id in traverse_delegate.objects.iter() { + out.push(id_to_count(&db, buf1, id, progress, stats)); + } + break; + } + Commit => { + id = immutable::CommitIter::from_bytes(obj.data) + .tree_id() + .expect("every commit has a tree"); + stats.expanded_objects += 1; + obj = db.find_existing(id, buf1, cache)?; + continue; + } + Blob => break, + Tag => { + id = immutable::TagIter::from_bytes(obj.data) + .target_id() + .expect("every tag has a target"); + stats.expanded_objects += 1; + obj = db.find_existing(id, buf1, cache)?; + continue; + } + } + } + } + AsIs => push_obj_count_unique(&mut out, &seen_objs, id, &obj, progress, stats, false), + } + } + Ok((out, outcome)) + } + }, + reduce::Statistics2::default(), + ) +} + mod tree { pub mod changes { use dashmap::DashSet; @@ -431,7 +673,7 @@ mod types { pub expanded_objects: usize, /// The amount of fully decoded objects. These are the most expensive as they are fully decoded pub decoded_objects: usize, - /// The total amount of objects seed. Should be `expanded_objects + input_objects`. + /// The total amount of encountered objects. Should be `expanded_objects + input_objects`. pub total_objects: usize, } @@ -563,4 +805,39 @@ mod reduce { Ok(self.total) } } + + pub struct Statistics2 { + total: Outcome, + counts: Vec, + _err: PhantomData, + } + + impl Default for Statistics2 { + fn default() -> Self { + Statistics2 { + total: Default::default(), + counts: Default::default(), + _err: PhantomData::default(), + } + } + } + + impl parallel::Reduce for Statistics2 { + type Input = Result<(Vec, Outcome), Error>; + type FeedProduce = (); + type Output = (Vec, Outcome); + type Error = Error; + + fn feed(&mut self, item: Self::Input) -> Result { + let (counts, mut stats) = item?; + stats.total_objects = counts.len(); + self.total.aggregate(stats); + self.counts.extend(counts); + Ok(()) + } + + fn finalize(self) -> Result { + Ok((self.counts, self.total)) + } + } } diff --git a/git-pack/src/data/output/count/mod.rs b/git-pack/src/data/output/count/mod.rs index 65382445314..f530253817b 100644 --- a/git-pack/src/data/output/count/mod.rs +++ b/git-pack/src/data/output/count/mod.rs @@ -27,4 +27,4 @@ impl Count { /// pub mod iter_from_objects; -pub use iter_from_objects::iter_from_objects; +pub use iter_from_objects::{iter_from_objects, objects}; diff --git a/git-pack/tests/pack/data/output/count_and_entries.rs b/git-pack/tests/pack/data/output/count_and_entries.rs index 285c14ca3b4..d16a7b746a3 100644 --- a/git-pack/tests/pack/data/output/count_and_entries.rs +++ b/git-pack/tests/pack/data/output/count_and_entries.rs @@ -240,7 +240,7 @@ fn traversals() -> crate::Result { } let deterministic_count_needs_single_thread = Some(1); - let mut counts_iter = output::count::iter_from_objects( + let (counts, stats) = output::count::objects( db.clone(), || pack::cache::Never, commits @@ -257,13 +257,7 @@ fn traversals() -> crate::Result { thread_limit: deterministic_count_needs_single_thread, ..Default::default() }, - ); - let counts: Vec<_> = counts_iter - .by_ref() - .collect::, _>>()? - .into_iter() - .flatten() - .collect(); + )?; let actual_count = counts.iter().fold(ObjectCount::default(), |mut c, e| { let mut buf = Vec::new(); if let Some(obj) = db.find_existing(e.id, &mut buf, &mut pack::cache::Never).ok() { @@ -275,7 +269,6 @@ fn traversals() -> crate::Result { let counts_len = counts.len(); assert_eq!(counts_len, expected_obj_count.total()); - let stats = counts_iter.finalize()?; assert_eq!(stats, expected_counts_outcome); assert_eq!(stats.total_objects, expected_obj_count.total()); diff --git a/gitoxide-core/src/pack/create.rs b/gitoxide-core/src/pack/create.rs index ba0b06ca83e..15c683e8152 100644 --- a/gitoxide-core/src/pack/create.rs +++ b/gitoxide-core/src/pack/create.rs @@ -218,7 +218,7 @@ where )) }; - let mut entries_progress = progress.add_child("consumed"); + let mut entries_progress = progress.add_child("consuming"); entries_progress.init(Some(num_objects), progress::count("entries")); let mut write_progress = progress.add_child("writing"); write_progress.init(None, progress::bytes()); From 0958fc8dbaa72dda0c1e2d40a88d74b4e18bfe39 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 11:23:52 +0800 Subject: [PATCH 03/11] [features] refactor --- git-features/src/interrupt.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/git-features/src/interrupt.rs b/git-features/src/interrupt.rs index 3b86433a07d..1f90465594b 100644 --- a/git-features/src/interrupt.rs +++ b/git-features/src/interrupt.rs @@ -10,7 +10,6 @@ pub struct Iter<'a, I, EFN> { pub inner: I, make_err: Option, should_interrupt: &'a AtomicBool, - is_done: bool, } impl<'a, I, EFN, E> Iter<'a, I, EFN> @@ -25,7 +24,6 @@ where inner, make_err: Some(make_err), should_interrupt, - is_done: false, } } } @@ -38,17 +36,16 @@ where type Item = Result; fn next(&mut self) -> Option { - if self.is_done { + if self.make_err.is_none() { return None; } if self.should_interrupt.load(Ordering::Relaxed) { - self.is_done = true; return Some(Err(self.make_err.take().expect("no bug")())); } match self.inner.next() { Some(next) => Some(Ok(next)), None => { - self.is_done = true; + self.make_err = None; None } } From 7ec2f2b40e83aaa218360a8b5989792cd67de2ed Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 11:51:57 +0800 Subject: [PATCH 04/11] =?UTF-8?q?[pack=20#167]=20remove=20iterator=20based?= =?UTF-8?q?=20count=20objects=20impl=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …and backport all capabilities like progress reporting and interruptability to something that's semantically similar. --- .../data/output/count/iter_from_objects.rs | 298 +----------------- git-pack/src/data/output/count/mod.rs | 2 +- .../pack/data/output/count_and_entries.rs | 1 + gitoxide-core/src/pack/create.rs | 65 ++-- 4 files changed, 47 insertions(+), 319 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index 0fa3765bcbc..de552c58231 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -7,251 +7,6 @@ use git_object::immutable; use crate::{data::output, find, FindExt}; -/// Generate [`Count`][output::Count] from input `objects` with object expansion based on [`options`][Options] -/// to learn which objects would be part of a pack. -/// -/// A [`Count`][output::Count] object maintains enough state to greatly accelerate future access of packed objects. -/// -/// * `db` - the object store to use for accessing objects. -/// * `make_cache` - a function to create thread-local pack caches -/// * `objects_ids` -/// * A list of objects ids to add to the pack. Duplication checks are performed so no object is ever added to a pack twice. -/// * Objects may be expanded based on the provided [`options`][Options] -/// * `progress` -/// * a way to obtain progress information -/// * `options` -/// * more configuration -pub fn iter_from_objects( - db: Find, - make_cache: impl Fn() -> Cache + Send + Clone + Sync + 'static, - objects_ids: Iter, - progress: impl Progress, - Options { - thread_limit, - input_object_expansion, - chunk_size, - }: Options, -) -> impl Iterator, Error>>> - + parallel::reduce::Finalize>>> -where - Find: crate::Find + Clone + Send + Sync + 'static, - ::Error: Send, - Iter: Iterator + Send + 'static, - Oid: AsRef + Send + 'static, - Cache: crate::cache::DecodeEntry, -{ - let lower_bound = objects_ids.size_hint().0; - let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit( - chunk_size, - if lower_bound == 0 { None } else { Some(lower_bound) }, - thread_limit, - None, - ); - let chunks = util::Chunks { - iter: objects_ids, - size: chunk_size, - }; - let seen_objs = Arc::new(dashmap::DashSet::::new()); - let progress = Arc::new(parking_lot::Mutex::new(progress)); - - parallel::reduce::Stepwise::new( - chunks, - thread_limit, - { - let progress = Arc::clone(&progress); - move |n| { - ( - Vec::new(), // object data buffer - Vec::new(), // object data buffer 2 to hold two objects at a time - make_cache(), // cache to speed up pack operations - { - let mut p = progress.lock().add_child(format!("thread {}", n)); - p.init(None, git_features::progress::count("objects")); - p - }, - ) - } - }, - { - let seen_objs = Arc::clone(&seen_objs); - move |oids: Vec, (buf1, buf2, cache, progress)| { - use ObjectExpansion::*; - let mut out = Vec::new(); - let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default(); - let mut tree_diff_state = git_diff::tree::State::default(); - let mut parent_commit_ids = Vec::new(); - let seen_objs = seen_objs.as_ref(); - let mut traverse_delegate = tree::traverse::AllUnseen::new(seen_objs); - let mut changes_delegate = tree::changes::AllNew::new(seen_objs); - let mut outcome = Outcome::default(); - let stats = &mut outcome; - - for id in oids.into_iter() { - let id = id.as_ref(); - let obj = db.find_existing(id, buf1, cache)?; - stats.input_objects += 1; - match input_object_expansion { - TreeAdditionsComparedToAncestor => { - use git_object::Kind::*; - let mut obj = obj; - let mut id = id.to_owned(); - - loop { - push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false); - match obj.kind { - Tree | Blob => break, - Tag => { - id = immutable::TagIter::from_bytes(obj.data) - .target_id() - .expect("every tag has a target"); - obj = db.find_existing(id, buf1, cache)?; - stats.expanded_objects += 1; - continue; - } - Commit => { - let current_tree_iter = { - let mut commit_iter = immutable::CommitIter::from_bytes(obj.data); - let tree_id = commit_iter.tree_id().expect("every commit has a tree"); - parent_commit_ids.clear(); - for token in commit_iter { - match token { - Ok(immutable::commit::iter::Token::Parent { id }) => { - parent_commit_ids.push(id) - } - Ok(_) => break, - Err(err) => return Err(Error::CommitDecode(err)), - } - } - let obj = db.find_existing(tree_id, buf1, cache)?; - push_obj_count_unique( - &mut out, seen_objs, &tree_id, &obj, progress, stats, true, - ); - immutable::TreeIter::from_bytes(obj.data) - }; - - let objects = if parent_commit_ids.is_empty() { - traverse_delegate.clear(); - git_traverse::tree::breadthfirst( - current_tree_iter, - &mut tree_traversal_state, - |oid, buf| { - stats.decoded_objects += 1; - db.find_existing_tree_iter(oid, buf, cache).ok() - }, - &mut traverse_delegate, - ) - .map_err(Error::TreeTraverse)?; - &traverse_delegate.objects - } else { - for commit_id in &parent_commit_ids { - let parent_tree_id = { - let parent_commit_obj = db.find_existing(commit_id, buf2, cache)?; - - push_obj_count_unique( - &mut out, - seen_objs, - commit_id, - &parent_commit_obj, - progress, - stats, - true, - ); - immutable::CommitIter::from_bytes(parent_commit_obj.data) - .tree_id() - .expect("every commit has a tree") - }; - let parent_tree = { - let parent_tree_obj = - db.find_existing(parent_tree_id, buf2, cache)?; - push_obj_count_unique( - &mut out, - seen_objs, - &parent_tree_id, - &parent_tree_obj, - progress, - stats, - true, - ); - immutable::TreeIter::from_bytes(parent_tree_obj.data) - }; - - changes_delegate.clear(); - git_diff::tree::Changes::from(Some(parent_tree)) - .needed_to_obtain( - current_tree_iter.clone(), - &mut tree_diff_state, - |oid, buf| { - stats.decoded_objects += 1; - db.find_existing_tree_iter(oid, buf, cache).ok() - }, - &mut changes_delegate, - ) - .map_err(Error::TreeChanges)?; - } - &changes_delegate.objects - }; - for id in objects.iter() { - out.push(id_to_count(&db, buf2, id, progress, stats)); - } - break; - } - } - } - } - TreeContents => { - use git_object::Kind::*; - let mut id: ObjectId = id.into(); - let mut obj = obj; - loop { - push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false); - match obj.kind { - Tree => { - traverse_delegate.clear(); - git_traverse::tree::breadthfirst( - git_object::immutable::TreeIter::from_bytes(obj.data), - &mut tree_traversal_state, - |oid, buf| { - stats.decoded_objects += 1; - db.find_existing_tree_iter(oid, buf, cache).ok() - }, - &mut traverse_delegate, - ) - .map_err(Error::TreeTraverse)?; - for id in traverse_delegate.objects.iter() { - out.push(id_to_count(&db, buf1, id, progress, stats)); - } - break; - } - Commit => { - id = immutable::CommitIter::from_bytes(obj.data) - .tree_id() - .expect("every commit has a tree"); - stats.expanded_objects += 1; - obj = db.find_existing(id, buf1, cache)?; - continue; - } - Blob => break, - Tag => { - id = immutable::TagIter::from_bytes(obj.data) - .target_id() - .expect("every tag has a target"); - stats.expanded_objects += 1; - obj = db.find_existing(id, buf1, cache)?; - continue; - } - } - } - } - AsIs => push_obj_count_unique(&mut out, seen_objs, id, &obj, progress, stats, false), - } - } - Ok((out, outcome)) - } - }, - reduce::Statistics::default(), - ) -} - /// Generate [`Count`][output::Count]s from input `objects` with object expansion based on [`options`][Options] /// to learn which objects would would constitute a pack. This step is required to know exactly how many objects would /// be in a pack while keeping data around to avoid minimize object database access. @@ -265,6 +20,8 @@ where /// * Objects may be expanded based on the provided [`options`][Options] /// * `progress` /// * a way to obtain progress information +/// * `should_interrupt` +/// * A flag that is set to true if the operation should stop /// * `options` /// * more configuration pub fn objects( @@ -272,6 +29,7 @@ pub fn objects( make_cache: impl Fn() -> Cache + Send + Sync, objects_ids: Iter, progress: impl Progress, + should_interrupt: &AtomicBool, Options { thread_limit, input_object_expansion, @@ -297,12 +55,13 @@ where size: chunk_size, }; let seen_objs = dashmap::DashSet::::new(); - let progress = parking_lot::Mutex::new(progress); + let top_level_progress = Arc::new(parking_lot::Mutex::new(progress)); parallel::in_parallel( chunks, thread_limit, { + let progress = Arc::clone(&top_level_progress); move |n| { ( Vec::new(), // object data buffer @@ -317,6 +76,7 @@ where } }, { + let top_level_progress = Arc::clone(&top_level_progress); move |oids: Vec, (buf1, buf2, cache, progress)| { use ObjectExpansion::*; let mut out = Vec::new(); @@ -327,6 +87,9 @@ where let mut changes_delegate = tree::changes::AllNew::new(&seen_objs); let mut outcome = Outcome::default(); let stats = &mut outcome; + if should_interrupt.load(Ordering::Relaxed) { + return Err(Error::Interrupted); + } for id in oids.into_iter() { let id = id.as_ref(); @@ -487,10 +250,11 @@ where AsIs => push_obj_count_unique(&mut out, &seen_objs, id, &obj, progress, stats, false), } } + top_level_progress.lock().inc_by(out.len()); Ok((out, outcome)) } }, - reduce::Statistics2::default(), + reduce::Statistics::default(), ) } @@ -761,8 +525,11 @@ mod types { TreeTraverse(git_traverse::tree::breadthfirst::Error), #[error(transparent)] TreeChanges(git_diff::tree::changes::Error), + #[error("Operation interrupted")] + Interrupted, } } +use std::sync::atomic::{AtomicBool, Ordering}; pub use types::{Error, ObjectExpansion, Options, Outcome}; mod reduce { @@ -775,46 +542,13 @@ mod reduce { pub struct Statistics { total: Outcome, + counts: Vec, _err: PhantomData, } impl Default for Statistics { fn default() -> Self { Statistics { - total: Default::default(), - _err: PhantomData::default(), - } - } - } - - impl parallel::Reduce for Statistics { - type Input = Result<(Vec, Outcome), Error>; - type FeedProduce = Vec; - type Output = Outcome; - type Error = Error; - - fn feed(&mut self, item: Self::Input) -> Result { - item.map(|(counts, mut stats)| { - stats.total_objects = counts.len(); - self.total.aggregate(stats); - counts - }) - } - - fn finalize(self) -> Result { - Ok(self.total) - } - } - - pub struct Statistics2 { - total: Outcome, - counts: Vec, - _err: PhantomData, - } - - impl Default for Statistics2 { - fn default() -> Self { - Statistics2 { total: Default::default(), counts: Default::default(), _err: PhantomData::default(), @@ -822,7 +556,7 @@ mod reduce { } } - impl parallel::Reduce for Statistics2 { + impl parallel::Reduce for Statistics { type Input = Result<(Vec, Outcome), Error>; type FeedProduce = (); type Output = (Vec, Outcome); diff --git a/git-pack/src/data/output/count/mod.rs b/git-pack/src/data/output/count/mod.rs index f530253817b..5159105fb58 100644 --- a/git-pack/src/data/output/count/mod.rs +++ b/git-pack/src/data/output/count/mod.rs @@ -27,4 +27,4 @@ impl Count { /// pub mod iter_from_objects; -pub use iter_from_objects::{iter_from_objects, objects}; +pub use iter_from_objects::objects; diff --git a/git-pack/tests/pack/data/output/count_and_entries.rs b/git-pack/tests/pack/data/output/count_and_entries.rs index d16a7b746a3..ea765c8aa4f 100644 --- a/git-pack/tests/pack/data/output/count_and_entries.rs +++ b/git-pack/tests/pack/data/output/count_and_entries.rs @@ -252,6 +252,7 @@ fn traversals() -> crate::Result { }))) .filter(|o| !o.is_null()), progress::Discard, + &AtomicBool::new(false), count::iter_from_objects::Options { input_object_expansion: expansion_mode, thread_limit: deterministic_count_needs_single_thread, diff --git a/gitoxide-core/src/pack/create.rs b/gitoxide-core/src/pack/create.rs index 15c683e8152..d4c1961f194 100644 --- a/gitoxide-core/src/pack/create.rs +++ b/gitoxide-core/src/pack/create.rs @@ -97,6 +97,7 @@ pub fn create( where W: std::io::Write, { + // TODO: review need for Arc for the counting part, use different kinds of Easy depending on need let repo = Arc::new(git_repository::discover(repository_path)?); progress.init(Some(2), progress::steps()); let tips = tips.into_iter(); @@ -108,6 +109,7 @@ where let tips = tips .map(|tip| { ObjectId::from_hex(&Vec::from_os_str_lossy(tip.as_ref())).or_else({ + // TODO: Use Easy when ready… let packed = repo.refs.packed().ok().flatten(); let refs = &repo.refs; let repo = Arc::clone(&repo); @@ -132,12 +134,13 @@ where Err(_) => unreachable!("there is only one instance left here"), }; let iter = Box::new( + // TODO: Easy-based traversal traverse::commit::Ancestors::new(tips, traverse::commit::ancestors::State::default(), { let db = Arc::clone(&db); move |oid, buf| db.find_existing_commit_iter(oid, buf, &mut pack::cache::Never).ok() }) - .inspect(move |_| progress.inc()) - .map(Result::unwrap), + .map(Result::unwrap) + .inspect(move |_| progress.inc()), ); // todo: should there be a better way, maybe let input support Option or Result (db, iter) } @@ -157,44 +160,34 @@ where }; let mut stats = Statistics::default(); - let chunk_size = 200; - let start = Instant::now(); + let chunk_size = 1000; // What's a good value for this? let counts = { let mut progress = progress.add_child("counting"); progress.init(None, progress::count("objects")); - let mut interruptible_counts_iter = interrupt::Iter::new( - pack::data::output::count::iter_from_objects( - Arc::clone(&db), - move || { - if nondeterministic_count { - Box::new(pack::cache::lru::StaticLinkedList::<64>::default()) as Box - } else { - Box::new(pack::cache::lru::MemoryCappedHashmap::new(400 * 1024 * 1024)) as Box - // todo: Make that configurable - } - }, - input, - progress.add_child("threads"), - pack::data::output::count::iter_from_objects::Options { - thread_limit: if nondeterministic_count || matches!(expansion, ObjectExpansion::None) { - thread_limit - } else { - Some(1) - }, - chunk_size, - input_object_expansion: expansion.into(), + let (mut counts, count_stats) = pack::data::output::count::objects( + Arc::clone(&db), + move || { + if nondeterministic_count { + Box::new(pack::cache::lru::StaticLinkedList::<64>::default()) as Box + } else { + Box::new(pack::cache::lru::MemoryCappedHashmap::new(400 * 1024 * 1024)) as Box + // todo: Make that configurable + } + }, + input, + progress::ThroughputOnDrop::new(progress), + &interrupt::IS_INTERRUPTED, + pack::data::output::count::iter_from_objects::Options { + thread_limit: if nondeterministic_count || matches!(expansion, ObjectExpansion::None) { + thread_limit + } else { + Some(1) }, - ), - make_cancellation_err, - ); - let mut counts = Vec::new(); - for c in interruptible_counts_iter.by_ref() { - let c = c??; - progress.inc_by(c.len()); - counts.extend(c.into_iter()); - } - stats.counts = interruptible_counts_iter.into_inner().finalize()?; - progress.show_throughput(start); + chunk_size, + input_object_expansion: expansion.into(), + }, + )?; + stats.counts = count_stats; counts.shrink_to_fit(); counts }; From d689599d1b819c18a3be60075170dbe00462e216 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 11:56:15 +0800 Subject: [PATCH 05/11] thanks clippy --- git-features/src/interrupt.rs | 4 +--- git-pack/src/data/output/count/iter_from_objects.rs | 5 ++++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/git-features/src/interrupt.rs b/git-features/src/interrupt.rs index 1f90465594b..d689e3915ea 100644 --- a/git-features/src/interrupt.rs +++ b/git-features/src/interrupt.rs @@ -36,9 +36,7 @@ where type Item = Result; fn next(&mut self) -> Option { - if self.make_err.is_none() { - return None; - } + self.make_err.as_ref()?; if self.should_interrupt.load(Ordering::Relaxed) { return Some(Err(self.make_err.take().expect("no bug")())); } diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index de552c58231..09c7969b47a 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -7,6 +7,9 @@ use git_object::immutable; use crate::{data::output, find, FindExt}; +/// The return type used by [`objects()`]. +pub type Result = std::result::Result<(Vec, Outcome), Error>; + /// Generate [`Count`][output::Count]s from input `objects` with object expansion based on [`options`][Options] /// to learn which objects would would constitute a pack. This step is required to know exactly how many objects would /// be in a pack while keeping data around to avoid minimize object database access. @@ -35,7 +38,7 @@ pub fn objects( input_object_expansion, chunk_size, }: Options, -) -> Result<(Vec, Outcome), Error>> +) -> Result> where Find: crate::Find + Send + Sync, ::Error: Send, From 0aac40c88a5c26f7c295db8433b510b168f15ca3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 12:25:40 +0800 Subject: [PATCH 06/11] [pack #167] Error handling for object input --- .../data/output/count/iter_from_objects.rs | 24 ++++++----- .../pack/data/output/count_and_entries.rs | 4 +- gitoxide-core/src/pack/create.rs | 42 +++++++++++++++---- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index 09c7969b47a..3eed4b00992 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -8,7 +8,7 @@ use git_object::immutable; use crate::{data::output, find, FindExt}; /// The return type used by [`objects()`]. -pub type Result = std::result::Result<(Vec, Outcome), Error>; +pub type Result = std::result::Result<(Vec, Outcome), Error>; /// Generate [`Count`][output::Count]s from input `objects` with object expansion based on [`options`][Options] /// to learn which objects would would constitute a pack. This step is required to know exactly how many objects would @@ -27,7 +27,7 @@ pub type Result = std::result::Result<(Vec, Outcome), Error /// * A flag that is set to true if the operation should stop /// * `options` /// * more configuration -pub fn objects( +pub fn objects( db: Find, make_cache: impl Fn() -> Cache + Send + Sync, objects_ids: Iter, @@ -38,12 +38,13 @@ pub fn objects( input_object_expansion, chunk_size, }: Options, -) -> Result> +) -> Result, IterErr> where Find: crate::Find + Send + Sync, ::Error: Send, - Iter: Iterator + Send, - Oid: AsRef + Send, + Iter: Iterator> + Send, + Oid: Into + Send, + IterErr: std::error::Error + Send, Cache: crate::cache::DecodeEntry, { let lower_bound = objects_ids.size_hint().0; @@ -80,7 +81,7 @@ where }, { let top_level_progress = Arc::clone(&top_level_progress); - move |oids: Vec, (buf1, buf2, cache, progress)| { + move |oids: Vec>, (buf1, buf2, cache, progress)| { use ObjectExpansion::*; let mut out = Vec::new(); let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default(); @@ -95,7 +96,7 @@ where } for id in oids.into_iter() { - let id = id.as_ref(); + let id = id.map(|oid| oid.into()).map_err(Error::InputIteration)?; let obj = db.find_existing(id, buf1, cache)?; stats.input_objects += 1; match input_object_expansion { @@ -208,7 +209,7 @@ where } TreeContents => { use git_object::Kind::*; - let mut id: ObjectId = id.into(); + let mut id = id; let mut obj = obj; loop { push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false); @@ -250,7 +251,7 @@ where } } } - AsIs => push_obj_count_unique(&mut out, &seen_objs, id, &obj, progress, stats, false), + AsIs => push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false), } } top_level_progress.lock().inc_by(out.len()); @@ -516,15 +517,18 @@ mod types { /// The error returned by the pack generation iterator [bytes::FromEntriesIter][crate::data::output::bytes::FromEntriesIter]. #[derive(Debug, thiserror::Error)] #[allow(missing_docs)] - pub enum Error + pub enum Error where FindErr: std::error::Error + 'static, + IterErr: std::error::Error + 'static, { #[error(transparent)] CommitDecode(git_object::immutable::object::decode::Error), #[error(transparent)] FindExisting(#[from] FindErr), #[error(transparent)] + InputIteration(IterErr), + #[error(transparent)] TreeTraverse(git_traverse::tree::breadthfirst::Error), #[error(transparent)] TreeChanges(git_diff::tree::changes::Error), diff --git a/git-pack/tests/pack/data/output/count_and_entries.rs b/git-pack/tests/pack/data/output/count_and_entries.rs index ea765c8aa4f..09de2008ae5 100644 --- a/git-pack/tests/pack/data/output/count_and_entries.rs +++ b/git-pack/tests/pack/data/output/count_and_entries.rs @@ -12,6 +12,7 @@ use crate::pack::{ data::output::{db, DbKind}, hex_to_id, }; +use std::convert::Infallible; #[test] fn traversals() -> crate::Result { @@ -250,7 +251,8 @@ fn traversals() -> crate::Result { } else { "e3fb53cbb4c346d48732a24f09cf445e49bc63d6" }))) - .filter(|o| !o.is_null()), + .filter(|o| !o.is_null()) + .map(Ok::<_, Infallible>), progress::Discard, &AtomicBool::new(false), count::iter_from_objects::Options { diff --git a/gitoxide-core/src/pack/create.rs b/gitoxide-core/src/pack/create.rs index d4c1961f194..102d0958f17 100644 --- a/gitoxide-core/src/pack/create.rs +++ b/gitoxide-core/src/pack/create.rs @@ -102,7 +102,10 @@ where progress.init(Some(2), progress::steps()); let tips = tips.into_iter(); let make_cancellation_err = || anyhow!("Cancelled by user"); - let (db, input): (_, Box + Send + 'static>) = match input { + let (db, input): ( + _, + Box> + Send>, + ) = match input { None => { let mut progress = progress.add_child("traversing"); progress.init(None, progress::count("commits")); @@ -139,9 +142,9 @@ where let db = Arc::clone(&db); move |oid, buf| db.find_existing_commit_iter(oid, buf, &mut pack::cache::Never).ok() }) - .map(Result::unwrap) + .map(|res| res.map_err(Into::into)) .inspect(move |_| progress.inc()), - ); // todo: should there be a better way, maybe let input support Option or Result + ); (db, iter) } Some(input) => { @@ -149,11 +152,10 @@ where Ok(repo) => Arc::new(repo.odb), Err(_) => unreachable!("there is only one instance left here"), }; - let iter = Box::new(input.lines().filter_map(|hex_id| { + let iter = Box::new(input.lines().map(|hex_id| { hex_id - .ok() - .and_then(|hex_id| ObjectId::from_hex(hex_id.as_bytes()).ok()) - // todo: should there be a better way, maybe let input support Option or Result + .map_err(Into::into) + .and_then(|hex_id| ObjectId::from_hex(hex_id.as_bytes()).map_err(Into::into)) })); (db, iter) } @@ -327,3 +329,29 @@ struct Statistics { counts: pack::data::output::count::iter_from_objects::Outcome, entries: pack::data::output::entry::iter_from_counts::Outcome, } + +pub mod input_iteration { + use git_repository::{hash, traverse}; + + use quick_error::quick_error; + quick_error! { + #[derive(Debug)] + pub enum Error { + Iteration(err: traverse::commit::ancestors::Error) { + display("input objects couldn't be iterated completely") + from() + source(err) + } + InputLinesIo(err: std::io::Error) { + display("An error occurred while reading hashes from standard input") + from() + source(err) + } + HashDecode(err: hash::decode::Error) { + display("Could not decode hex hash provided on standard input") + from() + source(err) + } + } + } +} From a22f8e171e705bc42fcf290789e8e05423bd72d1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 13:17:37 +0800 Subject: [PATCH 07/11] =?UTF-8?q?[pack=20#167]=20progress=20is=20handled?= =?UTF-8?q?=20by=20reducer=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …which forms the basis for having a single-threaded version of this --- .../data/output/count/iter_from_objects.rs | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index 3eed4b00992..14473a1f3f3 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -59,13 +59,13 @@ where size: chunk_size, }; let seen_objs = dashmap::DashSet::::new(); - let top_level_progress = Arc::new(parking_lot::Mutex::new(progress)); + let progress = Arc::new(parking_lot::Mutex::new(progress)); parallel::in_parallel( chunks, thread_limit, { - let progress = Arc::clone(&top_level_progress); + let progress = Arc::clone(&progress); move |n| { ( Vec::new(), // object data buffer @@ -80,7 +80,6 @@ where } }, { - let top_level_progress = Arc::clone(&top_level_progress); move |oids: Vec>, (buf1, buf2, cache, progress)| { use ObjectExpansion::*; let mut out = Vec::new(); @@ -254,11 +253,10 @@ where AsIs => push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false), } } - top_level_progress.lock().inc_by(out.len()); Ok((out, outcome)) } }, - reduce::Statistics::default(), + reduce::Statistics::new(progress), ) } @@ -546,33 +544,44 @@ mod reduce { use super::Outcome; use crate::data::output; + use git_features::progress::Progress; + use std::sync::Arc; - pub struct Statistics { + pub struct Statistics { total: Outcome, counts: Vec, + progress: Arc>, _err: PhantomData, } - impl Default for Statistics { - fn default() -> Self { + impl Statistics + where + P: Progress, + { + pub fn new(progress: Arc>) -> Self { Statistics { total: Default::default(), counts: Default::default(), + progress, _err: PhantomData::default(), } } } - impl parallel::Reduce for Statistics { - type Input = Result<(Vec, Outcome), Error>; + impl parallel::Reduce for Statistics + where + P: Progress, + { + type Input = Result<(Vec, Outcome), E>; type FeedProduce = (); type Output = (Vec, Outcome); - type Error = Error; + type Error = E; fn feed(&mut self, item: Self::Input) -> Result { let (counts, mut stats) = item?; stats.total_objects = counts.len(); self.total.aggregate(stats); + self.progress.lock().inc_by(counts.len()); self.counts.extend(counts); Ok(()) } From 6bf0f7e86312b2a4d262c80979c61c94519bd4b0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 13:38:49 +0800 Subject: [PATCH 08/11] [pack #167] refactor This opens a pathway to using something that's not a dashmap --- .../data/output/count/iter_from_objects.rs | 346 ++++++++++-------- 1 file changed, 185 insertions(+), 161 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index 14473a1f3f3..c4f3b9c14c4 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -6,6 +6,7 @@ use git_hash::{oid, ObjectId}; use git_object::immutable; use crate::{data::output, find, FindExt}; +use std::sync::atomic::{AtomicBool, Ordering}; /// The return type used by [`objects()`]. pub type Result = std::result::Result<(Vec, Outcome), Error>; @@ -81,183 +82,207 @@ where }, { move |oids: Vec>, (buf1, buf2, cache, progress)| { - use ObjectExpansion::*; - let mut out = Vec::new(); - let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default(); - let mut tree_diff_state = git_diff::tree::State::default(); - let mut parent_commit_ids = Vec::new(); - let mut traverse_delegate = tree::traverse::AllUnseen::new(&seen_objs); - let mut changes_delegate = tree::changes::AllNew::new(&seen_objs); - let mut outcome = Outcome::default(); - let stats = &mut outcome; if should_interrupt.load(Ordering::Relaxed) { return Err(Error::Interrupted); } - for id in oids.into_iter() { - let id = id.map(|oid| oid.into()).map_err(Error::InputIteration)?; - let obj = db.find_existing(id, buf1, cache)?; - stats.input_objects += 1; - match input_object_expansion { - TreeAdditionsComparedToAncestor => { - use git_object::Kind::*; - let mut obj = obj; - let mut id = id.to_owned(); - - loop { - push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false); - match obj.kind { - Tree | Blob => break, - Tag => { - id = immutable::TagIter::from_bytes(obj.data) - .target_id() - .expect("every tag has a target"); - obj = db.find_existing(id, buf1, cache)?; - stats.expanded_objects += 1; - continue; - } - Commit => { - let current_tree_iter = { - let mut commit_iter = immutable::CommitIter::from_bytes(obj.data); - let tree_id = commit_iter.tree_id().expect("every commit has a tree"); - parent_commit_ids.clear(); - for token in commit_iter { - match token { - Ok(immutable::commit::iter::Token::Parent { id }) => { - parent_commit_ids.push(id) - } - Ok(_) => break, - Err(err) => return Err(Error::CommitDecode(err)), - } - } - let obj = db.find_existing(tree_id, buf1, cache)?; - push_obj_count_unique( - &mut out, &seen_objs, &tree_id, &obj, progress, stats, true, - ); - immutable::TreeIter::from_bytes(obj.data) - }; - - let objects = if parent_commit_ids.is_empty() { - traverse_delegate.clear(); - git_traverse::tree::breadthfirst( - current_tree_iter, - &mut tree_traversal_state, - |oid, buf| { - stats.decoded_objects += 1; - db.find_existing_tree_iter(oid, buf, cache).ok() - }, - &mut traverse_delegate, - ) - .map_err(Error::TreeTraverse)?; - &traverse_delegate.objects - } else { - for commit_id in &parent_commit_ids { - let parent_tree_id = { - let parent_commit_obj = db.find_existing(commit_id, buf2, cache)?; - - push_obj_count_unique( - &mut out, - &seen_objs, - commit_id, - &parent_commit_obj, - progress, - stats, - true, - ); - immutable::CommitIter::from_bytes(parent_commit_obj.data) - .tree_id() - .expect("every commit has a tree") - }; - let parent_tree = { - let parent_tree_obj = - db.find_existing(parent_tree_id, buf2, cache)?; - push_obj_count_unique( - &mut out, - &seen_objs, - &parent_tree_id, - &parent_tree_obj, - progress, - stats, - true, - ); - immutable::TreeIter::from_bytes(parent_tree_obj.data) - }; - - changes_delegate.clear(); - git_diff::tree::Changes::from(Some(parent_tree)) - .needed_to_obtain( - current_tree_iter.clone(), - &mut tree_diff_state, - |oid, buf| { - stats.decoded_objects += 1; - db.find_existing_tree_iter(oid, buf, cache).ok() - }, - &mut changes_delegate, - ) - .map_err(Error::TreeChanges)?; - } - &changes_delegate.objects - }; - for id in objects.iter() { - out.push(id_to_count(&db, buf2, id, progress, stats)); - } - break; + expand_inner( + &db, + input_object_expansion, + &seen_objs, + oids, + buf1, + buf2, + cache, + progress, + ) + } + }, + reduce::Statistics::new(progress), + ) +} + +#[allow(clippy::too_many_arguments)] +fn expand_inner( + db: &Find, + input_object_expansion: ObjectExpansion, + seen_objs: &DashSet, + oids: impl IntoIterator>, + buf1: &mut Vec, + buf2: &mut Vec, + cache: &mut impl crate::cache::DecodeEntry, + progress: &mut impl Progress, +) -> Result, IterErr> +where + Find: crate::Find + Send + Sync, + Oid: Into + Send, + IterErr: std::error::Error + Send, +{ + use ObjectExpansion::*; + + let mut out = Vec::new(); + let mut tree_traversal_state = git_traverse::tree::breadthfirst::State::default(); + let mut tree_diff_state = git_diff::tree::State::default(); + let mut parent_commit_ids = Vec::new(); + let mut traverse_delegate = tree::traverse::AllUnseen::new(seen_objs); + let mut changes_delegate = tree::changes::AllNew::new(seen_objs); + let mut outcome = Outcome::default(); + let stats = &mut outcome; + for id in oids.into_iter() { + let id = id.map(|oid| oid.into()).map_err(Error::InputIteration)?; + let obj = db.find_existing(id, buf1, cache)?; + stats.input_objects += 1; + match input_object_expansion { + TreeAdditionsComparedToAncestor => { + use git_object::Kind::*; + let mut obj = obj; + let mut id = id.to_owned(); + + loop { + push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false); + match obj.kind { + Tree | Blob => break, + Tag => { + id = immutable::TagIter::from_bytes(obj.data) + .target_id() + .expect("every tag has a target"); + obj = db.find_existing(id, buf1, cache)?; + stats.expanded_objects += 1; + continue; + } + Commit => { + let current_tree_iter = { + let mut commit_iter = immutable::CommitIter::from_bytes(obj.data); + let tree_id = commit_iter.tree_id().expect("every commit has a tree"); + parent_commit_ids.clear(); + for token in commit_iter { + match token { + Ok(immutable::commit::iter::Token::Parent { id }) => parent_commit_ids.push(id), + Ok(_) => break, + Err(err) => return Err(Error::CommitDecode(err)), } } - } - } - TreeContents => { - use git_object::Kind::*; - let mut id = id; - let mut obj = obj; - loop { - push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false); - match obj.kind { - Tree => { - traverse_delegate.clear(); - git_traverse::tree::breadthfirst( - git_object::immutable::TreeIter::from_bytes(obj.data), - &mut tree_traversal_state, + let obj = db.find_existing(tree_id, buf1, cache)?; + push_obj_count_unique(&mut out, seen_objs, &tree_id, &obj, progress, stats, true); + immutable::TreeIter::from_bytes(obj.data) + }; + + let objects = if parent_commit_ids.is_empty() { + traverse_delegate.clear(); + git_traverse::tree::breadthfirst( + current_tree_iter, + &mut tree_traversal_state, + |oid, buf| { + stats.decoded_objects += 1; + db.find_existing_tree_iter(oid, buf, cache).ok() + }, + &mut traverse_delegate, + ) + .map_err(Error::TreeTraverse)?; + &traverse_delegate.objects + } else { + for commit_id in &parent_commit_ids { + let parent_tree_id = { + let parent_commit_obj = db.find_existing(commit_id, buf2, cache)?; + + push_obj_count_unique( + &mut out, + seen_objs, + commit_id, + &parent_commit_obj, + progress, + stats, + true, + ); + immutable::CommitIter::from_bytes(parent_commit_obj.data) + .tree_id() + .expect("every commit has a tree") + }; + let parent_tree = { + let parent_tree_obj = db.find_existing(parent_tree_id, buf2, cache)?; + push_obj_count_unique( + &mut out, + seen_objs, + &parent_tree_id, + &parent_tree_obj, + progress, + stats, + true, + ); + immutable::TreeIter::from_bytes(parent_tree_obj.data) + }; + + changes_delegate.clear(); + git_diff::tree::Changes::from(Some(parent_tree)) + .needed_to_obtain( + current_tree_iter.clone(), + &mut tree_diff_state, |oid, buf| { stats.decoded_objects += 1; db.find_existing_tree_iter(oid, buf, cache).ok() }, - &mut traverse_delegate, + &mut changes_delegate, ) - .map_err(Error::TreeTraverse)?; - for id in traverse_delegate.objects.iter() { - out.push(id_to_count(&db, buf1, id, progress, stats)); - } - break; - } - Commit => { - id = immutable::CommitIter::from_bytes(obj.data) - .tree_id() - .expect("every commit has a tree"); - stats.expanded_objects += 1; - obj = db.find_existing(id, buf1, cache)?; - continue; - } - Blob => break, - Tag => { - id = immutable::TagIter::from_bytes(obj.data) - .target_id() - .expect("every tag has a target"); - stats.expanded_objects += 1; - obj = db.find_existing(id, buf1, cache)?; - continue; - } + .map_err(Error::TreeChanges)?; } + &changes_delegate.objects + }; + for id in objects.iter() { + out.push(id_to_count(db, buf2, id, progress, stats)); } + break; } - AsIs => push_obj_count_unique(&mut out, &seen_objs, &id, &obj, progress, stats, false), } } - Ok((out, outcome)) } - }, - reduce::Statistics::new(progress), - ) + TreeContents => { + use git_object::Kind::*; + let mut id = id; + let mut obj = obj; + loop { + push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false); + match obj.kind { + Tree => { + traverse_delegate.clear(); + git_traverse::tree::breadthfirst( + git_object::immutable::TreeIter::from_bytes(obj.data), + &mut tree_traversal_state, + |oid, buf| { + stats.decoded_objects += 1; + db.find_existing_tree_iter(oid, buf, cache).ok() + }, + &mut traverse_delegate, + ) + .map_err(Error::TreeTraverse)?; + for id in traverse_delegate.objects.iter() { + out.push(id_to_count(db, buf1, id, progress, stats)); + } + break; + } + Commit => { + id = immutable::CommitIter::from_bytes(obj.data) + .tree_id() + .expect("every commit has a tree"); + stats.expanded_objects += 1; + obj = db.find_existing(id, buf1, cache)?; + continue; + } + Blob => break, + Tag => { + id = immutable::TagIter::from_bytes(obj.data) + .target_id() + .expect("every tag has a target"); + stats.expanded_objects += 1; + obj = db.find_existing(id, buf1, cache)?; + continue; + } + } + } + } + AsIs => push_obj_count_unique(&mut out, seen_objs, &id, &obj, progress, stats, false), + } + } + Ok((out, outcome)) } mod tree { @@ -534,7 +559,6 @@ mod types { Interrupted, } } -use std::sync::atomic::{AtomicBool, Ordering}; pub use types::{Error, ObjectExpansion, Options, Outcome}; mod reduce { From 169f000087aab18f0257fb0c61dc3b3901e97505 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 13:56:26 +0800 Subject: [PATCH 09/11] =?UTF-8?q?[pack=20#167]=20generalize=20over=20immut?= =?UTF-8?q?able=20insertions=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …to allow for a single-threaded implementation with a RefCell. Unfortunately we can't just use a mutable HashSet without duplicating everything thanks to the &mut requirement or without unsafe code (i.e. storing a pointer and just turning it into a mutable ref) --- .../data/output/count/iter_from_objects.rs | 65 ++++++++++++++----- 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index c4f3b9c14c4..4b0def4d0b5 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use dashmap::DashSet; use git_features::{parallel, progress::Progress}; use git_hash::{oid, ObjectId}; use git_object::immutable; @@ -106,7 +105,7 @@ where fn expand_inner( db: &Find, input_object_expansion: ObjectExpansion, - seen_objs: &DashSet, + seen_objs: &impl util::InsertImmutable, oids: impl IntoIterator>, buf1: &mut Vec, buf2: &mut Vec, @@ -287,7 +286,7 @@ where mod tree { pub mod changes { - use dashmap::DashSet; + use crate::data::output::count::iter_from_objects::util::InsertImmutable; use git_diff::tree::{ visit::{Action, Change}, Visit, @@ -295,13 +294,16 @@ mod tree { use git_hash::ObjectId; use git_object::bstr::BStr; - pub struct AllNew<'a> { + pub struct AllNew<'a, H> { pub objects: Vec, - all_seen: &'a DashSet, + all_seen: &'a H, } - impl<'a> AllNew<'a> { - pub fn new(all_seen: &'a DashSet) -> Self { + impl<'a, H> AllNew<'a, H> + where + H: InsertImmutable, + { + pub fn new(all_seen: &'a H) -> Self { AllNew { objects: Default::default(), all_seen, @@ -312,7 +314,10 @@ mod tree { } } - impl<'a> Visit for AllNew<'a> { + impl<'a, H> Visit for AllNew<'a, H> + where + H: InsertImmutable, + { fn pop_front_tracked_path_and_set_current(&mut self) {} fn push_back_tracked_path_component(&mut self, _component: &BStr) {} @@ -337,18 +342,21 @@ mod tree { } pub mod traverse { - use dashmap::DashSet; + use crate::data::output::count::iter_from_objects::util::InsertImmutable; use git_hash::ObjectId; use git_object::{bstr::BStr, immutable::tree::Entry}; use git_traverse::tree::visit::{Action, Visit}; - pub struct AllUnseen<'a> { + pub struct AllUnseen<'a, H> { pub objects: Vec, - all_seen: &'a DashSet, + all_seen: &'a H, } - impl<'a> AllUnseen<'a> { - pub fn new(all_seen: &'a DashSet) -> Self { + impl<'a, H> AllUnseen<'a, H> + where + H: InsertImmutable, + { + pub fn new(all_seen: &'a H) -> Self { AllUnseen { objects: Default::default(), all_seen, @@ -359,7 +367,10 @@ mod tree { } } - impl<'a> Visit for AllUnseen<'a> { + impl<'a, H> Visit for AllUnseen<'a, H> + where + H: InsertImmutable, + { fn pop_front_tracked_path_and_set_current(&mut self) {} fn push_back_tracked_path_component(&mut self, _component: &BStr) {} @@ -391,7 +402,7 @@ mod tree { fn push_obj_count_unique( out: &mut Vec, - all_seen: &DashSet, + all_seen: &impl util::InsertImmutable, id: &oid, obj: &crate::data::Object<'_>, progress: &mut impl Progress, @@ -425,6 +436,30 @@ fn id_to_count( } mod util { + pub trait InsertImmutable { + fn insert(&self, item: Item) -> bool; + } + + mod trait_impls { + use super::InsertImmutable; + use dashmap::DashSet; + use std::cell::RefCell; + use std::collections::HashSet; + use std::hash::Hash; + + impl InsertImmutable for DashSet { + fn insert(&self, item: T) -> bool { + self.insert(item) + } + } + + impl InsertImmutable for RefCell> { + fn insert(&self, item: T) -> bool { + self.borrow_mut().insert(item) + } + } + } + pub struct Chunks { pub size: usize, pub iter: I, From 65e29de45a92c82cebd832634ab194db19a1b590 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 14:17:22 +0800 Subject: [PATCH 10/11] =?UTF-8?q?[pack=20#167]=20a=20single-threaded=20spe?= =?UTF-8?q?cial=20case=20for=20counting=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …which is still not optimal due to RefCell, but probably the cost of that is neglectable or can be made up for with a faster hash. However, it's not exactly faster and it doesn't max out one core either. --- .../data/output/count/iter_from_objects.rs | 43 +++++++++++-- git-pack/src/data/output/count/mod.rs | 2 +- gitoxide-core/src/pack/create.rs | 61 ++++++++++++------- 3 files changed, 78 insertions(+), 28 deletions(-) diff --git a/git-pack/src/data/output/count/iter_from_objects.rs b/git-pack/src/data/output/count/iter_from_objects.rs index 4b0def4d0b5..07d2b040f2f 100644 --- a/git-pack/src/data/output/count/iter_from_objects.rs +++ b/git-pack/src/data/output/count/iter_from_objects.rs @@ -81,10 +81,6 @@ where }, { move |oids: Vec>, (buf1, buf2, cache, progress)| { - if should_interrupt.load(Ordering::Relaxed) { - return Err(Error::Interrupted); - } - expand_inner( &db, input_object_expansion, @@ -94,6 +90,7 @@ where buf2, cache, progress, + should_interrupt, ) } }, @@ -101,6 +98,36 @@ where ) } +/// Like [`objects()`] but using a single thread only to mostly save on the otherwise required overhead. +pub fn objects_unthreaded( + db: Find, + pack_cache: &mut impl crate::cache::DecodeEntry, + object_ids: impl Iterator>, + mut progress: impl Progress, + should_interrupt: &AtomicBool, + input_object_expansion: ObjectExpansion, +) -> Result, IterErr> +where + Find: crate::Find + Send + Sync, + Oid: Into + Send, + IterErr: std::error::Error + Send, +{ + let seen_objs = RefCell::new(HashSet::::new()); + + let (mut buf1, mut buf2) = (Vec::new(), Vec::new()); + expand_inner( + &db, + input_object_expansion, + &seen_objs, + object_ids, + &mut buf1, + &mut buf2, + pack_cache, + &mut progress, + should_interrupt, + ) +} + #[allow(clippy::too_many_arguments)] fn expand_inner( db: &Find, @@ -111,6 +138,7 @@ fn expand_inner( buf2: &mut Vec, cache: &mut impl crate::cache::DecodeEntry, progress: &mut impl Progress, + should_interrupt: &AtomicBool, ) -> Result, IterErr> where Find: crate::Find + Send + Sync, @@ -128,6 +156,10 @@ where let mut outcome = Outcome::default(); let stats = &mut outcome; for id in oids.into_iter() { + if should_interrupt.load(Ordering::Relaxed) { + return Err(Error::Interrupted); + } + let id = id.map(|oid| oid.into()).map_err(Error::InputIteration)?; let obj = db.find_existing(id, buf1, cache)?; stats.input_objects += 1; @@ -556,7 +588,6 @@ mod types { /// especially when tree traversal is involved. Thus deterministic ordering requires `Some(1)` to be set. pub thread_limit: Option, /// The amount of objects per chunk or unit of work to be sent to threads for processing - /// TODO: could this become the window size? pub chunk_size: usize, /// The way input objects are handled pub input_object_expansion: ObjectExpansion, @@ -594,6 +625,8 @@ mod types { Interrupted, } } +use std::cell::RefCell; +use std::collections::HashSet; pub use types::{Error, ObjectExpansion, Options, Outcome}; mod reduce { diff --git a/git-pack/src/data/output/count/mod.rs b/git-pack/src/data/output/count/mod.rs index 5159105fb58..f3cf5c2c697 100644 --- a/git-pack/src/data/output/count/mod.rs +++ b/git-pack/src/data/output/count/mod.rs @@ -27,4 +27,4 @@ impl Count { /// pub mod iter_from_objects; -pub use iter_from_objects::objects; +pub use iter_from_objects::{objects, objects_unthreaded}; diff --git a/gitoxide-core/src/pack/create.rs b/gitoxide-core/src/pack/create.rs index 102d0958f17..7fa2e1783fb 100644 --- a/gitoxide-core/src/pack/create.rs +++ b/gitoxide-core/src/pack/create.rs @@ -166,29 +166,46 @@ where let counts = { let mut progress = progress.add_child("counting"); progress.init(None, progress::count("objects")); - let (mut counts, count_stats) = pack::data::output::count::objects( - Arc::clone(&db), - move || { - if nondeterministic_count { - Box::new(pack::cache::lru::StaticLinkedList::<64>::default()) as Box - } else { - Box::new(pack::cache::lru::MemoryCappedHashmap::new(400 * 1024 * 1024)) as Box - // todo: Make that configurable - } - }, - input, - progress::ThroughputOnDrop::new(progress), - &interrupt::IS_INTERRUPTED, - pack::data::output::count::iter_from_objects::Options { - thread_limit: if nondeterministic_count || matches!(expansion, ObjectExpansion::None) { - thread_limit - } else { - Some(1) + let may_use_multiple_threads = nondeterministic_count || matches!(expansion, ObjectExpansion::None); + let thread_limit = if may_use_multiple_threads { + thread_limit + } else { + Some(1) + }; + let make_cache = move || { + if may_use_multiple_threads { + Box::new(pack::cache::lru::StaticLinkedList::<64>::default()) as Box + } else { + Box::new(pack::cache::lru::MemoryCappedHashmap::new(400 * 1024 * 1024)) as Box + // todo: Make that configurable + } + }; + let db = Arc::clone(&db); + let progress = progress::ThroughputOnDrop::new(progress); + let input_object_expansion = expansion.into(); + let (mut counts, count_stats) = if may_use_multiple_threads { + pack::data::output::count::objects( + db, + make_cache, + input, + progress, + &interrupt::IS_INTERRUPTED, + pack::data::output::count::iter_from_objects::Options { + thread_limit, + chunk_size, + input_object_expansion, }, - chunk_size, - input_object_expansion: expansion.into(), - }, - )?; + )? + } else { + pack::data::output::count::objects_unthreaded( + db, + &mut make_cache(), + input, + progress, + &interrupt::IS_INTERRUPTED, + input_object_expansion, + )? + }; stats.counts = count_stats; counts.shrink_to_fit(); counts From 8d499762b74c08437d901bb98806e0a1fc6f93bb Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 22 Aug 2021 16:20:55 +0800 Subject: [PATCH 11/11] =?UTF-8?q?[pack=20#167]=20Use=20custom=20uluru=20ve?= =?UTF-8?q?rsion=20to=20avoid=20a=20lot=20of=20allocations=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …even though that doesn't really translate into much performance, despite technically saving millions of allocations. Maybe allocators are already pretty good at this. --- Cargo.lock | 3 +-- git-pack/Cargo.toml | 2 +- git-pack/src/cache.rs | 19 +++++++++++++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 797eacdcff5..e3046230505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2664,8 +2664,7 @@ checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" [[package]] name = "uluru" version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308dcc9d947b227796f851adb99936fb060681a89c002c9c1928404a3b6c2d72" +source = "git+https://github.com/Byron/uluru?branch=master#43fed348af8c391419354a0829e67e44a6a24f8a" dependencies = [ "arrayvec", ] diff --git a/git-pack/Cargo.toml b/git-pack/Cargo.toml index 8e940e55c85..a4bcc844171 100644 --- a/git-pack/Cargo.toml +++ b/git-pack/Cargo.toml @@ -48,7 +48,7 @@ itoa = "0.4.6" bytesize = "1.0.1" parking_lot = { version = "0.11.0", default-features = false } thiserror = "1.0.26" -uluru = { version = "2.2.0", optional = true } +uluru = { version = "2.2.0", optional = true, git = "https://github.com/Byron/uluru", features = ["std"], branch = "master"} memory-lru = { version = "0.1.0", optional = true } dashmap = "4.0.2" diff --git a/git-pack/src/cache.rs b/git-pack/src/cache.rs index b42f66725fd..7ddbac5336f 100644 --- a/git-pack/src/cache.rs +++ b/git-pack/src/cache.rs @@ -106,17 +106,28 @@ pub mod lru { /// The cache must be small as the search is 'naive' and the underlying data structure is a linked list. /// Values of 64 seem to improve performance. #[derive(Default)] - pub struct StaticLinkedList(uluru::LRUCache); + pub struct StaticLinkedList(uluru::LRUCache, Vec>); impl DecodeEntry for StaticLinkedList { fn put(&mut self, pack_id: u32, offset: u64, data: &[u8], kind: git_object::Kind, compressed_size: usize) { - self.0.insert(Entry { + if let Some(previous) = self.0.insert(Entry { offset, pack_id, - data: Vec::from(data), + data: self + .1 + .pop() + .map(|mut v| { + v.clear(); + v.resize(data.len(), 0); + v.copy_from_slice(data); + v + }) + .unwrap_or_else(|| Vec::from(data)), kind, compressed_size, - }) + }) { + self.1.push(previous.data) + } } fn get(&mut self, pack_id: u32, offset: u64, out: &mut Vec) -> Option<(git_object::Kind, usize)> {