<a href="https://colab.research.google.com/github/Starborn/A2A/blob/main/MCP_Server_Validator_V_1%2C1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# 🔍 MCP SERVER VALIDATOR (Read This First)

**Version:** 1.1.0  
**Purpose:** Automated validation and quality assessment of Model Context Protocol (MCP) servers

---

## 📋 About This Validator

The MCP Server Validator is a comprehensive tool that analyzes Python-based MCP servers for:

- ✅ **Structural Integrity** - Proper SDK usage and server initialization
- 🛠️ **Tool Detection** - Identifies and validates all exposed tools
- 📝 **Naming Conventions** - Enforces `service_action_resource` format
- ⚡ **Async Best Practices** - Flags synchronous implementations
- 📊 **Quality Scoring** - 0-100% assessment with pass/fail threshold

### Supported SDKs
- **FastMCP** (`@mcp.tool()` decorator pattern)
- **MCP-SDK** (tools declared as `types.Tool(...)` inside a `@server.list_tools()` handler and dispatched by `@server.call_tool()`)
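
Detection works on the Python AST, so the server file is never imported or executed. The sketch below is a simplified, standalone illustration of the FastMCP decorator case only; `SAMPLE`, `dotted_name`, and `find_tools` are names invented for this example and are not part of the validator.

```python
import ast

# Hypothetical server source used only for this illustration.
SAMPLE = '''
@mcp.tool()
async def github_list_repos(owner: str) -> list:
    """List repositories owned by `owner`."""
    return []

def helper(x):
    return x
'''


def dotted_name(node: ast.expr) -> str:
    """Return 'mcp.tool' for a decorator written as `mcp.tool` or `mcp.tool()`."""
    if isinstance(node, ast.Call):
        node = node.func
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
    return ".".join(reversed(parts))


def find_tools(source: str) -> list:
    """Collect (function name, is_async) for functions with a `tool` decorator."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            names = [dotted_name(d) for d in node.decorator_list]
            if any(n == "tool" or n.endswith(".tool") for n in names):
                found.append((node.name, isinstance(node, ast.AsyncFunctionDef)))
    return found


tools = find_tools(SAMPLE)
print(tools)
```

Only `github_list_repos` is reported, because `helper` carries no decorator. Custom or renamed decorators would be missed by a check like this, which is the limitation listed under Known Issues.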

### Scoring System
- **90-100%** - EXCELLENT ✨
- **70-89%** - GOOD ✅
- **50-69%** - MODERATE ⚠️
- **25-49%** - POOR ❌
- **Below 25%** - CRITICAL 🔴

**Pass Criterion:** zero CRITICAL and zero ERROR issues, regardless of score (see `ValidationResult.passed` in Cell 1)
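
The percentage itself comes from the `score` property in Cell 1: each issue costs five points times its severity weight, scaled by the number of checks (five server-level checks plus four per tool). The sketch below restates that arithmetic as a standalone function; the function name `score` and its arguments are chosen for this example.

```python
# Severity weights as defined by Severity.weight in Cell 1.
WEIGHTS = {"CRITICAL": 4, "ERROR": 3, "WARNING": 2, "INFO": 1}


def score(tool_count: int, severities: list) -> float:
    """Recompute the validator's percentage from a tool count and issue severities."""
    total_checks = 5 + tool_count * 4
    penalty = sum(WEIGHTS[s] * 5 for s in severities)
    return max(0.0, 100 * (1 - penalty / (total_checks * 10)))


one_error = score(1, ["ERROR"])
print(f"{one_error:.0f}%")  # 83%
```

One tool with a single ERROR gives 9 checks and a penalty of 15, so the score is 100 × (1 − 15/90) ≈ 83%, which matches the sample report at the end of this notebook.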

---

## 🚀 Release Notes

### v1.1.0 (Current Release)
**Added:**
- ✅ Support for mcp-sdk servers (previously only fastmcp)
- ✅ Dual SDK detection (fastmcp + mcp-sdk)
- ✅ Enhanced tool extraction for both decorator patterns

**Fixed:**
- 🔧 Tool detection now works for mcp-sdk servers that declare tools in a `@server.list_tools()` handler
- 🔧 Improved async/sync detection accuracy

**Known Issues:**
- ⚠️ **Strict Naming Convention** - Tools with simple names like `fetch`, `hello`, or `run` will fail validation even if contextually clear. The validator enforces `service_action_resource` format (e.g., `website_fetch_page`, `app_greet_user`). This is intentional for consistency but may feel overly restrictive for simple servers.
- ⚠️ TypeScript/JavaScript MCP servers not yet supported (Python only)
- ⚠️ Resource and prompt validation coming in future releases
- ⚠️ Custom decorators or non-standard patterns may not be detected

### v1.0.0 (Initial Release)
- Initial validator with fastmcp support
- Basic validation rules and scoring
- Error/warning/info classification system

---

## 🚨 Error Codes Reference

### CRITICAL Errors (Block Deployment)
| Code | Issue | Example | Fix |
|------|-------|---------|-----|
| C001 | Missing required imports | No `from mcp.server import Server` | Add SDK import |
| C002 | No server initialization | Missing `mcp = FastMCP()` | Initialize server object |
| C003 | Syntax errors | Invalid Python | Fix syntax |

### ERROR Level (Should Fix)
| Code | Issue | Example | Fix |
|------|-------|---------|-----|
| E001 | Tool naming convention | `fetch` instead of `website_fetch_page` | Rename: `service_action_resource` |
| E002 | Missing tool docstring | No description | Add descriptive docstring |
| E003 | Poor parameter typing | Untyped params | Add type hints |
| E004 | Hardcoded credentials | `api_key = "12345"` | Use environment variables |

### WARNING Level (Best Practice)
| Code | Issue | Example | Fix |
|------|-------|---------|-----|
| W001 | Synchronous tool | `def tool()` instead of `async def` | Convert to async |
| W002 | Missing error handling | No try/except | Add error handling |
| W003 | Long tool functions | >50 lines | Refactor/modularize |
| W004 | Unclear variable names | `x`, `tmp`, `data` | Use descriptive names |

### INFO Level (Suggestions)
| Code | Issue | Example | Suggestion |
|------|-------|---------|-----------|
| I001 | Could add logging | No logging statements | Consider adding logging |
| I002 | Could add validation | No input validation | Add parameter validation |
| I003 | Could add examples | No usage examples | Add example in docstring |
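
As an illustration of several fixes from the tables above (E002, E003, E004, W001, I002, I003), here is a hedged sketch of a tool function that is async, typed, documented, and reads its credential from the environment. The SDK decorator is deliberately left out so the snippet runs without any MCP package installed; the variable `WEATHER_API_KEY` and the returned dictionary are assumptions made for the example.

```python
import asyncio
import os


async def weather_fetch_forecast(city: str, days: int = 3) -> dict:
    """Return a placeholder forecast for `city` covering `days` days.

    Example: await weather_fetch_forecast("Oslo", days=2)
    """
    # Hypothetical variable name; the secret comes from the environment (E004).
    api_key = os.getenv("WEATHER_API_KEY", "")
    # Basic input validation (I002).
    if days < 1:
        raise ValueError("days must be at least 1")
    # A real tool would call a weather service here; this sketch only echoes its input.
    return {"city": city, "days": days, "authenticated": bool(api_key)}


result = asyncio.run(weather_fetch_forecast("Oslo", days=2))
print(result)
```

In a real server the function would sit under the SDK's tool decorator; everything else can stay as written.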

---

## 📖 Usage Instructions

### In Google Colab:
1. Run Cell 1 (imports and validator class)
2. Run Cell 2 (file upload and validation)
3. Review validation report
4. Fix issues and re-validate

### As Standalone Script:
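```python
from pathlib import Path

# Assumes Cell 1 has been saved as mcp_validator.py
from mcp_validator import validate_mcp_server, print_report

source = Path("your_server.py").read_text(encoding="utf-8")
result = validate_mcp_server(source, "your_server.py")
if result:  # None means the file had a syntax error
    print_report(result)
```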

---

## 🎯 Naming Convention Guide

**Required Format:** `service_action_resource`

### Good Examples ✅
- `github_list_repos` - Lists GitHub repositories
- `weather_fetch_forecast` - Fetches weather forecast
- `database_query_users` - Queries user database
- `slack_send_message` - Sends Slack message

### Bad Examples ❌
- `fetch` - Too generic, no context
- `get_data` - Unclear what data
- `process` - No service or resource
- `hello` - Not descriptive

### Why This Matters
Consistent naming enables:
- Self-documenting code
- Easy discovery in large systems
- Clear understanding across teams
- Reduced naming conflicts
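
To see how a name will be judged before running the full validator, the two naming checks from `RulesEngine` in Cell 1 can be tried on their own. The pattern and the generic-name set below are copied from Cell 1; the helper `check_name` is a name introduced for this sketch.

```python
import re

# Copied from RulesEngine in Cell 1.
NAMING_PATTERN = re.compile(r'^[a-z]+_[a-z]+(_[a-z]+)?$')
GENERIC_NAMES = {"get_data", "do_thing", "process", "handle", "run", "execute", "tool", "func"}


def check_name(name: str) -> dict:
    """Report whether a tool name matches the pattern and whether it is flagged as generic."""
    return {
        "matches_pattern": bool(NAMING_PATTERN.match(name)),
        "generic": name.lower() in GENERIC_NAMES,
    }


for candidate in ["github_list_repos", "fetch", "get_data", "github_list_pull_requests"]:
    print(candidate, check_name(candidate))
```

As written, the pattern accepts two or three lowercase segments. That means `get_data` passes the format check and is caught only by the generic-name list, while a four-segment name such as `github_list_pull_requests`, or any name containing digits, is rejected.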

---

## 🤝 Contributing

Found a bug or have a suggestion?
- Use the validation reports to identify patterns
- Submit feedback via GitHub issues
- Contribute validation rules

---

## 📜 License

Part of the MCP ecosystem. See individual repository for license details.

---

## 🔗 Related Resources

- [MCP Documentation](https://modelcontextprotocol.io)
- [FastMCP GitHub](https://github.com/jlowin/fastmcp)
- [MCP SDK Python](https://github.com/modelcontextprotocol/python-sdk)
- [W3C AI Knowledge Representation Community Group](https://www.w3.org/community/aikr/)

---

**Designed by:** Paola Di Maio, Independent Analyst & Research Lead, and Claude, First Officer of the Starfleet




# Cells 1 and 2 Just Below in this Notebook


In [18]:
# ============================================================
# CELL 1
# ============================================================

import ast, re
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from enum import Enum

class Severity(Enum):
    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    @property
    def weight(self):
        return {"CRITICAL": 4, "ERROR": 3, "WARNING": 2, "INFO": 1}[self.value]

@dataclass
class Location:
    file: str
    line: int
    def __str__(self): return f"{self.file}:{self.line}"

@dataclass
class Issue:
    rule_id: str
    rule_name: str
    severity: Severity
    category: str
    message: str
    location: Location
    suggestion: str = ""

@dataclass
class ToolDef:
    name: str
    function_name: str
    description: str
    parameters: List[Dict[str, Any]]
    return_type: Optional[str]
    is_async: bool
    has_docstring: bool
    location: Location
    annotations: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ServerMeta:
    name: str = "unknown"
    version: str = "unknown"
    sdk_type: str = "unknown"
    tools: List[ToolDef] = field(default_factory=list)

@dataclass
class ValidationResult:
    file_path: str
    server: ServerMeta
    issues: List[Issue]

    @property
    def total_checks(self): return 5 + len(self.server.tools) * 4
    @property
    def critical_errors(self): return len([i for i in self.issues if i.severity == Severity.CRITICAL])
    @property
    def errors(self): return len([i for i in self.issues if i.severity == Severity.ERROR])
    @property
    def warnings(self): return len([i for i in self.issues if i.severity == Severity.WARNING])
    @property
    def score(self):
        if self.total_checks == 0: return 100.0
        penalty = sum(i.severity.weight * 5 for i in self.issues)
        return max(0, 100 * (1 - penalty / (self.total_checks * 10)))
    @property
    def compliance_level(self):
        s = self.score
        if s >= 90: return "EXCELLENT"
        if s >= 70: return "GOOD"
        if s >= 50: return "MODERATE"
        if s >= 25: return "POOR"
        return "CRITICAL"
    @property
    def passed(self): return self.critical_errors == 0 and self.errors == 0

class PythonAnalyzer:
    def __init__(self, file_path, source):
        self.file_path = file_path
        self.source = source
        self.tree = None

    def parse(self):
        try:
            self.tree = ast.parse(self.source)
            return True
        except SyntaxError:
            return False

    def analyze(self):
        if not self.tree: return ServerMeta()
        meta = ServerMeta()
        meta.sdk_type = self._detect_sdk()
        self._extract_server_info(meta)
        meta.tools = self._extract_tools()
        return meta

    def _detect_sdk(self):
        for node in ast.walk(self.tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if "fastmcp" in alias.name.lower(): return "fastmcp"
                    if "mcp" in alias.name.lower(): return "mcp-sdk"
            elif isinstance(node, ast.ImportFrom) and node.module:
                if "fastmcp" in node.module.lower(): return "fastmcp"
                if "mcp" in node.module.lower(): return "mcp-sdk"
        return "unknown"

    def _extract_server_info(self, meta):
        for node in ast.walk(self.tree):
            if isinstance(node, ast.Call):
                name = ""
                if isinstance(node.func, ast.Name): name = node.func.id
                elif isinstance(node.func, ast.Attribute): name = node.func.attr
                if name in ("FastMCP", "Server", "MCPServer"):
                    for kw in node.keywords:
                        if kw.arg == "name" and isinstance(kw.value, ast.Constant): meta.name = kw.value.value
                        elif kw.arg == "version" and isinstance(kw.value, ast.Constant): meta.version = kw.value.value
                    if node.args and isinstance(node.args[0], ast.Constant): meta.name = node.args[0].value

    def _extract_tools(self):
        tools = []
        for node in ast.walk(self.tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Check FastMCP pattern: @mcp.tool()
                tool = self._check_tool_decorator(node)
                if tool: tools.append(tool)
                # Check low-level pattern: @server.list_tools()
                lowlevel_tools = self._check_list_tools_decorator(node)
                tools.extend(lowlevel_tools)
        return tools

    def _check_list_tools_decorator(self, node):
        """Check for @server.list_tools() or @app.list_tools() pattern"""
        tools = []
        for dec in node.decorator_list:
            dec_name = self._get_decorator_name(dec)
            if dec_name.endswith("list_tools"):
                # Parse the function body for types.Tool() calls
                tools.extend(self._extract_tools_from_list_tools(node))
        return tools

    def _extract_tools_from_list_tools(self, node):
        """Extract Tool definitions from a list_tools handler"""
        tools = []
        for child in ast.walk(node):
            if isinstance(child, ast.Call):
                call_name = ""
                if isinstance(child.func, ast.Attribute):
                    call_name = child.func.attr
                elif isinstance(child.func, ast.Name):
                    call_name = child.func.id
                if call_name == "Tool":
                    tool = self._parse_types_tool(child, node.lineno)
                    if tool: tools.append(tool)
        return tools

    def _parse_types_tool(self, call_node, default_line):
        """Parse a types.Tool() call"""
        name = ""
        description = ""
        input_schema = {}
        for kw in call_node.keywords:
            if kw.arg == "name" and isinstance(kw.value, ast.Constant):
                name = kw.value.value
            elif kw.arg == "description" and isinstance(kw.value, ast.Constant):
                description = kw.value.value
            elif kw.arg == "inputSchema" and isinstance(kw.value, ast.Dict):
                input_schema = self._extract_input_schema(kw.value)
        if not name: return None
        # Extract parameters from inputSchema properties
        params = []
        if "properties" in input_schema:
            for pname, pinfo in input_schema.get("properties", {}).items():
                params.append({"name": pname, "type": pinfo.get("type", "any")})
        return ToolDef(
            name=name, function_name=name, description=description,
            parameters=params, return_type="list[types.ContentBlock]",
            is_async=True, has_docstring=bool(description),
            location=Location(self.file_path, call_node.lineno if hasattr(call_node, 'lineno') else default_line),
            annotations={}
        )

    def _extract_input_schema(self, dict_node):
        """Extract inputSchema dict from AST"""
        result = {}
        for k, v in zip(dict_node.keys, dict_node.values):
            if isinstance(k, ast.Constant):
                key = k.value
                if isinstance(v, ast.Constant):
                    result[key] = v.value
                elif isinstance(v, ast.Dict):
                    result[key] = self._extract_input_schema(v)
                elif isinstance(v, ast.List):
                    result[key] = [e.value for e in v.elts if isinstance(e, ast.Constant)]
        return result

    def _check_tool_decorator(self, node):
        for dec in node.decorator_list:
            dec_name = self._get_decorator_name(dec)
            if dec_name in ("tool", "mcp.tool", "server.tool"):
                return self._build_tool_def(node, dec)
        return None

    def _get_decorator_name(self, dec):
        if isinstance(dec, ast.Name): return dec.id
        elif isinstance(dec, ast.Attribute):
            parts = []
            n = dec
            while isinstance(n, ast.Attribute):
                parts.append(n.attr)
                n = n.value
            if isinstance(n, ast.Name): parts.append(n.id)
            return ".".join(reversed(parts))
        elif isinstance(dec, ast.Call): return self._get_decorator_name(dec.func)
        return ""

    def _build_tool_def(self, node, dec):
        tool_name = node.name
        description = ""
        annotations = {}
        if isinstance(dec, ast.Call):
            for kw in dec.keywords:
                if kw.arg == "name" and isinstance(kw.value, ast.Constant): tool_name = kw.value.value
                elif kw.arg == "description" and isinstance(kw.value, ast.Constant): description = kw.value.value
                elif kw.arg == "annotations" and isinstance(kw.value, ast.Dict):
                    for k, v in zip(kw.value.keys, kw.value.values):
                        if isinstance(k, ast.Constant) and isinstance(v, ast.Constant):
                            annotations[k.value] = v.value
        if not description: description = ast.get_docstring(node) or ""
        params = []
        for arg in node.args.args:
            if arg.arg == "self": continue
            p = {"name": arg.arg, "type": None}
            if arg.annotation and hasattr(ast, "unparse"): p["type"] = ast.unparse(arg.annotation)
            params.append(p)
        ret_type = ast.unparse(node.returns) if node.returns and hasattr(ast, "unparse") else None
        return ToolDef(name=tool_name, function_name=node.name, description=description, parameters=params,
                       return_type=ret_type, is_async=isinstance(node, ast.AsyncFunctionDef),
                       has_docstring=ast.get_docstring(node) is not None, location=Location(self.file_path, node.lineno),
                       annotations=annotations)

class RulesEngine:
    NAMING_PATTERN = re.compile(r'^[a-z]+_[a-z]+(_[a-z]+)?$')
    GENERIC_NAMES = {"get_data", "do_thing", "process", "handle", "run", "execute", "tool", "func"}
    SECRET_PATTERNS = [
        (r'api_key\s*=\s*["\'][^"\']{8,}["\']', "API key"),
        (r'password\s*=\s*["\'][^"\']+["\']', "password"),
        (r'token\s*=\s*["\'][^"\']{8,}["\']', "token"),
        (r'ghp_[a-zA-Z0-9]{36}', "GitHub token"),
        (r'sk-[a-zA-Z0-9]{40,}', "OpenAI API key"),
    ]
    DANGEROUS_PATTERNS = [
        (r'\beval\s*\(', "eval()"),
        (r'\bexec\s*\(', "exec()"),
        (r'subprocess\.[A-Za-z_]+\([^)]*shell\s*=\s*True', "subprocess with shell=True"),
    ]
    DESTRUCTIVE_VERBS = {"delete", "remove", "drop", "destroy", "clear", "purge"}

    def validate(self, meta, source, file_path):
        issues = []
        for tool in meta.tools:
            if not self.NAMING_PATTERN.match(tool.name):
                issues.append(Issue("MCP-NAME-001", "Tool Naming Convention", Severity.ERROR, "naming",
                    f"Tool '{tool.name}' doesn't follow service_action_resource format", tool.location,
                    "Rename to format: service_action_resource (e.g., github_list_repos)"))
            if tool.name.lower() in self.GENERIC_NAMES:
                issues.append(Issue("MCP-NAME-002", "Generic Tool Name", Severity.WARNING, "naming",
                    f"Tool '{tool.name}' is too generic", tool.location, "Use a descriptive name"))
            if not tool.description or len(tool.description.strip()) < 10:
                issues.append(Issue("MCP-NAME-003", "Missing Description", Severity.WARNING, "naming",
                    f"Tool '{tool.name}' lacks a meaningful description", tool.location, "Add a docstring"))
            untyped = [p["name"] for p in tool.parameters if not p.get("type")]
            if untyped:
                issues.append(Issue("MCP-SCHEMA-001", "Missing Type Hints", Severity.ERROR, "schema",
                    f"Tool '{tool.name}' has untyped parameters: {untyped}", tool.location, "Add type annotations"))
            if not tool.return_type:
                issues.append(Issue("MCP-SCHEMA-002", "Missing Return Type", Severity.WARNING, "schema",
                    f"Tool '{tool.name}' has no return type", tool.location, "Add return type"))
            if any(v in tool.name.lower() for v in self.DESTRUCTIVE_VERBS) and not tool.annotations.get("destructiveHint"):
                issues.append(Issue("MCP-BP-001", "Missing Destructive Hint", Severity.WARNING, "best_practice",
                    f"Destructive tool '{tool.name}' should have destructiveHint", tool.location,
                    "Add annotations={'destructiveHint': True}"))
            if not tool.is_async:
                issues.append(Issue("MCP-BP-002", "Sync Function", Severity.INFO, "best_practice",
                    f"Tool '{tool.name}' is synchronous", tool.location, "Consider using async"))
            if not tool.has_docstring:
                issues.append(Issue("MCP-BP-003", "Missing Docstring", Severity.INFO, "best_practice",
                    f"Tool '{tool.name}' has no docstring", tool.location, "Add a docstring"))
        lines = source.split('\n')
        for i, line in enumerate(lines, 1):
            if line.strip().startswith('#'): continue
            for pattern, secret_type in self.SECRET_PATTERNS:
                if re.search(pattern, line, re.IGNORECASE):
                    issues.append(Issue("MCP-SEC-001", "Hardcoded Secret", Severity.CRITICAL, "security",
                        f"Possible hardcoded {secret_type} detected", Location(file_path, i), "Use os.getenv()"))
            for pattern, func_name in self.DANGEROUS_PATTERNS:
                if re.search(pattern, line):
                    # The regex text never contains the literal "shell=True", so test the label instead.
                    sev = Severity.CRITICAL if "shell=True" in func_name else Severity.WARNING
                    issues.append(Issue("MCP-SEC-002", "Dangerous Function", sev, "security",
                        f"Dangerous function: {func_name}", Location(file_path, i), "Use safer alternatives"))
        return issues

def validate_mcp_server(source_code, filename="server.py"):
    analyzer = PythonAnalyzer(filename, source_code)
    if not analyzer.parse():
        print(f"❌ Syntax error in {filename}")
        return None
    meta = analyzer.analyze()
    rules = RulesEngine()
    issues = rules.validate(meta, source_code, filename)
    return ValidationResult(filename, meta, issues)

def print_report(result):
    print("\n" + "="*60)
    print("   🔍 MCP SERVER VALIDATOR v1.0.0")
    print("="*60)
    print(f"   📁 File:    {result.file_path}")
    print(f"   📦 Server:  {result.server.name} v{result.server.version}")
    print(f"   🔧 SDK:     {result.server.sdk_type}")
    print(f"   🛠️  Tools:   {len(result.server.tools)}")
    print("-"*60)
    status = "✅ PASSED" if result.passed else "❌ FAILED"
    print(f"   SCORE: {result.score:.0f}% ({result.compliance_level})")
    print(f"   Status: {status}")
    print(f"   Critical: {result.critical_errors} | Errors: {result.errors} | Warnings: {result.warnings}")
    print("-"*60)
    if result.issues:
        print("   ISSUES:\n")
        icons = {Severity.CRITICAL: "🔴", Severity.ERROR: "❌", Severity.WARNING: "⚠️", Severity.INFO: "ℹ️"}
        for sev in [Severity.CRITICAL, Severity.ERROR, Severity.WARNING, Severity.INFO]:
            sev_issues = [i for i in result.issues if i.severity == sev]
            if sev_issues:
                print(f"   [{sev.value}]")
                for issue in sev_issues:
                    print(f"     {icons[sev]} Line {issue.location.line}: {issue.message}")
                    if issue.suggestion: print(f"        💡 {issue.suggestion}")
                print()
    else:
        print("   ✅ No issues found!")
    print("="*60)

def validate_uploaded_file():
    from google.colab import files
    print("📁 Upload your MCP server Python file...")
    uploaded = files.upload()
    for filename, content in uploaded.items():
        print(f"\n🔍 Validating: {filename}")
        result = validate_mcp_server(content.decode('utf-8'), filename)
        if result: print_report(result)

print("✅ MCP Validator ready. Run: validate_uploaded_file()")

✅ MCP Validator ready. Run: validate_uploaded_file()


In [17]:
validate_uploaded_file()

📁 Upload your MCP server Python file...


Saving server[1].py to server[1] (1).py

🔍 Validating: server[1] (1).py

   🔍 MCP SERVER VALIDATOR v1.0.0
   📁 File:    server[1] (1).py
   📦 Server:  mcp-website-fetcher vunknown
   🔧 SDK:     mcp-sdk
   🛠️  Tools:   1
------------------------------------------------------------
   SCORE: 83% (GOOD)
   Status: ❌ FAILED
------------------------------------------------------------
   ISSUES:

   [ERROR]
     ❌ Line 43: Tool 'fetch' doesn't follow service_action_resource format
        💡 Rename to format: service_action_resource (e.g., github_list_repos)

