-
Notifications
You must be signed in to change notification settings - Fork 0
Architecture
# Architecture
## System Overview
AI Agent (Claude, Cursor, Copilot, etc.) │ │ MCP Protocol (stdio or SSE) ▼ ┌─────────────────────────────────────────┐ │ MCP Server Entry Point (server.py) │ │ ├─ FastMCP instance │ │ ├─ Tool registration & routing │ │ ├─ Lifespan management │ │ └─ Background task coordination │ └──────────┬──────────────────────────────┘ │ ┌──────────▼──────────────────────────────┐ │ Security Validator │ │ ├─ Function blocklist scanning │ │ ├─ Filename sanitization │ │ └─ Upload size enforcement │ └──────────┬──────────────────────────────┘ │ ┌──────────▼──────────────────────────────┐ │ Job Executor │ │ ├─ Job creation & context injection │ │ ├─ Hybrid sync/async execution │ │ ├─ Timeout-based async promotion │ │ └─ Background result collection │ └──────────┬──────────────────────────────┘ │ ┌──────────▼──────────────────────────────┐ │ Engine Pool Manager │ │ ├─ Elastic scaling (min→max) │ │ ├─ Health checks & engine replacement │ │ ├─ Workspace isolation & reset │ │ └─ Queue-based acquisition │ └──────────┬──────────────────────────────┘ │ ┌──────────▼──────────────────────────────┐ │ Engine Wrappers │ │ ├─ Lifecycle (start, stop) │ │ ├─ Code execution (sync & background) │ │ ├─ Workspace reset │ │ └─ Health check ping │ └──────────┬──────────────────────────────┘ │ ┌──────────▼──────────────────────────────┐ │ MATLAB Engines (2020b+) │ │ Engine 1 │ Engine 2 │ ... │ Engine N │ └──────────────────────────────────────────┘
## Component Details
### MCP Server Entry Point (`server.py`)
The main orchestrator. Uses [FastMCP](https://github.com/jlowin/fastmcp) framework to handle MCP protocol details. Responsibilities:
- Holds all server-level state: engine pool, job tracker, executor, session manager, security validator, and optional metrics collector
- Registers all built-in tools (20+ functions) and custom tools from YAML
- Manages server lifecycle: startup (engine pool initialization, background tasks), graceful shutdown (drain jobs, stop engines), and request routing
- Runs background tasks: periodic health checks, session/job cleanup, metrics collection
- Extracts session ID from MCP context (SSE) or uses default (stdio) and routes to appropriate session
### Security Validator (`security/validator.py`)
Pre-execution security checks on all code:
- **Function blocklist:** Scans code for dangerous functions (`system`, `unix`, `dos`, `!`, `eval`, `feval`, `evalc`, `evalin`, `assignin`, `perl`, `python`). Smart pattern matching strips string literals and comments first to avoid false positives
- **Filename sanitization:** Prevents path traversal attacks in file upload operations
- **Upload size limits:** Enforces `max_upload_size_mb` configuration per request
- **Metrics:** Records security events when monitoring is enabled
### Job Executor (`jobs/executor.py`)
Orchestrates the full lifecycle of code execution:
1. **Job creation:** Creates job record in tracker with session context
2. **Engine acquisition:** Acquires an engine from the pool (blocks if at max capacity)
3. **Context injection:** Injects job metadata into MATLAB workspace (`__mcp_job_id__`, `__mcp_session_id__`, `__mcp_temp_dir__`)
4. **Background execution:** Starts code evaluation with stdout/stderr capture via `engine.execute(background=True)`
5. **Sync wait:** Waits up to `sync_timeout` seconds for execution
- **On completion:** Returns result inline with `status="completed"`
- **On timeout:** Promotes to async, spawns background monitoring task, returns `status="pending"` with job ID
6. **Result building:** Calls formatter to structure output (text, variables, figures, files)
7. **Engine release:** Resets workspace and returns engine to pool
8. **Metrics:** Records execution time, code length, output length, and errors
### Job Tracker (`jobs/tracker.py`)
In-memory, thread-safe store for job metadata:
- Create/get/list/cancel jobs with unique IDs
- Store job status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED)
- Maintain future objects for async monitoring
- Auto-prune completed jobs older than `job_retention_seconds`
- Thread-safe operations using asyncio locks
### Engine Pool Manager (`pool/manager.py`)
Manages a dynamic pool of MATLAB engine instances:
- **Startup:** Starts `min_engines` engines in parallel during server startup
- **Elastic scaling:** On acquire, if no engines available and total < `max_engines`, starts a new engine. If at max, blocks in queue until one becomes available
- **Scale-down:** Periodic task stops idle engines beyond `min_engines` if idle longer than `scale_down_idle_timeout` (e.g., 15 min)
- **Health checks:** Periodic task runs `1+1` eval on idle engines. Failed engines are stopped and replaced with fresh instances
- **Queue:** `asyncio.Queue` holds available engines; waiters block if queue empty and pool at max
- **Metrics:** Records scale-up events, health check failures, and pool utilization
### Engine Wrapper (`pool/engine.py`)
Wraps a single `matlab.engine` instance with state management:
- **Lazy import:** Defers `matlab.engine` import so tests can mock it
- **Lifecycle:** Start (applies default paths and startup commands), stop (quit engine)
- **Workspace reset:** Clears all variables, globals, functions, closes files, restores paths, re-applies config
- **Health check:** Runs trivial `1+1` eval to confirm responsiveness
- **Code execution:** Sync and background execution modes with stdout/stderr capture
- **State tracking:** Tracks lifecycle state (STOPPED, STARTING, IDLE, BUSY) and idle duration for scale-down logic
### Session Manager (`session/manager.py`)
Per-user session isolation and temporary file management:
- **Session creation:** Generates unique session ID, creates dedicated temp directory
- **Default session:** Single "default" session for stdio transport (single-user mode)
- **Per-session state:** Temp directory path, creation timestamp, last activity timestamp
- **Session cleanup:** Removes expired sessions (idle > `session_timeout`) and deletes temp directories. Respects active jobs (skips sessions with running/pending jobs)
- **Max sessions:** Enforces `max_sessions` limit to prevent resource exhaustion
- **Metrics:** Records session creation/destruction events
### Output Formatter (`output/formatter.py`)
Structures tool responses into MCP-compatible format:
- **Text formatting:** Truncates output to `max_inline_text_length`, saves overflow to file, flags truncation
- **Variable formatting:** Summarizes workspace variables with name, type, size, and value (truncates large arrays)
- **Success response:** Combines text, variables, figures, files, warnings, and execution time into structured dict
- **Error response:** Builds error responses with error type, message, MATLAB error ID (if available)
- **Figure attachment:** Includes Plotly JSON, static PNG, and optional thumbnail
- **File listing:** Tracks files written during execution
### Plotly Converter (`output/plotly_convert.py`, `output/plotly_style_mapper.py` + `matlab_helpers/mcp_extract_props.m`)
Converts MATLAB figures to interactive Plotly JSON:
1. **MATLAB-side** (`mcp_extract_props.m`): Extracts raw figure properties (line, scatter, bar, histogram, surface, image objects) and saves as JSON
2. **Python-side** (`plotly_style_mapper.py`): Converts MATLAB styles (line styles, markers, colormaps, fonts, colors) to Plotly equivalents. Detects large datasets (10,000+ points) and enables WebGL rendering for performance
3. **Python-side** (`plotly_convert.py`): Reads extracted JSON, applies style mapping, builds Plotly figure dict
4. **Result composition:** Returns Plotly JSON + static PNG image + optional thumbnail
### Monitoring (`monitoring/collector.py`, `monitoring/store.py`)
Optional metrics collection and export:
- **MetricsCollector:** Records events (engine scale-up, job completed, security checks, etc.) with timestamps and metadata
- **MetricsStore:** Writes events to file or in-memory buffer. Supports export formats for observability tools
- **Lifespan wiring:** Initialized during server startup if `monitoring.enabled=true`, closed during shutdown
- **Integration:** Collector passed to pool, executor, session manager, and security validator for comprehensive instrumentation
## Data Flow
### Sync Execution (Completes in < sync_timeout)
Agent → execute_code("x = magic(3)") → Security check (passes blocklist scan) → Create job in tracker → Acquire engine from pool → Inject job context (mcp_job_id, mcp_temp_dir) → Engine.execute(code, background=True) with stdout capture → Wait for completion (< 30s timeout) → Collect result from engine → Format output (text, variables, figures) → Mark job completed in tracker → Release engine (reset workspace, return to pool) → Return {status: "completed", job_id: "...", output: {...}, variables: [...]} → Record metrics (execution time, output size)
### Async Promotion (Exceeds sync_timeout)
Agent → execute_code("long_simulation()") → Security check (passes) → Create job in tracker → Acquire engine → Inject job context → Engine.execute(code, background=True) with stdout capture → Wait up to sync_timeout (30s) → TIMEOUT exceeded → async promotion → Spawn background task to monitor completion → Release engine to pool (workspace NOT reset — job still running) → Return {status: "pending", job_id: "abc123"} → Record event: job promoted to async
Agent → get_job_status("abc123") → Return {status: "running", progress: 45%, started_at: "...", engine_id: "engine-0"}
Agent → get_job_status("abc123") [later] → Return {status: "running", progress: 90%, ...}
[Background task monitoring future completes] → Mark job completed in tracker → Engine reset and released to pool
Agent → get_job_result("abc123") → Return {status: "completed", job_id: "abc123", output: {...}, variables: [...]} → Record metrics (total execution time, background flag)
## Transport Modes
### stdio (Default)
- Single agent connection, single session ("default")
- Communication via stdin/stdout
- Simplest deployment, no network exposure
- Ideal for local development or single-user scripting
### SSE (Server-Sent Events)
- Multiple concurrent agents, multiple isolated sessions
- HTTP-based transport with session ID in request headers
- Each session gets dedicated temp directory, isolated workspace
- **Production setup:** Must run behind authenticating reverse proxy
- Set `require_proxy_auth: true` to enforce proxy-level authentication
- Server logs warning if SSE enabled without proxy auth
- Reverse proxy (nginx, Apache, etc.) handles session authentication and routing