From be9f0ad6f792a9eb6ce664ed3450f93fe25bbb8d Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Fri, 15 May 2020 14:35:39 +0200 Subject: [PATCH] wip --- Cargo.toml | 1 + src/input_stream.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- tests/futures.rs | 27 ++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index faa52007..e48ccabf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ git = "https://github.com/gtk-rs/lgpl-docs" libc = "0.2" bitflags = "1.0" once_cell = "1.0" +futures = "0.3" futures-core = "0.3" futures-channel = "0.3" futures-util = "0.3" diff --git a/src/input_stream.rs b/src/input_stream.rs index 5cac8add..d7c0e517 100644 --- a/src/input_stream.rs +++ b/src/input_stream.rs @@ -3,6 +3,9 @@ // Licensed under the MIT license, see the LICENSE file or use error::to_std_io_result; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncRead; +use futures_util::future::FutureExt; use gio_sys; use glib::object::IsA; use glib::translate::*; @@ -81,6 +84,16 @@ pub trait InputStreamExtManual: Sized { { InputStreamRead(self) } + + fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead + where + Self: IsA, + { + let mut buffer = Vec::with_capacity(buffer_size); + buffer.resize(buffer_size, 0); + + InputStreamAsyncBufRead(self, buffer) + } } impl> InputStreamExtManual for O { @@ -341,6 +354,44 @@ impl + IsA> io::Seek for InputStreamRead { } } +#[derive(Debug)] +pub struct InputStreamAsyncBufRead>(T, Vec); + +impl> InputStreamAsyncBufRead { + pub fn into_input_stream(self) -> T { + self.0 + } + + pub fn input_stream(&self) -> &T { + &self.0 + } +} + +impl> AsyncRead for InputStreamAsyncBufRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let stream = Pin::get_ref(self.as_ref()); + let my_buf = vec![0; 10]; + let mut fut = stream + .0 + .as_ref() + .read_async_future(my_buf, Priority::default()); + + match dbg!(fut.poll_unpin(cx)) { + // FIXME: copy to @buf + Poll::Ready(Ok((out_buf, res))) => Poll::Ready(Ok(res)), + Poll::Ready(Err((_, err))) => { + let kind = err.kind::().unwrap(); + Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err))) + } + Poll::Pending => Poll::Pending, + } + } +} + #[cfg(test)] mod tests { use crate::prelude::*; diff --git a/src/lib.rs b/src/lib.rs index 7b6035c7..3adc9898 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,7 @@ mod inet_address; mod io_stream; pub use io_stream::IOStreamAsyncReadWrite; mod input_stream; -pub use input_stream::InputStreamRead; +pub use input_stream::{InputStreamAsyncBufRead, InputStreamRead}; #[cfg(any(feature = "v2_44", feature = "dox"))] mod list_store; mod memory_input_stream; diff --git a/tests/futures.rs b/tests/futures.rs index 248886e3..7a1d8535 100644 --- a/tests/futures.rs +++ b/tests/futures.rs @@ -5,3 +5,30 @@ // This target is build with edition 2018 for testing futures API. // TODO: merge to the test module of the corresponding source files once the crate // has been ported to 2018. + +use futures::prelude::*; +use gio::prelude::*; +use gio::MemoryInputStream; +use glib::Bytes; +use std::error::Error; + +#[test] +fn async_buf_read() { + async fn run() -> Result<(), Box> { + let b = Bytes::from_owned(vec![1, 2, 3]); + let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(16); + let mut buf = [0u8; 10]; + + let ret = read.read(&mut buf).await?; + + assert_eq!(ret, 3); + assert_eq!(buf[0], 1); + assert_eq!(buf[1], 2); + assert_eq!(buf[2], 3); + + Ok(()) + } + + let main_context = glib::MainContext::default(); + main_context.block_on(run()).unwrap(); +}