# Secure MCP Server Deployment

Deploy a custom MCP server as a Docker container behind Azure API Management (APIM) with OAuth2 authentication.

> ⚠️ **Educational Purpose Only**
> 
> This notebook demonstrates basic security concepts for MCP deployments. For production-grade security best practices, patterns, and guidance, see the official Microsoft documentation:
> 
> - **[MCP for Beginners - Security Best Practices](https://github.com/microsoft/mcp-for-beginners/tree/main/02-Security)**
> - **[Secure access to MCP servers in API Management](https://learn.microsoft.com/en-us/azure/api-management/secure-mcp-servers)**

In [None]:
!pip install -r requirements.txt

In [None]:
import os, subprocess, sys, time, tempfile, json, re
from pathlib import Path
from textwrap import dedent

# Install dependencies before importing them so a fresh kernel does not fail on the import.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "python-dotenv", "fastmcp", "azure-ai-projects==1.0.0", "azure-ai-agents==1.2.0b4", "azure-identity"])

from dotenv import load_dotenv

load_dotenv('.env')
print("✓ Dependencies installed")

## 1. Configure Deployment Settings
Set resource names and location.
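
Most cells in this notebook shell out to the Azure CLI with `subprocess.run`. A small wrapper can keep those calls short and surface the CLI's stderr when a command fails. This is a minimal sketch: `run_cli` is a hypothetical helper name and is not used by the notebook's own cells.

```python
import subprocess
import sys


def run_cli(args: list[str]) -> str:
    """Run a command, return stripped stdout, and raise with stderr on failure."""
    result = subprocess.run(args, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"Command failed ({result.returncode}): {' '.join(args)}\n"
            f"{result.stderr.strip()}"
        )
    return result.stdout.strip()


# Demonstrated with the Python interpreter so the sketch runs without Azure access.
print(run_cli([sys.executable, "-c", "print('ok')"]))
```

With the Azure CLI installed and logged in, the same helper could wrap a call such as `run_cli(["az", "account", "show", "--query", "tenantId", "-o", "tsv"])`.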

In [3]:
RESOURCE_GROUP = os.getenv("RESOURCE_GROUP", "mcp-rg")
LOCATION = os.getenv("LOCATION", "eastus")
APP_NAME = "qms-mcp-server"
APIM_NAME = f"qms-apim-{int(time.time())}"

print(f"Resource Group: {RESOURCE_GROUP}")
print(f"Location: {LOCATION}")
print(f"App Name: {APP_NAME}")
print(f"APIM Name: {APIM_NAME}")

# Create resource group
subprocess.run(["az", "group", "create", "--name", RESOURCE_GROUP, "--location", LOCATION], 
               capture_output=True, check=True)
print("✓ Resource group ready")

Resource Group: mcp-rg
Location: eastus
App Name: qms-mcp-server
APIM Name: qms-apim-1759856715
✓ Resource group ready


## 2. Build MCP Server Code
A small in-memory Quality Management System (QMS) exposed as MCP tools with FastMCP.

In [4]:
mcp_server_code = dedent("""
from fastmcp import FastMCP
from datetime import datetime

class QualityManagementSystem:
    def __init__(self):
        self.defects = []
        self.defect_counter = 1000
    
    def log_defect(self, description: str, severity: str = "medium"):
        defect_id = f"DEF-{self.defect_counter}"
        self.defect_counter += 1
        defect = {
            "defect_id": defect_id,
            "timestamp": datetime.now().isoformat(),
            "description": description,
            "severity": severity,
            "status": "open"
        }
        self.defects.append(defect)
        return defect
    
    def get_stats(self):
        return {
            "total": len(self.defects),
            "open": len([d for d in self.defects if d["status"] == "open"])
        }

qms = QualityManagementSystem()
mcp = FastMCP(name="quality-control")

@mcp.tool()
def log_defect(description: str, severity: str = "medium") -> dict:
    if severity not in ["low", "medium", "high"]:
        return {"success": False, "error": "Invalid severity"}
    defect = qms.log_defect(description, severity)
    return {
        "success": True,
        "defect_id": defect["defect_id"],
        "message": f"Defect {defect['defect_id']} logged"
    }

@mcp.tool()
def get_defect_summary() -> dict:
    stats = qms.get_stats()
    return {
        "statistics": stats,
        "recent": qms.defects[-5:],
        "message": f"{stats['total']} defects, {stats['open']} open"
    }

if __name__ == '__main__':
    mcp.run(transport="http", port=8080, host="0.0.0.0")
""")

print("✓ MCP server code prepared")

✓ MCP server code prepared
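
Before containerizing, the severity validation and ID counter can be checked locally without FastMCP. The sketch below mirrors the class above in plain Python; `validate_and_log` is a hypothetical stand-in for the `log_defect` tool, not part of the server code.

```python
from datetime import datetime

ALLOWED_SEVERITIES = ("low", "medium", "high")


class QualityManagementSystem:
    """Plain-Python copy of the in-memory store used by the server code."""

    def __init__(self):
        self.defects = []
        self.defect_counter = 1000

    def log_defect(self, description: str, severity: str = "medium") -> dict:
        defect = {
            "defect_id": f"DEF-{self.defect_counter}",
            "timestamp": datetime.now().isoformat(),
            "description": description,
            "severity": severity,
            "status": "open",
        }
        self.defect_counter += 1
        self.defects.append(defect)
        return defect


def validate_and_log(qms, description: str, severity: str = "medium") -> dict:
    """Hypothetical stand-in for the log_defect tool: validate, then store."""
    if severity not in ALLOWED_SEVERITIES:
        return {"success": False, "error": "Invalid severity"}
    defect = qms.log_defect(description, severity)
    return {"success": True, "defect_id": defect["defect_id"]}


qms = QualityManagementSystem()
print(validate_and_log(qms, "Weld joint misaligned", "high"))
print(validate_and_log(qms, "Paint scratch", "critical"))  # rejected: unknown severity
```

Because the store lives in process memory, logged defects are lost whenever the container restarts or scales to another replica.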


## 3. Deploy Container to Azure Container Apps
Write the build context to a temporary directory, then build and deploy it with `az containerapp up`.
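
The build context passed to `az containerapp up --source` is just a directory holding the app, its requirements file, and a Dockerfile. Below is a minimal sketch of writing and checking that directory on its own; `write_build_context` is a hypothetical helper, and the file contents mirror the cell that follows.

```python
import tempfile
from pathlib import Path
from textwrap import dedent

DOCKERFILE = dedent("""\
    FROM python:3.10-slim
    WORKDIR /app
    COPY . /app
    RUN pip install --no-cache-dir -r requirements.txt
    EXPOSE 8080
    CMD ["python", "main.py"]
""")


def write_build_context(target_dir: str, server_code: str) -> list[str]:
    """Write main.py, requirements.txt, and Dockerfile; return the file names created."""
    root = Path(target_dir)
    (root / "main.py").write_text(server_code)
    (root / "requirements.txt").write_text("fastmcp\n")
    (root / "Dockerfile").write_text(DOCKERFILE)
    return sorted(p.name for p in root.iterdir())


with tempfile.TemporaryDirectory() as tmpdir:
    # Placeholder server code; the notebook would pass mcp_server_code here.
    print(write_build_context(tmpdir, "print('placeholder server')\n"))
```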

In [None]:
with tempfile.TemporaryDirectory() as tmpdir:
    # Write application files
    app_path = Path(tmpdir) / "main.py"
    req_path = Path(tmpdir) / "requirements.txt"
    dockerfile_path = Path(tmpdir) / "Dockerfile"
    
    app_path.write_text(mcp_server_code)
    req_path.write_text("fastmcp\n")
    dockerfile_path.write_text(dedent("""
        FROM python:3.10-slim
        WORKDIR /app
        COPY . /app
        RUN pip install --no-cache-dir -r requirements.txt
        EXPOSE 8080
        CMD ["python", "main.py"]
    """))
    
    print("✓ Build context created")
    
    # Deploy to Azure Container Apps
    print("Deploying container app (this may take a few minutes)...")
    try:
        subprocess.run([
            "az", "containerapp", "up",
            "--name", APP_NAME,
            "--resource-group", RESOURCE_GROUP,
            "--location", LOCATION,
            "--ingress", "external",
            "--target-port", "8080",
            "--source", tmpdir
        ], check=True)
        
        # Query the deployed container app to get FQDN
        result = subprocess.run([
            "az", "containerapp", "show",
            "--name", APP_NAME,
            "--resource-group", RESOURCE_GROUP,
            "--query", "properties.configuration.ingress.fqdn",
            "-o", "tsv"
        ], capture_output=True, text=True, check=True)
        
        CONTAINER_URL = result.stdout.strip()
        
        if not CONTAINER_URL:
            raise ValueError("Failed to retrieve container URL")
        
        print(f"✓ Container deployed: https://{CONTAINER_URL}")
    except subprocess.CalledProcessError as e:
        print("✗ Failed to deploy container app.")
        if hasattr(e, 'stderr') and e.stderr:
            print("STDERR:\n", e.stderr)
        raise

## 4. Deploy Azure API Management
Create an APIM instance, or reuse one that already exists in the resource group. The OAuth2 validation policy is added in section 7.

In [None]:
print("Checking for existing APIM instance...")

# Check if resource group exists and has APIM instances
try:
    result = subprocess.run([
        "az", "apim", "list",
        "--resource-group", RESOURCE_GROUP,
        "--query", "[0].name",
        "-o", "tsv"
    ], capture_output=True, text=True, check=False)
    
    existing_apim = result.stdout.strip()
    
    if existing_apim and result.returncode == 0:
        # Reuse existing APIM
        APIM_NAME = existing_apim
        print(f"✓ Found existing APIM instance: {APIM_NAME}")
    else:
        # Create new APIM instance
        print(f"No existing APIM found. Creating new instance: {APIM_NAME}")
        print("(This takes ~30 minutes)...")
        
        subprocess.run([
            "az", "apim", "create",
            "--resource-group", RESOURCE_GROUP,
            "--name", APIM_NAME,
            "--location", LOCATION,
            "--publisher-name", "MCP",
            "--publisher-email", "admin@mcp.local",
            "--sku-name", "Developer"
        ], check=True, capture_output=True)
        
        print(f"✓ APIM instance created: {APIM_NAME}")
    
    # Wait for APIM to be ready (whether existing or new)
    subprocess.run([
        "az", "apim", "wait", "--created",
        "--name", APIM_NAME,
        "--resource-group", RESOURCE_GROUP,
        "--timeout", "3600"
    ], check=True, capture_output=True)
    
    # Get APIM public IP
    result = subprocess.run([
        "az", "apim", "show",
        "--resource-group", RESOURCE_GROUP,
        "--name", APIM_NAME,
        "--query", "publicIpAddresses[0]",
        "-o", "tsv"
    ], capture_output=True, text=True, check=True)
    
    APIM_PUBLIC_IP = result.stdout.strip()
    print(f"✓ APIM ready. Public IP: {APIM_PUBLIC_IP}")

except subprocess.CalledProcessError as e:
    print(f"✗ Error working with APIM: {e}")
    if e.stderr:
        print(f"STDERR: {e.stderr}")
    raise

## 5. Restrict Container Access to APIM
Allow only APIM to call the container.
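
The restriction rule expects a CIDR range, so an empty or malformed `APIM_PUBLIC_IP` would yield an invalid value such as `/32`. A minimal sketch of validating the address first with the standard `ipaddress` module follows; `to_single_host_cidr` is a hypothetical helper name, and the sample address comes from a documentation-only range.

```python
import ipaddress


def to_single_host_cidr(ip: str) -> str:
    """Return a single-host CIDR for an IP address; raise ValueError if it is invalid."""
    address = ipaddress.ip_address(ip.strip())
    prefix = 32 if address.version == 4 else 128
    return f"{address}/{prefix}"


print(to_single_host_cidr("203.0.113.10"))
```

Passing the result of `to_single_host_cidr(APIM_PUBLIC_IP)` to `--ip-address` would fail fast in Python instead of inside the CLI call.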

In [None]:
subprocess.run([
    "az", "containerapp", "ingress", "access-restriction", "set",
    "--name", APP_NAME,
    "--resource-group", RESOURCE_GROUP,
    "--rule-name", "allow-apim",
    "--ip-address", f"{APIM_PUBLIC_IP}/32",
    "--action", "Allow"
], check=True, capture_output=True)

print(f"✓ Container locked to APIM IP: {APIM_PUBLIC_IP}")

## 6. Configure MCP API in APIM
Create API with MCP endpoints.

In [8]:
API_ID = "qms-mcp-api"
BACKEND_URL = f"https://{CONTAINER_URL}"

# Create API
subprocess.run([
    "az", "apim", "api", "create",
    "--resource-group", RESOURCE_GROUP,
    "--service-name", APIM_NAME,
    "--api-id", API_ID,
    "--display-name", "QMS MCP API",
    "--path", "mcp",
    "--protocols", "https",
    "--service-url", BACKEND_URL,
    "--subscription-required", "false"
], check=True, capture_output=True)

# Create MCP operations
for method, url, op_id, display in [
    ("GET", "/", "mcp-get", "MCP GET Endpoint"),
    ("POST", "/", "mcp-post", "MCP POST Endpoint"),
    ("GET", "/sse", "mcp-sse", "MCP SSE Endpoint"),
    ("POST", "/message", "mcp-message", "MCP Message Endpoint")
]:
    subprocess.run([
        "az", "apim", "api", "operation", "create",
        "--resource-group", RESOURCE_GROUP,
        "--service-name", APIM_NAME,
        "--api-id", API_ID,
        "--operation-id", op_id,
        "--display-name", display,
        "--method", method,
        "--url-template", url
    ], check=True, capture_output=True)

print("✓ API configured with MCP endpoints")

✓ API configured with MCP endpoints


## 7. Configure OAuth2 Validation Policy
Validate Microsoft Entra ID tokens.

### Optional: Register Azure AD App for OAuth2

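To enable OAuth2 authentication, register an app in Microsoft Entra ID (formerly Azure AD) and add its client ID to your `.env` file as `AZURE_CLIENT_ID`. If the variable is unset, the cell below skips the policy and the API stays open for testing. The client ID only toggles the policy; the policy itself validates the tenant and the `https://management.azure.com` audience.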

In [9]:
TENANT_ID = subprocess.run([
    "az", "account", "show", "--query", "tenantId", "-o", "tsv"
], capture_output=True, text=True, check=True).stdout.strip()

CLIENT_ID = os.getenv("AZURE_CLIENT_ID", "")

if CLIENT_ID and CLIENT_ID != "YOUR_CLIENT_ID":
    # tools/list remains open for discovery
    policy_xml = f"""<policies>
    <inbound>
        <base />
        <choose>
            <when condition="@(context.Request.Body.As&lt;JObject&gt;(preserveContent: true)[&quot;method&quot;]?.ToString() != &quot;tools/list&quot;)">
                <!-- Require OAuth2 for all endpoints except tools/list -->
                <validate-azure-ad-token tenant-id="{TENANT_ID}">
                    <audiences>
                        <audience>https://management.azure.com</audience>
                    </audiences>
                </validate-azure-ad-token>
                <set-header name="Authorization" exists-action="override">
                    <value>@(context.Request.Headers.GetValueOrDefault("Authorization"))</value>
                </set-header>
            </when>
        </choose>
        <set-backend-service base-url="https://{CONTAINER_URL}/mcp" />
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>"""
    
    try:
        sub_id = subprocess.run([
            "az", "account", "show", "--query", "id", "-o", "tsv"
        ], capture_output=True, text=True, check=True).stdout.strip()
        
        subprocess.run([
            "az", "rest",
            "--method", "PUT",
            "--url", f"https://management.azure.com/subscriptions/{sub_id}/resourceGroups/{RESOURCE_GROUP}/providers/Microsoft.ApiManagement/service/{APIM_NAME}/apis/{API_ID}/policies/policy?api-version=2021-08-01",
            "--body", json.dumps({
                "properties": {
                    "value": policy_xml,
                    "format": "xml"
                }
            }),
            "--headers", "Content-Type=application/json"
        ], check=True, capture_output=True, text=True)
        
        print("✓ OAuth2 validation configured")
        print("  - tools/list: Open for discovery")
        print("  - tools/call: Protected with OAuth2 (tenant validation)")
        print(f"  - Allowed tenant: {TENANT_ID}")
        print("  - Audience: https://management.azure.com")
    except subprocess.CalledProcessError as e:
        print("✗ Failed to configure OAuth2 policy")
        print("STDOUT:", e.stdout)
        print("STDERR:", e.stderr)
        print("\nNote: OAuth2 configuration failed. API is accessible without authentication.")
        # Don't raise - allow notebook to continue
else:
    print("⚠️  Skipping OAuth2 validation (no AZURE_CLIENT_ID in .env)")
    print("   API is accessible without authentication for testing")
    print("\n   For production, add AZURE_CLIENT_ID to .env to enable OAuth2")

✓ OAuth2 validation configured
  - tools/list: Open for discovery
  - tools/call: Protected with OAuth2 (tenant validation)
  - Allowed tenant: 8ee20a11-fd47-4c84-bb3b-264bf8aecf45
  - Audience: https://management.azure.com
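
The policy above branches on the JSON-RPC `method` field of the request body. Here is a minimal sketch of the two request shapes and of the same decision expressed in Python; `jsonrpc_request` and `requires_token` are hypothetical helpers that only mirror the policy condition, and the argument values are illustrative.

```python
import json
from typing import Optional


def jsonrpc_request(request_id: int, method: str, params: Optional[dict] = None) -> str:
    """Serialize a JSON-RPC 2.0 request like those an MCP client sends."""
    body = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        body["params"] = params
    return json.dumps(body)


def requires_token(raw_body: str) -> bool:
    """Mirror the policy condition: every method except tools/list needs a token."""
    return json.loads(raw_body).get("method") != "tools/list"


discovery = jsonrpc_request(1, "tools/list")
call = jsonrpc_request(2, "tools/call", {
    "name": "log_defect",
    "arguments": {"description": "Weld joint misaligned", "severity": "high"},
})

print(requires_token(discovery))
print(requires_token(call))
```

Under this condition, other methods such as `initialize` also require a token, since only `tools/list` is exempt.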


## 8. Get APIM Gateway URL

In [None]:
result = subprocess.run([
    "az", "apim", "show",
    "--resource-group", RESOURCE_GROUP,
    "--name", APIM_NAME,
    "--query", "gatewayUrl",
    "-o", "tsv"
], capture_output=True, text=True, check=True)

APIM_GATEWAY_URL = result.stdout.strip().rstrip('/')
MCP_ENDPOINT = f"{APIM_GATEWAY_URL}/mcp/"

print(f"✓ MCP Endpoint: {MCP_ENDPOINT}")

## 9. Connect Azure AI Agent with OAuth2
The agent authenticates with a bearer token obtained through `DefaultAzureCredential`.

In [None]:
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.ai.projects import AIProjectClient
from azure.ai.agents.models import McpTool, RequiredMcpToolCall, SubmitToolApprovalAction, ToolApproval, MCPToolResource

credential = DefaultAzureCredential()

# Get OAuth token for agent - tools/call will use this
token_provider = get_bearer_token_provider(credential, "https://management.azure.com/.default")
access_token = token_provider()

project_client = AIProjectClient(
    endpoint=os.getenv("PROJECT_ENDPOINT"),
    credential=credential
)

print(f"MCP Endpoint: {MCP_ENDPOINT}")

mcp_tool = McpTool(
    server_label="qms_mcp",
    server_url=MCP_ENDPOINT,
    allowed_tools=["log_defect", "get_defect_summary"]
)

agent = project_client.agents.create_agent(
    model=os.getenv("MODEL_DEPLOYMENT_NAME"),
    name="secure-qms-agent",
    instructions="Help operators log defects using secure MCP server.",
    tools=mcp_tool.definitions
)

print(f"✓ Agent {agent.id} created with secure MCP connection")

## 10. Test Secure MCP Server
Log a defect through the OAuth-protected endpoint.
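
The cell below polls the run once per second until it leaves an active state or an iteration cap is reached. The same pattern can be sketched independently of the Agents SDK; `poll_until_done` and the fake status source are hypothetical and only illustrate the loop.

```python
import time
from typing import Callable

ACTIVE_STATES = {"queued", "in_progress", "requires_action"}


def poll_until_done(get_status: Callable[[], str], max_iterations: int = 60,
                    delay_seconds: float = 1.0) -> str:
    """Call get_status until it returns a non-active state or the cap is hit."""
    status = get_status()
    iteration = 0
    while status in ACTIVE_STATES and iteration < max_iterations:
        time.sleep(delay_seconds)
        iteration += 1
        status = get_status()
    return status


# Fake status source standing in for project_client.agents.runs.get(...).status
statuses = iter(["queued", "in_progress", "in_progress", "completed"])
print(poll_until_done(lambda: next(statuses), delay_seconds=0))
```

If the returned status is still an active one, the cap was reached before the run finished, so callers may want to treat that case as a timeout.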

In [12]:
thread = project_client.agents.threads.create()
print(f"Created thread: {thread.id}")

message = project_client.agents.messages.create(
    thread_id=thread.id,
    role="user",
    content="Log defect: Weld joint misaligned on chassis frame, high severity."
)
print(f"Created message: {message.id}")

# Pass OAuth token in run resources for tools/call authentication
mcp_tool.update_headers("Authorization", f"Bearer {access_token}")

run = project_client.agents.runs.create(
    thread_id=thread.id,
    agent_id=agent.id,
    tool_resources=mcp_tool.resources
)
print(f"Created run: {run.id}, initial status: {run.status}")

# Monitor run with timeout
max_iterations = 60
iteration = 0
while run.status in ["queued", "in_progress", "requires_action"] and iteration < max_iterations:
    time.sleep(1)
    iteration += 1
    run = project_client.agents.runs.get(thread_id=thread.id, run_id=run.id)
    print(f"[{iteration}] Run status: {run.status}")
    
    if run.status == "requires_action" and isinstance(run.required_action, SubmitToolApprovalAction):
        print(f"  Tool calls pending approval: {len(run.required_action.submit_tool_approval.tool_calls)}")
        approvals = [
            ToolApproval(tool_call_id=tc.id, approve=True, headers=mcp_tool.headers)
            for tc in run.required_action.submit_tool_approval.tool_calls
            if isinstance(tc, RequiredMcpToolCall)
        ]
        if approvals:
            print(f"  Approving {len(approvals)} tool calls...")
            run = project_client.agents.runs.submit_tool_outputs(
                thread_id=thread.id, run_id=run.id, tool_approvals=approvals
            )

# Check final status
print(f"\nFinal run status: {run.status}")
if run.status == "failed":
    print(f"Run failed: {run.last_error}")

# Display all messages
messages = list(project_client.agents.messages.list(thread_id=thread.id))
print(f"\nTotal messages: {len(messages)}")
print("-" * 60)
for msg in reversed(messages):
    print(f"\n[{msg.role.upper()}]:")
    if msg.text_messages:
        for text_msg in msg.text_messages:
            print(text_msg.text.value)
    if msg.role == "assistant" and hasattr(msg, 'tool_calls') and msg.tool_calls:
        print(f"  Tool calls: {len(msg.tool_calls)}")
print("-" * 60)

Created thread: thread_IaKDmZFSaSspbWpDrCZYmWua
Created message: msg_NbfFXE0A9LWufGLgcblBTUOh
Created run: run_iOcJdubnf7bVvYZnZMURwCvu, initial status: RunStatus.QUEUED
[1] Run status: RunStatus.IN_PROGRESS
[2] Run status: RunStatus.IN_PROGRESS
[3] Run status: RunStatus.COMPLETED

Final run status: RunStatus.COMPLETED

Total messages: 2
------------------------------------------------------------

[USER]:
Log defect: Weld joint misaligned on chassis frame, high severity.

[ASSISTANT]:
Defect Log Entry  
**Defect**: Weld joint misaligned on chassis frame  
**Severity**: High  

**Steps for Operators to Log This Defect Using Secure MCP Server**:

1. **Log in to MCP Server**  
   - Access the secure MCP defect logging p

## Cleanup

In [13]:
# project_client.agents.delete_agent(agent.id)
# subprocess.run([
#     "az", "group", "delete",
#     "--name", RESOURCE_GROUP,
#     "--yes", "--no-wait"
# ], capture_output=True)
# 
# print("✓ Cleanup initiated (resources deleting in background)")

## Summary

**Architecture:**
1. **MCP Server** - Dockerized FastMCP app running in Container Apps
2. **API Management** - Validates OAuth2 tokens before forwarding requests to the backend
3. **Container Apps** - Ingress IP restriction allows only the APIM public IP
4. **Azure AI Agent** - Sends an OAuth token with its MCP tool calls

**Security Benefits:**
- **Authentication**: Microsoft Entra ID OAuth2 tokens
- **Network Isolation**: Container only accessible via APIM
- **No API Keys**: Token-based, short-lived credentials
- **Audit Trail**: APIM can log every request once diagnostic logging is enabled