Skip to content

Commit

Permalink
input_stream: add AsyncBufRead and AsyncBufRead adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Desmottes committed May 22, 2020
1 parent 725f93a commit 8eeddde
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ git = "https://github.com/gtk-rs/lgpl-docs"
libc = "0.2"
bitflags = "1.0"
once_cell = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
Expand All @@ -55,6 +56,7 @@ gio-sys = { git = "https://github.com/gtk-rs/sys" }
glib-sys = { git = "https://github.com/gtk-rs/sys" }
glib = { git = "https://github.com/gtk-rs/glib" }
gobject-sys = { git = "https://github.com/gtk-rs/sys" }
thiserror = "1"

[dev-dependencies]
gir-format-check = "^0.1"
Expand Down
225 changes: 225 additions & 0 deletions src/input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>

use error::to_std_io_result;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead};
use futures_util::future::FutureExt;
use gio_sys;
use glib::object::IsA;
use glib::translate::*;
Expand Down Expand Up @@ -81,6 +84,13 @@ pub trait InputStreamExtManual: Sized {
{
InputStreamRead(self)
}

fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
where
Self: IsA<InputStream>,
{
InputStreamAsyncBufRead::new(self, buffer_size)
}
}

impl<O: IsA<InputStream>> InputStreamExtManual for O {
Expand Down Expand Up @@ -341,6 +351,221 @@ impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
}
}

enum State {
Waiting {
buffer: Vec<u8>,
},
Transitioning,
Reading {
pending: Pin<
Box<
dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
+ 'static,
>,
>,
},
HasData {
buffer: Vec<u8>,
valid: (usize, usize), // first index is inclusive, second is exclusive
},
Failed(crate::IOErrorEnum),
}

impl State {
fn into_buffer(self) -> Vec<u8> {
match self {
State::Waiting { buffer } => buffer,
_ => panic!("Invalid state"),
}
}

fn get_pending(
&mut self,
) -> &mut Pin<
Box<
dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
+ 'static,
>,
> {
match self {
State::Reading { ref mut pending } => pending,
_ => panic!("Invalid state"),
}
}
}
pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
stream: T,
state: State,
}

impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
pub fn into_input_stream(self) -> T {
self.stream
}

pub fn input_stream(&self) -> &T {
&self.stream
}

fn new(stream: T, buffer_size: usize) -> Self {
let buffer = vec![0; buffer_size];

Self {
stream,
state: State::Waiting { buffer },
}
}
fn set_reading(
&mut self,
) -> &mut Pin<
Box<
dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
+ 'static,
>,
> {
match self.state {
State::Waiting { .. } => {
let waiting = mem::replace(&mut self.state, State::Transitioning);
let buffer = waiting.into_buffer();
let pending = self
.input_stream()
.read_async_future(buffer, Priority::default());
self.state = State::Reading { pending };
}
State::Reading { .. } => {}
_ => panic!("Invalid state"),
};

self.state.get_pending()
}

fn get_data(&self) -> Poll<io::Result<&[u8]>> {
if let State::HasData {
ref buffer,
valid: (i, j),
} = self.state
{
return Poll::Ready(Ok(&buffer[i..j]));
}
panic!("Invalid state")
}

fn set_waiting(&mut self, buffer: Vec<u8>) {
match self.state {
State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
_ => panic!("Invalid state"),
}
}

fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
match self.state {
State::Reading { .. } | State::Transitioning { .. } => {
self.state = State::HasData { buffer, valid }
}
_ => panic!("Invalid state"),
}
}

fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures::io::Error>> {
match self.state {
State::Failed(kind) => Poll::Ready(Err(io::Error::new(
io::ErrorKind::from(kind),
BufReadError::Failed,
))),
State::HasData { .. } => self.get_data(),
State::Transitioning => panic!("Invalid state"),
State::Waiting { .. } | State::Reading { .. } => {
let pending = self.set_reading();
match pending.poll_unpin(cx) {
Poll::Ready(Ok((buffer, res))) => {
if res == 0 {
self.set_waiting(buffer);
Poll::Ready(Ok(&[]))
} else {
self.set_has_data(buffer, (0, res));
self.get_data()
}
}
Poll::Ready(Err((_, err))) => {
let kind = err.kind::<crate::IOErrorEnum>().unwrap();
self.state = State::Failed(kind);
Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
}
Poll::Pending => Poll::Pending,
}
}
}
}

