diff --git a/examples/akamai.rs b/examples/akamai.rs index 0f8f7ce20..2222c97de 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -1,3 +1,10 @@ +fn main() { + // Enable the below code once tokio_rustls moves to std::future +} + +/* +#![feature(async_await)] + use h2::client; use futures::*; @@ -10,10 +17,12 @@ use tokio_rustls::ClientConfigExt; use webpki::DNSNameRef; use std::net::ToSocketAddrs; +use std::error::Error; const ALPN_H2: &str = "h2"; -pub fn main() { +#[tokio::main] +pub async fn main() -> Result<(), Box> { let _ = env_logger::try_init(); let tls_client_config = std::sync::Arc::new({ @@ -33,49 +42,30 @@ pub fn main() { println!("ADDR: {:?}", addr); - let tcp = TcpStream::connect(&addr); + let tcp = TcpStream::connect(&addr).await?; let dns_name = DNSNameRef::try_from_ascii_str("http2.akamai.com").unwrap(); - - let tcp = tcp.then(move |res| { - let tcp = res.unwrap(); - tls_client_config - .connect_async(dns_name, tcp) - .then(|res| { - let tls = res.unwrap(); - { - let (_, session) = tls.get_ref(); - let negotiated_protocol = session.get_alpn_protocol(); - assert_eq!(Some(ALPN_H2), negotiated_protocol.as_ref().map(|x| &**x)); - } - - println!("Starting client handshake"); - client::handshake(tls) - }) - .then(|res| { - let (mut client, h2) = res.unwrap(); - - let request = Request::builder() + let res = tls_client_config.connect_async(dns_name, tcp).await; + let tls = res.unwrap(); + { + let (_, session) = tls.get_ref(); + let negotiated_protocol = session.get_alpn_protocol(); + assert_eq!(Some(ALPN_H2), negotiated_protocol.as_ref().map(|x| &**x)); + } + + println!("Starting client handshake"); + let (mut client, h2) = client::handshake(tls).await?; + + let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let (response, _) = client.send_request(request, true).unwrap(); - - let stream = response.and_then(|response| { - let (_, body) = response.into_parts(); - - body.for_each(|chunk| { - println!("RX: {:?}", chunk); - Ok(()) - }) - }); - - h2.join(stream) - }) - }) - .map_err(|e| eprintln!("ERROR: {:?}", e)) - .map(|((), ())| ()); - - tokio::run(tcp); + let (response, _) = client.send_request(request, true).unwrap(); + let (_, mut body) = response.await?.into_parts(); + while let Some(chunk) = body.next().await { + println!("RX: {:?}", chunk?); + } + Ok(()) } +*/ diff --git a/examples/client.rs b/examples/client.rs index 53a014f92..0f5ff2935 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,8 +1,14 @@ +#![feature(async_await)] + +use futures::{ready, Stream}; use h2::client; use h2::RecvStream; +use http::{HeaderMap, Request}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; -use futures::*; -use http::*; +use std::error::Error; use tokio::net::TcpStream; @@ -12,76 +18,70 @@ struct Process { } impl Future for Process { - type Item = (); - type Error = h2::Error; + type Output = Result<(), h2::Error>; - fn poll(&mut self) -> Poll<(), h2::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { if self.trailers { - let trailers = try_ready!(self.body.poll_trailers()); + let trailers = ready!(self.body.poll_trailers(cx)); println!("GOT TRAILERS: {:?}", trailers); - return Ok(().into()); + return Poll::Ready(Ok(())); } else { - match try_ready!(self.body.poll()) { - Some(chunk) => { + match ready!(Pin::new(&mut self.body).poll_next(cx)) { + Some(Ok(chunk)) => { println!("GOT CHUNK = {:?}", chunk); - }, + } + Some(Err(e)) => return Poll::Ready(Err(e)), None => { self.trailers = true; - }, + } } } } } } -pub fn main() { +#[tokio::main] +pub async fn main() -> Result<(), Box> { let _ = env_logger::try_init(); - let tcp = TcpStream::connect(&"127.0.0.1:5928".parse().unwrap()); - - let tcp = tcp.then(|res| { - let tcp = res.unwrap(); - client::handshake(tcp) - }).then(|res| { - let (mut client, h2) = res.unwrap(); + let tcp = TcpStream::connect(&"127.0.0.1:5928".parse().unwrap()).await?; + let (mut client, h2) = client::handshake(tcp).await?; - println!("sending request"); + println!("sending request"); - let request = Request::builder() - .uri("https://http2.akamai.com/") - .body(()) - .unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); - let mut trailers = HeaderMap::new(); - trailers.insert("zomg", "hello".parse().unwrap()); + let mut trailers = HeaderMap::new(); + trailers.insert("zomg", "hello".parse().unwrap()); - let (response, mut stream) = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); - // send trailers - stream.send_trailers(trailers).unwrap(); + // send trailers + stream.send_trailers(trailers).unwrap(); - // Spawn a task to run the conn... - tokio::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); - - response - .and_then(|response| { - println!("GOT RESPONSE: {:?}", response); + // Spawn a task to run the conn... + tokio::spawn(async move { + if let Err(e) = h2.await { + println!("GOT ERR={:?}", e); + } + }); - // Get the body - let (_, body) = response.into_parts(); + let response = response.await?; + println!("GOT RESPONSE: {:?}", response); - Process { - body, - trailers: false, - } - }) - .map_err(|e| { - println!("GOT ERR={:?}", e); - }) - }); + // Get the body + let (_, body) = response.into_parts(); - tokio::run(tcp); + Process { + body, + trailers: false, + } + .await?; + Ok(()) } diff --git a/examples/server.rs b/examples/server.rs index 89e66591b..870e23c72 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,62 +1,50 @@ +#![feature(async_await)] + use h2::server; use bytes::*; use futures::*; -use http::*; +use http::{Response, StatusCode}; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; +use std::error::Error; -pub fn main() { +#[tokio::main] +pub async fn main() -> Result<(), Box> { let _ = env_logger::try_init(); let listener = TcpListener::bind(&"127.0.0.1:5928".parse().unwrap()).unwrap(); println!("listening on {:?}", listener.local_addr()); + let mut incoming = listener.incoming(); + + while let Some(socket) = incoming.next().await { + tokio::spawn(async move { + if let Err(e) = handle(socket).await { + println!(" -> err={:?}", e); + } + }); + } - let server = listener.incoming().for_each(move |socket| { - // let socket = io_dump::Dump::to_stdout(socket); - - let connection = server::handshake(socket) - .and_then(|conn| { - println!("H2 connection bound"); - - conn.for_each(|(request, mut respond)| { - println!("GOT request: {:?}", request); - - let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); - - let mut send = match respond.send_response(response, false) { - Ok(send) => send, - Err(e) => { - println!(" error respond; err={:?}", e); - return Ok(()); - } - }; - - println!(">>>> sending data"); - if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), true) { - println!(" -> err={:?}", e); - } - - Ok(()) - }) - }) - .and_then(|_| { - println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~"); - Ok(()) - }) - .then(|res| { - if let Err(e) = res { - println!(" -> err={:?}", e); - } - - Ok(()) - }); - - tokio::spawn(Box::new(connection)); - Ok(()) - }) - .map_err(|e| eprintln!("accept error: {}", e)); - - tokio::run(server); + Ok(()) } + +async fn handle(socket: io::Result) -> Result<(), Box> { + let mut connection = server::handshake(socket?).await?; + println!("H2 connection bound"); + + while let Some(result) = connection.next().await { + let (request, mut respond) = result?; + println!("GOT request: {:?}", request); + let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); + + let mut send = respond.send_response(response, false)?; + + println!(">>>> sending data"); + send.send_data(Bytes::from_static(b"hello world"), true)?; + } + + println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~"); + + Ok(()) +} \ No newline at end of file