# MCP client running in a Jupyter notebook -- using the SSE transport

An experiment in building an MCP client that uses the SSE transport and runs in a Jupyter notebook.

It connects to a PostgreSQL MCP server (SSE), retrieves and prints the available tools, then runs a sample query.


# The PostgreSQL MCP server (SSE) is rebuilt based on the official server listed on modelcontextprotocol.io

@modelcontextprotocol/server-postgres

The rebuilt server is provided at:

./server/postgres/postgres.py


# references

https://github.com/sidharthrajaram/mcp-sse


In [4]:
# https://github.com/sidharthrajaram/mcp-sse
# ./mcp-servers/postgres/postgres.py -- MCP server was rebuilt from its nodejs version to python
# note: FastMCP ran into some issues with read_resource -- it was not returning the content as expected, 
# so we are using the original MCP server -- client does not change, server was updated

import asyncio
import json
import os
import sys
from typing import Optional

from mcp import ClientSession
from mcp.client.sse import sse_client
import mcp.types as types

from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

class MCPClient:
    """
    Thin async wrapper around an MCP ClientSession connected over SSE.

    Call `initialize()` first, then the query methods, and finally
    `cleanup()` (ideally from a `finally` block) to close the connection.
    """

    def __init__(self):
        # Initialize session and context variables
        self.session: Optional[ClientSession] = None
        self._streams_context = None
        self._session_context = None

    def _require_session(self):
        """
        Return the active session.

        Raises:
            RuntimeError: if `initialize()` has not completed successfully.
        """
        if self.session is None:
            raise RuntimeError("MCP session is not initialized; call initialize() first.")
        return self.session

    async def initialize(self, server_url: str):
        """
        Connect to an MCP server running with SSE transport and initialize the session.

        Any connection previously opened by this client is closed first.
        If this method raises, call `cleanup()` to release whatever was opened.

        Parameters:
            server_url (str): URL of the SSE MCP server (e.g. "http://localhost:8080/sse")
        """
        # Re-initializing must not leak an earlier connection.
        await self.cleanup()

        # Each context is stored only after it has been entered successfully,
        # so cleanup() never exits a context that was never entered.
        streams_context = sse_client(url=server_url)
        streams = await streams_context.__aenter__()
        self._streams_context = streams_context

        session_context = ClientSession(*streams)
        self.session = await session_context.__aenter__()
        self._session_context = session_context

        # Initialize the session handshake with the MCP server
        await self.session.initialize()
        print("Initialized SSE client.")

    async def get_resources(self):
        """
        Retrieve the list of resources from the MCP server.

        Returns:
            The list of resource objects provided by the server.
        """
        response = await self._require_session().list_resources()
        return response.resources

    async def read_resource(self, resource_uri):
        """
        Read a specific resource from the MCP server.

        Parameters:
            resource_uri: The URI of the resource to read.

        Returns:
            The contents of the specified resource, or None when the
            server returned no contents.
        """
        response = await self._require_session().read_resource(resource_uri)
        return response.contents or None

    async def get_tools(self):
        """
        Retrieve the list of tools from the MCP server.

        Returns:
            The list of tool objects provided by the server.
        """
        response = await self._require_session().list_tools()
        return response.tools

    async def call_tool(self, tool_name: str, parameters: dict):
        """
        Call a specific tool on the MCP server with given parameters.

        Parameters:
            tool_name (str): The name of the tool to call.
            parameters (dict): The parameters to pass to the tool.

        Returns:
            The response from the MCP server after calling the tool.
        """
        return await self._require_session().call_tool(tool_name, parameters)

    async def get_tools_openai_format(self):
        """
        Convert the retrieved MCP tools into the OpenAI function-tool format.

        Returns:
            A list of dictionaries of the form
            {"type": "function", "function": {...}} where the inner dict has:
              - "name": tool name.
              - "description": a description of the tool.
              - "parameters": the tool's parameter schema (from inputSchema).
        """
        tools = await self.get_tools()
        # `or` (rather than a getattr default) also covers attributes that
        # exist but are None, e.g. a tool published without a description.
        return [
            {
                "type": "function",
                "function": {
                    "name": getattr(tool, "name", None) or "unknown_tool",
                    "description": getattr(tool, "description", None) or "No description provided",
                    "parameters": getattr(tool, "inputSchema", None) or {},
                },
            }
            for tool in tools
        ]

    async def cleanup(self):
        """
        Clean up the MCP session and underlying stream contexts.

        Safe to call more than once and before `initialize()`. The stream
        context is closed even if closing the session raises.
        """
        # Detach state first so a repeated call cannot exit the same context twice.
        session_context, self._session_context = self._session_context, None
        streams_context, self._streams_context = self._streams_context, None
        self.session = None

        try:
            if session_context is not None:
                await session_context.__aexit__(None, None, None)
        finally:
            if streams_context is not None:
                await streams_context.__aexit__(None, None, None)


# SSE endpoint of the MCP server; main() passes this to MCPClient.initialize()
server_url = "http://localhost:8081/sse"


