# F-Prime MCP Server - Authentication Testing Notebook

This notebook tests the Microsoft Entra ID (formerly Azure AD) OIDC authentication flow for the F-Prime MCP Server, from endpoint connectivity through token validation and authenticated tool calls.

## Prerequisites
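1. Microsoft Entra ID app registration completed, with "Allow public client flows" enabled (the device code flow in section 4 uses a public client)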
2. F-Prime MCP Server running locally or remotely
3. Required Python packages installed

In [None]:
# Install required packages (run once)
# %pip installs into the environment of the running kernel, which !pip does not guarantee
%pip install msal httpx python-jose pydantic rich

In [None]:
# Imports
import os
import json
import time
from datetime import datetime, timedelta
from typing import Optional

import httpx
import msal
from jose import jwt
from rich import print as rprint
from rich.console import Console
from rich.table import Table
from rich.panel import Panel

console = Console()

## 1. Configuration

Enter your Azure Entra ID and server configuration below.

In [None]:
# =============================================================================
# CONFIGURATION - Update these values with your Azure settings
# =============================================================================

CONFIG = {
    # Azure Entra ID Settings
    "tenant_id": "YOUR_TENANT_ID",           # Directory (tenant) ID
    "client_id": "YOUR_CLIENT_ID",           # Application (client) ID
    "client_secret": "YOUR_CLIENT_SECRET",   # Client secret (for confidential client tests)
    
    # F-Prime Authorization
    "fprime_group_id": "YOUR_FPRIME_GROUP_ID",  # Security group Object ID
    
    # MCP Server
    "server_url": "http://localhost:8000",   # MCP server URL
    
    # Scopes
    "scopes": ["openid", "profile", "email"],
}

# Add API scope
CONFIG["api_scope"] = f"api://{CONFIG['client_id']}/access"
CONFIG["authority"] = f"https://login.microsoftonline.com/{CONFIG['tenant_id']}"

rprint(Panel("[green]Configuration loaded![/green]", title="Status"))
rprint(f"Authority: {CONFIG['authority']}")
rprint(f"Server URL: {CONFIG['server_url']}")
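
A client secret typed into a notebook cell is easy to leak through version control or shared output. The sketch below shows one alternative: overriding the placeholder values from environment variables. The variable names (`FPRIME_TENANT_ID` and so on) are hypothetical and not defined by the F-Prime MCP Server; rename them to fit your setup.

```python
import os

# Hypothetical environment variable names, chosen for illustration only.
ENV_KEYS = {
    "tenant_id": "FPRIME_TENANT_ID",
    "client_id": "FPRIME_CLIENT_ID",
    "client_secret": "FPRIME_CLIENT_SECRET",
    "fprime_group_id": "FPRIME_GROUP_ID",
    "server_url": "FPRIME_SERVER_URL",
}

def config_from_env(defaults: dict, env=None) -> dict:
    """Return a copy of `defaults` with values from the environment applied."""
    env = os.environ if env is None else env
    merged = dict(defaults)
    for key, var in ENV_KEYS.items():
        if env.get(var):
            merged[key] = env[var]
    # Derived values depend on the IDs, so recompute them after overriding.
    merged["api_scope"] = f"api://{merged['client_id']}/access"
    merged["authority"] = f"https://login.microsoftonline.com/{merged['tenant_id']}"
    return merged
```

Calling `CONFIG = config_from_env(CONFIG)` right after the configuration cell would apply any variables that are set and leave the rest untouched.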

## 2. Test Azure Entra ID Connectivity

Verify we can reach the Azure OIDC endpoints.

In [None]:
async def test_azure_connectivity():
    """Test connectivity to Azure Entra ID OIDC endpoints."""
    
    endpoints = {
        "OpenID Configuration": f"{CONFIG['authority']}/v2.0/.well-known/openid-configuration",
        "JWKS (Keys)": f"https://login.microsoftonline.com/{CONFIG['tenant_id']}/discovery/v2.0/keys",
    }
    
    table = Table(title="Azure Entra ID Endpoint Connectivity")
    table.add_column("Endpoint", style="cyan")
    table.add_column("Status", style="green")
    table.add_column("Details", style="yellow")
    
    async with httpx.AsyncClient() as client:
        for name, url in endpoints.items():
            try:
                resp = await client.get(url, timeout=10)
                if resp.status_code == 200:
                    data = resp.json()
                    if name == "OpenID Configuration":
                        detail = f"Issuer: {data.get('issuer', 'N/A')}"
                    else:
                        detail = f"Keys found: {len(data.get('keys', []))}"
                    table.add_row(name, "✓ OK", detail)
                else:
                    table.add_row(name, f"✗ {resp.status_code}", "Failed")
            except Exception as e:
                table.add_row(name, "✗ Error", str(e)[:50])
    
    console.print(table)

# Run the test
await test_azure_connectivity()

## 3. Test MCP Server Connectivity

Verify the MCP server is running and accessible.

In [None]:
async def test_server_connectivity():
    """Test connectivity to the F-Prime MCP Server."""
    
    endpoints = {
        "Health Check": f"{CONFIG['server_url']}/health",
        "Root Info": f"{CONFIG['server_url']}/",
        "Auth Status (should fail)": f"{CONFIG['server_url']}/auth/me",
    }
    
    table = Table(title="MCP Server Connectivity")
    table.add_column("Endpoint", style="cyan")
    table.add_column("Status", style="green")
    table.add_column("Response", style="yellow")
    
    async with httpx.AsyncClient() as client:
        for name, url in endpoints.items():
            try:
                resp = await client.get(url, timeout=10)
                status = "✓" if resp.status_code in [200, 401, 403] else "✗"
                try:
                    detail = json.dumps(resp.json())[:60] + "..."
                except ValueError:  # body was not valid JSON
                    detail = resp.text[:60]
                table.add_row(name, f"{status} {resp.status_code}", detail)
            except httpx.ConnectError:
                table.add_row(name, "✗ Connection Failed", "Is the server running?")
            except Exception as e:
                table.add_row(name, "✗ Error", str(e)[:50])
    
    console.print(table)

# Run the test
await test_server_connectivity()

## 4. Authenticate with Device Code Flow

This is the recommended flow for notebooks and CLI tools. It will display a code and URL for you to authenticate in your browser.

In [None]:
# Store tokens globally for use in other cells
TOKENS = {}

def authenticate_device_code():
    """Authenticate using device code flow."""
    global TOKENS
    
    app = msal.PublicClientApplication(
        client_id=CONFIG["client_id"],
        authority=CONFIG["authority"],
    )
    
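    # Request scopes including custom API scope.
    # MSAL adds the reserved OIDC scopes (openid, profile, offline_access) itself
    # and raises ValueError if they are passed explicitly, so drop them here.
    reserved = {"openid", "profile", "offline_access"}
    scopes = [s for s in CONFIG["scopes"] if s not in reserved] + [CONFIG["api_scope"]]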
    
    # Initiate device code flow
    flow = app.initiate_device_flow(scopes=scopes)
    
    if "user_code" not in flow:
        rprint(f"[red]Failed to create device flow: {flow.get('error_description', 'Unknown error')}[/red]")
        return None
    
    # Display instructions
    rprint(Panel(
        f"[bold yellow]To sign in:[/bold yellow]\n\n"
        f"1. Open: [link={flow['verification_uri']}]{flow['verification_uri']}[/link]\n"
        f"2. Enter code: [bold cyan]{flow['user_code']}[/bold cyan]\n\n"
        f"Waiting for authentication...",
        title="Device Code Authentication"
    ))
    
    # Wait for user to complete authentication
    result = app.acquire_token_by_device_flow(flow)
    
    if "access_token" in result:
        TOKENS = {
            "access_token": result["access_token"],
            "refresh_token": result.get("refresh_token"),
            "id_token": result.get("id_token"),
            "expires_in": result.get("expires_in", 3600),
            "token_type": result.get("token_type", "Bearer"),
        }
        
        # Extract user info from ID token claims
        claims = result.get("id_token_claims", {})
        
        rprint(Panel(
            f"[bold green]Authentication Successful![/bold green]\n\n"
            f"User: {claims.get('name', 'Unknown')}\n"
            f"Email: {claims.get('preferred_username', 'Unknown')}\n"
            f"Token expires in: {result.get('expires_in', 'Unknown')} seconds",
            title="Success"
        ))
        
        return TOKENS
    else:
        rprint(f"[red]Authentication failed: {result.get('error_description', 'Unknown error')}[/red]")
        return None

# Run authentication
authenticate_device_code()
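
The `expires_in` value stored in `TOKENS` is a duration in seconds counted from issuance, so later cells cannot tell from it alone whether the token is still usable. A minimal sketch of one way to track this, using two hypothetical helpers (`stamp_expiry` and `is_expired`) that are not part of MSAL:

```python
import time
from typing import Optional

def stamp_expiry(tokens: dict, now: Optional[float] = None) -> dict:
    """Return a copy of `tokens` with an absolute `expires_at` Unix timestamp."""
    now = time.time() if now is None else now
    stamped = dict(tokens)
    stamped["expires_at"] = now + int(tokens.get("expires_in", 3600))
    return stamped

def is_expired(tokens: dict, skew: int = 60, now: Optional[float] = None) -> bool:
    """True if there is no expiry recorded or the token expires within `skew` seconds."""
    now = time.time() if now is None else now
    return now >= tokens.get("expires_at", 0) - skew
```

One option is to run `TOKENS = stamp_expiry(TOKENS)` immediately after a successful sign-in and check `is_expired(TOKENS)` before each request. MSAL's own token cache together with `acquire_token_silent` is the more complete mechanism if the notebook is kept open for long sessions.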

## 5. Inspect the Access Token

Decode the JWT access token and inspect its claims. The signature is not checked in this step; section 8 covers validation.

In [None]:
def inspect_token(token: str, token_name: str = "Access Token"):
    """Decode and display JWT token contents (without verification)."""
    
    try:
        # Decode without verification to inspect claims
        claims = jwt.get_unverified_claims(token)
        header = jwt.get_unverified_header(token)
        
        # Header info
        rprint(Panel(f"Algorithm: {header.get('alg')}\nKey ID: {header.get('kid')}", title=f"{token_name} Header"))
        
        # Claims table
        table = Table(title=f"{token_name} Claims")
        table.add_column("Claim", style="cyan")
        table.add_column("Value", style="yellow")
        
        # Important claims to highlight
        important_claims = ['sub', 'oid', 'name', 'preferred_username', 'email', 'aud', 'iss', 'exp', 'iat', 'nbf', 'groups', 'roles']
        
        for claim in important_claims:
            if claim in claims:
                value = claims[claim]
                if claim in ['exp', 'iat', 'nbf']:
                    # Convert timestamp to readable date
                    dt = datetime.fromtimestamp(value)
                    value = f"{value} ({dt.isoformat()})"
                elif isinstance(value, list):
                    value = json.dumps(value, indent=2)
                table.add_row(claim, str(value))
        
        console.print(table)
        
        # Check F-Prime membership
        groups = claims.get('groups', [])
        roles = claims.get('roles', [])
        
        is_member = CONFIG['fprime_group_id'] in groups or 'FPrime.Member' in roles
        
        if is_member:
            rprint("[bold green]✓ User IS an F-Prime member[/bold green]")
        else:
            rprint("[bold red]✗ User is NOT an F-Prime member[/bold red]")
            rprint(f"  Expected group ID: {CONFIG['fprime_group_id']}")
            rprint(f"  User's groups: {groups}")
            rprint(f"  User's roles: {roles}")
        
        return claims
        
    except Exception as e:
        rprint(f"[red]Failed to decode token: {e}[/red]")
        return None

# Inspect the access token
if TOKENS.get("access_token"):
    inspect_token(TOKENS["access_token"], "Access Token")
else:
    rprint("[yellow]No access token available. Run the authentication cell first.[/yellow]")
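
Decoding "without verification" works because the header and payload of a compact JWT are plain base64url-encoded JSON; only the third segment is a signature. The sketch below reproduces that step with the standard library alone. The function names are illustrative, and like `inspect_token` it gives no assurance that the token is genuine.

```python
import base64
import json

def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)

def peek_jwt(token: str) -> tuple:
    """Return (header, claims) of a compact JWT WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    header = json.loads(b64url_decode(parts[0]))
    claims = json.loads(b64url_decode(parts[1]))
    return header, claims
```

If `peek_jwt` raises on a token, the value in `TOKENS` is probably not a JWT at all, which is worth ruling out before debugging claim contents.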

## 6. Test Authenticated API Calls

Make authenticated requests to the MCP server.

In [None]:
async def test_authenticated_endpoints():
    """Test authenticated API endpoints."""
    
    if not TOKENS.get("access_token"):
        rprint("[yellow]No access token available. Run the authentication cell first.[/yellow]")
        return
    
    headers = {
        "Authorization": f"Bearer {TOKENS['access_token']}",
        "Content-Type": "application/json",
    }
    
    table = Table(title="Authenticated Endpoint Tests")
    table.add_column("Endpoint", style="cyan")
    table.add_column("Method", style="magenta")
    table.add_column("Status", style="green")
    table.add_column("Response", style="yellow")
    
    tests = [
        ("GET", "/auth/me", None),
        ("GET", "/mcp/tools", None),
        ("POST", "/mcp/tools/call", {"name": "fprime_search_projects", "arguments": {"query": "test"}}),
    ]
    
    async with httpx.AsyncClient(base_url=CONFIG["server_url"]) as client:
        for method, path, body in tests:
            try:
                if method == "GET":
                    resp = await client.get(path, headers=headers, timeout=10)
                else:
                    resp = await client.post(path, headers=headers, json=body, timeout=10)
                
                status_icon = "✓" if resp.status_code == 200 else "✗"
                try:
                    response_text = json.dumps(resp.json(), indent=2)[:100] + "..."
                except ValueError:  # body was not valid JSON
                    response_text = resp.text[:100]
                
                table.add_row(path, method, f"{status_icon} {resp.status_code}", response_text)
                
            except Exception as e:
                table.add_row(path, method, "✗ Error", str(e)[:50])
    
    console.print(table)

# Run authenticated tests
await test_authenticated_endpoints()
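
The table above marks every non-200 response with ✗, although different status codes usually point to different problems. The sketch below maps codes to a likely cause under conventional HTTP semantics. Whether the F-Prime MCP Server returns exactly these codes in these situations is an assumption, so treat the messages as starting points.

```python
def diagnose_status(status_code: int) -> str:
    """Suggest a likely cause for a response status (conventional HTTP semantics)."""
    if status_code == 200:
        return "ok"
    if status_code == 401:
        return "token missing, expired, or rejected: re-run authentication"
    if status_code == 403:
        return "token accepted but access denied: check F-Prime group or role membership"
    if status_code == 404:
        return "endpoint not found: check server_url and the request path"
    if 500 <= status_code < 600:
        return "server error: check the MCP server logs"
    return "unexpected status"
```

For example, `diagnose_status(resp.status_code)` could be added as an extra table column next to the raw response text.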

## 7. Test Tool Execution

Test executing specific MCP tools with authentication.

In [None]:
async def call_mcp_tool(tool_name: str, arguments: dict):
    """Call an MCP tool and display results."""
    
    if not TOKENS.get("access_token"):
        rprint("[yellow]No access token available. Run the authentication cell first.[/yellow]")
        return None
    
    headers = {
        "Authorization": f"Bearer {TOKENS['access_token']}",
        "Content-Type": "application/json",
    }
    
    payload = {
        "name": tool_name,
        "arguments": arguments,
    }
    
    rprint(f"[cyan]Calling tool:[/cyan] {tool_name}")
    rprint(f"[cyan]Arguments:[/cyan] {json.dumps(arguments, indent=2)}")
    
    async with httpx.AsyncClient(base_url=CONFIG["server_url"]) as client:
        try:
            resp = await client.post("/mcp/tools/call", headers=headers, json=payload, timeout=30)
            
            if resp.status_code == 200:
                result = resp.json()
                rprint(Panel(
                    json.dumps(result, indent=2),
                    title=f"[green]Tool Response (Status: {resp.status_code})[/green]"
                ))
                return result
            else:
                rprint(Panel(
                    resp.text,
                    title=f"[red]Error (Status: {resp.status_code})[/red]"
                ))
                return None
                
        except Exception as e:
            rprint(f"[red]Request failed: {e}[/red]")
            return None

# Test: Search Projects
await call_mcp_tool("fprime_search_projects", {"query": "demo", "limit": 5})

In [None]:
# Test: Get Document
await call_mcp_tool("fprime_get_document", {"document_id": "doc-123", "include_content": True})

In [None]:
# Test: Team Directory
await call_mcp_tool("fprime_team_directory", {"name": "Smith", "department": "Engineering"})

## 8. Token Validation Test

Verify the token can be validated against Azure's JWKS.

In [None]:
async def validate_token_with_jwks(token: str):
    """Validate token signature using Azure's JWKS."""
    from jose import jwt as jose_jwt
    from jose.exceptions import JWTError, ExpiredSignatureError
    
    jwks_url = f"https://login.microsoftonline.com/{CONFIG['tenant_id']}/discovery/v2.0/keys"
    
    async with httpx.AsyncClient() as client:
        # Fetch JWKS
        rprint("[cyan]Fetching JWKS...[/cyan]")
        resp = await client.get(jwks_url)
        jwks = resp.json()
        rprint(f"[green]Found {len(jwks.get('keys', []))} keys[/green]")
        
        # Get the key ID from token header
        header = jose_jwt.get_unverified_header(token)
        kid = header.get('kid')
        rprint(f"[cyan]Token key ID: {kid}[/cyan]")
        
        # Find matching key
        rsa_key = None
        for key in jwks.get('keys', []):
            if key.get('kid') == kid:
                rsa_key = key
                rprint("[green]✓ Found matching key in JWKS[/green]")
                break
        
        if not rsa_key:
            rprint("[red]✗ No matching key found in JWKS[/red]")
            return False
        
        # Validate token
        try:
            issuer = f"https://login.microsoftonline.com/{CONFIG['tenant_id']}/v2.0"
            
            claims = jose_jwt.decode(
                token,
                rsa_key,
                algorithms=['RS256'],
                audience=CONFIG['client_id'],
                issuer=issuer,
                options={
                    'verify_exp': True,
                    'verify_aud': True,
                    'verify_iss': True,
                }
            )
            
            rprint(Panel(
                "[bold green]✓ Token signature is VALID[/bold green]\n\n"
                f"Subject: {claims.get('sub')}\n"
                f"Audience: {claims.get('aud')}\n"
                f"Issuer: {claims.get('iss')}",
                title="Token Validation Result"
            ))
            return True
            
        except ExpiredSignatureError:
            rprint("[red]✗ Token has EXPIRED[/red]")
            return False
        except JWTError as e:
            rprint(f"[red]✗ Token validation FAILED: {e}[/red]")
            return False

# Validate the token
if TOKENS.get("access_token"):
    await validate_token_with_jwks(TOKENS["access_token"])
else:
    rprint("[yellow]No access token available. Run the authentication cell first.[/yellow]")
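
The validation above expects a v2.0 issuer and the bare `client_id` as audience. Entra ID can also issue v1.0 access tokens for the same API, depending on the access token version set in the app manifest, and those carry a different `iss` and `aud`. The sketch below picks the expected values from the token's `ver` claim. It assumes the Application ID URI is the default `api://<client_id>`, matching `api_scope` in the configuration; the function name is illustrative.

```python
def expected_iss_aud(claims: dict, tenant_id: str, client_id: str) -> tuple:
    """Return the (issuer, audience) pair to validate against, based on `ver`."""
    if claims.get("ver") == "1.0":
        # v1.0 tokens: sts.windows.net issuer, audience is the Application ID URI
        # (assumed here to be the default api://<client_id>).
        return (f"https://sts.windows.net/{tenant_id}/", f"api://{client_id}")
    # v2.0 tokens: login.microsoftonline.com issuer, audience is the client ID.
    return (f"https://login.microsoftonline.com/{tenant_id}/v2.0", client_id)
```

The `ver` claim would be read from the unverified claims first (for example via `jwt.get_unverified_claims`), and the resulting pair passed as `issuer` and `audience` to the verified decode. The signature check still decides whether the token is trusted.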

## 9. Full Integration Test

Run a complete end-to-end test of the authentication flow.

In [None]:
async def run_full_integration_test():
    """Run complete integration test suite."""
    
    results = []
    
    # Test 1: Azure connectivity
    rprint("\n[bold]Test 1: Azure Entra ID Connectivity[/bold]")
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{CONFIG['authority']}/v2.0/.well-known/openid-configuration")
            results.append(("Azure OIDC Discovery", resp.status_code == 200))
    except Exception:  # a bare except would also swallow KeyboardInterrupt and task cancellation
        results.append(("Azure OIDC Discovery", False))
    
    # Test 2: Server connectivity
    rprint("\n[bold]Test 2: MCP Server Connectivity[/bold]")
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{CONFIG['server_url']}/health")
            results.append(("MCP Server Health", resp.status_code == 200))
    except Exception:
        results.append(("MCP Server Health", False))
    
    # Test 3: Token available
    rprint("\n[bold]Test 3: Authentication Token[/bold]")
    has_token = bool(TOKENS.get("access_token"))
    results.append(("Access Token Present", has_token))
    
    if has_token:
        # Test 4: Token validation
        rprint("\n[bold]Test 4: Token Validation[/bold]")
        is_valid = await validate_token_with_jwks(TOKENS["access_token"])
        results.append(("Token Signature Valid", is_valid))
        
        # Test 5: Authenticated request
        rprint("\n[bold]Test 5: Authenticated API Request[/bold]")
        try:
            async with httpx.AsyncClient() as client:
                headers = {"Authorization": f"Bearer {TOKENS['access_token']}"}
                resp = await client.get(f"{CONFIG['server_url']}/auth/me", headers=headers)
                results.append(("Get Current User", resp.status_code == 200))
                
                if resp.status_code == 200:
                    user_data = resp.json()
                    results.append(("F-Prime Membership", user_data.get("is_fprime_member", False)))
        except Exception:
            results.append(("Get Current User", False))
        
        # Test 6: Tool listing
        rprint("\n[bold]Test 6: MCP Tools Listing[/bold]")
        try:
            async with httpx.AsyncClient() as client:
                headers = {"Authorization": f"Bearer {TOKENS['access_token']}"}
                resp = await client.get(f"{CONFIG['server_url']}/mcp/tools", headers=headers)
                results.append(("List MCP Tools", resp.status_code == 200))
        except Exception:
            results.append(("List MCP Tools", False))
    
    # Summary
    rprint("\n")
    table = Table(title="Integration Test Results")
    table.add_column("Test", style="cyan")
    table.add_column("Result", style="green")
    
    passed = 0
    for test_name, success in results:
        icon = "✓ PASS" if success else "✗ FAIL"
        style = "green" if success else "red"
        table.add_row(test_name, f"[{style}]{icon}[/{style}]")
        if success:
            passed += 1
    
    console.print(table)
    rprint(f"\n[bold]Summary: {passed}/{len(results)} tests passed[/bold]")

# Run full integration test
await run_full_integration_test()

## 10. Save Token for CLI Use

Save the token to a file for use with the CLI tools.

In [None]:
def save_token_to_file(filename: str = ".fprime_token"):
    """Save current token to file for CLI use."""
    
    if not TOKENS.get("access_token"):
        rprint("[yellow]No access token available. Run the authentication cell first.[/yellow]")
        return
    
    token_data = {
        "access_token": TOKENS["access_token"],
        "refresh_token": TOKENS.get("refresh_token"),
        "expires_in": TOKENS.get("expires_in", 3600),
        "saved_at": datetime.utcnow().isoformat(),
    }
    
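    with open(filename, "w") as f:
        json.dump(token_data, f, indent=2)
    # The file holds bearer credentials, so restrict it to the current user
    # (this has limited effect on Windows).
    os.chmod(filename, 0o600)
    
    rprint(f"[green]✓ Token saved to {filename}[/green]")
    rprint("[yellow]Note: Keep this file secure and do not commit to version control![/yellow]")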

# Save the token
save_token_to_file()
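
A consumer of the saved file needs to decide whether the token is still worth sending. The sketch below reads the file written above and applies a freshness check based on `saved_at` and `expires_in`. It is an illustration, not the actual CLI's loader. Because `expires_in` counts from issuance rather than from the moment of saving, the check can overestimate the remaining lifetime.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Optional

def token_is_fresh(data: dict, now: Optional[datetime] = None) -> bool:
    """True if `saved_at` + `expires_in` is still in the future."""
    saved_at = datetime.fromisoformat(data["saved_at"])
    if saved_at.tzinfo is None:
        # The notebook writes naive UTC timestamps, so treat them as UTC.
        saved_at = saved_at.replace(tzinfo=timezone.utc)
    expires_at = saved_at + timedelta(seconds=data.get("expires_in", 3600))
    now = now or datetime.now(timezone.utc)
    return now < expires_at

def load_token_from_file(filename: str = ".fprime_token") -> Optional[dict]:
    """Return the saved token data, or None if the file is missing or stale."""
    try:
        with open(filename) as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    return data if token_is_fresh(data) else None
```

A caller that gets `None` back would fall through to a fresh device code sign-in.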

## Troubleshooting

### Common Issues

1. **"AADSTS50011: Reply URL mismatch"**
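   - The redirect URI sent in the request doesn't match one registered on the app. The device code flow doesn't use a redirect URI, so this error shouldn't appear when running this notebook.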

2. **"Groups claim is empty"**
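   - Ensure the groups claim is configured under Token configuration in the app registration
   - If the user is in more than 200 groups, Entra ID omits `groups` from the JWT and adds a `_claim_names` overage indicator instead. Use app roles (the `roles` claim) in that case.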

3. **"Access denied - not F-Prime member"**
   - Verify user is in the F-Prime security group
   - Check that `fprime_group_id` matches the group's Object ID

4. **"Token validation failed"**
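   - Check that the token's `aud` claim matches what the validator expects: v2.0 access tokens carry the bare `client_id`, while v1.0 tokens carry the Application ID URI (by default `api://<client_id>`)
   - Verify the tenant ID is correct: `iss` is `https://login.microsoftonline.com/<tenant_id>/v2.0` for v2.0 tokens and `https://sts.windows.net/<tenant_id>/` for v1.0 tokens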
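
For issue 2, it helps to tell apart a `groups` claim that is present, present but empty, replaced by an overage indicator, or absent altogether. A minimal sketch that works on already decoded claims, such as the dictionary returned by `inspect_token`; the function name and return labels are illustrative:

```python
def groups_claim_status(claims: dict) -> str:
    """Classify the state of the `groups` claim in decoded token claims."""
    if "groups" in claims:
        return "present" if claims["groups"] else "empty"
    if "groups" in claims.get("_claim_names", {}):
        # Overage: too many groups to embed, so the token points elsewhere.
        return "overage"
    return "missing"
```

A result of `"missing"` points to token configuration, while `"overage"` points to the 200-group limit and app roles as the workaround.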