Skip to content

Commit

Permalink
native: clone/close_accept for win32 pipes
Browse files Browse the repository at this point in the history
This commits takes a similar strategy to the previous commit to implement
close_accept and clone for the native win32 pipes implementation.

Closes #15595
  • Loading branch information
alexcrichton committed Aug 25, 2014
1 parent c301db2 commit fd763a5
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 48 deletions.
6 changes: 6 additions & 0 deletions src/libnative/io/c_windows.rs
Expand Up @@ -115,6 +115,12 @@ extern "system" {
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;

pub fn SetEvent(hEvent: libc::HANDLE) -> libc::BOOL;
pub fn WaitForMultipleObjects(nCount: libc::DWORD,
lpHandles: *const libc::HANDLE,
bWaitAll: libc::BOOL,
dwMilliseconds: libc::DWORD) -> libc::DWORD;

pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
pub fn CancelIoEx(hFile: libc::HANDLE,
lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;
Expand Down
35 changes: 23 additions & 12 deletions src/libnative/io/net.rs
Expand Up @@ -15,7 +15,7 @@ use std::ptr;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomics;
use std::sync::atomic;

use super::{retry, keep_going};
use super::c;
Expand Down Expand Up @@ -456,7 +456,7 @@ impl TcpListener {
listener: self,
reader: reader,
writer: writer,
closed: atomics::AtomicBool::new(false),
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
Expand All @@ -476,7 +476,7 @@ impl TcpListener {
listener: self,
abort: try!(os::Event::new()),
accept: accept,
closed: atomics::AtomicBool::new(false),
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
Expand Down Expand Up @@ -510,15 +510,15 @@ struct AcceptorInner {
listener: TcpListener,
reader: FileDesc,
writer: FileDesc,
closed: atomics::AtomicBool,
closed: atomic::AtomicBool,
}

#[cfg(windows)]
struct AcceptorInner {
listener: TcpListener,
abort: os::Event,
accept: os::Event,
closed: atomics::AtomicBool,
closed: atomic::AtomicBool,
}

impl TcpAcceptor {
Expand All @@ -542,7 +542,7 @@ impl TcpAcceptor {
// self-pipe is never written to unless close_accept() is called.
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};

while !self.inner.closed.load(atomics::SeqCst) {
while !self.inner.closed.load(atomic::SeqCst) {
match retry(|| unsafe {
libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
}) {
Expand Down Expand Up @@ -581,12 +581,12 @@ impl TcpAcceptor {
// stolen, so we do all of this in a loop as well.
let events = [self.inner.abort.handle(), self.inner.accept.handle()];

while !self.inner.closed.load(atomics::SeqCst) {
while !self.inner.closed.load(atomic::SeqCst) {
let ms = if self.deadline == 0 {
c::WSA_INFINITE as u64
} else {
let now = ::io::timer::now();
if self.deadline < now {0} else {now - self.deadline}
if self.deadline < now {0} else {self.deadline - now}
};
let ret = unsafe {
c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
Expand All @@ -600,7 +600,6 @@ impl TcpAcceptor {
c::WSA_WAIT_EVENT_0 => break,
n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
}
println!("woke up");

let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
let ret = unsafe {
Expand All @@ -614,7 +613,19 @@ impl TcpAcceptor {
} {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
fd => return Ok(TcpStream::new(Inner::new(fd))),

// Accepted sockets inherit the same properties as the caller,
// so we need to deregister our event and switch the socket back
// to blocking mode
fd => {
let stream = TcpStream::new(Inner::new(fd));
let ret = unsafe {
c::WSAEventSelect(fd, events[1], 0)
};
if ret != 0 { return Err(os::last_error()) }
try!(util::set_nonblocking(fd, false));
return Ok(stream)
}
}
}

Expand Down Expand Up @@ -648,7 +659,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {

#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomics::SeqCst);
self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Expand All @@ -659,7 +670,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {

#[cfg(windows)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomics::SeqCst);
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
if ret == libc::TRUE {
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions src/libnative/io/pipe_unix.rs
Expand Up @@ -15,7 +15,7 @@ use std::mem;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomics;
use std::sync::atomic;

use super::retry;
use super::net;
Expand Down Expand Up @@ -239,7 +239,7 @@ impl UnixListener {
listener: self,
reader: reader,
writer: writer,
closed: atomics::AtomicBool::new(false),
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
Expand Down Expand Up @@ -267,7 +267,7 @@ struct AcceptorInner {
listener: UnixListener,
reader: FileDesc,
writer: FileDesc,
closed: atomics::AtomicBool,
closed: atomic::AtomicBool,
}

impl UnixAcceptor {
Expand All @@ -276,7 +276,7 @@ impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};

while !self.inner.closed.load(atomics::SeqCst) {
while !self.inner.closed.load(atomic::SeqCst) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
Expand Down Expand Up @@ -317,7 +317,7 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {

#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomics::SeqCst);
self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Expand Down
94 changes: 71 additions & 23 deletions src/libnative/io/pipe_windows.rs
Expand Up @@ -169,23 +169,30 @@ unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
}

pub fn await(handle: libc::HANDLE, deadline: u64,
overlapped: &mut libc::OVERLAPPED) -> bool {
if deadline == 0 { return true }
events: &[libc::HANDLE]) -> IoResult<uint> {
use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};

// If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
// to figure out if we should indeed get the result.
let now = ::io::timer::now();
let timeout = deadline < now || unsafe {
let ms = (deadline - now) as libc::DWORD;
let r = libc::WaitForSingleObject(overlapped.hEvent,
ms);
r != libc::WAIT_OBJECT_0
};
if timeout {
unsafe { let _ = c::CancelIo(handle); }
false
let ms = if deadline == 0 {
libc::INFINITE as u64
} else {
true
let now = ::io::timer::now();
if deadline < now {0} else {deadline - now}
};
let ret = unsafe {
c::WaitForMultipleObjects(events.len() as libc::DWORD,
events.as_ptr(),
libc::FALSE,
ms as libc::DWORD)
};
match ret {
WAIT_FAILED => Err(super::last_error()),
WAIT_TIMEOUT => unsafe {
let _ = c::CancelIo(handle);
Err(util::timeout("operation timed out"))
},
n => Ok((n - WAIT_OBJECT_0) as uint)
}
}

Expand Down Expand Up @@ -390,8 +397,8 @@ impl rtio::RtioPipe for UnixStream {
drop(guard);
loop {
// Process a timeout if one is pending
let succeeded = await(self.handle(), self.read_deadline,
&mut overlapped);
let wait_succeeded = await(self.handle(), self.read_deadline,
[overlapped.hEvent]);

let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
Expand All @@ -408,7 +415,7 @@ impl rtio::RtioPipe for UnixStream {

// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
if !succeeded {
if wait_succeeded.is_err() {
return Err(util::timeout("read timed out"))
}
if self.read_closed() {
Expand Down Expand Up @@ -458,8 +465,8 @@ impl rtio::RtioPipe for UnixStream {
})
}
// Process a timeout if one is pending
let succeeded = await(self.handle(), self.write_deadline,
&mut overlapped);
let wait_succeeded = await(self.handle(), self.write_deadline,
[overlapped.hEvent]);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
Expand All @@ -473,7 +480,7 @@ impl rtio::RtioPipe for UnixStream {
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
if !succeeded {
if !wait_succeeded.is_ok() {
let amt = offset + bytes_written as uint;
return if amt > 0 {
Err(IoError {
Expand Down Expand Up @@ -577,6 +584,10 @@ impl UnixListener {
listener: self,
event: try!(Event::new(true, false)),
deadline: 0,
inner: Arc::new(AcceptorState {
abort: try!(Event::new(true, false)),
closed: atomic::AtomicBool::new(false),
}),
})
}
}
Expand All @@ -597,11 +608,17 @@ impl rtio::RtioUnixListener for UnixListener {
}

pub struct UnixAcceptor {
inner: Arc<AcceptorState>,
listener: UnixListener,
event: Event,
deadline: u64,
}

struct AcceptorState {
abort: Event,
closed: atomic::AtomicBool,
}

impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
// This function has some funky implementation details when working with
Expand Down Expand Up @@ -638,6 +655,10 @@ impl UnixAcceptor {
// using the original server pipe.
let handle = self.listener.handle;

// If we've had an artifical call to close_accept, be sure to never
// proceed in accepting new clients in the future
if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }

let name = try!(to_utf16(&self.listener.name));

// Once we've got a "server handle", we need to wait for a client to
Expand All @@ -652,7 +673,9 @@ impl UnixAcceptor {

if err == libc::ERROR_IO_PENDING as libc::DWORD {
// Process a timeout if one is pending
let _ = await(handle, self.deadline, &mut overlapped);
let wait_succeeded = await(handle, self.deadline,
[self.inner.abort.handle(),
overlapped.hEvent]);

// This will block until the overlapped I/O is completed. The
// timeout was previously handled, so this will either block in
Expand All @@ -665,7 +688,11 @@ impl UnixAcceptor {
libc::TRUE)
};
if ret == 0 {
err = unsafe { libc::GetLastError() };
if wait_succeeded.is_ok() {
err = unsafe { libc::GetLastError() };
} else {
return Err(util::timeout("accept timed out"))
}
} else {
// we succeeded, bypass the check below
err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
Expand Down Expand Up @@ -711,11 +738,32 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
}

fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
fail!()
let name = to_utf16(&self.listener.name).ok().unwrap();
box UnixAcceptor {
inner: self.inner.clone(),
event: Event::new(true, false).ok().unwrap(),
deadline: 0,
listener: UnixListener {
name: self.listener.name.clone(),
handle: unsafe {
let p = pipe(name.as_ptr(), false) ;
assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
p
},
},
} as Box<rtio::RtioUnixAcceptor + Send>
}

fn close_accept(&mut self) -> IoResult<()> {
fail!()
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe {
c::SetEvent(self.inner.abort.handle())
};
if ret == 0 {
Err(super::last_error())
} else {
Ok(())
}
}
}

3 changes: 3 additions & 0 deletions src/libnative/io/util.rs
Expand Up @@ -175,6 +175,9 @@ pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
c::fd_set(&mut set, fd);
max = cmp::max(max, fd + 1);
}
if cfg!(windows) {
max = fds.len() as net::sock_t;
}

let (read, write) = match status {
Readable => (&mut set as *mut _, ptr::mut_null()),
Expand Down
2 changes: 1 addition & 1 deletion src/librustuv/net.rs
Expand Up @@ -387,7 +387,7 @@ impl rtio::RtioSocket for TcpListener {
}

impl rtio::RtioTcpListener for TcpListener {
fn listen(self: Box<TcpListener>)
fn listen(mut self: Box<TcpListener>)
-> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
let _m = self.fire_homing_missile();

Expand Down
2 changes: 1 addition & 1 deletion src/librustuv/pipe.rs
Expand Up @@ -245,7 +245,7 @@ impl PipeListener {
}

impl rtio::RtioUnixListener for PipeListener {
fn listen(self: Box<PipeListener>)
fn listen(mut self: Box<PipeListener>)
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
let _m = self.fire_homing_missile();

Expand Down
3 changes: 1 addition & 2 deletions src/libstd/io/net/tcp.rs
Expand Up @@ -461,8 +461,7 @@ impl TcpAcceptor {
///
/// ```
/// # #![allow(experimental)]
/// use std::io::TcpListener;
/// use std::io::{Listener, Acceptor, TimedOut};
/// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
///
/// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
/// let a2 = a.clone();
Expand Down

5 comments on commit fd763a5

@bors
Copy link
Contributor

@bors bors commented on fd763a5 Aug 25, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from brson
at alexcrichton@fd763a5

@bors
Copy link
Contributor

@bors bors commented on fd763a5 Aug 25, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging alexcrichton/rust/issue-15595 = fd763a5 into auto

@bors
Copy link
Contributor

@bors bors commented on fd763a5 Aug 25, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alexcrichton/rust/issue-15595 = fd763a5 merged ok, testing candidate = 83804f9

@bors
Copy link
Contributor

@bors bors commented on fd763a5 Aug 25, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 83804f9

Please sign in to comment.