From 6cebf1d721e5ac0b8645ea2efef21e18f743ae44 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 16 Jul 2024 11:58:28 -0400 Subject: [PATCH 1/2] feat: add rate limiter for L1 deprecation --- gateway-framework/src/auth.rs | 50 +++++++++++++++++-- .../src/http/middleware/require_auth.rs | 3 +- graph-gateway/src/client_query.rs | 1 + graph-gateway/src/config.rs | 3 ++ graph-gateway/src/main.rs | 11 +++- 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/gateway-framework/src/auth.rs b/gateway-framework/src/auth.rs index 822a8bef..4a15b75f 100644 --- a/gateway-framework/src/auth.rs +++ b/gateway-framework/src/auth.rs @@ -4,12 +4,14 @@ mod common; use std::{ collections::{HashMap, HashSet}, sync::Arc, + time::Duration, }; -use anyhow::ensure; +use anyhow::{bail, ensure}; +use dashmap::DashMap; use ordered_float::NotNan; use thegraph_core::types::{alloy_primitives::Address, SubgraphId}; -use tokio::sync::watch; +use tokio::{sync::watch, time::MissedTickBehavior}; use self::api_keys::APIKey; @@ -41,15 +43,57 @@ pub struct AuthContext { pub payment_required: bool, pub api_keys: watch::Receiver>, pub special_api_keys: Arc>, + // TODO: remove after L1 deprecation + pub rate_limiter: Option, } impl AuthContext { /// Parse an authorization token into its corresponding settings, and check that the query /// should be handled. pub fn check(&self, token: &str, domain: &str) -> anyhow::Result { - ensure!(!token.is_empty(), "missing bearer token"); + ensure!(!token.is_empty(), "missing API key"); + + if let Some(rate_limiter) = &self.rate_limiter { + if rate_limiter.above_limit(token) { + bail!("rate limit exceeded"); + } + rate_limiter.increment(token); + } // For now, the only option is an API key. api_keys::check(self, token, domain) } } + +// TODO: remove after L1 deprecation +#[derive(Clone)] +pub struct RateLimiter { + counters: Arc>, + limit: u16, +} + +impl RateLimiter { + pub fn new(limit: u16) -> Self { + let counters: Arc> = Default::default(); + { + let counters = counters.clone(); + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + tokio::spawn(async move { + loop { + interval.tick().await; + counters.clear(); + } + }) + }; + Self { counters, limit } + } + + pub fn above_limit(&self, key: &str) -> bool { + self.counters.get(key).map(|v| *v).unwrap_or(0) > self.limit + } + + pub fn increment(&self, key: &str) { + *self.counters.entry(key.to_string()).or_default() += 1; + } +} diff --git a/gateway-framework/src/http/middleware/require_auth.rs b/gateway-framework/src/http/middleware/require_auth.rs index 24d3f6b1..54ee7f22 100644 --- a/gateway-framework/src/http/middleware/require_auth.rs +++ b/gateway-framework/src/http/middleware/require_auth.rs @@ -186,6 +186,7 @@ mod tests { payment_required: false, api_keys: watch::channel(Default::default()).1, special_api_keys: Default::default(), + rate_limiter: None, }; if let Some(key) = key { ctx.api_keys = watch::channel(HashMap::from([( @@ -342,7 +343,7 @@ mod tests { assert_eq!(res.headers().typed_get(), Some(ContentType::json())); assert_matches!(deserialize_graphql_response_body::<()>(res.body_mut()).await, Ok(res_body) => { assert_eq!(res_body.errors.len(), 1); - assert_eq!(res_body.errors[0].message, "auth error: missing bearer token"); + assert_eq!(res_body.errors[0].message, "auth error: missing API key"); }); }); } diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index cc370f3f..b3776de5 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -966,6 +966,7 @@ mod tests { payment_required: false, api_keys: watch::channel(Default::default()).1, special_api_keys: Default::default(), + rate_limiter: None, }; if let Some(key) = key { ctx.api_keys = watch::channel(HashMap::from([( diff --git a/graph-gateway/src/config.rs b/graph-gateway/src/config.rs index 41bc101a..1019e767 100644 --- a/graph-gateway/src/config.rs +++ b/graph-gateway/src/config.rs @@ -29,6 +29,9 @@ use url::Url; pub struct Config { #[serde(default)] pub api_keys: Option, + /// Rate limit per API key in queries per second. + // TODO: remove after L1 deprecation + pub api_key_rate_limit: Option, pub attestations: AttestationConfig, /// List of indexer addresses to block. This should only be used temprorarily, to compensate for /// indexer-selection imperfections. diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 58f7afdf..6bc2503b 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -134,8 +134,13 @@ async fn main() { ))); // Initialize the auth service - let auth_service = - init_auth_service(http_client.clone(), conf.api_keys, conf.payment_required).await; + let auth_service = init_auth_service( + http_client.clone(), + conf.api_keys, + conf.payment_required, + conf.api_key_rate_limit, + ) + .await; let budgeter: &'static Budgeter = Box::leak(Box::new(Budgeter::new(USD(conf.query_fees_target)))); @@ -350,6 +355,7 @@ async fn init_auth_service( http: reqwest::Client, config: Option, payment_required: bool, + api_key_rate_limit: Option, ) -> AuthContext { let special_api_keys = match &config { Some(ApiKeys::Endpoint { special, .. }) => Arc::new(HashSet::from_iter(special.clone())), @@ -371,6 +377,7 @@ async fn init_auth_service( payment_required, api_keys, special_api_keys, + rate_limiter: api_key_rate_limit.map(gateway_framework::auth::RateLimiter::new), } } From 1db3de6b2d604625c3c9b3b43d6330422937e73a Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 16 Jul 2024 12:24:48 -0400 Subject: [PATCH 2/2] improve error message --- gateway-framework/src/auth.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway-framework/src/auth.rs b/gateway-framework/src/auth.rs index 4a15b75f..c1c81c08 100644 --- a/gateway-framework/src/auth.rs +++ b/gateway-framework/src/auth.rs @@ -55,7 +55,7 @@ impl AuthContext { if let Some(rate_limiter) = &self.rate_limiter { if rate_limiter.above_limit(token) { - bail!("rate limit exceeded"); + bail!("Rate limit exceeded. Querying L1 subgraphs is deprecated and will be removed soon."); } rate_limiter.increment(token); }