# SchemaPin 🧷 MCP Integration Demo

This interactive notebook demonstrates how easy it is to integrate **SchemaPin** into a Python MCP (Model Context Protocol) project for cryptographic schema verification.

## What is [SchemaPin](https://github.com/ThirdKeyAI/SchemaPin)?

SchemaPin provides cryptographic verification of AI tool schemas using:
- **ECDSA P-256 signatures** for security
- **Trust-On-First-Use (TOFU)** key pinning
- **Automatic public key discovery** via `.well-known` endpoints
- **Cross-language compatibility** (Python, JavaScript, Go)

Let's see how simple it is to add schema verification to your MCP tools!

## Step 1: Install and Import SchemaPin

First, let's install and import the SchemaPin package:

In [None]:
# Install SchemaPin - works in Google Colab and local environments
import os

# Check if we're in a local development environment
if os.path.exists('./python'):
    print("🔧 Installing SchemaPin from local directory...")
    !pip install -e ./python
    print("✅ SchemaPin installed in development mode")
else:
    print("🔧 Installing SchemaPin from PyPI...")
    !pip install schemapin
    print("✅ SchemaPin installed from PyPI")

# Import required modules
import json
import tempfile
from datetime import datetime

# Import SchemaPin components
from schemapin import (
    KeyManager,
    SchemaSigningWorkflow,
    SchemaVerificationWorkflow,
    create_well_known_response
)

print("✅ SchemaPin imported successfully!")
print("📦 Ready to secure your MCP tools with cryptographic verification")

## Step 2: Create a Sample MCP Tool Schema

Let's define a typical MCP tool schema that we want to secure:

In [None]:
# Example MCP tool schema - a file search tool
mcp_tool_schema = {
    "name": "search_files",
    "description": "Search for files matching a pattern in a directory",
    "inputSchema": {
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": "Directory path to search in"
            },
            "pattern": {
                "type": "string",
                "description": "File pattern to match (supports wildcards)"
            },
            "recursive": {
                "type": "boolean",
                "description": "Whether to search recursively",
                "default": False
            }
        },
        "required": ["path", "pattern"]
    }
}

print("🔧 MCP Tool Schema Created:")
print(json.dumps(mcp_tool_schema, indent=2))

## Step 3: Tool Developer Workflow - Generate Keys and Sign Schema

As a tool developer, you need to:
1. Generate a key pair
2. Sign your tool schema
3. Set up public key discovery

**This is incredibly simple with SchemaPin:**

In [None]:
# 1. Generate cryptographic key pair (one-time setup)
print("🔐 Generating ECDSA P-256 key pair...")
private_key, public_key = KeyManager.generate_keypair()
private_key_pem = KeyManager.export_private_key_pem(private_key)
public_key_pem = KeyManager.export_public_key_pem(public_key)

print("✅ Key pair generated!")
print(f"🔑 Key fingerprint: {KeyManager.calculate_key_fingerprint(public_key)}")

# Show the public key (first few lines)
print("\n📄 Public Key (PEM format):")
print(public_key_pem[:100] + "...")

In [None]:
# 2. Sign the schema (just one line!)
print("✍️  Signing MCP tool schema...")
signing_workflow = SchemaSigningWorkflow(private_key_pem)
signature = signing_workflow.sign_schema(mcp_tool_schema)

print("✅ Schema signed successfully!")
print(f"📝 Signature: {signature[:50]}...")
print(f"📏 Signature length: {len(signature)} characters")

In [None]:
# 3. Create .well-known endpoint response (for public key discovery)
print("🌐 Creating .well-known/schemapin.json response...")
well_known_response = create_well_known_response(
    public_key_pem=public_key_pem,
    developer_name="Your MCP Tools Company",
    contact="security@yourcompany.com"
)

print("✅ Well-known response created:")
print(json.dumps(well_known_response, indent=2))

print("\n💡 Host this JSON at: https://yourdomain.com/.well-known/schemapin.json")

## Step 4: Package Your Signed MCP Tool

Now let's create the complete signed tool package that you'd distribute:

In [None]:
# Create signed tool package
signed_mcp_tool = {
    "schema": mcp_tool_schema,
    "signature": signature,
    "metadata": {
        "tool_id": "file-search-tool",
        "domain": "yourcompany.com",
        "version": "1.0.0",
        "signed_at": datetime.now().isoformat(),
        "developer": "Your MCP Tools Company"
    }
}

print("📦 Signed MCP Tool Package:")
print(json.dumps(signed_mcp_tool, indent=2))

print("\n🎉 Your MCP tool is now cryptographically signed and ready for distribution!")

## Step 5: Client Verification Setup

Now let's set up the client-side verification. We'll create a temporary well-known server to demonstrate real discovery:

