# MCP Client Jupyter Notebook

This notebook demonstrates how to use the MCP Python SDK to connect to MCP servers and interact with their tools, resources, and prompts.

In [None]:
!pip install mcp

In [2]:
import asyncio
import subprocess
import threading
import queue
from mcp import ClientSession
import nest_asyncio
from typing import Any, Optional, List
from pprint import pprint
import anyio
import mcp.types as types

# Enable nested event loops
nest_asyncio.apply()

class MCPNotebookClient:
    """MCP client for Jupyter notebooks that manages subprocess communication with MCP servers."""
    
    def __init__(self):
        # Subprocess management
        self.process: Optional[subprocess.Popen] = None
        
        # MCP session
        self.session: Optional[ClientSession] = None
        
        # Communication streams
        self._read_stream_writer = None
        self._write_stream_reader = None
        self._read_stream = None
        self._write_stream = None
        
        # Threading
        self._stdout_thread: Optional[threading.Thread] = None
        self._stderr_thread: Optional[threading.Thread] = None
        self._should_stop = False
        
        # Message handling
        self._stdout_queue = queue.Queue()
        
    def start_server(self, command: str, args: Optional[List[str]] = None) -> Any:
        """Start the MCP server process and establish the session.
        
        Args:
            command: The command to run (e.g., 'npx')
            args: Optional list of arguments for the command
            
        Returns:
            The initialization result or None if failed
        """
        if self.process:
            raise RuntimeError("Server already running")
            
        full_command = [command] + (args or [])
        print(f"Starting server with command: {' '.join(full_command)}")
        
        try:
            self._start_process(full_command)
            self._setup_streams()
            self._start_threads()
            return self._initialize_session()
        except Exception as e:
            print(f"Failed to start server: {e}")
            self.stop_server()
            return None

    def stop_server(self) -> None:
        """Stop the server and clean up all resources."""
        self._should_stop = True
        
        if self.process:
            self._cleanup_session()
            self._cleanup_process()
    
    def _start_process(self, command: List[str]) -> None:
        """Start the subprocess with the given command."""
        self.process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1
        )
    
    def _setup_streams(self) -> None:
        """Set up the MCP communication streams."""
        self._read_stream_writer, self._read_stream = anyio.create_memory_object_stream(0)
        self._write_stream, self._write_stream_reader = anyio.create_memory_object_stream(0)
        
        loop = asyncio.get_event_loop()
        loop.create_task(self._process_stdout())
        loop.create_task(self._process_writes())
        
    def _start_threads(self) -> None:
        """Start the stdout and stderr reader threads."""
        self._stdout_thread = threading.Thread(target=self._stdout_reader)
        self._stderr_thread = threading.Thread(target=self._stderr_reader)
        self._stdout_thread.daemon = True
        self._stderr_thread.daemon = True
        self._stdout_thread.start()
        self._stderr_thread.start()
        
    def _initialize_session(self) -> Any:
        """Initialize the MCP session."""
        loop = asyncio.get_event_loop()
        
        async def setup_session():
            self.session = ClientSession(self._read_stream, self._write_stream)
            await self.session.__aenter__()
            return await asyncio.wait_for(self.session.initialize(), timeout=10)
            
        try:
            result = loop.run_until_complete(setup_session())
            print(f"Connected to: {result.serverInfo.name} v{result.serverInfo.version}")
            return result
        except asyncio.TimeoutError:
            print("Initialization timed out. Server may not have started properly.")
            return None
        except Exception as e:
            print(f"Error during initialization: {e}")
            return None

    async def _process_stdout(self) -> None:
        """Process messages from stdout queue."""
        while not self._should_stop:
            try:
                line = self._stdout_queue.get(timeout=0.1)
                try:
                    message = types.JSONRPCMessage.model_validate_json(line)
                    await self._read_stream_writer.send(message)
                except Exception as exc:
                    print(f"Error processing message: {exc}")
                    await self._read_stream_writer.send(exc)
            except queue.Empty:
                await asyncio.sleep(0.1)

    async def _process_writes(self) -> None:
        """Process writes to stdin."""
        async for message in self._write_stream_reader:
            if self._should_stop or not self.process:
                break
            json_str = message.model_dump_json(by_alias=True, exclude_none=True)
            try:
                self.process.stdin.write(json_str + "\n")
                self.process.stdin.flush()
            except Exception as e:
                print(f"Error writing to process: {e}")

    def _stdout_reader(self) -> None:
        """Read from process stdout."""
        while not self._should_stop and self.process:
            line = self.process.stdout.readline()
            if not line:
                break
            self._stdout_queue.put(line)

    def _stderr_reader(self) -> None:
        """Read from process stderr."""
        while not self._should_stop and self.process:
            line = self.process.stderr.readline()
            if not line:
                break
            print(f"Server stderr: {line.strip()}")

    def _cleanup_session(self) -> None:
        """Clean up the MCP session."""
        loop = asyncio.get_event_loop()
        
        async def cleanup():
            if self.session:
                try:
                    await self.session.__aexit__(None, None, None)
                except Exception as e:
                    print(f"Error closing session: {e}")
                self.session = None
                
            if self._read_stream_writer:
                await self._read_stream_writer.aclose()
            if self._write_stream_reader:
                await self._write_stream_reader.aclose()
                
        loop.run_until_complete(cleanup())

    def _cleanup_process(self) -> None:
        """Clean up the subprocess."""
        if self.process:
            try:
                self.process.stdin.close()
                self.process.terminate()
                self.process.wait(timeout=5)
            except Exception as e:
                print(f"Error during process cleanup: {e}")
            finally:
                self.process = None
                print("Server stopped")

    # MCP API methods
    def list_tools(self) -> Any:
        """List available tools from the server."""
        if not self.session:
            raise RuntimeError("Not connected")
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.session.list_tools())
        
    def list_resources(self) -> Any:
        """List available resources from the server."""
        if not self.session:
            raise RuntimeError("Not connected")
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.session.list_resources())
        
    def list_prompts(self) -> Any:
        """List available prompts from the server."""
        if not self.session:
            raise RuntimeError("Not connected")
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.session.list_prompts())
        
    def call_tool(self, name: str, arguments: Optional[dict] = None) -> Any:
        """Call a tool on the server.
        
        Args:
            name: The name of the tool to call
            arguments: Optional dictionary of arguments for the tool
            
        Returns:
            The tool's result
        """
        if not self.session:
            raise RuntimeError("Not connected")
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.session.call_tool(name, arguments or {}))

