Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Desmottes committed May 19, 2020
1 parent 725f93a commit 84bd74d
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ git = "https://github.com/gtk-rs/lgpl-docs"
libc = "0.2"
bitflags = "1.0"
once_cell = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
Expand Down
99 changes: 98 additions & 1 deletion src/input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>

use error::to_std_io_result;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use futures_util::future::FutureExt;
use gio_sys;
use glib::object::IsA;
use glib::translate::*;
Expand All @@ -12,7 +15,7 @@ use gobject_sys;
use std::io;
use std::mem;
use std::pin::Pin;
use std::ptr;
use std::{cell::RefCell, ptr};
use Cancellable;
use InputStream;
use Seekable;
Expand Down Expand Up @@ -81,6 +84,13 @@ pub trait InputStreamExtManual: Sized {
{
InputStreamRead(self)
}

fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
where
Self: IsA<InputStream>,
{
InputStreamAsyncBufRead::new(self, buffer_size)
}
}

impl<O: IsA<InputStream>> InputStreamExtManual for O {
Expand Down Expand Up @@ -341,6 +351,93 @@ impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
}
}

enum State {
Waiting {
buffer: Vec<u8>,
},
Reading {
pending: Pin<
Box<
dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
+ 'static,
>,
>,
},
}
pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
stream: T,
state: RefCell<State>,
}

impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
pub fn into_input_stream(self) -> T {
self.stream
}

pub fn input_stream(&self) -> &T {
&self.stream
}

fn new(stream: T, buffer_size: usize) -> Self {
let mut buffer = Vec::with_capacity(buffer_size);
buffer.resize(buffer_size, 0);

Self {
stream,
state: RefCell::new(State::Waiting { buffer }),
}
}

fn to_reading(&self) {
let mut state = self.state.borrow_mut();
match *state {
State::Waiting { buffer: buffer } => {
let pending = self
.input_stream()
.read_async_future(buffer, Priority::default());
*state = State::Reading { pending };
}
_ => {}
}
}

fn to_waiting(&self, buffer: Vec<u8>) {
self.state.replace(State::Waiting { buffer });
}
}

impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let reader = Pin::get_ref(self.as_ref());
reader.to_reading();

let mut state = reader.state.borrow_mut();
let pending = match *state {
State::Reading {
pending: ref mut pending,
} => pending,
_ => panic!("Wrong state: should be Reading"),
};

match pending.poll_unpin(cx) {
Poll::Ready(Ok((out_buf, res))) => {
buf.clone_from_slice(&out_buf);
reader.to_waiting(out_buf);
Poll::Ready(Ok(res))
}
Poll::Ready(Err((_, err))) => {
let kind = err.kind::<crate::IOErrorEnum>().unwrap();
Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
}
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(test)]
mod tests {
use crate::prelude::*;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod inet_address;
mod io_stream;
pub use io_stream::IOStreamAsyncReadWrite;
mod input_stream;
pub use input_stream::InputStreamRead;
pub use input_stream::{InputStreamAsyncBufRead, InputStreamRead};
#[cfg(any(feature = "v2_44", feature = "dox"))]
mod list_store;
mod memory_input_stream;
Expand Down
26 changes: 26 additions & 0 deletions tests/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,29 @@
// This target is build with edition 2018 for testing futures API.
// TODO: merge to the test module of the corresponding source files once the crate
// has been ported to 2018.

use futures::prelude::*;
use gio::prelude::*;
use gio::MemoryInputStream;
use glib::Bytes;
use std::error::Error;

#[test]
fn async_buf_read() {
async fn run() -> Result<(), Box<dyn Error>> {
let b = Bytes::from_owned(vec![1, 2, 3]);
let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(16);
let mut buf = [0u8; 10];

let ret = read.read(&mut buf).await?;
assert_eq!(ret, 3);
assert_eq!(buf[0], 1);
assert_eq!(buf[1], 2);
assert_eq!(buf[2], 3);

Ok(())
}

let main_context = glib::MainContext::default();
main_context.block_on(run()).unwrap();
}

0 comments on commit 84bd74d

Please sign in to comment.