diff --git a/CHANGELOG.md b/CHANGELOG.md index 05a2a781e..9d47a8caa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ Unreleased changes. Release notes have not yet been written. `rg -B1 -A2`. That is, `-A` and `-B` no longer completely override `-C`. Instead, they only partially override `-C`. +Performance improvements: + +* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591): + Parallel directory traversal now uses work stealing for faster searches. + Feature enhancements: * Added or improved file type filtering for Ada, DITA, Elixir, Fuchsia, Gentoo, diff --git a/Cargo.lock b/Cargo.lock index 259a7c21b..d11bbc294 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,30 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.16" @@ -224,6 +248,7 @@ name = "ignore" version = "0.4.20" dependencies = [ "crossbeam-channel", + "crossbeam-deque", "globset", "lazy_static", "log", @@ -309,6 +334,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "num-traits" version = "0.2.16" @@ -444,6 +478,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.188" diff --git a/crates/ignore/Cargo.toml b/crates/ignore/Cargo.toml index a9495aa33..c659bb2d4 100644 --- a/crates/ignore/Cargo.toml +++ b/crates/ignore/Cargo.toml @@ -19,6 +19,7 @@ name = "ignore" bench = false [dependencies] +crossbeam-deque = "0.8.3" globset = { version = "0.4.10", path = "../globset" } lazy_static = "1.1" log = "0.4.5" diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs index 734b87667..fdeea93c4 100644 --- a/crates/ignore/src/walk.rs +++ b/crates/ignore/src/walk.rs @@ -3,14 +3,15 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, FileType, Metadata}; use std::io; -use std::iter::FusedIterator; +use std::iter::{self, FusedIterator}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread; use std::time::Duration; use std::vec; +use crossbeam_deque::{Stealer, Worker as Deque}; use same_file::Handle; use walkdir::{self, WalkDir}; @@ -1231,9 +1232,8 @@ impl WalkParallel { /// can be merged together into a single data structure. pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) { let threads = self.threads(); - let stack = Arc::new(Mutex::new(vec![])); + let mut stack = vec![]; { - let mut stack = stack.lock().unwrap(); let mut visitor = builder.build(); let mut paths = Vec::new().into_iter(); std::mem::swap(&mut paths, &mut self.paths); @@ -1283,14 +1283,14 @@ impl WalkParallel { } // Create the workers and then wait for them to finish. let quit_now = Arc::new(AtomicBool::new(false)); - let num_pending = - Arc::new(AtomicUsize::new(stack.lock().unwrap().len())); + let num_pending = Arc::new(AtomicUsize::new(stack.len())); + let stacks = Stack::new_for_each_thread(threads, stack); std::thread::scope(|s| { - let mut handles = vec![]; - for _ in 0..threads { - let worker = Worker { + let handles: Vec<_> = stacks + .into_iter() + .map(|stack| Worker { visitor: builder.build(), - stack: stack.clone(), + stack, quit_now: quit_now.clone(), num_pending: num_pending.clone(), max_depth: self.max_depth, @@ -1298,9 +1298,9 @@ impl WalkParallel { follow_links: self.follow_links, skip: self.skip.clone(), filter: self.filter.clone(), - }; - handles.push(s.spawn(|| worker.run())); - } + }) + .map(|worker| s.spawn(|| worker.run())) + .collect(); for handle in handles { handle.join().unwrap(); } @@ -1390,6 +1390,73 @@ impl Work { } } +/// A work-stealing stack. +#[derive(Debug)] +struct Stack { + /// This thread's index. + index: usize, + /// The thread-local stack. + deque: Deque, + /// The work stealers. + stealers: Arc<[Stealer]>, +} + +impl Stack { + /// Create a work-stealing stack for each thread. The given messages + /// correspond to the initial paths to start the search at. They will + /// be distributed automatically to each stack in a round-robin fashion. + fn new_for_each_thread(threads: usize, init: Vec) -> Vec { + // Using new_lifo() ensures each worker operates depth-first, not + // breadth-first. We do depth-first because a breadth first traversal + // on wide directories with a lot of gitignores is disastrous (for + // example, searching a directory tree containing all of crates.io). + let deques: Vec> = + iter::repeat_with(Deque::new_lifo).take(threads).collect(); + let stealers = Arc::<[Stealer]>::from( + deques.iter().map(Deque::stealer).collect::>(), + ); + let stacks: Vec = deques + .into_iter() + .enumerate() + .map(|(index, deque)| Stack { + index, + deque, + stealers: stealers.clone(), + }) + .collect(); + // Distribute the initial messages. + init.into_iter() + .zip(stacks.iter().cycle()) + .for_each(|(m, s)| s.push(m)); + stacks + } + + /// Push a message. + fn push(&self, msg: Message) { + self.deque.push(msg); + } + + /// Pop a message. + fn pop(&self) -> Option { + self.deque.pop().or_else(|| self.steal()) + } + + /// Steal a message from another queue. + fn steal(&self) -> Option { + // For fairness, try to steal from index - 1, then index - 2, ... 0, + // then wrap around to len - 1, len - 2, ... index + 1. + let (left, right) = self.stealers.split_at(self.index); + // Don't steal from ourselves + let right = &right[1..]; + + left.iter() + .rev() + .chain(right.iter().rev()) + .map(|s| s.steal_batch_and_pop(&self.deque)) + .find_map(|s| s.success()) + } +} + /// A worker is responsible for descending into directories, updating the /// ignore matchers, producing new work and invoking the caller's callback. /// @@ -1397,13 +1464,13 @@ impl Work { struct Worker<'s> { /// The caller's callback. visitor: Box, - /// A stack of work to do. + /// A work-stealing stack of work to do. /// /// We use a stack instead of a channel because a stack lets us visit /// directories in depth first order. This can substantially reduce peak - /// memory usage by keeping both the number of files path and gitignore + /// memory usage by keeping both the number of file paths and gitignore /// matchers in memory lower. - stack: Arc>>, + stack: Stack, /// Whether all workers should terminate at the next opportunity. Note /// that we need this because we don't want other `Work` to be done after /// we quit. We wouldn't need this if have a priority channel. @@ -1668,20 +1735,17 @@ impl<'s> Worker<'s> { /// Send work. fn send(&self, work: Work) { self.num_pending.fetch_add(1, Ordering::SeqCst); - let mut stack = self.stack.lock().unwrap(); - stack.push(Message::Work(work)); + self.stack.push(Message::Work(work)); } /// Send a quit message. fn send_quit(&self) { - let mut stack = self.stack.lock().unwrap(); - stack.push(Message::Quit); + self.stack.push(Message::Quit); } /// Receive work. fn recv(&self) -> Option { - let mut stack = self.stack.lock().unwrap(); - stack.pop() + self.stack.pop() } /// Signal that work has been finished.