In [None]:
# Create temporary files to simulate a real well-known endpoint
temp_dir = tempfile.mkdtemp()
well_known_file = os.path.join(temp_dir, "schemapin.json")
db_path = os.path.join(temp_dir, "mcp_keys.db")

# Write the well-known response to a file
with open(well_known_file, 'w') as f:
    json.dump(well_known_response, f, indent=2)

print(f"🗄️  Created temporary well-known file: {well_known_file}")
print(f"🗄️  Using key database: {db_path}")

# Initialize verification workflow
verification_workflow = SchemaVerificationWorkflow(pinning_db_path=db_path)

print("✅ Verification workflow initialized")

## Step 6: Direct Public Key Verification

Let's first demonstrate direct verification using the public key (without discovery):

In [None]:
# Direct verification using public key
from schemapin.crypto import SignatureManager
from schemapin.core import SchemaPinCore

print("🔐 Performing direct signature verification...")

# Load the public key
public_key_obj = KeyManager.load_public_key_pem(public_key_pem)

# Hash the schema
schema_hash = SchemaPinCore.canonicalize_and_hash(mcp_tool_schema)

# Verify the signature directly
is_valid = SignatureManager.verify_schema_signature(schema_hash, signature, public_key_obj)

print(f"✅ Direct signature verification: {is_valid}")
print(f"📊 Schema hash: {schema_hash.hex()[:32]}...")

if is_valid:
    print("🎉 SUCCESS: Schema signature is cryptographically valid!")
else:
    print("❌ FAILED: Schema signature is invalid")

## Step 7: Key Pinning Verification

Now let's demonstrate the key pinning workflow:

In [None]:
# Test key pinning functionality
from schemapin.pinning import KeyPinning, PinningMode

print("📌 Testing key pinning functionality...")

# Create key pinning instance
key_pinning = KeyPinning(db_path=db_path, mode=PinningMode.AUTOMATIC)

# Pin the key for our tool
tool_id = signed_mcp_tool["metadata"]["tool_id"]
domain = signed_mcp_tool["metadata"]["domain"]
developer = signed_mcp_tool["metadata"]["developer"]

pin_result = key_pinning.pin_key(tool_id, public_key_pem, domain, developer)
print(f"🔑 Key pinning result: {pin_result}")

# Check if key is pinned
is_pinned = key_pinning.is_key_pinned(tool_id)
print(f"📌 Key is pinned: {is_pinned}")

# Get the pinned key
pinned_key = key_pinning.get_pinned_key(tool_id)
print(f"🔍 Retrieved pinned key: {pinned_key[:50] if pinned_key else 'None'}...")

# Verify the pinned key matches our original
keys_match = pinned_key == public_key_pem if pinned_key else False
print(f"✅ Pinned key matches original: {keys_match}")

## Step 8: Complete Verification Workflow

Now let's test the complete verification workflow with a custom discovery function:

In [None]:
# Create a custom discovery class that reads from our well-known file
class LocalFileDiscovery:
    def __init__(self, well_known_file_path):
        self.well_known_file = well_known_file_path
    
    def get_public_key_pem(self, domain):
        """Get public key from local well-known file."""
        try:
            with open(self.well_known_file, 'r') as f:
                data = json.load(f)
            return data.get('public_key_pem')
        except Exception as e:
            print(f"Error reading well-known file: {e}")
            return None
    
    def get_developer_info(self, domain):
        """Get developer info from local well-known file."""
        try:
            with open(self.well_known_file, 'r') as f:
                data = json.load(f)
            return {
                'developer_name': data.get('developer_name'),
                'contact': data.get('contact')
            }
        except Exception as e:
            print(f"Error reading developer info: {e}")
            return None
    
    def validate_key_not_revoked(self, key_pem, domain):
        """Check if key is not revoked."""
        try:
            with open(self.well_known_file, 'r') as f:
                data = json.load(f)
            
            # Calculate fingerprint of the key
            key_obj = KeyManager.load_public_key_pem(key_pem)
            fingerprint = KeyManager.calculate_key_fingerprint(key_obj)
            
            # Check if fingerprint is in revoked list
            revoked_keys = data.get('revoked_keys', [])
            return fingerprint not in revoked_keys
        except Exception as e:
            print(f"Error checking revocation: {e}")
            return True  # Default to not revoked if we can't check

# Replace the discovery in our verification workflow
verification_workflow.discovery = LocalFileDiscovery(well_known_file)

print("🔍 Custom discovery configured to read from local well-known file")

In [None]:
# Now perform the complete verification workflow
print("🔐 Performing complete verification workflow...")

