Skip to content

Commit

Permalink
change!: parallel utilities now use Send + Clone insted of `Send + …
Browse files Browse the repository at this point in the history
…Sync` (#263)

This helps to assure that thread-local computations always work with the
kind of types we provide. The ones that are carrying out actions are
notably not `Sync` anymore.

We cater to that by defining our bounds accordingly, but for those
who want to use other utilities that need Sync, using types like
`Repository` and `thread_local!()` is the only way to make this
work.
  • Loading branch information
Byron committed Nov 27, 2021
1 parent 0af5077 commit e7526b2
Show file tree
Hide file tree
Showing 17 changed files with 67 additions and 86 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ check: ## Build all code in suitable configurations
&& cargo check
cd git-features && cargo check --all-features \
&& cargo check --features parallel \
&& cargo check --features threading \
&& cargo check --features rustsha1 \
&& cargo check --features fast-sha1 \
&& cargo check --features progress \
Expand Down
13 changes: 13 additions & 0 deletions git-features/src/threading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod _impl {
pub type OwnShared<T> = Arc<T>;
/// A synchronization primitive which can start read-only and transition to support mutation.
pub type MutableOnDemand<T> = parking_lot::RwLock<T>;
/// A synchronization primitive which provides read-write access right away.
pub type Mutable<T> = parking_lot::Mutex<T>;

/// Get an upgradable shared reference through a [`MutableOnDemand`] for read-only access.
///
Expand All @@ -28,6 +30,11 @@ mod _impl {
v.write()
}

/// Get a mutable reference through a [`Mutable`] for read-write access.
pub fn lock<T>(v: &Mutable<T>) -> parking_lot::MutexGuard<'_, T> {
v.lock()
}

/// Upgrade a handle previously obtained with [`get_ref_upgradeable()`] to support mutation.
pub fn upgrade_ref_to_mut<T>(
v: parking_lot::RwLockUpgradableReadGuard<'_, T>,
Expand All @@ -47,6 +54,8 @@ mod _impl {
pub type OwnShared<T> = Rc<T>;
/// A synchronization primitive which can start read-only and transition to support mutation.
pub type MutableOnDemand<T> = RefCell<T>;
/// A synchronization primitive which provides read-write access right away.
pub type Mutable<T> = RefCell<T>;

/// Get an upgradable shared reference through a [`MutableOnDemand`] for read-only access.
///
Expand All @@ -60,6 +69,10 @@ mod _impl {
v.borrow_mut()
}

/// Get a mutable reference through a [`Mutable`] for read-write access.
pub fn lock<T>(v: &Mutable<T>) -> RefMut<'_, T> {
v.borrow_mut()
}
/// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
pub fn get_ref<T>(v: &RefCell<T>) -> Ref<'_, T> {
v.borrow()
Expand Down
1 change: 0 additions & 1 deletion git-pack/src/bundle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ mod verify {
>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
C: crate::cache::DecodeEntry,
{
self.index.verify_integrity(
Expand Down
26 changes: 7 additions & 19 deletions git-pack/src/bundle/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ impl crate::Bundle {
options: Options,
) -> Result<Outcome, Error>
where
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
P: Progress,
{
let mut read_progress = progress.add_child("read pack");
read_progress.init(None, progress::bytes());
Expand Down Expand Up @@ -136,20 +134,15 @@ impl crate::Bundle {
/// As it sends portions of the input to a thread it requires the 'static lifetime for the interrupt flags. This can only
/// be satisfied by a static AtomicBool which is only suitable for programs that only run one of these operations at a time
/// or don't mind that all of them abort when the flag is set.
pub fn write_to_directory_eagerly<P>(
pub fn write_to_directory_eagerly(
pack: impl io::Read + Send + 'static,
pack_size: Option<u64>,
directory: Option<impl AsRef<Path>>,
mut progress: P,
mut progress: impl Progress,
should_interrupt: &'static AtomicBool,
thin_pack_base_object_lookup_fn: Option<ThinPackLookupFnSend>,
options: Options,
) -> Result<Outcome, Error>
where
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
{
) -> Result<Outcome, Error> {
let mut read_progress = progress.add_child("read pack");
read_progress.init(pack_size.map(|s| s as usize), progress::bytes());
let pack = progress::Read {
Expand Down Expand Up @@ -222,9 +215,9 @@ impl crate::Bundle {
})
}

fn inner_write<P>(
fn inner_write(
directory: Option<impl AsRef<Path>>,
mut progress: P,
mut progress: impl Progress,
Options {
thread_limit,
iteration_mode: _,
Expand All @@ -233,12 +226,7 @@ impl crate::Bundle {
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
pack_entries_iter: impl Iterator<Item = Result<data::input::Entry, data::input::Error>>,
should_interrupt: &AtomicBool,
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error>
where
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
{
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error> {
let indexing_progress = progress.add_child("create index file");
Ok(match directory {
Some(directory) => {
Expand Down
43 changes: 23 additions & 20 deletions git-pack/src/cache/delta/traverse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
};

use git_features::threading::{get_mut, MutableOnDemand, OwnShared};
use git_features::threading::{lock, Mutable, OwnShared};
use git_features::{
parallel,
parallel::in_parallel_if,
Expand Down Expand Up @@ -74,12 +74,12 @@ where
///
/// _Note_ that this method consumed the Tree to assure safe parallel traversal with mutation support.
#[allow(clippy::too_many_arguments)]
pub fn traverse<F, P, MBFN, S, E>(
pub fn traverse<F, P1, P2, MBFN, S, E>(
mut self,
should_run_in_parallel: impl FnOnce() -> bool,
resolve: F,
object_progress: P,
size_progress: P,
object_progress: P1,
size_progress: P2,
thread_limit: Option<usize>,
should_interrupt: &AtomicBool,
pack_entries_end: u64,
Expand All @@ -88,13 +88,14 @@ where
) -> Result<VecDeque<Item<T>>, Error>
where
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
P: Progress + Send + Sync,
MBFN: Fn(&mut T, &mut <P as Progress>::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone,
P1: Progress,
P2: Progress,
MBFN: Fn(&mut T, &mut <P1 as Progress>::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone,
E: std::error::Error + Send + Sync + 'static,
{
self.set_pack_entries_end(pack_entries_end);
let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit(1, None, thread_limit, None);
let object_progress = OwnShared::new(MutableOnDemand::new(object_progress));
let object_progress = OwnShared::new(Mutable::new(object_progress));

let num_objects = self.items.len();
in_parallel_if(
Expand All @@ -106,7 +107,7 @@ where
move |thread_index| {
(
Vec::<u8>::with_capacity(4096),
get_mut(&object_progress).add_child(format!("thread {}", thread_index)),
lock(&object_progress).add_child(format!("thread {}", thread_index)),
new_thread_state(),
resolve.clone(),
inspect_object.clone(),
Expand All @@ -120,25 +121,26 @@ where
}
}

struct Reducer<'a, P> {
struct Reducer<'a, P1, P2> {
item_count: usize,
progress: OwnShared<MutableOnDemand<P>>,
progress: OwnShared<Mutable<P1>>,
start: std::time::Instant,
size_progress: P,
size_progress: P2,
should_interrupt: &'a AtomicBool,
}

impl<'a, P> Reducer<'a, P>
impl<'a, P1, P2> Reducer<'a, P1, P2>
where
P: Progress,
P1: Progress,
P2: Progress,
{
pub fn new(
num_objects: usize,
progress: OwnShared<MutableOnDemand<P>>,
mut size_progress: P,
progress: OwnShared<Mutable<P1>>,
mut size_progress: P2,
should_interrupt: &'a AtomicBool,
) -> Self {
get_mut(&progress).init(Some(num_objects), progress::count("objects"));
lock(&progress).init(Some(num_objects), progress::count("objects"));
size_progress.init(None, progress::bytes());
Reducer {
item_count: 0,
Expand All @@ -150,9 +152,10 @@ where
}
}

impl<'a, P> parallel::Reduce for Reducer<'a, P>
impl<'a, P1, P2> parallel::Reduce for Reducer<'a, P1, P2>
where
P: Progress,
P1: Progress,
P2: Progress,
{
type Input = Result<(usize, u64), Error>;
type FeedProduce = ();
Expand All @@ -163,15 +166,15 @@ where
let (num_objects, decompressed_size) = input?;
self.item_count += num_objects;
self.size_progress.inc_by(decompressed_size as usize);
get_mut(&self.progress).set(self.item_count);
lock(&self.progress).set(self.item_count);
if self.should_interrupt.load(Ordering::SeqCst) {
return Err(Error::Interrupted);
}
Ok(())
}

fn finalize(mut self) -> Result<Self::Output, Self::Error> {
get_mut(&self.progress).show_throughput(self.start);
lock(&self.progress).show_throughput(self.start);
self.size_progress.show_throughput(self.start);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion git-pack/src/cache/delta/traverse/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) fn deltas<T, F, P, MBFN, S, E>(
(bytes_buf, ref mut progress, state, resolve, modify_base): &mut (Vec<u8>, P, S, F, MBFN),
) -> Result<(usize, u64), Error>
where
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()>,
P: Progress,
MBFN: Fn(&mut T, &mut P, Context<'_, S>) -> Result<(), E>,
T: Default,
Expand Down
1 change: 0 additions & 1 deletion git-pack/src/index/traverse/indexed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ impl index::File {
) -> Result<(git_hash::ObjectId, index::traverse::Outcome, P), Error<E>>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
Processor: FnMut(
git_object::Kind,
&[u8],
Expand Down
1 change: 0 additions & 1 deletion git-pack/src/index/traverse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl index::File {
) -> Result<(git_hash::ObjectId, Outcome, Option<P>), Error<E>>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
C: crate::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
Processor: FnMut(
Expand Down
12 changes: 6 additions & 6 deletions git-pack/src/index/traverse/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::Instant,
};

use git_features::threading::{get_mut, MutableOnDemand, OwnShared};
use git_features::threading::{lock, Mutable, OwnShared};
use git_features::{parallel, progress::Progress};

use crate::{data, index::traverse};
Expand All @@ -25,7 +25,7 @@ fn div_decode_result(lhs: &mut data::decode_entry::Outcome, div: usize) {
}

pub struct Reducer<'a, P, E> {
progress: OwnShared<MutableOnDemand<P>>,
progress: OwnShared<Mutable<P>>,
check: traverse::SafetyCheck,
then: Instant,
entries_seen: usize,
Expand All @@ -39,7 +39,7 @@ where
P: Progress,
{
pub fn from_progress(
progress: OwnShared<MutableOnDemand<P>>,
progress: OwnShared<Mutable<P>>,
pack_data_len_in_bytes: usize,
check: traverse::SafetyCheck,
should_interrupt: &'a AtomicBool,
Expand Down Expand Up @@ -73,7 +73,7 @@ where
fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> {
let chunk_stats: Vec<_> = match input {
Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => {
get_mut(&self.progress).info(format!("Ignoring decode error: {}", err));
lock(&self.progress).info(format!("Ignoring decode error: {}", err));
return Ok(());
}
res => res,
Expand All @@ -100,7 +100,7 @@ where
);

add_decode_result(&mut self.stats.average, chunk_total);
get_mut(&self.progress).set(self.entries_seen);
lock(&self.progress).set(self.entries_seen);

if self.should_interrupt.load(Ordering::SeqCst) {
return Err(Self::Error::Interrupted);
Expand All @@ -114,7 +114,7 @@ where
let elapsed_s = self.then.elapsed().as_secs_f32();
let objects_per_second = (self.entries_seen as f32 / elapsed_s) as u32;

get_mut(&self.progress).info(format!(
lock(&self.progress).info(format!(
"of {} objects done in {:.2}s ({} objects/s, ~{}/s)",
self.entries_seen,
elapsed_s,
Expand Down
7 changes: 3 additions & 4 deletions git-pack/src/index/traverse/with_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod options {
}
}
}
use git_features::threading::{get_mut, MutableOnDemand, OwnShared};
use git_features::threading::{lock, Mutable, OwnShared};
use std::sync::atomic::Ordering;

pub use options::Options;
Expand All @@ -63,7 +63,6 @@ impl index::File {
P: Progress,
C: crate::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
<P as Progress>::SubProgress: Send + Sync,
Processor: FnMut(
git_object::Kind,
&[u8],
Expand Down Expand Up @@ -98,7 +97,7 @@ impl index::File {
parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
let input_chunks = index_entries.chunks(chunk_size.max(chunk_size));
let reduce_progress = OwnShared::new(MutableOnDemand::new({
let reduce_progress = OwnShared::new(Mutable::new({
let mut p = progress.add_child("Traversing");
p.init(Some(self.num_objects() as usize), progress::count("objects"));
p
Expand All @@ -110,7 +109,7 @@ impl index::File {
new_cache(),
new_processor(),
Vec::with_capacity(2048), // decode buffer
get_mut(&reduce_progress).add_child(format!("thread {}", index)), // per thread progress
lock(&reduce_progress).add_child(format!("thread {}", index)), // per thread progress
)
}
};
Expand Down
1 change: 0 additions & 1 deletion git-pack/src/index/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ impl index::File {
>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
C: crate::cache::DecodeEntry,
{
let mut root = progress::DoOrDiscard::from(progress);
Expand Down
6 changes: 2 additions & 4 deletions git-pack/src/index/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,17 @@ impl crate::index::File {
/// provides all bytes belonging to a pack entry writing them to the given mutable output `Vec`.
/// It should return `None` if the entry cannot be resolved from the pack that produced the `entries` iterator, causing
/// the write operation to fail.
pub fn write_data_iter_to_stream<F, F2, P>(
pub fn write_data_iter_to_stream<F, F2>(
kind: crate::index::Version,
make_resolver: F,
entries: impl Iterator<Item = Result<crate::data::input::Entry, crate::data::input::Error>>,
thread_limit: Option<usize>,
mut root_progress: P,
mut root_progress: impl Progress,
out: impl io::Write,
should_interrupt: &AtomicBool,
) -> Result<Outcome, Error>
where
F: FnOnce() -> io::Result<F2>,
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
F2: for<'r> Fn(crate::data::EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
{
if kind != crate::index::Version::default() {
Expand Down
1 change: 1 addition & 0 deletions git-pack/tests/pack/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ mod file {
}
}

#[cfg(feature = "internal-testing-git-features-parallel")]
mod any {
use std::{fs, io, sync::atomic::AtomicBool};

Expand Down

0 comments on commit e7526b2

Please sign in to comment.