Skip to content

Architecture

github-actions[bot] edited this page Mar 18, 2026 · 20 revisions

Architecture

System Overview

AI Agent (Claude, Cursor, Copilot, etc.)
       │
       │ MCP Protocol (stdio or SSE)
       ▼
┌─────────────────────────────────┐
│   MCP Server (FastMCP)           │
│   ├─ 20 built-in tools           │
│   ├─ Custom tools (from YAML)    │
│   ├─ Session manager             │
│   ├─ Security validator          │
│   └─ Result formatter            │
└──────────┬──────────────────────┘
           │
┌──────────▼──────────────────────┐
│   Job Executor                    │
│   ├─ Hybrid sync/async execution  │
│   ├─ Timeout-based promotion      │
│   ├─ Job context injection        │
│   └─ Background task monitoring   │
└──────────┬──────────────────────┘
           │
┌──────────▼──────────────────────┐
│   Engine Pool Manager             │
│   ├─ Elastic scaling (min→max)    │
│   ├─ Health checks                │
│   ├─ Workspace isolation          │
│   └─ Idle scale-down              │
└──────────┬──────────────────────┘
           │
┌──────────▼──────────────────────┐
│   MATLAB Engines (2020b+)         │
│   Engine 1 │ Engine 2 │ ... │ N   │
└───────────────────────────────────┘

Component Details

MCP Server (server.py)

