-
-
Notifications
You must be signed in to change notification settings - Fork 34
Distributed Nodes
Hub ←→ Node distributed architecture for CortexPrism, enabling a central Hub to orchestrate multiple remote Nodes with tiered capabilities, secure WebSocket communication, and session routing.
Hub side (src/hub/, 4 files):
| File | Purpose |
|---|---|
node-registry.ts |
Persistent Node registration, token management, status tracking |
ws-node.ts |
WebSocket handler, directive dispatch, heartbeat monitoring |
session-routing.ts |
Route directive results back to originating sessions |
capability-tiers.ts |
Three-tier permission model (root/sudo/unprivileged) |
Remote side (src/remote/, 3 files):
| File | Purpose |
|---|---|
agent.ts |
Node agent loop — connection, heartbeat, directive execution, local policy enforcement |
manager.ts |
In-memory agent registry and config management |
types.ts |
Protocol message types, metrics, stream chunk types |
Persistent storage in SQLite (nodes table) via the core database. Each node has:
| Field | Description |
|---|---|
id |
Generated: node_<timestamp>_<random>
|
name |
Human-readable name |
endpoint |
WebSocket endpoint URL |
tier |
root / sudo / unprivileged
|
status |
connecting / connected / disconnected / error / deregistered
|
capabilities |
JSON array of capability strings |
version |
Cortex version running on node |
group_name |
Optional grouping label |
last_heartbeat |
ISO timestamp of last ping |
last_processed_directive_id |
For resume on reconnect |
registered_at / created_at / updated_at
|
Timestamps |
Token management: 32-byte hex tokens stored in the vault (node_token_<id>), with rotation and validation. Nodes can be grouped and queried by tier, status, and group.
handleNodeWebSocket() upgrades HTTP requests to WebSocket and processes messages:
Hub → Node messages: registered, heartbeat_ack, directive, cancel, config_update, rekey, error
Node → Hub messages: register, heartbeat, result, stream_chunk, disconnect
Heartbeat interval: 30 seconds. Timeout: 3x interval (90s). A health check loop runs every 10 seconds to detect stale connections via heartbeat acknowledgments and the database (for disconnected nodes with stale heartbeat timestamps).
dispatchDirective() sends directives to connected nodes after validating against the node's capability tier and global policies via validateNodeDirective(). dispatchAndWait() returns a Promise that resolves when the node sends back a result message or rejects on timeout.
Pending directives are tracked with automatic cleanup on node disconnect or timeout.
directiveSessionMap maps directive IDs to {sessionId, nodeId} pairs. When a node returns a result, routeResult() fires result handlers with the session context. The system supports registration, cancellation, and event-based handlers for result routing.
| Tier | Shell | File Writes | System Paths | Description |
|---|---|---|---|---|
| root | Full sudo | All paths (/) |
Unrestricted | Full unrestricted access, audit-logged |
| sudo | Managed commands via regex |
/home/, /opt/, /var/, /tmp/, /etc/, /usr/local/
|
Forbidden: /etc/shadow, /root/, /boot/, /sys/, /proc/
|
Scoped sudo with allow-listed commands |
| unprivileged | Blocked |
/home/, /tmp/ only |
Forbidden: /etc/, /root/, /boot/, /sys/, /proc/, /var/, /opt/, /usr/
|
Read-only exploration, no shell |
Sudo-tier allowed commands use a regex allow-list:
systemctl\s+(restart|start|stop|status)\s+\S+
docker\s+(ps|logs|restart|start|stop)\b.*
apt-get\s+(install|update|upgrade)\b.*
journalctl\b.*
supervisorctl\s+(restart|start|stop|status)\s+\S+
Validation functions: isToolAllowedByTier(), isPathAllowedByTier(), isCommandAllowedByTier().
runNodeAgent() implements the Node-side loop:
-
Connect: Establishes WebSocket to Hub and sends
registerwith agentId, name, token, version, tier, group, and last-processed directive ID -
Heartbeat: Every
heartbeatMssendsheartbeatwith system metrics (CPU%, memory MB, disk free MB, active directives, uptime) -
Directive handling: Parses directives into tool calls, enforces local policy checks (tier + path + command validation), executes via
executeTool(), optionally streams output in 4KB chunks - Reconnection: Exponential backoff (doubles each attempt, capped at 30s)
Before executing any directive, the node performs a secondary policy check:
- Validates tool against tier whitelist/blocklist
- Validates shell/exec commands against tier command regex
- Validates file operation paths against tier path restrictions
Node metrics collected every heartbeat cycle:
- CPU percentage (from
/proc/stat) - Memory (from
/proc/meminfo) - Disk free/total (from
dfcommand) - Active directive count
- Uptime in seconds
| Type | Fields |
|---|---|
register |
agentId, name, token, capabilities, version, tier, group, lastProcessedDirectiveId
|
heartbeat |
agentId, metrics? (cpuPercent, memoryMb, memoryTotalMb, diskFreeMb, diskTotalMb, activeDirectives, uptimeSeconds) |
result |
directiveId, success, output, error?, durationMs
|
stream_chunk |
directiveId, seq, chunk
|
disconnect |
reason |
| Type | Fields |
|---|---|
registered |
agentId |
heartbeat_ack |
agentId |
directive |
id, sessionId, action, params, stream?, timeoutMs?
|
cancel |
directiveId |
config_update |
agentId, policies?, toolsAllowList?
|
rekey |
agentId, newToken
|
error |
message, code? (bidirectional) |
cortex hub start # Start Hub server
cortex node register # Register a new node
cortex node list # List registered nodes
cortex node deregister ID # Deregister a node
cortex node token ID # Show node token
cortex node token-rotate ID # Rotate node token
cortex remote connect # Start remote node agent| Method | Path | Description |
|---|---|---|
GET |
/api/nodes |
List nodes (filter: ?group=, ?tier=, ?status=) |
POST |
/api/nodes |
Register node |
DELETE |
/api/nodes/:id |
Deregister node |
POST |
/api/nodes/:id/token-rotate |
Rotate token |
GET |
/api/nodes/groups |
List node groups |
WS |
/ws/node |
Node WebSocket endpoint |
- MCP Gateway — Security model with risk levels and approval workflows
- A2A Protocol — Alternative inter-agent protocol
- Triggers — Event-driven automation
CortexPrism — Open-source agentic AI harness · MIT License · Built with Deno 2.x + TypeScript
- Agent Loop
- Metacognition
- Memory System
- Skills System
- Sub-Agents
- Built-in Tools
- Code Intelligence
- Code Sandbox
- Cross-Agent Context Protocol
- Prompt Lab
- PKM Assistant
- Voice Pipeline
- Computer Use
- Browser Tool
- Git & GitHub
- Scheduler & Jobs
- Dashboard
- Observability
- A2A Protocol
- MCP Gateway
- Distributed Nodes
- Memori Checkpoints
- Eval System
- Workflow Engine
- Triggers
- Projects
- TUI
- Glossary
- Update System