Skip to content

Commit

Permalink
chore(volo-http): unwrap request and response (#319)
Browse files Browse the repository at this point in the history
* chore(volo-http): unwrap `Request` and `Response`

* chore(volo-http): refactor and enhance filter layer

* chore(volo-http): bump volo-http to 0.1.9

---------

Signed-off-by: Yu Li <liyu.yukiteru@bytedance.com>
  • Loading branch information
yukiiiteru authored Jan 8, 2024
1 parent 4efb1e4 commit 6a3aaad
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 145 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion examples/src/http/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use volo_http::{
extension::Extension,
extract::{Form, Query},
http::header,
layer::TimeoutLayer,
layer::{FilterLayer, TimeoutLayer},
middleware::{self, Next},
response::IntoResponse,
route::{from_handler, get, post, service_fn, MethodRouter, Router},
Expand Down Expand Up @@ -172,6 +172,14 @@ fn test_router() -> Router {
.get(service_fn(service_fn_test))
.build(),
)
// curl -v http://127.0.0.1:8080/test/anyaddr?reject_me
.layer(FilterLayer::new(|uri: Uri| async move {
if uri.query().is_some() && uri.query().unwrap() == "reject_me" {
Err(StatusCode::INTERNAL_SERVER_ERROR)
} else {
Ok(())
}
}))
}

// You can use the following commands for testing cookies
Expand Down
2 changes: 1 addition & 1 deletion volo-http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-http"
version = "0.1.8"
version = "0.1.9"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
24 changes: 11 additions & 13 deletions volo-http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,17 @@ where
}
}

