From 31bcec648aa57391115f877a2ca022d7ff6415aa Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Fri, 8 Feb 2019 20:42:34 +0100 Subject: [PATCH] Add vectored read and write support This functionality has lived for a while in the tokio ecosystem, where it can improve performance by minimizing copies. --- src/libstd/io/buffered.rs | 33 ++++- src/libstd/io/cursor.rs | 217 +++++++++++++++++++++++++++++++- src/libstd/io/impls.rs | 59 ++++++++- src/libstd/io/mod.rs | 124 +++++++++++++++++- src/libstd/io/util.rs | 18 ++- src/libstd/lib.rs | 1 + src/libstd/net/tcp.rs | 69 +++++++++- src/libstd/sys/cloudabi/io.rs | 32 +++++ src/libstd/sys/cloudabi/mod.rs | 36 +++--- src/libstd/sys/redox/io.rs | 32 +++++ src/libstd/sys/redox/mod.rs | 11 +- src/libstd/sys/redox/net/tcp.rs | 16 ++- src/libstd/sys/sgx/io.rs | 32 +++++ src/libstd/sys/sgx/mod.rs | 52 ++++---- src/libstd/sys/sgx/net.rs | 16 +++ src/libstd/sys/unix/ext/net.rs | 37 +++++- src/libstd/sys/unix/fd.rs | 20 ++- src/libstd/sys/unix/io.rs | 61 +++++++++ src/libstd/sys/unix/l4re.rs | 18 ++- src/libstd/sys/unix/mod.rs | 9 +- src/libstd/sys/unix/net.rs | 10 +- src/libstd/sys/wasm/io.rs | 32 +++++ src/libstd/sys/wasm/mod.rs | 12 +- src/libstd/sys/wasm/net.rs | 10 +- src/libstd/sys/windows/c.rs | 25 ++++ src/libstd/sys/windows/io.rs | 63 ++++++++++ src/libstd/sys/windows/mod.rs | 27 ++-- src/libstd/sys/windows/net.rs | 43 ++++++- src/libstd/sys_common/net.rs | 10 +- 29 files changed, 1033 insertions(+), 92 deletions(-) create mode 100644 src/libstd/sys/cloudabi/io.rs create mode 100644 src/libstd/sys/redox/io.rs create mode 100644 src/libstd/sys/sgx/io.rs create mode 100644 src/libstd/sys/unix/io.rs create mode 100644 src/libstd/sys/wasm/io.rs create mode 100644 src/libstd/sys/windows/io.rs diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 0615cd59db4eb..a5edc4360ca4b 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -5,7 +5,7 @@ use io::prelude::*; use cmp; use error; use fmt; -use io::{self, Initializer, DEFAULT_BUF_SIZE, Error, ErrorKind, SeekFrom}; +use io::{self, Initializer, DEFAULT_BUF_SIZE, Error, ErrorKind, SeekFrom, IoVec, IoVecMut}; use memchr; /// The `BufReader` struct adds buffering to any reader. @@ -235,6 +235,19 @@ impl Read for BufReader { Ok(nread) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + let total_len = bufs.iter().map(|b| b.as_slice().len()).sum::(); + if self.pos == self.cap && total_len >= self.buf.len() { + return self.inner.read_vectored(bufs); + } + let nread = { + let mut rem = self.fill_buf()?; + rem.read_vectored(bufs)? + }; + self.consume(nread); + Ok(nread) + } + // we can't skip unconditionally because of the large buffer case in read. unsafe fn initializer(&self) -> Initializer { self.inner.initializer() @@ -577,9 +590,25 @@ impl Write for BufWriter { self.panicked = false; r } else { - Write::write(&mut self.buf, buf) + self.buf.write(buf) + } + } + + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + let total_len = bufs.iter().map(|b| b.as_slice().len()).sum::(); + if self.buf.len() + total_len > self.buf.capacity() { + self.flush_buf()?; + } + if total_len >= self.buf.capacity() { + self.panicked = true; + let r = self.inner.as_mut().unwrap().write_vectored(bufs); + self.panicked = false; + r + } else { + self.buf.write_vectored(bufs) } } + fn flush(&mut self) -> io::Result<()> { self.flush_buf().and_then(|()| self.get_mut().flush()) } diff --git a/src/libstd/io/cursor.rs b/src/libstd/io/cursor.rs index b205f7888389f..ef636cc6f8c69 100644 --- a/src/libstd/io/cursor.rs +++ b/src/libstd/io/cursor.rs @@ -2,7 +2,7 @@ use io::prelude::*; use core::convert::TryInto; use cmp; -use io::{self, Initializer, SeekFrom, Error, ErrorKind}; +use io::{self, Initializer, SeekFrom, Error, ErrorKind, IoVec, IoVecMut}; /// A `Cursor` wraps an in-memory buffer and provides it with a /// [`Seek`] implementation. @@ -221,6 +221,19 @@ impl Read for Cursor where T: AsRef<[u8]> { Ok(n) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + let mut nread = 0; + for buf in bufs { + let buf = buf.as_mut_slice(); + let n = self.read(buf)?; + nread += n; + if n < buf.len() { + break; + } + } + Ok(nread) + } + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { let n = buf.len(); Read::read_exact(&mut self.fill_buf()?, buf)?; @@ -251,6 +264,24 @@ fn slice_write(pos_mut: &mut u64, slice: &mut [u8], buf: &[u8]) -> io::Result], +) -> io::Result +{ + let mut nwritten = 0; + for buf in bufs { + let buf = buf.as_slice(); + let n = slice_write(pos_mut, slice, buf)?; + nwritten += n; + if n < buf.len() { + break; + } + } + Ok(nwritten) +} + // Resizing write implementation fn vec_write(pos_mut: &mut u64, vec: &mut Vec, buf: &[u8]) -> io::Result { let pos: usize = (*pos_mut).try_into().map_err(|_| { @@ -278,12 +309,31 @@ fn vec_write(pos_mut: &mut u64, vec: &mut Vec, buf: &[u8]) -> io::Result, + bufs: &[IoVec<'_>], +) -> io::Result +{ + let mut nwritten = 0; + for buf in bufs { + nwritten += vec_write(pos_mut, vec, buf.as_slice())?; + } + Ok(nwritten) +} + #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for Cursor<&'a mut [u8]> { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { slice_write(&mut self.pos, self.inner, buf) } + + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + slice_write_vectored(&mut self.pos, self.inner, bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } } @@ -292,6 +342,11 @@ impl<'a> Write for Cursor<&'a mut Vec> { fn write(&mut self, buf: &[u8]) -> io::Result { vec_write(&mut self.pos, self.inner, buf) } + + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + vec_write_vectored(&mut self.pos, self.inner, bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } } @@ -300,6 +355,11 @@ impl Write for Cursor> { fn write(&mut self, buf: &[u8]) -> io::Result { vec_write(&mut self.pos, &mut self.inner, buf) } + + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + vec_write_vectored(&mut self.pos, &mut self.inner, bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } } @@ -309,13 +369,19 @@ impl Write for Cursor> { fn write(&mut self, buf: &[u8]) -> io::Result { slice_write(&mut self.pos, &mut self.inner, buf) } + + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + slice_write_vectored(&mut self.pos, &mut self.inner, bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } } #[cfg(test)] mod tests { use io::prelude::*; - use io::{Cursor, SeekFrom}; + use io::{Cursor, SeekFrom, IoVec, IoVecMut}; #[test] fn test_vec_writer() { @@ -323,7 +389,10 @@ mod tests { assert_eq!(writer.write(&[0]).unwrap(), 1); assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3); assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4); - let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7]; + assert_eq!(writer.write_vectored( + &[IoVec::new(&[]), IoVec::new(&[8, 9]), IoVec::new(&[10])], + ).unwrap(), 3); + let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; assert_eq!(writer, b); } @@ -333,7 +402,10 @@ mod tests { assert_eq!(writer.write(&[0]).unwrap(), 1); assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3); assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4); - let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7]; + assert_eq!(writer.write_vectored( + &[IoVec::new(&[]), IoVec::new(&[8, 9]), IoVec::new(&[10])], + ).unwrap(), 3); + let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; assert_eq!(&writer.get_ref()[..], b); } @@ -344,7 +416,10 @@ mod tests { assert_eq!(writer.write(&[0]).unwrap(), 1); assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3); assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4); - let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7]; + assert_eq!(writer.write_vectored( + &[IoVec::new(&[]), IoVec::new(&[8, 9]), IoVec::new(&[10])], + ).unwrap(), 3); + let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; assert_eq!(&writer.get_ref()[..], b); } @@ -366,6 +441,26 @@ mod tests { assert_eq!(&**writer.get_ref(), b); } + #[test] + fn test_box_slice_writer_vectored() { + let mut writer = Cursor::new(vec![0u8; 9].into_boxed_slice()); + assert_eq!(writer.position(), 0); + assert_eq!(writer.write_vectored(&[IoVec::new(&[0])]).unwrap(), 1); + assert_eq!(writer.position(), 1); + assert_eq!( + writer.write_vectored(&[IoVec::new(&[1, 2, 3]), IoVec::new(&[4, 5, 6, 7])]).unwrap(), + 7, + ); + assert_eq!(writer.position(), 8); + assert_eq!(writer.write_vectored(&[]).unwrap(), 0); + assert_eq!(writer.position(), 8); + + assert_eq!(writer.write_vectored(&[IoVec::new(&[8, 9])]).unwrap(), 1); + assert_eq!(writer.write_vectored(&[IoVec::new(&[10])]).unwrap(), 0); + let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8]; + assert_eq!(&**writer.get_ref(), b); + } + #[test] fn test_buf_writer() { let mut buf = [0 as u8; 9]; @@ -387,6 +482,31 @@ mod tests { assert_eq!(buf, b); } + #[test] + fn test_buf_writer_vectored() { + let mut buf = [0 as u8; 9]; + { + let mut writer = Cursor::new(&mut buf[..]); + assert_eq!(writer.position(), 0); + assert_eq!(writer.write_vectored(&[IoVec::new(&[0])]).unwrap(), 1); + assert_eq!(writer.position(), 1); + assert_eq!( + writer.write_vectored( + &[IoVec::new(&[1, 2, 3]), IoVec::new(&[4, 5, 6, 7])], + ).unwrap(), + 7, + ); + assert_eq!(writer.position(), 8); + assert_eq!(writer.write_vectored(&[]).unwrap(), 0); + assert_eq!(writer.position(), 8); + + assert_eq!(writer.write_vectored(&[IoVec::new(&[8, 9])]).unwrap(), 1); + assert_eq!(writer.write_vectored(&[IoVec::new(&[10])]).unwrap(), 0); + } + let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8]; + assert_eq!(buf, b); + } + #[test] fn test_buf_writer_seek() { let mut buf = [0 as u8; 8]; @@ -447,6 +567,35 @@ mod tests { assert_eq!(reader.read(&mut buf).unwrap(), 0); } + #[test] + fn test_mem_reader_vectored() { + let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7]); + let mut buf = []; + assert_eq!(reader.read_vectored(&mut [IoVecMut::new(&mut buf)]).unwrap(), 0); + assert_eq!(reader.position(), 0); + let mut buf = [0]; + assert_eq!( + reader.read_vectored(&mut [IoVecMut::new(&mut []), IoVecMut::new(&mut buf)]).unwrap(), + 1, + ); + assert_eq!(reader.position(), 1); + let b: &[_] = &[0]; + assert_eq!(buf, b); + let mut buf1 = [0; 4]; + let mut buf2 = [0; 4]; + assert_eq!( + reader.read_vectored( + &mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)], + ).unwrap(), + 7, + ); + let b1: &[_] = &[1, 2, 3, 4]; + let b2: &[_] = &[5, 6, 7]; + assert_eq!(buf1, b1); + assert_eq!(&buf2[..3], b2); + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } + #[test] fn test_boxed_slice_reader() { let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7].into_boxed_slice()); @@ -469,6 +618,35 @@ mod tests { assert_eq!(reader.read(&mut buf).unwrap(), 0); } + #[test] + fn test_boxed_slice_reader_vectored() { + let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7].into_boxed_slice()); + let mut buf = []; + assert_eq!(reader.read_vectored(&mut [IoVecMut::new(&mut buf)]).unwrap(), 0); + assert_eq!(reader.position(), 0); + let mut buf = [0]; + assert_eq!( + reader.read_vectored(&mut [IoVecMut::new(&mut []), IoVecMut::new(&mut buf)]).unwrap(), + 1, + ); + assert_eq!(reader.position(), 1); + let b: &[_] = &[0]; + assert_eq!(buf, b); + let mut buf1 = [0; 4]; + let mut buf2 = [0; 4]; + assert_eq!( + reader.read_vectored( + &mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)], + ).unwrap(), + 7, + ); + let b1: &[_] = &[1, 2, 3, 4]; + let b2: &[_] = &[5, 6, 7]; + assert_eq!(buf1, b1); + assert_eq!(&buf2[..3], b2); + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } + #[test] fn read_to_end() { let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7]); @@ -499,6 +677,35 @@ mod tests { assert_eq!(reader.read(&mut buf).unwrap(), 0); } + #[test] + fn test_slice_reader_vectored() { + let in_buf = vec![0, 1, 2, 3, 4, 5, 6, 7]; + let reader = &mut &in_buf[..]; + let mut buf = []; + assert_eq!(reader.read_vectored(&mut [IoVecMut::new(&mut buf)]).unwrap(), 0); + let mut buf = [0]; + assert_eq!( + reader.read_vectored(&mut [IoVecMut::new(&mut []), IoVecMut::new(&mut buf)]).unwrap(), + 1, + ); + assert_eq!(reader.len(), 7); + let b: &[_] = &[0]; + assert_eq!(buf, b); + let mut buf1 = [0; 4]; + let mut buf2 = [0; 4]; + assert_eq!( + reader.read_vectored( + &mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)], + ).unwrap(), + 7, + ); + let b1: &[_] = &[1, 2, 3, 4]; + let b2: &[_] = &[5, 6, 7]; + assert_eq!(buf1, b1); + assert_eq!(&buf2[..3], b2); + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } + #[test] fn test_read_exact() { let in_buf = vec![0, 1, 2, 3, 4, 5, 6, 7]; diff --git a/src/libstd/io/impls.rs b/src/libstd/io/impls.rs index ec75a87aec34f..4bba4af77c482 100644 --- a/src/libstd/io/impls.rs +++ b/src/libstd/io/impls.rs @@ -1,5 +1,6 @@ use cmp; -use io::{self, SeekFrom, Read, Initializer, Write, Seek, BufRead, Error, ErrorKind}; +use io::{self, SeekFrom, Read, Initializer, Write, Seek, BufRead, Error, ErrorKind, IoVecMut, + IoVec}; use fmt; use mem; @@ -13,6 +14,11 @@ impl<'a, R: Read + ?Sized> Read for &'a mut R { (**self).read(buf) } + #[inline] + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + (**self).read_vectored(bufs) + } + #[inline] unsafe fn initializer(&self) -> Initializer { (**self).initializer() @@ -38,6 +44,11 @@ impl<'a, W: Write + ?Sized> Write for &'a mut W { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { (**self).write(buf) } + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + (**self).write_vectored(bufs) + } + #[inline] fn flush(&mut self) -> io::Result<()> { (**self).flush() } @@ -82,6 +93,11 @@ impl Read for Box { (**self).read(buf) } + #[inline] + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + (**self).read_vectored(bufs) + } + #[inline] unsafe fn initializer(&self) -> Initializer { (**self).initializer() @@ -107,6 +123,11 @@ impl Write for Box { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { (**self).write(buf) } + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + (**self).write_vectored(bufs) + } + #[inline] fn flush(&mut self) -> io::Result<()> { (**self).flush() } @@ -171,6 +192,19 @@ impl<'a> Read for &'a [u8] { Ok(amt) } + #[inline] + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + let mut nread = 0; + for buf in bufs { + nread += self.read(buf.as_mut_slice())?; + if self.is_empty() { + break; + } + } + + Ok(nread) + } + #[inline] unsafe fn initializer(&self) -> Initializer { Initializer::nop() @@ -231,6 +265,19 @@ impl<'a> Write for &'a mut [u8] { Ok(amt) } + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + let mut nwritten = 0; + for buf in bufs { + nwritten += self.write(buf.as_slice())?; + if self.is_empty() { + break; + } + } + + Ok(nwritten) + } + #[inline] fn write_all(&mut self, data: &[u8]) -> io::Result<()> { if self.write(data)? == data.len() { @@ -254,6 +301,16 @@ impl Write for Vec { Ok(buf.len()) } + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + let len = bufs.iter().map(|b| b.as_slice().len()).sum(); + self.reserve(len); + for buf in bufs { + self.extend_from_slice(buf.as_slice()); + } + Ok(len) + } + #[inline] fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { self.extend_from_slice(buf); diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index c0570ae60a19c..b9765605f8e3b 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -265,6 +265,7 @@ use slice; use str; use memchr; use ptr; +use sys; #[stable(feature = "rust1", since = "1.0.0")] pub use self::buffered::{BufReader, BufWriter, LineWriter}; @@ -520,6 +521,22 @@ pub trait Read { #[stable(feature = "rust1", since = "1.0.0")] fn read(&mut self, buf: &mut [u8]) -> Result; + /// Like `read`, except that it reads into a slice of buffers. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method must behave + /// as a single call to `read` with the buffers concatenated would. + /// + /// The default implementation simply passes the first nonempty buffer to + /// `read`. + #[unstable(feature = "iovec", issue = "0")] + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> Result { + match bufs.iter_mut().map(|b| b.as_mut_slice()).find(|b| !b.is_empty()) { + Some(buf) => self.read(buf), + None => Ok(0), + } + } + /// Determines if this `Read`er can work with buffers of uninitialized /// memory. /// @@ -867,6 +884,85 @@ pub trait Read { } } +/// A buffer type used with `Read::read_vectored`. +/// +/// It is semantically a wrapper around an `&mut [u8]`, but is guaranteed to be +/// ABI compatible with the `iovec` type on Unix platforms and `WSABUF` on +/// Windows. +#[unstable(feature = "iovec", issue = "0")] +#[repr(transparent)] +pub struct IoVecMut<'a>(sys::io::IoVecMut<'a>); + +#[unstable(feature = "iovec", issue = "0")] +impl<'a> fmt::Debug for IoVecMut<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self.as_slice(), fmt) + } +} + +impl<'a> IoVecMut<'a> { + /// Creates a new `IoVecMut` wrapping a byte slice. + /// + /// # Panics + /// + /// Panics on Windows if the slice is larger than 4GB. + #[unstable(feature = "iovec", issue = "0")] + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + IoVecMut(sys::io::IoVecMut::new(buf)) + } + + /// Returns a shared reference to the inner slice. + #[unstable(feature = "iovec", issue = "0")] + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0.as_slice() + } + + /// Returns a mutable reference to the inner slice. + #[unstable(feature = "iovec", issue = "0")] + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + self.0.as_mut_slice() + } +} + +/// A buffer type used with `Write::write_vectored`. +/// +/// It is semantically a wrapper around an `&[u8]`, but is guaranteed to be +/// ABI compatible with the `iovec` type on Unix platforms and `WSABUF` on +/// Windows. +#[unstable(feature = "iovec", issue = "0")] +#[repr(transparent)] +pub struct IoVec<'a>(sys::io::IoVec<'a>); + +#[unstable(feature = "iovec", issue = "0")] +impl<'a> fmt::Debug for IoVec<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self.as_slice(), fmt) + } +} + +impl<'a> IoVec<'a> { + /// Creates a new `IoVec` wrapping a byte slice. + /// + /// # Panics + /// + /// Panics on Windows if the slice is larger than 4GB. + #[unstable(feature = "iovec", issue = "0")] + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + IoVec(sys::io::IoVec::new(buf)) + } + + /// Returns a shared reference to the inner slice. + #[unstable(feature = "iovec", issue = "0")] + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0.as_slice() + } +} + /// A type used to conditionally initialize buffers passed to `Read` methods. #[unstable(feature = "read_initializer", issue = "42788")] #[derive(Debug)] @@ -997,6 +1093,22 @@ pub trait Write { #[stable(feature = "rust1", since = "1.0.0")] fn write(&mut self, buf: &[u8]) -> Result; + /// Like `write`, except that it writes from a slice of buffers. + /// + /// Data is copied to from each buffer in order, with the final buffer + /// read from possibly being only partially consumed. This method must + /// behave as a call to `write` with the buffers concatenated would. + /// + /// The default implementation simply passes the first nonempty buffer to + /// `write`. + #[unstable(feature = "iovec", issue = "0")] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> Result { + match bufs.iter().map(|b| b.as_slice()).find(|b| !b.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + } + } + /// Flush this output stream, ensuring that all intermediately buffered /// contents reach their destination. /// @@ -1691,13 +1803,23 @@ impl Read for Chain { fn read(&mut self, buf: &mut [u8]) -> Result { if !self.done_first { match self.first.read(buf)? { - 0 if buf.len() != 0 => { self.done_first = true; } + 0 if buf.len() != 0 => self.done_first = true, n => return Ok(n), } } self.second.read(buf) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> Result { + if !self.done_first { + match self.first.read_vectored(bufs)? { + 0 if bufs.iter().any(|b| !b.as_slice().is_empty()) => self.done_first = true, + n => return Ok(n), + } + } + self.second.read_vectored(bufs) + } + unsafe fn initializer(&self) -> Initializer { let initializer = self.first.initializer(); if initializer.should_initialize() { diff --git a/src/libstd/io/util.rs b/src/libstd/io/util.rs index 8df961a9add6b..6743018793fb6 100644 --- a/src/libstd/io/util.rs +++ b/src/libstd/io/util.rs @@ -1,7 +1,7 @@ #![allow(missing_copy_implementations)] use fmt; -use io::{self, Read, Initializer, Write, ErrorKind, BufRead}; +use io::{self, Read, Initializer, Write, ErrorKind, BufRead, IoVec, IoVecMut}; use mem; /// Copies the entire contents of a reader into a writer. @@ -152,6 +152,15 @@ impl Read for Repeat { Ok(buf.len()) } + #[inline] + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + let mut nwritten = 0; + for buf in bufs { + nwritten += self.read(buf.as_mut_slice())?; + } + Ok(nwritten) + } + #[inline] unsafe fn initializer(&self) -> Initializer { Initializer::nop() @@ -195,6 +204,13 @@ pub fn sink() -> Sink { Sink { _priv: () } } impl Write for Sink { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { Ok(buf.len()) } + + #[inline] + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + let total_len = bufs.iter().map(|b| b.as_slice().len()).sum(); + Ok(total_len) + } + #[inline] fn flush(&mut self) -> io::Result<()> { Ok(()) } } diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index e57cb2ce5fd15..ebfe0b0de9ec9 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -228,6 +228,7 @@ #![feature(arbitrary_self_types)] #![feature(array_error_internals)] #![feature(asm)] +#![feature(bind_by_move_pattern_guards)] #![feature(box_syntax)] #![feature(c_variadic)] #![feature(cfg_target_has_atomic)] diff --git a/src/libstd/net/tcp.rs b/src/libstd/net/tcp.rs index c4b0cd0f17c34..d8a9d80cdbf78 100644 --- a/src/libstd/net/tcp.rs +++ b/src/libstd/net/tcp.rs @@ -1,7 +1,7 @@ use io::prelude::*; use fmt; -use io::{self, Initializer}; +use io::{self, Initializer, IoVec, IoVecMut}; use net::{ToSocketAddrs, SocketAddr, Shutdown}; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner, IntoInner}; @@ -569,6 +569,10 @@ impl TcpStream { impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + self.0.read_vectored(bufs) + } + #[inline] unsafe fn initializer(&self) -> Initializer { Initializer::nop() @@ -577,12 +581,21 @@ impl Read for TcpStream { #[stable(feature = "rust1", since = "1.0.0")] impl Write for TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } + + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + self.0.write_vectored(bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + self.0.read_vectored(bufs) + } + #[inline] unsafe fn initializer(&self) -> Initializer { Initializer::nop() @@ -591,6 +604,11 @@ impl<'a> Read for &'a TcpStream { #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for &'a TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } + + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + self.0.write_vectored(bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } } @@ -911,7 +929,7 @@ impl fmt::Debug for TcpListener { #[cfg(all(test, not(any(target_os = "cloudabi", target_os = "emscripten"))))] mod tests { - use io::ErrorKind; + use io::{ErrorKind, IoVec, IoVecMut}; use io::prelude::*; use net::*; use net::test::{next_test_ip4, next_test_ip6}; @@ -1184,6 +1202,53 @@ mod tests { }) } + #[test] + fn read_vectored() { + each_ip(&mut |addr| { + let srv = t!(TcpListener::bind(&addr)); + let mut s1 = t!(TcpStream::connect(&addr)); + let mut s2 = t!(srv.accept()).0; + + let len = s1.write(&[10, 11, 12]).unwrap(); + assert_eq!(len, 3); + + let mut a = []; + let mut b = [0]; + let mut c = [0; 3]; + let len = t!(s2.read_vectored( + &mut [IoVecMut::new(&mut a), IoVecMut::new(&mut b), IoVecMut::new(&mut c)], + )); + assert!(len > 0); + assert_eq!(b, [10]); + // some implementations don't support readv, so we may only fill the first buffer + assert!(len == 1 || c == [11, 12, 0]); + }) + } + + #[test] + fn write_vectored() { + each_ip(&mut |addr| { + let srv = t!(TcpListener::bind(&addr)); + let mut s1 = t!(TcpStream::connect(&addr)); + let mut s2 = t!(srv.accept()).0; + + let a = []; + let b = [10]; + let c = [11, 12]; + t!(s1.write_vectored(&[IoVec::new(&a), IoVec::new(&b), IoVec::new(&c)])); + + let mut buf = [0; 4]; + let len = t!(s2.read(&mut buf)); + // some implementations don't support writev, so we may only write the first buffer + if len == 1 { + assert_eq!(buf, [10, 0, 0, 0]); + } else { + assert_eq!(len, 3); + assert_eq!(buf, [10, 11, 12, 0]); + } + }) + } + #[test] fn double_bind() { each_ip(&mut |addr| { diff --git a/src/libstd/sys/cloudabi/io.rs b/src/libstd/sys/cloudabi/io.rs new file mode 100644 index 0000000000000..9ee5788c58009 --- /dev/null +++ b/src/libstd/sys/cloudabi/io.rs @@ -0,0 +1,32 @@ +pub struct IoVec<'a>(&'a [u8]); + +impl<'a> IoVec<'a> { + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + IoVec(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } +} + +pub struct IoVecMut<'a>(&'a mut [u8]); + +impl<'a> IoVecMut<'a> { + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + IoVecMut(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } + + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + self.0 + } +} diff --git a/src/libstd/sys/cloudabi/mod.rs b/src/libstd/sys/cloudabi/mod.rs index cd621b769456a..d9bc21861c90d 100644 --- a/src/libstd/sys/cloudabi/mod.rs +++ b/src/libstd/sys/cloudabi/mod.rs @@ -1,4 +1,3 @@ -use io; use libc; use mem; @@ -10,6 +9,7 @@ pub mod backtrace; #[path = "../unix/cmath.rs"] pub mod cmath; pub mod condvar; +pub mod io; #[path = "../unix/memchr.rs"] pub mod memchr; pub mod mutex; @@ -32,24 +32,24 @@ pub use self::shims::*; #[allow(dead_code)] pub fn init() {} -pub fn decode_error_kind(errno: i32) -> io::ErrorKind { +pub fn decode_error_kind(errno: i32) -> ::io::ErrorKind { match errno { - x if x == abi::errno::ACCES as i32 => io::ErrorKind::PermissionDenied, - x if x == abi::errno::ADDRINUSE as i32 => io::ErrorKind::AddrInUse, - x if x == abi::errno::ADDRNOTAVAIL as i32 => io::ErrorKind::AddrNotAvailable, - x if x == abi::errno::AGAIN as i32 => io::ErrorKind::WouldBlock, - x if x == abi::errno::CONNABORTED as i32 => io::ErrorKind::ConnectionAborted, - x if x == abi::errno::CONNREFUSED as i32 => io::ErrorKind::ConnectionRefused, - x if x == abi::errno::CONNRESET as i32 => io::ErrorKind::ConnectionReset, - x if x == abi::errno::EXIST as i32 => io::ErrorKind::AlreadyExists, - x if x == abi::errno::INTR as i32 => io::ErrorKind::Interrupted, - x if x == abi::errno::INVAL as i32 => io::ErrorKind::InvalidInput, - x if x == abi::errno::NOENT as i32 => io::ErrorKind::NotFound, - x if x == abi::errno::NOTCONN as i32 => io::ErrorKind::NotConnected, - x if x == abi::errno::PERM as i32 => io::ErrorKind::PermissionDenied, - x if x == abi::errno::PIPE as i32 => io::ErrorKind::BrokenPipe, - x if x == abi::errno::TIMEDOUT as i32 => io::ErrorKind::TimedOut, - _ => io::ErrorKind::Other, + x if x == abi::errno::ACCES as i32 => ::io::ErrorKind::PermissionDenied, + x if x == abi::errno::ADDRINUSE as i32 => ::io::ErrorKind::AddrInUse, + x if x == abi::errno::ADDRNOTAVAIL as i32 => ::io::ErrorKind::AddrNotAvailable, + x if x == abi::errno::AGAIN as i32 => ::io::ErrorKind::WouldBlock, + x if x == abi::errno::CONNABORTED as i32 => ::io::ErrorKind::ConnectionAborted, + x if x == abi::errno::CONNREFUSED as i32 => ::io::ErrorKind::ConnectionRefused, + x if x == abi::errno::CONNRESET as i32 => ::io::ErrorKind::ConnectionReset, + x if x == abi::errno::EXIST as i32 => ::io::ErrorKind::AlreadyExists, + x if x == abi::errno::INTR as i32 => ::io::ErrorKind::Interrupted, + x if x == abi::errno::INVAL as i32 => ::io::ErrorKind::InvalidInput, + x if x == abi::errno::NOENT as i32 => ::io::ErrorKind::NotFound, + x if x == abi::errno::NOTCONN as i32 => ::io::ErrorKind::NotConnected, + x if x == abi::errno::PERM as i32 => ::io::ErrorKind::PermissionDenied, + x if x == abi::errno::PIPE as i32 => ::io::ErrorKind::BrokenPipe, + x if x == abi::errno::TIMEDOUT as i32 => ::io::ErrorKind::TimedOut, + _ => ::io::ErrorKind::Other, } } diff --git a/src/libstd/sys/redox/io.rs b/src/libstd/sys/redox/io.rs new file mode 100644 index 0000000000000..9ee5788c58009 --- /dev/null +++ b/src/libstd/sys/redox/io.rs @@ -0,0 +1,32 @@ +pub struct IoVec<'a>(&'a [u8]); + +impl<'a> IoVec<'a> { + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + IoVec(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } +} + +pub struct IoVecMut<'a>(&'a mut [u8]); + +impl<'a> IoVecMut<'a> { + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + IoVecMut(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } + + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + self.0 + } +} diff --git a/src/libstd/sys/redox/mod.rs b/src/libstd/sys/redox/mod.rs index c106db8ddfaf5..c3878349bb329 100644 --- a/src/libstd/sys/redox/mod.rs +++ b/src/libstd/sys/redox/mod.rs @@ -1,6 +1,6 @@ #![allow(dead_code, missing_docs, nonstandard_style)] -use io::{self, ErrorKind}; +use ::io::{ErrorKind}; pub use libc::strlen; pub use self::rand::hashmap_random_keys; @@ -17,6 +17,7 @@ pub mod ext; pub mod fast_thread_local; pub mod fd; pub mod fs; +pub mod io; pub mod memchr; pub mod mutex; pub mod net; @@ -63,8 +64,8 @@ pub fn decode_error_kind(errno: i32) -> ErrorKind { } } -pub fn cvt(result: Result) -> io::Result { - result.map_err(|err| io::Error::from_raw_os_error(err.errno)) +pub fn cvt(result: Result) -> ::io::Result { + result.map_err(|err| ::io::Error::from_raw_os_error(err.errno)) } #[doc(hidden)] @@ -82,9 +83,9 @@ macro_rules! impl_is_minus_one { impl_is_minus_one! { i8 i16 i32 i64 isize } -pub fn cvt_libc(t: T) -> io::Result { +pub fn cvt_libc(t: T) -> ::io::Result { if t.is_minus_one() { - Err(io::Error::last_os_error()) + Err(::io::Error::last_os_error()) } else { Ok(t) } diff --git a/src/libstd/sys/redox/net/tcp.rs b/src/libstd/sys/redox/net/tcp.rs index e0353b130bb48..08e12dc1ab100 100644 --- a/src/libstd/sys/redox/net/tcp.rs +++ b/src/libstd/sys/redox/net/tcp.rs @@ -1,5 +1,5 @@ use cmp; -use io::{self, Error, ErrorKind, Result}; +use io::{self, Error, ErrorKind, Result, IoVec, IoVecMut}; use mem; use net::{SocketAddr, Shutdown}; use path::Path; @@ -34,10 +34,24 @@ impl TcpStream { self.0.read(buf) } + pub fn read_vectored(&self, buf: &mut [IoVecMut<'_>]) -> io::Result { + match buf.iter_mut().map(|b| b.as_mut_slice()).find(|b| !b.is_empty()) { + Some(buf) => self.read(buf), + None => Ok(0), + } + } + pub fn write(&self, buf: &[u8]) -> Result { self.0.write(buf) } + pub fn write_vectored(&self, bufs: &[IoVec<'_>]) -> io::Result { + match buf.iter().map(|b| b.as_slice()).find(|b| !b.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + } + } + pub fn take_error(&self) -> Result> { Ok(None) } diff --git a/src/libstd/sys/sgx/io.rs b/src/libstd/sys/sgx/io.rs new file mode 100644 index 0000000000000..9ee5788c58009 --- /dev/null +++ b/src/libstd/sys/sgx/io.rs @@ -0,0 +1,32 @@ +pub struct IoVec<'a>(&'a [u8]); + +impl<'a> IoVec<'a> { + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + IoVec(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } +} + +pub struct IoVecMut<'a>(&'a mut [u8]); + +impl<'a> IoVecMut<'a> { + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + IoVecMut(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } + + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + self.0 + } +} diff --git a/src/libstd/sys/sgx/mod.rs b/src/libstd/sys/sgx/mod.rs index 4225ecbb20651..403dd61187fc3 100644 --- a/src/libstd/sys/sgx/mod.rs +++ b/src/libstd/sys/sgx/mod.rs @@ -3,7 +3,6 @@ //! This module contains the facade (aka platform-specific) implementations of //! OS level functionality for Fortanix SGX. -use io; use os::raw::c_char; use sync::atomic::{AtomicBool, Ordering}; @@ -20,6 +19,7 @@ pub mod env; pub mod ext; pub mod fd; pub mod fs; +pub mod io; pub mod memchr; pub mod mutex; pub mod net; @@ -41,12 +41,12 @@ pub fn init() { /// This function is used to implement functionality that simply doesn't exist. /// Programs relying on this functionality will need to deal with the error. -pub fn unsupported() -> io::Result { +pub fn unsupported() -> ::io::Result { Err(unsupported_err()) } -pub fn unsupported_err() -> io::Error { - io::Error::new(io::ErrorKind::Other, +pub fn unsupported_err() -> ::io::Error { + ::io::Error::new(::io::ErrorKind::Other, "operation not supported on SGX yet") } @@ -55,58 +55,58 @@ pub fn unsupported_err() -> io::Error { /// returned, the program might very well be able to function normally. This is /// what happens when `SGX_INEFFECTIVE_ERROR` is set to `true`. If it is /// `false`, the behavior is the same as `unsupported`. -pub fn sgx_ineffective(v: T) -> io::Result { +pub fn sgx_ineffective(v: T) -> ::io::Result { static SGX_INEFFECTIVE_ERROR: AtomicBool = AtomicBool::new(false); if SGX_INEFFECTIVE_ERROR.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, + Err(::io::Error::new(::io::ErrorKind::Other, "operation can't be trusted to have any effect on SGX")) } else { Ok(v) } } -pub fn decode_error_kind(code: i32) -> io::ErrorKind { +pub fn decode_error_kind(code: i32) -> ::io::ErrorKind { use fortanix_sgx_abi::Error; // FIXME: not sure how to make sure all variants of Error are covered if code == Error::NotFound as _ { - io::ErrorKind::NotFound + ::io::ErrorKind::NotFound } else if code == Error::PermissionDenied as _ { - io::ErrorKind::PermissionDenied + ::io::ErrorKind::PermissionDenied } else if code == Error::ConnectionRefused as _ { - io::ErrorKind::ConnectionRefused + ::io::ErrorKind::ConnectionRefused } else if code == Error::ConnectionReset as _ { - io::ErrorKind::ConnectionReset + ::io::ErrorKind::ConnectionReset } else if code == Error::ConnectionAborted as _ { - io::ErrorKind::ConnectionAborted + ::io::ErrorKind::ConnectionAborted } else if code == Error::NotConnected as _ { - io::ErrorKind::NotConnected + ::io::ErrorKind::NotConnected } else if code == Error::AddrInUse as _ { - io::ErrorKind::AddrInUse + ::io::ErrorKind::AddrInUse } else if code == Error::AddrNotAvailable as _ { - io::ErrorKind::AddrNotAvailable + ::io::ErrorKind::AddrNotAvailable } else if code == Error::BrokenPipe as _ { - io::ErrorKind::BrokenPipe + ::io::ErrorKind::BrokenPipe } else if code == Error::AlreadyExists as _ { - io::ErrorKind::AlreadyExists + ::io::ErrorKind::AlreadyExists } else if code == Error::WouldBlock as _ { - io::ErrorKind::WouldBlock + ::io::ErrorKind::WouldBlock } else if code == Error::InvalidInput as _ { - io::ErrorKind::InvalidInput + ::io::ErrorKind::InvalidInput } else if code == Error::InvalidData as _ { - io::ErrorKind::InvalidData + ::io::ErrorKind::InvalidData } else if code == Error::TimedOut as _ { - io::ErrorKind::TimedOut + ::io::ErrorKind::TimedOut } else if code == Error::WriteZero as _ { - io::ErrorKind::WriteZero + ::io::ErrorKind::WriteZero } else if code == Error::Interrupted as _ { - io::ErrorKind::Interrupted + ::io::ErrorKind::Interrupted } else if code == Error::Other as _ { - io::ErrorKind::Other + ::io::ErrorKind::Other } else if code == Error::UnexpectedEof as _ { - io::ErrorKind::UnexpectedEof + ::io::ErrorKind::UnexpectedEof } else { - io::ErrorKind::Other + ::io::ErrorKind::Other } } diff --git a/src/libstd/sys/sgx/net.rs b/src/libstd/sys/sgx/net.rs index 6e86b06b28626..2ecae1d746f74 100644 --- a/src/libstd/sys/sgx/net.rs +++ b/src/libstd/sys/sgx/net.rs @@ -103,10 +103,26 @@ impl TcpStream { self.inner.inner.read(buf) } + pub fn read_vectored(&self, buf: &mut [IoVecMut<'_>]) -> io::Result { + let buf = match buf.get(0) { + Some(buf) => buf.as_mut_slice(), + None => return Ok(0), + }; + self.read(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.inner.inner.write(buf) } + pub fn write_vectored(&self, buf: &[IoVec<'_>]) -> io::Result { + let buf = match buf.get(0) { + Some(buf) => buf.as_slice(), + None => return Ok(0), + }; + self.read(buf) + } + pub fn peer_addr(&self) -> io::Result { addr_to_sockaddr(&self.peer_addr) } diff --git a/src/libstd/sys/unix/ext/net.rs b/src/libstd/sys/unix/ext/net.rs index acc064acfcd29..4b60ea654c1f1 100644 --- a/src/libstd/sys/unix/ext/net.rs +++ b/src/libstd/sys/unix/ext/net.rs @@ -18,7 +18,7 @@ mod libc { use ascii; use ffi::OsStr; use fmt; -use io::{self, Initializer}; +use io::{self, Initializer, IoVec, IoVecMut}; use mem; use net::{self, Shutdown}; use os::unix::ffi::OsStrExt; @@ -551,6 +551,10 @@ impl io::Read for UnixStream { io::Read::read(&mut &*self, buf) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + io::Read::read_vectored(&mut &*self, bufs) + } + #[inline] unsafe fn initializer(&self) -> Initializer { Initializer::nop() @@ -563,6 +567,10 @@ impl<'a> io::Read for &'a UnixStream { self.0.read(buf) } + fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + self.0.read_vectored(bufs) + } + #[inline] unsafe fn initializer(&self) -> Initializer { Initializer::nop() @@ -575,6 +583,10 @@ impl io::Write for UnixStream { io::Write::write(&mut &*self, buf) } + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + io::Write::write_vectored(&mut &*self, bufs) + } + fn flush(&mut self) -> io::Result<()> { io::Write::flush(&mut &*self) } @@ -586,6 +598,10 @@ impl<'a> io::Write for &'a UnixStream { self.0.write(buf) } + fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result { + self.0.write_vectored(bufs) + } + fn flush(&mut self) -> io::Result<()> { Ok(()) } @@ -1510,6 +1526,25 @@ mod test { thread.join().unwrap(); } + #[test] + fn vectored() { + let (mut s1, mut s2) = or_panic!(UnixStream::pair()); + + let len = or_panic!(s1.write_vectored( + &[IoVec::new(b"hello"), IoVec::new(b" "), IoVec::new(b"world!")], + )); + assert_eq!(len, 12); + + let mut buf1 = [0; 6]; + let mut buf2 = [0; 7]; + let len = or_panic!(s2.read_vectored( + &mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)], + )); + assert_eq!(len, 12); + assert_eq!(&buf1, b"hello "); + assert_eq!(&buf2, b"world!\0"); + } + #[test] fn pair() { let msg1 = b"hello"; diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index 2cbd9536f4da7..6946b7b5dfa48 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -1,7 +1,7 @@ #![unstable(reason = "not public", issue = "0", feature = "fd")] use cmp; -use io::{self, Read, Initializer}; +use io::{self, Read, Initializer, IoVec, IoVecMut}; use libc::{self, c_int, c_void, ssize_t}; use mem; use sync::atomic::{AtomicBool, Ordering}; @@ -52,6 +52,15 @@ impl FileDesc { Ok(ret as usize) } + pub fn read_vectored(&self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + let ret = cvt(unsafe { + libc::readv(self.fd, + bufs.as_ptr() as *const libc::iovec, + cmp::min(bufs.len(), c_int::max_value() as usize) as c_int) + })?; + Ok(ret as usize) + } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { let mut me = self; (&mut me).read_to_end(buf) @@ -105,6 +114,15 @@ impl FileDesc { Ok(ret as usize) } + pub fn write_vectored(&self, bufs: &[IoVec<'_>]) -> io::Result { + let ret = cvt(unsafe { + libc::writev(self.fd, + bufs.as_ptr() as *const libc::iovec, + cmp::min(bufs.len(), c_int::max_value() as usize) as c_int) + })?; + Ok(ret as usize) + } + pub fn write_at(&self, buf: &[u8], offset: u64) -> io::Result { #[cfg(target_os = "android")] use super::android::cvt_pwrite64; diff --git a/src/libstd/sys/unix/io.rs b/src/libstd/sys/unix/io.rs new file mode 100644 index 0000000000000..69b2db82ea3ae --- /dev/null +++ b/src/libstd/sys/unix/io.rs @@ -0,0 +1,61 @@ +use marker::PhantomData; +use libc::{iovec, c_void}; +use slice; + +#[repr(transparent)] +pub struct IoVec<'a> { + vec: iovec, + _p: PhantomData<&'a [u8]>, +} + +impl<'a> IoVec<'a> { + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + IoVec { + vec: iovec { + iov_base: buf.as_ptr() as *mut u8 as *mut c_void, + iov_len: buf.len() + }, + _p: PhantomData, + } + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + unsafe { + slice::from_raw_parts(self.vec.iov_base as *mut u8, self.vec.iov_len) + } + } +} + +pub struct IoVecMut<'a> { + vec: iovec, + _p: PhantomData<&'a mut [u8]>, +} + +impl<'a> IoVecMut<'a> { + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + IoVecMut { + vec: iovec { + iov_base: buf.as_mut_ptr() as *mut c_void, + iov_len: buf.len() + }, + _p: PhantomData, + } + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + unsafe { + slice::from_raw_parts(self.vec.iov_base as *mut u8, self.vec.iov_len) + } + } + + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + unsafe { + slice::from_raw_parts_mut(self.vec.iov_base as *mut u8, self.vec.iov_len) + } + } +} diff --git a/src/libstd/sys/unix/l4re.rs b/src/libstd/sys/unix/l4re.rs index 48037310c8d3a..4775e29fb5709 100644 --- a/src/libstd/sys/unix/l4re.rs +++ b/src/libstd/sys/unix/l4re.rs @@ -5,7 +5,7 @@ macro_rules! unimpl { pub mod net { #![allow(warnings)] use fmt; - use io; + use io::{self, IoVec, IoVecMut}; use libc; use net::{SocketAddr, Shutdown, Ipv4Addr, Ipv6Addr}; use sys_common::{AsInner, FromInner, IntoInner}; @@ -46,6 +46,10 @@ pub mod net { unimpl!(); } + pub fn read_vectored(&self, _: &mut [IoVecMut<'_>]) -> io::Result { + unimpl!(); + } + pub fn peek(&self, _: &mut [u8]) -> io::Result { unimpl!(); } @@ -62,6 +66,10 @@ pub mod net { unimpl!(); } + pub fn write_vectored(&self, _: &[IoVec<'_>]) -> io::Result { + unimpl!(); + } + pub fn set_timeout(&self, _: Option, _: libc::c_int) -> io::Result<()> { unimpl!(); } @@ -144,10 +152,18 @@ pub mod net { unimpl!(); } + pub fn read_vectored(&self, _: &mut [IoVecMut<'_>]) -> io::Result { + unimpl!(); + } + pub fn write(&self, _: &[u8]) -> io::Result { unimpl!(); } + pub fn write_vectored(&self, _: &[IoVec<'_>]) -> io::Result { + unimpl!(); + } + pub fn peer_addr(&self) -> io::Result { unimpl!(); } diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs index b36c117fd09d4..0de1a223fbd12 100644 --- a/src/libstd/sys/unix/mod.rs +++ b/src/libstd/sys/unix/mod.rs @@ -1,6 +1,6 @@ #![allow(missing_docs, nonstandard_style)] -use io::{self, ErrorKind}; +use io::ErrorKind; use libc; #[cfg(any(rustdoc, target_os = "linux"))] pub use os::linux as platform; @@ -39,6 +39,7 @@ pub mod fast_thread_local; pub mod fd; pub mod fs; pub mod memchr; +pub mod io; pub mod mutex; #[cfg(not(target_os = "l4re"))] pub mod net; @@ -126,15 +127,15 @@ macro_rules! impl_is_minus_one { impl_is_minus_one! { i8 i16 i32 i64 isize } -pub fn cvt(t: T) -> io::Result { +pub fn cvt(t: T) -> ::io::Result { if t.is_minus_one() { - Err(io::Error::last_os_error()) + Err(::io::Error::last_os_error()) } else { Ok(t) } } -pub fn cvt_r(mut f: F) -> io::Result +pub fn cvt_r(mut f: F) -> ::io::Result where T: IsMinusOne, F: FnMut() -> T { diff --git a/src/libstd/sys/unix/net.rs b/src/libstd/sys/unix/net.rs index d780d71c37693..521d9b425179b 100644 --- a/src/libstd/sys/unix/net.rs +++ b/src/libstd/sys/unix/net.rs @@ -1,5 +1,5 @@ use ffi::CStr; -use io; +use io::{self, IoVec, IoVecMut}; use libc::{self, c_int, c_void, size_t, sockaddr, socklen_t, EAI_SYSTEM, MSG_PEEK}; use mem; use net::{SocketAddr, Shutdown}; @@ -241,6 +241,10 @@ impl Socket { self.recv_with_flags(buf, MSG_PEEK) } + pub fn read_vectored(&self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + self.0.read_vectored(bufs) + } + fn recv_from_with_flags(&self, buf: &mut [u8], flags: c_int) -> io::Result<(usize, SocketAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; @@ -269,6 +273,10 @@ impl Socket { self.0.write(buf) } + pub fn write_vectored(&self, bufs: &[IoVec<'_>]) -> io::Result { + self.0.write_vectored(bufs) + } + pub fn set_timeout(&self, dur: Option, kind: libc::c_int) -> io::Result<()> { let timeout = match dur { Some(dur) => { diff --git a/src/libstd/sys/wasm/io.rs b/src/libstd/sys/wasm/io.rs new file mode 100644 index 0000000000000..9ee5788c58009 --- /dev/null +++ b/src/libstd/sys/wasm/io.rs @@ -0,0 +1,32 @@ +pub struct IoVec<'a>(&'a [u8]); + +impl<'a> IoVec<'a> { + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + IoVec(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } +} + +pub struct IoVecMut<'a>(&'a mut [u8]); + +impl<'a> IoVecMut<'a> { + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + IoVecMut(buf) + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } + + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + self.0 + } +} diff --git a/src/libstd/sys/wasm/mod.rs b/src/libstd/sys/wasm/mod.rs index e21455ec6da75..e71c6bcd7fe76 100644 --- a/src/libstd/sys/wasm/mod.rs +++ b/src/libstd/sys/wasm/mod.rs @@ -14,7 +14,6 @@ //! compiling for wasm. That way it's a compile time error for something that's //! guaranteed to be a runtime error! -use io; use os::raw::c_char; use ptr; use sys::os_str::Buf; @@ -29,6 +28,7 @@ pub mod backtrace; pub mod cmath; pub mod env; pub mod fs; +pub mod io; pub mod memchr; pub mod net; pub mod os; @@ -63,17 +63,17 @@ cfg_if! { pub fn init() { } -pub fn unsupported() -> io::Result { +pub fn unsupported() -> ::io::Result { Err(unsupported_err()) } -pub fn unsupported_err() -> io::Error { - io::Error::new(io::ErrorKind::Other, +pub fn unsupported_err() -> ::io::Error { + ::io::Error::new(::io::ErrorKind::Other, "operation not supported on wasm yet") } -pub fn decode_error_kind(_code: i32) -> io::ErrorKind { - io::ErrorKind::Other +pub fn decode_error_kind(_code: i32) -> ::io::ErrorKind { + ::io::ErrorKind::Other } // This enum is used as the storage for a bunch of types which can't actually diff --git a/src/libstd/sys/wasm/net.rs b/src/libstd/sys/wasm/net.rs index 81e4e8255bf5a..d9f5d53843205 100644 --- a/src/libstd/sys/wasm/net.rs +++ b/src/libstd/sys/wasm/net.rs @@ -1,5 +1,5 @@ use fmt; -use io; +use io::{self, IoVec, IoVecMut}; use net::{SocketAddr, Shutdown, Ipv4Addr, Ipv6Addr}; use time::Duration; use sys::{unsupported, Void}; @@ -40,10 +40,18 @@ impl TcpStream { match self.0 {} } + pub fn read_vectored(&self, _: &mut [IoVecMut<'_>]) -> io::Result { + match self.0 {} + } + pub fn write(&self, _: &[u8]) -> io::Result { match self.0 {} } + pub fn write_vectored(&self, _: &[IoVec<'_>]) -> io::Result { + match self.0 {} + } + pub fn peer_addr(&self) -> io::Result { match self.0 {} } diff --git a/src/libstd/sys/windows/c.rs b/src/libstd/sys/windows/c.rs index 28fd4df386e99..a78b599204b20 100644 --- a/src/libstd/sys/windows/c.rs +++ b/src/libstd/sys/windows/c.rs @@ -57,6 +57,9 @@ pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO; pub type LPSTR = *mut CHAR; pub type LPWSTR = *mut WCHAR; pub type LPFILETIME = *mut FILETIME; +pub type LPWSABUF = *mut WSABUF; +pub type LPWSAOVERLAPPED = *mut c_void; +pub type LPWSAOVERLAPPED_COMPLETION_ROUTINE = *mut c_void; pub type PCONDITION_VARIABLE = *mut CONDITION_VARIABLE; pub type PLARGE_INTEGER = *mut c_longlong; @@ -324,6 +327,12 @@ pub struct WSADATA { pub szSystemStatus: [u8; WSASYS_STATUS_LEN + 1], } +#[repr(C)] +pub struct WSABUF { + pub len: ULONG, + pub buf: *mut CHAR, +} + #[repr(C)] pub struct WSAPROTOCOL_INFO { pub dwServiceFlags1: DWORD, @@ -988,6 +997,22 @@ extern "system" { dwProcessId: DWORD, lpProtocolInfo: LPWSAPROTOCOL_INFO) -> c_int; + pub fn WSASend(s: SOCKET, + lpBuffers: LPWSABUF, + dwBufferCount: DWORD, + lpNumberOfBytesSent: LPDWORD, + dwFlags: DWORD, + lpOverlapped: LPWSAOVERLAPPED, + lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE) + -> c_int; + pub fn WSARecv(s: SOCKET, + lpBuffers: LPWSABUF, + dwBufferCount: DWORD, + lpNumberOfBytesRecvd: LPDWORD, + lpFlags: LPDWORD, + lpOverlapped: LPWSAOVERLAPPED, + lpCompletionRoutine: LPWSAOVERLAPPED_COMPLETION_ROUTINE) + -> c_int; pub fn GetCurrentProcessId() -> DWORD; pub fn WSASocketW(af: c_int, kind: c_int, diff --git a/src/libstd/sys/windows/io.rs b/src/libstd/sys/windows/io.rs new file mode 100644 index 0000000000000..a14bfea9a2176 --- /dev/null +++ b/src/libstd/sys/windows/io.rs @@ -0,0 +1,63 @@ +use marker::PhantomData; +use slice; +use sys::c; + +#[repr(transparent)] +pub struct IoVec<'a> { + vec: c::WSABUF, + _p: PhantomData<&'a [u8]>, +} + +impl<'a> IoVec<'a> { + #[inline] + pub fn new(buf: &'a [u8]) -> IoVec<'a> { + assert!(buf.len() <= c::ULONG::max_value() as usize); + IoVec { + vec: c::WSABUF { + len: buf.len() as c::ULONG, + buf: buf.as_ptr() as *mut u8 as *mut c::CHAR, + }, + _p: PhantomData, + } + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + unsafe { + slice::from_raw_parts(self.vec.buf as *mut u8, self.vec.len as usize) + } + } +} + +pub struct IoVecMut<'a> { + vec: c::WSABUF, + _p: PhantomData<&'a mut [u8]>, +} + +impl<'a> IoVecMut<'a> { + #[inline] + pub fn new(buf: &'a mut [u8]) -> IoVecMut<'a> { + assert!(buf.len() <= c::ULONG::max_value() as usize); + IoVecMut { + vec: c::WSABUF { + len: buf.len() as c::ULONG, + buf: buf.as_mut_ptr() as *mut c::CHAR, + }, + _p: PhantomData, + } + } + + #[inline] + pub fn as_slice(&self) -> &'a [u8] { + unsafe { + slice::from_raw_parts(self.vec.buf as *mut u8, self.vec.len as usize) + } + } + + #[inline] + pub fn as_mut_slice(&mut self) -> &'a mut [u8] { + unsafe { + slice::from_raw_parts_mut(self.vec.buf as *mut u8, self.vec.len as usize) + } + } +} diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index e97e436efbf71..56c76a169feb8 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -2,7 +2,7 @@ use ptr; use ffi::{OsStr, OsString}; -use io::{self, ErrorKind}; +use io::ErrorKind; use os::windows::ffi::{OsStrExt, OsStringExt}; use path::PathBuf; use time::Duration; @@ -26,6 +26,7 @@ pub mod ext; pub mod fast_thread_local; pub mod fs; pub mod handle; +pub mod io; pub mod memchr; pub mod mutex; pub mod net; @@ -75,12 +76,12 @@ pub fn decode_error_kind(errno: i32) -> ErrorKind { } } -pub fn to_u16s>(s: S) -> io::Result> { - fn inner(s: &OsStr) -> io::Result> { +pub fn to_u16s>(s: S) -> ::io::Result> { + fn inner(s: &OsStr) -> ::io::Result> { let mut maybe_result: Vec = s.encode_wide().collect(); if maybe_result.iter().any(|&u| u == 0) { - return Err(io::Error::new(io::ErrorKind::InvalidInput, - "strings passed to WinAPI cannot contain NULs")); + return Err(::io::Error::new(::io::ErrorKind::InvalidInput, + "strings passed to WinAPI cannot contain NULs")); } maybe_result.push(0); Ok(maybe_result) @@ -102,7 +103,7 @@ pub fn to_u16s>(s: S) -> io::Result> { // Once the syscall has completed (errors bail out early) the second closure is // yielded the data which has been read from the syscall. The return value // from this closure is then the return value of the function. -fn fill_utf16_buf(mut f1: F1, f2: F2) -> io::Result +fn fill_utf16_buf(mut f1: F1, f2: F2) -> ::io::Result where F1: FnMut(*mut u16, c::DWORD) -> c::DWORD, F2: FnOnce(&[u16]) -> T { @@ -134,7 +135,7 @@ fn fill_utf16_buf(mut f1: F1, f2: F2) -> io::Result c::SetLastError(0); let k = match f1(buf.as_mut_ptr(), n as c::DWORD) { 0 if c::GetLastError() == 0 => 0, - 0 => return Err(io::Error::last_os_error()), + 0 => return Err(::io::Error::last_os_error()), n => n, } as usize; if k == n && c::GetLastError() == c::ERROR_INSUFFICIENT_BUFFER { @@ -157,7 +158,7 @@ fn wide_char_to_multi_byte(code_page: u32, flags: u32, s: &[u16], no_default_char: bool) - -> io::Result> { + -> ::io::Result> { unsafe { let mut size = c::WideCharToMultiByte(code_page, flags, @@ -168,7 +169,7 @@ fn wide_char_to_multi_byte(code_page: u32, ptr::null(), ptr::null_mut()); if size == 0 { - return Err(io::Error::last_os_error()); + return Err(::io::Error::last_os_error()); } let mut buf = Vec::with_capacity(size as usize); @@ -185,10 +186,10 @@ fn wide_char_to_multi_byte(code_page: u32, if no_default_char { &mut used_default_char } else { ptr::null_mut() }); if size == 0 { - return Err(io::Error::last_os_error()); + return Err(::io::Error::last_os_error()); } if no_default_char && used_default_char == c::TRUE { - return Err(io::Error::new(io::ErrorKind::InvalidData, + return Err(::io::Error::new(::io::ErrorKind::InvalidData, "string cannot be converted to requested code page")); } @@ -220,9 +221,9 @@ macro_rules! impl_is_zero { impl_is_zero! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize } -pub fn cvt(i: I) -> io::Result { +pub fn cvt(i: I) -> ::io::Result { if i.is_zero() { - Err(io::Error::last_os_error()) + Err(::io::Error::last_os_error()) } else { Ok(i) } diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index acda81dcde574..76be26a9d1a57 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -1,7 +1,7 @@ #![unstable(issue = "0", feature = "windows_net")] use cmp; -use io::{self, Read}; +use io::{self, Read, IoVec, IoVecMut}; use libc::{c_int, c_void, c_ulong, c_long}; use mem; use net::{SocketAddr, Shutdown}; @@ -207,6 +207,30 @@ impl Socket { self.recv_with_flags(buf, 0) } + pub fn read_vectored(&self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + // On unix when a socket is shut down all further reads return 0, so we + // do the same on windows to map a shut down socket to returning EOF. + let len = cmp::min(bufs.len(), c::DWORD::max_value() as usize) as c::DWORD; + let mut nread = 0; + let mut flags = 0; + unsafe { + let ret = c::WSARecv( + self.0, + bufs.as_mut_ptr() as *mut c::WSABUF, + len, + &mut nread, + &mut flags, + ptr::null_mut(), + ptr::null_mut(), + ); + match ret { + 0 => Ok(nread as usize), + _ if c::WSAGetLastError() == c::WSAESHUTDOWN => Ok(0), + _ => Err(last_error()), + } + } + } + pub fn peek(&self, buf: &mut [u8]) -> io::Result { self.recv_with_flags(buf, c::MSG_PEEK) } @@ -243,6 +267,23 @@ impl Socket { self.recv_from_with_flags(buf, c::MSG_PEEK) } + pub fn write_vectored(&self, bufs: &[IoVec<'_>]) -> io::Result { + let len = cmp::min(bufs.len(), c::DWORD::max_value() as usize) as c::DWORD; + let mut nwritten = 0; + unsafe { + cvt(c::WSASend( + self.0, + bufs.as_ptr() as *const c::WSABUF as *mut c::WSABUF, + len, + &mut nwritten, + 0, + ptr::null_mut(), + ptr::null_mut(), + ))?; + } + Ok(nwritten as usize) + } + pub fn set_timeout(&self, dur: Option, kind: c_int) -> io::Result<()> { let timeout = match dur { diff --git a/src/libstd/sys_common/net.rs b/src/libstd/sys_common/net.rs index f75df3ea695c6..0d60593ce1f2f 100644 --- a/src/libstd/sys_common/net.rs +++ b/src/libstd/sys_common/net.rs @@ -1,7 +1,7 @@ use cmp; use ffi::CString; use fmt; -use io::{self, Error, ErrorKind}; +use io::{self, Error, ErrorKind, IoVec, IoVecMut}; use libc::{c_int, c_void}; use mem; use net::{SocketAddr, Shutdown, Ipv4Addr, Ipv6Addr}; @@ -255,6 +255,10 @@ impl TcpStream { self.inner.read(buf) } + pub fn read_vectored(&self, bufs: &mut [IoVecMut<'_>]) -> io::Result { + self.inner.read_vectored(bufs) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let len = cmp::min(buf.len(), ::max_value() as usize) as wrlen_t; let ret = cvt(unsafe { @@ -266,6 +270,10 @@ impl TcpStream { Ok(ret as usize) } + pub fn write_vectored(&self, bufs: &[IoVec<'_>]) -> io::Result { + self.inner.write_vectored(bufs) + } + pub fn peer_addr(&self) -> io::Result { sockname(|buf, len| unsafe { c::getpeername(*self.inner.as_inner(), buf, len)