Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Desmottes committed May 18, 2020
1 parent 725f93a commit be9f0ad
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ git = "https://github.com/gtk-rs/lgpl-docs"
libc = "0.2"
bitflags = "1.0"
once_cell = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
Expand Down
51 changes: 51 additions & 0 deletions src/input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>

use error::to_std_io_result;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use futures_util::future::FutureExt;
use gio_sys;
use glib::object::IsA;
use glib::translate::*;
Expand Down Expand Up @@ -81,6 +84,16 @@ pub trait InputStreamExtManual: Sized {
{
InputStreamRead(self)
}

fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
where
Self: IsA<InputStream>,
{
let mut buffer = Vec::with_capacity(buffer_size);
buffer.resize(buffer_size, 0);

InputStreamAsyncBufRead(self, buffer)
}
}

impl<O: IsA<InputStream>> InputStreamExtManual for O {
Expand Down Expand Up @@ -341,6 +354,44 @@ impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
}
}

#[derive(Debug)]
pub struct InputStreamAsyncBufRead<T: IsA<InputStream>>(T, Vec<u8>);

impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
pub fn into_input_stream(self) -> T {
self.0
}

pub fn input_stream(&self) -> &T {
&self.0
}
}

impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let stream = Pin::get_ref(self.as_ref());
let my_buf = vec![0; 10];

This comment has been minimized.

Copy link
@sdroege

sdroege May 18, 2020

The idea would be to store a fixed-size buffer inside the adapter directly, probably a configurable buffer size. And then re-use that for any read requests

This comment has been minimized.

Copy link
@gdesmott

gdesmott May 18, 2020

Owner

Yes that was my plan as well but I didn't figure out how to do that as read_async_future consume the buffer.

This comment has been minimized.

Copy link
@sdroege

sdroege May 18, 2020

It gives you back the buffer afterwards :) You would switch between two different states: a) have a buffer of N bytes readable, b) have no buffer but a pending read operation that we're waiting for

This comment has been minimized.

Copy link
@gdesmott

gdesmott May 18, 2020

Owner

That's exactly what I'm implementing atm, good to know that's the right path. Thanks! :)

let mut fut = stream
.0
.as_ref()
.read_async_future(my_buf, Priority::default());

match dbg!(fut.poll_unpin(cx)) {

This comment has been minimized.

Copy link
@sdroege

sdroege May 18, 2020

You create a new read_async_future() here every time, and then poll the new one while the old one was dropped after returning Pending here (which would then cancel the gio::Cancellable).

You need to keep a pending read request around.

// FIXME: copy to @buf
Poll::Ready(Ok((out_buf, res))) => Poll::Ready(Ok(res)),
Poll::Ready(Err((_, err))) => {
let kind = err.kind::<crate::IOErrorEnum>().unwrap();
Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
}
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(test)]
mod tests {
use crate::prelude::*;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod inet_address;
mod io_stream;
pub use io_stream::IOStreamAsyncReadWrite;
mod input_stream;
pub use input_stream::InputStreamRead;
pub use input_stream::{InputStreamAsyncBufRead, InputStreamRead};
#[cfg(any(feature = "v2_44", feature = "dox"))]
mod list_store;
mod memory_input_stream;
Expand Down
27 changes: 27 additions & 0 deletions tests/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,30 @@
// This target is build with edition 2018 for testing futures API.
// TODO: merge to the test module of the corresponding source files once the crate
// has been ported to 2018.

use futures::prelude::*;
use gio::prelude::*;
use gio::MemoryInputStream;
use glib::Bytes;
use std::error::Error;

#[test]
fn async_buf_read() {
async fn run() -> Result<(), Box<dyn Error>> {
let b = Bytes::from_owned(vec![1, 2, 3]);
let mut read = MemoryInputStream::new_from_bytes(&b).into_async_buf_read(16);
let mut buf = [0u8; 10];

let ret = read.read(&mut buf).await?;

assert_eq!(ret, 3);
assert_eq!(buf[0], 1);
assert_eq!(buf[1], 2);
assert_eq!(buf[2], 3);

Ok(())
}

let main_context = glib::MainContext::default();
main_context.block_on(run()).unwrap();
}

0 comments on commit be9f0ad

Please sign in to comment.