pub trait HandlerWithoutRequest<T>: Sized {
fn call(self, cx: &mut HttpContext) -> impl Future<Output = Response> + Send;
pub trait HandlerWithoutRequest<T, Ret>: Sized {
fn call(self, cx: &mut HttpContext) -> impl Future<Output = Result<Ret, Response>> + Send;
}

impl<F, Fut, Res> HandlerWithoutRequest<()> for F
impl<F, Fut, Ret> HandlerWithoutRequest<(), Ret> for F
where
F: FnOnce() -> Fut + Clone + Send,
Fut: Future<Output = Res> + Send,
Res: IntoResponse,
Fut: Future<Output = Ret> + Send,
{
async fn call(self, _context: &mut HttpContext) -> Response {
self().await.into_response()
async fn call(self, _context: &mut HttpContext) -> Result<Ret, Response> {
Ok(self().await)
}
}

Expand All @@ -271,21 +270,20 @@ macro_rules! impl_handler_without_request {
$($ty:ident),* $(,)?
) => {
#[allow(non_snake_case, unused_mut, unused_variables)]
impl<F, Fut, Res, $($ty,)*> HandlerWithoutRequest<($($ty,)*)> for F
impl<F, Fut, Ret, $($ty,)*> HandlerWithoutRequest<($($ty,)*), Ret> for F
where
F: FnOnce($($ty,)*) -> Fut + Clone + Send,
Fut: Future<Output = Res> + Send,
Res: IntoResponse,
Fut: Future<Output = Ret> + Send,
$( for<'r> $ty: FromContext<()> + Send + 'r, )*
{
async fn call(self, cx: &mut HttpContext) -> Response {
async fn call(self, cx: &mut HttpContext) -> Result<Ret, Response> {
$(
let $ty = match $ty::from_context(cx, &()).await {
Ok(value) => value,
Err(rejection) => return rejection.into_response(),
Err(rejection) => return Err(rejection.into_response()),
};
)*
self($($ty,)*).await.into_response()
Ok(self($($ty,)*).await)
}
}
};
Expand Down
123 changes: 62 additions & 61 deletions volo-http/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,62 @@
use std::{marker::PhantomData, time::Duration};
use std::{convert::Infallible, marker::PhantomData, time::Duration};

use hyper::{
body::Incoming,
http::{Method, StatusCode},
};
use hyper::body::Incoming;
use motore::{layer::Layer, service::Service};

use crate::{
handler::HandlerWithoutRequest,
request::Request,
response::{IntoResponse, Response},
HttpContext,
};

pub trait LayerExt {
fn method(
self,
method: Method,
) -> FilterLayer<Box<dyn Fn(&mut HttpContext, &Request) -> Result<(), StatusCode>>>
where
Self: Sized,
{
self.filter(Box::new(move |cx: &mut HttpContext, _: &Request| {
if cx.method == method {
Ok(())
} else {
Err(StatusCode::METHOD_NOT_ALLOWED)
}
}))
}

fn filter<F>(self, f: F) -> FilterLayer<F>
where
Self: Sized,
F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode>,
{
FilterLayer { f }
}
#[derive(Clone)]
pub struct FilterLayer<H, R, T> {
handler: H,
_marker: PhantomData<(R, T)>,
}

pub struct FilterLayer<F> {
f: F,
impl<H, R, T> FilterLayer<H, R, T> {
pub fn new(h: H) -> Self {
Self {
handler: h,
_marker: PhantomData,
}
}
}

impl<S, F> Layer<S> for FilterLayer<F>
impl<S, H, R, T> Layer<S> for FilterLayer<H, R, T>
where
S: Service<HttpContext, Request, Response = Response> + Send + Sync + 'static,
F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync,
S: Send + Sync + 'static,
H: Clone + Send + Sync + 'static,
T: Sync,
{
type Service = Filter<S, F>;
type Service = Filter<S, H, R, T>;

fn layer(self, inner: S) -> Self::Service {
Filter {
service: inner,
f: self.f,
handler: self.handler,
_marker: PhantomData,
}
}
}

pub struct Filter<S, F> {
#[derive(Clone)]
pub struct Filter<S, H, R, T> {
service: S,
f: F,
handler: H,
_marker: PhantomData<(R, T)>,
}

impl<S, F> Service<HttpContext, Request> for Filter<S, F>
impl<S, H, R, T> Service<HttpContext, Incoming> for Filter<S, H, R, T>
where
S: Service<HttpContext, Request, Response = Response> + Send + Sync + 'static,
F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync,
S: Service<HttpContext, Incoming, Response = Response, Error = Infallible>
+ Send
+ Sync
+ 'static,
H: HandlerWithoutRequest<T, Result<(), R>> + Clone + Send + Sync + 'static,
R: IntoResponse + Send + Sync,
T: Sync,
{
type Response = S::Response;

Expand All @@ -75,26 +65,33 @@ where
async fn call<'s, 'cx>(
&'s self,
cx: &'cx mut HttpContext,
req: Request,
req: Incoming,
) -> Result<Self::Response, Self::Error> {
if let Err(status) = (self.f)(cx, &req) {
return Ok(status.into_response());
match self.handler.clone().call(cx).await {
// do not filter it, call the service
Ok(Ok(())) => self.service.call(cx, req).await,
// filter it and return the specified response
Ok(Err(res)) => Ok(res.into_response()),
// something wrong while extracting
Err(rej) => {
tracing::warn!("[VOLO] FilterLayer: something wrong while extracting");
Ok(rej.into_response())
}
}
self.service.call(cx, req).await
}
}

#[derive(Clone)]
pub struct TimeoutLayer<H, T> {
pub struct TimeoutLayer<H, R, T> {
duration: Duration,
handler: H,
_marker: PhantomData<T>,
_marker: PhantomData<(R, T)>,
}

impl<H, T> TimeoutLayer<H, T> {
impl<H, R, T> TimeoutLayer<H, R, T> {
pub fn new(duration: Duration, handler: H) -> Self
where
H: HandlerWithoutRequest<T> + Clone + Send + Sync + 'static,
H: Send + Sync + 'static,
{
Self {
duration,
Expand All @@ -104,13 +101,14 @@ impl<H, T> TimeoutLayer<H, T> {
}
}

impl<S, H, T> Layer<S> for TimeoutLayer<H, T>
impl<S, H, R, T> Layer<S> for TimeoutLayer<H, R, T>
where
S: Service<HttpContext, Incoming, Response = Response> + Send + Sync + 'static,
H: HandlerWithoutRequest<T> + Clone + Send + Sync + 'static,
S: Send + Sync + 'static,
H: Clone + Send + Sync + 'static,
R: Sync,
T: Sync,
{
type Service = Timeout<S, H, T>;
type Service = Timeout<S, H, R, T>;

fn layer(self, inner: S) -> Self::Service {
Timeout {
Expand All @@ -123,18 +121,21 @@ where
}

#[derive(Clone)]
pub struct Timeout<S, H, T> {
pub struct Timeout<S, H, R, T> {
service: S,
duration: Duration,
handler: H,
_marker: PhantomData<T>,
_marker: PhantomData<(R, T)>,
}

impl<S, H, T> Service<HttpContext, Incoming> for Timeout<S, H, T>
impl<S, H, R, T> Service<HttpContext, Incoming> for Timeout<S, H, R, T>
where
S: Service<HttpContext, Incoming, Response = Response> + Send + Sync + 'static,
S::Error: Send,
H: HandlerWithoutRequest<T> + Clone + Send + Sync + 'static,
S: Service<HttpContext, Incoming, Response = Response, Error = Infallible>
+ Send
+ Sync
+ 'static,
H: HandlerWithoutRequest<T, R> + Clone + Send + Sync + 'static,
R: IntoResponse + Sync,
T: Sync,
{
type Response = S::Response;
Expand All @@ -152,7 +153,7 @@ where
tokio::select! {
resp = fut_service => resp,
_ = fut_timeout => {
Ok(self.handler.clone().call(cx).await)
Ok(self.handler.clone().call(cx).await.into_response())
},
}
}
Expand Down
32 changes: 1 addition & 31 deletions volo-http/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1 @@
use std::ops::{Deref, DerefMut};

use hyper::{body::Incoming, http::request::Builder};

pub struct Request(pub(crate) hyper::http::Request<hyper::body::Incoming>);

impl Request {
pub fn builder() -> Builder {
Builder::new()
}
}

impl Deref for Request {
type Target = hyper::http::Request<hyper::body::Incoming>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for Request {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl From<hyper::http::Request<Incoming>> for Request {
fn from(value: hyper::http::Request<Incoming>) -> Self {
Self(value)
}
}
pub type Request<B = hyper::body::Incoming> = hyper::http::Request<B>;
Loading

0 comments on commit 6a3aaad

Please sign in to comment.