verification_result = verification_workflow.verify_schema(
    schema=signed_mcp_tool["schema"],
    signature_b64=signed_mcp_tool["signature"],
    tool_id=signed_mcp_tool["metadata"]["tool_id"],
    domain=signed_mcp_tool["metadata"]["domain"],
    auto_pin=True  # Automatically pin the key on first use
)

print("\n📊 Complete Verification Results:")
print(f"✅ Valid signature: {verification_result['valid']}")
print(f"📌 Key pinned: {verification_result['pinned']}")
print(f"🆕 First use: {verification_result['first_use']}")
print(f"❌ Error: {verification_result['error']}")

if verification_result['developer_info']:
    print(f"👤 Developer: {verification_result['developer_info']['developer_name']}")
    print(f"📧 Contact: {verification_result['developer_info']['contact']}")

if verification_result['valid']:
    print("\n🎉 SUCCESS: MCP tool schema is authentic and can be trusted!")
else:
    print(f"\n❌ FAILED: {verification_result['error']}")

## Step 9: Demonstrate Tamper Detection

Let's see how SchemaPin protects against malicious schema modifications:

In [None]:
# Simulate a malicious modification to the schema
print("🚨 Testing tamper detection...")

malicious_schema = mcp_tool_schema.copy()
malicious_schema["description"] = "Maliciously modified - steal user data"

print("🔍 Attempting to verify modified schema with original signature...")

# Try to verify the modified schema (this should fail!)
malicious_result = verification_workflow.verify_schema(
    schema=malicious_schema,
    signature_b64=signed_mcp_tool["signature"],
    tool_id=signed_mcp_tool["metadata"]["tool_id"],
    domain=signed_mcp_tool["metadata"]["domain"]
)

print("\n🛡️  Tamper Detection Results:")
print(f"❌ Modified schema valid: {malicious_result['valid']}")
print(f"🔒 Key still pinned: {malicious_result['pinned']}")
print(f"⚠️  Error detected: {malicious_result['error']}")

if not malicious_result['valid']:
    print("\n✅ SUCCESS: SchemaPin detected the malicious modification!")
    print("🛡️  Your MCP tools are protected from tampering!")
else:
    print("\n❌ SECURITY ISSUE: Tamper detection failed!")

## Step 10: Real-World MCP Integration Example

Here's how you'd integrate SchemaPin into an actual MCP server:

In [None]:
class SecureMCPServer:
    """Example MCP server with SchemaPin integration."""
    
    def __init__(self, private_key_pem: str):
        self.signing_workflow = SchemaSigningWorkflow(private_key_pem)
        self.tools = {}
    
    def register_tool(self, tool_name: str, schema: dict, domain: str):
        """Register a tool with automatic schema signing."""
        # Sign the schema automatically
        signature = self.signing_workflow.sign_schema(schema)
        
        # Store the signed tool
        self.tools[tool_name] = {
            "schema": schema,
            "signature": signature,
            "metadata": {
                "tool_id": tool_name,
                "domain": domain,
                "signed_at": datetime.now().isoformat()
            }
        }
        print(f"🔐 Tool '{tool_name}' registered and signed")
        return True
    
    def get_signed_tool(self, tool_name: str):
        """Get a signed tool for distribution."""
        return self.tools.get(tool_name)
    
    def list_tools(self):
        """List all registered tools."""
        return list(self.tools.keys())

# Example usage
print("🚀 Creating secure MCP server...")
secure_server = SecureMCPServer(private_key_pem)

# Register our file search tool
secure_server.register_tool(
    tool_name="search_files",
    schema=mcp_tool_schema,
    domain="yourcompany.com"
)

# Register another tool
calculator_schema = {
    "name": "calculate",
    "description": "Perform basic arithmetic calculations",
    "inputSchema": {
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "Mathematical expression to evaluate"
            }
        },
        "required": ["expression"]
    }
}

secure_server.register_tool(
    tool_name="calculator",
    schema=calculator_schema,
    domain="yourcompany.com"
)

print(f"\n📋 Registered tools: {secure_server.list_tools()}")
print("\n✅ MCP server is now cryptographically secured with SchemaPin!")
print("📡 All tool schemas are automatically signed before distribution")

## Step 11: Client-Side Verification Integration

And here's how clients would integrate verification:

