diff --git a/Cargo.toml b/Cargo.toml index 3c7b665..f6f485e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,9 @@ documentation = "https://docs.rs/hyper-tungstenite" edition = "2021" [dependencies] -hyper = { version = "0.14.4" } +http-body-util = "0.1.0" +hyper = { version = "1.0.0" } +hyper-util = { version = "0.1.0", features = ["tokio"] } pin-project-lite = "0.2.10" tokio = "1.2.0" tokio-tungstenite = "0.20.0" @@ -20,6 +22,6 @@ tungstenite = "0.20.0" [dev-dependencies] assert2 = "0.3.4" -hyper = { version = "0.14.18", features = ["http1", "server", "tcp"] } +hyper = { version = "1.0.0", features = ["http1", "server"] } tokio = { version = "1.2.0", features = ["net", "macros", "rt-multi-thread"] } futures = { version = "0.3.12" } diff --git a/README.md b/README.md index 5e447ec..e81ecb4 100644 --- a/README.md +++ b/README.md @@ -18,15 +18,16 @@ you can manually inspect the `Connection` and `Upgrade` headers. ## Example ```rust use futures::{sink::SinkExt, stream::StreamExt}; -use hyper::{Body, Request, Response}; +use http_body_util::Full; +use hyper::{body::{Bytes, Incoming}, Request, Response}; +use hyper_util::rt::TokioIo; use hyper_tungstenite::{tungstenite, HyperWebsocket}; -use std::convert::Infallible; use tungstenite::Message; type Error = Box; /// Handle a HTTP or WebSocket request. -async fn handle_request(mut request: Request) -> Result, Error> { +async fn handle_request(mut request: Request) -> Result>, Error> { // Check if the request is a websocket upgrade request. if hyper_tungstenite::is_upgrade_request(&request) { let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?; @@ -34,7 +35,7 @@ async fn handle_request(mut request: Request) -> Result, Er // Spawn a task to handle the websocket connection. tokio::spawn(async move { if let Err(e) = serve_websocket(websocket).await { - eprintln!("Error in websocket connection: {}", e); + eprintln!("Error in websocket connection: {e}"); } }); @@ -42,7 +43,7 @@ async fn handle_request(mut request: Request) -> Result, Er Ok(response) } else { // Handle regular HTTP requests here. - Ok(Response::new(Body::from("Hello HTTP!"))) + Ok(Response::new(Full::::from("Hello HTTP!"))) } } @@ -52,19 +53,19 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> { while let Some(message) = websocket.next().await { match message? { Message::Text(msg) => { - println!("Received text message: {}", msg); + println!("Received text message: {msg}"); websocket.send(Message::text("Thank you, come again.")).await?; }, Message::Binary(msg) => { - println!("Received binary message: {:02X?}", msg); + println!("Received binary message: {msg:02X?}"); websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?; }, Message::Ping(msg) => { // No need to send a reply: tungstenite takes care of this for you. - println!("Received ping message: {:02X?}", msg); + println!("Received ping message: {msg:02X?}"); }, Message::Pong(msg) => { - println!("Received pong message: {:02X?}", msg); + println!("Received pong message: {msg:02X?}"); } Message::Close(msg) => { // No need to send a reply: tungstenite takes care of this for you. @@ -74,8 +75,8 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> { println!("Received close message"); } }, - Message::Frame(msg) => { - unreachable!(); + Message::Frame(_msg) => { + unreachable!(); } } } @@ -86,22 +87,20 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> { #[tokio::main] async fn main() -> Result<(), Error> { let addr: std::net::SocketAddr = "[::1]:3000".parse()?; - println!("Listening on http://{}", addr); let listener = tokio::net::TcpListener::bind(&addr).await?; - println!("listening on {}", addr); + println!("Listening on http://{addr}"); - let mut http = hyper::server::conn::Http::new(); - http.http1_only(true); - http.http1_keep_alive(true); + let mut http = hyper::server::conn::http1::Builder::new(); + http.keep_alive(true); loop { let (stream, _) = listener.accept().await?; let connection = http - .serve_connection(stream, hyper::service::service_fn(handle_request)) + .serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request)) .with_upgrades(); tokio::spawn(async move { if let Err(err) = connection.await { - println!("Error serving HTTP connection: {:?}", err); + println!("Error serving HTTP connection: {err:?}"); } }); } diff --git a/examples/server.rs b/examples/server.rs index 2973b16..0ab1d6f 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,12 +1,14 @@ use futures::{sink::SinkExt, stream::StreamExt}; -use hyper::{Body, Request, Response}; +use http_body_util::Full; +use hyper::{body::{Bytes, Incoming}, Request, Response}; +use hyper_util::rt::TokioIo; use hyper_tungstenite::{tungstenite, HyperWebsocket}; use tungstenite::Message; type Error = Box; /// Handle a HTTP or WebSocket request. -async fn handle_request(mut request: Request) -> Result, Error> { +async fn handle_request(mut request: Request) -> Result>, Error> { // Check if the request is a websocket upgrade request. if hyper_tungstenite::is_upgrade_request(&request) { let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?; @@ -22,7 +24,7 @@ async fn handle_request(mut request: Request) -> Result, Er Ok(response) } else { // Handle regular HTTP requests here. - Ok(Response::new(Body::from("Hello HTTP!"))) + Ok(Response::new(Full::::from("Hello HTTP!"))) } } @@ -69,14 +71,13 @@ async fn main() -> Result<(), Error> { let listener = tokio::net::TcpListener::bind(&addr).await?; println!("Listening on http://{addr}"); - let mut http = hyper::server::conn::Http::new(); - http.http1_only(true); - http.http1_keep_alive(true); + let mut http = hyper::server::conn::http1::Builder::new(); + http.keep_alive(true); loop { let (stream, _) = listener.accept().await?; let connection = http - .serve_connection(stream, hyper::service::service_fn(handle_request)) + .serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request)) .with_upgrades(); tokio::spawn(async move { if let Err(err) = connection.await { diff --git a/src/lib.rs b/src/lib.rs index 492324b..6d51917 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,15 +13,16 @@ //! # Example //! ```no_run //! use futures::{sink::SinkExt, stream::StreamExt}; -//! use hyper::{Body, Request, Response}; +//! use http_body_util::Full; +//! use hyper::{body::{Bytes, Incoming}, Request, Response}; +//! use hyper_util::rt::TokioIo; //! use hyper_tungstenite::{tungstenite, HyperWebsocket}; -//! use std::convert::Infallible; //! use tungstenite::Message; //! //! type Error = Box; //! //! /// Handle a HTTP or WebSocket request. -//! async fn handle_request(mut request: Request) -> Result, Error> { +//! async fn handle_request(mut request: Request) -> Result>, Error> { //! // Check if the request is a websocket upgrade request. //! if hyper_tungstenite::is_upgrade_request(&request) { //! let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?; @@ -29,7 +30,7 @@ //! // Spawn a task to handle the websocket connection. //! tokio::spawn(async move { //! if let Err(e) = serve_websocket(websocket).await { -//! eprintln!("Error in websocket connection: {}", e); +//! eprintln!("Error in websocket connection: {e}"); //! } //! }); //! @@ -37,7 +38,7 @@ //! Ok(response) //! } else { //! // Handle regular HTTP requests here. -//! Ok(Response::new(Body::from("Hello HTTP!"))) +//! Ok(Response::new(Full::::from("Hello HTTP!"))) //! } //! } //! @@ -47,19 +48,19 @@ //! while let Some(message) = websocket.next().await { //! match message? { //! Message::Text(msg) => { -//! println!("Received text message: {}", msg); +//! println!("Received text message: {msg}"); //! websocket.send(Message::text("Thank you, come again.")).await?; //! }, //! Message::Binary(msg) => { -//! println!("Received binary message: {:02X?}", msg); +//! println!("Received binary message: {msg:02X?}"); //! websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?; //! }, //! Message::Ping(msg) => { //! // No need to send a reply: tungstenite takes care of this for you. -//! println!("Received ping message: {:02X?}", msg); +//! println!("Received ping message: {msg:02X?}"); //! }, //! Message::Pong(msg) => { -//! println!("Received pong message: {:02X?}", msg); +//! println!("Received pong message: {msg:02X?}"); //! } //! Message::Close(msg) => { //! // No need to send a reply: tungstenite takes care of this for you. @@ -69,8 +70,8 @@ //! println!("Received close message"); //! } //! }, -//! Message::Frame(msg) => { -//! unreachable!(); +//! Message::Frame(_msg) => { +//! unreachable!(); //! } //! } //! } @@ -81,29 +82,30 @@ //! #[tokio::main] //! async fn main() -> Result<(), Error> { //! let addr: std::net::SocketAddr = "[::1]:3000".parse()?; -//! println!("Listening on http://{}", addr); //! let listener = tokio::net::TcpListener::bind(&addr).await?; -//! println!("listening on {}", addr); +//! println!("Listening on http://{addr}"); //! -//! let mut http = hyper::server::conn::Http::new(); -//! http.http1_only(true); -//! http.http1_keep_alive(true); +//! let mut http = hyper::server::conn::http1::Builder::new(); +//! http.keep_alive(true); //! //! loop { //! let (stream, _) = listener.accept().await?; //! let connection = http -//! .serve_connection(stream, hyper::service::service_fn(handle_request)) +//! .serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request)) //! .with_upgrades(); //! tokio::spawn(async move { //! if let Err(err) = connection.await { -//! println!("Error serving HTTP connection: {:?}", err); +//! println!("Error serving HTTP connection: {err:?}"); //! } //! }); //! } //! } //! ``` -use hyper::{Body, Request, Response}; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; use std::task::{Context, Poll}; use std::pin::Pin; use pin_project_lite::pin_project; @@ -144,7 +146,7 @@ pin_project! { pub fn upgrade( mut request: impl std::borrow::BorrowMut>, config: Option, -) -> Result<(Response, HyperWebsocket), ProtocolError> { +) -> Result<(Response>, HyperWebsocket), ProtocolError> { let request = request.borrow_mut(); let key = request.headers().get("Sec-WebSocket-Key") @@ -158,7 +160,7 @@ pub fn upgrade( .header(hyper::header::CONNECTION, "upgrade") .header(hyper::header::UPGRADE, "websocket") .header("Sec-WebSocket-Accept", &derive_accept_key(key.as_bytes())) - .body(Body::from("switching to websocket protocol")) + .body(Full::::from("switching to websocket protocol")) .expect("bug: failed to build response"); let stream = HyperWebsocket { @@ -212,7 +214,7 @@ fn trim_end(data: &[u8]) -> &[u8] { } impl std::future::Future for HyperWebsocket { - type Output = Result, Error>; + type Output = Result>, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.project(); @@ -224,7 +226,7 @@ impl std::future::Future for HyperWebsocket { let upgraded = upgraded.map_err(|_| Error::Protocol(ProtocolError::HandshakeIncomplete))?; let stream = WebSocketStream::from_raw_socket( - upgraded, + TokioIo::new(upgraded), Role::Server, this.config.take(), ); diff --git a/tests/simple-server.rs b/tests/simple-server.rs index 374225b..3effd08 100644 --- a/tests/simple-server.rs +++ b/tests/simple-server.rs @@ -1,6 +1,8 @@ -use hyper::{Body, Request, Response}; -use hyper::server::Server; -use hyper::service::{service_fn, make_service_fn}; +use http_body_util::Full; +use hyper::{Request, Response}; +use hyper::body::{Bytes, Incoming}; +use hyper::service::service_fn; +use hyper_util::rt::TokioIo; use hyper_tungstenite::tungstenite::Error; use tokio::net::TcpStream; use std::net::Ipv6Addr; @@ -13,16 +15,15 @@ use assert2::{assert, let_assert}; #[tokio::test] async fn hyper() { // Bind a TCP listener to an ephemeral port. - let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16))); + let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await); let_assert!(Ok(bind_addr) = listener.local_addr()); - let_assert!(Ok(server) = Server::from_tcp(listener)); + let server = hyper::server::conn::http1::Builder::new(); // Spawn the server in a task. tokio::spawn(async move { - let service = make_service_fn(|_conn| async { - Ok::<_, hyper::Error>(service_fn(upgrade_websocket)) - }); - let_assert!(Ok(()) = server.http1_only(true).serve(service).await); + let service = service_fn(upgrade_websocket); + let_assert!(Ok((stream, _)) = listener.accept().await); + let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await); }); // Try to create a websocket connection with the server. @@ -36,7 +37,7 @@ async fn hyper() { assert!(let Some(Ok(Message::Close(None))) = stream.next().await); } -async fn upgrade_websocket(request: Request) -> Result> { +async fn upgrade_websocket(request: Request) -> Result>> { assert!(hyper_tungstenite::is_upgrade_request(&request) == true); let (response, stream) = hyper_tungstenite::upgrade(request, None) diff --git a/tests/split.rs b/tests/split.rs index bb4c234..6536253 100644 --- a/tests/split.rs +++ b/tests/split.rs @@ -1,6 +1,8 @@ -use hyper::{Body, Request, Response}; -use hyper::server::Server; -use hyper::service::{service_fn, make_service_fn}; +use http_body_util::Full; +use hyper::{Request, Response}; +use hyper::body::{Bytes, Incoming}; +use hyper::service::service_fn; +use hyper_util::rt::TokioIo; use hyper_tungstenite::tungstenite::Error; use tokio::net::TcpStream; use std::net::Ipv6Addr; @@ -13,16 +15,15 @@ use assert2::{assert, let_assert}; #[tokio::test] async fn hyper() { // Bind a TCP listener to an ephemeral port. - let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16))); + let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await); let_assert!(Ok(bind_addr) = listener.local_addr()); - let_assert!(Ok(server) = Server::from_tcp(listener)); + let server = hyper::server::conn::http1::Builder::new(); // Spawn the server in a task. tokio::spawn(async move { - let service = make_service_fn(|_conn| async { - Ok::<_, hyper::Error>(service_fn(upgrade_websocket)) - }); - let_assert!(Ok(()) = server.http1_only(true).serve(service).await); + let service = service_fn(upgrade_websocket); + let_assert!(Ok((stream, _)) = listener.accept().await); + let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await); }); // Try to create a websocket connection with the server. @@ -36,7 +37,7 @@ async fn hyper() { assert!(let Some(Ok(Message::Close(None))) = stream.next().await); } -async fn upgrade_websocket(mut request: Request) -> Result> { +async fn upgrade_websocket(mut request: Request) -> Result>> { assert!(hyper_tungstenite::is_upgrade_request(&request) == true); let (response, stream) = hyper_tungstenite::upgrade(&mut request, None) diff --git a/tests/upgrade-request-by-value.rs b/tests/upgrade-request-by-value.rs index 84ca3f4..4a04d65 100644 --- a/tests/upgrade-request-by-value.rs +++ b/tests/upgrade-request-by-value.rs @@ -1,6 +1,8 @@ -use hyper::{Body, Request, Response}; -use hyper::server::Server; -use hyper::service::{service_fn, make_service_fn}; +use http_body_util::Full; +use hyper::{Request, Response}; +use hyper::body::{Bytes, Incoming}; +use hyper::service::service_fn; +use hyper_util::rt::TokioIo; use hyper_tungstenite::tungstenite::Error; use tokio::net::TcpStream; use std::net::Ipv6Addr; @@ -13,16 +15,15 @@ use assert2::{assert, let_assert}; #[tokio::test] async fn hyper() { // Bind a TCP listener to an ephemeral port. - let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16))); + let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await); let_assert!(Ok(bind_addr) = listener.local_addr()); - let_assert!(Ok(server) = Server::from_tcp(listener)); + let server = hyper::server::conn::http1::Builder::new(); // Spawn the server in a task. tokio::spawn(async move { - let service = make_service_fn(|_conn| async { - Ok::<_, hyper::Error>(service_fn(upgrade_websocket)) - }); - let_assert!(Ok(()) = server.http1_only(true).serve(service).await); + let service = service_fn(upgrade_websocket); + let_assert!(Ok((stream, _)) = listener.accept().await); + let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await); }); // Try to create a websocket connection with the server. @@ -36,7 +37,7 @@ async fn hyper() { assert!(let Some(Ok(Message::Close(None))) = stream.next().await); } -async fn upgrade_websocket(mut request: Request) -> Result> { +async fn upgrade_websocket(mut request: Request) -> Result>> { assert!(hyper_tungstenite::is_upgrade_request(&request) == true); let (response, stream) = hyper_tungstenite::upgrade(&mut request, None)