diff --git a/Cargo.lock b/Cargo.lock index 22bb053..fed8919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -582,9 +582,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" dependencies = [ "either", "rayon-core", @@ -592,9 +592,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.10.2" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" dependencies = [ "crossbeam-channel", "crossbeam-deque", @@ -628,7 +628,7 @@ dependencies = [ "glob", "log", "mrp", - "num_cpus", + "rayon", "regex", "stderrlog", ] diff --git a/Cargo.toml b/Cargo.toml index 3ae5690..bfb2d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,9 +14,9 @@ clap = { version = "3.2.6", features = ["derive"] } regex = "1.5.6" mrp = { path = "./mrp/" } glob = "0.3.1" -num_cpus = "1.15.0" stderrlog = "0.5.4" log = "0.4.17" +rayon = "1.7.0" [dev-dependencies] criterion = "0.4" diff --git a/benches/bulk_renames.rs b/benches/bulk_renames.rs index 2952d9a..c091967 100644 --- a/benches/bulk_renames.rs +++ b/benches/bulk_renames.rs @@ -1,3 +1,4 @@ +use rayon::prelude::*; use std::{path::PathBuf, str::FromStr}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; @@ -39,5 +40,57 @@ fn renaming_files(c: &mut Criterion) { } } -criterion_group!(benches, renaming_files); +fn comparing_rayon_and_single_threaded(c: &mut Criterion) { + let renamer = get_renamer(); + let mut group = c.benchmark_group("rayon vs serial with a few files"); + group.sample_size(10); + + #[derive(Debug, Clone, Copy)] + enum VS { + Serial, + Rayon, + } + + for size in [ + (2, VS::Serial), + (2, VS::Rayon), + (20, VS::Serial), + (20, VS::Rayon), + (200, VS::Serial), + (200, VS::Rayon), + (20000, VS::Serial), + (20000, VS::Rayon), + ] + .iter() + { + let files = create_file_paths(size.0); + group.throughput(criterion::Throughput::Elements(size.0 as u64)); + + group.bench_with_input( + BenchmarkId::from_parameter(format!("{} with {:?}", size.0, size.1)), + &(files, size.1), + |b, (files, choice)| match choice { + VS::Serial => { + b.iter(|| { + files.iter().filter_map(|p| p.to_str()).for_each(|name| { + renamer.apply(name); + }); + }); + } + VS::Rayon => { + b.iter(|| { + files + .par_iter() + .filter_map(|p| p.to_str()) + .for_each(|name| { + renamer.apply(name); + }); + }); + } + }, + ); + } +} + +criterion_group!(benches, renaming_files, comparing_rayon_and_single_threaded); criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 5d0cbd2..c143b7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,11 @@ -use std::{path::PathBuf, thread}; +use std::path::PathBuf; use log::*; use mrp::MatchAndReplaceStrategy; +use rayon::prelude::*; -#[derive(Default)] pub struct BulkRenameOptions { pub no_rename: bool, - pub multi: bool, } pub fn in_bulk<'p: 'r, 'r, R: MatchAndReplaceStrategy<'r> + std::marker::Sync>( @@ -14,96 +13,27 @@ pub fn in_bulk<'p: 'r, 'r, R: MatchAndReplaceStrategy<'r> + std::marker::Sync>( rename: &R, options: &BulkRenameOptions, ) { - if paths.is_empty() { - return; - } - - if options.multi { - let thread_count = num_cpus::get(); - - if thread_count > paths.len() { - warn!("there are more threads that files to rename, so single threaded it is"); - } else if thread_count * 500 > paths.len() { - warn!("probably too few files to warrant multithreading, but here we go..."); - return in_bulk_multithreaded(paths, rename, thread_count, options.no_rename); - } else { - return in_bulk_multithreaded(paths, rename, thread_count, options.no_rename); - } - } - return in_bulk_single_thread(paths, rename, options.no_rename); -} - -fn in_bulk_single_thread<'p: 'r, 'r, R: MatchAndReplaceStrategy<'r>>( - paths: &'p [PathBuf], - rename: &R, - no_rename: bool, -) { - let values = paths.iter().filter_map(|p| { - let str = p.to_str(); - - if str.is_none() { - error!("Path is invalid unicode: {:?}", p); - } - - return str; - }); - - for from in values { - if let Some(to) = rename.apply(from) { - if no_rename { + paths + .par_iter() + .filter_map(|p| { + let path_string = p.to_str(); + + if path_string.is_none() { + error!("Path is invalid unicode: {:?}", p); + } + + return match path_string { + Some(s) => rename.apply(s).map(|renamed| (s, renamed)), + None => None, + }; + }) + .for_each(|(from, to)| { + if options.no_rename { println!("{:?} -> {:?}", from, to); } else { if let Err(err) = std::fs::rename(from, to.to_string()) { error!("{:?}: {}", from, err); } }; - } - } -} - -fn in_bulk_multithreaded<'p: 'r, 'r, R: MatchAndReplaceStrategy<'r> + std::marker::Sync>( - paths: &'p [PathBuf], - rename: &R, - thread_count: usize, - no_rename: bool, -) { - debug!("found {} threads available on this machine", thread_count); - let max_chunk_size = paths.len() / (thread_count - 1); - - debug!( - "chunking work, to handle {} files in each of {} threads", - max_chunk_size, thread_count - ); - - let chunks = paths.chunks(max_chunk_size); - - thread::scope(|s| { - let mut join_handles = vec![]; - - for (id, path_chunk) in chunks.enumerate() { - if let Ok(handle) = thread::Builder::new().spawn_scoped(s, || { - in_bulk_single_thread(path_chunk, rename, no_rename); - }) { - debug!( - "spawned thread {} with {} file to rename", - id, - path_chunk.len() - ); - join_handles.push(handle); - } else { - error!( - "failed to spawn thread {}, renaming the next {} files in the main thread", - id, - path_chunk.len() - ); - in_bulk_single_thread(path_chunk, rename, no_rename); - }; - } - - for handle in join_handles { - handle - .join() - .expect("Couldn't join on the associated thread") - } - }) + }) } diff --git a/src/main.rs b/src/main.rs index e0d2efc..fa3ce71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,10 +14,6 @@ struct RenameArgs { #[clap(global = true, long, conflicts_with = "paths")] glob: Option, - /// Enable multi-threading to process more files at a time - #[clap(global = true, short, long = "multi")] - multithreading: bool, - /// Prevent diagnostic logging #[clap(global = true, short, long)] quiet: bool, @@ -65,7 +61,6 @@ fn main() -> ExitCode { let options = &rename::BulkRenameOptions { no_rename: base_args.dry_run, - multi: base_args.multithreading, }; match base_args.command { @@ -73,7 +68,7 @@ fn main() -> ExitCode { Command::SIMPLE(args) => { let mut replacer = MatchAndReplacer::new(args.expression); replacer.set_strip(args.strip); - rename::in_bulk(&paths, &mut replacer, options); + rename::in_bulk(&paths, &replacer, options); } };