Skip to content

Commit

Permalink
Auto merge of rust-lang#96441 - ChrisDenton:sync-pipes, r=m-ou-se
Browse files Browse the repository at this point in the history
Windows: Make stdin pipes synchronous

Stdin pipes do not need to be used asynchronously within the standard library. This is a first step in making pipes mostly synchronous.

r? `@m-ou-se`
  • Loading branch information
bors committed Apr 29, 2022
2 parents baaa3b6 + 1e7c156 commit ddb7fbe
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 31 deletions.
12 changes: 12 additions & 0 deletions library/std/src/os/windows/io/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ impl OwnedHandle {
})?;
unsafe { Ok(Self::from_raw_handle(ret)) }
}

/// Allow child processes to inherit the handle.
pub(crate) fn set_inheritable(&self) -> io::Result<()> {
cvt(unsafe {
c::SetHandleInformation(
self.as_raw_handle(),
c::HANDLE_FLAG_INHERIT,
c::HANDLE_FLAG_INHERIT,
)
})?;
Ok(())
}
}

impl TryFrom<HandleOrInvalid> for OwnedHandle {
Expand Down
6 changes: 6 additions & 0 deletions library/std/src/sys/windows/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,12 @@ extern "system" {
bWaitAll: BOOL,
dwMilliseconds: DWORD,
) -> DWORD;
pub fn CreatePipe(
hReadPipe: *mut HANDLE,
hWritePipe: *mut HANDLE,
lpPipeAttributes: *const SECURITY_ATTRIBUTES,
nSize: DWORD,
) -> BOOL;
pub fn CreateNamedPipeW(
lpName: LPCWSTR,
dwOpenMode: DWORD,
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/windows/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ impl Handle {
Ok(Self(self.0.duplicate(access, inherit, options)?))
}

pub(crate) fn set_inheritable(&self) -> io::Result<()> {
self.0.set_inheritable()
}

/// Performs a synchronous read.
///
/// If the handle is opened for asynchronous I/O then this abort the process.
Expand Down
90 changes: 65 additions & 25 deletions library/std/src/sys/windows/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,56 @@ use crate::sys_common::IntoInner;
// Anonymous pipes
////////////////////////////////////////////////////////////////////////////////

pub struct AnonPipe {
inner: Handle,
// A 64kb pipe capacity is the same as a typical Linux default.
const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;

pub enum AnonPipe {
Sync(Handle),
Async(Handle),
}

impl IntoInner<Handle> for AnonPipe {
fn into_inner(self) -> Handle {
self.inner
match self {
Self::Sync(handle) => handle,
Self::Async(handle) => handle,
}
}
}

pub struct Pipes {
pub ours: AnonPipe,
pub theirs: AnonPipe,
}
impl Pipes {
/// Create a new pair of pipes where both pipes are synchronous.
///
/// These must not be used asynchronously.
pub fn new_synchronous(
ours_readable: bool,
their_handle_inheritable: bool,
) -> io::Result<Self> {
unsafe {
// If `CreatePipe` succeeds, these will be our pipes.
let mut read = ptr::null_mut();
let mut write = ptr::null_mut();

if c::CreatePipe(&mut read, &mut write, ptr::null(), PIPE_BUFFER_CAPACITY) == 0 {
Err(io::Error::last_os_error())
} else {
let (ours, theirs) = if ours_readable { (read, write) } else { (write, read) };
let ours = Handle::from_raw_handle(ours);
let theirs = Handle::from_raw_handle(theirs);

if their_handle_inheritable {
theirs.set_inheritable()?;
}

Ok(Pipes { ours: AnonPipe::Sync(ours), theirs: AnonPipe::Sync(theirs) })
}
}
}
}

/// Although this looks similar to `anon_pipe` in the Unix module it's actually
/// subtly different. Here we'll return two pipes in the `Pipes` return value,
Expand All @@ -53,9 +89,6 @@ pub struct Pipes {
/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
/// once at a time (which we do indeed guarantee).
pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
// A 64kb pipe capacity is the same as a typical Linux default.
const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;

// Note that we specifically do *not* use `CreatePipe` here because
// unfortunately the anonymous pipes returned do not support overlapped
// operations. Instead, we create a "hopefully unique" name and create a
Expand Down Expand Up @@ -156,12 +189,9 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
};
opts.security_attributes(&mut sa);
let theirs = File::open(Path::new(&name), &opts)?;
let theirs = AnonPipe { inner: theirs.into_inner() };
let theirs = AnonPipe::Sync(theirs.into_inner());

Ok(Pipes {
ours: AnonPipe { inner: ours },
theirs: AnonPipe { inner: theirs.into_inner() },
})
Ok(Pipes { ours: AnonPipe::Async(ours), theirs })
}
}

Expand All @@ -171,12 +201,12 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
/// This is achieved by creating a new set of pipes and spawning a thread that
/// relays messages between the source and the synchronous pipe.
pub fn spawn_pipe_relay(
source: &AnonPipe,
source: &Handle,
ours_readable: bool,
their_handle_inheritable: bool,
) -> io::Result<AnonPipe> {
// We need this handle to live for the lifetime of the thread spawned below.
let source = source.duplicate()?;
let source = AnonPipe::Async(source.duplicate(0, true, c::DUPLICATE_SAME_ACCESS)?);

// create a new pair of anon pipes.
let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?;
Expand Down Expand Up @@ -227,19 +257,24 @@ type AlertableIoFn = unsafe extern "system" fn(

impl AnonPipe {
pub fn handle(&self) -> &Handle {
&self.inner
match self {
Self::Async(ref handle) => handle,
Self::Sync(ref handle) => handle,
}
}
pub fn into_handle(self) -> Handle {
self.inner
}
fn duplicate(&self) -> io::Result<Self> {
self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| AnonPipe { inner })
self.into_inner()
}

pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
let result = unsafe {
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
match self {
Self::Sync(ref handle) => handle.read(buf),
Self::Async(_) => {
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
}
}
};

match result {
Expand All @@ -253,28 +288,33 @@ impl AnonPipe {
}

pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
io::default_read_vectored(|buf| self.read(buf), bufs)
}

#[inline]
pub fn is_read_vectored(&self) -> bool {
self.inner.is_read_vectored()
false
}

pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
unsafe {
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
match self {
Self::Sync(ref handle) => handle.write(buf),
Self::Async(_) => {
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
}
}
}
}

pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
self.inner.write_vectored(bufs)
io::default_write_vectored(|buf| self.write(buf), bufs)
}

#[inline]
pub fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
false
}

