Skip to content

Commit

Permalink
Clear stats on save
Browse files Browse the repository at this point in the history
Document and test IpThrottler

Fix bug where stats saving schedule was set to digest schedule
  • Loading branch information
Jerboa-app committed Jun 1, 2024
1 parent 841cb78 commit ad561c4
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct RuntimeOptions

pub static mut OPTIONS: RuntimeOptions = RuntimeOptions { debug: false, debug_timestamp: false };

pub fn debug(msg: String, context: Option<String>)
pub fn debug(msg: String, context: Option<&str>)
{
unsafe { if OPTIONS.debug == false { return } }

Expand Down
4 changes: 2 additions & 2 deletions src/server/api/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ impl ApiRequest for StatsDigest
Ok(p) => p,
Err(e) =>
{
crate::debug(format!("{} deserialising POST payload",e), Some("Stats Digest".to_string()));
crate::debug(format!("{} deserialising POST payload",e), Some("Stats Digest"));
return StatusCode::BAD_REQUEST
}
}
}
Err(e) =>
{
crate::debug(format!("{} deserialising POST payload",e), Some("Stats Digest".to_string()));
crate::debug(format!("{} deserialising POST payload",e), Some("Stats Digest"));
return StatusCode::BAD_REQUEST
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/server/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Server
StatsSaveTask::new
(
stats.clone(),
schedule_from_option(config.stats.digest_schedule.clone())
schedule_from_option(config.stats.save_schedule.clone())
)
)
);
Expand Down
6 changes: 3 additions & 3 deletions src/server/stats/hits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub async fn process_hit
"\nTotal stats time: {} s (Passthrough)\nCompute stats time: {} s (Passthrough)",
start_time.elapsed().as_secs_f64(),
compute_start_time.elapsed().as_secs_f64()
), Some("PERFORMANCE".to_string()));
), Some("PERFORMANCE"));

return
}
Expand All @@ -141,7 +141,7 @@ pub async fn process_hit
}
};

crate::debug(format!("{:?}", hit), Some("Statistics".to_string()));
crate::debug(format!("{:?}", hit), Some("Statistics"));

stats.hits.insert(hash, hit);

Expand All @@ -150,7 +150,7 @@ pub async fn process_hit
"\nTotal stats time: {} s\nCompute stats time: {} s",
start_time.elapsed().as_secs_f64(),
compute_start_time.elapsed().as_secs_f64()
), Some("PERFORMANCE".to_string()));
), Some("PERFORMANCE"));
}

/// Gathers [Hit]s both from disk and those cached in [HitStats]
Expand Down
8 changes: 5 additions & 3 deletions src/server/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fs::create_dir, sync::Arc};
use std::{collections::HashMap, fs::create_dir, sync::Arc};

use axum::async_trait;
use chrono::{DateTime, Utc};
Expand All @@ -13,7 +13,8 @@ pub mod hits;
pub mod digest;
pub mod file;

/// A task to periodically save HitStats to disk
/// A task to periodically save HitStats to disk, clearing
/// the HitStats memory.
/// See [crate::task::Task] and [crate::task::TaskPool]
pub struct StatsSaveTask
{
Expand Down Expand Up @@ -48,7 +49,7 @@ impl Task for StatsSaveTask
{
let config = Config::load_or_default(CONFIG_PATH);
{
let stats = self.state.lock().await;
let mut stats = self.state.lock().await;

if !std::path::Path::new(&config.stats.path).exists()
{
Expand All @@ -62,6 +63,7 @@ impl Task for StatsSaveTask
let mut file = StatsFile::new();
file.load(&stats);
file.write_bytes();
stats.hits = HashMap::new();
}

self.schedule = schedule_from_option(config.stats.save_schedule.clone());
Expand Down
32 changes: 22 additions & 10 deletions src/server/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ pub struct Request
hash: [u8; 64]
}

/// sha512 an ip and uri
impl Request
{
pub fn new(ip: Ipv4Addr, uri: &str) -> Request
{
Request { hash: sha512(&[uri.as_bytes(), &ip.octets()].concat()) }
}

pub fn hash(&self) -> [u8; 64]
{
return self.hash
}
}

/// Represent a unique [Request] (ip+uri hash) repeated count times
pub struct RequestData
{
count: u32,
Expand All @@ -42,6 +49,7 @@ impl RequestData
}
}

/// Detect repeated [Request]s and reflect if block for [IpThrottler::timeout_millis]
pub struct IpThrottler
{
requests_from: HashMap<Request, RequestData>,
Expand All @@ -58,22 +66,26 @@ impl IpThrottler
IpThrottler
{
requests_from: HashMap::new(),
max_requests_per_second: max_requests_per_second,
timeout_millis: timeout_millis,
max_requests_per_second,
timeout_millis,
clear_period: Duration::from_secs(clear_period_seconds),
last_clear: Instant::now()
}
}

/// Free hashmap (= HashMap::new()) if [IpThrottler::clear_period] has elapsed
pub fn check_clear(&mut self)
{
if self.last_clear.elapsed() > self.clear_period
{
self.requests_from.clear();
self.requests_from = HashMap::new();
self.last_clear = Instant::now();
}
}

/// Record hit counts for unique [Request]s over a time window of
/// [IpThrottler::clear_period]s. If more than [IpThrottler::max_requests_per_second]
/// the [Request] is marked as in [RequestData::timeout] for [IpThrottler::timeout_millis]ms.
pub fn is_limited(&mut self, addr: SocketAddr, uri: &str) -> bool
{
let ip = addr.ip();
Expand All @@ -86,17 +98,15 @@ impl IpThrottler
}

let request = Request::new(ipv4, uri);

println!("{:?}", request);

let requests = if self.requests_from.contains_key(&request)
{
self.requests_from[&request].clone()
&self.requests_from[&request]
}
else
{
self.requests_from.insert(request.clone(), RequestData {count: 0 as u32, last_request_time: Instant::now(), timeout: false});
self.requests_from[&request].clone()
&self.requests_from[&request]
};

let time = requests.last_request_time.elapsed().as_millis();
Expand Down Expand Up @@ -128,6 +138,8 @@ impl IpThrottler
}
}