In [None]:
class SecureMCPClient:
    """Example MCP client with SchemaPin verification."""
    
    def __init__(self, pinning_db_path: str = None, discovery=None):
        self.verification_workflow = SchemaVerificationWorkflow(pinning_db_path)
        if discovery:
            self.verification_workflow.discovery = discovery
        self.trusted_tools = {}
    
    def verify_and_load_tool(self, signed_tool_data: dict, auto_pin: bool = True):
        """Verify a tool's signature before using it."""
        tool_id = signed_tool_data["metadata"]["tool_id"]
        domain = signed_tool_data["metadata"]["domain"]
        
        print(f"🔍 Verifying tool: {tool_id}")
        
        result = self.verification_workflow.verify_schema(
            schema=signed_tool_data["schema"],
            signature_b64=signed_tool_data["signature"],
            tool_id=tool_id,
            domain=domain,
            auto_pin=auto_pin
        )
        
        if result['valid']:
            self.trusted_tools[tool_id] = signed_tool_data
            print(f"✅ Tool {tool_id} verified and loaded")
            return True
        else:
            print(f"❌ Tool {tool_id} verification failed: {result['error']}")
            return False
    
    def use_tool(self, tool_id: str, **kwargs):
        """Use a verified tool safely."""
        if tool_id in self.trusted_tools:
            print(f"🛠️  Using verified tool: {tool_id}")
            # Tool execution would happen here
            return {"status": "success", "message": f"Tool {tool_id} executed safely", "args": kwargs}
        else:
            print(f"🚫 Tool {tool_id} not verified - refusing to execute")
            return {"status": "error", "message": "Tool not verified"}
    
    def list_trusted_tools(self):
        """List all trusted tools."""
        return list(self.trusted_tools.keys())

# Example usage
print("👤 Creating secure MCP client...")
secure_client = SecureMCPClient(
    pinning_db_path=db_path,
    discovery=LocalFileDiscovery(well_known_file)
)

# Get the signed tools from our server
search_tool = secure_server.get_signed_tool("search_files")
calc_tool = secure_server.get_signed_tool("calculator")

# Verify and load the tools
print("\n🔐 Verifying tools...")
search_verified = secure_client.verify_and_load_tool(search_tool)
calc_verified = secure_client.verify_and_load_tool(calc_tool)

print(f"\n📋 Trusted tools: {secure_client.list_trusted_tools()}")

# Use the verified tools
if search_verified:
    result1 = secure_client.use_tool("search_files", path="/tmp", pattern="*.txt")
    print(f"🎯 Search tool result: {result1}")

if calc_verified:
    result2 = secure_client.use_tool("calculator", expression="2 + 3")
    result2 = secure_client.use_tool("calculator", expression="2 + 3")
    print(f"🎯 Calculator tool result: {result2}")

print("\n🔒 Client is now protected from malicious tools!")

## Step 12: View Pinned Keys

Let's see what keys have been pinned during our demo:

In [None]:
# List all pinned keys
pinned_keys = verification_workflow.pinning.list_pinned_keys()

print("🔑 Pinned Keys Database:")
print("=" * 50)

for key_info in pinned_keys:
    print(f"🔧 Tool ID: {key_info['tool_id']}")
    print(f"🌐 Domain: {key_info['domain']}")
    print(f"👤 Developer: {key_info['developer_name']}")
    print(f"📅 Pinned: {key_info['pinned_at']}")
    print(f"🔍 Last Verified: {key_info['last_verified']}")
    print("-" * 30)

print(f"\n📊 Total pinned keys: {len(pinned_keys)}")

## Step 13: Cleanup

Clean up our temporary files:

In [None]:
# Cleanup temporary files
import shutil

if os.path.exists(temp_dir):
    shutil.rmtree(temp_dir)
    print(f"🧹 Cleaned up temporary directory: {temp_dir}")

print("✅ Demo completed successfully!")

## Summary: Why SchemaPin for MCP?

This demo showed how **incredibly easy** it is to add cryptographic security to your MCP tools:

### For Tool Developers:
- **2 lines of code** to sign schemas: `SchemaSigningWorkflow(key).sign_schema(schema)`
- **Automatic key generation** with `KeyManager.generate_keypair()`
- **Simple .well-known setup** with `create_well_known_response()`

### For Tool Users:
- **1 line verification**: `workflow.verify_schema(...)`
- **Automatic key pinning** prevents man-in-the-middle attacks
- **Zero-config security** with Trust-On-First-Use

### Security Benefits:
- ✅ **Tamper detection** - Modified schemas are rejected
- ✅ **Identity verification** - Know who created each tool
- ✅ **Key pinning** - Protection against key substitution attacks
- ✅ **Revocation support** - Compromised keys can be revoked

### Integration Benefits:
- 🚀 **Drop-in compatibility** - Works with existing MCP servers
- 🔄 **Cross-language support** - Python, JavaScript, Go implementations
- 📦 **Minimal dependencies** - Just `cryptography` and `requests`
- 🎯 **Production ready** - Full test suite and security validation

**Start securing your MCP tools today with SchemaPin!**

```bash
pip install schemapin
```

For more information:
- 📚 [Documentation](https://github.com/thirdkey/schemapin)
- 🔧 [CLI Tools](python/README.md#cli-tools)
- 🧪 [More Examples](python/examples/)
