Skip to content

Commit

Permalink
Add vectored read and write support
Browse files Browse the repository at this point in the history
This functionality has lived for a while in the tokio ecosystem, where
it can improve performance by minimizing copies.
  • Loading branch information
sfackler committed Feb 14, 2019
1 parent 4772dc8 commit 31bcec6
Show file tree
Hide file tree
Showing 29 changed files with 1,033 additions and 92 deletions.
33 changes: 31 additions & 2 deletions src/libstd/io/buffered.rs
Expand Up @@ -5,7 +5,7 @@ use io::prelude::*;
use cmp;
use error;
use fmt;
use io::{self, Initializer, DEFAULT_BUF_SIZE, Error, ErrorKind, SeekFrom};
use io::{self, Initializer, DEFAULT_BUF_SIZE, Error, ErrorKind, SeekFrom, IoVec, IoVecMut};
use memchr;

/// The `BufReader` struct adds buffering to any reader.
Expand Down Expand Up @@ -235,6 +235,19 @@ impl<R: Read> Read for BufReader<R> {
Ok(nread)
}

fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result<usize> {
let total_len = bufs.iter().map(|b| b.as_slice().len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buf.len() {
return self.inner.read_vectored(bufs);
}
let nread = {
let mut rem = self.fill_buf()?;
rem.read_vectored(bufs)?
};
self.consume(nread);
Ok(nread)
}

// we can't skip unconditionally because of the large buffer case in read.
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
Expand Down Expand Up @@ -577,9 +590,25 @@ impl<W: Write> Write for BufWriter<W> {
self.panicked = false;
r
} else {
Write::write(&mut self.buf, buf)
self.buf.write(buf)
}
}

fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result<usize> {
let total_len = bufs.iter().map(|b| b.as_slice().len()).sum::<usize>();
if self.buf.len() + total_len > self.buf.capacity() {
self.flush_buf()?;
}
if total_len >= self.buf.capacity() {
self.panicked = true;
let r = self.inner.as_mut().unwrap().write_vectored(bufs);
self.panicked = false;
r
} else {
self.buf.write_vectored(bufs)
}
}

fn flush(&mut self) -> io::Result<()> {
self.flush_buf().and_then(|()| self.get_mut().flush())
}
Expand Down
217 changes: 212 additions & 5 deletions src/libstd/io/cursor.rs
Expand Up @@ -2,7 +2,7 @@ use io::prelude::*;

use core::convert::TryInto;
use cmp;
use io::{self, Initializer, SeekFrom, Error, ErrorKind};
use io::{self, Initializer, SeekFrom, Error, ErrorKind, IoVec, IoVecMut};

/// A `Cursor` wraps an in-memory buffer and provides it with a
/// [`Seek`] implementation.
Expand Down Expand Up @@ -221,6 +221,19 @@ impl<T> Read for Cursor<T> where T: AsRef<[u8]> {
Ok(n)
}

fn read_vectored(&mut self, bufs: &mut [IoVecMut<'_>]) -> io::Result<usize> {
let mut nread = 0;
for buf in bufs {
let buf = buf.as_mut_slice();
let n = self.read(buf)?;
nread += n;
if n < buf.len() {
break;
}
}
Ok(nread)
}

fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
let n = buf.len();
Read::read_exact(&mut self.fill_buf()?, buf)?;
Expand Down Expand Up @@ -251,6 +264,24 @@ fn slice_write(pos_mut: &mut u64, slice: &mut [u8], buf: &[u8]) -> io::Result<us
Ok(amt)
}