The entry point. Uses FastMCP to handle MCP protocol details. Responsibilities:

  • Register all 20 built-in tools + custom tools from YAML
  • Create and manage core components: EnginePoolManager, JobTracker, JobExecutor, SessionManager, SecurityValidator
  • Manage server lifecycle (startup, shutdown, drain)
  • Route tool calls to implementation modules
  • Handle session identification (stdio uses "default" session; SSE uses context's session_id)
  • Run background tasks (health checks, session cleanup, monitoring)
  • Optionally initialize metrics collector and monitoring store when monitoring.enabled=true

Engine Pool Manager (pool/manager.py)

Manages a pool of MATLAB engine instances with elastic scaling:

  • Startup: Initializes min_engines engines in parallel on server start
  • Acquisition: Returns an available engine immediately; if none available but under max_engines, starts a new one; if at capacity, waits for one to become available
  • State tracking: Each engine is marked busy or idle
  • Health checks: Periodic 1+1 eval to verify engines are responsive. Unhealthy engines are stopped and replaced
  • Scale-down: Engines idle longer than scale_down_idle_timeout (15 min) are stopped, down to min_engines
  • Queue: Maintains an async queue of available engines for FIFO acquire/release

Engine Wrapper (pool/engine.py)

Wraps a single matlab.engine instance with lifecycle and execution control:

  • State tracking: Maintains EngineState enum (STOPPED, STARTING, IDLE, BUSY)
  • Lifecycle: start() launches MATLAB, applies default_paths, runs startup_commands; stop() cleanly quits
  • Execution: execute() method supports sync/background code evaluation with optional stdout/stderr capture
  • Workspace reset: reset_workspace() clears all variables/functions, closes files, restores paths, and re-runs startup commands between jobs
  • Health checks: health_check() runs trivial eval to confirm responsiveness
  • Idle tracking: Monitors time since last transition to IDLE state

Job Executor (jobs/executor.py)

Orchestrates the full lifecycle of a MATLAB code execution request:

  1. Job creation: Creates a job record in JobTracker
  2. Engine acquisition: Acquires an engine from the pool
  3. Context injection: Injects job metadata (__mcp_job_id__, __mcp_temp_dir__, __mcp_session_id__) into MATLAB workspace
  4. Background execution: Launches code with engine.execute(code, background=True) and captures stdout/stderr
  5. Sync/async decision:
    • Waits up to sync_timeout seconds for completion
    • If done: returns result inline with status="completed"
    • If timeout: promotes to async background task (status="pending") and monitors completion asynchronously
  6. Error handling: Catches execution exceptions, marks job failed, releases engine
  7. Result building: Calls _build_result() to format output, variables, figures, and files
  8. Metrics collection: Records events (job_created, job_completed, job_failed) when collector is enabled

Job Tracker (jobs/tracker.py)

In-memory store for job metadata:

  • Create/get/list/cancel jobs with unique job_id
  • Store job state (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED)
  • Prune completed jobs older than job_retention_seconds
  • Thread-safe operations using asyncio locks

Session Manager (session/manager.py)

Per-user session isolation with temporary directory management:

  • Session creation: Each session gets a unique ID and temporary directory under config.execution.temp_dir
  • Default session: stdio transport uses a single "default" session for backward compatibility
  • Session lookup: SSE transport uses context's session_id to route requests to appropriate session
  • Idle tracking: Sessions maintain last_active timestamp
  • Session expiration: cleanup_expired() removes sessions idle beyond session_timeout (skipping sessions with active jobs)
  • Resource cleanup: Destroys temp directories when sessions are removed
  • Max sessions: Enforces max_sessions limit to prevent unbounded growth

Security Validator (security/validator.py)

Pre-execution security checks:

  • Function blocklist: Scans code for blocked functions (system, unix, dos, !, eval, feval, evalc, evalin, assignin, perl, python). Smart scanning strips string literals and comments first to avoid false positives
  • Filename sanitization: Prevents path traversal in upload filenames via pathlib.Path.resolve()
  • Upload size limits: Enforces max_upload_size_mb on file uploads
  • Metrics recording: Records security check events when collector is enabled

Result Formatter (output/formatter.py)

Structures tool responses:

  • Text formatting: Truncates output longer than max_inline_text_length; saves full text to file if truncated and save_dir provided
  • Variable formatting: Summarizes workspace variables with name, type, size; includes value for small items, omits for large arrays
  • Success response: Builds structured dict with status, job_id, output, variables, figures, files, warnings, execution_time
  • Error response: Builds error dict with status, job_id, error_type, message, matlab_id (if available), execution_time

Plotly Converter (output/plotly_convert.py, output/plotly_style_mapper.py, matlab_helpers/mcp_extract_props.m)

Converts MATLAB figures to interactive Plotly JSON:

  1. MATLAB-side extraction: mcp_extract_props.m extracts raw figure properties (line objects, scatter plots, bar charts, histograms, surface plots, images) and saves as JSON
  2. Python-side style mapping: plotly_style_mapper.py converts MATLAB styles (line styles, markers, colormaps, fonts, colors) to Plotly equivalents
  3. WebGL optimization: Automatically uses WebGL rendering for large datasets (10,000+ points)
  4. Result composition: plotly_convert.py reads saved JSON, returns Plotly JSON + static PNG + optional thumbnail

Monitoring (monitoring/collector.py, monitoring/store.py)

Optional metrics collection and reporting (enabled via monitoring.enabled=true):

  • Event recording: MetricsCollector records system events (engine_scale_up, job_completed, job_failed, session_created, etc.)
  • Time-series storage: MetricsStore persists events with timestamps for historical analysis
  • Reporting: Metrics accessible via admin tools and logs

Data Flow

Sync Execution

Agent → execute_code("x = magic(3)")
  → Security validator.check_code() (OK)
  → JobExecutor.execute() creates job
  → Pool.acquire() gets engine
  → Inject job context into MATLAB workspace
  → Engine.execute(code, background=True) with stdout/stderr capture
  → Wait sync_timeout (default 30s)
  → Execution completes in <30s
  → ResultFormatter.build_success_response() structures result
  → Pool.release() resets workspace and returns engine
  → Return {status: "completed", job_id: "...", output: {...}, ...}

Async Promotion

Agent → execute_code("long_simulation()")
  → Security validator.check_code() (OK)
  → JobExecutor.execute() creates job
  → Pool.acquire() gets engine
  → Inject job context into MATLAB workspace
  → Engine.execute(code, background=True) starts background future
  → Wait sync_timeout (default 30s)
  → Timeout exceeded → promote to async
  → Create background task: JobExecutor._wait_for_completion()
  → Return {status: "pending", job_id: "abc123"}
  → Engine held by background task
  →
  → Agent calls get_job_status("abc123")
  → JobTracker returns {status: "running", progress: 45%}
  →
  → Agent calls get_job_status("abc123")
  → JobTracker returns {status: "running", progress: 90%}
  →
  → Agent calls get_job_result("abc123")
  → Background task completes, builds result
  → JobTracker.get_job() returns {status: "completed", output: {...}, ...}
  → Pool.release() resets workspace and returns engine

Transport Modes

stdio (Default)

  • One agent, one session
  • Communication via stdin/stdout
  • Uses fixed "default" session ID
  • Simplest setup, no network, suitable for local Claude/Cursor/Copilot integration

SSE (Server-Sent Events)

  • Multiple agents, multiple sessions
  • HTTP-based, supports remote connections
  • Session isolation via context's session_id from request
  • Each session gets its own temporary directory
  • Production: Put behind a reverse proxy with authentication (require_proxy_auth: true)
  • Security warning logged if SSE enabled without require_proxy_auth=true

Clone this wiki locally