Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(body): remove "full" constructors from Body #2958

Merged
merged 1 commit into from Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions Cargo.toml
Expand Up @@ -181,11 +181,6 @@ name = "state"
path = "examples/state.rs"
required-features = ["full"]

[[example]]
name = "tower_server"
path = "examples/tower_server.rs"
required-features = ["full"]

[[example]]
name = "upgrades"
path = "examples/upgrades.rs"
Expand Down
9 changes: 7 additions & 2 deletions benches/pipeline.rs
Expand Up @@ -3,17 +3,20 @@

extern crate test;

use std::convert::Infallible;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc;
use std::time::Duration;

use bytes::Bytes;
use http_body_util::Full;
use tokio::net::TcpListener;
use tokio::sync::oneshot;

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Response};
use hyper::Response;

const PIPELINED_REQUESTS: usize = 16;

Expand Down Expand Up @@ -43,7 +46,9 @@ fn hello_world_16(b: &mut test::Bencher) {
.serve_connection(
stream,
service_fn(|_| async {
Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!")))
Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(
"Hello, World!",
))))
}),
)
.await
Expand Down
7 changes: 4 additions & 3 deletions benches/server.rs
Expand Up @@ -8,8 +8,9 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc;
use std::time::Duration;

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use http_body_util::{BodyExt, StreamBody};
use http_body_util::{BodyExt, Full, StreamBody};
use tokio::sync::oneshot;

use hyper::server::conn::Http;
Expand Down Expand Up @@ -87,8 +88,8 @@ macro_rules! bench_server {
}};
}

fn body(b: &'static [u8]) -> hyper::Body {
b.into()
fn body(b: &'static [u8]) -> Full<Bytes> {
Full::new(b.into())
}

#[bench]
Expand Down
6 changes: 4 additions & 2 deletions examples/client.rs
Expand Up @@ -2,7 +2,9 @@
#![warn(rust_2018_idioms)]
use std::env;

use hyper::{body::HttpBody as _, Body, Request};
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{body::HttpBody as _, Request};
use tokio::io::{self, AsyncWriteExt as _};
use tokio::net::TcpStream;

Expand Down Expand Up @@ -51,7 +53,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
let req = Request::builder()
.uri(url)
.header(hyper::header::HOST, authority.as_str())
.body(Body::empty())?;
.body(Empty::<Bytes>::new())?;

let mut res = sender.send_request(req).await?;

Expand Down
5 changes: 3 additions & 2 deletions examples/client_json.rs
@@ -1,7 +1,8 @@
#![deny(warnings)]
#![warn(rust_2018_idioms)]

use hyper::Body;
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{body::Buf, Request};
use serde::Deserialize;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -42,7 +43,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let req = Request::builder()
.uri(url)
.header(hyper::header::HOST, authority.as_str())
.body(Body::empty())?;
.body(Empty::<Bytes>::new())?;

let res = sender.send_request(req).await?;

Expand Down
26 changes: 20 additions & 6 deletions examples/echo.rs
Expand Up @@ -2,6 +2,8 @@

use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::body::HttpBody as _;
use hyper::server::conn::Http;
use hyper::service::service_fn;
Expand All @@ -10,15 +12,15 @@ use tokio::net::TcpListener;

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn echo(req: Request<Body>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(Body::from(
(&Method::GET, "/") => Ok(Response::new(full(
"Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`",
))),

// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),
(&Method::POST, "/echo") => Ok(Response::new(req.into_body().boxed())),

// TODO: Fix this, broken in PR #2896
// Convert to uppercase before sending back to client using a stream.
Expand All @@ -43,26 +45,38 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// 64kbs of data.
let max = req.body().size_hint().upper().unwrap_or(u64::MAX);
if max > 1024 * 64 {
let mut resp = Response::new(Body::from("Body too big"));
let mut resp = Response::new(full("Body too big"));
*resp.status_mut() = hyper::StatusCode::PAYLOAD_TOO_LARGE;
return Ok(resp);
}

let whole_body = hyper::body::to_bytes(req.into_body()).await?;

let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
Ok(Response::new(Body::from(reversed_body)))
Ok(Response::new(full(reversed_body)))
}

// Return the 404 Not Found for other routes.
_ => {
let mut not_found = Response::default();
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}

fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
Expand Down
6 changes: 4 additions & 2 deletions examples/hello.rs
Expand Up @@ -3,13 +3,15 @@
use std::convert::Infallible;
use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use tokio::net::TcpListener;

async fn hello(_: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("Hello World!")))
async fn hello(_: Request<Body>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}

#[tokio::main]
Expand Down
23 changes: 19 additions & 4 deletions examples/http_proxy.rs
Expand Up @@ -2,6 +2,8 @@

use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::client::conn::Builder;
use hyper::server::conn::Http;
use hyper::service::service_fn;
Expand Down Expand Up @@ -41,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

async fn proxy(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn proxy(req: Request<Body>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
println!("req: {:?}", req);

if Method::CONNECT == req.method() {
Expand Down Expand Up @@ -70,10 +72,10 @@ async fn proxy(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
}
});

Ok(Response::new(Body::empty()))
Ok(Response::new(empty()))
} else {
eprintln!("CONNECT host is not socket addr: {:?}", req.uri());
let mut resp = Response::new(Body::from("CONNECT must be to a socket address"));
let mut resp = Response::new(full("CONNECT must be to a socket address"));
*resp.status_mut() = http::StatusCode::BAD_REQUEST;

Ok(resp)
Expand All @@ -96,14 +98,27 @@ async fn proxy(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
}
});

sender.send_request(req).await
let resp = sender.send_request(req).await?;
Ok(resp.map(|b| b.boxed()))
}
}