fn slice_write_vectored(
pos_mut: &mut u64,
slice: &mut [u8],
bufs: &[IoVec<'_>],
) -> io::Result<usize>
{
let mut nwritten = 0;
for buf in bufs {
let buf = buf.as_slice();
let n = slice_write(pos_mut, slice, buf)?;
nwritten += n;
if n < buf.len() {
break;
}
}
Ok(nwritten)
}

// Resizing write implementation
fn vec_write(pos_mut: &mut u64, vec: &mut Vec<u8>, buf: &[u8]) -> io::Result<usize> {
let pos: usize = (*pos_mut).try_into().map_err(|_| {
Expand Down Expand Up @@ -278,12 +309,31 @@ fn vec_write(pos_mut: &mut u64, vec: &mut Vec<u8>, buf: &[u8]) -> io::Result<usi
Ok(buf.len())
}

fn vec_write_vectored(
pos_mut: &mut u64,
vec: &mut Vec<u8>,
bufs: &[IoVec<'_>],
) -> io::Result<usize>
{
let mut nwritten = 0;
for buf in bufs {
nwritten += vec_write(pos_mut, vec, buf.as_slice())?;
}
Ok(nwritten)
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<'a> Write for Cursor<&'a mut [u8]> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
slice_write(&mut self.pos, self.inner, buf)
}

#[inline]
fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result<usize> {
slice_write_vectored(&mut self.pos, self.inner, bufs)
}

fn flush(&mut self) -> io::Result<()> { Ok(()) }
}

Expand All @@ -292,6 +342,11 @@ impl<'a> Write for Cursor<&'a mut Vec<u8>> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
vec_write(&mut self.pos, self.inner, buf)
}

fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result<usize> {
vec_write_vectored(&mut self.pos, self.inner, bufs)
}

fn flush(&mut self) -> io::Result<()> { Ok(()) }
}

Expand All @@ -300,6 +355,11 @@ impl Write for Cursor<Vec<u8>> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
vec_write(&mut self.pos, &mut self.inner, buf)
}

fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result<usize> {
vec_write_vectored(&mut self.pos, &mut self.inner, bufs)
}

fn flush(&mut self) -> io::Result<()> { Ok(()) }
}

Expand All @@ -309,21 +369,30 @@ impl Write for Cursor<Box<[u8]>> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
slice_write(&mut self.pos, &mut self.inner, buf)
}

#[inline]
fn write_vectored(&mut self, bufs: &[IoVec<'_>]) -> io::Result<usize> {
slice_write_vectored(&mut self.pos, &mut self.inner, bufs)
}

fn flush(&mut self) -> io::Result<()> { Ok(()) }
}

#[cfg(test)]
mod tests {
use io::prelude::*;
use io::{Cursor, SeekFrom};
use io::{Cursor, SeekFrom, IoVec, IoVecMut};

#[test]
fn test_vec_writer() {
let mut writer = Vec::new();
assert_eq!(writer.write(&[0]).unwrap(), 1);
assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3);
assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7];
assert_eq!(writer.write_vectored(
&[IoVec::new(&[]), IoVec::new(&[8, 9]), IoVec::new(&[10])],
).unwrap(), 3);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
assert_eq!(writer, b);
}

Expand All @@ -333,7 +402,10 @@ mod tests {
assert_eq!(writer.write(&[0]).unwrap(), 1);
assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3);
assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7];
assert_eq!(writer.write_vectored(
&[IoVec::new(&[]), IoVec::new(&[8, 9]), IoVec::new(&[10])],
).unwrap(), 3);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
assert_eq!(&writer.get_ref()[..], b);
}

Expand All @@ -344,7 +416,10 @@ mod tests {
assert_eq!(writer.write(&[0]).unwrap(), 1);
assert_eq!(writer.write(&[1, 2, 3]).unwrap(), 3);
assert_eq!(writer.write(&[4, 5, 6, 7]).unwrap(), 4);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7];
assert_eq!(writer.write_vectored(
&[IoVec::new(&[]), IoVec::new(&[8, 9]), IoVec::new(&[10])],
).unwrap(), 3);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
assert_eq!(&writer.get_ref()[..], b);
}

Expand All @@ -366,6 +441,26 @@ mod tests {
assert_eq!(&**writer.get_ref(), b);
}

#[test]
fn test_box_slice_writer_vectored() {
let mut writer = Cursor::new(vec![0u8; 9].into_boxed_slice());
assert_eq!(writer.position(), 0);
assert_eq!(writer.write_vectored(&[IoVec::new(&[0])]).unwrap(), 1);
assert_eq!(writer.position(), 1);
assert_eq!(
writer.write_vectored(&[IoVec::new(&[1, 2, 3]), IoVec::new(&[4, 5, 6, 7])]).unwrap(),
7,
);
assert_eq!(writer.position(), 8);
assert_eq!(writer.write_vectored(&[]).unwrap(), 0);
assert_eq!(writer.position(), 8);

assert_eq!(writer.write_vectored(&[IoVec::new(&[8, 9])]).unwrap(), 1);
assert_eq!(writer.write_vectored(&[IoVec::new(&[10])]).unwrap(), 0);
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8];
assert_eq!(&**writer.get_ref(), b);
}