/// Synchronizes asynchronous reads or writes using our anonymous pipe.
Expand Down Expand Up @@ -346,7 +386,7 @@ impl AnonPipe {

// Asynchronous read of the pipe.
// If successful, `callback` will be called once it completes.
let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
let result = io(self.handle().as_handle(), buf, len, &mut overlapped, callback);
if result == c::FALSE {
// We can return here because the call failed.
// After this we must not return until the I/O completes.
Expand Down
38 changes: 32 additions & 6 deletions library/std/src/sys/windows/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::sys::cvt;
use crate::sys::fs::{File, OpenOptions};
use crate::sys::handle::Handle;
use crate::sys::path;
use crate::sys::pipe::{self, AnonPipe};
use crate::sys::pipe::{self, AnonPipe, Pipes};
use crate::sys::stdio;
use crate::sys_common::mutex::StaticMutex;
use crate::sys_common::process::{CommandEnv, CommandEnvs};
Expand Down Expand Up @@ -173,7 +173,7 @@ pub enum Stdio {
Inherit,
Null,
MakePipe,
Pipe(AnonPipe),
AsyncPipe(Handle),
Handle(Handle),
}

Expand Down Expand Up @@ -527,13 +527,33 @@ impl Stdio {
},

Stdio::MakePipe => {
let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
let pipes = pipe::anon_pipe(ours_readable, true)?;
// Handles that are passed to a child process must be synchronous
// because they will be read synchronously (see #95759).
// Therefore we prefer to make both ends of a pipe synchronous
// just in case our end of the pipe is passed to another process.
//
// However, we may need to read from both the child's stdout and
// stderr simultaneously when waiting for output. This requires
// async reads so as to avoid blocking either pipe.
//
// The solution used here is to make handles synchronous
// except for our side of the stdout and sterr pipes.
// If our side of those pipes do end up being given to another
// process then we use a "pipe relay" to synchronize access
// (see `Stdio::AsyncPipe` below).
let pipes = if stdio_id == c::STD_INPUT_HANDLE {
// For stdin both sides of the pipe are synchronous.
Pipes::new_synchronous(false, true)?
} else {
// For stdout/stderr our side of the pipe is async and their side is synchronous.
pipe::anon_pipe(true, true)?
};
*pipe = Some(pipes.ours);
Ok(pipes.theirs.into_handle())
}

Stdio::Pipe(ref source) => {
Stdio::AsyncPipe(ref source) => {
// We need to synchronize asynchronous pipes by using a pipe relay.
let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
pipe::spawn_pipe_relay(source, ours_readable, true).map(AnonPipe::into_handle)
}
Expand Down Expand Up @@ -562,7 +582,13 @@ impl Stdio {

impl From<AnonPipe> for Stdio {
fn from(pipe: AnonPipe) -> Stdio {
Stdio::Pipe(pipe)
// Note that it's very important we don't give async handles to child processes.
// Therefore if the pipe is asynchronous we must have a way to turn it synchronous.
// See #95759.
match pipe {
AnonPipe::Sync(handle) => Stdio::Handle(handle),
AnonPipe::Async(handle) => Stdio::AsyncPipe(handle),
}
}
}

Expand Down

0 comments on commit ddb7fbe

Please sign in to comment.