Skip to content

Commit

Permalink
Imporve ratelimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
DrVilepis committed Apr 24, 2022
1 parent fe5311d commit c636556
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 72 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion testausratelimiter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ authors = ["Ville Järvinen <jarvinenville606@gmail.com>", "Luukas Pörtfors <la
actix-web = { version = "^4.0.1", features = ["macros","openssl"] }
actix = "~0.13"
log = "~0.4"
chrono = { version = "~0.4", features = ["serde"] }
futures-util = "~0.3"
184 changes: 114 additions & 70 deletions testausratelimiter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,78 @@
use std::{
collections::HashMap,
future::{ready, Ready},
rc::Rc,
time::{Duration, Instant},
};

use actix::prelude::*;
use actix_web::{
body::EitherBody,
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
Error,
http::header::{HeaderName, HeaderValue},
Error, HttpResponse,
};
use chrono::prelude::*;
use futures_util::{future::LocalBoxFuture, stream::once};

pub struct RateLimiterStorage {
pub clients: HashMap<String, (u32, NaiveDateTime)>,
pub clients: HashMap<String, (usize, Instant)>,
pub maxrpm: usize,
pub reset_interval: usize,
event_count: usize,
}

pub struct RateLimiter {
pub storage: Addr<RateLimiterStorage>,
pub use_peer_addr: bool,
pub maxrpm: usize,
}

pub struct RateLimiterTransform<S> {
pub service: S,
pub ratelimiter: Addr<RateLimiterStorage>,
pub use_peer_addr: bool,
pub maxrpm: usize,
}

#[derive(Message)]
#[rtype(result = "usize")]
struct RpmLimitRequest;

#[derive(Message)]
#[rtype(result = "()")]
struct ClearRequest;

struct IpRequest {
pub ip: String,
}

impl Actor for RateLimiterStorage {
type Context = Context<Self>;
}

impl RateLimiterStorage {
pub fn new(max: usize) -> Self {
pub fn new(maxrpm: usize, reset_interval: usize) -> Self {
RateLimiterStorage {
clients: HashMap::new(),
maxrpm: max,
maxrpm,
reset_interval,
event_count: 0,
}
}
}

impl Message for IpRequest {
type Result = Result<bool, std::io::Error>;
struct ConfigRequest;

impl Message for ConfigRequest {
type Result = Result<(usize, usize), std::io::Error>;
}

impl Handler<RpmLimitRequest> for RateLimiterStorage {
type Result = usize;
impl Handler<ConfigRequest> for RateLimiterStorage {
type Result = Result<(usize, usize), std::io::Error>;

fn handle(&mut self, _: RpmLimitRequest, _: &mut Context<Self>) -> Self::Result {
self.maxrpm
fn handle(&mut self, _: ConfigRequest, _: &mut Context<Self>) -> Self::Result {
Ok((self.maxrpm.to_owned(), self.reset_interval.to_owned()))
}
}

#[derive(Message)]
#[rtype(result = "()")]
struct ClearRequest;

impl Handler<ClearRequest> for RateLimiterStorage {
type Result = ();

fn handle(&mut self, _: ClearRequest, _: &mut Context<Self>) {
let cur_time = Local::now().naive_local();
self.clients.retain(|_, (_, time)| {
cur_time.signed_duration_since(*time) < chrono::Duration::minutes(30)
});
let cur_time = Instant::now();
self.clients
.retain(|_, (_, time)| cur_time.duration_since(*time) < Duration::from_secs(1800));
}
}

struct IpRequest {
pub ip: String,
}

impl Message for IpRequest {
type Result = Result<(Option<usize>, Duration), std::io::Error>;
}

impl Handler<IpRequest> for RateLimiterStorage {
type Result = Result<bool, std::io::Error>;
type Result = Result<(Option<usize>, Duration), std::io::Error>;

fn handle(&mut self, req: IpRequest, ctx: &mut Context<Self>) -> Self::Result {
if self.event_count > 1000 {
Expand All @@ -90,51 +82,88 @@ impl Handler<IpRequest> for RateLimiterStorage {
self.event_count += 1;
};
if let Some((r, s)) = self.clients.get_mut(&req.ip) {
let time = Local::now().naive_local();
if time.signed_duration_since(*s) > chrono::Duration::minutes(1) {
*r = 0;
*s = time;
let time = Instant::now();
let duration = (*s).duration_since(time);
if duration == Duration::from_secs(0) {
*r = 1;
*s = time + Duration::from_secs(self.reset_interval as u64);
Ok((
Some(self.maxrpm - *r),
Duration::from_secs(self.reset_interval as u64),
))
} else if *r as usize > self.maxrpm {
return Ok(false);
Ok((None, duration))
} else {
*r += 1
*r += 1;
Ok((Some(self.maxrpm - *r), duration))
}
} else {
self.clients.insert(req.ip, (1, Local::now().naive_local()));
self.clients.insert(
req.ip,
(
1,
std::time::Instant::now() + Duration::from_secs(self.reset_interval as u64),
),
);
Ok((
Some(self.maxrpm - 1),
Duration::from_secs(self.reset_interval as u64),
))
}
Ok(true)
}
}

pub struct RateLimiter {
pub storage: Addr<RateLimiterStorage>,
pub use_peer_addr: bool,
pub maxrpm: usize,
pub reset_interval: usize,
}

impl<S, B> Transform<S, ServiceRequest> for RateLimiter
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type InitError = ();
type Transform = RateLimiterTransform<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
type Future = LocalBoxFuture<'static, Result<Self::Transform, Self::InitError>>;

fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(RateLimiterTransform {
service,
ratelimiter: self.storage.clone(),
use_peer_addr: self.use_peer_addr,
maxrpm: self.maxrpm,
}))
let ratelimiter = self.storage.clone();
let use_peer_addr = self.use_peer_addr;
let maxrpm = self.maxrpm;
let reset_interval = self.reset_interval;
Box::pin(async move {
Ok(RateLimiterTransform {
service: Rc::new(service),
ratelimiter,
use_peer_addr,
maxrpm,
reset_interval,
})
})
}
}

pub struct RateLimiterTransform<S> {
pub service: Rc<S>,
pub ratelimiter: Addr<RateLimiterStorage>,
pub use_peer_addr: bool,
pub maxrpm: usize,
pub reset_interval: usize,
}

impl<S, B> Service<ServiceRequest> for RateLimiterTransform<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Response = ServiceResponse<EitherBody<B>>;
type Error = S::Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

Expand All @@ -150,20 +179,35 @@ where
}
} {
let res = self.ratelimiter.send(IpRequest { ip: ip.to_owned() });
let resp = self.service.call(req);
let service = Rc::clone(&self.service);
let maxrpm = self.maxrpm;
Box::pin(async move {
let res = res
let (remaining, reset) = res
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(e))??;
if res {
let resp = resp.await?;
Ok(resp)
if let Some(remaining) = remaining {
let mut resp = service.call(req).await?;
let headers = resp.headers_mut();
headers.insert(
HeaderName::from_static("ratelimit-limit"),
HeaderValue::from_str(&maxrpm.to_string()).unwrap(),
);
headers.insert(
HeaderName::from_static("ratelimit-remaining"),
HeaderValue::from_str(&remaining.to_string()).unwrap(),
);
headers.insert(
HeaderName::from_static("ratelimit-reset"),
HeaderValue::from_str(&reset.as_secs().to_string()).unwrap(),
);
Ok(resp.map_into_left_body())
} else {
Err(actix_web::error::ErrorTooManyRequests(format!(
"You have sent more than `{}` requests this minute.",
maxrpm
)))
let response = HttpResponse::TooManyRequests()
.insert_header(("ratelimit-limit", maxrpm.to_string()))
.insert_header(("ratelimit-remaining", 0usize.to_string()))
.insert_header(("ratelimit-reset", reset.as_secs().to_string()))
.finish();
Ok(req.into_response(response.map_into_right_body()))
}
})
} else {
Expand Down

0 comments on commit c636556

Please sign in to comment.