From a45739306a66e9777cf07cfed44c168b7e97c111 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Sat, 22 Nov 2025 11:18:04 +0000
Subject: [PATCH 1/2] Implement on-demand thread for non-blocking piped data
 processing

Co-authored-by: Byron <63622+Byron@users.noreply.github.com>
---
 gix-filter/src/driver/apply.rs    | 93 ++++++++++++++++++++++++++++++-
 gix-filter/tests/filter/driver.rs | 89 +++++++++++++++++++++++++++++
 2 files changed, 180 insertions(+), 2 deletions(-)

diff --git a/gix-filter/src/driver/apply.rs b/gix-filter/src/driver/apply.rs
index cde092a49c4..24487564adb 100644
--- a/gix-filter/src/driver/apply.rs
+++ b/gix-filter/src/driver/apply.rs
@@ -111,10 +111,24 @@ impl State {
     ) -> Result<Option<MaybeDelayed<'a>>, Error> {
         match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
             Some(Process::SingleFile { mut child, command }) => {
-                std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
+                // To avoid deadlock when the filter immediately echoes input to output (like `cat`),
+                // we need to write to stdin and read from stdout concurrently. If we write all data
+                // to stdin before reading from stdout, and the pipe buffer fills up, both processes
+                // will block: the filter blocks writing to stdout (buffer full), and we block writing
+                // to stdin (waiting for the filter to consume data).
+                //
+                // Solution: Read all data into a buffer, then spawn a thread to write it to stdin
+                // while we can immediately read from stdout.
+                let mut input_data = Vec::new();
+                std::io::copy(src, &mut input_data)?;
+
+                let stdin = child.stdin.take().expect("configured");
+                let write_thread = WriterThread::spawn(input_data, stdin)?;
+
                 Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
                     inner: child.stdout.take(),
                     child: driver.required.then_some((child, command)),
+                    write_thread: Some(write_thread),
                 }))))
             }
             Some(Process::MultiFile { client, key }) => {
@@ -202,11 +216,67 @@ pub enum MaybeDelayed<'a> {
     Immediate(Box<dyn std::io::Read + 'a>),
 }
 
+/// A helper to manage writing to stdin on a separate thread to avoid deadlock.
+struct WriterThread {
+    handle: Option<std::thread::JoinHandle<std::io::Result<()>>>,
+}
+
+impl WriterThread {
+    /// Spawn a thread that will write all data from `data` to `stdin`.
+    fn spawn(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
+        let handle = std::thread::Builder::new()
+            .name("gix-filter-stdin-writer".into())
+            .spawn(move || {
+                use std::io::Write;
+                stdin.write_all(&data)?;
+                // Explicitly drop stdin to close the pipe and signal EOF to the child
+                drop(stdin);
+                Ok(())
+            })?;
+
+        Ok(Self { handle: Some(handle) })
+    }
+
+    /// Wait for the writer thread to complete and return any error that occurred.
+    fn join(&mut self) -> std::io::Result<()> {
+        if let Some(handle) = self.handle.take() {
+            match handle.join() {
+                Ok(result) => result,
+                Err(panic_info) => {
+                    let msg = if let Some(s) = panic_info.downcast_ref::<String>() {
+                        format!("Writer thread panicked: {s}")
+                    } else if let Some(s) = panic_info.downcast_ref::<&str>() {
+                        format!("Writer thread panicked: {s}")
+                    } else {
+                        "Writer thread panicked while writing to filter stdin".to_string()
+                    };
+                    Err(std::io::Error::other(msg))
+                }
+            }
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl Drop for WriterThread {
+    fn drop(&mut self) {
+        // Best effort join on drop. If the thread panicked or failed, log it for debugging.
+        // We can't propagate errors from Drop, so we only log them. Thread panics during Drop
+        // are unusual but can occur if the filter process closes stdin unexpectedly.
+        if let Err(_err) = self.join() {
+            gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
+        }
+    }
+}
+
 /// A utility type to facilitate streaming the output of a filter process.
 struct ReadFilterOutput {
     inner: Option<std::process::ChildStdout>,
     /// The child is present if we need its exit code to be positive.
     child: Option<(std::process::Child, std::process::Command)>,
+    /// The thread writing to stdin, if any. Must be joined when reading is done.
+    write_thread: Option<WriterThread>,
 }
 
 pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
@@ -222,9 +292,28 @@ impl std::io::Read for ReadFilterOutput {
     fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
         match self.inner.as_mut() {
             Some(inner) => {
-                let num_read = inner.read(buf)?;
+                let num_read = match inner.read(buf) {
+                    Ok(n) => n,
+                    Err(e) => {
+                        // On read error, ensure we join the writer thread before propagating the error
+                        if let Some(mut write_thread) = self.write_thread.take() {
+                            // Try to join but prioritize the original read error
+                            if let Err(_thread_err) = write_thread.join() {
+                                gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "Writer thread error during failed read");
+                            }
+                        }
+                        return Err(e);
+                    }
+                };
+
                 if num_read == 0 {
                     self.inner.take();
+
+                    // Join the writer thread first to ensure all data has been written
+                    if let Some(mut write_thread) = self.write_thread.take() {
+                        write_thread.join()?;
+                    }
+
                     if let Some((mut child, cmd)) = self.child.take() {
                         let status = child.wait()?;
                         if !status.success() {
diff --git a/gix-filter/tests/filter/driver.rs b/gix-filter/tests/filter/driver.rs
index 77cde3c851c..59fbcfac330 100644
--- a/gix-filter/tests/filter/driver.rs
+++ b/gix-filter/tests/filter/driver.rs
@@ -371,6 +371,95 @@ pub(crate) mod apply {
         Ok(())
     }
 
+    #[serial]
+    #[test]
+    fn large_file_with_cat_filter_does_not_hang() -> crate::Result {
+        // This test reproduces issue #2080 where using `cat` as a filter with a large file
+        // causes a deadlock. The pipe buffer is typically 64KB on Linux, so we use files
+        // larger than that to ensure the buffer fills up.
+
+        // Typical pipe buffer sizes on Unix systems
+        const PIPE_BUFFER_SIZE: usize = 64 * 1024; // 64KB
+
+        let mut state = gix_filter::driver::State::default();
+
+        // Create a driver that uses the `cat` command (which echoes input to output immediately)
+        let driver = Driver {
+            name: "cat".into(),
+            clean: Some("cat".into()),
+            smudge: Some("cat".into()),
+            process: None,
+            required: false,
+        };
+
+        // Test with multiple sizes to ensure robustness
+        for size in [
+            PIPE_BUFFER_SIZE,
+            2 * PIPE_BUFFER_SIZE,
+            8 * PIPE_BUFFER_SIZE,
+            16 * PIPE_BUFFER_SIZE,
+        ] {
+            let input = vec![b'a'; size];
+
+            // Apply the filter - this should not hang
+            let mut filtered = state
+                .apply(
+                    &driver,
+                    &mut input.as_slice(),
+                    driver::Operation::Smudge,
+                    context_from_path("large-file.txt"),
+                )?
+                .expect("filter present");
+
+            let mut output = Vec::new();
+            filtered.read_to_end(&mut output)?;
+
+            assert_eq!(
+                input.len(),
+                output.len(),
+                "cat should pass through all data unchanged for {size} bytes"
+            );
+            assert_eq!(input, output, "cat should not modify the data");
+        }
+        Ok(())
+    }
+
+    #[serial]
+    #[test]
+    fn large_file_with_cat_filter_early_drop() -> crate::Result {
+        // Test that dropping the reader early doesn't cause issues (thread cleanup)
+        let mut state = gix_filter::driver::State::default();
+
+        let driver = Driver {
+            name: "cat".into(),
+            clean: Some("cat".into()),
+            smudge: Some("cat".into()),
+            process: None,
+            required: false,
+        };
+
+        let input = vec![b'x'; 256 * 1024];
+
+        // Apply the filter but only read a small amount
+        let mut filtered = state
+            .apply(
+                &driver,
+                &mut input.as_slice(),
+                driver::Operation::Clean,
+                context_from_path("early-drop.txt"),
+            )?
+            .expect("filter present");
+
+        let mut output = vec![0u8; 100];
+        filtered.read_exact(&mut output)?;
+        assert_eq!(output, vec![b'x'; 100], "should read first 100 bytes");
+
+        // Drop the reader early - the thread should still clean up properly
+        drop(filtered);
+
+        Ok(())
+    }
+
     pub(crate) fn extract_delayed_key(res: Option<apply::MaybeDelayed<'_>>) -> driver::Key {
         match res {
             Some(apply::MaybeDelayed::Immediate(_)) | None => {

From 1aaa6fab136f69822f98bf7809b7c606b6ae49ba Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sat, 22 Nov 2025 19:51:07 +0100
Subject: [PATCH 2/2] refactor

- unrelated: remove unused dependencies
- optimise writer thread
---
 Cargo.lock                     |  2 --
 gix-filter/src/driver/apply.rs | 52 ++++++++++++++++------------------
 gix-worktree-state/Cargo.toml  |  2 --
 3 files changed, 25 insertions(+), 31 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3a35bdba321..0456a923b2a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2557,8 +2557,6 @@ dependencies = [
  "gix-features",
  "gix-filter",
  "gix-fs",
- "gix-glob",
- "gix-hash",
  "gix-index",
  "gix-object",
  "gix-path",

diff --git a/gix-filter/src/driver/apply.rs b/gix-filter/src/driver/apply.rs
index 24487564adb..1ad5063d7d4 100644
--- a/gix-filter/src/driver/apply.rs
+++ b/gix-filter/src/driver/apply.rs
@@ -77,7 +77,7 @@ impl State {
     ///
     /// ### Deviation
     ///
-    /// If a long running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
+    /// If a long-running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
     /// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
     /// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
     /// after reading the filtered result.
@@ -97,9 +97,9 @@
         }
     }
 
-    /// Like [`apply()]`[Self::apply()], but use `delay` to determine if the filter result may be delayed or not.
+    /// Like [`apply()`](Self::apply()), but use `delay` to determine if the filter result may be delayed or not.
     ///
-    /// Poll [`list_delayed_paths()`][Self::list_delayed_paths()] until it is empty and query the available paths again.
+    /// Poll [`list_delayed_paths()`](Self::list_delayed_paths()) until it is empty and query the available paths again.
     /// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
     pub fn apply_delayed<'a>(
         &'a mut self,
@@ -123,7 +123,7 @@
                 std::io::copy(src, &mut input_data)?;
 
                 let stdin = child.stdin.take().expect("configured");
-                let write_thread = WriterThread::spawn(input_data, stdin)?;
+                let write_thread = WriterThread::write_all_in_background(input_data, stdin)?;
 
                 Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
                     inner: child.stdout.take(),
@@ -223,9 +223,10 @@ struct WriterThread {
 
 impl WriterThread {
     /// Spawn a thread that will write all data from `data` to `stdin`.
-    fn spawn(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
+    fn write_all_in_background(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
         let handle = std::thread::Builder::new()
            .name("gix-filter-stdin-writer".into())
+            .stack_size(128 * 1024)
             .spawn(move || {
                 use std::io::Write;
                 stdin.write_all(&data)?;
@@ -239,31 +240,25 @@ impl WriterThread {
 
     /// Wait for the writer thread to complete and return any error that occurred.
     fn join(&mut self) -> std::io::Result<()> {
-        if let Some(handle) = self.handle.take() {
-            match handle.join() {
-                Ok(result) => result,
-                Err(panic_info) => {
-                    let msg = if let Some(s) = panic_info.downcast_ref::<String>() {
-                        format!("Writer thread panicked: {s}")
-                    } else if let Some(s) = panic_info.downcast_ref::<&str>() {
-                        format!("Writer thread panicked: {s}")
-                    } else {
-                        "Writer thread panicked while writing to filter stdin".to_string()
-                    };
-                    Err(std::io::Error::other(msg))
-                }
-            }
-        } else {
-            Ok(())
-        }
+        let Some(handle) = self.handle.take() else {
+            return Ok(());
+        };
+        handle.join().map_err(|panic_info| {
+            let msg = if let Some(s) = panic_info.downcast_ref::<String>() {
+                format!("Writer thread panicked: {s}")
+            } else if let Some(s) = panic_info.downcast_ref::<&str>() {
+                format!("Writer thread panicked: {s}")
+            } else {
+                "Writer thread panicked while writing to filter stdin".to_string()
+            };
+            std::io::Error::other(msg)
+        })?
     }
 }
 
 impl Drop for WriterThread {
     fn drop(&mut self) {
-        // Best effort join on drop. If the thread panicked or failed, log it for debugging.
-        // We can't propagate errors from Drop, so we only log them. Thread panics during Drop
-        // are unusual but can occur if the filter process closes stdin unexpectedly.
+        // Best effort join on drop.
         if let Err(_err) = self.join() {
             gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
         }
@@ -295,11 +290,13 @@ impl std::io::Read for ReadFilterOutput {
                 let num_read = match inner.read(buf) {
                     Ok(n) => n,
                     Err(e) => {
-                        // On read error, ensure we join the writer thread before propagating the error
+                        // On read error, ensure we join the writer thread before propagating the error.
+                        // This is expected to fail as well, since writing to a process that no longer
+                        // produces the output we are trying to read should fail, too.
                         if let Some(mut write_thread) = self.write_thread.take() {
                             // Try to join but prioritize the original read error
                             if let Err(_thread_err) = write_thread.join() {
-                                gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "Writer thread error during failed read");
+                                gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "write to stdin error during failed read");
                             }
                         }
                         return Err(e);
@@ -310,6 +307,7 @@ impl std::io::Read for ReadFilterOutput {
                     self.inner.take();
 
                     // Join the writer thread first to ensure all data has been written
+                    // and that resources are freed now.
                     if let Some(mut write_thread) = self.write_thread.take() {
                         write_thread.join()?;
                     }
diff --git a/gix-worktree-state/Cargo.toml b/gix-worktree-state/Cargo.toml
index b90c1136463..d4108dc298a 100644
--- a/gix-worktree-state/Cargo.toml
+++ b/gix-worktree-state/Cargo.toml
@@ -19,9 +19,7 @@ doctest = false
 gix-worktree = { version = "^0.43.0", path = "../gix-worktree", default-features = false, features = ["attributes"] }
 gix-index = { version = "^0.42.0", path = "../gix-index" }
 gix-fs = { version = "^0.17.0", path = "../gix-fs" }
-gix-hash = { version = "^0.20.0", path = "../gix-hash" }
 gix-object = { version = "^0.51.0", path = "../gix-object" }
-gix-glob = { version = "^0.22.0", path = "../gix-glob" }
 gix-path = { version = "^0.10.21", path = "../gix-path" }
 gix-features = { version = "^0.44.0", path = "../gix-features" }
 gix-filter = { version = "^0.21.0", path = "../gix-filter" }