# Retrieve the schema resources listed inside the main resource.
# No proper way was found to list all resources on a FastMCP-based MCP server; this function retrieves them all via the main resource instead.
async def process_schema_resources(client, resource_uri):
    """
    Read a main schema resource and print every resource it lists.

    The first content item of the main resource is expected to carry JSON
    text of the form {"resources": [{"uri": ...}, ...]}. Each distinct URI
    is read through the client and its content printed. Problems are
    reported with a printed message rather than raised.

    Parameters:
        client: Object exposing an async `read_resource(uri)` method.
        resource_uri: URI of the main schema resource.
    """
    # Read the main schema resource
    content = await client.read_resource(resource_uri)

    # Check that we got a list and it is not empty
    if not (isinstance(content, list) and content):
        print("Returned content is not a non-empty list.")
        return

    # TextResourceContents items carry a `text` attribute, so the attribute
    # check alone covers them as well as any other text-bearing item.
    first_resource = content[0]
    if not hasattr(first_resource, "text"):
        print("The first returned item does not have the expected 'text' attribute.")
        return

    try:
        # Parse the JSON text
        schema_obj = json.loads(first_resource.text)
    except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
        print("Error parsing JSON from the resource:", e)
        return

    # The JSON may legitimately be a list or scalar (e.g. a column listing);
    # only a JSON object with a list under 'resources' can be processed.
    resources = schema_obj.get("resources") if isinstance(schema_obj, dict) else None
    if not resources or not isinstance(resources, list):
        print("No 'resources' key found in the JSON data.")
        return

    print(f"Found {len(resources)} resources in the schema.")

    # Remove duplicates based on the 'uri' key, keeping first-seen order.
    # Entries without a usable 'uri' collapse into a single None entry.
    unique_uris = []
    seen_uris = set()
    for entry in resources:
        uri = entry.get("uri") if isinstance(entry, dict) else None
        if uri not in seen_uris:
            seen_uris.add(uri)
            unique_uris.append(uri)

    print(f"Found {len(unique_uris)} unique resources.")

    for uri in unique_uris:
        if not uri:
            print("A resource entry is missing a 'uri' key.")
            continue
        # Retrieve each resource by its URI
        resource_content = await client.read_resource(uri)
        print(f"Content for resource {uri}:")
        print(resource_content)


async def main():
    """
    Run the end-to-end demo against the MCP server at `server_url`.

    Steps: connect, print the tools in OpenAI format, list and print every
    resource (expanding schema resources via `process_schema_resources`),
    then run a sample SQL query through the "query" tool. Any failure is
    printed rather than raised, and the client is always cleaned up.
    """
    client = MCPClient()
    try:
        # Initialize the MCP client
        await client.initialize(server_url)

        # Retrieve tools and convert to OpenAI format
        openai_tools = await client.get_tools_openai_format()
        print("Available tools in OpenAI format:")
        print(json.dumps(openai_tools, indent=2))

        # get the resources list
        resources_list = await client.get_resources()
        print("Available resources:", len(resources_list))

        # read the resources
        for resource in resources_list:
            content = await client.read_resource(resource.uri)

            print(f"Content of resource '{resource.name}':")
            print(content)

            # Only a non-empty list of contents can be inspected further
            if not (isinstance(content, list) and content):
                continue

            # Check if the first element is a TextResourceContents instance
            first_resource = content[0]
            if isinstance(first_resource, types.TextResourceContents) or hasattr(first_resource, "text"):
                # process the schema resources if it is a schema resource
                # (assuming the first resource is the main schema resource)
                if first_resource.uri != resource.uri:
                    await process_schema_resources(client, resource.uri)

        # test of call_tool
        res = await client.call_tool("query", {"sql": "SELECT * FROM transactions limit 1"})

        content_items = getattr(res, "content", None) or []
        if content_items:
            # Tool results may hold non-text items, which have no `.text`
            text_response = getattr(content_items[0], "text", None)
            if getattr(res, "isError", False):
                # The server flags failed tool calls on the result object
                print("\n⚠️ Tool call reported an error:", text_response)
            elif text_response is None:
                print("\n⚠️ Tool returned non-text content:", content_items[0])
            else:
                try:
                    parsed_result = json.loads(text_response)
                    print("\n🔹 Query Result:")
                    print(json.dumps(parsed_result, indent=2))
                except json.JSONDecodeError:
                    print("\n⚠️ Response is not valid JSON:", text_response)

    except Exception as e:
        # Top-level boundary of the demo: report the failure and fall through to cleanup
        print("Error during initialization or tool retrieval:", e)
    finally:
        await client.cleanup()

# Top-level await works in a Jupyter/IPython cell; a plain script would need asyncio.run(main())
await main()


Initialized SSE client.
Available tools in OpenAI format:
[
  {
    "type": "function",
    "function": {
      "name": "query",
      "description": "Run a read-only SQL query",
      "parameters": {
        "type": "object",
        "properties": {
          "sql": {
            "type": "string"
          }
        },
        "required": [
          "sql"
        ]
      }
    }
  }
]
Available resources: 5
Content of resource '"transactions_fdw" database schema':
[TextResourceContents(uri=Url('table-schema://transactions_fdw'), mimeType='text/plain', text='[\n  {\n    "column_name": "transaction_date",\n    "data_type": "transaction_date"\n  },\n  {\n    "column_name": "amount",\n    "data_type": "amount"\n  },\n  {\n    "column_name": "transaction_id",\n    "data_type": "transaction_id"\n  },\n  {\n    "column_name": "category",\n    "data_type": "category"\n  },\n  {\n    "column_name": "description",\n    "data_type": "description"\n  }\n]')]
Content of resource '"transactions_st