#[test]
fn test_buf_writer() {
let mut buf = [0 as u8; 9];
Expand All @@ -387,6 +482,31 @@ mod tests {
assert_eq!(buf, b);
}

#[test]
fn test_buf_writer_vectored() {
let mut buf = [0 as u8; 9];
{
let mut writer = Cursor::new(&mut buf[..]);
assert_eq!(writer.position(), 0);
assert_eq!(writer.write_vectored(&[IoVec::new(&[0])]).unwrap(), 1);
assert_eq!(writer.position(), 1);
assert_eq!(
writer.write_vectored(
&[IoVec::new(&[1, 2, 3]), IoVec::new(&[4, 5, 6, 7])],
).unwrap(),
7,
);
assert_eq!(writer.position(), 8);
assert_eq!(writer.write_vectored(&[]).unwrap(), 0);
assert_eq!(writer.position(), 8);

assert_eq!(writer.write_vectored(&[IoVec::new(&[8, 9])]).unwrap(), 1);
assert_eq!(writer.write_vectored(&[IoVec::new(&[10])]).unwrap(), 0);
}
let b: &[_] = &[0, 1, 2, 3, 4, 5, 6, 7, 8];
assert_eq!(buf, b);
}

#[test]
fn test_buf_writer_seek() {
let mut buf = [0 as u8; 8];
Expand Down Expand Up @@ -447,6 +567,35 @@ mod tests {
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_mem_reader_vectored() {
let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7]);
let mut buf = [];
assert_eq!(reader.read_vectored(&mut [IoVecMut::new(&mut buf)]).unwrap(), 0);
assert_eq!(reader.position(), 0);
let mut buf = [0];
assert_eq!(
reader.read_vectored(&mut [IoVecMut::new(&mut []), IoVecMut::new(&mut buf)]).unwrap(),
1,
);
assert_eq!(reader.position(), 1);
let b: &[_] = &[0];
assert_eq!(buf, b);
let mut buf1 = [0; 4];
let mut buf2 = [0; 4];
assert_eq!(
reader.read_vectored(
&mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)],
).unwrap(),
7,
);
let b1: &[_] = &[1, 2, 3, 4];
let b2: &[_] = &[5, 6, 7];
assert_eq!(buf1, b1);
assert_eq!(&buf2[..3], b2);
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_boxed_slice_reader() {
let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7].into_boxed_slice());
Expand All @@ -469,6 +618,35 @@ mod tests {
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_boxed_slice_reader_vectored() {
let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7].into_boxed_slice());
let mut buf = [];
assert_eq!(reader.read_vectored(&mut [IoVecMut::new(&mut buf)]).unwrap(), 0);
assert_eq!(reader.position(), 0);
let mut buf = [0];
assert_eq!(
reader.read_vectored(&mut [IoVecMut::new(&mut []), IoVecMut::new(&mut buf)]).unwrap(),
1,
);
assert_eq!(reader.position(), 1);
let b: &[_] = &[0];
assert_eq!(buf, b);
let mut buf1 = [0; 4];
let mut buf2 = [0; 4];
assert_eq!(
reader.read_vectored(
&mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)],
).unwrap(),
7,
);
let b1: &[_] = &[1, 2, 3, 4];
let b2: &[_] = &[5, 6, 7];
assert_eq!(buf1, b1);
assert_eq!(&buf2[..3], b2);
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn read_to_end() {
let mut reader = Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7]);
Expand Down Expand Up @@ -499,6 +677,35 @@ mod tests {
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_slice_reader_vectored() {
let in_buf = vec![0, 1, 2, 3, 4, 5, 6, 7];
let reader = &mut &in_buf[..];
let mut buf = [];
assert_eq!(reader.read_vectored(&mut [IoVecMut::new(&mut buf)]).unwrap(), 0);
let mut buf = [0];
assert_eq!(
reader.read_vectored(&mut [IoVecMut::new(&mut []), IoVecMut::new(&mut buf)]).unwrap(),
1,
);
assert_eq!(reader.len(), 7);
let b: &[_] = &[0];
assert_eq!(buf, b);
let mut buf1 = [0; 4];
let mut buf2 = [0; 4];
assert_eq!(
reader.read_vectored(
&mut [IoVecMut::new(&mut buf1), IoVecMut::new(&mut buf2)],
).unwrap(),
7,
);
let b1: &[_] = &[1, 2, 3, 4];
let b2: &[_] = &[5, 6, 7];
assert_eq!(buf1, b1);
assert_eq!(&buf2[..3], b2);
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_read_exact() {
let in_buf = vec![0, 1, 2, 3, 4, 5, 6, 7];
Expand Down

0 comments on commit 31bcec6

Please sign in to comment.