Skip to content

Commit

Permalink
add middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
hlbarber committed Mar 9, 2024
1 parent 827211f commit 27e95c9
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 10 deletions.
14 changes: 14 additions & 0 deletions examples/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use burger::{service_fn, Middleware, Placeholder, ServiceExt};

#[tokio::main]
async fn main() {
// Construct middleware
let middleware = Placeholder.concurrency_limit(3).buffer(2).load_shed();

let svc = service_fn(|x: String| async move { format!("hello, {x}") });
// Apply middleware
let svc = middleware.apply(svc);

// Result is a `Service`
let y = svc.oneshot("world".to_string()).await.unwrap();
}
17 changes: 16 additions & 1 deletion src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::fmt;
use futures_util::FutureExt;
use tokio::sync::{Semaphore, SemaphorePermit};

use crate::{load::Load, Service};
use crate::{load::Load, Middleware, Service};

/// A wrapper [`Service`] for the [`ServiceExt::buffer`](crate::ServiceExt::buffer) combinator.
///
Expand Down Expand Up @@ -148,3 +148,18 @@ where
self.inner.load()
}
}

impl<S, T> Middleware<S> for Buffer<T>
where
T: Middleware<S>,
{
type Service = Buffer<T::Service>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, semaphore } = self;
Buffer {
inner: inner.apply(svc),
semaphore,
}
}
}
17 changes: 16 additions & 1 deletion src/concurrency_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::fmt;

use tokio::sync::{Semaphore, SemaphorePermit};

use crate::{load::Load, Service};
use crate::{load::Load, Middleware, Service};

/// A wrapper for the [`ServiceExt::concurrency_limit`](crate::ServiceExt::concurrency_limit)
/// combinator.
Expand Down Expand Up @@ -105,3 +105,18 @@ where
self.inner.load()
}
}

impl<S, T> Middleware<S> for ConcurrencyLimit<T>
where
T: Middleware<S>,
{
type Service = ConcurrencyLimit<T::Service>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, semaphore } = self;
ConcurrencyLimit {
inner: inner.apply(svc),
semaphore,
}
}
}
16 changes: 15 additions & 1 deletion src/depressurize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//!
//! The [`Load::load`] on [`Depressurize`] defers to the inner service.

use crate::{load::Load, Service, ServiceExt};
use crate::{load::Load, Middleware, Service, ServiceExt};

/// A wrapper for the [`ServiceExt::depressurize`] combinator.
///
Expand Down Expand Up @@ -71,3 +71,17 @@ where
self.inner.load()
}
}

impl<S, T> Middleware<S> for Depressurize<T>
where
T: Middleware<S>,
{
type Service = Depressurize<T::Service>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner } = self;
Depressurize {
inner: inner.apply(svc),
}
}
}
18 changes: 17 additions & 1 deletion src/leak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

use std::{fmt, sync::Arc};

use crate::{load::Load, Service};
use crate::{load::Load, Middleware, Service};

/// A wrapper [`Service`] for the [`ServiceExt::leak`](crate::ServiceExt::leak) combinator.
///
Expand Down Expand Up @@ -96,3 +96,19 @@ where
self.inner.load()
}
}

impl<'t, S, T> Middleware<S> for Leak<'t, T>
where
T: Middleware<S>,
{
type Service = Leak<'t, T::Service>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, _ref } = self;
let inner = Arc::into_inner(inner).expect("there cannot be inflight requests");
Leak {
inner: Arc::new(inner.apply(svc)),
_ref,
}
}
}
41 changes: 40 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub mod service_fn;
pub mod steer;
pub mod then;

use std::sync::Arc;
use std::{convert::Infallible, sync::Arc};

use buffer::Buffer;
use concurrency_limit::ConcurrencyLimit;
Expand Down Expand Up @@ -361,3 +361,42 @@ where
S::call(permit, request).await
}
}

