From cba725b4ba18c2864e5d41dfeddb02b815a12fef Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Wed, 29 Apr 2026 18:08:56 +0600 Subject: [PATCH 1/5] feat: implement rate limiting middleware and configuration from agentflow.json --- agentflow.json | 8 +- .../src/app/core/config/graph_config.py | 52 +++ .../src/app/core/config/setup_middleware.py | 20 +- .../src/app/core/middleware/rate_limit.py | 138 ++++++++ agentflow_cli/src/app/main.py | 2 +- docs/rate-limit-review.md | 317 ++++++++++++++++++ 6 files changed, 534 insertions(+), 3 deletions(-) create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit.py create mode 100644 docs/rate-limit-review.md diff --git a/agentflow.json b/agentflow.json index 82df8be..fd11aa4 100644 --- a/agentflow.json +++ b/agentflow.json @@ -2,5 +2,11 @@ "agent": "graph.react:app", "thread_name_generator": "graph.thread_name_generator:MyNameGenerator", "env": ".env", - "auth": null + "auth": null, + "rate_limit": { + "enabled": true, + "requests": 100, + "window": 60, + "by": "ip" + } } diff --git a/agentflow_cli/src/app/core/config/graph_config.py b/agentflow_cli/src/app/core/config/graph_config.py index a4d0e6d..6d187d5 100644 --- a/agentflow_cli/src/app/core/config/graph_config.py +++ b/agentflow_cli/src/app/core/config/graph_config.py @@ -1,10 +1,37 @@ import json import os +from dataclasses import dataclass from pathlib import Path from dotenv import load_dotenv +@dataclass +class RateLimitConfig: + """Rate limit configuration parsed from agentflow.json.""" + + enabled: bool + requests: int + window: int + by: str # "ip" | "global" + + @classmethod + def from_dict(cls, data: dict) -> "RateLimitConfig": + enabled = data.get("enabled", True) + requests = int(data.get("requests", 100)) + window = int(data.get("window", 60)) + by = data.get("by", "ip") + + if by not in ("ip", "global"): + raise ValueError(f"rate_limit.by must be 'ip' or 'global', got '{by}'") + if requests <= 0: + raise ValueError("rate_limit.requests must be a positive integer") + if window <= 0: + raise ValueError("rate_limit.window must be a positive integer") + + return cls(enabled=enabled, requests=requests, window=window, by=by) + + class GraphConfig: def __init__(self, path: str = "agentflow.json"): with Path(path).open() as f: @@ -84,3 +111,28 @@ def auth_config(self) -> dict | None: } raise ValueError(f"Unsupported auth method: {res}") + + @property + def rate_limit(self) -> RateLimitConfig | None: + """ + Get rate limit configuration from agentflow.json. + + Returns: + RateLimitConfig if 'rate_limit' key is present and enabled, else None. + + Example agentflow.json entry:: + + "rate_limit": { + "enabled": true, + "requests": 100, + "window": 60, + "by": "ip" + } + """ + data = self.data.get("rate_limit", None) + if data is None: + return None + config = RateLimitConfig.from_dict(data) + if not config.enabled: + return None + return config diff --git a/agentflow_cli/src/app/core/config/setup_middleware.py b/agentflow_cli/src/app/core/config/setup_middleware.py index cf3c03f..8be2581 100644 --- a/agentflow_cli/src/app/core/config/setup_middleware.py +++ b/agentflow_cli/src/app/core/config/setup_middleware.py @@ -9,9 +9,11 @@ from starlette.requests import Request from starlette.types import ASGIApp, Receive, Scope, Send +from agentflow_cli.src.app.core.middleware.rate_limit import RateLimitMiddleware from agentflow_cli.src.app.core.middleware.request_limits import RequestSizeLimitMiddleware from agentflow_cli.src.app.core.middleware.security_headers import SecurityHeadersMiddleware +from .graph_config import GraphConfig from .sentry_config import init_sentry from .settings import get_settings, logger @@ -94,17 +96,20 @@ async def dispatch(self, request: Request, call_next): return response -def setup_middleware(app: FastAPI): +def setup_middleware(app: FastAPI, graph_config: GraphConfig | None = None): """ Set up middleware for the FastAPI application. Args: app (FastAPI): The FastAPI application instance. + graph_config (GraphConfig | None): Optional graph configuration used to + enable dynamic rate limiting from ``agentflow.json``. Middleware: - CORS: Configured based on settings.ORIGINS. - TrustedHost: Configured with allowed hosts from settings.ALLOWED_HOST. - GZip: Applied with a minimum size of 1000 bytes (excludes streaming endpoints). + - RateLimit: Applied when ``rate_limit`` is configured in ``agentflow.json``. """ settings = get_settings() # init cors @@ -142,6 +147,19 @@ def setup_middleware(app: FastAPI): # Use SelectiveGZipMiddleware to exclude streaming endpoints from compression # Streaming endpoints need immediate data transmission without buffering app.add_middleware(SelectiveGZipMiddleware, minimum_size=1000) + + # Apply rate limiting only when configured in agentflow.json + if graph_config is not None: + rate_limit_config = graph_config.rate_limit + if rate_limit_config is not None: + app.add_middleware(RateLimitMiddleware, config=rate_limit_config) + logger.info( + "Rate limiting enabled: %d requests per %ds by %s", + rate_limit_config.requests, + rate_limit_config.window, + rate_limit_config.by, + ) + logger.debug("Middleware set up") # Initialize Sentry diff --git a/agentflow_cli/src/app/core/middleware/rate_limit.py b/agentflow_cli/src/app/core/middleware/rate_limit.py new file mode 100644 index 0000000..40cc442 --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit.py @@ -0,0 +1,138 @@ +"""In-memory sliding-window rate limit middleware. + +Configured via ``agentflow.json``:: + + "rate_limit": { + "enabled": true, + "requests": 100, + "window": 60, + "by": "ip" + } + +Fields +------ +enabled : bool – turn the limiter on/off without removing the key. +requests : int – max requests allowed in the window. +window : int – rolling window duration in seconds. +by : str – "ip" (per client IP) or "global" (single shared bucket). +""" + +import asyncio +import time +from collections import deque + +from fastapi import status +from fastapi.responses import JSONResponse +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request + +from agentflow_cli.src.app.core import logger +from agentflow_cli.src.app.core.config.graph_config import RateLimitConfig + + +class RateLimitMiddleware(BaseHTTPMiddleware): + """Sliding-window rate limiter middleware. + + Args: + app: The ASGI application. + config: Parsed :class:`RateLimitConfig` from ``agentflow.json``. + """ + + def __init__(self, app, config: RateLimitConfig) -> None: + super().__init__(app) + self.config = config + # bucket key -> deque of request timestamps (float, epoch seconds) + self._buckets: dict[str, deque[float]] = {} + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _bucket_key(self, request: Request) -> str: + if self.config.by == "global": + return "__global__" + # Per-IP: honour X-Forwarded-For when behind a proxy, fall back to + # the direct client address. + forwarded_for = request.headers.get("X-Forwarded-For") + if forwarded_for: + return forwarded_for.split(",")[0].strip() + client = request.client + return client.host if client else "unknown" + + async def _is_allowed(self, key: str) -> tuple[bool, int, int]: + """Check whether the request is within the rate limit. + + Returns + ------- + (allowed, remaining, reset_in_seconds) + """ + now = time.monotonic() + window_start = now - self.config.window + + async with self._lock: + bucket = self._buckets.setdefault(key, deque()) + + # Drop timestamps that fell outside the current window. + while bucket and bucket[0] < window_start: + bucket.popleft() + + count = len(bucket) + remaining = max(0, self.config.requests - count - 1) + + if count >= self.config.requests: + # How long until the oldest entry expires. + reset_in = int(self.config.window - (now - bucket[0])) + 1 + return False, 0, reset_in + + bucket.append(now) + return True, remaining, self.config.window + + # ------------------------------------------------------------------ + # Middleware dispatch + # ------------------------------------------------------------------ + + async def dispatch(self, request: Request, call_next): + key = self._bucket_key(request) + allowed, remaining, reset_in = await self._is_allowed(key) + + if not allowed: + request_id = getattr(request.state, "request_id", "unknown") + logger.warning( + "Rate limit exceeded for %s on %s %s", + key, + request.method, + request.url.path, + ) + return JSONResponse( + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + content={ + "error": { + "code": "RATE_LIMIT_EXCEEDED", + "message": ( + f"Too many requests. Limit is {self.config.requests} " + f"requests per {self.config.window}s. " + f"Retry after {reset_in}s." + ), + "limit": self.config.requests, + "window_seconds": self.config.window, + "retry_after_seconds": reset_in, + }, + "metadata": { + "request_id": request_id, + "status": "error", + }, + }, + headers={ + "Retry-After": str(reset_in), + "X-RateLimit-Limit": str(self.config.requests), + "X-RateLimit-Remaining": "0", + "X-RateLimit-Reset": str(reset_in), + }, + ) + + response = await call_next(request) + response.headers["X-RateLimit-Limit"] = str(self.config.requests) + response.headers["X-RateLimit-Remaining"] = str(remaining) + response.headers["X-RateLimit-Reset"] = str(reset_in) + return response diff --git a/agentflow_cli/src/app/main.py b/agentflow_cli/src/app/main.py index aef1173..62e5004 100644 --- a/agentflow_cli/src/app/main.py +++ b/agentflow_cli/src/app/main.py @@ -99,7 +99,7 @@ async def lifespan(app: FastAPI): root_path=settings.ROOT_PATH, ) -setup_middleware(app) +setup_middleware(app, graph_config=graph_config) # attach_injector(app, injector=injector) setup_fastapi(container=container, app=app) diff --git a/docs/rate-limit-review.md b/docs/rate-limit-review.md new file mode 100644 index 0000000..72764d5 --- /dev/null +++ b/docs/rate-limit-review.md @@ -0,0 +1,317 @@ +# Rate Limit Review + +Reviewed files: + +- `agentflow-api/agentflow_cli/src/app/core/config/graph_config.py` +- `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit.py` +- `agentflow-api/agentflow_cli/src/app/core/config/setup_middleware.py` +- `agentflow-api/agentflow.json` + +## Verdict + +The current rate limiter is fine as a local development toy, but it is not a production-scalable rate limiter. It uses per-process memory, accepts spoofable client identity by default, serializes every request through one lock, has no backend abstraction, and exposes config that looks production-ready while silently breaking across workers or containers. + +For a framework whose promise is "focus on building the agent, not the scalable logic," this needs a backend-driven design. + +## Findings + +### High: limits are per process, not per deployment + +`RateLimitMiddleware` stores buckets in `self._buckets`, an in-memory dictionary owned by one Python process: + +- `rate_limit.py:44-46` +- `rate_limit.py:73-89` + +That means every Uvicorn/Gunicorn worker has its own quota. If the config says `100 requests / 60s` and the API runs with 8 workers, the real limit can become roughly `800 requests / 60s` per host. If the app runs across 5 containers, it can become roughly `4000 requests / 60s`. + +This is the main scalability failure. + +### High: the API config has no backend choice + +`RateLimitConfig` only supports: + +- `enabled` +- `requests` +- `window` +- `by` + +See `graph_config.py:9-32`. The example config in `agentflow.json:6-11` also has no way to say whether the limiter should use memory, Redis, or another storage backend. + +That makes the current implementation a hard-coded memory strategy wearing a production config costume. + +### High: `X-Forwarded-For` is trusted unconditionally + +The IP key uses `X-Forwarded-For` whenever present: + +- `rate_limit.py:55-59` + +This is dangerous unless the app is guaranteed to sit behind a trusted proxy that strips untrusted forwarding headers. A direct client can spoof `X-Forwarded-For` and rotate the value to bypass per-IP limits. + +This should be configurable and tied to trusted proxy settings, or use Starlette/Uvicorn proxy header handling in a controlled deployment path. + +### Medium: one global lock serializes all limiter checks + +Every request enters the same `asyncio.Lock`: + +- `rate_limit.py:46` +- `rate_limit.py:73` + +Even unrelated IPs block each other. Under load, the rate limiter becomes a request funnel. This is especially awkward because rate limiting is supposed to protect the app during pressure, not become the pressure point. + +### Medium: bucket cleanup is passive and memory can grow + +Old timestamps are only removed when the same bucket key is seen again: + +- `rate_limit.py:76-78` + +If many unique client keys appear once, their empty or stale buckets can remain in `_buckets` indefinitely. A hostile client can generate many spoofed `X-Forwarded-For` values and inflate memory. + +### Medium: `global` mode is global only inside one process + +The `global` bucket key is `__global__`: + +- `rate_limit.py:52-54` + +Because storage is local memory, this is not actually global across workers, pods, hosts, or deployments. The name promises more than the implementation can deliver. + +### Medium: algorithm choice is fixed and storage-heavy + +The middleware implements a sliding window by storing every request timestamp in a deque: + +- `rate_limit.py:44` +- `rate_limit.py:88` + +This gives accurate sliding windows, but memory grows with `number_of_keys * requests`. Redis can support this with sorted sets, but for high traffic a token bucket or fixed-window-with-Lua approach may be cheaper and easier to operate. + +### Medium: `X-RateLimit-Reset` is not a standard reset timestamp + +The response uses seconds-until-reset as `X-RateLimit-Reset`: + +- `rate_limit.py:130` +- `rate_limit.py:137` + +Many clients expect reset to be an epoch timestamp. If the project wants delta seconds, use a clearer header such as `X-RateLimit-Reset-After`, or document the behavior explicitly. + +### Low: config validation is too narrow for future extension + +`RateLimitConfig.from_dict` validates `by`, `requests`, and `window`, but there is no namespacing for backend-specific settings: + +- `graph_config.py:19-32` + +If Redis is added by sprinkling `redis_url` into the same flat object, the config will become messy quickly. + +### Low: no route/method exclusions + +The limiter applies to every HTTP request once enabled: + +- `setup_middleware.py:151-155` + +That includes docs, health checks, metrics, readiness probes, and possibly streaming endpoints. Production deployments usually need `exclude_paths`, `include_paths`, or route groups. + + +Claude Sonnet built the rate limiter equivalent of putting a bicycle lock on a cloud load balancer. + +It has the aesthetic of scalability: config file, middleware class, headers, neat docstring. But the actual quota lives in a Python dictionary inside one worker. The moment you add another worker, container, or node, the "limit" becomes a polite suggestion with multiplication enabled. + +The `global` option is especially funny. It is global in the same way a sticky note on one laptop is company policy. Accurate within a very small emotional radius. + +And trusting `X-Forwarded-For` directly is the security version of accepting "I am definitely the CEO" because someone wrote it in a request header. + +To be fair, this is a reasonable first draft for local demos. But for AgentFlow's stated goal, this should not be the production shape. + +## Proposed config shape + +Add one more keyword: `backend`. + +Recommended default: + +```json +{ + "rate_limit": { + "enabled": true, + "backend": "memory", + "requests": 100, + "window": 60, + "by": "ip" + } +} +``` + +Production Redis example: + +```json +{ + "rate_limit": { + "enabled": true, + "backend": "redis", + "redis": { + "url": "${REDIS_URL}", + "prefix": "agentflow:rate-limit" + }, + "requests": 100, + "window": 60, + "by": "ip" + } +} +``` + +Future custom backend example: + +```json +{ + "rate_limit": { + "enabled": true, + "backend": "custom", + "path": "my_project.rate_limit:backend", + "requests": 100, + "window": 60, + "by": "user" + } +} +``` + +Suggested fields: + +| Field | Type | Notes | +| --- | --- | --- | +| `enabled` | bool | Enables/disables middleware. | +| `backend` | string | `memory`, `redis`, or `custom`. Default can be `memory` for backward compatibility. | +| `requests` | int | Max requests per window. | +| `window` | int | Window duration in seconds. | +| `by` | string | Start with `ip` and `global`; later add `user`, `api_key`, `thread`, or `custom`. | +| `redis.url` | string | Required when `backend = "redis"` unless a shared app Redis config is used. | +| `redis.prefix` | string | Key prefix for isolation. | +| `exclude_paths` | list[string] | Optional health/docs/metrics exclusions. | +| `trusted_proxy_headers` | bool | Only honor forwarding headers when explicitly enabled. | + +## Proposed architecture + +Split the rate limiter into three layers: + +1. `RateLimitConfig` + Parses and validates config, including backend-specific options. + +2. `RateLimitBackend` + Owns the storage and algorithm. + +3. `RateLimitMiddleware` + Extracts identity, asks the backend whether the request is allowed, and writes response headers. + +Example interface: + +```python +from dataclasses import dataclass +from typing import Protocol + + +@dataclass(frozen=True) +class RateLimitDecision: + allowed: bool + limit: int + remaining: int + retry_after: int + reset_after: int + + +class RateLimitBackend(Protocol): + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + ... +``` + +Then implement: + +- `MemoryRateLimitBackend` +- `RedisRateLimitBackend` +- `CustomRateLimitBackend` + +The middleware should not know whether the counter lives in a deque, Redis, Postgres, or a user's custom implementation. + +## Redis backend recommendation + +For Redis, use an atomic operation. Do not perform `GET`, calculate in Python, then `SET`, because concurrent requests will race. + +Two good options: + +1. Fixed window with Redis `INCR` + `EXPIRE` + Simple, fast, atomic enough if wrapped carefully. Less smooth at window boundaries. + +2. Sliding window with Redis sorted sets + Lua script + More accurate, more expensive, still atomic if implemented in Lua. + +For AgentFlow, I would start with fixed window or token bucket unless exact sliding-window semantics are required. The framework likely cares more about predictable distributed enforcement than perfect boundary smoothing. + +## Migration plan + +### Step 1: Make config extensible + +Update `RateLimitConfig` to include: + +- `backend: str = "memory"` +- `redis_url: str | None = None` +- `redis_prefix: str = "agentflow:rate-limit"` +- `exclude_paths: tuple[str, ...] = ()` +- `trusted_proxy_headers: bool = False` + +Keep current config working by defaulting `backend` to `memory`. + +### Step 2: Extract the backend interface + +Move the deque logic out of `RateLimitMiddleware` into `MemoryRateLimitBackend`. + +This keeps the current behavior intact while creating the seam for Redis. + +### Step 3: Add Redis backend + +Use `redis.asyncio` from the existing optional `redis` dependency. + +The Redis backend should: + +- use a configurable key prefix +- use atomic commands or Lua +- return the same `RateLimitDecision` as memory +- fail closed or fail open based on an explicit config option, not accidental exception behavior + +### Step 4: Fix identity extraction + +Add an identity resolver with explicit modes: + +- `global` +- `ip` +- later: `user`, `api_key`, `custom` + +Only trust `X-Forwarded-For` when configured. Otherwise use `request.client.host`. + +### Step 5: Add exclusions + +Support paths like: + +```json +"exclude_paths": ["/health", "/metrics", "/docs", "/openapi.json"] +``` + +This avoids rate limiting health checks and operational endpoints. + +### Step 6: Add tests + +Minimum tests: + +- config parses old format without `backend` +- config parses `backend = "memory"` +- config parses `backend = "redis"` and requires Redis URL +- invalid backend raises a helpful error +- memory backend enforces limit +- Redis backend enforces limit across two backend instances +- `X-Forwarded-For` is ignored unless trusted proxy mode is enabled +- excluded paths bypass limiter + +## Recommended next implementation target + +Do this in one focused PR: + +1. Add `backend` parsing with backward compatibility. +2. Extract `MemoryRateLimitBackend`. +3. Keep current runtime behavior unchanged when `backend` is omitted. +4. Add tests for config and memory behavior. + +Then a second PR can add Redis without mixing architectural cleanup and networked storage in the same change. + From 825f956c60e751e17e5010ad01ff25044a2ba4fc Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Thu, 30 Apr 2026 12:21:00 +0600 Subject: [PATCH 2/5] wip --- agentflow.json | 5 +- .../src/app/core/config/graph_config.py | 118 +++++++++- .../src/app/core/config/setup_middleware.py | 24 +- .../src/app/core/middleware/__init__.py | 14 ++ .../src/app/core/middleware/rate_limit.py | 138 ----------- .../core/middleware/rate_limit/__init__.py | 15 ++ .../app/core/middleware/rate_limit/base.py | 27 +++ .../app/core/middleware/rate_limit/factory.py | 97 ++++++++ .../app/core/middleware/rate_limit/memory.py | 98 ++++++++ .../core/middleware/rate_limit/middleware.py | 93 ++++++++ .../app/core/middleware/rate_limit/redis.py | 114 +++++++++ agentflow_cli/src/app/main.py | 7 +- docs/rate-limit-review.md | 11 +- tests/unit_tests/test_rate_limit.py | 218 ++++++++++++++++++ uv.lock | 4 +- 15 files changed, 828 insertions(+), 155 deletions(-) delete mode 100644 agentflow_cli/src/app/core/middleware/rate_limit.py create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit/__init__.py create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit/base.py create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit/factory.py create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit/memory.py create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit/middleware.py create mode 100644 agentflow_cli/src/app/core/middleware/rate_limit/redis.py create mode 100644 tests/unit_tests/test_rate_limit.py diff --git a/agentflow.json b/agentflow.json index fd11aa4..36732de 100644 --- a/agentflow.json +++ b/agentflow.json @@ -5,8 +5,11 @@ "auth": null, "rate_limit": { "enabled": true, + "backend": "memory", "requests": 100, "window": 60, - "by": "ip" + "by": "ip", + "trusted_proxy_headers": false, + "exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"] } } diff --git a/agentflow_cli/src/app/core/config/graph_config.py b/agentflow_cli/src/app/core/config/graph_config.py index 6d187d5..344278d 100644 --- a/agentflow_cli/src/app/core/config/graph_config.py +++ b/agentflow_cli/src/app/core/config/graph_config.py @@ -6,30 +6,140 @@ from dotenv import load_dotenv +def _parse_bool(value: object, *, field: str) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in {"1", "true", "yes", "on"}: + return True + if normalized in {"0", "false", "no", "off"}: + return False + raise ValueError(f"{field} must be a boolean") + + +def _expand_env(value: str | None) -> str | None: + if value is None: + return None + expanded = os.path.expandvars(value) + if expanded == value and (value.startswith("$") or "${" in value): + raise ValueError(f"Unresolved environment variable in value: {value}") + return expanded + + @dataclass class RateLimitConfig: - """Rate limit configuration parsed from agentflow.json.""" + """Rate limit configuration parsed from agentflow.json. + + Example (memory backend, default):: + + "rate_limit": { + "enabled": true, + "backend": "memory", + "requests": 100, + "window": 60, + "by": "ip", + "trusted_proxy_headers": false, + "exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"] + } + + Example (Redis backend):: + + "rate_limit": { + "enabled": true, + "backend": "redis", + "requests": 100, + "window": 60, + "by": "ip", + "trusted_proxy_headers": true, + "exclude_paths": ["/health"], + "redis": { + "url": "redis://localhost:6379/0", + "prefix": "agentflow:rate-limit" + }, + "fail_open": true + } + + Example (custom backend):: + + "rate_limit": { + "enabled": true, + "backend": "custom", + "requests": 100, + "window": 60, + "by": "ip" + } + + For custom backends, bind a ``BaseRateLimitBackend`` instance in InjectQ. + """ enabled: bool requests: int window: int by: str # "ip" | "global" + backend: str # "memory" | "redis" | "custom" + redis_url: str | None + redis_prefix: str + exclude_paths: tuple[str, ...] + trusted_proxy_headers: bool # honour X-Forwarded-For only when True + fail_open: bool # on backend error: True=allow, False=deny @classmethod def from_dict(cls, data: dict) -> "RateLimitConfig": - enabled = data.get("enabled", True) + if not isinstance(data, dict): + raise ValueError("rate_limit must be an object") + + enabled = _parse_bool(data.get("enabled", True), field="rate_limit.enabled") requests = int(data.get("requests", 100)) window = int(data.get("window", 60)) by = data.get("by", "ip") - + backend = data.get("backend", "memory") + trusted_proxy_headers = _parse_bool( + data.get("trusted_proxy_headers", False), + field="rate_limit.trusted_proxy_headers", + ) + exclude_paths_raw = data.get("exclude_paths", []) + if not isinstance(exclude_paths_raw, list | tuple): + raise ValueError("rate_limit.exclude_paths must be a list of paths") + exclude_paths = tuple(str(path) for path in exclude_paths_raw) + fail_open = _parse_bool(data.get("fail_open", True), field="rate_limit.fail_open") + + # Redis sub-object: {"url": "...", "prefix": "..."} + redis_obj = data.get("redis") or {} + if isinstance(redis_obj, str): + # Allow shorthand: "redis": "redis://..." + redis_url: str | None = _expand_env(redis_obj) + redis_prefix = "agentflow:rate-limit" + elif isinstance(redis_obj, dict): + redis_url = _expand_env(redis_obj.get("url") or None) + redis_prefix = str(redis_obj.get("prefix", "agentflow:rate-limit")) + else: + raise ValueError("rate_limit.redis must be an object or Redis URL string") + + # Validation if by not in ("ip", "global"): raise ValueError(f"rate_limit.by must be 'ip' or 'global', got '{by}'") + if backend not in ("memory", "redis", "custom"): + raise ValueError( + f"rate_limit.backend must be 'memory', 'redis', or 'custom', got '{backend}'" + ) if requests <= 0: raise ValueError("rate_limit.requests must be a positive integer") if window <= 0: raise ValueError("rate_limit.window must be a positive integer") - return cls(enabled=enabled, requests=requests, window=window, by=by) + return cls( + enabled=enabled, + requests=requests, + window=window, + by=by, + backend=backend, + redis_url=redis_url, + redis_prefix=redis_prefix, + exclude_paths=exclude_paths, + trusted_proxy_headers=trusted_proxy_headers, + fail_open=fail_open, + ) class GraphConfig: diff --git a/agentflow_cli/src/app/core/config/setup_middleware.py b/agentflow_cli/src/app/core/config/setup_middleware.py index 8be2581..b1da167 100644 --- a/agentflow_cli/src/app/core/config/setup_middleware.py +++ b/agentflow_cli/src/app/core/config/setup_middleware.py @@ -4,12 +4,13 @@ from fastapi import FastAPI from fastapi.middleware.gzip import GZipMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware +from injectq import InjectQ from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request from starlette.types import ASGIApp, Receive, Scope, Send -from agentflow_cli.src.app.core.middleware.rate_limit import RateLimitMiddleware +from agentflow_cli.src.app.core.middleware.rate_limit import RateLimitMiddleware, build_backend from agentflow_cli.src.app.core.middleware.request_limits import RequestSizeLimitMiddleware from agentflow_cli.src.app.core.middleware.security_headers import SecurityHeadersMiddleware @@ -96,7 +97,11 @@ async def dispatch(self, request: Request, call_next): return response -def setup_middleware(app: FastAPI, graph_config: GraphConfig | None = None): +def setup_middleware( + app: FastAPI, + graph_config: GraphConfig | None = None, + container: InjectQ | None = None, +): """ Set up middleware for the FastAPI application. @@ -152,12 +157,23 @@ def setup_middleware(app: FastAPI, graph_config: GraphConfig | None = None): if graph_config is not None: rate_limit_config = graph_config.rate_limit if rate_limit_config is not None: - app.add_middleware(RateLimitMiddleware, config=rate_limit_config) + backend = build_backend(rate_limit_config, container=container) + # Store on app.state so lifespan can close it cleanly + app.state.rate_limit_backend = backend + app.add_middleware( + RateLimitMiddleware, + config=rate_limit_config, + backend=backend, + ) logger.info( - "Rate limiting enabled: %d requests per %ds by %s", + "Rate limiting enabled: backend=%s, %d req/%ds, by=%s, " + "exclude_paths=%s, trusted_proxy_headers=%s", + rate_limit_config.backend, rate_limit_config.requests, rate_limit_config.window, rate_limit_config.by, + rate_limit_config.exclude_paths or "(none)", + rate_limit_config.trusted_proxy_headers, ) logger.debug("Middleware set up") diff --git a/agentflow_cli/src/app/core/middleware/__init__.py b/agentflow_cli/src/app/core/middleware/__init__.py index 8491a5b..84f98b7 100644 --- a/agentflow_cli/src/app/core/middleware/__init__.py +++ b/agentflow_cli/src/app/core/middleware/__init__.py @@ -1,10 +1,24 @@ """Middleware modules for agentflow-cli.""" +from .rate_limit import ( + BaseRateLimitBackend, + MemoryRateLimitBackend, + RateLimitDecision, + RateLimitMiddleware, + RedisRateLimitBackend, + build_backend, +) from .request_limits import RequestSizeLimitMiddleware from .security_headers import SecurityHeadersMiddleware, create_security_headers_middleware __all__ = [ + "BaseRateLimitBackend", + "RateLimitMiddleware", + "RateLimitDecision", + "MemoryRateLimitBackend", + "RedisRateLimitBackend", + "build_backend", "RequestSizeLimitMiddleware", "SecurityHeadersMiddleware", "create_security_headers_middleware", diff --git a/agentflow_cli/src/app/core/middleware/rate_limit.py b/agentflow_cli/src/app/core/middleware/rate_limit.py deleted file mode 100644 index 40cc442..0000000 --- a/agentflow_cli/src/app/core/middleware/rate_limit.py +++ /dev/null @@ -1,138 +0,0 @@ -"""In-memory sliding-window rate limit middleware. - -Configured via ``agentflow.json``:: - - "rate_limit": { - "enabled": true, - "requests": 100, - "window": 60, - "by": "ip" - } - -Fields ------- -enabled : bool – turn the limiter on/off without removing the key. -requests : int – max requests allowed in the window. -window : int – rolling window duration in seconds. -by : str – "ip" (per client IP) or "global" (single shared bucket). -""" - -import asyncio -import time -from collections import deque - -from fastapi import status -from fastapi.responses import JSONResponse -from starlette.middleware.base import BaseHTTPMiddleware -from starlette.requests import Request - -from agentflow_cli.src.app.core import logger -from agentflow_cli.src.app.core.config.graph_config import RateLimitConfig - - -class RateLimitMiddleware(BaseHTTPMiddleware): - """Sliding-window rate limiter middleware. - - Args: - app: The ASGI application. - config: Parsed :class:`RateLimitConfig` from ``agentflow.json``. - """ - - def __init__(self, app, config: RateLimitConfig) -> None: - super().__init__(app) - self.config = config - # bucket key -> deque of request timestamps (float, epoch seconds) - self._buckets: dict[str, deque[float]] = {} - self._lock = asyncio.Lock() - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - - def _bucket_key(self, request: Request) -> str: - if self.config.by == "global": - return "__global__" - # Per-IP: honour X-Forwarded-For when behind a proxy, fall back to - # the direct client address. - forwarded_for = request.headers.get("X-Forwarded-For") - if forwarded_for: - return forwarded_for.split(",")[0].strip() - client = request.client - return client.host if client else "unknown" - - async def _is_allowed(self, key: str) -> tuple[bool, int, int]: - """Check whether the request is within the rate limit. - - Returns - ------- - (allowed, remaining, reset_in_seconds) - """ - now = time.monotonic() - window_start = now - self.config.window - - async with self._lock: - bucket = self._buckets.setdefault(key, deque()) - - # Drop timestamps that fell outside the current window. - while bucket and bucket[0] < window_start: - bucket.popleft() - - count = len(bucket) - remaining = max(0, self.config.requests - count - 1) - - if count >= self.config.requests: - # How long until the oldest entry expires. - reset_in = int(self.config.window - (now - bucket[0])) + 1 - return False, 0, reset_in - - bucket.append(now) - return True, remaining, self.config.window - - # ------------------------------------------------------------------ - # Middleware dispatch - # ------------------------------------------------------------------ - - async def dispatch(self, request: Request, call_next): - key = self._bucket_key(request) - allowed, remaining, reset_in = await self._is_allowed(key) - - if not allowed: - request_id = getattr(request.state, "request_id", "unknown") - logger.warning( - "Rate limit exceeded for %s on %s %s", - key, - request.method, - request.url.path, - ) - return JSONResponse( - status_code=status.HTTP_429_TOO_MANY_REQUESTS, - content={ - "error": { - "code": "RATE_LIMIT_EXCEEDED", - "message": ( - f"Too many requests. Limit is {self.config.requests} " - f"requests per {self.config.window}s. " - f"Retry after {reset_in}s." - ), - "limit": self.config.requests, - "window_seconds": self.config.window, - "retry_after_seconds": reset_in, - }, - "metadata": { - "request_id": request_id, - "status": "error", - }, - }, - headers={ - "Retry-After": str(reset_in), - "X-RateLimit-Limit": str(self.config.requests), - "X-RateLimit-Remaining": "0", - "X-RateLimit-Reset": str(reset_in), - }, - ) - - response = await call_next(request) - response.headers["X-RateLimit-Limit"] = str(self.config.requests) - response.headers["X-RateLimit-Remaining"] = str(remaining) - response.headers["X-RateLimit-Reset"] = str(reset_in) - return response diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/__init__.py b/agentflow_cli/src/app/core/middleware/rate_limit/__init__.py new file mode 100644 index 0000000..84c13c8 --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit/__init__.py @@ -0,0 +1,15 @@ +from .base import BaseRateLimitBackend, RateLimitDecision +from .factory import build_backend +from .memory import MemoryRateLimitBackend +from .middleware import RateLimitMiddleware +from .redis import RedisRateLimitBackend + + +__all__ = [ + "BaseRateLimitBackend", + "MemoryRateLimitBackend", + "RateLimitDecision", + "RateLimitMiddleware", + "RedisRateLimitBackend", + "build_backend", +] diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/base.py b/agentflow_cli/src/app/core/middleware/rate_limit/base.py new file mode 100644 index 0000000..cd3ea4f --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit/base.py @@ -0,0 +1,27 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass(frozen=True) +class RateLimitDecision: + """Result of a single rate-limit check.""" + + allowed: bool + remaining: int + reset_after: int + + +class BaseRateLimitBackend(ABC): + """Abstract base class for rate-limit backends. + + Users can implement this class and bind an instance into InjectQ when they + need a backend other than the built-in memory or Redis implementations. + """ + + @abstractmethod + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + """Atomically check and record a request for *key*.""" + + @abstractmethod + async def close(self) -> None: + """Release resources held by the backend, if any.""" diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/factory.py b/agentflow_cli/src/app/core/middleware/rate_limit/factory.py new file mode 100644 index 0000000..8097bbd --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit/factory.py @@ -0,0 +1,97 @@ +import logging +from typing import TYPE_CHECKING, Any + +from injectq import InjectQ + +from agentflow_cli.src.app.core.config.graph_config import RateLimitConfig + +from .base import BaseRateLimitBackend +from .memory import MemoryRateLimitBackend +from .redis import AsyncRedis, RedisRateLimitBackend + + +if TYPE_CHECKING: + from redis.asyncio import Redis + + +logger = logging.getLogger("agentflow_api.rate_limit") + + +def build_backend( + config: RateLimitConfig, + container: InjectQ | None = None, +) -> BaseRateLimitBackend: + """Build or resolve the configured rate-limit backend. + + Custom backends are provided by binding a ``BaseRateLimitBackend`` instance + into InjectQ, matching the style used by auth/authorization. + """ + container = container or InjectQ.get_instance() + + injected_backend = container.try_get(BaseRateLimitBackend) + if config.backend == "custom": + if not injected_backend: + raise ValueError( + "rate_limit.backend='custom' requires a BaseRateLimitBackend " + "instance bound in InjectQ" + ) + return injected_backend + + if config.backend == "redis": + return _build_redis_backend(config, container) + + logger.info("Rate-limit backend: memory (in-process, not shared across workers)") + return MemoryRateLimitBackend() + + +def _build_redis_backend( + config: RateLimitConfig, + container: InjectQ, +) -> RedisRateLimitBackend: + redis = _get_redis_from_container(container) + if redis is not None: + logger.info( + "Rate-limit backend: Redis from InjectQ (prefix=%s, fail_open=%s)", + config.redis_prefix, + config.fail_open, + ) + return RedisRateLimitBackend( + redis=redis, + prefix=config.redis_prefix, + fail_open=config.fail_open, + close_redis=False, + ) + + if not config.redis_url: + raise ValueError("rate_limit.redis.url is required when no Redis client is bound") + + backend = RedisRateLimitBackend.from_url( + redis_url=config.redis_url, + prefix=config.redis_prefix, + fail_open=config.fail_open, + ) + _bind_redis(container, backend._redis) + logger.info( + "Rate-limit backend: created Redis and bound it in InjectQ " + "(prefix=%s, fail_open=%s)", + config.redis_prefix, + config.fail_open, + ) + return backend + + +def _get_redis_from_container(container: InjectQ) -> Any | None: + redis = container.try_get("redis") or container.try_get("redis_client") + if redis is not None: + return redis + if AsyncRedis is not None: + redis = container.try_get(AsyncRedis) + if redis is not None: + return redis + return container.try_get("Redis") + + +def _bind_redis(container: InjectQ, redis: "Redis") -> None: + container.bind_instance("redis", redis) + if AsyncRedis is not None: + container.bind_instance(AsyncRedis, redis) diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/memory.py b/agentflow_cli/src/app/core/middleware/rate_limit/memory.py new file mode 100644 index 0000000..d8033c6 --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit/memory.py @@ -0,0 +1,98 @@ +import asyncio +import logging +import time +from collections import deque + +from .base import BaseRateLimitBackend, RateLimitDecision + + +logger = logging.getLogger("agentflow_api.rate_limit") + +_NUM_STRIPES = 64 + + +class MemoryRateLimitBackend(BaseRateLimitBackend): + """In-process sliding-window rate limiter. + + This is useful for development and single-process deployments. It is not + shared across workers or containers; use Redis for distributed enforcement. + """ + + _SWEEP_INTERVAL = 2000 + + def __init__(self, max_unique_keys: int = 50_000) -> None: + self._buckets: dict[str, deque[float]] = {} + self._last_seen: dict[str, float] = {} + self._locks = [asyncio.Lock() for _ in range(_NUM_STRIPES)] + self._sweep_lock = asyncio.Lock() + self._bg_tasks: set[asyncio.Task] = set() + self._check_count = 0 + self._max_unique_keys = max_unique_keys + + def _stripe_lock(self, key: str) -> asyncio.Lock: + return self._locks[hash(key) % _NUM_STRIPES] + + def _schedule_sweep(self, window: int) -> None: + task = asyncio.ensure_future(self._sweep(window)) + self._bg_tasks.add(task) + task.add_done_callback(self._bg_tasks.discard) + + def _evict_oldest_bucket(self) -> None: + if not self._last_seen: + return + oldest_key = min(self._last_seen, key=self._last_seen.__getitem__) + self._buckets.pop(oldest_key, None) + self._last_seen.pop(oldest_key, None) + + async def _sweep(self, window: int) -> None: + async with self._sweep_lock: + cutoff = time.monotonic() - window * 2 + stale = [k for k, ts in self._last_seen.items() if ts < cutoff] + for key in stale: + self._buckets.pop(key, None) + self._last_seen.pop(key, None) + if stale: + logger.debug("Rate-limit memory sweep removed %d stale buckets", len(stale)) + + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + now = time.monotonic() + window_start = now - window + + self._check_count += 1 + if self._check_count % self._SWEEP_INTERVAL == 0: + self._schedule_sweep(window) + + async with self._stripe_lock(key): + if key not in self._buckets and len(self._buckets) >= self._max_unique_keys: + logger.warning( + "Rate-limit bucket cap (%d) reached; running emergency sweep", + self._max_unique_keys, + ) + await self._sweep(window) + if len(self._buckets) >= self._max_unique_keys: + self._evict_oldest_bucket() + + bucket = self._buckets.setdefault(key, deque()) + self._last_seen[key] = now + + while bucket and bucket[0] < window_start: + bucket.popleft() + + count = len(bucket) + if count >= limit: + reset_after = max(1, int(bucket[0] + window - now) + 1) + return RateLimitDecision(allowed=False, remaining=0, reset_after=reset_after) + + bucket.append(now) + return RateLimitDecision( + allowed=True, + remaining=limit - len(bucket), + reset_after=window, + ) + + async def close(self) -> None: + for task in self._bg_tasks: + task.cancel() + if self._bg_tasks: + await asyncio.gather(*self._bg_tasks, return_exceptions=True) + self._bg_tasks.clear() diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/middleware.py b/agentflow_cli/src/app/core/middleware/rate_limit/middleware.py new file mode 100644 index 0000000..336ce13 --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit/middleware.py @@ -0,0 +1,93 @@ +import time + +from fastapi import status +from fastapi.responses import JSONResponse +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request + +from agentflow_cli.src.app.core import logger +from agentflow_cli.src.app.core.config.graph_config import RateLimitConfig + +from .base import BaseRateLimitBackend + + +class RateLimitMiddleware(BaseHTTPMiddleware): + """Backend-agnostic rate-limit middleware.""" + + def __init__( + self, + app, + config: RateLimitConfig, + backend: BaseRateLimitBackend, + ) -> None: + super().__init__(app) + self.config = config + self.backend = backend + self._exclude = frozenset(config.exclude_paths) + + def _client_key(self, request: Request) -> str: + if self.config.by == "global": + return "__global__" + + if self.config.trusted_proxy_headers: + forwarded_for = request.headers.get("X-Forwarded-For") + if forwarded_for: + return forwarded_for.split(",")[0].strip() + + client = request.client + return client.host if client else "unknown" + + async def dispatch(self, request: Request, call_next): + if request.url.path in self._exclude: + return await call_next(request) + + key = self._client_key(request) + decision = await self.backend.check( + key, + limit=self.config.requests, + window=self.config.window, + ) + reset_at_epoch = int(time.time()) + decision.reset_after + + if not decision.allowed: + request_id = getattr(request.state, "request_id", "unknown") + logger.warning( + "Rate limit exceeded for %s on %s %s", + key, + request.method, + request.url.path, + ) + return JSONResponse( + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + content={ + "error": { + "code": "RATE_LIMIT_EXCEEDED", + "message": ( + f"Too many requests. Limit: {self.config.requests} " + f"per {self.config.window}s. " + f"Retry after {decision.reset_after}s." + ), + "limit": self.config.requests, + "window_seconds": self.config.window, + "retry_after_seconds": decision.reset_after, + }, + "metadata": { + "request_id": request_id, + "status": "error", + }, + }, + headers={ + "Retry-After": str(decision.reset_after), + "X-RateLimit-Limit": str(self.config.requests), + "X-RateLimit-Remaining": "0", + "X-RateLimit-Reset": str(reset_at_epoch), + "X-RateLimit-Reset-After": str(decision.reset_after), + }, + ) + + response = await call_next(request) + response.headers["X-RateLimit-Limit"] = str(self.config.requests) + response.headers["X-RateLimit-Remaining"] = str(decision.remaining) + response.headers["X-RateLimit-Reset"] = str(reset_at_epoch) + response.headers["X-RateLimit-Reset-After"] = str(decision.reset_after) + return response diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/redis.py b/agentflow_cli/src/app/core/middleware/rate_limit/redis.py new file mode 100644 index 0000000..d303763 --- /dev/null +++ b/agentflow_cli/src/app/core/middleware/rate_limit/redis.py @@ -0,0 +1,114 @@ +import logging +import time +import uuid +from typing import Any + +from .base import BaseRateLimitBackend, RateLimitDecision + + +try: + from redis.asyncio import Redis as AsyncRedis # type: ignore[import] + + _REDIS_AVAILABLE = True +except ImportError: + _REDIS_AVAILABLE = False + AsyncRedis = None # type: ignore[assignment,misc] + + +logger = logging.getLogger("agentflow_api.rate_limit") + +# Atomic sliding-window check using a Redis sorted set. +# +# The script: +# 1. Removes timestamps older than the current window. +# 2. Counts the remaining requests for the key. +# 3. If under the limit, adds the current request as a unique sorted-set member. +# 4. Sets an expiry so idle keys clean themselves up. +# 5. Returns whether the request is allowed, remaining quota, and reset time. +_SLIDING_WINDOW_LUA = """ +local key = KEYS[1] +local now_ms = tonumber(ARGV[1]) +local window_ms = tonumber(ARGV[2]) +local limit = tonumber(ARGV[3]) +local member = ARGV[4] + +local window_start = now_ms - window_ms + +redis.call('ZREMRANGEBYSCORE', key, '-inf', tostring(window_start)) + +local count = tonumber(redis.call('ZCARD', key)) + +if count < limit then + redis.call('ZADD', key, tostring(now_ms), member) + redis.call('EXPIRE', key, math.ceil(window_ms / 1000) + 1) + return {1, limit - count - 1, math.ceil(window_ms / 1000)} +else + local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES') + local reset_after + if #oldest >= 2 then + local oldest_ms = tonumber(oldest[2]) + reset_after = math.ceil((oldest_ms + window_ms - now_ms) / 1000) + 1 + else + reset_after = math.ceil(window_ms / 1000) + end + return {0, 0, math.max(reset_after, 1)} +end +""" + + +class RedisRateLimitBackend(BaseRateLimitBackend): + """Distributed sliding-window rate limiter backed by Redis.""" + + def __init__( + self, + redis: Any, + prefix: str, + fail_open: bool = True, + close_redis: bool = False, + ) -> None: + self._redis = redis + self._prefix = prefix + self._fail_open = fail_open + self._close_redis = close_redis + self._script = self._redis.register_script(_SLIDING_WINDOW_LUA) + + @classmethod + def from_url( + cls, + redis_url: str, + prefix: str, + fail_open: bool = True, + ) -> "RedisRateLimitBackend": + if not _REDIS_AVAILABLE or AsyncRedis is None: + raise ImportError( + "Redis backend requires the 'redis' package. " + "Install it with: pip install 'redis>=5.0.7'" + ) + redis = AsyncRedis.from_url(redis_url, decode_responses=False) + return cls(redis=redis, prefix=prefix, fail_open=fail_open, close_redis=True) + + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + redis_key = f"{self._prefix}:{key}" + now_ms = int(time.time() * 1000) + window_ms = window * 1000 + member = f"{now_ms}:{uuid.uuid4().hex}" + + try: + result = await self._script( + keys=[redis_key], + args=[now_ms, window_ms, limit, member], + ) + return RateLimitDecision( + allowed=bool(result[0]), + remaining=int(result[1]), + reset_after=int(result[2]), + ) + except Exception: + logger.exception("Redis rate-limit backend error") + if self._fail_open: + return RateLimitDecision(allowed=True, remaining=0, reset_after=window) + return RateLimitDecision(allowed=False, remaining=0, reset_after=window) + + async def close(self) -> None: + if self._close_redis: + await self._redis.aclose() diff --git a/agentflow_cli/src/app/main.py b/agentflow_cli/src/app/main.py index 62e5004..9d39f1e 100644 --- a/agentflow_cli/src/app/main.py +++ b/agentflow_cli/src/app/main.py @@ -86,6 +86,11 @@ async def lifespan(app: FastAPI): # release all the resources await graph.aclose() + # Close rate-limit backend (e.g. Redis connection pool) + backend = getattr(app.state, "rate_limit_backend", None) + if backend is not None: + await backend.close() + app = FastAPI( title=settings.APP_NAME, @@ -99,7 +104,7 @@ async def lifespan(app: FastAPI): root_path=settings.ROOT_PATH, ) -setup_middleware(app, graph_config=graph_config) +setup_middleware(app, graph_config=graph_config, container=container) # attach_injector(app, injector=injector) setup_fastapi(container=container, app=app) diff --git a/docs/rate-limit-review.md b/docs/rate-limit-review.md index 72764d5..23d21f5 100644 --- a/docs/rate-limit-review.md +++ b/docs/rate-limit-review.md @@ -162,7 +162,6 @@ Future custom backend example: "rate_limit": { "enabled": true, "backend": "custom", - "path": "my_project.rate_limit:backend", "requests": 100, "window": 60, "by": "user" @@ -170,6 +169,9 @@ Future custom backend example: } ``` +For custom backends, implement `BaseRateLimitBackend` and bind an instance into +InjectQ, matching the authentication/authorization pattern. + Suggested fields: | Field | Type | Notes | @@ -200,8 +202,8 @@ Split the rate limiter into three layers: Example interface: ```python +from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Protocol @dataclass(frozen=True) @@ -213,7 +215,8 @@ class RateLimitDecision: reset_after: int -class RateLimitBackend(Protocol): +class BaseRateLimitBackend(ABC): + @abstractmethod async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: ... ``` @@ -222,7 +225,6 @@ Then implement: - `MemoryRateLimitBackend` - `RedisRateLimitBackend` -- `CustomRateLimitBackend` The middleware should not know whether the counter lives in a deque, Redis, Postgres, or a user's custom implementation. @@ -314,4 +316,3 @@ Do this in one focused PR: 4. Add tests for config and memory behavior. Then a second PR can add Redis without mixing architectural cleanup and networked storage in the same change. - diff --git a/tests/unit_tests/test_rate_limit.py b/tests/unit_tests/test_rate_limit.py new file mode 100644 index 0000000..26ed06c --- /dev/null +++ b/tests/unit_tests/test_rate_limit.py @@ -0,0 +1,218 @@ +# ruff: noqa: S101, PLR2004, SLF001 + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from injectq import InjectQ + +from agentflow_cli.src.app.core.config.graph_config import RateLimitConfig +from agentflow_cli.src.app.core.middleware.rate_limit import ( + BaseRateLimitBackend, + MemoryRateLimitBackend, + RateLimitDecision, + RateLimitMiddleware, + RedisRateLimitBackend, + build_backend, +) + + +def _config(**overrides) -> RateLimitConfig: + data = { + "enabled": True, + "backend": "memory", + "requests": 2, + "window": 60, + "by": "ip", + } + data.update(overrides) + return RateLimitConfig.from_dict(data) + + +@pytest.mark.asyncio +async def test_memory_backend_enforces_limit(): + backend = MemoryRateLimitBackend() + + first = await backend.check("client", limit=2, window=60) + second = await backend.check("client", limit=2, window=60) + third = await backend.check("client", limit=2, window=60) + + assert first.allowed is True + assert second.allowed is True + assert third.allowed is False + assert third.remaining == 0 + + +@pytest.mark.asyncio +async def test_memory_backend_unique_key_cap_evicts(): + backend = MemoryRateLimitBackend(max_unique_keys=2) + + await backend.check("a", limit=10, window=60) + await backend.check("b", limit=10, window=60) + await backend.check("c", limit=10, window=60) + + assert len(backend._buckets) <= 2 + assert "c" in backend._buckets + + +@pytest.mark.asyncio +async def test_redis_backend_uses_unique_members_for_same_millisecond(monkeypatch): + calls = [] + + async def fake_script(*, keys, args): + calls.append((keys, args)) + return [1, 1, 60] + + backend = object.__new__(RedisRateLimitBackend) + backend._prefix = "agentflow:test" + backend._fail_open = True + backend._script = fake_script + + monkeypatch.setattr("time.time", lambda: 123.456) + + await backend.check("client", limit=2, window=60) + await backend.check("client", limit=2, window=60) + + first_member = calls[0][1][3] + second_member = calls[1][1][3] + assert first_member.startswith("123456:") + assert second_member.startswith("123456:") + assert first_member != second_member + + +def test_rate_limit_config_parses_boolean_strings_and_expands_redis_url(monkeypatch): + monkeypatch.setenv("RATE_LIMIT_REDIS_URL", "redis://localhost:6379/4") + + config = RateLimitConfig.from_dict( + { + "enabled": "true", + "backend": "redis", + "requests": 5, + "window": 10, + "by": "global", + "trusted_proxy_headers": "false", + "fail_open": "no", + "redis": {"url": "${RATE_LIMIT_REDIS_URL}", "prefix": "agentflow:test"}, + } + ) + + assert config.enabled is True + assert config.trusted_proxy_headers is False + assert config.fail_open is False + assert config.redis_url == "redis://localhost:6379/4" + + +def test_rate_limit_config_allows_custom_backend_without_path(): + config = RateLimitConfig.from_dict({"backend": "custom"}) + + assert config.backend == "custom" + + +def test_rate_limit_config_rejects_invalid_boolean_string(): + with pytest.raises(ValueError, match="rate_limit.fail_open must be a boolean"): + RateLimitConfig.from_dict({"fail_open": "sometimes"}) + + +def test_rate_limit_middleware_ignores_forwarded_for_by_default(): + app = FastAPI() + backend = MemoryRateLimitBackend() + app.add_middleware(RateLimitMiddleware, config=_config(), backend=backend) + + @app.get("/") + def root(): + return {"ok": True} + + client = TestClient(app) + assert client.get("/", headers={"X-Forwarded-For": "1.1.1.1"}).status_code == 200 + assert client.get("/", headers={"X-Forwarded-For": "2.2.2.2"}).status_code == 200 + assert client.get("/", headers={"X-Forwarded-For": "3.3.3.3"}).status_code == 429 + + +def test_rate_limit_middleware_uses_forwarded_for_when_trusted(): + app = FastAPI() + backend = MemoryRateLimitBackend() + app.add_middleware( + RateLimitMiddleware, + config=_config(trusted_proxy_headers=True), + backend=backend, + ) + + @app.get("/") + def root(): + return {"ok": True} + + client = TestClient(app) + assert client.get("/", headers={"X-Forwarded-For": "1.1.1.1"}).status_code == 200 + assert client.get("/", headers={"X-Forwarded-For": "2.2.2.2"}).status_code == 200 + assert client.get("/", headers={"X-Forwarded-For": "3.3.3.3"}).status_code == 200 + + +def test_rate_limit_middleware_excludes_paths(): + app = FastAPI() + backend = MemoryRateLimitBackend() + app.add_middleware( + RateLimitMiddleware, + config=_config(requests=1, exclude_paths=["/health"]), + backend=backend, + ) + + @app.get("/health") + def health(): + return {"ok": True} + + client = TestClient(app) + assert client.get("/health").status_code == 200 + assert client.get("/health").status_code == 200 + + +@pytest.mark.asyncio +async def test_custom_backend_resolves_from_injectq(): + class MyRateLimitBackend(BaseRateLimitBackend): + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + return RateLimitDecision(allowed=True, remaining=limit - 1, reset_after=window) + + async def close(self) -> None: + return None + + container = InjectQ() + custom_backend = MyRateLimitBackend() + container.bind_instance(BaseRateLimitBackend, custom_backend) + + backend = build_backend(_config(backend="custom"), container=container) + + assert backend is custom_backend + assert (await backend.check("client", limit=2, window=60)).allowed is True + + +def test_custom_backend_requires_injectq_binding(): + with pytest.raises(ValueError, match="BaseRateLimitBackend"): + build_backend(_config(backend="custom"), container=InjectQ()) + + +def test_redis_backend_reuses_injectq_redis_client(): + class FakeRedis: + def register_script(self, script): + async def fake_script(*, keys, args): + return [1, 0, 60] + + return fake_script + + container = InjectQ() + redis = FakeRedis() + container.bind_instance("redis", redis) + + backend = build_backend( + _config( + backend="redis", + redis={"prefix": "agentflow:test"}, + ), + container=container, + ) + + assert isinstance(backend, RedisRateLimitBackend) + assert backend._redis is redis + assert backend._close_redis is False + + +def test_redis_backend_requires_url_when_no_injected_client(): + with pytest.raises(ValueError, match="redis.url"): + build_backend(_config(backend="redis"), container=InjectQ()) diff --git a/uv.lock b/uv.lock index 08ff4dd..f71a7d5 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.13'", @@ -23,7 +23,7 @@ wheels = [ [[package]] name = "10xscale-agentflow-cli" -version = "0.3.0" +version = "0.3.1" source = { editable = "." } dependencies = [ { name = "10xscale-agentflow" }, From b397e4a5717ed57c7590c5a50c7b9afeb8d2dfb0 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Thu, 30 Apr 2026 14:34:33 +0600 Subject: [PATCH 3/5] feat: add rate limiting support with Redis and memory backends, update documentation --- .codex | 0 README.md | 11 +- .../app/core/middleware/rate_limit/factory.py | 20 +- .../app/core/middleware/rate_limit/redis.py | 3 +- docs/configuration.md | 78 +++++ docs/deployment.md | 2 +- docs/rate-limit-review.md | 318 ------------------ docs/rate-limiting.md | 192 +++++++++++ mkdocs.yaml | 86 ----- requirements.txt | 2 - tests/unit_tests/test_rate_limit.py | 17 + 11 files changed, 310 insertions(+), 419 deletions(-) create mode 100644 .codex delete mode 100644 docs/rate-limit-review.md create mode 100644 docs/rate-limiting.md delete mode 100644 mkdocs.yaml diff --git a/.codex b/.codex new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md index f9ca4a4..d5f6351 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ A professional Python API framework for building agent-based applications with F - **[Configuration Guide](./docs/configuration.md)** - All configuration options explained - **[Deployment Guide](./docs/deployment.md)** - Docker, Kubernetes, and cloud deployment - **[Authentication Guide](./docs/authentication.md)** - JWT and custom authentication +- **[Rate Limiting Guide](./docs/rate-limiting.md)** - Memory, Redis, and custom rate-limit backends - **[ID Generation Guide](./docs/id-generation.md)** - Snowflake ID generation - **[Thread Name Generator Guide](./docs/thread-name-generator.md)** - Thread naming strategies @@ -19,6 +20,13 @@ A professional Python API framework for building agent-based applications with F pip install 10xscale-agentflow-cli ``` +Redis rate limiting is optional. Install the Redis extra only when you configure +`rate_limit.backend` as `redis`: + +```bash +pip install "10xscale-agentflow-cli[redis]" +``` + ### Initialize a New Project ```bash @@ -53,6 +61,7 @@ agentflow build --docker-compose - ✅ **State Graph Orchestration** - Build complex agent workflows with LangGraph - ✅ **FastAPI Backend** - High-performance async web framework - ✅ **Authentication** - Built-in JWT auth and custom authentication support +- ✅ **Rate Limiting** - Sliding-window limits with memory, Redis, and custom backends - ✅ **ID Generation** - Distributed Snowflake ID generation - ✅ **Thread Management** - Intelligent thread naming and conversation management - ✅ **Docker Ready** - Generate production-ready Docker files @@ -174,6 +183,7 @@ The configuration file (`agentflow.json`) defines your agent, authentication, an | `injectq` | string\|null | Path to InjectQ container | | `store` | string\|null | Path to data store | | `redis` | string\|null | Redis connection URL | +| `rate_limit` | object\|null | Sliding-window rate limiting configuration | | `thread_name_generator` | string\|null | Path to custom thread name generator | See the [Configuration Guide](./docs/configuration.md) for complete details. @@ -544,4 +554,3 @@ Developed by [10xScale](https://10xscale.ai) and maintained by the community. --- **Made with ❤️ for the AI agent development community** - diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/factory.py b/agentflow_cli/src/app/core/middleware/rate_limit/factory.py index 8097bbd..1929f81 100644 --- a/agentflow_cli/src/app/core/middleware/rate_limit/factory.py +++ b/agentflow_cli/src/app/core/middleware/rate_limit/factory.py @@ -7,7 +7,7 @@ from .base import BaseRateLimitBackend from .memory import MemoryRateLimitBackend -from .redis import AsyncRedis, RedisRateLimitBackend +from .redis import RedisRateLimitBackend if TYPE_CHECKING: @@ -72,8 +72,7 @@ def _build_redis_backend( ) _bind_redis(container, backend._redis) logger.info( - "Rate-limit backend: created Redis and bound it in InjectQ " - "(prefix=%s, fail_open=%s)", + "Rate-limit backend: created Redis and bound it in InjectQ (prefix=%s, fail_open=%s)", config.redis_prefix, config.fail_open, ) @@ -84,14 +83,15 @@ def _get_redis_from_container(container: InjectQ) -> Any | None: redis = container.try_get("redis") or container.try_get("redis_client") if redis is not None: return redis - if AsyncRedis is not None: - redis = container.try_get(AsyncRedis) - if redis is not None: - return redis + + from redis.asyncio import Redis + + redis = container.try_get(Redis) + if redis is not None: + return redis return container.try_get("Redis") def _bind_redis(container: InjectQ, redis: "Redis") -> None: - container.bind_instance("redis", redis) - if AsyncRedis is not None: - container.bind_instance(AsyncRedis, redis) + if Redis is not None: + container.bind_instance(Redis, redis) diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/redis.py b/agentflow_cli/src/app/core/middleware/rate_limit/redis.py index d303763..3ff0fb3 100644 --- a/agentflow_cli/src/app/core/middleware/rate_limit/redis.py +++ b/agentflow_cli/src/app/core/middleware/rate_limit/redis.py @@ -16,6 +16,7 @@ logger = logging.getLogger("agentflow_api.rate_limit") +_REDIS_EXTRA_INSTALL = 'pip install "10xscale-agentflow-cli[redis]"' # Atomic sliding-window check using a Redis sorted set. # @@ -82,7 +83,7 @@ def from_url( if not _REDIS_AVAILABLE or AsyncRedis is None: raise ImportError( "Redis backend requires the 'redis' package. " - "Install it with: pip install 'redis>=5.0.7'" + f"Install the optional Redis extra with: {_REDIS_EXTRA_INSTALL}" ) redis = AsyncRedis.from_url(redis_url, decode_responses=False) return cls(redis=redis, prefix=prefix, fail_open=fail_open, close_redis=True) diff --git a/docs/configuration.md b/docs/configuration.md index 54f82fb..2db6c55 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -9,6 +9,7 @@ This document provides a complete reference for configuring your AgentFlow appli - [Authentication](#authentication) - [Dependency Injection](#dependency-injection) - [Storage & Persistence](#storage--persistence) +- [Rate Limiting](#rate-limiting) - [Environment Variables](#environment-variables) - [Application Settings](#application-settings) - [Examples](#examples) @@ -43,6 +44,7 @@ agentflow api --config /path/to/config.json "injectq": null, "store": null, "redis": null, + "rate_limit": null, "thread_name_generator": null } ``` @@ -363,6 +365,82 @@ REDIS_URL=redis://localhost:6379 --- +## Rate Limiting + +### `rate_limit` + +Configures AgentFlow's built-in sliding-window rate limiter. + +**Type:** `object | null` + +**Default:** `null` + +**Local development example:** +```json +{ + "rate_limit": { + "enabled": true, + "backend": "memory", + "requests": 100, + "window": 60, + "by": "ip", + "exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"] + } +} +``` + +**Production Redis example:** + +Install the optional Redis extra before using `backend: "redis"`: + +```bash +pip install "10xscale-agentflow-cli[redis]" +``` + +```json +{ + "rate_limit": { + "enabled": true, + "backend": "redis", + "requests": 1000, + "window": 60, + "by": "ip", + "trusted_proxy_headers": true, + "exclude_paths": ["/health", "/metrics", "/docs", "/redoc", "/openapi.json"], + "redis": { + "url": "${RATE_LIMIT_REDIS_URL}", + "prefix": "agentflow:rate-limit" + }, + "fail_open": true + } +} +``` + +**Options:** + +| Field | Type | Default | Description | +| --- | --- | --- | --- | +| `enabled` | boolean | `true` | Enables rate limiting when the block exists. | +| `backend` | string | `"memory"` | `memory`, `redis`, or `custom`. | +| `requests` | integer | `100` | Maximum requests per window. | +| `window` | integer | `60` | Window duration in seconds. | +| `by` | string | `"ip"` | `ip` or `global`. | +| `exclude_paths` | string array | `[]` | Paths that bypass rate limiting. | +| `trusted_proxy_headers` | boolean | `false` | Use `X-Forwarded-For` only behind a trusted proxy. | +| `redis.url` | string | `null` | Redis URL for the Redis backend. | +| `redis.prefix` | string | `"agentflow:rate-limit"` | Redis key prefix. | +| `fail_open` | boolean | `true` | On Redis errors, allow requests when `true`; deny when `false`. | + +Use the `memory` backend for local development or one-process services. Use the +`redis` backend for production deployments with multiple workers, containers, or +servers. Redis is optional and is not installed unless you install the `redis` +extra. + +See the [Rate Limiting Guide](./rate-limiting.md) for usage details, response +headers, and custom backend examples. + +--- + ## Thread Name Generation ### `thread_name_generator` diff --git a/docs/deployment.md b/docs/deployment.md index 8221aaf..4fed583 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -640,7 +640,7 @@ echo "secrets.yaml" >> .gitignore pip install --upgrade 10xscale-agentflow-cli # 7. Enable rate limiting -# Use nginx, Traefik, or API Gateway +# Use AgentFlow's Redis backend, or enforce limits at nginx, Traefik, or an API gateway ``` ### Performance diff --git a/docs/rate-limit-review.md b/docs/rate-limit-review.md deleted file mode 100644 index 23d21f5..0000000 --- a/docs/rate-limit-review.md +++ /dev/null @@ -1,318 +0,0 @@ -# Rate Limit Review - -Reviewed files: - -- `agentflow-api/agentflow_cli/src/app/core/config/graph_config.py` -- `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit.py` -- `agentflow-api/agentflow_cli/src/app/core/config/setup_middleware.py` -- `agentflow-api/agentflow.json` - -## Verdict - -The current rate limiter is fine as a local development toy, but it is not a production-scalable rate limiter. It uses per-process memory, accepts spoofable client identity by default, serializes every request through one lock, has no backend abstraction, and exposes config that looks production-ready while silently breaking across workers or containers. - -For a framework whose promise is "focus on building the agent, not the scalable logic," this needs a backend-driven design. - -## Findings - -### High: limits are per process, not per deployment - -`RateLimitMiddleware` stores buckets in `self._buckets`, an in-memory dictionary owned by one Python process: - -- `rate_limit.py:44-46` -- `rate_limit.py:73-89` - -That means every Uvicorn/Gunicorn worker has its own quota. If the config says `100 requests / 60s` and the API runs with 8 workers, the real limit can become roughly `800 requests / 60s` per host. If the app runs across 5 containers, it can become roughly `4000 requests / 60s`. - -This is the main scalability failure. - -### High: the API config has no backend choice - -`RateLimitConfig` only supports: - -- `enabled` -- `requests` -- `window` -- `by` - -See `graph_config.py:9-32`. The example config in `agentflow.json:6-11` also has no way to say whether the limiter should use memory, Redis, or another storage backend. - -That makes the current implementation a hard-coded memory strategy wearing a production config costume. - -### High: `X-Forwarded-For` is trusted unconditionally - -The IP key uses `X-Forwarded-For` whenever present: - -- `rate_limit.py:55-59` - -This is dangerous unless the app is guaranteed to sit behind a trusted proxy that strips untrusted forwarding headers. A direct client can spoof `X-Forwarded-For` and rotate the value to bypass per-IP limits. - -This should be configurable and tied to trusted proxy settings, or use Starlette/Uvicorn proxy header handling in a controlled deployment path. - -### Medium: one global lock serializes all limiter checks - -Every request enters the same `asyncio.Lock`: - -- `rate_limit.py:46` -- `rate_limit.py:73` - -Even unrelated IPs block each other. Under load, the rate limiter becomes a request funnel. This is especially awkward because rate limiting is supposed to protect the app during pressure, not become the pressure point. - -### Medium: bucket cleanup is passive and memory can grow - -Old timestamps are only removed when the same bucket key is seen again: - -- `rate_limit.py:76-78` - -If many unique client keys appear once, their empty or stale buckets can remain in `_buckets` indefinitely. A hostile client can generate many spoofed `X-Forwarded-For` values and inflate memory. - -### Medium: `global` mode is global only inside one process - -The `global` bucket key is `__global__`: - -- `rate_limit.py:52-54` - -Because storage is local memory, this is not actually global across workers, pods, hosts, or deployments. The name promises more than the implementation can deliver. - -### Medium: algorithm choice is fixed and storage-heavy - -The middleware implements a sliding window by storing every request timestamp in a deque: - -- `rate_limit.py:44` -- `rate_limit.py:88` - -This gives accurate sliding windows, but memory grows with `number_of_keys * requests`. Redis can support this with sorted sets, but for high traffic a token bucket or fixed-window-with-Lua approach may be cheaper and easier to operate. - -### Medium: `X-RateLimit-Reset` is not a standard reset timestamp - -The response uses seconds-until-reset as `X-RateLimit-Reset`: - -- `rate_limit.py:130` -- `rate_limit.py:137` - -Many clients expect reset to be an epoch timestamp. If the project wants delta seconds, use a clearer header such as `X-RateLimit-Reset-After`, or document the behavior explicitly. - -### Low: config validation is too narrow for future extension - -`RateLimitConfig.from_dict` validates `by`, `requests`, and `window`, but there is no namespacing for backend-specific settings: - -- `graph_config.py:19-32` - -If Redis is added by sprinkling `redis_url` into the same flat object, the config will become messy quickly. - -### Low: no route/method exclusions - -The limiter applies to every HTTP request once enabled: - -- `setup_middleware.py:151-155` - -That includes docs, health checks, metrics, readiness probes, and possibly streaming endpoints. Production deployments usually need `exclude_paths`, `include_paths`, or route groups. - - -Claude Sonnet built the rate limiter equivalent of putting a bicycle lock on a cloud load balancer. - -It has the aesthetic of scalability: config file, middleware class, headers, neat docstring. But the actual quota lives in a Python dictionary inside one worker. The moment you add another worker, container, or node, the "limit" becomes a polite suggestion with multiplication enabled. - -The `global` option is especially funny. It is global in the same way a sticky note on one laptop is company policy. Accurate within a very small emotional radius. - -And trusting `X-Forwarded-For` directly is the security version of accepting "I am definitely the CEO" because someone wrote it in a request header. - -To be fair, this is a reasonable first draft for local demos. But for AgentFlow's stated goal, this should not be the production shape. - -## Proposed config shape - -Add one more keyword: `backend`. - -Recommended default: - -```json -{ - "rate_limit": { - "enabled": true, - "backend": "memory", - "requests": 100, - "window": 60, - "by": "ip" - } -} -``` - -Production Redis example: - -```json -{ - "rate_limit": { - "enabled": true, - "backend": "redis", - "redis": { - "url": "${REDIS_URL}", - "prefix": "agentflow:rate-limit" - }, - "requests": 100, - "window": 60, - "by": "ip" - } -} -``` - -Future custom backend example: - -```json -{ - "rate_limit": { - "enabled": true, - "backend": "custom", - "requests": 100, - "window": 60, - "by": "user" - } -} -``` - -For custom backends, implement `BaseRateLimitBackend` and bind an instance into -InjectQ, matching the authentication/authorization pattern. - -Suggested fields: - -| Field | Type | Notes | -| --- | --- | --- | -| `enabled` | bool | Enables/disables middleware. | -| `backend` | string | `memory`, `redis`, or `custom`. Default can be `memory` for backward compatibility. | -| `requests` | int | Max requests per window. | -| `window` | int | Window duration in seconds. | -| `by` | string | Start with `ip` and `global`; later add `user`, `api_key`, `thread`, or `custom`. | -| `redis.url` | string | Required when `backend = "redis"` unless a shared app Redis config is used. | -| `redis.prefix` | string | Key prefix for isolation. | -| `exclude_paths` | list[string] | Optional health/docs/metrics exclusions. | -| `trusted_proxy_headers` | bool | Only honor forwarding headers when explicitly enabled. | - -## Proposed architecture - -Split the rate limiter into three layers: - -1. `RateLimitConfig` - Parses and validates config, including backend-specific options. - -2. `RateLimitBackend` - Owns the storage and algorithm. - -3. `RateLimitMiddleware` - Extracts identity, asks the backend whether the request is allowed, and writes response headers. - -Example interface: - -```python -from abc import ABC, abstractmethod -from dataclasses import dataclass - - -@dataclass(frozen=True) -class RateLimitDecision: - allowed: bool - limit: int - remaining: int - retry_after: int - reset_after: int - - -class BaseRateLimitBackend(ABC): - @abstractmethod - async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: - ... -``` - -Then implement: - -- `MemoryRateLimitBackend` -- `RedisRateLimitBackend` - -The middleware should not know whether the counter lives in a deque, Redis, Postgres, or a user's custom implementation. - -## Redis backend recommendation - -For Redis, use an atomic operation. Do not perform `GET`, calculate in Python, then `SET`, because concurrent requests will race. - -Two good options: - -1. Fixed window with Redis `INCR` + `EXPIRE` - Simple, fast, atomic enough if wrapped carefully. Less smooth at window boundaries. - -2. Sliding window with Redis sorted sets + Lua script - More accurate, more expensive, still atomic if implemented in Lua. - -For AgentFlow, I would start with fixed window or token bucket unless exact sliding-window semantics are required. The framework likely cares more about predictable distributed enforcement than perfect boundary smoothing. - -## Migration plan - -### Step 1: Make config extensible - -Update `RateLimitConfig` to include: - -- `backend: str = "memory"` -- `redis_url: str | None = None` -- `redis_prefix: str = "agentflow:rate-limit"` -- `exclude_paths: tuple[str, ...] = ()` -- `trusted_proxy_headers: bool = False` - -Keep current config working by defaulting `backend` to `memory`. - -### Step 2: Extract the backend interface - -Move the deque logic out of `RateLimitMiddleware` into `MemoryRateLimitBackend`. - -This keeps the current behavior intact while creating the seam for Redis. - -### Step 3: Add Redis backend - -Use `redis.asyncio` from the existing optional `redis` dependency. - -The Redis backend should: - -- use a configurable key prefix -- use atomic commands or Lua -- return the same `RateLimitDecision` as memory -- fail closed or fail open based on an explicit config option, not accidental exception behavior - -### Step 4: Fix identity extraction - -Add an identity resolver with explicit modes: - -- `global` -- `ip` -- later: `user`, `api_key`, `custom` - -Only trust `X-Forwarded-For` when configured. Otherwise use `request.client.host`. - -### Step 5: Add exclusions - -Support paths like: - -```json -"exclude_paths": ["/health", "/metrics", "/docs", "/openapi.json"] -``` - -This avoids rate limiting health checks and operational endpoints. - -### Step 6: Add tests - -Minimum tests: - -- config parses old format without `backend` -- config parses `backend = "memory"` -- config parses `backend = "redis"` and requires Redis URL -- invalid backend raises a helpful error -- memory backend enforces limit -- Redis backend enforces limit across two backend instances -- `X-Forwarded-For` is ignored unless trusted proxy mode is enabled -- excluded paths bypass limiter - -## Recommended next implementation target - -Do this in one focused PR: - -1. Add `backend` parsing with backward compatibility. -2. Extract `MemoryRateLimitBackend`. -3. Keep current runtime behavior unchanged when `backend` is omitted. -4. Add tests for config and memory behavior. - -Then a second PR can add Redis without mixing architectural cleanup and networked storage in the same change. diff --git a/docs/rate-limiting.md b/docs/rate-limiting.md new file mode 100644 index 0000000..d28425f --- /dev/null +++ b/docs/rate-limiting.md @@ -0,0 +1,192 @@ +# Rate Limiting + +AgentFlow can protect your API with a sliding-window rate limiter configured from +`agentflow.json`. The limiter is disabled until you add a `rate_limit` block. + +## Quick Start + +For local development or a single-process deployment, use the in-memory backend: + +```json +{ + "agent": "graph.react:app", + "rate_limit": { + "enabled": true, + "backend": "memory", + "requests": 100, + "window": 60, + "by": "ip", + "exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"] + } +} +``` + +This allows each client IP to make `100` requests every `60` seconds. + +## Production With Redis + +Use Redis when your API runs with multiple workers, containers, or servers. +Redis stores the counters centrally, so the limit is enforced across the whole +deployment. + +Redis support is optional. Install AgentFlow with the Redis extra before using +`backend: "redis"`: + +```bash +pip install "10xscale-agentflow-cli[redis]" +``` + +Configure Redis in `agentflow.json`: + +```json +{ + "agent": "graph.react:app", + "rate_limit": { + "enabled": true, + "backend": "redis", + "requests": 1000, + "window": 60, + "by": "ip", + "trusted_proxy_headers": true, + "exclude_paths": ["/health", "/metrics", "/docs", "/redoc", "/openapi.json"], + "redis": { + "url": "${RATE_LIMIT_REDIS_URL}", + "prefix": "agentflow:rate-limit" + }, + "fail_open": true + } +} +``` + +Then set the environment variable: + +```bash +RATE_LIMIT_REDIS_URL=redis://localhost:6379/0 +``` + +The Redis backend uses an atomic Lua script with sorted sets. That means the +check and the request recording happen as one Redis operation, which prevents +concurrent requests from racing past the configured limit. + +## Configuration Reference + +| Field | Type | Default | Description | +| --- | --- | --- | --- | +| `enabled` | boolean | `true` | Enables the middleware when the `rate_limit` block exists. | +| `backend` | string | `"memory"` | `memory`, `redis`, or `custom`. | +| `requests` | integer | `100` | Maximum requests allowed in each window. | +| `window` | integer | `60` | Window size in seconds. | +| `by` | string | `"ip"` | Limit by client IP or use `"global"` for one shared quota. | +| `exclude_paths` | string array | `[]` | Paths that bypass rate limiting. | +| `trusted_proxy_headers` | boolean | `false` | Whether to use `X-Forwarded-For` as the client IP. | +| `redis.url` | string | `null` | Redis URL for the Redis backend. Required unless a Redis client is injected. | +| `redis.prefix` | string | `"agentflow:rate-limit"` | Prefix for Redis keys. | +| `fail_open` | boolean | `true` | For Redis errors, allow requests when `true` or deny them when `false`. | + +## Identity Modes + +Use per-IP limits for most public APIs: + +```json +{ + "rate_limit": { + "requests": 100, + "window": 60, + "by": "ip" + } +} +``` + +Use a global limit when you want one shared quota for the whole service: + +```json +{ + "rate_limit": { + "requests": 5000, + "window": 60, + "by": "global" + } +} +``` + +Only enable `trusted_proxy_headers` when your app is behind a trusted proxy or +load balancer that strips untrusted forwarding headers from direct clients. + +## Response Headers + +Every limited response includes: + +| Header | Description | +| --- | --- | +| `X-RateLimit-Limit` | Configured request limit. | +| `X-RateLimit-Remaining` | Requests remaining in the current window. | +| `X-RateLimit-Reset` | Unix timestamp for the current reset estimate. | +| `X-RateLimit-Reset-After` | Seconds until the current reset estimate. | +| `Retry-After` | Present on `429` responses. | + +When the limit is exceeded, AgentFlow returns `429 Too Many Requests`: + +```json +{ + "error": { + "code": "RATE_LIMIT_EXCEEDED", + "message": "Too many requests. Limit: 100 per 60s. Retry after 12s.", + "limit": 100, + "window_seconds": 60, + "retry_after_seconds": 12 + }, + "metadata": { + "request_id": "request-id", + "status": "error" + } +} +``` + +## Custom Backend + +Use a custom backend when you want to store rate-limit state somewhere else. +Implement `BaseRateLimitBackend`, then bind an instance in InjectQ. + +```python +from agentflow_cli.src.app.core.middleware.rate_limit import ( + BaseRateLimitBackend, + RateLimitDecision, +) + + +class MyRateLimitBackend(BaseRateLimitBackend): + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + allowed = True + remaining = limit - 1 + reset_after = window + return RateLimitDecision( + allowed=allowed, + remaining=remaining, + reset_after=reset_after, + ) + + async def close(self) -> None: + return None +``` + +Configure AgentFlow to use the custom backend: + +```json +{ + "rate_limit": { + "enabled": true, + "backend": "custom", + "requests": 100, + "window": 60, + "by": "ip" + } +} +``` + +## Choosing A Backend + +Use `memory` for local development, tests, demos, and one-process services. + +Use `redis` for production APIs, Gunicorn/Uvicorn deployments with multiple +workers, Docker/Kubernetes deployments, and any setup with more than one API +instance. Redis is not needed for the `memory` or `custom` backends. diff --git a/mkdocs.yaml b/mkdocs.yaml deleted file mode 100644 index add176d..0000000 --- a/mkdocs.yaml +++ /dev/null @@ -1,86 +0,0 @@ -site_name: AgentFlow-CLI -site_description: "A lightweight Python framework for building intelligent agents and multi-agent workflows." -# Required for Material's instant navigation and previews -site_url: https://10xhub.github.io/agentflow-cli/ -repo_url: https://github.com/10xhub/agentflow-cli -repo_name: 10xhub/agentflow-cli - -theme: - name: material - language: en - logo: null - favicon: null - features: - - navigation.pagination - - navigation.goto_top - - navigation.path - - navigation.tabs - - navigation.top - - search-suggest - - search.highlight - - search.share - - content.code.copy - - -plugins: - - search - - autorefs - - section-index - - mkdocstrings: - default_handler: python - handlers: - python: - options: - docstring_style: google - show_inheritance: true - show_signature: true - show_source: true - allow_inspection: true - show_typehints: true - show_toc: true - show_bases: true - show_inheritance_diagram: true - parameter_headings: true - group_by_category: true - show_category_heading: true - show_symbol_type_heading: true - summary: true - load_external_modules: true - inherited_members: true - show_submodules: true - show_if_no_docstring: true - separate_signature: true - - - gen-files: - scripts: - - scripts/generate_docs.py - -markdown_extensions: - - admonition - - pymdownx.details - - pymdownx.superfences - - pymdownx.highlight: - anchor_linenums: true - line_spans: __span - pygments_lang_class: true - - pymdownx.inlinehilite - - pymdownx.snippets - - pymdownx.superfences - - pymdownx.superfences: - custom_fences: - - name: mermaid - class: mermaid - -nav: - - Home: index.md - - Getting Started: - - CLI Guide: cli-guide.md - - Configuration: configuration.md - - Features: - - Authentication: authentication.md - - ID Generation: id-generation.md - - Thread Name Generator: thread-name-generator.md - - Deployment: - - Deployment Guide: deployment.md - - Reference: - - CLI Reference: cli.md diff --git a/requirements.txt b/requirements.txt index 81386ab..8232623 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,8 +22,6 @@ pytest-env>=1.1.5 pytest-xdist>=3.8.0 pre-commit>=3.8.0 ruff==0.5.2 -mkdocs-gen-files==0.5.0 -mkdocstrings==0.25.2 mypy-extensions==1.0.0 httpx==0.27.0 lib==4.0.0 diff --git a/tests/unit_tests/test_rate_limit.py b/tests/unit_tests/test_rate_limit.py index 26ed06c..6c0e698 100644 --- a/tests/unit_tests/test_rate_limit.py +++ b/tests/unit_tests/test_rate_limit.py @@ -216,3 +216,20 @@ async def fake_script(*, keys, args): def test_redis_backend_requires_url_when_no_injected_client(): with pytest.raises(ValueError, match="redis.url"): build_backend(_config(backend="redis"), container=InjectQ()) + + +def test_redis_backend_from_url_requires_optional_extra(monkeypatch): + monkeypatch.setattr( + "agentflow_cli.src.app.core.middleware.rate_limit.redis._REDIS_AVAILABLE", + False, + ) + monkeypatch.setattr( + "agentflow_cli.src.app.core.middleware.rate_limit.redis.AsyncRedis", + None, + ) + + with pytest.raises(ImportError, match=r"10xscale-agentflow-cli\[redis\]"): + RedisRateLimitBackend.from_url( + redis_url="redis://localhost:6379/0", + prefix="agentflow:test", + ) From ee10930973ffaa2062f6c161714ea7f1ef4f8b3a Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Thu, 30 Apr 2026 15:23:50 +0600 Subject: [PATCH 4/5] feat: enhance documentation and optional dependencies for JWT and media extraction --- README.md | 8 +++++++ agentflow_cli/src/app/core/auth/jwt_auth.py | 13 ++++++++++- .../app/core/middleware/rate_limit/factory.py | 9 ++++---- .../src/app/utils/media/extractor.py | 2 +- docs/cli-guide.md | 2 +- pyproject.toml | 11 ++++++++-- uv.lock | 22 ++++++++++++++----- 7 files changed, 52 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index d5f6351..49d082f 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,14 @@ Redis rate limiting is optional. Install the Redis extra only when you configure pip install "10xscale-agentflow-cli[redis]" ``` +JWT auth and document text extraction are optional too. Install only the extra +you need: + +```bash +pip install "10xscale-agentflow-cli[jwt]" +pip install "10xscale-agentflow-cli[media]" +``` + ### Initialize a New Project ```bash diff --git a/agentflow_cli/src/app/core/auth/jwt_auth.py b/agentflow_cli/src/app/core/auth/jwt_auth.py index a49face..b58b928 100644 --- a/agentflow_cli/src/app/core/auth/jwt_auth.py +++ b/agentflow_cli/src/app/core/auth/jwt_auth.py @@ -1,6 +1,5 @@ from typing import Any -import jwt from fastapi import Request, Response from fastapi.security import HTTPAuthorizationCredentials @@ -10,6 +9,12 @@ from agentflow_cli.src.app.core.exceptions import UserAccountError +try: + import jwt +except ImportError: # pragma: no cover + jwt = None # type: ignore[assignment] + + class JwtAuth(BaseAuth): def authenticate( self, @@ -51,6 +56,12 @@ def authenticate( token = credential.credentials + if jwt is None: + raise ImportError( + "PyJWT is required for JWT authentication. " + 'Install with `pip install "10xscale-agentflow-cli[jwt]"`' + ) + if jwt_secret_key is None or jwt_algorithm is None: raise UserAccountError( message="JWT settings are not configured", diff --git a/agentflow_cli/src/app/core/middleware/rate_limit/factory.py b/agentflow_cli/src/app/core/middleware/rate_limit/factory.py index 1929f81..e4615b9 100644 --- a/agentflow_cli/src/app/core/middleware/rate_limit/factory.py +++ b/agentflow_cli/src/app/core/middleware/rate_limit/factory.py @@ -80,18 +80,19 @@ def _build_redis_backend( def _get_redis_from_container(container: InjectQ) -> Any | None: - redis = container.try_get("redis") or container.try_get("redis_client") + redis = container.try_get("redis") if redis is not None: return redis - from redis.asyncio import Redis - - redis = container.try_get(Redis) + redis = container.try_get("redis_client") if redis is not None: return redis + return container.try_get("Redis") def _bind_redis(container: InjectQ, redis: "Redis") -> None: if Redis is not None: container.bind_instance(Redis, redis) + container.bind_instance("redis", redis) + container.bind_instance("redis_client", redis) diff --git a/agentflow_cli/src/app/utils/media/extractor.py b/agentflow_cli/src/app/utils/media/extractor.py index 8aec11d..0c54a42 100644 --- a/agentflow_cli/src/app/utils/media/extractor.py +++ b/agentflow_cli/src/app/utils/media/extractor.py @@ -37,7 +37,7 @@ def __init__(self, extractor: Any | None = None): if AsyncTextExtractor is None: raise ImportError( "textxtract is required for document extraction. " - "Install with `pip install textxtract[pdf,docx,html,xml,md]`" + 'Install with `pip install "10xscale-agentflow-cli[media]"`' ) self.extractor = AsyncTextExtractor() diff --git a/docs/cli-guide.md b/docs/cli-guide.md index 9a274e0..da042d0 100644 --- a/docs/cli-guide.md +++ b/docs/cli-guide.md @@ -11,7 +11,7 @@ pip install 10xscale-agentflow-cli For development with all optional dependencies: ```bash -pip install "10xscale-agentflow-cli[redis,sentry,firebase,snowflakekit,gcloud]" +pip install "10xscale-agentflow-cli[redis,jwt,media,sentry,firebase,snowflakekit,gcloud]" ``` ## Quick Start diff --git a/pyproject.toml b/pyproject.toml index 7bb3286..4cb6197 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,8 +44,6 @@ dependencies = [ "uvicorn", "typer", "python-dotenv", - "PyJWT", - "textxtract[pdf,docx,html,xml,md]>=0.2.0", ] [project.urls] @@ -68,6 +66,12 @@ snowflakekit = [ redis = [ "redis>=5.0.7", ] +jwt = [ + "PyJWT", +] +media = [ + "textxtract[pdf,docx,html,xml,md]>=0.2.0", +] gcloud = [ "google-cloud-logging", ] @@ -245,4 +249,7 @@ dev = [ "lib==4.0.0", "markdown-it-py==3.0.0", "requests==2.32.3", + "redis>=5.0.7", + "PyJWT", + "textxtract[pdf,docx,html,xml,md]>=0.2.0", ] diff --git a/uv.lock b/uv.lock index f71a7d5..391bb37 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.13'", @@ -32,10 +32,8 @@ dependencies = [ { name = "orjson" }, { name = "pydantic" }, { name = "pydantic-settings" }, - { name = "pyjwt" }, { name = "python-dotenv" }, { name = "python-multipart" }, - { name = "textxtract", extra = ["docx", "html", "md", "pdf", "xml"] }, { name = "typer" }, { name = "uvicorn" }, ] @@ -48,6 +46,12 @@ firebase = [ gcloud = [ { name = "google-cloud-logging" }, ] +jwt = [ + { name = "pyjwt" }, +] +media = [ + { name = "textxtract", extra = ["docx", "html", "md", "pdf", "xml"] }, +] redis = [ { name = "redis" }, ] @@ -67,14 +71,17 @@ dev = [ { name = "mkdocstrings" }, { name = "mypy-extensions" }, { name = "pre-commit" }, + { name = "pyjwt" }, { name = "pytest" }, { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "pytest-env" }, { name = "pytest-xdist" }, + { name = "redis" }, { name = "requests" }, { name = "ruff" }, { name = "snowflakekit" }, + { name = "textxtract", extra = ["docx", "html", "md", "pdf", "xml"] }, ] [package.metadata] @@ -88,17 +95,17 @@ requires-dist = [ { name = "orjson" }, { name = "pydantic" }, { name = "pydantic-settings" }, - { name = "pyjwt" }, + { name = "pyjwt", marker = "extra == 'jwt'" }, { name = "python-dotenv" }, { name = "python-multipart" }, { name = "redis", marker = "extra == 'redis'", specifier = ">=5.0.7" }, { name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.10.0" }, { name = "snowflakekit", marker = "extra == 'snowflakekit'" }, - { name = "textxtract", extras = ["pdf", "docx", "html", "xml", "md"], specifier = ">=0.2.0" }, + { name = "textxtract", extras = ["pdf", "docx", "html", "xml", "md"], marker = "extra == 'media'", specifier = ">=0.2.0" }, { name = "typer" }, { name = "uvicorn" }, ] -provides-extras = ["sentry", "firebase", "snowflakekit", "redis", "gcloud"] +provides-extras = ["sentry", "firebase", "snowflakekit", "redis", "jwt", "media", "gcloud"] [package.metadata.requires-dev] dev = [ @@ -109,14 +116,17 @@ dev = [ { name = "mkdocstrings", specifier = "==0.25.2" }, { name = "mypy-extensions", specifier = "==1.0.0" }, { name = "pre-commit", specifier = ">=3.8.0" }, + { name = "pyjwt" }, { name = "pytest", specifier = ">=8.4.2" }, { name = "pytest-asyncio", specifier = ">=1.2.0" }, { name = "pytest-cov", specifier = ">=7.0.0" }, { name = "pytest-env", specifier = ">=1.1.5" }, { name = "pytest-xdist", specifier = ">=3.8.0" }, + { name = "redis", specifier = ">=5.0.7" }, { name = "requests", specifier = "==2.32.3" }, { name = "ruff", specifier = "==0.5.2" }, { name = "snowflakekit" }, + { name = "textxtract", extras = ["pdf", "docx", "html", "xml", "md"], specifier = ">=0.2.0" }, ] [[package]] From 6a4d2b27222df3a321e498b59205ad2f27b01d29 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Thu, 30 Apr 2026 16:35:53 +0600 Subject: [PATCH 5/5] feat: add rate limiting documentation and configuration details to relevant references --- .../templates/skills/agent-skills/SKILL.md | 2 + .../references/api-configuration.md | 12 +- .../references/api-settings-and-middleware.md | 8 + .../references/callbacks-and-command.md | 166 ++++++++++++++- .../agent-skills/references/rate-limiting.md | 194 ++++++++++++++++++ .../skills/copilot/agentflow.instructions.md | 3 + 6 files changed, 383 insertions(+), 2 deletions(-) create mode 100644 agentflow_cli/cli/templates/skills/agent-skills/references/rate-limiting.md diff --git a/agentflow_cli/cli/templates/skills/agent-skills/SKILL.md b/agentflow_cli/cli/templates/skills/agent-skills/SKILL.md index 0d73205..404b0b2 100644 --- a/agentflow_cli/cli/templates/skills/agent-skills/SKILL.md +++ b/agentflow_cli/cli/templates/skills/agent-skills/SKILL.md @@ -26,6 +26,7 @@ metadata: - references/api-configuration.md - references/auth-and-authorization.md - references/api-settings-and-middleware.md + - references/rate-limiting.md - references/rest-api-and-errors.md - references/id-and-thread-name-generators.md - references/client-auth-and-errors.md @@ -78,6 +79,7 @@ Treat `agentflow-docs/docs` as the first source of truth for public package name - `agentflow.json` and dependency loading: `references/api-configuration.md` - API auth and authorization: `references/auth-and-authorization.md` - API environment, settings, and middleware: `references/api-settings-and-middleware.md` + - Rate limiting (config, backends, headers, custom backend): `references/rate-limiting.md` - REST routes and error behavior: `references/rest-api-and-errors.md` - API Snowflake IDs and thread naming: `references/id-and-thread-name-generators.md` - TypeScript auth helpers and structured errors: `references/client-auth-and-errors.md` diff --git a/agentflow_cli/cli/templates/skills/agent-skills/references/api-configuration.md b/agentflow_cli/cli/templates/skills/agent-skills/references/api-configuration.md index 072f061..a30c6e1 100644 --- a/agentflow_cli/cli/templates/skills/agent-skills/references/api-configuration.md +++ b/agentflow_cli/cli/templates/skills/agent-skills/references/api-configuration.md @@ -23,7 +23,15 @@ Common full shape: "thread_name_generator": "graph.thread_name_generator:MyNameGenerator", "authorization": "graph.auth:my_authorization_backend", "env": ".env", - "auth": "jwt" + "auth": "jwt", + "rate_limit": { + "enabled": true, + "backend": "memory", + "requests": 100, + "window": 60, + "by": "ip", + "exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"] + } } ``` @@ -37,6 +45,7 @@ Common full shape: - `authorization`: optional import path to an authorization backend. - `env`: optional `.env` path loaded before graph import. - `auth`: `null`, `"jwt"`, or `{"method": "custom", "path": "module:backend"}`. +- `rate_limit`: optional sliding-window rate limiter config object; omit or set to `null` to disable. See `references/rate-limiting.md` for the full field reference. ## Loading Order @@ -62,3 +71,4 @@ Common full shape: - App startup: `agentflow-api/agentflow_cli/src/app/main.py` - Main docs: `agentflow-docs/docs/reference/api-cli/configuration.md` - How-to: `agentflow-docs/docs/how-to/api-cli/configure-agentflow-json.md` +- Rate limit config details: `references/rate-limiting.md` diff --git a/agentflow_cli/cli/templates/skills/agent-skills/references/api-settings-and-middleware.md b/agentflow_cli/cli/templates/skills/agent-skills/references/api-settings-and-middleware.md index b151a70..e88c7fd 100644 --- a/agentflow_cli/cli/templates/skills/agent-skills/references/api-settings-and-middleware.md +++ b/agentflow_cli/cli/templates/skills/agent-skills/references/api-settings-and-middleware.md @@ -48,6 +48,11 @@ Active middleware areas: - Request ID assignment. - Selective gzip behavior; streaming endpoints should avoid gzip buffering when configured. - Worker middleware where used by deployment. +- Rate limiting: sliding-window limiter controlled by the `rate_limit` block in `agentflow.json`. + Uses an in-process `memory` backend by default; use the `redis` backend (requires + `pip install "10xscale-agentflow-cli[redis]"`) for multi-worker or multi-instance deployments. + Excluded paths, identity mode (`ip` or `global`), and `fail_open` behavior are all configurable. + See `references/rate-limiting.md` for the full option reference. ## Production Warnings @@ -76,7 +81,10 @@ Production mode warns about unsafe defaults such as: - Middleware setup: `agentflow-api/agentflow_cli/src/app/core/config/setup_middleware.py` - Request limits: `agentflow-api/agentflow_cli/src/app/core/middleware/request_limits.py` - Security headers: `agentflow-api/agentflow_cli/src/app/core/middleware/security_headers.py` +- Rate limit middleware: `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit/` +- Rate limit base class: `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit/base.py` - Sentry: `agentflow-api/agentflow_cli/src/app/core/config/sentry_config.py` - Log sanitizer: `agentflow-api/agentflow_cli/src/app/core/utils/log_sanitizer.py` - Main docs: `agentflow-docs/docs/reference/api-cli/environment.md` - Production docs: `agentflow-docs/docs/how-to/production/environment-variables.md` +- Rate limiting docs: `agentflow-api/docs/rate-limiting.md` diff --git a/agentflow_cli/cli/templates/skills/agent-skills/references/callbacks-and-command.md b/agentflow_cli/cli/templates/skills/agent-skills/references/callbacks-and-command.md index 94ad1ee..5cc0347 100644 --- a/agentflow_cli/cli/templates/skills/agent-skills/references/callbacks-and-command.md +++ b/agentflow_cli/cli/templates/skills/agent-skills/references/callbacks-and-command.md @@ -43,6 +43,166 @@ app = graph.compile(callback_manager=callback_manager) Use validators for safety checks, business rules, input policy, and prompt-injection detection. +## Graph Lifecycle Hooks + +Graph lifecycle hooks fire at **graph orchestration level** — before/after the entire graph run, on checkpoints, on interrupts, on resume, and after each node transition. They complement the invocation-level hooks above (`before_invoke` / `after_invoke` / `on_error`), which fire inside a single node's AI/Tool/MCP call. + +### GraphLifecycleContext + +All 7 hooks receive this as their first argument. It provides run-identifying metadata. + +```python +@dataclass +class GraphLifecycleContext: + config: dict[str, Any] # full config dict passed to invoke/stream + timestamp: str # ISO8601 start time from config["timestamp"] + metadata: dict[str, Any] | None = None # open-ended extra context + + @property + def thread_id(self) -> str | None: ... + + @property + def run_id(self) -> str | None: ... +``` + +### GraphLifecycleHook + +Subclass `GraphLifecycleHook` and override only the methods you need. All methods are async and default to no-ops. + +```python +from agentflow.utils.callbacks import GraphLifecycleHook, GraphLifecycleContext, CallbackManager +from agentflow.core.state import AgentState +from agentflow.core.state.message import Message + +class GraphLifecycleHook(ABC): + async def on_graph_start( + self, context: GraphLifecycleContext, state: AgentState + ) -> AgentState | None: ... + # Fires: after state is loaded, before the first node executes. + # Return modified AgentState to replace the initial state, or None. + + async def on_graph_end( + self, context: GraphLifecycleContext, final_state: AgentState, + messages: list[Message], total_steps: int + ) -> AgentState | None: ... + # Fires: after execution loop completes, before final persistence. + # Return modified AgentState or None. + + async def on_graph_error( + self, context: GraphLifecycleContext, error: Exception, + partial_state: AgentState, messages: list[Message], step: int, node_name: str + ) -> tuple[AgentState, str] | None: ... + # Fires: when an unhandled exception escapes the graph loop. + # Return (AgentState, error_message) to change persisted error snapshot, or None. + # Cannot suppress the error — always re-raised after this hook. + + async def on_interrupt( + self, context: GraphLifecycleContext, interrupted_node: str, + interrupt_type: str, # "before" | "after" | "stop" | "remote_tool" + state: AgentState + ) -> AgentState | None: ... + # Fires: before interrupt state is persisted (covers before/after/stop/remote_tool types). + # Return modified AgentState or None. + + async def on_resume( + self, context: GraphLifecycleContext, resumed_node: str, + state: AgentState, resume_data: dict[str, Any] + ) -> AgentState | None: ... + # Fires: when a paused graph is resumed, before clear_interrupt(). + # resume_data is mutable in-place. Return modified AgentState or None. + + async def on_checkpoint( + self, context: GraphLifecycleContext, state: AgentState, + messages: list[Message], is_context_trimmed: bool + ) -> tuple[AgentState, list[Message]] | AgentState | None: ... + # Fires: immediately before state/messages are persisted (every checkpoint, not just final). + # Return (AgentState, messages), AgentState, or None. + + async def on_state_update( + self, context: GraphLifecycleContext, node_name: str, + old_state: AgentState, new_state: AgentState, step: int + ) -> AgentState | None: ... + # Fires: after each node produces a result and state is merged. + # Return modified AgentState or None. +``` + +### Registration + +```python +callback_mgr = CallbackManager() +callback_mgr.register_lifecycle_hook(MyHook()) + +# Combine with existing invocation-level hooks on the same manager: +callback_mgr.register_after_invoke(InvocationType.AI, my_ai_callback) + +app = graph.compile(callback_manager=callback_mgr) +``` + +### Hook Summary + +| Hook | Returns | Fires N times | Fire location | +|---|---|---|---| +| `on_graph_start` | `AgentState \| None` | 1 per run | After state load, before loop | +| `on_graph_end` | `AgentState \| None` | 1 per successful run | After `state.complete()`, before final `sync_data()` | +| `on_graph_error` | `tuple[AgentState, str] \| None` | 1 per failed run | In except block, before error `sync_data()` | +| `on_interrupt` | `AgentState \| None` | 0–N per run | Before interrupt checkpoint persistence | +| `on_resume` | `AgentState \| None` | 0–1 per call | Before `clear_interrupt()` | +| `on_checkpoint` | `(AgentState, list[Message]) \| AgentState \| None` | 1–N per run | Before every durable checkpoint write | +| `on_state_update` | `AgentState \| None` | N per run (once per node) | After each node result is merged | + +### Example + +```python +class ObservabilityHook(GraphLifecycleHook): + async def on_graph_start(self, ctx, state): + self._span = tracer.start_span(f"graph.run.{ctx.thread_id}") + return None + + async def on_graph_end(self, ctx, final_state, messages, total_steps): + self._span.set_attribute("steps", total_steps) + self._span.end() + + async def on_graph_error(self, ctx, error, partial_state, messages, step, node_name): + self._span.record_exception(error) + self._span.end() + alert_oncall(f"Graph failed at node {node_name}: {error}") + return None + + async def on_interrupt(self, ctx, interrupted_node, interrupt_type, state): + notify_frontend(ctx.thread_id, status="waiting_for_input", node=interrupted_node) + return None + + async def on_resume(self, ctx, resumed_node, state, resume_data): + notify_frontend(ctx.thread_id, status="resuming", node=resumed_node) + return None + + async def on_checkpoint(self, ctx, state, messages, is_context_trimmed): + metrics.increment("agentflow.checkpoints", tags={"thread": ctx.thread_id}) + return None + + async def on_state_update(self, ctx, node_name, old_state, new_state, step): + diff = compute_diff(old_state, new_state) + stream_diff_to_frontend(ctx.thread_id, diff) + return None + + +callback_mgr = CallbackManager() +callback_mgr.register_lifecycle_hook(ObservabilityHook()) +app = graph.compile(callback_manager=callback_mgr) +``` + +### Common Use Cases by Hook + +- **`on_graph_start`**: inject trace IDs, pre-populate state from external DB, set rate-limit budgets, initialize OpenTelemetry spans. +- **`on_graph_end`**: send completion notifications (Slack/email), record step/message count metrics, archive transcripts, trigger downstream webhooks. +- **`on_graph_error`**: alert PagerDuty/Sentry, log structured failure diagnostics, close OTel spans with error status. +- **`on_interrupt`**: push "waiting for approval" notifications to frontend/mobile, start timeout timers, update task queue status. +- **`on_resume`**: cancel timeout timers, validate resume payload, record interrupt→resume cycle for audit trail. +- **`on_checkpoint`**: redact sensitive data before persistence, replicate to secondary store, invalidate caches, compliance audit logging (SOC2/HIPAA). +- **`on_state_update`**: real-time state diffing for frontend streaming, per-node invariant assertions, security scanning of state content. + +--- + ## Command `Command` lets a node combine state/message updates with control flow. Use it when the next node depends on runtime logic inside the node and is awkward to express as a static conditional edge. @@ -94,9 +254,13 @@ def router_node(state, config): ## Source Map -- Callback system: `agentflow/agentflow/utils/callbacks.py` +- Callback system (invocation hooks + graph lifecycle hooks): `agentflow/agentflow/utils/callbacks.py` - Default validators: `agentflow/agentflow/utils/validators.py` - Graph compile callback argument: `agentflow/agentflow/core/graph/state_graph.py` - Command API: `agentflow/agentflow/utils/command.py` - Command execution paths: `agentflow/agentflow/core/graph/compiled_graph.py` +- Lifecycle hook fire points — invoke path: `agentflow/agentflow/core/graph/utils/invoke_handler.py` +- Lifecycle hook fire points — stream path: `agentflow/agentflow/core/graph/utils/stream_handler.py` +- Lifecycle hook fire points — interrupt/resume: `agentflow/agentflow/core/graph/utils/heandler_utils.py` +- Lifecycle hook fire points — checkpoint: `agentflow/agentflow/core/graph/utils/utils.py` - Legacy docs: `agentflow-docs/docs-mkdocs-legacy/reference/library/Command.md` diff --git a/agentflow_cli/cli/templates/skills/agent-skills/references/rate-limiting.md b/agentflow_cli/cli/templates/skills/agent-skills/references/rate-limiting.md new file mode 100644 index 0000000..7db1a51 --- /dev/null +++ b/agentflow_cli/cli/templates/skills/agent-skills/references/rate-limiting.md @@ -0,0 +1,194 @@ +# Rate Limiting + +Use this when adding, configuring, or debugging AgentFlow's built-in sliding-window rate limiter. + +## Overview + +AgentFlow provides a sliding-window rate limiter configured via the `rate_limit` block in +`agentflow.json`. The limiter is disabled by default — add the block to activate it. + +## Quick Start + +In-memory backend for local development or single-process services: + +```json +{ + "agent": "graph.react:app", + "rate_limit": { + "enabled": true, + "backend": "memory", + "requests": 100, + "window": 60, + "by": "ip", + "exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"] + } +} +``` + +Each client IP may make `requests` calls every `window` seconds. + +## Redis Backend (Production) + +Redis stores counters centrally so the limit is enforced across multiple workers, +containers, or servers. Install the optional extra first: + +```bash +pip install "10xscale-agentflow-cli[redis]" +``` + +```json +{ + "agent": "graph.react:app", + "rate_limit": { + "enabled": true, + "backend": "redis", + "requests": 1000, + "window": 60, + "by": "ip", + "trusted_proxy_headers": true, + "exclude_paths": ["/health", "/metrics", "/docs", "/redoc", "/openapi.json"], + "redis": { + "url": "${RATE_LIMIT_REDIS_URL}", + "prefix": "agentflow:rate-limit" + }, + "fail_open": true + } +} +``` + +Set the environment variable: + +```bash +RATE_LIMIT_REDIS_URL=redis://localhost:6379/0 +``` + +The Redis backend uses an atomic Lua script with sorted sets — check and record happen as a +single Redis operation, which prevents concurrent requests from racing past the limit. + +## Configuration Reference + +| Field | Type | Default | Description | +| --- | --- | --- | --- | +| `enabled` | boolean | `true` | Enables the middleware when the `rate_limit` block exists. | +| `backend` | string | `"memory"` | `memory`, `redis`, or `custom`. | +| `requests` | integer | `100` | Maximum requests allowed in each window. | +| `window` | integer | `60` | Window size in seconds. | +| `by` | string | `"ip"` | Limit by client IP (`"ip"`) or one shared quota (`"global"`). | +| `exclude_paths` | string array | `[]` | Paths that bypass rate limiting entirely. | +| `trusted_proxy_headers` | boolean | `false` | Use `X-Forwarded-For` as the client IP (only behind a trusted proxy). | +| `redis.url` | string | `null` | Redis URL; required for the Redis backend. Supports `${ENV_VAR}` expansion. | +| `redis.prefix` | string | `"agentflow:rate-limit"` | Prefix for all Redis keys. | +| `fail_open` | boolean | `true` | On Redis errors, allow (`true`) or deny (`false`) requests. | + +## Identity Modes + +Per-IP limit (most public APIs): + +```json +{ "rate_limit": { "requests": 100, "window": 60, "by": "ip" } } +``` + +Global limit (one shared quota for the whole service): + +```json +{ "rate_limit": { "requests": 5000, "window": 60, "by": "global" } } +``` + +Only enable `trusted_proxy_headers` when your app sits behind a trusted proxy that strips +untrusted `X-Forwarded-For` headers from direct clients. + +## Response Headers + +Every response includes: + +| Header | Description | +| --- | --- | +| `X-RateLimit-Limit` | Configured request limit. | +| `X-RateLimit-Remaining` | Requests remaining in the current window. | +| `X-RateLimit-Reset` | Unix timestamp for the window reset estimate. | +| `X-RateLimit-Reset-After` | Seconds until the window reset estimate. | +| `Retry-After` | Present on `429` responses only. | + +When the limit is exceeded, AgentFlow returns `429 Too Many Requests`: + +```json +{ + "error": { + "code": "RATE_LIMIT_EXCEEDED", + "message": "Too many requests. Limit: 100 per 60s. Retry after 12s.", + "limit": 100, + "window_seconds": 60, + "retry_after_seconds": 12 + }, + "metadata": { + "request_id": "request-id", + "status": "error" + } +} +``` + +## Custom Backend + +Implement `BaseRateLimitBackend` and bind the instance through InjectQ, then set +`"backend": "custom"` in `agentflow.json`. + +```python +from agentflow_cli.src.app.core.middleware.rate_limit import ( + BaseRateLimitBackend, + RateLimitDecision, +) + + +class MyRateLimitBackend(BaseRateLimitBackend): + async def check(self, key: str, *, limit: int, window: int) -> RateLimitDecision: + allowed = True + remaining = limit - 1 + reset_after = window + return RateLimitDecision( + allowed=allowed, + remaining=remaining, + reset_after=reset_after, + ) + + async def close(self) -> None: + return None +``` + +```json +{ + "rate_limit": { + "enabled": true, + "backend": "custom", + "requests": 100, + "window": 60, + "by": "ip" + } +} +``` + +## Choosing a Backend + +| Scenario | Backend | +| --- | --- | +| Local development, tests, demos | `memory` | +| Single-process production | `memory` | +| Gunicorn/Uvicorn with multiple workers | `redis` | +| Docker / Kubernetes (multiple replicas) | `redis` | +| Custom storage or quotas | `custom` | + +## Rules + +- Do not enable `trusted_proxy_headers` unless a reverse proxy strips untrusted forwarding headers. +- Always add health-check and observability paths to `exclude_paths`. +- In production with Redis set `fail_open: false` only when hard enforcement is required; otherwise `true` prevents availability issues during Redis outages. +- The `redis` backend requires the `redis` extra: `pip install "10xscale-agentflow-cli[redis]"`. +- The `RATE_LIMIT_REDIS_URL` value supports `${ENV_VAR}` expansion — keep secrets out of committed config. + +## Source Map + +- Middleware: `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit/` +- Base class: `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit/base.py` +- Rate-limit config model: `agentflow-api/agentflow_cli/src/app/core/config/graph_config.py` +- Middleware setup: `agentflow-api/agentflow_cli/src/app/core/config/setup_middleware.py` +- Docs: `agentflow-api/docs/rate-limiting.md` +- Configuration reference: `agentflow-api/docs/configuration.md` — "Rate Limiting" section diff --git a/agentflow_cli/cli/templates/skills/copilot/agentflow.instructions.md b/agentflow_cli/cli/templates/skills/copilot/agentflow.instructions.md index a863ea5..d0a2399 100644 --- a/agentflow_cli/cli/templates/skills/copilot/agentflow.instructions.md +++ b/agentflow_cli/cli/templates/skills/copilot/agentflow.instructions.md @@ -32,6 +32,7 @@ Never use repository folder names (e.g. `agentflow-cli`) in install commands or - Tools need docstrings and type annotations so model-facing schemas are useful. - Injectable parameters (`state`, `config`, `tool_call_id`) are hidden from the model schema. - For production, avoid process-local storage for shared state — use durable checkpointer/store backends. +- Add observability, audit, or business-logic side effects by registering a `GraphLifecycleHook` on `CallbackManager` — do not wrap `ainvoke()` / `astream()` calls in application code to achieve the same result. ## Where to look when you need more detail @@ -44,7 +45,9 @@ For deeper context on any subsystem, read the matching reference under `.github/ - Multimodal media, long-term memory stores - Streaming, SSE, runtime publishers, A2A/ACP protocols - API server, REST routes, auth, errors, settings, middleware +- Rate limiting: sliding-window config, memory/Redis/custom backends, response headers, 429 behavior: `references/rate-limiting.md` - TypeScript client: invoke, stream, threads, memory, files, A2UI +- Observability, validators, graph lifecycle hooks (`GraphLifecycleHook`), and runtime jumps (`Command`): `references/callbacks-and-command.md` ## Verifying behavior