In [1]:
%%writefile mcp_project/mcp_chatbot.py
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List, Dict, TypedDict, Any
from contextlib import AsyncExitStack
from openai import OpenAI
import json
import asyncio

# Load variables from a .env file into the process environment at import time.
# NOTE(review): presumably this is how the OpenAI credentials reach the
# argument-less OpenAI() constructor used by MCP_ChatBot — confirm.
load_dotenv()

class ToolDefinition(TypedDict):
    """Shape of one entry in the ``tools`` list sent with a chat completion.

    MCP_ChatBot builds one of these per tool reported by an MCP server.
    """

    # Tool kind discriminator; the chatbot always registers "function".
    type: str
    # Function descriptor holding the "name", "description" and "parameters"
    # keys filled in from the MCP tool listing.
    function: Dict[str, Any]

class MCP_ChatBot:
    """Interactive chatbot that lets an OpenAI chat model call MCP server tools.

    Typical lifecycle (see ``main``): construct, ``connect_to_servers()``,
    ``chat_loop()``, and finally ``cleanup()`` to close every server
    connection that was opened.
    """

    # Settings applied to every chat-completion request.
    MODEL = "gpt-4o-mini"
    # NOTE(review): 2024 is kept from the original code; possibly meant to be 2048.
    MAX_TOKENS = 2024

    def __init__(self):
        """Create an unconnected chatbot with an empty tool registry."""
        # Every MCP session that was initialized successfully.
        self.sessions: List[ClientSession] = []
        # Owns all transports/sessions so cleanup() can close them in one call.
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI()
        # Tool definitions in the format passed as ``tools=`` to the chat API.
        self.available_tools: List[ToolDefinition] = []
        # Tool name -> session of the server that exposes that tool.
        self.tool_to_session: Dict[str, ClientSession] = {}

    async def connect_to_server(self, server_name: str, server_config: dict) -> None:
        """Connect to a single MCP server and register its tools.

        Args:
            server_name: Label used only in console output.
            server_config: Keyword arguments for ``StdioServerParameters``.

        Failures are reported on the console and swallowed so that one broken
        server does not prevent the remaining servers from being connected.
        """
        try:
            server_params = StdioServerParameters(**server_config)
            read, write = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.sessions.append(session)

            response = await session.list_tools()
            tools = response.tools
            print(f"\n✅ Connected to {server_name} with tools:", [t.name for t in tools])

            for tool in tools:
                # A name can only be routed to one session and must appear only
                # once in the tools list, so the first server to register wins.
                if tool.name in self.tool_to_session:
                    print(
                        f"⚠️ Skipping tool `{tool.name}` from {server_name}: "
                        "name already registered by another server"
                    )
                    continue

                # Map tool name to the session so we can call it later
                self.tool_to_session[tool.name] = session

                # Register with OpenAI, including the required 'type' and 'function' wrapper
                self.available_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description or "",
                        "parameters": tool.inputSchema,
                    },
                })

        except Exception as e:
            print(f"❌ Failed to connect to {server_name}: {e}")

    async def connect_to_servers(self):
        """Connect to every server listed under ``mcpServers`` in server_config.json.

        Raises:
            Exception: Any error while reading or parsing the configuration
                file is printed and then re-raised. Per-server connection
                failures are handled inside ``connect_to_server``.
        """
        try:
            with open('server_config.json', 'r', encoding='utf-8') as file:
                data = json.load(file)

            servers = data.get('mcpServers', {})
            for server_name, server_config in servers.items():
                await self.connect_to_server(server_name, server_config)

        except Exception as e:
            print(f"❌ Error loading server configuration: {e}")
            raise

    def _create_completion(self, messages: List[Dict[str, Any]]) -> Any:
        """Request the next assistant turn for ``messages``."""
        request: Dict[str, Any] = {
            "model": self.MODEL,
            "messages": messages,
            "max_tokens": self.MAX_TOKENS,
        }
        # Only advertise tools when at least one is registered; an empty
        # ``tools`` array is not a valid request.
        if self.available_tools:
            request["tools"] = self.available_tools
        # NOTE: this is a synchronous client call, so the event loop is blocked
        # until the response arrives.
        return self.client.chat.completions.create(**request)

    @staticmethod
    def _serialize_tool_call(tc: Any) -> Dict[str, Any]:
        """Convert a tool-call object from a response into a plain message dict."""
        arguments = tc.function.arguments
        if not isinstance(arguments, str):
            arguments = json.dumps(arguments)
        return {
            "id": tc.id,
            "type": "function",
            "function": {"name": tc.function.name, "arguments": arguments},
        }

    @staticmethod
    def _tool_result_to_text(result: Any) -> str:
        """Flatten a tool result's ``content`` into one string for a tool message."""
        content = getattr(result, "content", None)
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        parts = []
        for item in content:
            text = getattr(item, "text", None)
            # Items without a textual payload fall back to their str() form.
            parts.append(text if isinstance(text, str) else str(item))
        return "\n".join(parts)

    async def _run_tool_call(self, tc: Any) -> str:
        """Execute one requested tool call and return its result as text.

        Malformed arguments and unknown tool names are returned as error text
        (which the model then sees) instead of aborting the whole query.
        """
        tool_name = tc.function.name
        raw_args = tc.function.arguments

        # Parse into a Python dict if it's a JSON string
        if isinstance(raw_args, str):
            try:
                tool_args = json.loads(raw_args) if raw_args.strip() else {}
            except json.JSONDecodeError as e:
                return f"Error: invalid JSON arguments for tool `{tool_name}`: {e}"
        else:
            tool_args = raw_args

        print(f"\n🔧 Calling tool `{tool_name}` with args: {tool_args}")
        session = self.tool_to_session.get(tool_name)
        if session is None:
            return f"Error: unknown tool `{tool_name}`"

        result = await session.call_tool(tool_name, arguments=tool_args)
        return self._tool_result_to_text(result)

    async def process_query(self, query: str):
        """Answer ``query``, running tools for as long as the model requests them.

        The final assistant message is printed; nothing is returned. Each
        call starts a fresh conversation containing only ``query``.
        """
        messages: List[Dict[str, Any]] = [
            {"role": "user", "content": query}
        ]

        while True:
            response = self._create_completion(messages)
            msg = response.choices[0].message
            tool_calls = getattr(msg, "tool_calls", None)

            if not tool_calls:
                # No more tools to call — print the final assistant message
                print(f"\n🤖 Assistant: {msg.content}")
                return

            # Record the assistant turn once, with every tool call it made,
            # then answer each call. The model is queried again only after all
            # results are in, so no intermediate response is thrown away.
            messages.append({
                "role": "assistant",
                "content": msg.content,
                "tool_calls": [self._serialize_tool_call(tc) for tc in tool_calls],
            })
            for tc in tool_calls:
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": await self._run_tool_call(tc),
                })

    async def chat_loop(self):
        """Run an interactive chat loop until the user types 'quit' or stdin ends."""
        print('\n🤖 MCP Chatbot Started!')
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\n💬 Query: ").strip()
            except EOFError:
                # stdin is closed: every further input() would fail the same
                # way, so stop instead of looping on the error forever.
                break

            if not query:
                continue
            if query.lower() == 'quit':
                break

            try:
                await self.process_query(query)
                print('\n')
            except Exception as e:
                # Keep the session alive; a failed query should not end the chat.
                print(f"\n❌ Error: {e}")

    async def cleanup(self):
        """Cleanly close all resources using AsyncExitStack."""
        await self.exit_stack.aclose()

async def main():
    """Start the chatbot: connect to the servers, chat, then always clean up."""
    bot = MCP_ChatBot()
    try:
        await bot.connect_to_servers()
        await bot.chat_loop()
    finally:
        # Runs whether the steps above returned or raised, so the exit stack
        # held by the chatbot is always closed.
        await bot.cleanup()

# Script entry point: run the async main() to completion on a new event loop
# when this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())

Overwriting mcp_project/mcp_chatbot.py


In [2]:
# Embed the Jupyter terminal served at localhost:8888/terminals/3 in this
# cell's output so a shell is available inside the notebook.
# NOTE(review): presumably used to run mcp_project/mcp_chatbot.py interactively — confirm.
# NOTE(review): `os` is imported but not used in this cell.
import os
from IPython.display import IFrame

IFrame("http://localhost:8888/terminals/3", width=600, height=768)