/// A middleware, which adds additional behaviour to a [`Service`].
pub trait Middleware<S> {
/// The resultant service.
type Service;

/// Applies this middleware to an existing service.
fn apply(self, svc: S) -> Self::Service;
}

/// A placeholder [`Service`] which is unable to be called.
///
/// Has a
#[derive(Debug, Clone)]
pub struct Placeholder;

impl Service<Infallible> for Placeholder {
type Permit<'a> = ();
type Response = Infallible;

async fn acquire(&self) -> Self::Permit<'_> {
()
}

async fn call<'a>(_permit: Self::Permit<'a>, request: Infallible) -> Self::Response
where
Self: 'a,
{
request
}
}

impl<S> Middleware<S> for Placeholder {
type Service = S;

fn apply(self, svc: S) -> Self::Service {
svc
}
}
17 changes: 16 additions & 1 deletion src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};

use crate::Service;
use crate::{Middleware, Service};

/// A measurement of load on a [`Service`].
pub trait Load {
Expand Down Expand Up @@ -92,3 +92,18 @@ impl<S> Load for PendingRequests<S> {
self.count.load(Ordering::Acquire)
}
}

impl<'t, S, T> Middleware<S> for PendingRequests<T>
where
T: Middleware<S>,
{
type Service = PendingRequests<T::Service>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, count } = self;
PendingRequests {
inner: inner.apply(svc),
count,
}
}
}
16 changes: 15 additions & 1 deletion src/load_shed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

use futures_util::FutureExt;

use crate::{load::Load, Service};
use crate::{load::Load, Middleware, Service};

/// A wrapper [`Service`] for the [`ServiceExt::load_shed`](crate::ServiceExt::load_shed)
/// combinator.
Expand Down Expand Up @@ -87,3 +87,17 @@ where
self.inner.load()
}
}

impl<'t, S, T> Middleware<S> for LoadShed<T>
where
T: Middleware<S>,
{
type Service = LoadShed<T::Service>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner } = self;
LoadShed {
inner: inner.apply(svc),
}
}
}
17 changes: 16 additions & 1 deletion src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::{any, fmt};

use crate::Service;
use crate::{Middleware, Service};

/// A wrapper [`Service`] for the [`ServiceExt::map`](crate::ServiceExt::map) combinator.
///
Expand Down Expand Up @@ -80,3 +80,18 @@ where
(permit.closure)(S::call(permit.inner, request).await)
}
}

impl<S, T, F> Middleware<S> for Map<T, F>
where
T: Middleware<S>,
{
type Service = Map<T::Service, F>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, closure } = self;
Map {
inner: inner.apply(svc),
closure,
}
}
}
17 changes: 16 additions & 1 deletion src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

use std::fmt;

use crate::{load::Load, Service, ServiceExt};
use crate::{load::Load, Middleware, Service, ServiceExt};

/// A retry policy allows for customization of [Retry].
///
Expand Down Expand Up @@ -217,3 +217,18 @@ where
self.inner.load()
}
}

impl<S, T, P> Middleware<S> for Retry<T, P>
where
T: Middleware<S>,
{
type Service = Retry<T::Service, P>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, policy } = self;
Retry {
inner: inner.apply(svc),
policy,
}
}
}
17 changes: 16 additions & 1 deletion src/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::{any, fmt, future::Future};

use crate::{load::Load, Service};
use crate::{load::Load, Middleware, Service};

/// A wrapper for the [`ServiceExt::then`](crate::ServiceExt::then) combinator.
///
Expand Down Expand Up @@ -92,3 +92,18 @@ where
self.inner.load()
}
}

impl<S, T, F> Middleware<S> for Then<T, F>
where
T: Middleware<S>,
{
type Service = Then<T::Service, F>;

fn apply(self, svc: S) -> Self::Service {
let Self { inner, closure } = self;
Then {
inner: inner.apply(svc),
closure,
}
}
}

0 comments on commit 27e95c9

Please sign in to comment.