Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 92 additions & 5 deletions gix-filter/src/driver/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl State {
///
/// ### Deviation
///
/// If a long running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
/// If a long-running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
/// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
/// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
/// after reading the filtered result.
Expand All @@ -97,9 +97,9 @@ impl State {
}
}

/// Like [`apply()]`[Self::apply()], but use `delay` to determine if the filter result may be delayed or not.
/// Like [`apply()`](Self::apply()), but use `delay` to determine if the filter result may be delayed or not.
///
/// Poll [`list_delayed_paths()`][Self::list_delayed_paths()] until it is empty and query the available paths again.
/// Poll [`list_delayed_paths()`](Self::list_delayed_paths()) until it is empty and query the available paths again.
/// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
pub fn apply_delayed<'a>(
&'a mut self,
Expand All @@ -111,10 +111,24 @@ impl State {
) -> Result<Option<MaybeDelayed<'a>>, Error> {
match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
Some(Process::SingleFile { mut child, command }) => {
std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
// To avoid deadlock when the filter immediately echoes input to output (like `cat`),
// we need to write to stdin and read from stdout concurrently. If we write all data
// to stdin before reading from stdout, and the pipe buffer fills up, both processes
// will block: the filter blocks writing to stdout (buffer full), and we block writing
// to stdin (waiting for the filter to consume data).
//
// Solution: Read all data into a buffer, then spawn a thread to write it to stdin
// while we can immediately read from stdout.
let mut input_data = Vec::new();
std::io::copy(src, &mut input_data)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot instead of reading everything ahead of time, pass the child stdin handle to the thread instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current API has src: &mut impl std::io::Read which is a reference that can't be moved into a thread. To avoid buffering, would you like me to:

  1. Change the signature to take ownership (e.g., src: impl std::io::Read + Send + 'static)?
  2. Use a different approach like Box<dyn std::io::Read + Send> or channels?
  3. Something else?

Currently the code reads from src into a buffer before spawning the thread because the reference can't be moved. Could you clarify the preferred approach?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have to discard it as it probably needs a redesign or some much more intricate implementation.
It's the question if deadlocking is worse than possibly using a lot of memory.

Effectively the only kind of filter that's working now is those which don't output large amounts of data immediately, or multi-file filters.

And… thinking about it, deadlocking is clearly worse as it always fails, whereas using memory will typically work.


let stdin = child.stdin.take().expect("configured");
let write_thread = WriterThread::write_all_in_background(input_data, stdin)?;

Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
inner: child.stdout.take(),
child: driver.required.then_some((child, command)),
write_thread: Some(write_thread),
}))))
}
Some(Process::MultiFile { client, key }) => {
Expand Down Expand Up @@ -202,11 +216,62 @@ pub enum MaybeDelayed<'a> {
Immediate(Box<dyn std::io::Read + 'a>),
}

/// A helper to manage writing to stdin on a separate thread to avoid deadlock.
struct WriterThread {
handle: Option<std::thread::JoinHandle<std::io::Result<()>>>,
}

impl WriterThread {
/// Spawn a thread that will write all data from `data` to `stdin`.
fn write_all_in_background(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
let handle = std::thread::Builder::new()
.name("gix-filter-stdin-writer".into())
.stack_size(128 * 1024)
.spawn(move || {
use std::io::Write;
stdin.write_all(&data)?;
// Explicitly drop stdin to close the pipe and signal EOF to the child
drop(stdin);
Ok(())
})?;

Ok(Self { handle: Some(handle) })
}

/// Wait for the writer thread to complete and return any error that occurred.
fn join(&mut self) -> std::io::Result<()> {
let Some(handle) = self.handle.take() else {
return Ok(());
};
handle.join().map_err(|panic_info| {
let msg = if let Some(s) = panic_info.downcast_ref::<String>() {
format!("Writer thread panicked: {s}")
} else if let Some(s) = panic_info.downcast_ref::<&str>() {
format!("Writer thread panicked: {s}")
} else {
"Writer thread panicked while writing to filter stdin".to_string()
};
std::io::Error::other(msg)
})?
}
}

impl Drop for WriterThread {
fn drop(&mut self) {
// Best effort join on drop.
if let Err(_err) = self.join() {
gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
}
}
}

/// A utility type to facilitate streaming the output of a filter process.
struct ReadFilterOutput {
inner: Option<std::process::ChildStdout>,
/// The child is present if we need its exit code to be positive.
child: Option<(std::process::Child, std::process::Command)>,
/// The thread writing to stdin, if any. Must be joined when reading is done.
write_thread: Option<WriterThread>,
}

pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
Expand All @@ -222,9 +287,31 @@ impl std::io::Read for ReadFilterOutput {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.inner.as_mut() {
Some(inner) => {
let num_read = inner.read(buf)?;
let num_read = match inner.read(buf) {
Ok(n) => n,
Err(e) => {
// On read error, ensure we join the writer thread before propagating the error.
// This is expected to finish with failure as well as it should fail to write
// to the process which now fails to produce output (that we try to read).
if let Some(mut write_thread) = self.write_thread.take() {
// Try to join but prioritize the original read error
if let Err(_thread_err) = write_thread.join() {
gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "write to stdin error during failed read");
}
}
return Err(e);
}
};

if num_read == 0 {
self.inner.take();

// Join the writer thread first to ensure all data has been written
// and that resources are freed now.
if let Some(mut write_thread) = self.write_thread.take() {
write_thread.join()?;
}

if let Some((mut child, cmd)) = self.child.take() {
let status = child.wait()?;
if !status.success() {
Expand Down
89 changes: 89 additions & 0 deletions gix-filter/tests/filter/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,95 @@ pub(crate) mod apply {
Ok(())
}

#[serial]
#[test]
fn large_file_with_cat_filter_does_not_hang() -> crate::Result {
// This test reproduces issue #2080 where using `cat` as a filter with a large file
// causes a deadlock. The pipe buffer is typically 64KB on Linux, so we use files
// larger than that to ensure the buffer fills up.

// Typical pipe buffer sizes on Unix systems
const PIPE_BUFFER_SIZE: usize = 64 * 1024; // 64KB

let mut state = gix_filter::driver::State::default();

// Create a driver that uses `cat` command (which echoes input to output immediately)
let driver = Driver {
name: "cat".into(),
clean: Some("cat".into()),
smudge: Some("cat".into()),
process: None,
required: false,
};

// Test with multiple sizes to ensure robustness
for size in [
PIPE_BUFFER_SIZE,
2 * PIPE_BUFFER_SIZE,
8 * PIPE_BUFFER_SIZE,
16 * PIPE_BUFFER_SIZE,
] {
let input = vec![b'a'; size];

// Apply the filter - this should not hang
let mut filtered = state
.apply(
&driver,
&mut input.as_slice(),
driver::Operation::Smudge,
context_from_path("large-file.txt"),
)?
.expect("filter present");

let mut output = Vec::new();
filtered.read_to_end(&mut output)?;

assert_eq!(
input.len(),
output.len(),
"cat should pass through all data unchanged for {size} bytes"
);
assert_eq!(input, output, "cat should not modify the data");
}
Ok(())
}

#[serial]
#[test]
fn large_file_with_cat_filter_early_drop() -> crate::Result {
// Test that dropping the reader early doesn't cause issues (thread cleanup)
let mut state = gix_filter::driver::State::default();

let driver = Driver {
name: "cat".into(),
clean: Some("cat".into()),
smudge: Some("cat".into()),
process: None,
required: false,
};

let input = vec![b'x'; 256 * 1024];

// Apply the filter but only read a small amount
let mut filtered = state
.apply(
&driver,
&mut input.as_slice(),
driver::Operation::Clean,
context_from_path("early-drop.txt"),
)?
.expect("filter present");

let mut output = vec![0u8; 100];
filtered.read_exact(&mut output)?;
assert_eq!(output, vec![b'x'; 100], "should read first 100 bytes");

// Drop the reader early - the thread should still clean up properly
drop(filtered);

Ok(())
}

pub(crate) fn extract_delayed_key(res: Option<apply::MaybeDelayed<'_>>) -> driver::Key {
match res {
Some(apply::MaybeDelayed::Immediate(_)) | None => {
Expand Down
2 changes: 0 additions & 2 deletions gix-worktree-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ doctest = false
gix-worktree = { version = "^0.43.0", path = "../gix-worktree", default-features = false, features = ["attributes"] }
gix-index = { version = "^0.42.0", path = "../gix-index" }
gix-fs = { version = "^0.17.0", path = "../gix-fs" }
gix-hash = { version = "^0.20.0", path = "../gix-hash" }
gix-object = { version = "^0.51.0", path = "../gix-object" }
gix-glob = { version = "^0.22.0", path = "../gix-glob" }
gix-path = { version = "^0.10.21", path = "../gix-path" }
gix-features = { version = "^0.44.0", path = "../gix-features" }
gix-filter = { version = "^0.21.0", path = "../gix-filter" }
Expand Down
Loading