<a href="https://colab.research.google.com/github/NormLorenz/ai-llm-openai-mcp/blob/main/openai-mcp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Using OpenAI with a MCP Server

In [None]:
# Install required packages

!pip install --upgrade pip
!pip install fastmcp openai nest_asyncio

In [None]:
# The MCP Server

from fastmcp import FastMCP
import nest_asyncio
import threading
import time

nest_asyncio.apply()

mcp = FastMCP(
    name="WeatherServer",
    instructions="This provides an up to date weather forecast for any location."
)

# Tool 1: Forecast
@mcp.tool("get_forecast")
def get_forecast(location: str):
    return {"forecast": f"Sunny in {location}"}

# Tool 2: Alerts
@mcp.tool(name="get_alerts")
def get_alerts(location: str):
    return {"alerts": f"No severe alerts currently for {location}"}

# Tool 3: Math
@mcp.tool
def multiply(a: float, b: float) -> float:
    """Multiplies two numbers together."""
    return a * b

# Tool 4: Health Check
@mcp.tool
def health_check():
    """Returns the health status of the server."""
    return {"status": "ok"}

# Resource 1: Climate Data
@mcp.resource(uri="http://example.com", name="climate_data")
def climate_data():
    """Return static climate information."""
    return {
        "Berlin": {"avg_temp": "10¬∞C", "rainfall": "570mm"},
        "Boise": {"avg_temp": "12¬∞C", "rainfall": "300mm"},
        "Tokyo": {"avg_temp": "16¬∞C", "rainfall": "1500mm"}
    }

# Prompt 1: Analysis of Numerical data
@mcp.prompt
def analyze_data(data_points: list[float]) -> str:
    """Creates a prompt asking for analysis of numerical data."""
    formatted_data = ", ".join(str(point) for point in data_points)
    return f"Please analyze these data points: {formatted_data}"

# Define the function to run the server
def run_server():
  # Use transport="streamable-http" for compatibility with notebooks/Colab
  print("üöÄ Starting FastMCP server in background thread...")
  mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)

# Start the server in a separate thread
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

# Give the server a moment to start up
time.sleep(5)
print("‚úÖ Server should be running. Access it at http://localhost:8000/mcp")

In [6]:
# The MCP Server test suite

import requests
import json
from typing import Dict, Any, List
from dataclasses import dataclass
import time

# Configuration
MCP_SERVER_URL = "http://localhost:8000/mcp"
REQUEST_ID = 1

@dataclass
class TestResult:
    name: str
    passed: bool
    message: str
    response: Any = None
    duration: float = 0.0