fn consume(&mut self, amt: usize) {
if amt == 0 {
return;
}

if let State::HasData { .. } = self.state {
let has_data = mem::replace(&mut self.state, State::Transitioning);
if let State::HasData {
buffer,
valid: (i, j),
} = has_data
{
let available = j - i;
let remaining = available - amt;
if remaining == 0 {
return self.set_waiting(buffer);
} else {
return self.set_has_data(buffer, (i + amt, j));
}
}
}

panic!("Invalid state")
}
}

#[derive(thiserror::Error, Debug)]
enum BufReadError {
#[error("Previous read operation failed")]
Failed,
}

impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
out_buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let reader = self.get_mut();
let poll = reader.poll_fill_buf(cx);

let poll = poll.map_ok(|buffer| {
let copied = buffer.len().min(out_buf.len());
out_buf[..copied].copy_from_slice(&buffer[..copied]);
copied
});

if let Poll::Ready(Ok(consumed)) = poll {
reader.consume(consumed);
}
poll
}
}

impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<&[u8], futures::io::Error>> {
self.get_mut().poll_fill_buf(cx)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_mut().consume(amt);
}
}

impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}

#[cfg(test)]
mod tests {
use crate::prelude::*;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod inet_address;
mod io_stream;
pub use io_stream::IOStreamAsyncReadWrite;
mod input_stream;
pub use input_stream::InputStreamRead;
pub use input_stream::{InputStreamAsyncBufRead, InputStreamRead};
#[cfg(any(feature = "v2_44", feature = "dox"))]
mod list_store;
mod memory_input_stream;
Expand Down
79 changes: 79 additions & 0 deletions tests/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,82 @@
// This target is build with edition 2018 for testing futures API.
// TODO: merge to the test module of the corresponding source files once the crate
// has been ported to 2018.

use futures::prelude::*;
use gio::prelude::*;
use gio::MemoryInputStream;
use glib::Bytes;
use std::error::Error;

#[test]
fn async_read() {
async fn run() -> Result<(), Box<dyn Error>> {
let b = Bytes::from_owned(vec![1, 2, 3]);

// Adapter is big enough to read everything in one read
let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(8);
let mut buf = [0u8; 4];
assert_eq!(read.read(&mut buf).await?, 3);
assert_eq!(buf, [1, 2, 3, 0]);
assert_eq!(read.read(&mut buf).await?, 0);

let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(8);
let mut buf = [0u8; 1];
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf, [1]);
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf, [2]);
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf, [3]);
assert_eq!(read.read(&mut buf).await?, 0);

// Adapter is NOT big enough to read everything in one read
let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(2);
let mut buf = [0u8; 4];
assert_eq!(read.read(&mut buf).await?, 2);
assert_eq!(buf, [1, 2, 0, 0]);
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf[0], 3);
assert_eq!(read.read(&mut buf).await?, 0);

let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(2);
let mut buf = [0u8; 1];
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf, [1]);
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf, [2]);
assert_eq!(read.read(&mut buf).await?, 1);
assert_eq!(buf, [3]);
assert_eq!(read.read(&mut buf).await?, 0);

Ok(())
}

let main_context = glib::MainContext::new();
main_context.block_on(run()).unwrap();
}

#[test]
fn async_buf_read() {
async fn run() -> Result<(), Box<dyn Error>> {
let b = Bytes::from_owned(vec![1, 2, 3]);
// Adapter is big enough to read everything in one read
let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(16);
let mut buf = String::new();
assert_eq!(read.read_line(&mut buf).await?, 3);
assert_eq!(buf.as_bytes(), [1, 2, 3]);
assert_eq!(read.read_line(&mut buf).await?, 0);

// Adapter is NOT big enough to read everything in one read
let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(2);
let mut buf = String::new();
assert_eq!(read.read_line(&mut buf).await?, 3);
assert_eq!(buf.as_bytes(), [1, 2, 3]);
assert_eq!(read.read_line(&mut buf).await?, 0);

Ok(())
}

let main_context = glib::MainContext::new();
main_context.block_on(run()).unwrap();
}

0 comments on commit 8eeddde

Please sign in to comment.