fn host_addr(uri: &http::Uri) -> Option<String> {
uri.authority().and_then(|auth| Some(auth.to_string()))
}

fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}

// Create a TCP connection to host:port, build a tunnel between the connection and
// the upgraded connection
async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> {
Expand Down
10 changes: 6 additions & 4 deletions examples/multi_server.rs
Expand Up @@ -3,7 +3,9 @@

use std::net::SocketAddr;

use bytes::Bytes;
use futures_util::future::join;
use http_body_util::Full;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
Expand All @@ -12,12 +14,12 @@ use tokio::net::TcpListener;
static INDEX1: &[u8] = b"The 1st service!";
static INDEX2: &[u8] = b"The 2nd service!";

async fn index1(_: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::new(Body::from(INDEX1)))
async fn index1(_: Request<Body>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX1))))
}

async fn index2(_: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::new(Body::from(INDEX2)))
async fn index2(_: Request<Body>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX2))))
}

#[tokio::main]
Expand Down
33 changes: 23 additions & 10 deletions examples/params.rs
@@ -1,12 +1,15 @@
// #![deny(warnings)] // FIXME: https://github.com/rust-lang/rust/issues/62411
#![warn(rust_2018_idioms)]

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Method, Request, Response, StatusCode};
use tokio::net::TcpListener;

use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use url::form_urlencoded;

Expand All @@ -15,9 +18,11 @@ static MISSING: &[u8] = b"Missing field";
static NOTNUMERIC: &[u8] = b"Number field is not numeric";

// Using service_fn, we can turn this function into a `Service`.
async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn param_example(
req: Request<Body>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())),
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
(&Method::POST, "/post") => {
// Concatenate the body...
let b = hyper::body::to_bytes(req).await?;
Expand All @@ -43,7 +48,7 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};
let number = if let Some(n) = params.get("number") {
Expand All @@ -52,13 +57,13 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(NOTNUMERIC.into())
.body(full(NOTNUMERIC))
.unwrap());
}
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};

Expand All @@ -69,15 +74,15 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
// responses such as InternalServiceError may be
// needed here, too.
let body = format!("Hello {}, your number is {}", name, number);
Ok(Response::new(body.into()))
Ok(Response::new(full(body)))
}
(&Method::GET, "/get") => {
let query = if let Some(q) = req.uri().query() {
q
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};
let params = form_urlencoded::parse(query.as_bytes())
Expand All @@ -88,19 +93,27 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};
let body = format!("You requested {}", page);
Ok(Response::new(body.into()))
Ok(Response::new(full(body)))
}
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.body(empty())
.unwrap()),
}
}

fn empty() -> BoxBody<Bytes, Infallible> {
Empty::<Bytes>::new().boxed()
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, Infallible> {
Full::new(chunk.into()).boxed()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
pretty_env_logger::init();
Expand Down