class MCPServerTester:
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.request_id = 1
        self.results: List[TestResult] = []
        self.session_initialized = False

    def _make_request(self, method: str, params: Dict = None) -> Dict:
        """Make a JSON-RPC request to the MCP server"""
        start_time = time.time()

        payload = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
        }

        headers={
          "Accept": "application/json, text/event-stream"
        }

        if params:
            payload["params"] = params

        try:
            response = requests.post(self.server_url, json=payload, headers=headers, timeout=10)
            duration = time.time() - start_time
            self.request_id += 1

            if response.status_code == 200:
                clean_text = response.text.removeprefix("event: message\r\ndata: ").removesuffix("\r\n\r\n")
                clean_json = json.loads(clean_text)

                # return {"success": True, "data": response.json(), "duration": duration}
                return {"success": True, "data": clean_json, "duration": duration}
            else:
                return {
                    "success": False,
                    "error": f"HTTP {response.status_code}: {response.text}",
                    "duration": duration
                }
        except Exception as e:
            duration = time.time() - start_time
            return {"success": False, "error": str(e), "duration": duration}

    def test_initialize(self) -> TestResult:
        """Test server initialization"""
        print("üîß Testing: Server Initialization")

        result = self._make_request(
            "initialize",
            {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "test-client",
                    "version": "1.0.0"
                }
            }
        )

        if result["success"]:
            data = result["data"]
            if "result" in data and "capabilities" in data["result"]:
                self.session_initialized = True
                return TestResult(
                    "initialize",
                    True,
                    "‚úÖ Server initialized successfully",
                    data["result"],
                    result["duration"]
                )

        return TestResult(
            "initialize",
            False,
            f"‚ùå Initialization failed: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def test_list_tools(self) -> TestResult:
        """Test listing available tools"""
        print("üîß Testing: List Tools")

        result = self._make_request("tools/list")

        if result["success"]:
            data = result["data"]
            if "result" in data and "tools" in data["result"]:
                tools = data["result"]["tools"]
                return TestResult(
                    "tools/list",
                    True,
                    f"‚úÖ Found {len(tools)} tool(s)",
                    tools,
                    result["duration"]
                )

        return TestResult(
            "tools/list",
            False,
            f"‚ùå Failed to list tools: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def test_tool_call(self, tool_name: str, arguments: Dict = None) -> TestResult:
        """Test calling a specific tool"""
        print(f"üîß Testing: Tool '{tool_name}'")

        params = {"name": tool_name}
        if arguments:
            params["arguments"] = arguments

        result = self._make_request("tools/call", params)

        if result["success"]:
            data = result["data"]
            if "result" in data:
                return TestResult(
                    f"tool:{tool_name}",
                    True,
                    f"‚úÖ Tool '{tool_name}' executed successfully",
                    data["result"],
                    result["duration"]
                )
            elif "error" in data:
                return TestResult(
                    f"tool:{tool_name}",
                    False,
                    f"‚ùå Tool error: {data['error'].get('message', 'Unknown error')}",
                    data["error"],
                    result["duration"]
                )

        return TestResult(
            f"tool:{tool_name}",
            False,
            f"‚ùå Tool call failed: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def test_list_resources(self) -> TestResult:
        """Test listing available resources"""
        print("üîß Testing: List Resources")

        result = self._make_request("resources/list")

        if result["success"]:
            data = result["data"]
            if "result" in data and "resources" in data["result"]:
                resources = data["result"]["resources"]
                return TestResult(
                    "resources/list",
                    True,
                    f"‚úÖ Found {len(resources)} resource(s)",
                    resources,
                    result["duration"]
                )

        return TestResult(
            "resources/list",
            False,
            f"‚ùå Failed to list resources: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def test_read_resource(self, uri: str) -> TestResult:
        """Test reading a specific resource"""
        print(f"üîß Testing: Resource '{uri}'")

        result = self._make_request("resources/read", {"uri": uri})

        if result["success"]:
            data = result["data"]
            if "result" in data:
                return TestResult(
                    f"resource:{uri}",
                    True,
                    f"‚úÖ Resource '{uri}' read successfully",
                    data["result"],
                    result["duration"]
                )
            elif "error" in data:
                return TestResult(
                    f"resource:{uri}",
                    False,
                    f"‚ùå Resource error: {data['error'].get('message', 'Unknown error')}",
                    data["error"],
                    result["duration"]
                )

        return TestResult(
            f"resource:{uri}",
            False,
            f"‚ùå Resource read failed: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def test_list_prompts(self) -> TestResult:
        """Test listing available prompts"""
        print("üîß Testing: List Prompts")

        result = self._make_request("prompts/list")

        if result["success"]:
            data = result["data"]
            if "result" in data and "prompts" in data["result"]:
                prompts = data["result"]["prompts"]
                return TestResult(
                    "prompts/list",
                    True,
                    f"‚úÖ Found {len(prompts)} prompt(s)",
                    prompts,
                    result["duration"]
                )

        return TestResult(
            "prompts/list",
            False,
            f"‚ùå Failed to list prompts: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def test_get_prompt(self, name: str, arguments: Dict = None) -> TestResult:
        """Test getting a specific prompt"""
        print(f"üîß Testing: Prompt '{name}'")

        params = {"name": name}
        if arguments:
            params["arguments"] = arguments

        result = self._make_request("prompts/get", params)

        if result["success"]:
            data = result["data"]
            if "result" in data:
                return TestResult(
                    f"prompt:{name}",
                    True,
                    f"‚úÖ Prompt '{name}' retrieved successfully",
                    data["result"],
                    result["duration"]
                )
            elif "error" in data:
                return TestResult(
                    f"prompt:{name}",
                    False,
                    f"‚ùå Prompt error: {data['error'].get('message', 'Unknown error')}",
                    data["error"],
                    result["duration"]
                )

        return TestResult(
            f"prompt:{name}",
            False,
            f"‚ùå Prompt retrieval failed: {result.get('error', 'Unknown error')}",
            result.get("data"),
            result.get("duration", 0)
        )

    def run_all_tests(self,
                      tool_tests: List[Dict] = None,
                      resource_tests: List[str] = None,
                      prompt_tests: List[Dict] = None) -> None:
        """Run all tests and print results"""
        print("\n" + "="*80)
        print("üöÄ STARTING MCP SERVER TEST SUITE")
        print("="*80 + "\n")

        # Test 1: Initialize
        result = self.test_initialize()
        self.results.append(result)
        print(f"{result.message} ({result.duration:.3f}s)\n")

        if not result.passed:
            print("‚ö†Ô∏è  Cannot continue without successful initialization\n")
            self.print_summary()
            return

        # Test 2: List and test tools
        result = self.test_list_tools()
        self.results.append(result)
        print(f"{result.message} ({result.duration:.3f}s)")

        if result.passed and result.response:
            print("\nüìã Available Tools:")
            for tool in result.response:
                print(f"  ‚Ä¢ {tool.get('name', 'unknown')}: {tool.get('description', 'No description')}")
            print()

            # Run custom tool tests if provided
            if tool_tests:
                for test in tool_tests:
                    result = self.test_tool_call(test["name"], test.get("arguments"))
                    self.results.append(result)
                    print(f"{result.message} ({result.duration:.3f}s)\n")

        # Test 3: List and test resources
        result = self.test_list_resources()
        self.results.append(result)
        print(f"{result.message} ({result.duration:.3f}s)")

        if result.passed and result.response:
            print("\nüìã Available Resources:")
            for resource in result.response:
                print(f"  ‚Ä¢ {resource.get('uri', 'unknown')}: {resource.get('description', 'No description')}")
            print()

            # Run custom resource tests if provided
            if resource_tests:
                for uri in resource_tests:
                    result = self.test_read_resource(uri)
                    self.results.append(result)
                    print(f"{result.message} ({result.duration:.3f}s)\n")

        # Test 4: List and test prompts
        result = self.test_list_prompts()
        self.results.append(result)
        print(f"{result.message} ({result.duration:.3f}s)")

        if result.passed and result.response:
            print("\nüìã Available Prompts:")
            for prompt in result.response:
                print(f"  ‚Ä¢ {prompt.get('name', 'unknown')}: {prompt.get('description', 'No description')}")
            print()

            # Run custom prompt tests if provided
            if prompt_tests:
                for test in prompt_tests:
                    result = self.test_get_prompt(test["name"], test.get("arguments"))
                    self.results.append(result)
                    print(f"{result.message} ({result.duration:.3f}s)\n")

        self.print_summary()

    def print_summary(self) -> None:
        """Print test summary"""
        print("\n" + "="*80)
        print("üìä TEST SUMMARY")
        print("="*80 + "\n")

        passed = sum(1 for r in self.results if r.passed)
        failed = sum(1 for r in self.results if not r.passed)
        total = len(self.results)
        total_time = sum(r.duration for r in self.results)

        print(f"‚úÖ Passed: {passed}/{total}")
        print(f"‚ùå Failed: {failed}/{total}")
        print(f"‚è±Ô∏è  Total time: {total_time:.3f}s\n")

        if failed > 0:
            print("Failed Tests:")
            for result in self.results:
                if not result.passed:
                    print(f"  ‚Ä¢ {result.name}: {result.message}")

        print("\n" + "="*80 + "\n")


# Example usage
if __name__ == "__main__":
    # Initialize tester
    tester = MCPServerTester(MCP_SERVER_URL)

    # Define your custom tests here
    # Example tool tests
    tool_tests = [
        {"name": "get_weather", "arguments": {"city": "San Francisco"}},
        {"name": "calculate", "arguments": {"expression": "2 + 2"}},
        # Add more tool tests as needed
    ]

    # Example resource tests
    resource_tests = [
        "weather://current",
        "config://settings",
        # Add more resource URIs as needed
    ]

    # Example prompt tests
    prompt_tests = [
        {"name": "weather_summary", "arguments": {"city": "New York"}},
        {"name": "help", "arguments": {}},
        # Add more prompt tests as needed
    ]

    # Run all tests
    tester.run_all_tests(
        tool_tests=tool_tests,
        resource_tests=resource_tests,
        prompt_tests=prompt_tests
    )

    # Optional: Export results to JSON
    results_json = [
        {
            "name": r.name,
            "passed": r.passed,
            "message": r.message,
            "duration": r.duration
        }
        for r in tester.results
    ]

    with open("test_results.json", "w") as f:
        json.dump(results_json, f, indent=2)

    print("üíæ Results saved to test_results.json")


üöÄ STARTING MCP SERVER TEST SUITE

üîß Testing: Server Initialization
INFO:     127.0.0.1:48326 - "POST /mcp HTTP/1.1" 200 OK
‚úÖ Server initialized successfully (0.012s)

üîß Testing: List Tools
INFO:     127.0.0.1:48342 - "POST /mcp HTTP/1.1" 400 Bad Request
‚ùå Failed to list tools: HTTP 400: {"jsonrpc":"2.0","id":"server-error","error":{"code":-32600,"message":"Bad Request: Missing session ID"}} (0.005s)
üîß Testing: List Resources
INFO:     127.0.0.1:48354 - "POST /mcp HTTP/1.1" 400 Bad Request
‚ùå Failed to list resources: HTTP 400: {"jsonrpc":"2.0","id":"server-error","error":{"code":-32600,"message":"Bad Request: Missing session ID"}} (0.006s)
üîß Testing: List Prompts
INFO:     127.0.0.1:48360 - "POST /mcp HTTP/1.1" 400 Bad Request
‚ùå Failed to list prompts: HTTP 400: {"jsonrpc":"2.0","id":"server-error","error":{"code":-32600,"message":"Bad Request: Missing session ID"}} (0.008s)

üìä TEST SUMMARY

‚úÖ Passed: 1/4
‚ùå Failed: 3/4
‚è±Ô∏è  Total time: 0.031s

Failed Te

In [None]:
import requests

# The /mcp endpoint is designed for POST requests with JSON-RPC payloads.
# The server requires the client to accept both application/json and text/event-stream.
response = requests.post(
    "http://localhost:8000/mcp",
    json={
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {
                "name": "test-client",
                "version": "1.0.0"
            }
        }
    },
    headers={
        "Accept": "application/json, text/event-stream"
    }
)

# Check the status code first
if response.status_code == 200:
    print(f"Initialization successful (Status: {response.status_code})")
    # Try to display JSON if available, otherwise display text
    try:
        display(response.json())
    except requests.exceptions.JSONDecodeError:
        display("Server did not return a JSON response for initialize, or the response was empty.")
        display(f"Response text: xxx{response.text}xxx")
else:
    display(f"Initialization failed (Status: {response.status_code})")
    display(f"Response text: {response.text}")

In [None]:
# Set your API key
client = OpenAI(api_key="your-api-key-here")

# Now you can use the OpenAI client
# The FastMCP server is running in the background on localhost:8000
# and can be accessed by your agent if configured to do so

# Example OpenAI Agent code
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "What's the weather like?"}
    ]
)

print(response.choices[0].message.content)