-
Notifications
You must be signed in to change notification settings - Fork 0
Architecture
github-actions[bot] edited this page Mar 18, 2026
·
20 revisions
AI Agent (Claude, Cursor, Copilot, etc.)
│
│ MCP Protocol (stdio or SSE)
▼
┌─────────────────────────────────────────┐
│ MCP Server Entry Point (server.py) │
│ ├─ FastMCP framework │
│ ├─ Tool registration & routing │
│ ├─ Session manager │
│ ├─ Security validator │
│ ├─ Result formatter │
│ ├─ Monitoring collector │
│ └─ Custom tools (from YAML) │
└──────────┬────────────────────────────┬─┘
│ │
┌──────────▼──────────────────┐ ┌─────▼─────────────┐
│ Job Executor │ │ Monitoring System │
│ ├─ Job tracker │ │ ├─ Metrics │
│ ├─ Hybrid sync/async exec │ │ ├─ Event logging │
│ ├─ Timeout-based promotion │ │ └─ Store │
│ ├─ Context injection │ └───────────────────┘
│ └─ Progress tracking │
└──────────┬────────────────┬──┘
│ │
┌──────────▼──────────────┐ │
│ Security Validator │ │
│ ├─ Function blocklist │ │
│ ├─ Filename sanitizer │ │
│ └─ Upload size limits │ │
└────────────────────────┘ │
│
┌───────────────────────────▼────────────┐
│ Engine Pool Manager │
│ ├─ Elastic scaling (min→max) │
│ ├─ Health checks & replacement │
│ ├─ Proactive warmup (80% threshold) │
│ └─ Idle scale-down (15 min timeout) │
└────────────────────┬────────────────────┘
│
┌────────────────────▼────────────────────┐
│ Engine Wrapper (per engine) │
│ ├─ Lifecycle (start/stop) │
│ ├─ Code execution (sync/async) │
│ ├─ Workspace reset │
│ ├─ Health check ping │
│ └─ State tracking │
└────────────────────┬────────────────────┘
│
┌────────────────────▼─────────────────────┐
│ MATLAB Engines (2020b+) │
│ Engine 1 │ Engine 2 │ ... │ Engine N │
└──────────────────────────────────────────┘
The main entry point using FastMCP. Responsibilities:
- Initialize and manage server-level state (pools, trackers, executors, validators, formatters)
- Register all tools (20+ built-in + custom tools from YAML)
- Handle MCP protocol details (tool calls, resource requests)
- Manage server lifecycle (startup, shutdown, graceful drain)
- Route tool calls to implementation modules
- Run background tasks (health checks, session cleanup, metrics)
- Determine session context (stdio uses "default" session, SSE uses context's session_id)
Key State Class: MatlabMCPServer holds:
-
pool:EnginePoolManager -
tracker:JobTracker -
executor:JobExecutor -
sessions:SessionManager -
security:SecurityValidator -
collector:MetricsCollector(optional, when monitoring enabled)
Manages a pool of MATLAB engine instances with elastic scaling:
-
Elastic scaling: Starts with
min_engines, scales up tomax_enginesunder load -
Proactive warmup: When utilization exceeds
proactive_warmup_threshold(80%), starts a new engine before it's needed -
Scale-down: Idle engines beyond
min_enginesare stopped if they've been idle longer thanscale_down_idle_timeout(15 min) -
Health checks: Periodic trivial eval (
1+1) to verify engines are responsive. Unhealthy engines are replaced -
Queue: Requests wait in an async queue (
asyncio.Queue) when all engines are busy - Acquire/Release: Async API for acquiring available engines and returning them to the pool
Wraps a single matlab.engine instance with state tracking:
-
Lifecycle:
start()launches MATLAB, appliesdefault_pathsandstartup_commands;stop()quits MATLAB -
Execution:
execute(code, nargout, background, stdout, stderr)runs code with optional capture -
Workspace reset:
reset_workspace()clears all variables/functions and re-applies configuration -
Health check:
health_check()runs trivial eval to confirm responsiveness -
State tracking: Tracks
EngineState(STOPPED, STARTING, IDLE, BUSY) and idle duration
Orchestrates the full lifecycle of code execution:
-
Create job via
JobTrackerwith uniquejob_id - Acquire engine from pool (waits if all busy)
-
Inject job context into MATLAB workspace:
-
__mcp_job_id__(string) -
__mcp_session_id__(string) -
__mcp_temp_dir__(path to session temp directory)
-
-
Execute code with
engine.execute(code, background=True)and stdout/stderr capture -
Sync timeout logic:
- Wait up to
sync_timeoutseconds for completion - If completes: build and return result inline (status="completed")
- If times out: promote to async background task (status="pending", return
job_id)
- Wait up to
- Async background task: Continue monitoring future, build result when done, store in tracker
- Error handling: Catch and format execution errors, mark job failed
- Monitoring: Record events (job_completed, job_failed, job_promoted) via collector
In-memory store for job metadata and results:
- Create/get/list/cancel jobs: Thread-safe operations with asyncio locks
- Job lifecycle: Track status transitions (PENDING → RUNNING → COMPLETED/FAILED)
-
Retention: Automatically prune completed jobs older than
job_retention_seconds -
Job object: Stores
job_id,session_id,code,status,result,error,future,started_at,completed_at, stdout/stderr buffers
Per-user session isolation and lifecycle:
-
Create session: Allocates unique
session_idand temporary directory underconfig.execution.temp_dir - Per-session state: Each session owns a temp dir for uploads, results, and working files
-
Workspace isolation: When
workspace_isolation=true, workspace is cleared between sessions -
Session timeout: Idle sessions (no activity for
session_timeoutseconds) are eligible for cleanup -
Max sessions: Enforces
max_sessionslimit to prevent unbounded resource consumption -
Default session: stdio transport uses a single "default" session; SSE uses context's
session_id -
Cleanup:
cleanup_expired()removes idle sessions that have no active jobs
Pre-execution security checks to block unsafe code:
-
Function blocklist: Scans code for dangerous functions (
system,unix,dos,!,eval,feval,evalc,evalin,assignin,perl,python). Smart token-based scanning strips comments and string literals first to avoid false positives -
Filename sanitization: Prevents path traversal in upload filenames (rejects
.., absolute paths, etc.) -
Upload size limits: Enforces
max_upload_size_mbfor file uploads - Metrics: Records security events (violations) via collector
Structures tool responses from raw execution results:
-
Text output: Truncates to
max_inline_text_length, saves overflow to file - Variables: Summarizes workspace variables with name, type, size, and value (truncates large arrays)
- Success response builder: Combines text, variables, figures, files, warnings, execution_time into structured dict
- Error response builder: Formats error_type, message, and optional MATLAB error ID
- File listing: Summarizes output files written during execution
Plotly Converter (output/plotly_convert.py, output/plotly_style_mapper.py, matlab_helpers/mcp_extract_props.m)
Converts MATLAB figures to interactive Plotly JSON:
-
MATLAB-side:
mcp_extract_props.mextracts raw figure properties (axes, lines, scatter, bar, histogram, surface, image data) - File-based exchange: Saves extracted properties to JSON file in temp directory
-
Python-side:
plotly_convert.pyreads the saved JSON and builds Plotly layout/traces -
Style mapping:
plotly_style_mapper.pyconverts MATLAB styles (line styles, markers, colormaps, fonts, colors) to Plotly equivalents - WebGL optimization: Automatically uses WebGL for large datasets (10,000+ points)
-
Result: Returns dict with
plotly_json(interactive),png(static thumbnail), and optionalthumbnail(small preview)
When monitoring.enabled=true:
- MetricsCollector: Records events and metrics (engine scale-up, job completion, security violations, etc.)
- Store: Persists metrics to configured backend (e.g., file, external service)
- Integration: Collector is passed to major components (pool, executor, sessions, security) for event recording
Agent → execute_code("x = magic(3)")
→ Security check (OK)
→ Create job in tracker
→ Acquire engine from pool
→ Inject job context (__mcp_job_id__, __mcp_temp_dir__)
→ Engine.execute("x = magic(3)", background=True)
→ Wait up to sync_timeout (30s default)
→ Completes in <30s
→ Build result (query workspace, convert figures, list files)
→ Mark job completed in tracker
→ Release engine to pool (reset workspace)
→ Return {status: "completed", job_id: "abc123", output: {...}, variables: [...], figures: [...]}
Agent → execute_code("long_simulation()")
→ Security check (OK)
→ Create job in tracker
→ Acquire engine from pool
→ Inject job context
→ Engine.execute("long_simulation()", background=True)
→ Wait up to sync_timeout (30s)
→ Timeout exceeded
→ Promote to async background task
→ Release engine to pool (engine remains busy until job completes)
→ Return {status: "pending", job_id: "abc123"}
→ Agent polls get_job_status("abc123") → {status: "running", progress: 45%}
→ Agent polls get_job_status("abc123") → {status: "running", progress: 90%}
→ Background task detects completion
→ Build result, mark job completed
→ Release engine to pool
→ Agent calls get_job_result("abc123") → {status: "completed", output: {...}, variables: [...]}
- One agent, one session
- Communication via stdin/stdout
- Simplest setup, no network
- Uses single "default" session for all requests
- Multiple agents, multiple sessions
- HTTP-based, supports remote connections
- Session isolation via context's
session_id - Each client connection gets its own session
-
Production: Put behind a reverse proxy with auth
- Set
require_proxy_auth: trueto enforce proxy is present - Proxy must authenticate clients and set trusted headers (e.g.,
X-User-ID) - Recommendation: Use OAuth2 or mTLS for client authentication
- Set