/// Reflects any [Request]s in timeout (see [IpThrottler::is_limited]) as
/// [StatusCode::TOO_MANY_REQUESTS].
pub async fn handle_throttle<B>
(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Expand All @@ -142,15 +154,15 @@ pub async fn handle_throttle<B>
throttler.check_clear();
if throttler.is_limited(addr, &request.uri().to_string())
{
crate::debug(format!("Denying: {} @/{}", addr, request.uri().to_string()), None);
crate::debug(format!("Serve time: {} s", serve_start.elapsed().as_secs_f64()), Some("PERFORMANCE".to_string()));
crate::debug(format!("Denying: {} @/{}", addr, request.uri().to_string()), Some("THROTTLE"));
crate::debug(format!("Serve time: {} s", serve_start.elapsed().as_secs_f64()), Some("PERFORMANCE"));
Err(StatusCode::TOO_MANY_REQUESTS)
}
else
{
crate::debug(format!("Allowing: {} @/{}", addr, request.uri().to_string()), None);
let response = next.run(request).await;
crate::debug(format!("Serve time: {} s", serve_start.elapsed().as_secs_f64()), Some("PERFORMANCE".to_string()));
crate::debug(format!("Serve time: {} s", serve_start.elapsed().as_secs_f64()), Some("PERFORMANCE"));
Ok(response)
}
}
Expand Down
60 changes: 60 additions & 0 deletions tests/test_throttler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
mod common;

#[cfg(test)]
mod test_throttle
{
use std::net::{Ipv4Addr, SocketAddr};

use busser::server::throttle::{IpThrottler, Request};
use openssl::sha::sha512;

#[test]
pub fn test_request()
{
let r1 = Request::new
(
Ipv4Addr::new(127, 0, 0, 0),
"/index.html"
);

let r2 = Request::new
(
Ipv4Addr::new(127, 0, 0, 0),
"/page.html"
);

let r3 = Request::new
(
Ipv4Addr::new(127, 1, 1, 1),
"/index.html"
);

assert_ne!(r1, r2);
assert_ne!(r1, r3);
assert_ne!(r2, r3);

assert_eq!(r1.hash(), sha512(&["/index.html".as_bytes(), &Ipv4Addr::new(127, 0, 0, 0).octets()].concat()));
assert_eq!(r2.hash(), sha512(&["/page.html".as_bytes(), &Ipv4Addr::new(127, 0, 0, 0).octets()].concat()));
assert_eq!(r3.hash(), sha512(&["/index.html".as_bytes(), &Ipv4Addr::new(127, 1, 1, 1).octets()].concat()));
}

#[test]
pub fn test_throttler()
{
let mut throttle = IpThrottler::new(1e-9, 5000, 3600);
let ip = Ipv4Addr::new(127, 0, 0, 0);
let path = "/index.html";
assert_eq!(throttle.is_limited(SocketAddr::new(std::net::IpAddr::V4(ip), 80), path), false);
assert_eq!(throttle.is_limited(SocketAddr::new(std::net::IpAddr::V4(ip), 80), path), true);
throttle.check_clear();
assert_eq!(throttle.is_limited(SocketAddr::new(std::net::IpAddr::V4(ip), 80), path), true);

let mut throttle = IpThrottler::new(1e-9, 5000, 0);
let ip = Ipv4Addr::new(127, 0, 0, 0);
let path = "/index.html";
assert_eq!(throttle.is_limited(SocketAddr::new(std::net::IpAddr::V4(ip), 80), path), false);
assert_eq!(throttle.is_limited(SocketAddr::new(std::net::IpAddr::V4(ip), 80), path), true);
throttle.check_clear();
assert_eq!(throttle.is_limited(SocketAddr::new(std::net::IpAddr::V4(ip), 80), path), false);
}
}

0 comments on commit ad561c4

Please sign in to comment.