Empty file added .codex
Empty file.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ A professional Python API framework for building agent-based applications with F
- **[Configuration Guide](./docs/configuration.md)** - All configuration options explained
- **[Deployment Guide](./docs/deployment.md)** - Docker, Kubernetes, and cloud deployment
- **[Authentication Guide](./docs/authentication.md)** - JWT and custom authentication
- **[Rate Limiting Guide](./docs/rate-limiting.md)** - Memory, Redis, and custom rate-limit backends
- **[ID Generation Guide](./docs/id-generation.md)** - Snowflake ID generation
- **[Thread Name Generator Guide](./docs/thread-name-generator.md)** - Thread naming strategies

Expand All @@ -19,6 +20,21 @@ A professional Python API framework for building agent-based applications with F
pip install 10xscale-agentflow-cli
```

Redis rate limiting is optional. Install the Redis extra only when you configure
`rate_limit.backend` as `redis`:

```bash
pip install "10xscale-agentflow-cli[redis]"
```
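
A startup script could check for the optional dependency before enabling the Redis backend. The sketch below is not part of the package: `redis_extra_available` and `check_backend` are hypothetical helpers, and it assumes the `[redis]` extra installs the `redis` Python package.

```python
from importlib.util import find_spec
from typing import Optional


def redis_extra_available() -> bool:
    """Return True if the `redis` package is importable (assumed to come from the [redis] extra)."""
    return find_spec("redis") is not None


def check_backend(rate_limit: Optional[dict]) -> None:
    """Raise early if the config asks for Redis but the client library is missing."""
    if not rate_limit or rate_limit.get("backend") != "redis":
        return  # other backends need no extra package in this sketch
    if not redis_extra_available():
        raise RuntimeError(
            'rate_limit.backend is "redis" but the redis package is not installed; '
            'run: pip install "10xscale-agentflow-cli[redis]"'
        )


check_backend({"enabled": True, "backend": "memory"})  # passes silently
```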

JWT auth and document text extraction are optional too. Install only the extra
you need:

```bash
pip install "10xscale-agentflow-cli[jwt]"
pip install "10xscale-agentflow-cli[media]"
```

### Initialize a New Project

```bash
Expand Down Expand Up @@ -53,6 +69,7 @@ agentflow build --docker-compose
- ✅ **State Graph Orchestration** - Build complex agent workflows with LangGraph
- ✅ **FastAPI Backend** - High-performance async web framework
- ✅ **Authentication** - Built-in JWT auth and custom authentication support
- ✅ **Rate Limiting** - Sliding-window limits with memory, Redis, and custom backends
- ✅ **ID Generation** - Distributed Snowflake ID generation
- ✅ **Thread Management** - Intelligent thread naming and conversation management
- ✅ **Docker Ready** - Generate production-ready Docker files
Expand Down Expand Up @@ -174,6 +191,7 @@ The configuration file (`agentflow.json`) defines your agent, authentication, an
| `injectq` | string\|null | Path to InjectQ container |
| `store` | string\|null | Path to data store |
| `redis` | string\|null | Redis connection URL |
| `rate_limit` | object\|null | Sliding-window rate limiting configuration |
| `thread_name_generator` | string\|null | Path to custom thread name generator |

See the [Configuration Guide](./docs/configuration.md) for complete details.
Expand Down Expand Up @@ -544,4 +562,3 @@ Developed by [10xScale](https://10xscale.ai) and maintained by the community.
---

**Made with ❤️ for the AI agent development community**

11 changes: 10 additions & 1 deletion agentflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,14 @@
"agent": "graph.react:app",
"thread_name_generator": "graph.thread_name_generator:MyNameGenerator",
"env": ".env",
"auth": null
"auth": null,
"rate_limit": {
"enabled": true,
"backend": "memory",
"requests": 100,
"window": 60,
"by": "ip",
"trusted_proxy_headers": false,
"exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"]
}
}
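
The `requests` and `window` values above read as "at most 100 requests per 60 seconds per client". As a rough illustration of the sliding-window idea, here is a minimal in-memory sketch. It is not the project's implementation; `SlidingWindowLimiter` and its `clock` parameter are hypothetical names chosen for this example.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Sliding-window log: at most `requests` hits per `window` seconds per key."""

    def __init__(self, requests: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.requests = requests
        self.window = window
        self.clock = clock  # injectable so the behaviour can be tested without sleeping
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.requests:
            return False  # over the limit; the caller would typically answer HTTP 429
        hits.append(now)
        return True


# Same numbers as the config above: 100 requests per 60 seconds, keyed by client IP.
limiter = SlidingWindowLimiter(requests=100, window=60)
results = [limiter.allow("203.0.113.7") for _ in range(101)]
```

A process-local structure like this is only consistent within one worker, which is the usual reason to move to a shared store once several workers serve the same clients.
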
2 changes: 2 additions & 0 deletions agentflow_cli/cli/templates/skills/agent-skills/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ metadata:
- references/api-configuration.md
- references/auth-and-authorization.md
- references/api-settings-and-middleware.md
- references/rate-limiting.md
- references/rest-api-and-errors.md
- references/id-and-thread-name-generators.md
- references/client-auth-and-errors.md
Expand Down Expand Up @@ -78,6 +79,7 @@ Treat `agentflow-docs/docs` as the first source of truth for public package name
- `agentflow.json` and dependency loading: `references/api-configuration.md`
- API auth and authorization: `references/auth-and-authorization.md`
- API environment, settings, and middleware: `references/api-settings-and-middleware.md`
- Rate limiting (config, backends, headers, custom backend): `references/rate-limiting.md`
- REST routes and error behavior: `references/rest-api-and-errors.md`
- API Snowflake IDs and thread naming: `references/id-and-thread-name-generators.md`
- TypeScript auth helpers and structured errors: `references/client-auth-and-errors.md`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ Common full shape:
"thread_name_generator": "graph.thread_name_generator:MyNameGenerator",
"authorization": "graph.auth:my_authorization_backend",
"env": ".env",
"auth": "jwt"
"auth": "jwt",
"rate_limit": {
"enabled": true,
"backend": "memory",
"requests": 100,
"window": 60,
"by": "ip",
"exclude_paths": ["/health", "/docs", "/redoc", "/openapi.json"]
}
}
```

Expand All @@ -37,6 +45,7 @@ Common full shape:
- `authorization`: optional import path to an authorization backend.
- `env`: optional `.env` path loaded before graph import.
- `auth`: `null`, `"jwt"`, or `{"method": "custom", "path": "module:backend"}`.
- `rate_limit`: optional sliding-window rate limiter config object; omit or set to `null` to disable. See `references/rate-limiting.md` for the full field reference.
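
The "omit or set to `null` to disable" rule for `rate_limit` can be pictured with a small loader. This is a sketch under assumptions: `RateLimitSettings` and `parse_rate_limit` are hypothetical names, the defaults are taken from the example config rather than from the package, and unknown keys are simply ignored.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RateLimitSettings:
    # Field names mirror the JSON keys shown above; the defaults are this sketch's assumptions.
    enabled: bool = True
    backend: str = "memory"
    requests: int = 100
    window: int = 60
    by: str = "ip"
    exclude_paths: List[str] = field(default_factory=list)


def parse_rate_limit(raw_config: dict) -> Optional[RateLimitSettings]:
    """Return settings, or None when the block is omitted or null (limiter disabled)."""
    block = raw_config.get("rate_limit")
    if block is None:
        return None
    known = {k: v for k, v in block.items() if k in RateLimitSettings.__dataclass_fields__}
    return RateLimitSettings(**known)


config = json.loads('{"agent": "graph.react:app", "rate_limit": {"requests": 20, "window": 10}}')
settings = parse_rate_limit(config)
```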

## Loading Order

Expand All @@ -62,3 +71,4 @@ Common full shape:
- App startup: `agentflow-api/agentflow_cli/src/app/main.py`
- Main docs: `agentflow-docs/docs/reference/api-cli/configuration.md`
- How-to: `agentflow-docs/docs/how-to/api-cli/configure-agentflow-json.md`
- Rate limit config details: `references/rate-limiting.md`
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ Active middleware areas:
- Request ID assignment.
- Selective gzip behavior; streaming endpoints should avoid gzip buffering when configured.
- Worker middleware where used by deployment.
- Rate limiting: sliding-window limiter controlled by the `rate_limit` block in `agentflow.json`.
Uses an in-process `memory` backend by default; use the `redis` backend (requires
`pip install "10xscale-agentflow-cli[redis]"`) for multi-worker or multi-instance deployments.
Excluded paths, identity mode (`ip` or `global`), and `fail_open` behavior are all configurable.
See `references/rate-limiting.md` for the full option reference.
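
To make the three configurable behaviors in the rate-limiting item concrete, the sketch below shows one plausible reading of identity mode, excluded paths, and `fail_open`. All function names are hypothetical, the exact path-matching rule is an assumption (exact match here), and the real middleware may differ.

```python
from typing import Callable, Iterable


def identity_key(by: str, client_ip: str) -> str:
    """Map the identity mode to a bucket key: one bucket per IP, or one shared bucket."""
    if by == "ip":
        return f"ip:{client_ip}"
    if by == "global":
        return "global"
    raise ValueError(f"unsupported identity mode: {by!r}")


def is_excluded(path: str, exclude_paths: Iterable[str]) -> bool:
    """Exact-match exclusion; prefix or glob matching would be an equally valid assumption."""
    return path in set(exclude_paths)


def should_allow(check: Callable[[str], bool], key: str, fail_open: bool) -> bool:
    """Ask the backend; if it errors, allow the request only when fail_open is set."""
    try:
        return check(key)
    except Exception:
        return fail_open


def broken_backend(key: str) -> bool:
    raise ConnectionError("backend unreachable")  # simulates e.g. a Redis outage


allowed_when_open = should_allow(broken_backend, identity_key("ip", "10.0.0.1"), fail_open=True)
allowed_when_closed = should_allow(broken_backend, identity_key("global", "10.0.0.1"), fail_open=False)
```

Failing open favors availability during a backend outage, while failing closed favors protection; which one fits depends on what the limiter is guarding.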

## Production Warnings

Expand Down Expand Up @@ -76,7 +81,10 @@ Production mode warns about unsafe defaults such as:
- Middleware setup: `agentflow-api/agentflow_cli/src/app/core/config/setup_middleware.py`
- Request limits: `agentflow-api/agentflow_cli/src/app/core/middleware/request_limits.py`
- Security headers: `agentflow-api/agentflow_cli/src/app/core/middleware/security_headers.py`
- Rate limit middleware: `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit/`
- Rate limit base class: `agentflow-api/agentflow_cli/src/app/core/middleware/rate_limit/base.py`
- Sentry: `agentflow-api/agentflow_cli/src/app/core/config/sentry_config.py`
- Log sanitizer: `agentflow-api/agentflow_cli/src/app/core/utils/log_sanitizer.py`
- Main docs: `agentflow-docs/docs/reference/api-cli/environment.md`
- Production docs: `agentflow-docs/docs/how-to/production/environment-variables.md`
- Rate limiting docs: `agentflow-api/docs/rate-limiting.md`
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,166 @@ app = graph.compile(callback_manager=callback_manager)

Use validators for safety checks, business rules, input policy, and prompt-injection detection.

## Graph Lifecycle Hooks

Graph lifecycle hooks fire at the **graph orchestration level**: before and after the entire graph run, on checkpoints, on interrupts, on resume, and after each node transition. They complement the invocation-level hooks above (`before_invoke` / `after_invoke` / `on_error`), which fire inside a single node's AI/Tool/MCP call.

### GraphLifecycleContext

All seven hooks receive a `GraphLifecycleContext` as their first argument. It carries the metadata that identifies the run.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class GraphLifecycleContext:
    config: dict[str, Any]  # full config dict passed to invoke/stream
    timestamp: str  # ISO 8601 start time from config["timestamp"]
    metadata: dict[str, Any] | None = None  # open-ended extra context

    @property
    def thread_id(self) -> str | None: ...

    @property
    def run_id(self) -> str | None: ...
```

### GraphLifecycleHook

Subclass `GraphLifecycleHook` and override only the methods you need. All methods are async and default to no-ops.

```python
from abc import ABC
from typing import Any

from agentflow.utils.callbacks import GraphLifecycleHook, GraphLifecycleContext, CallbackManager
from agentflow.core.state import AgentState
from agentflow.core.state.message import Message


# Signature reference only: this mirrors the base class imported above.
# In application code, subclass the imported GraphLifecycleHook.
class GraphLifecycleHook(ABC):
    async def on_graph_start(
        self, context: GraphLifecycleContext, state: AgentState
    ) -> AgentState | None: ...
    # Fires: after state is loaded, before the first node executes.
    # Return modified AgentState to replace the initial state, or None.

    async def on_graph_end(
        self, context: GraphLifecycleContext, final_state: AgentState,
        messages: list[Message], total_steps: int
    ) -> AgentState | None: ...
    # Fires: after execution loop completes, before final persistence.
    # Return modified AgentState or None.

    async def on_graph_error(
        self, context: GraphLifecycleContext, error: Exception,
        partial_state: AgentState, messages: list[Message], step: int, node_name: str
    ) -> tuple[AgentState, str] | None: ...
    # Fires: when an unhandled exception escapes the graph loop.
    # Return (AgentState, error_message) to change the persisted error snapshot, or None.
    # Cannot suppress the error; it is always re-raised after this hook.

    async def on_interrupt(
        self, context: GraphLifecycleContext, interrupted_node: str,
        interrupt_type: str,  # "before" | "after" | "stop" | "remote_tool"
        state: AgentState
    ) -> AgentState | None: ...
    # Fires: before interrupt state is persisted (covers before/after/stop/remote_tool types).
    # Return modified AgentState or None.

    async def on_resume(
        self, context: GraphLifecycleContext, resumed_node: str,
        state: AgentState, resume_data: dict[str, Any]
    ) -> AgentState | None: ...
    # Fires: when a paused graph is resumed, before clear_interrupt().
    # resume_data can be mutated in place. Return modified AgentState or None.

    async def on_checkpoint(
        self, context: GraphLifecycleContext, state: AgentState,
        messages: list[Message], is_context_trimmed: bool
    ) -> tuple[AgentState, list[Message]] | AgentState | None: ...
    # Fires: immediately before state/messages are persisted (every checkpoint, not just final).
    # Return (AgentState, messages), AgentState, or None.

    async def on_state_update(
        self, context: GraphLifecycleContext, node_name: str,
        old_state: AgentState, new_state: AgentState, step: int
    ) -> AgentState | None: ...
    # Fires: after each node produces a result and state is merged.
    # Return modified AgentState or None.
```
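
The `on_graph_error` contract (the hook may change what is persisted, but the error still propagates) can be illustrated with a stand-alone simulation. Everything here is a stand-in: `State` is a plain dict instead of `AgentState`, `RelabelErrorHook` and `run_with_error_hook` are hypothetical, and the real fire point lives in the library's invoke and stream handlers.

```python
import asyncio
from typing import Any, Optional, Tuple

State = dict  # stand-in for AgentState in this sketch


class RelabelErrorHook:
    """Hypothetical hook: rewrites what gets persisted, but does not swallow the error."""

    async def on_graph_error(self, context: Any, error: Exception, partial_state: State,
                             messages: list, step: int, node_name: str
                             ) -> Optional[Tuple[State, str]]:
        return {**partial_state, "failed_node": node_name}, f"{node_name} failed at step {step}"


async def run_with_error_hook(hook: RelabelErrorHook, state: State, persisted: list) -> None:
    try:
        raise ValueError("tool timed out")  # simulated failure inside a node
    except Exception as exc:
        snapshot, message = state, str(exc)
        result = await hook.on_graph_error(None, exc, state, [], step=3, node_name="search")
        if result is not None:
            snapshot, message = result  # the hook changed the persisted snapshot
        persisted.append((snapshot, message))
        raise  # the original error always propagates


persisted: list = []
try:
    asyncio.run(run_with_error_hook(RelabelErrorHook(), {"query": "x"}, persisted))
except ValueError as exc:
    outcome = f"re-raised: {exc}"
```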

### Registration

```python
callback_mgr = CallbackManager()
callback_mgr.register_lifecycle_hook(MyHook())

# Combine with existing invocation-level hooks on the same manager:
callback_mgr.register_after_invoke(InvocationType.AI, my_ai_callback)

app = graph.compile(callback_manager=callback_mgr)
```
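
When several lifecycle hooks are registered, the "return a modified state or `None`" contract suggests a simple chaining rule. The sketch below assumes hooks run in registration order and that each hook sees the previous hook's output; neither detail is stated above, so treat it as one possible reading. `AddTraceId`, `LogOnly`, and `fire_on_graph_start` are hypothetical, and a plain dict stands in for `AgentState`.

```python
import asyncio
from typing import Optional

State = dict  # stand-in for AgentState in this sketch


class AddTraceId:
    async def on_graph_start(self, context: dict, state: State) -> Optional[State]:
        return {**state, "trace_id": "trace-123"}  # replaces the initial state


class LogOnly:
    async def on_graph_start(self, context: dict, state: State) -> Optional[State]:
        return None  # observe only; keep the current state


async def fire_on_graph_start(hooks: list, context: dict, state: State) -> State:
    """Call hooks in order; a non-None return value replaces the state."""
    for hook in hooks:
        replacement = await hook.on_graph_start(context, state)
        if replacement is not None:
            state = replacement
    return state


final = asyncio.run(fire_on_graph_start([AddTraceId(), LogOnly()], {}, {"messages": []}))
```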

### Hook Summary

| Hook | Returns | Frequency | Fire point |
|---|---|---|---|
| `on_graph_start` | `AgentState \| None` | 1 per run | After state load, before loop |
| `on_graph_end` | `AgentState \| None` | 1 per successful run | After `state.complete()`, before final `sync_data()` |
| `on_graph_error` | `tuple[AgentState, str] \| None` | 1 per failed run | In except block, before error `sync_data()` |
| `on_interrupt` | `AgentState \| None` | 0–N per run | Before interrupt checkpoint persistence |
| `on_resume` | `AgentState \| None` | 0–1 per call | Before `clear_interrupt()` |
| `on_checkpoint` | `(AgentState, list[Message]) \| AgentState \| None` | 1–N per run | Before every durable checkpoint write |
| `on_state_update` | `AgentState \| None` | N per run (once per node) | After each node result is merged |
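
`on_checkpoint` is the only hook with three return shapes. A caller has to fold them back into one `(state, messages)` pair, which might look like the sketch below. `normalize_checkpoint_result` is a hypothetical helper, and plain dicts and lists stand in for `AgentState` and `list[Message]`.

```python
from typing import Any, List, Tuple


def normalize_checkpoint_result(
    result: Any, state: Any, messages: List[Any]
) -> Tuple[Any, List[Any]]:
    """Fold the three allowed return shapes into a single (state, messages) pair."""
    if result is None:
        return state, messages  # hook made no changes
    if isinstance(result, tuple):
        new_state, new_messages = result  # hook replaced both
        return new_state, new_messages
    return result, messages  # hook replaced only the state


state, messages = {"n": 1}, ["hi"]
unchanged = normalize_checkpoint_result(None, state, messages)
state_only = normalize_checkpoint_result({"n": 2}, state, messages)
both = normalize_checkpoint_result(({"n": 3}, []), state, messages)
```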

### Example

```python
# tracer, alert_oncall, notify_frontend, metrics, compute_diff, and
# stream_diff_to_frontend are placeholders for application code; they are not defined here.
class ObservabilityHook(GraphLifecycleHook):
    async def on_graph_start(self, ctx, state):
        self._span = tracer.start_span(f"graph.run.{ctx.thread_id}")
        return None

    async def on_graph_end(self, ctx, final_state, messages, total_steps):
        self._span.set_attribute("steps", total_steps)
        self._span.end()

    async def on_graph_error(self, ctx, error, partial_state, messages, step, node_name):
        self._span.record_exception(error)
        self._span.end()
        alert_oncall(f"Graph failed at node {node_name}: {error}")
        return None

    async def on_interrupt(self, ctx, interrupted_node, interrupt_type, state):
        notify_frontend(ctx.thread_id, status="waiting_for_input", node=interrupted_node)
        return None

    async def on_resume(self, ctx, resumed_node, state, resume_data):
        notify_frontend(ctx.thread_id, status="resuming", node=resumed_node)
        return None

    async def on_checkpoint(self, ctx, state, messages, is_context_trimmed):
        metrics.increment("agentflow.checkpoints", tags={"thread": ctx.thread_id})
        return None

    async def on_state_update(self, ctx, node_name, old_state, new_state, step):
        diff = compute_diff(old_state, new_state)
        stream_diff_to_frontend(ctx.thread_id, diff)
        return None


callback_mgr = CallbackManager()
callback_mgr.register_lifecycle_hook(ObservabilityHook())
app = graph.compile(callback_manager=callback_mgr)
```
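
The example calls `compute_diff` without defining it. One possible shape is a shallow key-level diff, sketched below under the assumption that both states have already been converted to plain dicts; how an `AgentState` is converted is not covered here.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel so that keys holding None are still compared correctly


def compute_diff(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Shallow diff of two dict snapshots: added, removed, and changed keys."""
    diff: Dict[str, Dict[str, Any]] = {"added": {}, "removed": {}, "changed": {}}
    for key in old.keys() | new.keys():
        before, after = old.get(key, _MISSING), new.get(key, _MISSING)
        if before is _MISSING:
            diff["added"][key] = after
        elif after is _MISSING:
            diff["removed"][key] = before
        elif before != after:
            diff["changed"][key] = {"old": before, "new": after}
    return diff


example = compute_diff({"step": 1, "draft": "a"}, {"step": 2, "answer": "b"})
```

A shallow diff keeps the payload small for streaming; nested structures would show up as a single changed key rather than a field-by-field delta.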

### Common Use Cases by Hook

- **`on_graph_start`**: inject trace IDs, pre-populate state from external DB, set rate-limit budgets, initialize OpenTelemetry spans.
- **`on_graph_end`**: send completion notifications (Slack/email), record step/message count metrics, archive transcripts, trigger downstream webhooks.
- **`on_graph_error`**: alert PagerDuty/Sentry, log structured failure diagnostics, close OTel spans with error status.
- **`on_interrupt`**: push "waiting for approval" notifications to frontend/mobile, start timeout timers, update task queue status.
- **`on_resume`**: cancel timeout timers, validate resume payload, record interrupt→resume cycle for audit trail.
- **`on_checkpoint`**: redact sensitive data before persistence, replicate to secondary store, invalidate caches, compliance audit logging (SOC2/HIPAA).
- **`on_state_update`**: real-time state diffing for frontend streaming, per-node invariant assertions, security scanning of state content.
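
As an illustration of the redaction use case for `on_checkpoint`, the helper below masks email addresses before text is persisted. It is a sketch only: the regex is deliberately simple, `redact_text` and `redact_messages` are hypothetical names, and message content is treated as plain strings because the `Message` structure is not shown here.

```python
import re
from typing import List

# Simple email pattern; real deployments usually need a broader set of detectors.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def redact_text(text: str) -> str:
    """Replace every email address in the text with a fixed marker."""
    return EMAIL.sub("[redacted-email]", text)


def redact_messages(contents: List[str]) -> List[str]:
    """Apply redaction to each message body, returning new strings."""
    return [redact_text(content) for content in contents]


cleaned = redact_messages(["mail alice@example.com now", "no secrets here"])
```

Inside a hook, the cleaned content would be written back onto the state or messages and returned, so that only the redacted version reaches the checkpoint store.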

---

## Command

`Command` lets a node combine state/message updates with control flow. Use it when the next node depends on runtime logic inside the node and is awkward to express as a static conditional edge.
Expand Down Expand Up @@ -94,9 +254,13 @@ def router_node(state, config):

## Source Map

- Callback system: `agentflow/agentflow/utils/callbacks.py`
- Callback system (invocation hooks + graph lifecycle hooks): `agentflow/agentflow/utils/callbacks.py`
- Default validators: `agentflow/agentflow/utils/validators.py`
- Graph compile callback argument: `agentflow/agentflow/core/graph/state_graph.py`
- Command API: `agentflow/agentflow/utils/command.py`
- Command execution paths: `agentflow/agentflow/core/graph/compiled_graph.py`
- Lifecycle hook fire points — invoke path: `agentflow/agentflow/core/graph/utils/invoke_handler.py`
- Lifecycle hook fire points — stream path: `agentflow/agentflow/core/graph/utils/stream_handler.py`
- Lifecycle hook fire points — interrupt/resume: `agentflow/agentflow/core/graph/utils/heandler_utils.py`
- Lifecycle hook fire points — checkpoint: `agentflow/agentflow/core/graph/utils/utils.py`
- Legacy docs: `agentflow-docs/docs-mkdocs-legacy/reference/library/Command.md`