Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions gateway-framework/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ mod common;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use anyhow::ensure;
use anyhow::{bail, ensure};
use dashmap::DashMap;
use ordered_float::NotNan;
use thegraph_core::types::{alloy_primitives::Address, SubgraphId};
use tokio::sync::watch;
use tokio::{sync::watch, time::MissedTickBehavior};

use self::api_keys::APIKey;

Expand Down Expand Up @@ -41,15 +43,57 @@ pub struct AuthContext {
pub payment_required: bool,
pub api_keys: watch::Receiver<HashMap<String, APIKey>>,
pub special_api_keys: Arc<HashSet<String>>,
// TODO: remove after L1 deprecation
pub rate_limiter: Option<RateLimiter>,
}

impl AuthContext {
/// Parse an authorization token into its corresponding settings, and check that the query
/// should be handled.
pub fn check(&self, token: &str, domain: &str) -> anyhow::Result<AuthSettings> {
ensure!(!token.is_empty(), "missing bearer token");
ensure!(!token.is_empty(), "missing API key");

if let Some(rate_limiter) = &self.rate_limiter {
if rate_limiter.above_limit(token) {
bail!("Rate limit exceeded. Querying L1 subgraphs is deprecated and will be removed soon.");
}
rate_limiter.increment(token);
}

// For now, the only option is an API key.
api_keys::check(self, token, domain)
}
}

// TODO: remove after L1 deprecation
#[derive(Clone)]
pub struct RateLimiter {
counters: Arc<DashMap<String, u16>>,
limit: u16,
}

impl RateLimiter {
pub fn new(limit: u16) -> Self {
let counters: Arc<DashMap<String, u16>> = Default::default();
{
let counters = counters.clone();
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
tokio::spawn(async move {
loop {
interval.tick().await;
counters.clear();
}
})
};
Self { counters, limit }
}

pub fn above_limit(&self, key: &str) -> bool {
self.counters.get(key).map(|v| *v).unwrap_or(0) > self.limit
}

pub fn increment(&self, key: &str) {
*self.counters.entry(key.to_string()).or_default() += 1;
}
}
3 changes: 2 additions & 1 deletion gateway-framework/src/http/middleware/require_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ mod tests {
payment_required: false,
api_keys: watch::channel(Default::default()).1,
special_api_keys: Default::default(),
rate_limiter: None,
};
if let Some(key) = key {
ctx.api_keys = watch::channel(HashMap::from([(
Expand Down Expand Up @@ -342,7 +343,7 @@ mod tests {
assert_eq!(res.headers().typed_get(), Some(ContentType::json()));
assert_matches!(deserialize_graphql_response_body::<()>(res.body_mut()).await, Ok(res_body) => {
assert_eq!(res_body.errors.len(), 1);
assert_eq!(res_body.errors[0].message, "auth error: missing bearer token");
assert_eq!(res_body.errors[0].message, "auth error: missing API key");
});
});
}
Expand Down
1 change: 1 addition & 0 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,7 @@ mod tests {
payment_required: false,
api_keys: watch::channel(Default::default()).1,
special_api_keys: Default::default(),
rate_limiter: None,
};
if let Some(key) = key {
ctx.api_keys = watch::channel(HashMap::from([(
Expand Down
3 changes: 3 additions & 0 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ use url::Url;
pub struct Config {
#[serde(default)]
pub api_keys: Option<ApiKeys>,
/// Rate limit per API key in queries per second.
// TODO: remove after L1 deprecation
pub api_key_rate_limit: Option<u16>,
pub attestations: AttestationConfig,
/// List of indexer addresses to block. This should only be used temprorarily, to compensate for
/// indexer-selection imperfections.
Expand Down
11 changes: 9 additions & 2 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,13 @@ async fn main() {
)));

// Initialize the auth service
let auth_service =
init_auth_service(http_client.clone(), conf.api_keys, conf.payment_required).await;
let auth_service = init_auth_service(
http_client.clone(),
conf.api_keys,
conf.payment_required,
conf.api_key_rate_limit,
)
.await;

let budgeter: &'static Budgeter =
Box::leak(Box::new(Budgeter::new(USD(conf.query_fees_target))));
Expand Down Expand Up @@ -350,6 +355,7 @@ async fn init_auth_service(
http: reqwest::Client,
config: Option<ApiKeys>,
payment_required: bool,
api_key_rate_limit: Option<u16>,
) -> AuthContext {
let special_api_keys = match &config {
Some(ApiKeys::Endpoint { special, .. }) => Arc::new(HashSet::from_iter(special.clone())),
Expand All @@ -371,6 +377,7 @@ async fn init_auth_service(
payment_required,
api_keys,
special_api_keys,
rate_limiter: api_key_rate_limit.map(gateway_framework::auth::RateLimiter::new),
}
}

Expand Down