# Create global client
client = MCPNotebookClient()

In [3]:
result = client.start_server("npx", ["-y", "@modelcontextprotocol/server-memory"])

Starting server with command: npx -y @modelcontextprotocol/server-memory
Server stderr: Knowledge Graph MCP Server running on stdio
Connected to: memory-server v1.0.0


In [4]:
if client.session:
    tools = client.list_tools()
    for tool in tools.tools:
        print(f"- {tool.name}: {tool.description}")

- create_entities: Create multiple new entities in the knowledge graph
- create_relations: Create multiple new relations between entities in the knowledge graph. Relations should be in active voice
- add_observations: Add new observations to existing entities in the knowledge graph
- delete_entities: Delete multiple entities and their associated relations from the knowledge graph
- delete_observations: Delete specific observations from entities in the knowledge graph
- delete_relations: Delete multiple relations from the knowledge graph
- read_graph: Read the entire knowledge graph
- search_nodes: Search for nodes in the knowledge graph based on a query
- open_nodes: Open specific nodes in the knowledge graph by their names


Error closing session: Attempted to exit cancel scope in a different task than it was entered in
Server stopped


In [14]:
import os
from anthropic import Anthropic
from typing import List, Dict, Any

class MCPChatInterface:
    def __init__(self, client: MCPNotebookClient):
        self.client = client
        self.anthropic = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
        self.messages: List[Dict[str, Any]] = []
        
    def run_chat(self):
        """Run an interactive chat loop with Claude that can use MCP tools."""
        print("Starting chat with Claude 3.5 Sonnet. Type 'quit' to exit.")
        print("Fetching available tools...")
        
        # Get available tools for Claude
        tools_result = self.client.list_tools()
        available_tools = [{ 
            "name": tool.name,
            "description": tool.description,
            "input_schema": tool.inputSchema
        } for tool in tools_result.tools]
        
        print(f"Found {len(available_tools)} tools!")
        
        while True:
            user_input = input("\nYou: ").strip()
            if user_input.lower() == 'quit':
                print("Ending chat session.")
                break
                
            # Add user message to history
            self.messages.append({
                "role": "user",
                "content": user_input
            })
            
            try:
                # Get response from Claude with correct model name
                response = self.anthropic.messages.create(
                    model="claude-3-5-sonnet-20241022",  # Updated model name
                    max_tokens=1024,
                    messages=self.messages,
                    tools=available_tools
                )
                
                # Process response and handle tool calls
                tool_outputs = []
                for content in response.content:
                    if content.type == 'text':
                        print(f"\nClaude: {content.text}")
                        # Add assistant's text response to history
                        if content.text:  # Only add if text is not empty
                            self.messages.append({
                                "role": "assistant",
                                "content": content.text
                            })
                    elif content.type == 'tool_use':
                        tool_name = content.name
                        tool_args = content.input
                        
                        print(f"\nClaude wants to use tool: {tool_name}")
                        try:
                            result = self.client.call_tool(tool_name, tool_args)
                            
                            # Convert tool result to string
                            if hasattr(result, 'content'):
                                result_str = '\n'.join(
                                    c.text for c in result.content 
                                    if hasattr(c, 'text')
                                )
                            else:
                                result_str = str(result)
                                
                            print(f"Tool result: {result_str}")
                            
                            # Add tool call to message history
                            tool_outputs.append({
                                "tool_call_id": content.id,
                                "output": result_str
                            })
                            
                        except Exception as e:
                            error_msg = f"Error executing tool {tool_name}: {str(e)}"
                            print(f"Tool error: {error_msg}")
                            tool_outputs.append({
                                "tool_call_id": content.id,
                                "output": error_msg
                            })

                # Only add tool outputs if there were any
                if tool_outputs:
                    self.messages.append({
                        "role": "assistant",
                        "content": "",
                        "tool_calls": [
                            {
                                "id": output["tool_call_id"],
                                "type": "function",
                                "function": {"name": output["tool_call_id"],
                                          "arguments": "{}"}
                            } for output in tool_outputs
                        ]
                    })
                    for output in tool_outputs:
                        self.messages.append({
                            "role": "tool",
                            "content": output["output"],
                            "tool_call_id": output["tool_call_id"]
                        })
                        
            except Exception as e:
                print(f"\nError: {str(e)}")
                # Don't add error messages to history

# Create chat interface
chat = MCPChatInterface(client)

In [18]:
if client.session:
    chat.run_chat()
else:
    print("Make sure you've initialized the MCP client!")

Starting chat with Claude 3.5 Sonnet. Type 'quit' to exit.
Fetching available tools...
Found 9 tools!
Ending chat session.
