## Wealth Management SQL Agent

### Notebook 2: Build MCP Server
In this notebook, we will create an MCP server that will allow the agent to interact with the duckdb database we created in the previous notebook. 

>MCP is an open protocol that standardizes how applications provide context to large language models (LLMs). Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect your devices to various peripherals and accessories, MCP provides a standardized way to connect AI models to different data sources and tools. MCP enables you build agents and complex workflows on top of LLMs and connects your models with the world.

For more information on MCP, see the [MCP documentation](https://modelcontextprotocol.io/docs/getting-started/intro).

By exposing the duckdb database through an MCP server, we can allow multiple agents to interact with the database without having to create a specific API for each agent. This makes our tools more modular, easier to maintain, more reusable, and more secure.

In [None]:
import sys
import os
module_path = "../.."
sys.path.append(os.path.abspath(module_path))
from utils.environment_validation import validate_environment, validate_model_access
validate_environment()

In [None]:
required_models = [
    "us.anthropic.claude-3-5-haiku-20241022-v1:0",
    "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
]
validate_model_access(required_models)

The actual MCP server is implemented in the [mcp_server.py](mcp_server.py) file using the official [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk). All of the SQL capabilities are exposed as [MCP tools](https://modelcontextprotocol.io/docs/learn/server-concepts#tools-ai-actions), which can be used by any agent that connects to the MCP server.

To provide an illustration of how authorization could work, we'll generate a simple json web token (JWT) that will contain the client_id for the user along with access scopes that will be used to determine which tools the user can access. The JWT will be signed with a simple shared secret key that can be used to encode and decode the token. In a production system, you would want to use a more secure method of authentication and authorization, such as OAuth2 or OpenID Connect.

The JWT will be provided via the `Authorization` header in the HTTP request to the MCP server. The server will decode the JWT and use the client_id and scopes to determine which tools the user can access.
The defined scopes will give the user access to the following tools:
`client` scope: 
 - `get_customer_info`
 - `get_customer_accounts`
 - `get_account_holdings`
 - `get_account_transactions`
 The tools do not take any input parameters, and the client_id is derived from the JWT token and wil be dynamically injected into the tool using the request context.
Here is an example implementation 
```python
@mcp.tool(annotations={"required_scope": ["client"]})
def get_customer_accounts(context: Context) -> str:
    """Get a list of accounts for a specific customer."""
    request = context.request_context.request
    user = request.user
    client_id = user.display_name

    return (
        DATABASE.execute(f"EXECUTE query_customer_accounts('{client_id}')")
        .fetchdf()
        .to_markdown(index=False)
    )
```

A user with the `portfolio_analyst` scope will have access to the following tools:
- `get_data_catalog`: will provide the schema of the database, including the tables and columns
- `run_custom_query`: run custom SQL query against the database


In [None]:
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession
import nest_asyncio
import asyncio
import httpx
import subprocess
import sys
from rich import print as rprint
from rich.markdown import Markdown

nest_asyncio.apply()

In [None]:
import secrets
import jwt
import json
import os
from pathlib import Path

# user data file contains a mapping of user emails to their client IDs
USER_DATA = json.load(Path("user_info.json").open())

# generate a secret key for JWT encoding/decoding
secret_key_path = Path("secret_key.txt")
if secret_key_path.exists():
    secret_key = secret_key_path.read_text().strip()
else:
    secret_key = secrets.token_urlsafe(32)
    secret_key_path.write_text(secret_key)


os.environ["SECRET_KEY"] = secret_key

In [None]:
def get_user_jwt(user:str, scopes: list[str] = ["client"]) -> str:

    """Generate a JWT for the given user with the specified scopes."""

    client_id = USER_DATA.get(user)
    if not client_id:
        raise ValueError(f"User {user} not found in user_info.json")
    encoded_jwt = jwt.encode({"client_id": client_id, "scopes": scopes}, os.environ["SECRET_KEY"], algorithm="HS256")

    return encoded_jwt

In [None]:
# launch the MCP server in a background process
mcp_server_process = subprocess.Popen(
    [sys.executable, "mcp_server.py"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
rprint (f"MCP server started in the background on pid {mcp_server_process.pid}")

In [None]:
async def test_invoke(user: str | None = None, scopes: list[str] | None = None):
    
    """Helper function to test invoking the MCP server with different users and scopes."""
    
    if user is not None and scopes is not None:
        jwt_token = get_user_jwt(user, scopes)
        headers = {"Authorization": f"Bearer {jwt_token}"}
    else:
        headers = None
    
    try:    
    
        async with streamablehttp_client("http://localhost:8000/mcp", headers=headers) as (read_stream, write_stream, _):
            
            async with ClientSession(read_stream, write_stream) as session:
                # Initialize the connection
                await session.initialize()

                # List tools
                tool_list = await session.list_tools()
                available_tools = [tool.name for tool in tool_list.tools]
                rprint(Markdown("## Available tools:"), available_tools)
                
                # Call a tool
                list_accounts_results = await session.call_tool("get_customer_accounts")
                list_accounts_content = list_accounts_results.content[0].text
                rprint(Markdown(f"## Result from calling get_customer_accounts \n{list_accounts_content}"))

                sql_query_results = await session.call_tool("run_custom_query", {"query": "SELECT count(*) as num_accounts FROM wealth.accounts"})
                sql_query_content = sql_query_results.content[0].text
                rprint(Markdown(f"## SQL Query Result:\n{sql_query_content}"))

    except* httpx.HTTPStatusError as excgroup:
        for exc in excgroup.exceptions:
            print(f"HTTP error occurred: {exc.response.status_code}")
                


The helper function above will be used to test the following scenarios:
- An unauthenticated user without the Authorization header should not be able to access any tools and should receive a 401 Unauthorized error.
- A user with the `client` scope should be able to list the client tools and call the `get_customer_accounts` tool.
- A user with the `client` scope should not be able to see or call the `run_custom_query` tool.
- A user with the `portfolio_analyst` scope should be able to list the portfolio analyst tools and call the `run_custom_query` tool.
- A user with the `portfolio_analyst` scope should not be able to see or call the `get_customer_accounts` tool.

In [None]:

rprint(Markdown("# Testing with an unauthenticated user"))
await test_invoke()

rprint(Markdown("# Testing a user with 'client' and 'authenticated' scopes"))
await test_invoke("ehunt@example.org", ["client", "authenticated"])

rprint(Markdown("# Testing a user with 'portfolio_analyst' and 'authenticated' scopes"))
await test_invoke("mwise@example.net", ["portfolio_analyst", "authenticated"])


In [None]:
# shutdown the MCP server
mcp_server_process.terminate()