extern crate futures; extern crate futures_io; extern crate futures_mio; use std::net::ToSocketAddrs; use std::io; use futures::Future; use futures::Poll; use futures::stream::Stream; use futures_io::IoFuture; use futures_io::TaskIo; use futures_mio::Loop; struct ChunkStream { future: IoFuture<(A,Vec)>, } impl ChunkStream where A: io::Read + Send + 'static { pub fn new(socket: A) -> ChunkStream { let future = ChunkStream::new_future(socket); ChunkStream { future: future, } } fn new_future(socket: A) -> IoFuture<(A,Vec)> { let buf = vec![0, 0, 0, 0]; let future = futures_io::read_exact(socket, buf); Box::new(future) } } impl Stream for ChunkStream where A: io::Read + Send + 'static { type Item = Vec; type Error = io::Error; fn poll (&mut self) -> Poll, Self::Error> { match self.future.poll() { Poll::NotReady => Poll::NotReady, Poll::Err(x) => Poll::Err(x), Poll::Ok((socket,item)) => { self.future = ChunkStream::new_future(socket); Poll::Ok(Some(item)) }, } } } fn start_background(socket: A) where A: io::Write + io::Read + Send + 'static { futures::lazy(|| { let (read, _) = TaskIo::new(socket).split(); ChunkStream::new(read).for_each(|chunk| { println!("chunk {:?}", chunk); Ok(()) }).forget(); futures::finished::<(),()>(()) }).forget(); } fn main() { let mut lp = Loop::new().unwrap(); let addr = "localhost:1234".to_socket_addrs().unwrap().next().unwrap(); let future_socket = lp.handle().tcp_connect(&addr); let socket = lp.run(future_socket).unwrap(); start_background(socket); }