# JSON-RPC Basics
### **Request** Structure
```json
{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "some.method",
    "params": {...}
}
```

### **Response** Structure
*Success*
```json
{
    "jsonrpc": "2.0",
    "id": 1,
    "result": {...}
}
```
*Failure*
```json
{
    "jsonrpc": "2.0",
    "id": 1,
    "error": {
        "code": 123,
        "message": "Something broke"
    }
}
```
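
The request and response shapes above can be sketched in a few lines of Python. This is a minimal illustration, not a library API: `make_request`, `unwrap_response`, and `JsonRpcError` are hypothetical names introduced here.

```python
from typing import Any, Dict, Optional


class JsonRpcError(Exception):
    """Raised when a response carries an "error" member (hypothetical helper)."""

    def __init__(self, code: int, message: str):
        super().__init__(f"JSON-RPC error {code}: {message}")
        self.code = code
        self.message = message


def make_request(method: str, params: Optional[Any] = None, request_id: int = 1) -> Dict[str, Any]:
    """Build a request object; "params" is left out entirely when not given."""
    request: Dict[str, Any] = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        request["params"] = params
    return request


def unwrap_response(response: Dict[str, Any]) -> Any:
    """Return "result" on success, raise JsonRpcError on failure."""
    if "error" in response:
        err = response["error"]
        raise JsonRpcError(err["code"], err["message"])
    return response["result"]


print(make_request("some.method", {"key": "value"}))
print(unwrap_response({"jsonrpc": "2.0", "id": 1, "result": {"ok": True}}))
```

A response carries either `result` or `error`, never both, which is why checking for `error` first is enough.
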
### Fields
**Required**
| Field              | Meaning                                    |
| ------------------ | ------------------------------------------ |
| `"jsonrpc": "2.0"` | Must always be exactly `"2.0"`             |
| `"method"`         | The name of the API method                 |
| `"id"`             | A unique request ID (string or number)     |

**Optional**
| Field      | Meaning                                  |
| ---------- | ---------------------------------------- |
| `"params"` | Arguments to the method (object or list) |

### Batch Requests
```json
[
  { "jsonrpc": "2.0", "id": 1, "method": "math.add", "params": [1,2] },
  { "jsonrpc": "2.0", "id": 2, "method": "math.mul", "params": [3,4] }
]
```
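
Responses to a batch may come back in any order, so they are matched to requests by `id` rather than by position. A minimal sketch, assuming a hypothetical helper `match_batch` and a made-up server reply (the `result` values assume `math.add` adds and `math.mul` multiplies):

```python
from typing import Any, Dict, List, Optional


def match_batch(
    requests: List[Dict[str, Any]], responses: List[Dict[str, Any]]
) -> Dict[Any, Optional[Dict[str, Any]]]:
    """Pair each request id with its response; None if no response arrived."""
    by_id = {r["id"]: r for r in responses if "id" in r}
    # Notifications (no "id") are skipped: they never get a response.
    return {req["id"]: by_id.get(req["id"]) for req in requests if "id" in req}


batch = [
    {"jsonrpc": "2.0", "id": 1, "method": "math.add", "params": [1, 2]},
    {"jsonrpc": "2.0", "id": 2, "method": "math.mul", "params": [3, 4]},
]
# Hypothetical reply, deliberately out of order.
replies = [
    {"jsonrpc": "2.0", "id": 2, "result": 12},
    {"jsonrpc": "2.0", "id": 1, "result": 3},
]

matched = match_batch(batch, replies)
print(matched[1]["result"], matched[2]["result"])
```
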

### Notifications
When you send a request without an `id`, it becomes a notification: a request for which the server sends no response.
```json
{
  "jsonrpc": "2.0",
  "method": "metrics/ping"
}
```
Used for events like "didOpen", "statusChanged", etc. MCP uses notifications in both directions: server→client updates, and client→server messages such as `notifications/initialized`.
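
On the receiving side, the presence of `id` decides whether a reply is produced. The sketch below shows one way a receiver might dispatch messages; `handle_message` and the `handlers` table are hypothetical names, not part of any library.

```python
from typing import Any, Callable, Dict, Optional

Handler = Callable[[Any], Any]


def handle_message(message: Dict[str, Any], handlers: Dict[str, Handler]) -> Optional[Dict[str, Any]]:
    """Dispatch one message; return a response dict, or None for notifications."""
    handler = handlers.get(message["method"])
    params = message.get("params")

    if "id" not in message:
        # Notification: run the handler if there is one, but never reply,
        # not even with an error.
        if handler is not None:
            handler(params)
        return None

    if handler is None:
        return {
            "jsonrpc": "2.0",
            "id": message["id"],
            "error": {"code": -32601, "message": "Method not found"},
        }
    return {"jsonrpc": "2.0", "id": message["id"], "result": handler(params)}


handlers = {"metrics/ping": lambda params: "pong"}
print(handle_message({"jsonrpc": "2.0", "method": "metrics/ping"}, handlers))  # None
print(handle_message({"jsonrpc": "2.0", "id": 7, "method": "metrics/ping"}, handlers))
```
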

### JSON-RPC over different transports
**Over HTTP**:  
You `POST` the JSON to a URL.  
  
**Over WebSockets/TCP**:  
Send each JSON message as a string.  
  
**Over SSE (for MCP)**:  
- Client connects to `/sse`
- Server sends messages as SSE events
- Client writes the JSON-RPC messages over the *write channel* (usually HTTP POST or WebSocket)


In [22]:
!docker ps

CONTAINER ID   IMAGE                COMMAND                  CREATED         STATUS         PORTS                                         NAMES
81d7804abdf6   docker/mcp-gateway   "/docker-mcp gateway…"   8 seconds ago   Up 7 seconds   0.0.0.0:8811->8811/tcp, [::]:8811->8811/tcp   docker_mcp_host-gateway-1


# Important
MCP requires an **Initialization Phase** before any other request:
1. Client sends an initialization request
2. Server sends a response
3. Client notifies the server with `notifications/initialized`

Takeaways:
1. Initialize the client with these headers:
    ```json
    {
        "Mcp-Protocol-Version": "2024-11-05",
        "Accept": "application/json, text/event-stream"
    }
    ```
2. Use the *same async client* for the whole *request sequence*; otherwise you need to begin a new **Initialization Phase**
3. When the server responds to `initialize`, read the **mcp-session-id** response header
4. Subsequent requests need the extra header: `headers={"Mcp-Session-Id": session_id}`
5. With `--transport=streaming`, the MCP server returns SSE, i.e. `f"data: {some_data}\n\n"`. So return `response.text` and parse the SSE output
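
Parsing the SSE body can be sketched as follows. This is a minimal reader that assumes each event's `data:` lines together hold one JSON document; `iter_sse_json` is a hypothetical name, and fields other than `data` (`event`, `id`) are simply ignored.

```python
import json
from typing import Any, Dict, Iterator, List


def iter_sse_json(response_text: str) -> Iterator[Dict[str, Any]]:
    """Yield the JSON payload of every SSE event found in the text."""
    data_lines: List[str] = []
    # Normalize line endings, and add a final blank line so the last event is flushed.
    lines = response_text.replace("\r\n", "\n").replace("\r", "\n").split("\n") + [""]
    for line in lines:
        if line.startswith("data:"):
            value = line[5:]
            # One optional space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            data_lines.append(value)
        elif line == "" and data_lines:
            # A blank line ends the event; multiple data lines are joined with "\n".
            payload = "\n".join(data_lines)
            data_lines = []
            try:
                yield json.loads(payload)
            except json.JSONDecodeError:
                continue  # skip events whose data is not JSON


sample = 'event: message\nid: 1\ndata: {"jsonrpc": "2.0", "id": 1, "result": {}}\n\n'
for message in iter_sse_json(sample):
    print(message)
```

Unlike a parser that returns on the first `data: ` line, this one also handles a response that carries several events.
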

### Error codes:
PARSE_ERROR = -32700  
INVALID_REQUEST = -32600  
METHOD_NOT_FOUND = -32601  
INVALID_PARAMS = -32602  
INTERNAL_ERROR = -32603  
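
These codes can be kept in an enum so they are not scattered through the code as magic numbers. A small sketch; `JsonRpcErrorCode` and `make_error_response` are hypothetical names, and the default message is just derived from the enum member name.

```python
from enum import IntEnum
from typing import Any, Dict, Optional


class JsonRpcErrorCode(IntEnum):
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603


def make_error_response(
    request_id: Any, code: JsonRpcErrorCode, message: Optional[str] = None
) -> Dict[str, Any]:
    """Build a failure response; the message defaults to a readable form of the name."""
    default_message = code.name.replace("_", " ").capitalize()
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": int(code), "message": message or default_message},
    }


print(make_error_response(1, JsonRpcErrorCode.METHOD_NOT_FOUND))
# Reverse lookup from a code received over the wire:
print(JsonRpcErrorCode(-32602).name)
```
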

### Initialization JSON-RPC request:
```python
payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {
                "name": "test-client",
                "version": "1.0.0",
            },
        },
    }
```
### Notification JSON-RPC request:
It will not receive any response from the server
```python
payload = {
        "jsonrpc": "2.0",
        "method": "notifications/initialized",
    }
```
### Tool List JSON-RPC request:
```python
payload = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/list",
        "params": {},
    }
```

In [None]:
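# asyncio.run() cannot be called from a notebook cell, where an event loop is already running;
# run this as a script, or call `await main()` in the notebook instead.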
import httpx
import json
import asyncio

server_url = "http://localhost:8811/mcp"

def parse_sse_response(response_text: str):
    lines = response_text.split("\n")
    for line in lines:
        if line.startswith("data: "):
            data = line[6:]
            try:
                return json.loads(data)
            except json.JSONDecodeError:
                print(f"Could not parse JSON: {data}")
    return None

async def initialize_session(client: httpx.AsyncClient):
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {
                "name": "test-client",
                "version": "1.0.0",
            },
        },
    }

    resp = await client.post(server_url, json=payload)
    resp.raise_for_status()
    session_id = resp.headers.get("Mcp-Session-Id") or resp.headers.get("mcp-session-id")
    print("Initialize response headers:")
    print(dict(resp.headers))
    print("Mcp-Session-Id:", session_id) # Get this session for subsequent request

    return resp.text, session_id

async def send_initialized_notification(client: httpx.AsyncClient, session_id: str):
    payload = {
        "jsonrpc": "2.0",
        "method": "notifications/initialized",
    }

    resp = await client.post(
        server_url,
        json=payload,
        headers={"Mcp-Session-Id": session_id}, # Adding the session id as a header
    )
    resp.raise_for_status()
    return resp.text

async def get_tools_list(client: httpx.AsyncClient, session_id: str):
    payload = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/list",
        "params": {},
    }

    resp = await client.post(
        server_url,
        json=payload,
        headers={"Mcp-Session-Id": session_id}, # Adding the session id as a header
    )
    resp.raise_for_status()
    return resp.text

async def main():
    try:
        async with httpx.AsyncClient(
            timeout=300,
            headers={
                # These headers are required
                "Mcp-Protocol-Version": "2024-11-05",
                "Accept": "application/json, text/event-stream",
            },
            limits=httpx.Limits(max_keepalive_connections=1, max_connections=1),
        ) as client:
            print("Initializing session...")
            init_response, session_id = await initialize_session(client)
            print("Initialization response:")
            print(init_response)

            init_data = parse_sse_response(init_response)
            if init_data and "result" in init_data:
                print("\n===PARSED INITIALIZATION===")
                print(json.dumps(init_data["result"], indent=2))

            print("\nSending initialized notification...")
            notif_response = await send_initialized_notification(client, session_id)
            print("Notification response:")
            # repr() shows characters like '\n'. The response is usually '' because a JSON-RPC notification has no response
            print(repr(notif_response))

            print("\nGetting tools list...")
            tools_response = await get_tools_list(client, session_id)
            print("Tools list raw response:")
            print(tools_response)

            tools_data = parse_sse_response(tools_response)
            if tools_data:
                print("\n===PARSED TOOLS LIST===")
                print(json.dumps(tools_data, indent=2))

    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    asyncio.run(main())


# Output:
```python
# tools list output -> List[Dict[str, Any]]
# Typically:
tools = [{
  "description": "...",
  "inputSchema": {
    "properties": {...}
  },
  "name": "...",
  "outputSchema": {...},
}]
```
### Example
```json
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "tools": [
      {
        "_meta": {
          "_fastmcp": {
            "tags": []
          }
        },
        "description": "Extract key facts from a Wikipedia article, optionally focused on a topic.",
        "inputSchema": {
          "properties": {
            "count": {
              "default": 5,
              "type": "integer"
            },
            "title": {
              "type": "string"
            },
            "topic_within_article": {
              "default": "",
              "type": "string"
            }
          },
          "required": [
            "title"
          ],
          "type": "object"
        },
        "name": "extract_key_facts",
        "outputSchema": {
          "additionalProperties": true,
          "type": "object"
        }
      }
    ]
  }
}
```
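
A `tools/list` result like the one above can be reduced to the parts a caller usually needs: the tool name and which arguments are required. A rough sketch, with `summarize_tools` as a hypothetical helper and a trimmed copy of the example as input:

```python
from typing import Any, Dict, List


def summarize_tools(tools_list_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """List each tool's name with its required and optional argument names."""
    summary = []
    for tool in tools_list_response.get("result", {}).get("tools", []):
        schema = tool.get("inputSchema", {})
        required = schema.get("required", [])
        optional = sorted(set(schema.get("properties", {})) - set(required))
        summary.append({"name": tool.get("name"), "required": required, "optional": optional})
    return summary


# Trimmed copy of the example response above.
example = {
    "jsonrpc": "2.0",
    "id": 2,
    "result": {
        "tools": [
            {
                "name": "extract_key_facts",
                "inputSchema": {
                    "properties": {
                        "count": {"default": 5, "type": "integer"},
                        "title": {"type": "string"},
                        "topic_within_article": {"default": "", "type": "string"},
                    },
                    "required": ["title"],
                    "type": "object",
                },
            }
        ]
    },
}

print(summarize_tools(example))
```
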

### Trial with OpenAI

In [2]:
import os
import httpx
import asyncio
import json
import copy
from typing import Optional, List, Dict, Any
from dotenv import load_dotenv

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"
MCP_PROTOCOL_VERSION = "2024-11-05"
MCP_URL = "http://localhost:8811/mcp"

def parse_sse_json(response_text: str) -> Optional[Dict[str, Any]]:
    """
    Given an SSE response like:
        event: message
        id: ...
        data: {...JSON...}

    extract and return the JSON object from the first 'data: ' line.
    """
    for line in response_text.splitlines():
        if line.startswith("data: "):
            data = line[6:]
            try:
                return json.loads(data)
            except json.JSONDecodeError:
                print("Could not parse JSON from SSE data:", data)
                return None
    return None

class MCPGatewayClient:
    def __init__(self):
        self.gateway_url = MCP_URL
        self.session_id:Optional[str]=None
        self._next_id = 1

    async def initialize(self, client: httpx.AsyncClient):
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id,
            "method": "initialize",
            "params": {
                "protocolVersion": MCP_PROTOCOL_VERSION,
                "capabilities": {},
                "clientInfo": {
                    "name": "gpt-mcp-bridge",
                    "version": "1.0.0"
                }
            }
        }
        self._next_id+=1
        response = await client.post(
            url=self.gateway_url,
            json=payload,
            headers={
                "Mcp-Protocol-Version": MCP_PROTOCOL_VERSION,
                "Accept": "application/json, text/event-stream",
            }
        )
        response.raise_for_status()
        self.session_id = response.headers.get("Mcp-Session-Id") or response.headers.get("mcp-session-id")
        data = parse_sse_json(response.text)
        if not data:
            raise RuntimeError(f"Invalid initialize response: {response.text}")
        
        notif_payload = {
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        }
        notif_headers = {
            "Mcp-Session-Id": self.session_id,
            "Mcp-Protocol-Version": MCP_PROTOCOL_VERSION,
            "Accept": "application/json, text/event-stream",
        }
        notif_response = await client.post(
            url=self.gateway_url,
            json=notif_payload,
            headers=notif_headers
        )
        notif_response.raise_for_status()

        return data
    
    async def list_tools(self, client: httpx.AsyncClient):
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id,
            "method": "tools/list",
            "params": {}
        }
        self._next_id+=1
        headers = {
            "Mcp-Session-Id": self.session_id,
            "Mcp-Protocol-Version": MCP_PROTOCOL_VERSION,
            "Accept": "application/json, text/event-stream",
        }
        response = await client.post(
            url=self.gateway_url,
            json=payload,
            headers=headers
        )
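        response.raise_for_status()
        data = parse_sse_json(response.text)
        # parse_sse_json returns None when no JSON 'data: ' line is found
        if not data:
            raise RuntimeError(f"Invalid tools/list response: {response.text}")
        if "error" in data:
            raise RuntimeError(f"MCP tools/list error: {data['error']}")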
        
        return data['result']['tools']
    
    async def call_tool(self, client:httpx.AsyncClient, name:str, arguments: Dict[str, Any]):
        payload ={
            "jsonrpc": "2.0",
            "id": self._next_id,
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments
            }
        }
        self._next_id+=1
        headers = {
            "Mcp-Session-Id": self.session_id,
            "Mcp-Protocol-Version": MCP_PROTOCOL_VERSION,
            "Accept": "application/json, text/event-stream",
        }

        response = await client.post(
            url=self.gateway_url,
            json=payload,
            headers=headers
        )
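        response.raise_for_status()
        data = parse_sse_json(response.text)
        # parse_sse_json returns None when no JSON 'data: ' line is found
        if not data:
            raise RuntimeError(f"Invalid tools/call response: {response.text}")
        if 'error' in data:
            raise RuntimeError(f"MCP tools/call error: {data['error']}")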
        
        return data["result"]
    
def tool_schema_conversion(mcp_tools: List[Dict[str, Any]]):
    """
    Convert MCP tool definitions to OpenAI function tools.
    """
    tools: List[Dict[str, Any]] = []
    for t in mcp_tools:
        name = t.get('name')
        if not name:
            continue

        description = t.get("description", "")
        input_schema = copy.deepcopy(t.get("inputSchema", {})) or {}

        if input_schema.get('type') is None:
            input_schema['type'] = "object"
        if "properties" not in input_schema:
            input_schema["properties"] = {}
        input_schema.setdefault("additionalProperties", False)
        
        tools.append(
            {
                "type": "function",
                "function": { 
                    "name": name,
                    "description": description,
                    "parameters": input_schema,
                }
            }
        )
    return tools


async def gpt_with_mcp(user_message: str, max_iterations:int=5):
    """
    Example:
    - initialize MCP
    - list tools
    - send user + tools to gpt
    - handle a round of tool calls via mcp
    - return final assistant answer
    """
    mcp = MCPGatewayClient()
    async with httpx.AsyncClient(timeout=300) as client:
        await mcp.initialize(client)
        mcp_tools = await mcp.list_tools(client)
        openai_tools = tool_schema_conversion(mcp_tools)

        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant with access to Wikipedia via tools. Use tools when the user asks about factual topics."
            },
            {
                "role": "user",
                "content": user_message
            }
        ]
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "Content-Type": "application/json",
        }

        # Set max iterations of tool calls
        for i in range(max_iterations):
            payload = {
                "model": "gpt-4o-mini",
                "messages": messages,
                "tools": openai_tools,
                "tool_choice": "auto",
            }
            response = await client.post(
                OPENAI_API_URL,
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            data = response.json()

            assistant_message = data['choices'][0]['message']
            finish_reason = data['choices'][0]['finish_reason']
            messages.append(assistant_message)
            
            if finish_reason == 'stop':
                return {
                    "content": assistant_message.get('content'),
                    "full_response": data
                }
            
            if finish_reason == 'tool_calls' and assistant_message.get('tool_calls'):
                tool_calls = assistant_message['tool_calls']
                print(f"\n===== Iteration {i} ===== Processing {len(tool_calls)} tool calls====")
                for tc in tool_calls:
                    tool_name = tc['function']['name']
                    tool_args = json.loads(tc['function']['arguments'])
                    tool_call_id = tc['id']

                    #Calling tool
                    try:
                        tool_result = await mcp.call_tool(client=client, name=tool_name, arguments=tool_args)
                        if isinstance(tool_result, dict) and 'content' in tool_result:
                            content_items = tool_result['content']
                            text_parts = []
                            for item in content_items:
                                if item.get('type') == "text" and 'text' in item:
                                    text_parts.append(item['text'])

                            result_text = "\n".join(text_parts)
                        else:
                            result_text = json.dumps(tool_result)

                        print(f"Tool Result Preview: {result_text[:200]}...")

                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call_id,
                            "content": result_text
                        })

                    except Exception as e:
                        print(f"Error calling tool: {tool_name}: {str(e)}")
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call_id,
                            "content": f"Error: {str(e)}"
                        })

                # All tool results are appended; go back to the model with the updated messages
                continue

            print(f"Unexpected finish_reason: {finish_reason}")
            break

        return {
            "content": "Maximum iterations reached",
            "messages": messages,
            "full_response": data
        }

# if __name__ == "__main__":
#     async def _test():
#         answer = await gpt_with_mcp(
#             user_message= "Who is Alan Turing? Use wikipedia if helpful", max_iterations=3
#         )
#         print("\n====Answer====\n")
#         print(answer['content'])

#     asyncio.run(_test())


In [4]:
import copy
from prompts import LLM_TOOL_SCHEMAS
from typing import List, Dict, Any
def tool_schema_conversion(mcp_tools: List[Dict[str, Any]], mode: str='default'):
    """
    Convert MCP tool definitions to OpenAI function tools
    Now handles dynamic MCP tools (mcp-find, mcp-add, mcp-remove) and code-mode
    """    
    tools: List[Dict[str, Any]] = []

    dynamic_tools = {'mcp-find'}
    code_mode_tools = {'code-mode', 'mcp-exec'}
    exposed_tools = dynamic_tools | code_mode_tools

    for t in mcp_tools:
        name = t.get('name')
        if not name:
            continue

        if mode == "default":
            # In default mode, exclude all dynamic and code-mode tools
            if name in exposed_tools:
                continue
        elif mode in ["dynamic", "code"]:
            # In dynamic and code modes, only expose specific tools
            if name not in exposed_tools:
                continue

        description = t.get("description", "")
        
        # Use clean schemas for LLM-exposed tools
        if name in LLM_TOOL_SCHEMAS:
            input_schema = copy.deepcopy(LLM_TOOL_SCHEMAS[name])
        else:
            # For other tools, use original schema with fixes
            input_schema = copy.deepcopy(t.get("inputSchema", {})) or {}
            
            if input_schema.get('type') is None:
                input_schema['type'] = 'object'
            if 'properties' not in input_schema:
                input_schema['properties'] = {}
            input_schema.setdefault("additionalProperties", False)
            
            #input_schema = fix_openai_schema(input_schema)

        tools.append(
            {
                "type": "function",
                "function": { 
                    "name": name,
                    "description": description,
                    "parameters": input_schema,
                }
            }
        )
    
    return tools

In [5]:
code_mode = [{
      "description":"Create a JavaScript-enabled tool that combines multiple MCP server tools. \nThis allows you to write scripts that call multiple tools and combine their results.\nUse the mcp-find tool to find servers and make sure they are are ready with the mcp-add tool. When running\nmcp-add, we don't have to activate the tools.\n",
      "inputSchema":{
         "type":"object",
         "required":[
            "servers",
            "name"
         ],
         "properties":{
            "name":{
               "type":"string",
               "description":"Name for the new code-mode tool (will be prefixed with 'code-mode-')"
            },
            "servers":{
               "type":"array",
               "description":"List of MCP server names whose tools should be available in the JavaScript environment",
               "items":{
                  "type":"string"
               }
            }
         }
      },
      "name":"code-mode"
   }]

In [6]:
tools = tool_schema_conversion(code_mode)

In [7]:
tools

[]
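
The empty list appears to follow from the `mode` argument: the call uses the default `mode='default'`, and in that mode every name in `exposed_tools` (which includes `code-mode`) is skipped. The filter on its own can be sketched like this; `EXPOSED_TOOLS` and `is_visible` are hypothetical names that mirror the logic of `tool_schema_conversion` without the schema handling.

```python
EXPOSED_TOOLS = {"mcp-find", "code-mode", "mcp-exec"}


def is_visible(name: str, mode: str = "default") -> bool:
    """Return True if a tool with this name would be kept in the given mode."""
    if mode == "default":
        # Default mode hides the dynamic and code-mode tools.
        return name not in EXPOSED_TOOLS
    if mode in ("dynamic", "code"):
        # These modes expose only the dynamic and code-mode tools.
        return name in EXPOSED_TOOLS
    # Any other mode applies no filtering.
    return True


print(is_visible("code-mode"))          # False
print(is_visible("code-mode", "code"))  # True
```

Under that reading, `tool_schema_conversion(code_mode, mode="code")` would be the call that keeps the `code-mode` tool.
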